GEOFF RUDDOCK

A blog (mainly) about data science

Creating a Monthly + Daily DAG Pattern in Airflow

Posted at — Aug 13, 2019

Problem

You initially built a data pipeline for a project of your own, but eventually other members of your team started using it as well. You move the logic into Airflow so that the pipeline runs automatically on a regular schedule.

You’d like to set schedule_interval to daily so that the data is always fresh, but you’d also like to be able to run backfills relatively quickly. With a daily schedule, backfilling five years of data means working through roughly 1,800 separate DAG runs, which can take days to complete. Running the job less frequently (monthly?) would make backfills easier, but the data would be less fresh.
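To put rough numbers on that trade-off, here is a small sketch that counts how many runs a five-year backfill would need under each schedule. The dates are hypothetical (five years back from the date of this post), and the helper functions are made up for illustration; they are not part of Airflow.

```python
from datetime import date


def count_daily_runs(start: date, end: date) -> int:
    """Number of daily run dates in the half-open range [start, end)."""
    return (end - start).days


def count_monthly_runs(start: date, end: date) -> int:
    """Number of first-of-month dates in the half-open range [start, end)."""
    runs = 0
    year, month = start.year, start.month
    while date(year, month, 1) < end:
        if date(year, month, 1) >= start:
            runs += 1
        # Advance to the next month, rolling over the year after December
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)
    return runs


# Hypothetical backfill window: five years ending on the post date
backfill_start = date(2014, 8, 13)
backfill_end = date(2019, 8, 13)

daily_runs = count_daily_runs(backfill_start, backfill_end)
monthly_runs = count_monthly_runs(backfill_start, backfill_end)
print(daily_runs, monthly_runs)
```

For this window the daily schedule needs 1826 runs and the monthly schedule needs 60, which is why the monthly backfill is so much more manageable.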

Solution

We want to eat our cake and have it too. We can achieve this by creating two separate DAGs, one daily and one monthly, that share the same underlying logic.

Astronomer.io has a nice guide to dynamically generating DAGs in Airflow. The key insight is to wrap the DAG definition code in a create_dag function and then call that function multiple times at the top level of the file, once for each DAG we want to instantiate.

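from airflow import DAG


def create_dag(*args, **kwargs):
    # Forward every argument (dag_id, start_date, schedule_interval, ...) to DAG
    dag = DAG(*args, **kwargs)

    with dag:
        # Declare tasks here (operators and sensors)

        # Set dependencies between tasks here
        pass  # placeholder so the empty block is valid Python; remove once tasks exist

    return dag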

Our parameters of interest are dag_id, start_date and schedule_interval, so make sure your create_dag function accepts them (here they pass straight through to DAG via **kwargs).

We’d like our monthly job to run on the first of every month, for all historical data.

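# DAG_NAME and START_DATE are assumed to be constants defined earlier in the file
# Cron '0 7 1 * *': 07:00 on day 1 of every month
dag_monthly = create_dag(dag_id=f'{DAG_NAME}_monthly',
                         start_date=START_DATE,
                         schedule_interval='0 7 1 * *')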

We’d like our daily job to only run for the current month, and not on the first day of the month, since the monthly job will run that day.

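from datetime import datetime

# Midnight on the first day of the current month
current_month_start = datetime.now().replace(day=1, hour=0, minute=0, second=0, microsecond=0)

# Cron '0 7 2-31 * *': 07:00 on days 2-31; day 1 is left to the monthly DAG
dag_daily = create_dag(dag_id=f'{DAG_NAME}_daily',
                       start_date=current_month_start,
                       schedule_interval='0 7 2-31 * *')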
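As a quick sanity check that the two schedules never fire on the same day, the sketch below expands the day-of-month field of each cron expression and compares the results. It is not a full cron parser: the days_matched helper is hypothetical and only understands '*', single numbers, ranges and comma-separated lists in the third field.

```python
def days_matched(cron_expr: str) -> set:
    """Return the days of the month (1-31) matched by a five-field cron expression.

    Only the day-of-month field is interpreted; the other fields are ignored.
    """
    minute, hour, day_of_month, month, day_of_week = cron_expr.split()
    if day_of_month == '*':
        return set(range(1, 32))
    days = set()
    for part in day_of_month.split(','):
        if '-' in part:
            first, last = part.split('-')
            days.update(range(int(first), int(last) + 1))
        else:
            days.add(int(part))
    return days


monthly_days = days_matched('0 7 1 * *')
daily_days = days_matched('0 7 2-31 * *')

# No day is claimed by both DAGs, and together they cover every possible day
print(monthly_days.isdisjoint(daily_days))
print(monthly_days | daily_days == set(range(1, 32)))
```

Both checks should print True: the monthly DAG owns day 1, the daily DAG owns days 2 through 31, and nothing is scheduled twice.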

Make sure to define both DAGs at the top level of the _def.py file so that Airflow picks them up when it parses the file. They will appear as separate DAGs in the main UI, but the underlying logic stays DRY since both are built by the same create_dag function.
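One way to keep the two definitions side by side is to build their arguments in a single place and loop over them. The sketch below is plain Python with no Airflow import, so it only builds the keyword arguments; build_dag_kwargs, month_start, the 'my_pipeline' name and the dates are all hypothetical and chosen for illustration.

```python
from datetime import datetime


def month_start(now: datetime) -> datetime:
    """Midnight on the first day of the month containing `now`."""
    return now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)


def build_dag_kwargs(dag_name: str, start_date: datetime, now: datetime) -> list:
    """Return the keyword arguments for the monthly and daily DAGs."""
    return [
        {
            'dag_id': f'{dag_name}_monthly',
            'start_date': start_date,            # full history
            'schedule_interval': '0 7 1 * *',    # 07:00 on day 1
        },
        {
            'dag_id': f'{dag_name}_daily',
            'start_date': month_start(now),      # current month only
            'schedule_interval': '0 7 2-31 * *', # 07:00 on days 2-31
        },
    ]


# Hypothetical values; in a DAG file `now` would be datetime.now()
configs = build_dag_kwargs('my_pipeline',
                           start_date=datetime(2014, 8, 1),
                           now=datetime(2019, 8, 13, 9, 30))

for cfg in configs:
    print(cfg['dag_id'], cfg['start_date'], cfg['schedule_interval'])
```

In an actual DAG file, each entry would then be passed to create_dag and bound to a module-level name, for example with globals()[cfg['dag_id']] = create_dag(**cfg) inside the loop, so that both DAGs exist at the top level of the module.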
