
Getting started with apache-airflow (Part 1)

# Apache Airflow
Quick start link: https://airflow.apache.org/docs/stable/start.html



# export AIRFLOW_HOME (add the export line below to ~/.bash_profile)
vi ~/.bash_profile
# setting AIRFLOW_HOME
export AIRFLOW_HOME=/User/Desktop/airflow/

source ~/.bash_profile
cd $AIRFLOW_HOME

# create and activate the virtual environment
python3 -m venv ./venv
source ./venv/bin/activate

# show the list of installed packages
pip3 list

# install apache airflow
pip3 install apache-airflow

# initialize the airflow database
$ airflow initdb

# starting the webserver on port 8080
$ airflow webserver -p 8080


Now we should be able to see the Airflow DAGs at the local URL:
http://localhost:8080/admin/


# start the scheduler
$ airflow scheduler


# Review the airflow config file under the AIRFLOW_HOME dir, or go to the UI and follow the Admin -> Configuration menu.

$ cat airflow.cfg

We can learn more about Airflow features from the configuration file, as below:

  • It can store logs remotely in AWS S3, Google Cloud Storage or Elasticsearch (enable remote_logging and specify the remote_log_conn_id)
  • default_timezone = UTC
  • specify executor to use: executor = SequentialExecutor (Some of the executors include SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor)
  • to point to the Airflow local metadata DB, refer to the property sql_alchemy_conn (defaults to SQLite, but many different database engines are supported)
  • To specify the schema to use for the metadata database, SqlAlchemy supports databases with the concept of multiple schemas
    • sql_alchemy_schema
  • parallelism = 32 (indicates the max no of task instances that should run simultaneously)
  • dag_concurrency = 16 (no of task instances allowed to run concurrently by the scheduler)
  • dags_are_paused_at_creation = True (whether DAGs are paused by default at creation)
  • max_active_runs_per_dag = 16 (max no of active DAG runs per DAG)
  • load_examples = True (better to set this to False in PROD environment)
  • plugins_folder = /User/Desktop/airflow/plugins (folder where airflow plugins are stored)
  • fernet_key = (secret key to save connection passwords in db)
  • dagbag_import_timeout = 30 (how long before timing out a python file import)
  • dag_file_processor_timeout = 50 (how long before timing out a DagFileProcessor, which processes a DAG file)
  • task_runner = StandardTaskRunner (the class to use for running task instances in a subprocess)
  • security = kerberos (which security model to use)
  • unit_test_mode = False (setting this to True will overwrite many configuration options with test values at run time)
  • default_task_retries = 0 (number of retries each task is going to have by default)
  • endpoint_url = http://localhost:8080/ (if web_server_url_prefix is set, we must append it here)
  • auth_backend (used to authenticate users of the API)
Additional config properties (a short sketch for reading a few of these values programmatically follows this list):

- For CeleryExecutor, go to [celery]
- Kerberos - [kerberos]
- Elasticsearch - [elasticsearch]
- Elasticsearch Configs - [elasticsearch_configs]
- Kubernetes Configs - [kubernetes]
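
If we want to check these values from code rather than reading airflow.cfg by hand, the snippet below is one option. It is a minimal, illustrative sketch assuming apache-airflow is installed in the active environment, and it only reads keys already mentioned in the lists above.

# sketch: inspect a few airflow.cfg values programmatically
# assumes apache-airflow is installed in the current (virtual) environment
from airflow.configuration import conf

# [core] values discussed above
print(conf.get("core", "executor"))            # e.g. SequentialExecutor
print(conf.get("core", "sql_alchemy_conn"))    # metadata DB connection string
print(conf.getint("core", "parallelism"))      # e.g. 32
print(conf.getboolean("core", "dags_are_paused_at_creation"))
print(conf.getboolean("core", "load_examples"))

# other sections such as [celery], [kerberos] or [elasticsearch]
# can be read the same way, e.g. conf.get("celery", "<key>")

The same settings can also be overridden with environment variables of the form AIRFLOW__CORE__EXECUTOR, which is often easier than editing airflow.cfg in automated setups.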

The PID file for the webserver will be stored in 
$AIRFLOW_HOME/airflow-webserver.pid (OR)
/run/airflow/webserver.pid (if started by systemd)

By default, Airflow uses a SQLite DB, for which parallelization is not possible.
It works in conjunction with airflow.executors.sequential_executor.SequentialExecutor, which only runs task instances sequentially.

# To trigger a few task instances:

# running the first task instance
airflow run example_bash_operator runme_0 2015-01-01


# run a backfill over 2 days
airflow backfill example_bash_operator -s 2015-01-01 -e 2015-01-02
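
The example_bash_operator DAG used above ships with Airflow because load_examples = True. As a rough sketch of what a DAG of our own could look like (illustrative names, Airflow 1.x import paths), a file dropped into the default dags folder under $AIRFLOW_HOME/dags might be:

# sketch: a minimal DAG with a single BashOperator task
# (illustrative only, not the shipped example_bash_operator)
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator  # Airflow 1.x import path

default_args = {
    "owner": "airflow",
    "start_date": datetime(2015, 1, 1),
    "retries": 0,  # matches default_task_retries discussed above
}

dag = DAG(
    dag_id="my_first_dag",          # hypothetical DAG id
    default_args=default_args,
    schedule_interval="@daily",
)

runme_0 = BashOperator(
    task_id="runme_0",
    bash_command="echo 'running task instance for {{ ds }}'",
    dag=dag,
)

Once the scheduler picks this file up, the same airflow run and airflow backfill commands shown above can target my_first_dag instead of example_bash_operator.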
