Set dependencies between Airflow DAGs with ExternalTaskSensor
Jan 21, 2019
You are an analyst/data engineer/data scientist building a data processing pipeline in Airflow. Last week you wrote a job that performs all the necessary processing to build your sales table in the database. This week, you are building a customers table that aggregates data from your previous sales table. Should you add the necessary customers logic as a new task on the existing DAG, or should you create an entirely new DAG? Since the dependency runs in only one direction (tomorrow’s sales data does not depend on today’s customers data), you decide to decouple them into two separate DAGs.
But how can you make sure your new DAG waits until the necessary sales data is loaded before starting? Airflow offers rich options for specifying intra-DAG scheduling and dependencies, but it is not immediately obvious how to do the same for inter-DAG dependencies.
The duct-tape fix here is to schedule customers to run enough minutes or hours after sales that we can be reasonably confident sales has finished. We can do better though.
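To see why a fixed offset is fragile, here is a minimal sketch in plain Python (no Airflow required). The start time, the two-hour offset, the run durations, and the helper name offset_is_safe are all made up for illustration.

```python
from datetime import datetime, timedelta


def offset_is_safe(sales_start, sales_duration, offset):
    """Return True if sales finishes before customers starts.

    Hypothetical helper that models the fixed-offset approach;
    it is not part of Airflow.
    """
    sales_end = sales_start + sales_duration
    customers_start = sales_start + offset
    return sales_end <= customers_start


sales_start = datetime(2019, 1, 21, 2, 0)  # sales scheduled at 02:00
offset = timedelta(hours=2)                # customers scheduled at 04:00

# A typical night: sales takes 45 minutes, so the offset is enough.
typical_ok = offset_is_safe(sales_start, timedelta(minutes=45), offset)

# A slow night: sales takes 3 hours, so customers would start on stale data.
slow_ok = offset_is_safe(sales_start, timedelta(hours=3), offset)

print(typical_ok, slow_ok)  # True False
```

The offset only works as long as every sales run stays under the guessed duration, and nothing tells customers when that guess was wrong.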
Airflow provides an out-of-the-box sensor called ExternalTaskSensor that we can use to model this “one-way dependency” between two DAGs. Here’s what we need to do:
- Schedule dag_B to have the same execution dates as dag_A.
- Instantiate an ExternalTaskSensor in dag_B pointing towards a specific task of dag_A, and set it as an upstream dependency of the first task(s) in your pipeline.
- Initiate dagruns for both DAGs at roughly the same time. dag_B itself will start, but your task sensor will wait until the corresponding date run of dag_A finishes before allowing the actual tasks to start.
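    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator
    # Airflow 1.10 import path; in Airflow 2 the module is airflow.sensors.external_task
    from airflow.sensors.external_task_sensor import ExternalTaskSensor

    with DAG('dag_B') as dag:
        # Wait for final_task in the dag_A run with the same execution date
        wait_for_dag_A = ExternalTaskSensor(
            task_id='wait_for_dag_A',
            external_dag_id='dag_A',
            external_task_id='final_task')

        main_task = PythonOperator(…)

        # The sensor must succeed before the real work starts
        wait_for_dag_A >> main_task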
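By default the sensor looks for a run of the external task with the same execution date as its own run, which is why the two DAGs need matching schedules. If dag_A runs on a different schedule, ExternalTaskSensor also accepts an execution_delta argument, a timedelta that is subtracted from the sensor's own execution date. The plain-Python sketch below models that lookup under simplifying assumptions; finished_runs, target_execution_date and external_task_done are illustrative names, not Airflow's implementation.

```python
from datetime import datetime, timedelta


def target_execution_date(own_date, execution_delta=None):
    """Execution date of the external run the sensor should wait for."""
    if execution_delta is None:
        return own_date
    return own_date - execution_delta


def external_task_done(finished_runs, dag_id, task_id, own_date,
                       execution_delta=None):
    """Simplified stand-in for one 'poke': has the matching run succeeded?"""
    key = (dag_id, task_id, target_execution_date(own_date, execution_delta))
    return key in finished_runs


# Pretend metadata: dag_A's final_task succeeded for the 2019-01-20 00:00 run.
finished_runs = {("dag_A", "final_task", datetime(2019, 1, 20, 0, 0))}

# Same schedule: dag_B's 00:00 run finds the matching dag_A run.
same_schedule = external_task_done(
    finished_runs, "dag_A", "final_task", datetime(2019, 1, 20, 0, 0))

# dag_B scheduled one hour later: no match unless we shift by execution_delta.
without_delta = external_task_done(
    finished_runs, "dag_A", "final_task", datetime(2019, 1, 20, 1, 0))
with_delta = external_task_done(
    finished_runs, "dag_A", "final_task", datetime(2019, 1, 20, 1, 0),
    execution_delta=timedelta(hours=1))

print(same_schedule, without_delta, with_delta)  # True False True
```

In a real DAG, the equivalent of the last case would be passing execution_delta=timedelta(hours=1) to ExternalTaskSensor.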
Note: This requires tasks to run in parallel, because the sensor in dag_B occupies a worker slot while the tasks in dag_A are still running. That is not possible when Airflow is using SequentialExecutor, which is often the default for a barebones Airflow installation. This executor uses a SQLite database to store metadata, and SQLite does not handle concurrent writes. Using LocalExecutor will enable parallel operations, but requires an actual database server (e.g. Postgres) to function.
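A quick way to check whether an installation can run the sensor alongside other tasks is to look at the executor and metadata database settings. The sketch below parses airflow.cfg-style text with Python's standard configparser. The sample values and the helper name can_run_in_parallel are assumptions for illustration, and the [core] layout reflects Airflow 1.10-era configs (newer versions may place these settings elsewhere).

```python
import configparser

# Hypothetical config using LocalExecutor with a Postgres metadata database.
SAMPLE_CFG = """
[core]
executor = LocalExecutor
sql_alchemy_conn = postgresql+psycopg2://user:pass@localhost/airflow
"""

# Hypothetical barebones config: SequentialExecutor on SQLite.
BAREBONES_CFG = """
[core]
executor = SequentialExecutor
sql_alchemy_conn = sqlite:////tmp/airflow/airflow.db
"""


def can_run_in_parallel(cfg_text):
    """Illustrative check: parallel tasks need a non-sequential executor
    and a metadata database other than SQLite."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    executor = parser.get("core", "executor")
    conn = parser.get("core", "sql_alchemy_conn")
    return executor != "SequentialExecutor" and not conn.startswith("sqlite")


print(can_run_in_parallel(SAMPLE_CFG))     # True
print(can_run_in_parallel(BAREBONES_CFG))  # False
```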
Update: I explore some different, possibly better-suited approaches to this problem here.
Source code for airflow.sensors.external_task_sensor [Airflow docs]