Branching and conditional flows in Airflow
Pipelines don't always run in a straight line. Sometimes you need to take one path or another based on a condition, and branching is how Airflow handles that. In this lesson you'll build a DAG that picks a path at runtime.
What we're doing
You'll learn how branching works and why trigger rules matter when paths converge, then build a DAG that picks one of two branches based on a real condition.
Step 1: How branching works
BranchPythonOperator is an operator that runs a Python function and uses the function's return value, a task_id, to decide which downstream task runs next.
def choose_branch():
    if some_condition:
        return "task_a"
    else:
        return "task_b"

branch = BranchPythonOperator(
    task_id="choose_branch",
    python_callable=choose_branch
)

branch >> [task_a, task_b]
When the DAG runs:
- choose_branch evaluates the condition and returns "task_a" or "task_b"
- Airflow runs the task whose id was returned
- The other branch is skipped: it never runs and doesn't fail
You can also return a list of task_ids to run multiple branches in parallel.
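As a minimal sketch of the list form, the callable below returns either one task_id or several. The row_count argument, the threshold, and the task ids load_task, notify_task, and skip_load_task are all made up for illustration; in a DAG the argument could be supplied through the operator's op_kwargs. The second helper only models which direct downstream tasks would end up skipped.

```python
def choose_branches(row_count):
    """Return the task_id(s) that should run next.

    Hypothetical rule: load and notify when there is data,
    otherwise follow a single fallback branch.
    """
    if row_count > 0:
        # A list tells Airflow to follow several branches at once.
        return ["load_task", "notify_task"]
    # A single string follows exactly one branch.
    return "skip_load_task"


def skipped_tasks(returned, downstream_ids):
    """Work out which direct downstream tasks would be skipped."""
    followed = {returned} if isinstance(returned, str) else set(returned)
    return sorted(set(downstream_ids) - followed)
```

Every task_id the function can return has to be wired directly downstream of the branch task, just like task_a and task_b above.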
Step 2: Trigger rules
If your branches converge into a final task, that task by default expects all of its upstream tasks to have succeeded. A skipped branch does not count as succeeded, so with the default rule the final task is skipped too.
Every task in Airflow has a trigger_rule parameter that controls when it runs:
- all_success (default): all upstream tasks succeeded
- all_failed: all upstream tasks failed
- one_success: at least one upstream task succeeded
- none_failed: no upstream task failed (skipped is okay)
- none_failed_min_one_success: no upstream task failed AND at least one succeeded
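To see how these rules differ once a branch has been skipped, here is a rough model in plain Python. It is not Airflow's scheduler code: the function name would_run is made up, it only looks at the final upstream states, and it only knows the states "success", "failed", and "skipped".

```python
def would_run(trigger_rule, upstream_states):
    """Simplified model: decide from final upstream states whether a task runs.

    Only the states "success", "failed" and "skipped" are modelled here.
    """
    succeeded = upstream_states.count("success")
    failed = upstream_states.count("failed")
    total = len(upstream_states)

    if trigger_rule == "all_success":
        return succeeded == total
    if trigger_rule == "all_failed":
        return failed == total
    if trigger_rule == "one_success":
        return succeeded >= 1
    if trigger_rule == "none_failed":
        return failed == 0
    if trigger_rule == "none_failed_min_one_success":
        return failed == 0 and succeeded >= 1
    raise ValueError(f"rule not covered by this sketch: {trigger_rule}")


# One branch ran, the other was skipped.
states = ["success", "skipped"]
for rule in ("all_success", "none_failed", "none_failed_min_one_success"):
    print(rule, would_run(rule, states))
```

With one upstream skipped, the model reports that all_success does not fire while none_failed and none_failed_min_one_success both do, which is why a converging task after a branch needs one of the latter.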
Step 3: Create the DAG file
Click VS Code in the environment panel. Right-click the dags folder and create a new file called branching_dag.py.
Step 4: Add the imports
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from datetime import datetime
BranchPythonOperator is the new one. It comes from the same import path as PythonOperator.
Step 5: Write the branch function
def choose_path():
    today = datetime.now().weekday()  # Monday=0, Sunday=6
    if today < 5:
        print("It's a weekday — running daily report path")
        return "weekday_task"
    else:
        print("It's a weekend — running weekend report path")
        return "weekend_task"
This function decides the path. It checks the current day of the week with Python's weekday(), which returns 0 for Monday through 6 for Sunday, so any value below 5 means Monday to Friday.
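Because the function reads the clock, its result depends on the day you happen to run it. One way to check both outcomes without waiting for the weekend is to move the rule into a helper that takes a date. This is only a sketch: pick_report_task is a hypothetical name that does not appear elsewhere in this lesson, and the returned task ids match the ones used here.

```python
from datetime import date


def pick_report_task(day):
    """Return the task_id to follow for the given date."""
    # weekday() gives 0 for Monday through 6 for Sunday,
    # so values below 5 are Monday to Friday.
    if day.weekday() < 5:
        return "weekday_task"
    return "weekend_task"


def choose_path():
    """Branch callable: apply the rule to today's date."""
    return pick_report_task(date.today())


print(pick_report_task(date(2024, 1, 1)))  # a Monday
print(pick_report_task(date(2024, 1, 6)))  # a Saturday
```

The branch operator would still receive choose_path as its python_callable; the helper just makes the weekday rule easy to try with fixed dates.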
Step 6: Write the branch tasks and the summary
def weekday_report():
    print("Running full daily report...")

def weekend_report():
    print("Running lightweight weekend check...")

def summarize():
    print("Pipeline finished — branch complete")
Three normal functions. weekday_report and weekend_report are the two possible paths. summarize runs at the end no matter which branch ran.
Step 7: Define the DAG with the branching pattern
with DAG(
    dag_id="branching_dag",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False
) as dag:
    branch = BranchPythonOperator(
        task_id="branch",
        python_callable=choose_path
    )
    weekday_task = PythonOperator(
        task_id="weekday_task",
        python_callable=weekday_report
    )
    weekend_task = PythonOperator(
        task_id="weekend_task",
        python_callable=weekend_report
    )
    summary = PythonOperator(
        task_id="summary",
        python_callable=summarize,
        trigger_rule="none_failed_min_one_success"
    )

    branch >> [weekday_task, weekend_task] >> summary
- branch >> [weekday_task, weekend_task]: both branch tasks are wired up
- [...] >> summary: both branches converge to summary
- trigger_rule="none_failed_min_one_success": without this, summary would be skipped because one of the upstream branches was skipped
Save with Ctrl+S.
Step 8: Trigger it and watch the branching
Open the Airflow UI from the environment panel. Find branching_dag on the DAGs page and trigger it with the play button.
Open the Graph view. You'll see four tasks — branch, weekday_task, weekend_task, and summary.
Watch the colors:
- branch runs first and turns green
- Based on today's day, one of weekday_task or weekend_task turns green
- The other one shows up as pink, which is the skipped state
- summary turns green because of the trigger rule
After hibernation
If the VM hibernates, reconnect and run in the VS Code terminal:
cd ~/airflow
docker compose up -d