Geoff Ruddock

Creating a monthly + daily DAG pattern in Airflow

Problem

You initially built a data pipeline for a project you were working on, but eventually other members of your team started using it as well. You move the logic into Airflow, so that the pipeline is updated automatically on some regular basis.

You’d like to set schedule_interval to daily so that the data is always fresh, but you’d also like the ability to execute relatively quick backfills. With a daily schedule, backfilling data from 5 years ago will take days to complete. Running the job less frequently (monthly?) would make backfills easier, but the data would be less fresh.

Solution

We want to eat our cake and have it too. We can achieve this by creating two separate DAGs—one daily and one monthly—using the same underlying logic.

Astronomer.io has a nice guide to dynamically generating DAGs in Airflow. The key insight is that we want to wrap the DAG definition code into a create_dag function and then call it multiple times at the top-level of the file to actually instantiate your multiple DAGs.

def create_dag(*args, **kwargs):

    dag = DAG(*args, **kwargs)

    with dag:
        # Declare tasks here (operators and sensors)
        
        # Set dependencies between tasks here

    return dag

Our parameters of interest are dag_id, start_date and schedule_interval, so be sure to include those on your create_dag function.

We’d like our monthly job to run on the first of every month, for all historical data.

dag_monthly = create_dag(dag_id=f'{DAG_NAME}_monthly',
                         start_date=START_DATE,
                         schedule_interval='0 7 1 * *')

We’d like our daily job to only run for the current month, but daily

from datetime import datetime
current_month_start = datetime.strptime(datetime.now().strftime('%Y-%m'), '%Y-%m')
dag_daily = create_dag(dag_id=f'{DAG_NAME}_daily',
                       start_date=current_month_start,
                       schedule_interval='0 8 * * *')

Make sure to define both of your DAGs at the top-level of the _def.py file so that Airflow knows to instantiate them. They will appear as separate DAGs in the main UI, but the underlying logic is DRY since they are both defined from the same create_dag function.

Updates

[2019-09-03] – Initially I had schedule_interval='0 7 2-31 * *' on the daily dag to avoid duplicate processing on the 1st day of the month. But Airflow runs jobs when the next schedule interval arrives (somewhat counter-intuitive) so what we actually want do do is skip the job corresponding with the last day of the month, rather than the first day. Unfortunately it is not possible to express this in a simple cron expression, due to the varying length of months.


comments powered by Disqus