Problem
You are an analyst/data engineer/data scientist building a data processing pipeline in Airflow. Last week you wrote a job that performs all the necessary processing to build your sales table in the database. This week, you are building a customers table that aggregates data from your previous sales table.
Should you add the necessary customers logic as a new task on the existing DAG, or should you create an entirely new DAG? Since the dependency is only in one direction (tomorrow's sales data does not depend on today's customers data), you decide to decouple into two separate DAGs.
But how can you make sure your new DAG waits until the necessary sales data is loaded before starting? Airflow offers rich options for specifying intra-DAG scheduling and dependencies, but it is not immediately obvious how to do so for inter-DAG dependencies.
The duct-tape fix here is to schedule customers to run enough minutes or hours after sales that we can be reasonably confident sales has finished. We can do better, though.
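For reference, that duct-tape version might look something like this. The DAG names follow the example above; the start date and the two-hour gap are made-up placeholders chosen by guesswork:
from datetime import datetime
from airflow import DAG

# Naive approach: stagger the schedules and hope two hours is enough headroom
sales_dag = DAG('sales', start_date=datetime(2020, 1, 1),
                schedule_interval='0 0 * * *')    # builds the sales table at 00:00
customers_dag = DAG('customers', start_date=datetime(2020, 1, 1),
                    schedule_interval='0 2 * * *')  # runs at 02:00, hoping sales has finished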
Solution
Airflow provides an out-of-the-box sensor called ExternalTaskSensor that we can use to model this “one-way dependency” between two DAGs. Here’s what we need to do:
- Configure dag_A and dag_B to have the same start_date and schedule_interval parameters.
- Instantiate an ExternalTaskSensor in dag_B pointing towards a specific task of dag_A, and set it as an upstream dependency of the first task(s) in your pipeline.
- Initiate dagruns for both DAGs at roughly the same time. dag_B itself will start, but your task sensor will wait until the run of dag_A for the corresponding date finishes before allowing the actual tasks to start.
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor

# start_date and schedule_interval are placeholders here, but must match dag_A's
with DAG('dag_B', start_date=datetime(2020, 1, 1), schedule_interval='@daily') as dag:
    # Waits for dag_A's 'final_task' (same execution date) to succeed
    wait_for_dag_A = ExternalTaskSensor(
        task_id='wait_for_dag_A',
        external_dag_id='dag_A',
        external_task_id='final_task')
    main_task = PythonOperator(…)
    wait_for_dag_A >> main_task
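For reference, the dag_A side might look roughly like the sketch below. The task and DAG names match the snippet above; the start_date and schedule_interval values are placeholders, but they must match dag_B's, since by default the ExternalTaskSensor looks for a dag_A run with the same execution date (its execution_delta or execution_date_fn arguments can bridge differing schedules).
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def build_sales_table():
    ...  # whatever processing builds the sales table

# Schedule parameters are placeholders, but must match dag_B's
with DAG('dag_A', start_date=datetime(2020, 1, 1), schedule_interval='@daily') as dag:
    # The task that dag_B's sensor waits on
    final_task = PythonOperator(
        task_id='final_task',
        python_callable=build_sales_table)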
Note: This requires tasks to run in parallel, which is not possible when Airflow is using the SequentialExecutor, often the default for a barebones Airflow installation. That executor is typically backed by a SQLite metadata database, and SQLite does not support parallel I/O. Using the LocalExecutor enables parallel operations, but requires an actual database (e.g. Postgres) to function.
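If you do make that switch, the relevant settings live in airflow.cfg (or the matching AIRFLOW__CORE__* environment variables). The connection string below is only a placeholder, and the exact section layout may differ across Airflow versions:
[core]
executor = LocalExecutor
# Point the metadata database at Postgres instead of the default SQLite file
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost:5432/airflow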
Update: I explore some different, possibly better-suited approaches to this problem here, including SubDagOperator and TriggerDagRunOperator.
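As a taste of that alternative, TriggerDagRunOperator inverts the relationship: rather than dag_B polling for dag_A, dag_A explicitly kicks off dag_B once its own work is done. A minimal sketch, using the Airflow 1.x import path and assuming it is added inside dag_A's definition from the earlier sketch:
from airflow.operators.dagrun_operator import TriggerDagRunOperator

# Placed at the end of dag_A: once final_task succeeds, start a run of dag_B
trigger_dag_B = TriggerDagRunOperator(
    task_id='trigger_dag_B',
    trigger_dag_id='dag_B')

final_task >> trigger_dag_B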
Further reading
Dependencies between DAGs: How to wait until another DAG finishes in Airflow? [Bartosz Mikulski]