
Dynamic DAGs and task mapping in Airflow

Sometimes you don't know how many tasks you'll need until runtime. Dynamic task mapping lets Airflow create those tasks while the DAG runs. In this lab you'll fan a task out over a list and watch Airflow spawn one instance per item.

What we're doing

You'll learn the difference between dynamic DAGs and dynamic task mapping, then build a DAG that fans out tasks over a list.

Step 1: The problem with static DAGs

So far every DAG we've built has had a fixed shape. You write extract → transform → load and that's what runs. Same three tasks every time.

Real pipelines often need:

  • One task per CSV file in an S3 folder — but the file count changes daily
  • One task per active customer — but the customer list isn't known until you query it
  • One task per region in an API response — but the regions can grow or shrink

Airflow gives you two ways out — dynamic DAGs and dynamic task mapping.

Step 2: Two ways to make DAGs dynamic

Dynamic DAGs (parse-time)

The whole DAG structure is generated when Airflow parses the file. You write a Python loop that creates tasks based on a list:

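# Hypothetical DAG id and callable, for illustration.
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def process(region):
    print(f"Processing {region}")

with DAG(dag_id="regions_dag", start_date=datetime(2024, 1, 1), schedule=None):
    for region in ["us", "eu", "asia"]:
        PythonOperator(task_id=f"process_{region}", python_callable=process, op_args=[region])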

The list has to be known at parse time, so it can't depend on the output of another task. If you read it from a database or an API, every parse hits that source, and Airflow re-parses DAG files regularly (every 30 seconds by default).
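To see why parse-time generation can get expensive, here is a plain-Python model of the loop above. It is not Airflow code: fetch_regions is a hypothetical stand-in for a database or API lookup, and parse_dag_file stands in for one pass of Airflow over the DAG file.

```python
# Plain-Python model of parse-time DAG generation (not Airflow code).
source_calls = 0

def fetch_regions():
    """Hypothetical stand-in for a database or API lookup."""
    global source_calls
    source_calls += 1
    return ["us", "eu", "asia"]

def parse_dag_file():
    """Simulate one parse: the loop runs and the task IDs are fixed."""
    return [f"process_{region}" for region in fetch_regions()]

# Airflow re-parses DAG files periodically; simulate three parses.
for _ in range(3):
    task_ids = parse_dag_file()

print(task_ids)      # ['process_us', 'process_eu', 'process_asia']
print(source_calls)  # 3
```

Every parse pays for the lookup, whether or not a DAG run ever happens.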

Dynamic task mapping (run-time)

The mapped tasks are generated when the DAG runs, based on the output of an upstream task:

process.expand(region=get_regions())

get_regions() returns a list at runtime. Airflow takes that list and spawns one instance of process for each item. If get_regions() returns 5 items, you get 5 mapped task instances.
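A rough plain-Python model of what .expand() does once the upstream list exists may help. This is an illustration, not Airflow's implementation: the expand helper here is hypothetical, and in a real run the list would reach the mapped task from get_regions through XCom.

```python
# Plain-Python model of runtime task mapping (not Airflow's implementation).
def get_regions():
    # In Airflow this list is only known once the upstream task has run.
    return ["us", "eu", "asia", "latam", "africa"]

def process(region):
    return f"processed {region}"

def expand(func, **mapped_kwargs):
    """Hypothetical helper: call func once per item of one mapped argument."""
    if len(mapped_kwargs) != 1:
        raise ValueError("this sketch maps over exactly one argument")
    name, values = next(iter(mapped_kwargs.items()))
    # Key each result by its position in the list, like a map index.
    return {index: func(**{name: value}) for index, value in enumerate(values)}

results = expand(process, region=get_regions())
print(len(results))  # 5
print(results[0])    # processed us
```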

Step 3: Create the DAG file

Click VS Code in the environment panel. Right-click the dags folder and create a new file called mapped_dag.py.

Step 4: Add the imports

from airflow.decorators import dag, task
from datetime import datetime

You only need the TaskFlow decorators. Task mapping is built in (.expand() is available on any @task-decorated function), so no extra import is needed.

Step 5: Write the task that produces the list

@task
def get_files():
    files = ["sales.csv", "users.csv", "events.csv", "logs.csv"]
    print(f"Found {len(files)} files to process")
    return files

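This is a standard TaskFlow task that returns a list. TaskFlow pushes the return value to XCom, which is how the list reaches the mapped task downstream. In a real pipeline, this could list files in S3, run a database query, and so on.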
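For a slightly more realistic get_files, here is a sketch that lists CSV files in a local folder as a stand-in for an S3 listing. The list_csv_files name and the temporary folder are assumptions for illustration. Sorting keeps the order stable between runs. It is also worth keeping the list modest: Airflow fails the upstream task if the pushed list is longer than the [core] max_map_length setting (1024 by default).

```python
from pathlib import Path
import tempfile

def list_csv_files(folder):
    """Return the CSV file names in folder, sorted for a stable order."""
    return sorted(p.name for p in Path(folder).glob("*.csv"))

# Demo with a temporary folder instead of a real bucket.
with tempfile.TemporaryDirectory() as tmp:
    for name in ["users.csv", "sales.csv", "notes.txt"]:
        (Path(tmp) / name).write_text("id\n1\n")
    files = list_csv_files(tmp)

print(files)  # ['sales.csv', 'users.csv']
```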

Step 6: Write the task that processes one item

@task
def process_file(filename: str):
    print(f"Processing {filename}...")
    return f"Done: {filename}"

This task processes one filename: its argument is a single item, not a list. Airflow calls this function once per item in the input list.
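What "processing" means depends on your pipeline. As one hypothetical example, this version reads a single CSV and counts its data rows; the row-counting logic and the file contents are assumptions for illustration. Note that it takes one path and returns one result, which is the shape a mapped task expects.

```python
import csv
import tempfile
from pathlib import Path

def process_file(path):
    """Process one CSV file: here, just count its data rows."""
    with open(path, newline="") as f:
        reader = csv.reader(f)
        next(reader, None)  # skip the header row
        rows = sum(1 for _ in reader)
    return f"Done: {Path(path).name} ({rows} rows)"

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "sales.csv"
    path.write_text("id,amount\n1,10\n2,20\n")
    result = process_file(path)

print(result)  # Done: sales.csv (2 rows)
```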

Step 7: Define the DAG with .expand()

@dag(
    dag_id="mapped_dag",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False
)
def file_pipeline():
    files = get_files()
    process_file.expand(filename=files)

file_pipeline()

The key line is process_file.expand(filename=files).

  • .expand() tells Airflow to map this task over the input list
  • filename=files: for each item in files, run process_file with that item as the filename argument
  • If files has 4 items, Airflow creates 4 mapped instances of process_file, which can run in parallel (subject to your executor and concurrency settings)

Save with Ctrl+S.
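.expand() is not limited to one argument. In Airflow, .partial() fixes arguments that stay the same for every instance, and expanding over several arguments creates one instance per combination (a cross product), for example process_file.partial(bucket="raw").expand(filename=files, fmt=["parquet", "json"]). The plain-Python model below only illustrates how many instances that yields; the bucket and fmt parameters and the partial_expand helper are hypothetical.

```python
from itertools import product

def process_file(filename, fmt, bucket):
    # Hypothetical signature: one constant argument and two mapped ones.
    return f"{bucket}/{filename} as {fmt}"

def partial_expand(func, constants, **mapped):
    """Model of func.partial(**constants).expand(**mapped):
    one call per combination of the mapped values."""
    names = list(mapped)
    results = []
    for combo in product(*(mapped[name] for name in names)):
        kwargs = dict(zip(names, combo), **constants)
        results.append(func(**kwargs))
    return results

out = partial_expand(
    process_file,
    {"bucket": "raw"},
    filename=["sales.csv", "users.csv"],
    fmt=["parquet", "json"],
)
print(len(out))  # 4
print(out[0])    # raw/sales.csv as parquet
```

Two files times two formats gives four instances. If you want matched pairs rather than every combination, Airflow also provides options such as expand_kwargs; check the dynamic task mapping docs for your version.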

Step 8: Trigger and watch the mapped tasks

Open the Airflow UI from the environment panel. Find mapped_dag on the DAGs page and trigger it with the play button.

Open the Graph view. You'll see two tasks, get_files and process_file. Notice the number in brackets next to process_file: it shows how many mapped instances there are. With 4 files, you'll see [4].

Click process_file and you'll see a Mapped Tasks tab. Each instance is numbered by its map index: 0, 1, 2, 3. Click any one to see its logs:

  • Instance 0: Processing sales.csv...
  • Instance 1: Processing users.csv...
  • Instance 2: Processing events.csv...
  • Instance 3: Processing logs.csv...

Each mapped task got one filename from the list.

Step 9: Change the list and re-run

Edit get_files to return more items:

files = ["sales.csv", "users.csv", "events.csv", "logs.csv", "products.csv", "orders.csv"]

Save, give Airflow a moment to re-parse the file, then go back to the UI and trigger the DAG again. Now you'll see 6 mapped instances of process_file. Airflow sized the mapping from the new list automatically, with no change to the DAG's structure.
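The same behavior can be modeled in plain Python, along with a common next step: a downstream task that collects every mapped result. In Airflow, passing the mapped output into a regular task, such as summarize(process_file.expand(filename=files)), hands that task all of the mapped return values. The summarize task here is hypothetical. The empty-list branch reflects Airflow skipping a mapped task whose input has zero length; the model simply returns "skipped".

```python
def process_file(filename):
    return f"Done: {filename}"

def summarize(results):
    # Hypothetical downstream task that receives every mapped result.
    return f"{len(results)} files processed"

def run(files):
    """Fan out over files, then fan in. An empty list means no instances."""
    if not files:
        return "skipped"
    mapped_results = [process_file(filename) for filename in files]
    return summarize(mapped_results)

first = run(["sales.csv", "users.csv", "events.csv", "logs.csv"])
second = run(["sales.csv", "users.csv", "events.csv", "logs.csv",
              "products.csv", "orders.csv"])
empty = run([])
print(first)   # 4 files processed
print(second)  # 6 files processed
print(empty)   # skipped
```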

After hibernation

If the VM hibernates, reconnect and run in the VS Code terminal:

cd ~/airflow
docker compose up -d

What's next

Now try this out in a live environment: launch a fresh environment and experiment with the DAG above.
