Data Exchange Strategies between Airflow DAGs

Shivananda D
Data Engineer Things
5 min read · Feb 19, 2024

In the world of workflow orchestration, Apache Airflow is one of the most popular choices due to the multitude of features it offers. One feature that can prove invaluable is its ability to facilitate data exchange between DAGs, enabling coordination between different workflows.

Assumed Knowledge: Airflow Operators and metadata database

The traditional method of data exchange in Apache Airflow involves utilizing XComs, which enable communication between tasks within a single DAG or between different DAGs. However, this method falls short when it comes to retrieving values from a DAG based on its most recent execution date. In this article, we will look at the traditional method, discuss its limitations, and finally walk through a useful technique that allows retrieving XCom values from a DAG based on its most recent execution date.

Image by the Author

Data Sharing using XComs — the traditional way

I won’t delve into detailed explanations of conventional data exchange methods, as there are plenty of resources that cover them in depth. I will just provide examples here to give a clearer picture of what I want to explain.

1. Sharing data between tasks

This is pretty straightforward: you share data between tasks of a single DAG. Example code below:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from datetime import datetime

def task1(**context):
    result = "Task1 Result"
    context['ti'].xcom_push(key='task1_result', value=result)

def task2(**context):
    task1_result = context['ti'].xcom_pull(task_ids='task1', key='task1_result')
    print("Result from Task1:", task1_result)

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 31),
    'retries': 1
}

with DAG('example_dag', default_args=default_args, schedule_interval=None) as dag:
    start = EmptyOperator(task_id='start')

    # task to push XCom value
    task1 = PythonOperator(
        task_id='task1',
        python_callable=task1
    )

    # task to pull XCom value
    task2 = PythonOperator(
        task_id='task2',
        python_callable=task2
    )

    end = EmptyOperator(task_id='end')

    start >> task1 >> task2 >> end
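
As a side note for those on Airflow 2: the same task-to-task exchange can be written more concisely with the TaskFlow API, where a task's return value is pushed to XCom automatically (under the key return_value) and passing it to another task pulls it for you. Below is a minimal sketch; the DAG and function names are illustrative.

from airflow.decorators import dag, task
from datetime import datetime

@dag(dag_id='example_taskflow_dag', schedule_interval=None,
     start_date=datetime(2024, 1, 31), catchup=False)
def example_taskflow_dag():

    @task
    def produce():
        # The return value is pushed to XCom automatically
        return "Task1 Result"

    @task
    def consume(value):
        # The value is pulled from XCom and passed in as an argument
        print("Result from Task1:", value)

    consume(produce())

example_taskflow_dag()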

2. Sharing data between two DAGs — the simple way

In this case, we share data between two DAGs. In the example below, first_dag pushes XCom data, and second_dag pulls data from XCom. Note that we need to specify both the name of the task that published the variable and the DAG identifier in the pull function of second_dag.

'''
File: first_dag.py
'''

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from datetime import datetime

# Define default arguments for the DAG
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 31),
    'retries': 1
}

# Define the push function
def push_function(**context):
    value_to_push = "XCom value"
    context['ti'].xcom_push(key='my_key', value=value_to_push)

# Define the DAG
with DAG('first_dag', schedule_interval=None, default_args=default_args, catchup=False) as dag:
    # Define start and end tasks for the DAG
    start = EmptyOperator(task_id='start_first')
    end = EmptyOperator(task_id='end_first')

    # Define push task (context is passed automatically in Airflow 2,
    # so provide_context is no longer needed)
    push_task = PythonOperator(
        task_id='push_task',
        python_callable=push_function
    )

    start >> push_task >> end

'''
File: second_dag.py
'''

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from datetime import datetime

# Define default arguments for the DAG
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 31),
    'retries': 1
}

# Define the pull function
def pull_function(**context):
    pulled_value = context['ti'].xcom_pull(dag_id='first_dag', task_ids='push_task', key='my_key')
    print("Pulled value from first DAG:", pulled_value)

# Define the DAG
with DAG('second_dag', schedule_interval=None, default_args=default_args, catchup=False) as dag:
    # Define start and end tasks for the DAG
    start = EmptyOperator(task_id='start_second')
    end = EmptyOperator(task_id='end_second')

    # Define pull task (context is passed automatically in Airflow 2,
    # so provide_context is no longer needed)
    pull_task = PythonOperator(
        task_id='pull_task',
        python_callable=pull_function
    )

    start >> pull_task >> end

However, there is one limitation that makes this approach ineffective in certain situations: both DAG runs must have the same execution date. If the execution dates differ, the pull simply returns None. This limits the usefulness of XComs in scenarios where tasks from different DAGs need to share data regardless of the execution date. The behavior comes from the implementation of xcom_pull in Airflow's TaskInstance class, which restricts the query to XCom values with the same execution date as the pulling task instance. The relevant code in the Airflow repository looks like this:

query = XCom.get_many(
    execution_date=self.execution_date,
    key=key,
    dag_ids=dag_id,
    task_ids=task_ids,
    include_prior_dates=include_prior_dates,
    session=session,
).with_entities(XCom.value)

Below, we discuss a useful technique that works around this limitation.

Data Sharing — based on DAG’s latest execution date

This approach is useful when there are multiple runs of first_dag and you want to pull the XCom value from its latest DAG run. To achieve this, direct interaction with the Airflow metadata database becomes necessary, which can be done by establishing a database session. The metadata database stores metadata related to DAGs, tasks, task instances, and executions. Specifically, the DagRun table within this database stores details about individual DAG runs, including their execution dates and statuses. We can use this table to determine a DAG’s latest execution date and pull the XCom value based on it.

Consider the same first_dag code above. Below is a modified version of the second_dag code that pulls the my_key XCom value based on the latest execution date of first_dag.

'''
File: second_dag.py
Fetch XCom value based on the latest execution date of first_dag
'''

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from datetime import datetime
from airflow.models import DagRun, XCom
from airflow.utils.session import create_session
from sqlalchemy import desc

# Function to get the latest execution date for a DAG
def get_dag_latest_execution_date(dag_id):

    with create_session() as session:
        latest_dagrun = (
            session.query(DagRun)
            .filter_by(dag_id=dag_id)
            .order_by(desc(DagRun.execution_date))
            .first()
        )

        latest_execution_date = latest_dagrun.execution_date if latest_dagrun else None

    return latest_execution_date

# Define the pull function
def pull_function(**context):

    # Get the latest execution date of first_dag
    latest_dag_execution_date = get_dag_latest_execution_date(dag_id='first_dag')

    # Get my_key XCom value using the latest execution date of first_dag
    pulled_value = XCom.get_one(
        execution_date=latest_dag_execution_date,
        key='my_key',
        dag_id='first_dag',
        task_id='push_task'
    )

    if pulled_value:
        print("Pulled value from first DAG:", pulled_value)
    else:
        print("Could not pull XCom value")

# Define default arguments for the DAG
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 31),
    'retries': 1
}

# Define the DAG
with DAG('second_dag', schedule_interval=None, default_args=default_args, catchup=False) as dag:
    # Define start and end tasks for the DAG
    start = EmptyOperator(task_id='start_second')
    end = EmptyOperator(task_id='end_second')

    # Define pull task
    pull_task = PythonOperator(
        task_id='pull_task',
        python_callable=pull_function
    )

    start >> pull_task >> end

The get_dag_latest_execution_date function queries the DagRun table in the metadata database, filters by the specified DAG ID, orders the results in descending order of execution date, and retrieves the first row using .first(). The latest_execution_date variable will hold the execution date of the latest DAG run. Please note that this approach requires direct database access and assumes you have the necessary permissions to query the metadata database.
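
One caveat: the most recent DAG run may still be running, or may have failed before push_task executed, in which case the XCom does not exist yet and the pull returns None. If that matters for your use case, you can restrict the query to successful runs. Below is a minimal sketch of such a variant, assuming the same imports as above plus Airflow's DagRunState enum (on older 2.x versions, State.SUCCESS from airflow.utils.state works the same way).

from airflow.models import DagRun
from airflow.utils.session import create_session
from airflow.utils.state import DagRunState
from sqlalchemy import desc

# Return the execution date of the latest *successful* run of the given DAG
def get_dag_latest_successful_execution_date(dag_id):
    with create_session() as session:
        latest_dagrun = (
            session.query(DagRun)
            .filter(DagRun.dag_id == dag_id, DagRun.state == DagRunState.SUCCESS)
            .order_by(desc(DagRun.execution_date))
            .first()
        )
        return latest_dagrun.execution_date if latest_dagrun else None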

One use case of this approach would be real-time threat detection and incident response. Here, the first DAG gathers data from cloud infrastructure components, identifies security threats using machine learning and anomaly detection, and summarizes these events into an XCom variable named security_event_summary. The second DAG monitors this variable for new events and triggers incident response workflows such as alerting security analysts, blocking suspicious IPs, etc. A minimal sketch of the monitoring side is shown below.
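
This sketch only illustrates how the technique above could be wired into such a pipeline: the DAG id threat_detection_dag, the task id summarize_events, and the schedule are hypothetical names invented for the example, and it reuses the get_dag_latest_execution_date helper defined earlier (assumed to be defined in, or imported into, the same file).

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import XCom
from datetime import datetime

def check_security_events(**context):
    # 'threat_detection_dag' and 'summarize_events' are illustrative names for the first DAG
    latest_execution_date = get_dag_latest_execution_date(dag_id='threat_detection_dag')
    if latest_execution_date is None:
        print("threat_detection_dag has not run yet")
        return

    event_summary = XCom.get_one(
        execution_date=latest_execution_date,
        key='security_event_summary',
        dag_id='threat_detection_dag',
        task_id='summarize_events'
    )

    if event_summary:
        # Hand off to downstream response tasks: alert analysts, block suspicious IPs, etc.
        print("New security events detected:", event_summary)
    else:
        print("No new security events")

with DAG('incident_response_dag', schedule_interval='*/5 * * * *',
         start_date=datetime(2024, 1, 31), catchup=False) as dag:
    check_events = PythonOperator(
        task_id='check_security_events',
        python_callable=check_security_events
    )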

Thank you for reading! I hope you found the content valuable. I plan to continue writing about my learnings in data engineering, which I hope will be helpful to the community. Please comment with your thoughts and feedback.
