Create Dynamic DAGs in Apache Airflow

Skuad Engineering
4 min read · Jan 19, 2023


Introduction

Apache Airflow is an open-source tool for orchestrating complex workflows and data processing pipelines. It is a platform to programmatically author, schedule, and monitor workflows.

Apache Airflow makes your workflows simple, well organized, and more systematic, and they can easily be authored and scheduled as required.

Problem Statement

We need to create the same pattern of DAGs (Directed Acyclic Graphs) for each job. Dynamic DAGs are helpful when we want to generate DAGs automatically from some metadata, or change DAGs without repeatedly editing the actual code. Using this approach, one can change a DAG as required without touching the code, driving it from a database, a config file, etc., or automate the generation of tens or hundreds of DAGs according to the required conditions.

What are DAGs in Airflow?

A DAG, or Directed Acyclic Graph, in Airflow is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies. A DAG is defined in a Python script, which represents the DAG's structure (tasks and their dependencies) as code.
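For reference, a minimal DAG definition in a Python file looks something like the sketch below; the DAG id, schedule, and task commands here are illustrative, not taken from the figure that follows:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# A minimal, illustrative DAG with two tasks executed in sequence
with DAG(
    dag_id="example_dag",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
) as dag:
    start = BashOperator(task_id="start", bash_command="echo start")
    end = BashOperator(task_id="end", bash_command="echo end")

    start >> end  # "end" runs only after "start" completes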

Figure 1.1 — DAG Example

Referencing Figure 1.1, the entire figure represents a DAG, and each block represents a job in the DAG known as a Task. A Task can be any Airflow Operator (custom-made or imported from an Airflow module), a Sensor, or a TaskFlow-decorated @task function.

The lines in Figure 1.1 represent the flow of execution. The "start" block is the initial block, and the flow goes through the blocks "start" → "section-1-*" → "some-other-task" → "section-2-*" → "end".

All the tasks in "section-1-*" are executed in parallel, then "some-other-task" is executed, and then all the tasks in "section-2-*" are executed in parallel.

Here, creating a Dynamic DAG means having dynamic tasks/blocks in the DAG, and/or having a dynamic flow between those tasks/blocks, based on conditions that we impose.

How to create Dynamic DAGs?

A DagBag is a collection of DAGs parsed out of a folder tree, together with high-level configuration settings. In practice, it corresponds to the directory containing all the DAG files, where our DAG file will be placed. The demo below is built around a single DAG file, which contains the definition, structure, and generation logic of the DAGs.

Airflow re-parses all the DAG files in the DagBag at a set interval. The default is 30 seconds, and it can be changed in the configuration settings in the file airflow.cfg.
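For example, assuming Airflow 2.x, the option controlling how often a DAG file is re-parsed is min_file_process_interval in the [scheduler] section of airflow.cfg:

[scheduler]
# Minimum interval (in seconds) between re-parsing the same DAG file; default is 30
min_file_process_interval = 30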

To handle dynamic DAG creation, we define all the DAGs in the global namespace, since Airflow looks for DAG objects in the global namespace of the DAG file while loading it, and then builds each DAG according to the structure provided at the end of the DAG file.

This can be achieved in three steps:

Step 1. Create the required DAG objects in the global namespace.

One way to do this is to use the global namespace dictionary returned by the function globals().

We can write whatever logic we require, containing any loops, conditions, etc., to define the DAG objects in the global namespace.

from airflow import DAG

for num in range(10):
    dag_name = "dag_{}".format(num)
    globals()[dag_name] = DAG(
        "DAG_name_in_airflow_{}".format(num),
        schedule_interval="@once",
        is_paused_upon_creation=True,
    )

Referencing the above snippet, we are creating 10 DAGs, each stored under a variable name held in dag_name, using a "for" loop. The "globals()" function returns a dictionary of the global namespace, which we modify to add our new DAGs.

After this, we can add the required jobs to the DAG objects. Airflow will load all the DAG objects according to the DAG structure.

Step 2. Use "globals()" again to assign the DAG object to the Tasks of that DAG.

Once the DAG objects exist in the global namespace, we can attach them to the jobs so that the DAG structure can be built in Airflow. This can also be achieved using the "globals()" function with the same variable name used previously.

from airflow.operators.bash import BashOperator
from airflow.utils.trigger_rule import TriggerRule

start = BashOperator(
    task_id='start_{}'.format(dag_name),
    bash_command='echo start pipeline',
    dag=globals()[dag_name],  # assign a DAG object
)

end = BashOperator(
    task_id='end_{}'.format(dag_name),
    bash_command='echo end pipeline',
    trigger_rule=TriggerRule.ONE_SUCCESS,
    dag=globals()[dag_name],  # assign a DAG object
)

As shown in the above snippet, a "dag" argument is used to assign a DAG object to the Task in its definition. While creating a DAG structure, only Tasks assigned the same DAG object should be connected together.

Again, we can use any number of loops and conditions as the logic requires, creating the flow of execution by attaching the Tasks to their DAG objects and to each other.

Step 3. Create the required structure using the Tasks created.

Once the Tasks have been created, we can create the DAG structure using those tasks. The structure is defined at the end of the DAG file.

# "start", "some_task" and the "task_for_*" Tasks are the ones created for each DAG in Step 2
for dag_num in range(10):
    start >> globals()["task_for_{}".format(dag_num)] >> some_task >> end

We use globals() to look up the Task that references the appropriate DAG. We have created 10 different DAG structures in this snippet using the "for" loop.

This entire process will create 10 separate DAGs in Airflow using this single DAG file.
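Putting the three steps together, a complete DAG file might look like the sketch below. The DAG ids, task ids, and the fixed count of 10 are illustrative and could instead be driven by a config file or database; for brevity, the wiring is done inside the same loop rather than at the end of the file:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.trigger_rule import TriggerRule

for num in range(10):
    dag_name = "dag_{}".format(num)

    # Step 1: create the DAG object in the global namespace
    globals()[dag_name] = DAG(
        "DAG_name_in_airflow_{}".format(num),
        schedule_interval="@once",
        start_date=datetime(2023, 1, 1),
        is_paused_upon_creation=True,
    )

    # Step 2: create the Tasks and assign them to this DAG
    start = BashOperator(
        task_id='start_{}'.format(dag_name),
        bash_command='echo start pipeline',
        dag=globals()[dag_name],
    )
    some_task = BashOperator(
        task_id='task_for_{}'.format(num),
        bash_command='echo run pipeline',
        dag=globals()[dag_name],
    )
    end = BashOperator(
        task_id='end_{}'.format(dag_name),
        bash_command='echo end pipeline',
        trigger_rule=TriggerRule.ONE_SUCCESS,
        dag=globals()[dag_name],
    )

    # Step 3: wire the Tasks into the DAG structure
    start >> some_task >> end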

Developer Contribution — Kunal Gupta, Nitesh Kumar Singh and Aamir Khan
