DAG-Dependency Patterns in Composer Multi-cluster environment

Akanksha Khushboo
Google Cloud - Community
7 min read · Feb 16, 2023


Written in collaboration with Bipin Upadhyaya — Feb 15, 2023

Introduction

When designing Airflow DAGs, it’s often a good idea to include all related tasks in the same DAG. However, sometimes you need to create dependencies between your DAGs. According to the Airflow documentation, designing cross-DAG dependencies can be useful when:

  • Two DAGs are dependent on each other but have different schedules
  • Two DAGs are dependent, but they are owned by different teams
  • A task depends on another task, but for a different execution date

DAG to DAG dependency can be easily defined using operators like TriggerDagRunOperator and ExternalTaskSensor, when both upstream and downstream DAGs exist in the same Airflow environment. For example, when a task in one DAG triggers another DAG that is present in the same Composer environment.

But a single instance of Composer may not meet all the needs of the organization. Requirements that can create a need for having multiple Composer clusters in production are:

  • Multiple business units and their access needs: As an organization’s data analytics needs grow, multiple teams/business units may require their own Composer environment and access to these clusters, provisioned only to the team to which the user belongs
  • Workload priority, schedule and pattern: Some environments need few resources because they mostly run on-demand workloads, while others need generous provisioning because their pipelines must run every hour
  • Domain specific requirements: For environments that only need access to specific data domains (such as Finance or Human Resources), all related workloads should run in the same environment

Implementing dependencies between DAGs can be challenging if the DAGs do not reside in the same Composer cluster. This guide presents different approaches and patterns that can help maintain DAG dependencies between multiple Composer environments. This solution includes complete Terraform code and Airflow code and walks you through the procedure.

PubSub Message Format

Before building a solution, the first step is to fix a message format for uniformity and simplicity. The message carries source_type, workflow_id and workflow_status as attributes, which allows a single topic to serve multiple subscriptions. The PubSub message is published in the following format:

  • The source_type refers to the service that produced the message (e.g. Composer)
  • The workflow_id refers to the dag_id
  • The workflow_run_id refers to the run_id of the DAG run that produced the status
  • The workflow_status reflects the status of that run (true or false, carried as a string per the schema below)
{
  "type": "record",
  "name": "workflow",
  "fields": [
    {
      "name": "source_type",
      "type": "string"
    },
    {
      "name": "workflow_id",
      "type": "string"
    },
    {
      "name": "workflow_run_id",
      "type": "string"
    },
    {
      "name": "workflow_status",
      "type": "string"
    }
  ]
}
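
To make the format concrete, here is a minimal Python sketch that checks a message against the four string fields of the schema above and serializes it to JSON bytes. The helper name `encode_workflow_message` and the sample values (including `"composer"` and the run id) are illustrative assumptions, not part of the original solution.

```python
import json

# Field names taken from the schema above; every field is a string.
WORKFLOW_FIELDS = ("source_type", "workflow_id", "workflow_run_id", "workflow_status")


def encode_workflow_message(message: dict) -> bytes:
    """Validate a workflow status message and return it as UTF-8 JSON bytes."""
    missing = [f for f in WORKFLOW_FIELDS if f not in message]
    if missing:
        raise ValueError(f"missing fields: {missing}")
    not_strings = [f for f in WORKFLOW_FIELDS if not isinstance(message[f], str)]
    if not_strings:
        raise ValueError(f"fields must be strings: {not_strings}")
    # Keep only the schema fields, in schema order.
    return json.dumps({f: message[f] for f in WORKFLOW_FIELDS}).encode("utf-8")


# Sample values are hypothetical.
example = {
    "source_type": "composer",
    "workflow_id": "dag_id_1",
    "workflow_run_id": "scheduled__2023-02-15T00:00:00+00:00",
    "workflow_status": "true",
}
payload = encode_workflow_message(example)
```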

Pattern 1: Using PubSub and PubSubPull Sensors

In this pattern, each upstream DAG publishes a message on success or failure of the workflow. A downstream DAG that depends on the upstream DAG consumes that message through a subscription. Each downstream DAG or workflow needs its own subscription on the topic. The downstream DAG uses a PubSubPullSensor to pull the message and start its execution.

Figure 1: DAG dependency between DAG 1 and DAG 2 across two Composer environments

The subscription name follows the format {source_dag_id}-{destination_dag_id}-{on_action}. If dag_id_2 (downstream) should be triggered on the successful completion of dag_id_1 (upstream), then the subscription name should be dag_id_1-dag_id_2-on_success.
Below is an example of the values to set in the Terraform .tfvars file. project_id is the project that holds the topics and the subscriptions. The topic in the example is named airflow_status_topic. Dependencies (the subscriptions within a topic) are named after the source and target DAGs and the final state of the source DAG.

project_id = "<<project_id>>"
workflow_status_topics = ["airflow_status_topic"]
topics_subscriptions = {
  "airflow_status_topic": {
    topic = "airflow_status_topic"
    dependencies = [
      "dag_id_1-dag_id_2-on_success",
      "dag_id_1-dag_id_3-on_success",
      "dag_id_1-dag_id_4-on_failure"
    ]
  }
}

Run the terraform plan command, verify the resources that will be created, and then run terraform apply to create them. Below is an example of the subscription used by dag_id_2. The subscription receives the PubSub message on successful completion of dag_id_1 (Figure 1). Notice that the subscription filter uses the source_type, workflow_id and workflow_status attributes.

Figure 2: A subscription based on the successful completion of DAG 1
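
As a rough illustration of the naming convention and the attribute filter, the sketch below builds both strings in Python. It follows the source-first naming used in the example (dag_id_1-dag_id_2-on_success) and the Pub/Sub filter syntax for attributes. The function names and the attribute values `"composer"` and `"true"` are assumptions made for this example.

```python
def subscription_name(source_dag_id: str, target_dag_id: str, on_action: str) -> str:
    """Build a subscription name as {source}-{target}-{on_action}."""
    if on_action not in ("on_success", "on_failure"):
        raise ValueError(f"unexpected action: {on_action}")
    return f"{source_dag_id}-{target_dag_id}-{on_action}"


def subscription_filter(source_type: str, workflow_id: str, workflow_status: str) -> str:
    """Build a Pub/Sub filter expression on the three message attributes."""
    return (
        f'attributes.source_type = "{source_type}" '
        f'AND attributes.workflow_id = "{workflow_id}" '
        f'AND attributes.workflow_status = "{workflow_status}"'
    )


name = subscription_name("dag_id_1", "dag_id_2", "on_success")
# Attribute values below are hypothetical; use whatever the upstream DAG publishes.
filter_expr = subscription_filter("composer", "dag_id_1", "true")
```

With a filter like this, the subscription only receives messages whose attributes match, so dag_id_2 does not see status messages from unrelated workflows on the same topic.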

Publishing and Consuming Pub/Sub messages from DAG

Task callbacks and PubSubPullSensor can be used to publish and consume the messages. In the code below, on_success_callback and on_failure_callback are used to publish a message depending on the task or DAG status, and the message is sent with custom attributes (reference link). A PubSubPullSensor then consumes the message in the downstream workflow (DAG 2).

Upstream Workflow (excerpt of DAG 1 code):

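import os

# Airflow 2 import paths.
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils import dates

# publish_success_msg_pubsub, publish_failure_msg_pubsub and _bash_command
# are defined elsewhere in the full DAG file (not shown in this excerpt).

PUBSUB_PROJECT = os.environ['PUBSUB_PROJECT']
# The topic will be handled by Terraform.
AIRFLOW_STATUS_TOPIC = os.environ['AIRFLOW_STATUS_TOPIC']

default_args = {
    'start_date': dates.days_ago(2),
    # Applied to every task: any task failure publishes a failure message.
    'on_failure_callback': publish_failure_msg_pubsub,
}

with DAG(
    'dag_1',
    default_args=default_args,
    schedule_interval='*/5 * * * *'
) as dag:

    t1 = BashOperator(task_id='task1',
                      bash_command="echo 'Executing task1..'")
    # Only the final task publishes the success message.
    t3 = BashOperator(task_id='task3',
                      bash_command=_bash_command,
                      on_success_callback=publish_success_msg_pubsub)
    t1 >> t3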

The on_success_callback must be passed at the task level, on the final task of the DAG. If it is placed in the default_args dictionary instead, it applies to every task, and one message is sent to PubSub for each successful task. Attached to the last task only, it publishes a single message at the end of a successful DAG execution, i.e. after all tasks have completed. The on_failure_callback, by contrast, sits in default_args in this excerpt, so a failure in any task publishes a failure message.
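
The callbacks themselves are not shown in the excerpt. A minimal sketch of how such a callback could be put together is given below, assuming the Airflow context exposes `context["dag"].dag_id` and `context["run_id"]` (as it does in Airflow 2). The factory `make_status_callback`, the injected `publish` callable and the `"composer"` source type are hypothetical; the real implementation lives in the linked repository.

```python
import json
from types import SimpleNamespace


def make_status_callback(publish, succeeded: bool):
    """Return an Airflow-style callback that publishes a workflow status.

    `publish(data, attributes)` is a hypothetical callable supplied by the
    caller; in Composer it would wrap a Pub/Sub publisher.
    """
    def callback(context):
        attributes = {
            "source_type": "composer",  # assumed value
            "workflow_id": context["dag"].dag_id,
            "workflow_run_id": context["run_id"],
            "workflow_status": "true" if succeeded else "false",
        }
        publish(json.dumps(attributes).encode("utf-8"), attributes)
    return callback


# Stand-ins for the real publisher and for the context Airflow passes in.
sent = []
publish_success_msg_pubsub = make_status_callback(
    lambda data, attrs: sent.append((data, attrs)), succeeded=True
)
fake_context = {
    "dag": SimpleNamespace(dag_id="dag_1"),
    "run_id": "scheduled__2023-02-15T00:00:00+00:00",
}
publish_success_msg_pubsub(fake_context)
```

In a real environment, `publish` could forward to `google.cloud.pubsub_v1.PublisherClient().publish(topic_path, data, **attributes)`, which accepts message attributes as keyword arguments.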

The snippet below shows the downstream workflow (DAG 2), which consumes the message to satisfy its dependency on a DAG in a different Composer environment. Full code is available here.

Downstream Workflow (Excerpt of DAG 2 code):

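import os
from datetime import timedelta

from airflow import DAG
from airflow.providers.google.cloud.sensors.pubsub import PubSubPullSensor

# default_args and on_failure_action are defined elsewhere in the full DAG file.

PUBSUB_PROJECT = os.environ['PUBSUB_PROJECT']
SUBSCRIPTION_SUCCESS = 'dag_1-dag_2-on_success'  # Cloud Pub/Sub subscription

with DAG(
    'dag_2',
    default_args=default_args,
    schedule_interval='*/5 * * * *',
    catchup=False
) as dag:
    # Waits for the upstream success message and acknowledges it when pulled.
    t1 = PubSubPullSensor(task_id='pull-messages',
                          ack_messages=True,
                          project_id=PUBSUB_PROJECT,
                          subscription=SUBSCRIPTION_SUCCESS,
                          execution_timeout=timedelta(minutes=10),
                          on_failure_callback=on_failure_action)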

Pattern 2: Using PubSub and Cloud Function to Trigger dependent DAGs

Figure 3: DAG dependency between DAG 1 and DAG 2 across two Composer environments

In this pattern, the upstream DAG publishes to a PubSub topic, which triggers a Cloud Function. The Cloud Function looks up the DAG dependency mapping, which is kept in a data store. In this solution, a GCS bucket stores the dependency mapping file, but any transactional database can be used instead. One object is created for each source workflow and named {source_workflow_id}.json. Below is the format of the data:

{
  "source_workflow_id": "dag_id_1_upstream",
  "target": {
    "on_success": [
      {
        "environment_name": "composer-env-1",
        "dag_id": "dag_id_2_downstream"
      }
    ],
    "on_failure": [
      {
        "environment_name": "composer-env-1",
        "dag_id": "dag_id_3_downstream"
      }
    ]
  }
}
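
The lookup the Cloud Function performs on this mapping can be sketched in a few lines of Python. The function name `resolve_targets` is an assumption for this example, and so is the convention that a workflow_status of "true" selects on_success and anything else selects on_failure; reading the object from GCS is omitted.

```python
import json

# Same structure as the mapping object above.
MAPPING_JSON = """
{
  "source_workflow_id": "dag_id_1_upstream",
  "target": {
    "on_success": [
      {"environment_name": "composer-env-1", "dag_id": "dag_id_2_downstream"}
    ],
    "on_failure": [
      {"environment_name": "composer-env-1", "dag_id": "dag_id_3_downstream"}
    ]
  }
}
"""


def resolve_targets(mapping: dict, workflow_status: str) -> list:
    """Return (environment_name, dag_id) pairs to trigger for a given status."""
    key = "on_success" if workflow_status == "true" else "on_failure"
    return [
        (target["environment_name"], target["dag_id"])
        for target in mapping["target"].get(key, [])
    ]


mapping = json.loads(MAPPING_JSON)
print(resolve_targets(mapping, "true"))
# -> [('composer-env-1', 'dag_id_2_downstream')]
```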

The environment-to-URL mapping is stored in a separate GCS object (composer-url-mapping-{env}.json). For each environment, project_id is the project where Composer is instantiated and url is the address of the Airflow web UI. The URL can be copied from the Airflow web UI property under ENVIRONMENT CONFIGURATION.

{
  "composer-env-1": {
    "project_id": "<<project_id>>",
    "url": "<<Airflow web UI>>"
  }
}
Figure 4: ‘Airflow web UI’ under ENVIRONMENT CONFIGURATION

The upstream workflow for this pattern is the same as pattern 1. When the workflow completes, it posts a success or error message. The downstream workflow is not scheduled, but it is triggered based on the condition specified in the dependency mapping. Full code for pattern 2 here.
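
One way the Cloud Function could turn a resolved target into a trigger call is sketched below, assuming the target environment runs Airflow 2 and exposes its stable REST API (POST /api/v1/dags/{dag_id}/dagRuns). The function name, the placeholder URL and the `conf` contents are illustrative assumptions; authentication and the HTTP request itself are omitted.

```python
def build_trigger_request(url_mapping: dict, environment_name: str,
                          dag_id: str, conf: dict) -> tuple:
    """Return (endpoint, json_body) for triggering a DAG run in an environment."""
    base_url = url_mapping[environment_name]["url"].rstrip("/")
    endpoint = f"{base_url}/api/v1/dags/{dag_id}/dagRuns"
    # `conf` is passed to the triggered DAG run; here it carries upstream details.
    return endpoint, {"conf": conf}


# Placeholder values; the real URL comes from the environment-to-URL mapping object.
url_mapping = {
    "composer-env-1": {
        "project_id": "example-project",
        "url": "https://airflow.example.invalid/",
    }
}
endpoint, body = build_trigger_request(
    url_mapping,
    "composer-env-1",
    "dag_id_2_downstream",
    {"source_workflow_id": "dag_id_1_upstream"},
)
```

Passing the upstream workflow id in `conf` is optional, but it lets the downstream DAG record which upstream run caused it to start.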

Extending Pattern 2

Here is a recommended variant of pattern 2 for cases where you can’t change the upstream pipeline, or the team that owns it is unwilling to update it. It works by deploying a new intermediate DAG (named {upstream-dag-name}-dependency-trigger.py) that publishes the message to the PubSub topic.

Figure 5: Architecture of Extending Pattern 2

Within the new intermediate DAG (deployed in the same environment as the upstream DAG), an ExternalTaskSensor pokes the existing upstream DAG. When this first task meets the sensor condition, the second task runs and publishes a PubSub message.

Let’s say there is an existing production DAG called ‘workflow_1’, and a new DAG ‘workflow_2’ is created in the same Composer environment. The new workflow uses an ExternalTaskSensor to detect whether the upstream task (i.e. the task to be sensed) has passed or failed, and then publishes a Pub/Sub message accordingly.

Upstream Workflow that publishes the message in Pub/Sub (excerpt of DAG 2 code):

import os

from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor

# `dag` and publish_success_msg_pubsub are defined elsewhere in the full DAG file.

PUBSUB_PROJECT = os.environ['PUBSUB_PROJECT']
AIRFLOW_STATUS_TOPIC = os.environ['AIRFLOW_STATUS_TOPIC']

# Waits for the sensed task in the existing upstream DAG.
external_task_sensor = ExternalTaskSensor(
    task_id='external_task_sensor',
    external_task_id='task_to_be_sensed',
    external_dag_id='workflow_1',
    dag=dag)

# below task will send the message to PubSub
publish_message = BashOperator(
    task_id='task_downstream_another_env',
    on_success_callback=publish_success_msg_pubsub,
    bash_command="echo 'task after met condition on the external sensor...'",
    dag=dag)

external_task_sensor >> publish_message

When a message with the custom attributes is published in the PubSub, a Cloud Function triggers the downstream pipeline. This is done in the same way as shown in the previous Pattern 2 example. Full code for pattern 2 extended.

Conclusion

The architectural patterns discussed in this guide can assist Google Cloud developers in implementing cross-cluster DAG dependencies in situations when the interdependent upstream and downstream DAGs are located in distinct Composer environments. Code snippets provided in the Git repository can be used as a base and tailored solutions can be added on top of them to satisfy any particular requirements.
