January 21, 2019

Set dependencies between Airflow DAGs with ExternalTaskSensor

Problem

You are an analyst/data engineer/data scientist building a data processing pipeline in Airflow. Last week you wrote a job that performs all the necessary processing to build your sales table in the database. This week, you are building a customers table that aggregates data from that sales table.

Should you add the necessary customers logic as a new task on the existing DAG, or should you create an entirely new DAG? Since the dependency only goes in one direction (tomorrow’s sales data does not depend on today’s customers data), you decide to split them into two separate DAGs.

But how can you make sure your new DAG waits until the necessary sales data is loaded before starting? Airflow offers rich options for specifying intra-DAG scheduling and dependencies, but it is not immediately obvious how to do so for inter-DAG dependencies.

The duct-tape fix is to schedule the customers DAG enough minutes or hours after the sales DAG that we can be reasonably confident sales has finished. We can do better, though.

Solution

Airflow provides an out-of-the-box sensor called ExternalTaskSensor that we can use to model this “one-way dependency” between two DAGs. Here’s what we need to do:

  1. Configure dag_A and dag_B to have the same start_date and schedule_interval parameters, so that every run of dag_B has a run of dag_A with the same execution date.
  2. Create an ExternalTaskSensor in dag_B that points to a specific task in dag_A, and set it as an upstream dependency of the first task(s) in your pipeline.
  3. Start DAG runs for both DAGs at roughly the same time. dag_B itself will start, but the sensor task will wait until the specified task in the corresponding run of dag_A (the one with the same execution date) has succeeded before allowing the actual tasks to start.
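from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor

def build_customers_table():
    pass  # hypothetical placeholder for the customers logic

# Example values; start_date and schedule_interval must match dag_A's
with DAG('dag_B', start_date=datetime(2019, 1, 1),
         schedule_interval='@daily') as dag:
    # Wait for final_task in dag_A's run with the same execution_date
    wait_for_dag_A = ExternalTaskSensor(
        task_id='wait_for_dag_A',
        external_dag_id='dag_A',
        external_task_id='final_task')

    main_task = PythonOperator(
        task_id='main_task',
        python_callable=build_customers_table)
    wait_for_dag_A >> main_task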
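Step 1 matters because, by default, ExternalTaskSensor looks for the dag_A run whose execution_date equals that of the current dag_B run. The pure-Python sketch below models only that matching rule and does not import Airflow; execution_dates and sensor_target are hypothetical helpers, and the dates are example values. The real sensor also accepts an execution_delta (or execution_date_fn) argument for DAGs whose schedules are offset from each other, which is what the last line imitates.

```python
from datetime import datetime, timedelta


def execution_dates(start_date, interval, n):
    """First n execution dates of a DAG on a fixed timedelta schedule."""
    return [start_date + i * interval for i in range(n)]


def sensor_target(execution_date, execution_delta=timedelta(0)):
    """Execution date of the dag_A run the dag_B sensor looks for (simplified model)."""
    return execution_date - execution_delta


daily = timedelta(days=1)
dag_a = execution_dates(datetime(2019, 1, 1), daily, 3)
dag_b_same = execution_dates(datetime(2019, 1, 1), daily, 3)
dag_b_late = execution_dates(datetime(2019, 1, 1, 1), daily, 3)  # 1 hour later

# Same start_date and schedule: every dag_B run has a matching dag_A run.
print(all(sensor_target(d) in dag_a for d in dag_b_same))  # True
# Offset by one hour: no dag_B run finds a matching dag_A run...
print(any(sensor_target(d) in dag_a for d in dag_b_late))  # False
# ...unless the offset is passed as execution_delta.
print(all(sensor_target(d, timedelta(hours=1)) in dag_a for d in dag_b_late))  # True
```

If the schedules line up exactly, as in step 1, no delta is needed and the sensor's defaults are enough.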

Note: This requires tasks to run in parallel, because the sensor holds a worker slot while dag_A’s tasks still need to run. That is not possible with SequentialExecutor, the default for a barebones Airflow installation, which runs only one task at a time. That default setup stores its metadata in SQLite, which handles only one writer at a time, so Airflow only allows SequentialExecutor with it. Switching to LocalExecutor enables parallel execution, but requires a database server (e.g. Postgres) for the metadata.
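To see why a single slot is a problem, here is a toy, pure-Python model of the situation in the note above. It does not use Airflow’s real scheduler; simulate and its tick-based timing are hypothetical. It assumes the sensor is picked up first and, as sensors do in their default “poke” mode, keeps its slot while it waits.

```python
def simulate(slots, timeout_ticks=5):
    """Toy model of a sensor competing with dag_A's task for executor slots.

    Assumes the sensor was picked up first and keeps its slot until it
    succeeds or times out. dag_A's final_task needs one tick to run.
    """
    running = {"wait_for_dag_A"}  # the sensor already holds a slot
    queued = ["final_task"]       # dag_A's task waiting for a free slot
    done = set()
    for tick in range(1, timeout_ticks + 1):
        # Hand any free slots to queued tasks.
        while queued and len(running) < slots:
            running.add(queued.pop(0))
        # final_task finishes in the same tick it starts.
        if "final_task" in running:
            running.remove("final_task")
            done.add("final_task")
        # The sensor pokes: it succeeds once dag_A's task is done.
        if "final_task" in done:
            return f"sensor succeeded at tick {tick}"
    return "sensor timed out"


print(simulate(slots=1))  # one slot, like SequentialExecutor -> sensor timed out
print(simulate(slots=2))  # two slots -> sensor succeeded at tick 1
```

With one slot, dag_A’s task never gets to run while the sensor waits, so the sensor can only time out; with a second slot the dependency resolves as intended.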

Update: I explore some different, possibly better-suited approaches to this problem here, including SubDagOperator and TriggerDagRunOperator.

Further reading

Source code for airflow.sensors.external_task_sensor [Airflow docs]

Dependencies between DAGs: How to wait until another DAG finishes in Airflow? [Bartosz Mikulski]

© Geoff Ruddock 2019