AIRFLOW

Branching and conditional flows in Airflow

Pipelines don't always run in a straight line and sometimes you need to take one path or another based on a condition. Branching is how Airflow handles that. You'll build a DAG that picks a path at runtime.

What we're doing

You'll learn how branching works, why trigger rules matter when you converge paths, and build a DAG that picks one of two branches based on a real condition.

Step 1: How branching works

BranchPythonOperator is a special operator that runs a Python function and returns the task_id of the next task to run.

def choose_branch():
    if some_condition:
        return "task_a"
    else:
        return "task_b"

branch = BranchPythonOperator(
    task_id="choose_branch",
    python_callable=choose_branch
)

branch >> [task_a, task_b]

When the DAG runs:

  1. choose_branch evaluates the condition and returns "task_a" or "task_b"
  2. Airflow runs the task whose id was returned
  3. The other branch is skipped — it never runs and doesn't fail

You can also return a list of task_ids to run multiple branches in parallel.

Step 2: Trigger rules

If your branches converge into a final task, that task by default expects all of its upstream tasks to have succeeded. But if one branch was skipped, that's not "succeeded", and the summary will also be skipped.

Every task in Airflow has a trigger_rule parameter that controls when it runs:

  • all_success (default) — all upstream tasks succeeded
  • all_failed — all upstream tasks failed
  • one_success — at least one upstream succeeded
  • none_failed — no upstream failed (skipped is okay)
  • none_failed_min_one_success — no upstream failed AND at least one succeeded

Step 3: Create the DAG file

Click VS Code in the environment panel. Right click on the dags folder and create a new file called branching_dag.py.

Step 4: Add the imports

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from datetime import datetime

BranchPythonOperator is the new one, same import path as PythonOperator.

Step 5: Write the branch function

def choose_path():
    today = datetime.now().weekday()  # Monday=0, Sunday=6
    if today < 5:
        print("It's a weekday — running daily report path")
        return "weekday_task"
    else:
        print("It's a weekend — running weekend report path")
        return "weekend_task"

This function decides the path, it checks the current day of the week Python's weekday() and returns 0 for Monday through 6 for Sunday.

Step 6: Write the branch tasks and the summary

def weekday_report():
    print("Running full daily report...")

def weekend_report():
    print("Running lightweight weekend check...")

def summarize():
    print("Pipeline finished — branch complete")

Three normal functions. weekday_report and weekend_report are the two possible paths. summarize runs at the end no matter which branch ran.

Step 7: Define the DAG with the branching pattern

with DAG(
    dag_id="branching_dag",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False
) as dag:

    branch = BranchPythonOperator(
        task_id="branch",
        python_callable=choose_path
    )

    weekday_task = PythonOperator(
        task_id="weekday_task",
        python_callable=weekday_report
    )

    weekend_task = PythonOperator(
        task_id="weekend_task",
        python_callable=weekend_report
    )

    summary = PythonOperator(
        task_id="summary",
        python_callable=summarize,
        trigger_rule="none_failed_min_one_success"
    )

    branch >> [weekday_task, weekend_task] >> summary
  • branch >> [weekday_task, weekend_task] — both branch tasks are wired up.
  • [...] >> summary — both branches converge to summary
  • trigger_rule="none_failed_min_one_success" without this, summary would be skipped because one of the upstream branches was skipped

Save with Ctrl+S.

Step 8: Trigger it and watch the branching

Open the Airflow UI from the environment panel. Find branching_dag on the DAGs page and trigger it with the play button.

Open the Graph view. You'll see four tasks — branch, weekday_task, weekend_task, and summary.

Watch the colors:

  • branch runs first and turns green
  • Based on today's day, one of weekday_task or weekend_task turns green
  • The other one shows up as pink — that's the skipped state
  • summary turns green because of the trigger rule

After hibernation

If the VM hibernates, reconnect and run in the VS Code terminal:

cd ~/airflow
docker compose up -d

What's next

Now go and try this out in a live environment — boot a fresh cluster and play with the manifests above.

Start Airflow
Spec 2 CPU / 8 GiB ·Disk 25 GiB ·Lifetime 7 days
Before you start — your account isn't verified, so outbound internet will be restricted. We recommend verifying for free to enable it.
Sign in to launch this environment
Required 1 VM · 2 CPU · 8 GB · 25 GiB disk
Available 1 VM · 1 CPU · 2 GB · 10 GiB disk
Sign in