Friday, April 3, 2020

Getting started with apache-airflow (Part 1)

# Apache Airflow
quick start link: https://airflow.apache.org/docs/stable/start.html



# export AIRFLOW_HOME in the shell profile
vi ~/.bash_profile

# add this line to ~/.bash_profile to set AIRFLOW_HOME
export AIRFLOW_HOME=/User/Desktop/airflow/
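
# reload the profile so AIRFLOW_HOME is set in the current shell
source ~/.bash_profile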

cd $AIRFLOW_HOME

# create the virtual environment
python3 -m venv ./venv
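
# activate the virtual environment so the packages below install into it
source ./venv/bin/activate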

# show the list of installed packages
pip3 list

# install apache airflow
pip3 install apache-airflow

# initialize the airflow database
$ airflow initdb

# starting the webserver on port 8080
$ airflow webserver -p 8080
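
The command above keeps the webserver in the foreground; if you prefer to run it in the background, the -D flag daemonizes it (this is also where the PID file mentioned further down comes from):

# optionally, run the webserver as a background daemon
$ airflow webserver -p 8080 -D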


Now we should be able to see the Airflow DAGs at the local URL:
http://localhost:8080/admin/


# start the scheduler (in a separate terminal)
$ airflow scheduler


# Review the airflow config file found under the AIRFLOW_HOME dir, or go to the UI and follow the Admin -> Configuration menu.

$ cat airflow.cfg

We can learn more about Airflow features from the configuration file, as below (a sample excerpt follows this list):

  • It can store logs remotely in AWS S3, Google Cloud Storage or Elasticsearch (set remote_logging = True and specify the remote_log_conn_id)
  • default_timezone = UTC
  • specify executor to use: executor = SequentialExecutor (Some of the executors include SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor)
  • to point to the Airflow metadata DB, refer to the property sql_alchemy_conn (defaults to SQLite, but supports many different database engines)
  • sql_alchemy_schema (the schema to use for the metadata database; SqlAlchemy supports databases with the concept of multiple schemas)
  • parallelism = 32 (max no of task instances that can run simultaneously across the whole Airflow installation)
  • dag_concurrency = 16 (no of task instances allowed to run concurrently by the scheduler within a single DAG)
  • dags_are_paused_at_creation = True (whether DAGs are paused by default at creation)
  • max_active_runs_per_dag = 16 (max no of active DAG runs per DAG)
  • load_examples = True (better to set this to False in PROD environment)
  • plugins_folder = /User/Desktop/airflow/plugins (folder where airflow plugins are stored)
  • fernet_key = (secret key to save connection passwords in db)
  • dagbag_import_timeout = 30 (how long before timing out a python file import)
  • dag_file_processor_timeout = 50 (how long before timing out a DagFileProcessor, which processes a DAG file)
  • task_runner = StandardTaskRunner (the class to use for running task instances in a subprocess)
  • security = kerberos (which security model to use)
  • unit_test_mode = False (setting this to True will overwrite many configuration options with test values at run time)
  • default_task_retries = 0 (number of retries each task is going to have by default)
  • endpoint_url = http://localhost:8080/ (if web_server_url_prefix is set, we must append it here)
  • auth_backend (how to authenticate users of the API)
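
As a rough illustration, this is what some of these [core] settings look like inside airflow.cfg. The values below mirror the defaults, with sql_alchemy_conn shown for the AIRFLOW_HOME used above; they are examples, not recommendations:

[core]
default_timezone = utc
executor = SequentialExecutor
sql_alchemy_conn = sqlite:////User/Desktop/airflow/airflow.db
parallelism = 32
dag_concurrency = 16
dags_are_paused_at_creation = True
max_active_runs_per_dag = 16
load_examples = True
default_task_retries = 0
fernet_key =
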
Additional config properties:

- For CeleryExecutor, go to [celery]
- Kerberos - [kerberos]
- ElasticSearch - [elasticsearch]
- Elastic Search Configs - [elasticsearch_configs]
- Kubernetes Configs - [kubernetes]

The PID file for the webserver will be stored in 
$AIRFLOW_HOME/airflow-webserver.pid (OR)
/run/airflow/webserver.pid (if started by systemd)

By default, Airflow uses a SQLite DB, for which parallelization is not possible.
It works in conjunction with the airflow.executors.sequential_executor.SequentialExecutor, which will only run task instances sequentially.
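
To get past this limitation, the usual first step is to point sql_alchemy_conn at a real database and switch the executor. A minimal sketch, assuming a local PostgreSQL database and user both named "airflow" (adjust to your environment, and install the psycopg2 driver):

# in airflow.cfg
executor = LocalExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost:5432/airflow

# then initialize the metadata tables in the new database
airflow initdb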

# To trigger a few task instances:

# running the first task instance
airflow run example_bash_operator runme_0 2015-01-01


# run a backfill over 2 days
airflow backfill example_bash_operator -s 2015-01-01 -e 2015-01-02
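
A few related CLI commands that can help while exploring (standard Airflow 1.x commands):

# list all DAGs picked up by Airflow
airflow list_dags

# list the tasks inside the example_bash_operator DAG
airflow list_tasks example_bash_operator

# test a single task instance without recording its state in the DB
airflow test example_bash_operator runme_0 2015-01-01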
