Dynamic DAGs and task mapping in Airflow
Sometimes you don't know how many tasks you'll need until runtime. Task mapping lets Airflow generate those tasks dynamically. You'll fan out real tasks over a list and watch Airflow spawn them.
What we're doing
You'll learn the difference between dynamic DAGs and dynamic task mapping, then build a DAG that fans out tasks over a list.
Step 1: The problem with static DAGs
So far every DAG we've built has had a fixed shape. You write extract → transform → load and that's what runs. Same three tasks every time.
Real pipelines often need:
- One task per CSV file in an S3 folder — but the file count changes daily
- One task per active customer — but the customer list isn't known until you query it
- One task per region in an API response — but the regions can grow or shrink
Airflow gives you two ways out — dynamic DAGs and dynamic task mapping.
Step 2: Two ways to make DAGs dynamic
Dynamic DAGs (parse-time)
The whole DAG structure is generated when Airflow parses the file. You write a Python loop that creates tasks based on a list:
# Inside a DAG definition; process is a callable you define yourself
for region in ["us", "eu", "asia"]:
    PythonOperator(
        task_id=f"process_{region}",
        python_callable=process,
        op_args=[region],
    )
The list has to be known at parse time, so it can't depend on the output of another task. And because Airflow re-parses DAG files regularly, reading the list from a database or an API means every parse hits that source.
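To make "known at parse time" concrete, here is a stdlib-only sketch (no Airflow needed) of what the parse-time loop above produces. `REGIONS` and `build_task_ids` are hypothetical names used for illustration; the point is that the DAG's shape comes from a list the parser evaluates, so every parse yields the same task IDs until you edit the file.

```python
# Hypothetical module-level list. A parse-time loop reads it every time the
# DAG file is parsed, so it must be available without running any task.
REGIONS = ["us", "eu", "asia"]


def build_task_ids(regions):
    """Return the task IDs the parse-time loop would create for `regions`."""
    task_ids = [f"process_{region}" for region in regions]
    # task_id must be unique within a DAG, so a repeated region would be an
    # error when Airflow builds the DAG; this toy version raises ValueError.
    if len(set(task_ids)) != len(task_ids):
        raise ValueError("duplicate task_id generated")
    return task_ids


print(build_task_ids(REGIONS))  # ['process_us', 'process_eu', 'process_asia']
```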
Dynamic task mapping (run-time)
The mapped tasks are generated when the DAG runs, based on the output of an upstream task:
process.expand(region=get_regions())
get_regions() returns a list at runtime. Airflow takes that list and spawns one instance of process for each item. If get_regions returns 5 items, you get 5 mapped tasks.
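The fan-out rule can be modelled in plain Python. This is a toy sketch, not Airflow's implementation: Airflow creates separate task instances and schedules them, while `toy_expand` (a hypothetical helper) simply calls the function once per item, in order. The stand-in `get_regions` returns 5 items to match the example in the text.

```python
from typing import Any, Callable, List


def toy_expand(fn: Callable[..., Any], **mapped: List[Any]) -> List[Any]:
    """Toy model of .expand() over exactly ONE keyword argument."""
    if len(mapped) != 1:
        raise ValueError("this toy model handles exactly one mapped argument")
    name, values = next(iter(mapped.items()))
    # One call per item. An empty list yields zero calls; in Airflow an empty
    # input creates no mapped instances and the mapped task is marked skipped.
    return [fn(**{name: value}) for value in values]


def get_regions() -> List[str]:
    # Stand-in for an upstream task whose output is only known at run time.
    return ["us", "eu", "asia", "sa", "af"]


def process(region: str) -> str:
    return f"processed {region}"


results = toy_expand(process, region=get_regions())
print(len(results))  # 5
```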
Step 3: Create the DAG file
Click VS Code in the environment panel. Right-click the dags folder and create a new file called mapped_dag.py.
Step 4: Add the imports
from airflow.decorators import dag, task
from datetime import datetime
You only need the TaskFlow decorators. Task mapping needs no extra import because .expand() is built into every @task-decorated function.
Step 5: Write the task that produces the list
@task
def get_files():
    files = ["sales.csv", "users.csv", "events.csv", "logs.csv"]
    print(f"Found {len(files)} files to process")
    return files
Standard TaskFlow task. It returns a list. In a real pipeline this could list files in S3, run a database query and so on.
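As one example of "in a real pipeline", the hard-coded list could come from a directory listing instead. This sketch assumes the files land in a local folder the Airflow worker can read (a hypothetical setup; S3 would need a provider hook instead), and `list_csv_files` is a hypothetical helper you would call from inside `get_files`. It returns a plain, sorted list of strings, which is what `.expand()` maps over and is small enough to pass between tasks.

```python
import tempfile
from pathlib import Path
from typing import List


def list_csv_files(folder: str) -> List[str]:
    """Return CSV file names in `folder`, sorted so the order is stable."""
    return sorted(p.name for p in Path(folder).glob("*.csv") if p.is_file())


# Demo against a throwaway folder instead of a real landing directory.
with tempfile.TemporaryDirectory() as tmp:
    for name in ["users.csv", "sales.csv", "notes.txt"]:
        (Path(tmp) / name).write_text("id\n1\n")
    found = list_csv_files(tmp)

print(found)  # ['sales.csv', 'users.csv']
```

Sorting is a design choice: it keeps the item order (and therefore which file lands on which mapped instance) the same from run to run.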
Step 6: Write the task that processes one item
@task
def process_file(filename: str):
    print(f"Processing {filename}...")
    return f"Done: {filename}"
This task processes one filename: its parameter is a single item, not a list. Airflow calls this function once per item in the input list.
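Because the function only ever sees one item per call, you can check its logic as plain Python without Airflow. This sketch copies the body of `process_file` into an undecorated function (`process_file_logic` is a hypothetical name) and adds a type guard; the guard is an assumption for illustration, not something Airflow requires.

```python
from typing import List


def process_file_logic(filename: str) -> str:
    """Body of process_file without the @task decorator (hypothetical name)."""
    if not isinstance(filename, str):
        # A mapped instance receives ONE element of the list, never the list.
        raise TypeError(f"expected one filename, got {type(filename).__name__}")
    print(f"Processing {filename}...")
    return f"Done: {filename}"


files: List[str] = ["sales.csv", "users.csv", "events.csv", "logs.csv"]
# One call per item mirrors what the four mapped instances do at run time.
outputs = [process_file_logic(name) for name in files]
```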
Step 7: Define the DAG with .expand()
@dag(
    dag_id="mapped_dag",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
)
def file_pipeline():
    files = get_files()
    process_file.expand(filename=files)


file_pipeline()
The key line is process_file.expand(filename=files).
- .expand() is what tells Airflow to map this task over the input list
- filename=files: for each item in files, run process_file with that item as the filename argument
- If files has 4 items, Airflow creates 4 mapped instances of process_file, which can run in parallel if your executor and concurrency settings allow it
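The "one instance per item, possibly in parallel" behaviour can be sketched with a thread pool. This is a toy model under stated assumptions: in Airflow the scheduler and executor run the real task instances; here `ThreadPoolExecutor` stands in for them, `run_mapped` is a hypothetical helper, and `max_parallel` loosely stands in for whatever concurrency limits your deployment applies. Results are keyed by map index, the item's position in the input list starting at 0.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List


def run_mapped(fn: Callable[[str], str], items: List[str], max_parallel: int = 4) -> Dict[int, str]:
    """Toy model: run fn once per item concurrently, keyed by map index."""
    with ThreadPoolExecutor(max_workers=max_parallel) as pool:
        # Map index = position of the item in the input list, starting at 0.
        futures = {index: pool.submit(fn, item) for index, item in enumerate(items)}
        # Collect results by index, whatever order the threads finished in.
        return {index: future.result() for index, future in futures.items()}


def process_file(filename: str) -> str:
    return f"Done: {filename}"


files = ["sales.csv", "users.csv", "events.csv", "logs.csv"]
by_index = run_mapped(process_file, files)
print(by_index[0])  # Done: sales.csv
```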
Step 8: Save the file
Save mapped_dag.py with Ctrl+S.
Step 9: Trigger and watch the mapped tasks
Open the Airflow UI from the environment panel. Find mapped_dag on the DAGs page and trigger it with the play button.
Open the Graph view. You'll see two tasks: get_files and process_file. Notice the small number next to process_file: it shows how many mapped instances there are. With 4 files, you'll see [4].
Click process_file and you'll see a Mapped Tasks tab. Each instance is numbered by its map index: 0, 1, 2, 3. Click any one to see its logs:
- Instance 0: Processing sales.csv...
- Instance 1: Processing users.csv...
- Instance 2: Processing events.csv...
- Instance 3: Processing logs.csv...
Each mapped task got one filename from the list.
Step 10: Change the list and re-run
Edit get_files to return more items:
files = ["sales.csv", "users.csv", "events.csv", "logs.csv", "products.csv", "orders.csv"]
Save, go back to the UI, and trigger the DAG again. Now you'll see 6 mapped instances of process_file. Airflow figured out the new size automatically.
After hibernation
If the VM hibernates, reconnect and run in the VS Code terminal:
cd ~/airflow
docker compose up -d