The TaskFlow API in Airflow
TaskFlow lets you write DAGs as plain Python functions. Rewrite a real DAG with TaskFlow and see the difference.
What we're doing
You'll see how much boilerplate the classic style creates, then rewrite the same DAG with TaskFlow and compare them side by side.
Step 1: The problem with the classic style
This is the XCom DAG you wrote in the previous tutorial:
from airflow import DAG
from airflow.operators.python import PythonOperator

def extract():
    return 42

def transform(**context):
    # Pull the value the "extract" task returned (stored in XCom)
    ti = context["ti"]
    count = ti.xcom_pull(task_ids="extract")
    return count * 2

with DAG(...) as dag:
    t1 = PythonOperator(task_id="extract", python_callable=extract)
    t2 = PythonOperator(task_id="transform", python_callable=transform)
    t1 >> t2
- You write the function
- You wrap it in PythonOperator with task_id and python_callable
- You access context["ti"] to get the task instance
- You call xcom_pull with the right task_ids string
- One typo in that string and the pull silently returns None
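To see why that typo is so easy to miss, here is a pure-Python stand-in for the XCom store. FakeXComStore is a hypothetical class, not Airflow's API; it only mimics the behavior described above, where pulling an unknown task_id returns None instead of raising an error.

```python
class FakeXComStore:
    """Hypothetical stand-in for XCom: maps task_id -> returned value."""

    def __init__(self):
        self._values = {}

    def push(self, task_id, value):
        self._values[task_id] = value

    def pull(self, task_ids):
        # An unknown id yields None with no error, like the pull described above
        return self._values.get(task_ids)


store = FakeXComStore()
store.push("extract", 42)

good = store.pull(task_ids="extract")  # 42
typo = store.pull(task_ids="extarct")  # None, silently

try:
    typo * 2
except TypeError as exc:
    # The failure only surfaces here, away from the misspelled string
    error_message = str(exc)
```

The error points at the multiplication, not at the misspelled task_id, which is what makes this kind of bug slow to track down.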
Step 2: What TaskFlow is
TaskFlow is a decorator-based API that turns plain Python functions into Airflow tasks automatically.
Two decorators:
- @dag: turns a function into a DAG
- @task: turns a function into a task
When you decorate a function with @task, it becomes a task. When you call that function inside the DAG, you're not actually running it; you're telling Airflow that the task exists and what it depends on.
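A toy model can make this deferred behavior concrete. This sketch is not how Airflow implements TaskFlow; toy_task, Ref, GRAPH, and CALLS are hypothetical names. It only shows the idea: calling a decorated function returns a placeholder (roughly the role Airflow's XComArg plays) and records which tasks it depends on, without running the function body.

```python
GRAPH = {}  # task name -> upstream task names (hypothetical)
CALLS = []  # task bodies that actually executed


class Ref:
    """Placeholder for a task's future return value."""

    def __init__(self, task_name):
        self.task_name = task_name


def toy_task(func):
    def wrapper(*args):
        # A Ref argument means "this task depends on that task"
        GRAPH[func.__name__] = [a.task_name for a in args if isinstance(a, Ref)]
        return Ref(func.__name__)  # the function body is NOT run here
    return wrapper


@toy_task
def extract():
    CALLS.append("extract")
    return 42


@toy_task
def transform(count):
    CALLS.append("transform")
    return count * 2


count = extract()          # a Ref, not 42
result = transform(count)  # records transform <- extract
```

After these two calls, GRAPH holds the dependency transform ← extract, yet CALLS is still empty: the pipeline has been described, but nothing has run.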
The two big wins:
- No more PythonOperator wrapping: @task does it for you
- No more manual xcom_pull: return values pass between tasks through normal Python function arguments
Step 3: Create the DAG file
Click VS Code in the environment panel. Right-click the dags folder and create a new file called taskflow_dag.py.
Step 4: Add the imports
from airflow.decorators import dag, task
from datetime import datetime
Only two Airflow imports: the dag and task decorators from airflow.decorators (datetime is just for start_date). That's it. No DAG class and no PythonOperator, since TaskFlow handles all of that.
Step 5: Write the tasks as decorated functions
@task
def extract():
    record_count = 42
    print(f"Extracted {record_count} records")
    return record_count

@task
def transform(count: int):
    print(f"Transforming {count} records...")
    return count * 2
- No **context, no xcom_pull. The transform function takes count as a plain Python argument.
- The @task decorator above each function is what makes it an Airflow task.
Step 6: Define the DAG
@dag(
    dag_id="taskflow_dag",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False
)
def my_pipeline():
    count = extract()
    transform(count)

my_pipeline()
- @dag(...): the decorator that turns the function into a DAG. It takes the same parameters you've always used (dag_id, start_date, schedule, catchup).
- Inside my_pipeline(): count = extract() is the push and transform(count) is the pull. They are just normal Python function calls.
- my_pipeline() at the bottom: this last line calls the decorated function, which builds the DAG so Airflow can register it.
Save with Ctrl+S.
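Why does that last line matter? Here is a toy model, separate from taskflow_dag.py (don't add it to the DAG file). toy_dag, add_task, and REGISTRY are hypothetical names, not Airflow's code. The decorator only wraps the function; nothing is collected or registered until the decorated function is called, which is the role my_pipeline() plays at the bottom of the file.

```python
REGISTRY = {}   # dag_id -> task names (hypothetical registry)
_current = None  # task list of the DAG being built, if any


def toy_dag(dag_id):
    def decorate(pipeline_func):
        def build():
            global _current
            _current = []
            pipeline_func()              # run the body so tasks can attach
            REGISTRY[dag_id] = _current  # only now does the DAG "exist"
            _current = None
            return REGISTRY[dag_id]
        return build
    return decorate


def add_task(name):
    # Attach a task name to whichever DAG is currently being built
    _current.append(name)


@toy_dag(dag_id="taskflow_dag")
def my_pipeline():
    add_task("extract")
    add_task("transform")


registered_before_call = "taskflow_dag" in REGISTRY  # decorating alone: False
my_pipeline()                                        # like the file's last line
registered_after_call = "taskflow_dag" in REGISTRY   # True
```

In real Airflow, calling the decorated function returns a DAG object; the sketch only mirrors the timing of when the DAG comes into existence.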
Step 7: Trigger it and verify
Open the Airflow UI from the environment panel. Find taskflow_dag on the DAGs page and trigger it with the play button.
Open the Graph view. You'll see exactly what you'd expect: extract → transform. TaskFlow inferred the dependency from the function call, so no >> is needed.
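Inferring run order from data flow amounts to a topological sort. A minimal sketch using Python's standard-library graphlib, assuming a hand-written dependency map (deps and funcs are hypothetical, and Airflow's scheduler does far more than this): each task runs only after its upstream tasks, and their return values are passed in as arguments.

```python
from graphlib import TopologicalSorter


def extract():
    return 42


def transform(count):
    return count * 2


# task name -> upstream task names, as implied by transform(extract())
deps = {"extract": [], "transform": ["extract"]}
funcs = {"extract": extract, "transform": transform}

results = {}
order = []
for name in TopologicalSorter(deps).static_order():
    # Feed each upstream task's result in as a positional argument
    args = [results[up] for up in deps[name]]
    results[name] = funcs[name](*args)
    order.append(name)
```

Here order comes out as extract, then transform, and results["transform"] is 84.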
Check the logs:
- extract: Extracted 42 records
- transform: Transforming 42 records...
When to use TaskFlow vs the classic style
Use TaskFlow when:
- Your tasks are Python functions
- You want clean and readable pipeline code
- You're passing data between tasks
Stick with the classic style when:
- You need non-Python operators (BashOperator, EmailOperator, sensors)
- You want explicit control over operator-specific parameters (common ones such as task_id and retries also work with TaskFlow, e.g. @task(retries=3))
- You're modifying an existing classic-style DAG and don't want to rewrite it
After hibernation
If the VM hibernates, reconnect and run in the VS Code terminal:
cd ~/airflow
docker compose up -d
What's next
Start Airflow