AIRFLOW

Retries, SLAs, and failure notifications in Airflow

Real pipelines fail: APIs time out, networks blip, and databases get slow. Airflow has built-in tools for handling this: retries, SLAs, and failure notifications. In this lab you'll configure all three on a real DAG.

What we're doing

You'll learn how retries, SLAs, and failure notifications work, then build a DAG that fails on purpose so you can watch the retries and the failure callback in action.

Step 1: Retries and backoff

Retries are the simplest reliability feature: you tell Airflow how many times to retry a failed task and how long to wait between attempts.

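from datetime import timedelta

from airflow.operators.python import PythonOperator

# my_function is a placeholder for whatever callable the task runs
task = PythonOperator(
    task_id="flaky_task",
    python_callable=my_function,
    retries=3,
    retry_delay=timedelta(minutes=2),
)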
  • retries=3 — if the task fails, try it up to 3 more times before giving up
  • retry_delay=timedelta(minutes=2) — wait 2 minutes between each attempt
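The attempt counting can be sketched in plain Python. This is only a toy model, not Airflow's scheduler logic: run_with_retries and always_fails are hypothetical names invented for the illustration, and the waiting is added up rather than actually slept so the sketch finishes instantly.

```python
from datetime import timedelta


def run_with_retries(task, retries, retry_delay):
    """Toy model: one initial attempt plus up to `retries` more."""
    attempts = 0
    waited = timedelta(0)
    while True:
        attempts += 1
        try:
            task()
            return "success", attempts, waited
        except Exception:
            if attempts > retries:
                # The initial attempt and every retry have failed.
                return "failed", attempts, waited
            # A real scheduler would wait here; we only add up the delay.
            waited += retry_delay


def always_fails():
    raise RuntimeError("simulated failure")


state, attempts, waited = run_with_retries(
    always_fails, retries=3, retry_delay=timedelta(minutes=2)
)
print(state, attempts, waited)  # failed 4 0:06:00
```

With retries=3 a task that never recovers runs 4 times in total (1 initial attempt plus 3 retries) and spends 6 minutes waiting between attempts.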

Exponential backoff

For services that get more upset the harder you hammer them, use exponential backoff:

task = PythonOperator(
    task_id="flaky_task",
    python_callable=my_function,
    retries=5,
    retry_delay=timedelta(seconds=30),
    retry_exponential_backoff=True,
    max_retry_delay=timedelta(minutes=10)
)
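  • retry_exponential_backoff=True: roughly double the delay after each failed attempt (about 30s, 1m, 2m, 4m...). Airflow adds some jitter, so the exact waits vary
  • max_retry_delay: upper bound on the wait between attempts, so the delay doesn't grow forever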
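To see how the doubling and the cap interact, here is an idealized sketch of the delay schedule. It assumes pure doubling with no jitter, so real Airflow waits will differ somewhat; backoff_schedule is a hypothetical helper, not an Airflow function. It uses 7 retries rather than 5 so that the cap actually kicks in.

```python
from datetime import timedelta


def backoff_schedule(retries, retry_delay, max_retry_delay):
    """Idealized delays: double each time, never exceed the cap."""
    delays = []
    for attempt in range(retries):
        delay = retry_delay * (2 ** attempt)
        delays.append(min(delay, max_retry_delay))
    return delays


schedule = backoff_schedule(
    retries=7,
    retry_delay=timedelta(seconds=30),
    max_retry_delay=timedelta(minutes=10),
)
seconds = [int(d.total_seconds()) for d in schedule]
print(seconds)  # [30, 60, 120, 240, 480, 600, 600]
```

With the settings from the example above (5 retries, 30-second base), the last idealized wait is 480 seconds, so the 10-minute cap would only matter if you raised retries further.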

Step 2: SLAs and knowing when something is late

A task can succeed and still be a problem if it finishes too late. Say the daily report DAG needs to finish by 8 AM. If it's still running at 10 AM, that's a problem even though no task has failed.

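SLAs (Service Level Agreements) tell Airflow when a task is expected to have finished. The sla parameter shown here belongs to Airflow 2.x; the feature was removed in Airflow 3.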

task = PythonOperator(
    task_id="daily_report",
    python_callable=generate_report,
    sla=timedelta(hours=1)
)
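  • sla=timedelta(hours=1): if this task hasn't succeeded within 1 hour of the scheduled period closing, it's an SLA miss. The clock is tied to the schedule, not to when the task starts, so this is not a limit on the task's run time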
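The Airflow 2.x documentation describes sla as a timedelta counted from the end of the scheduled period rather than from the moment the task starts. The sketch below models that rule in plain Python; sla_deadline and missed_sla are hypothetical helpers, and the dates are made up for the example.

```python
from datetime import datetime, timedelta


def sla_deadline(period_end, sla):
    """The task is expected to have succeeded by this moment."""
    return period_end + sla


def missed_sla(period_end, sla, finished_at):
    """True if the task succeeded only after its deadline."""
    return finished_at > sla_deadline(period_end, sla)


# Hypothetical daily run covering 2024-01-01; its period closes at midnight.
period_end = datetime(2024, 1, 2, 0, 0)
sla = timedelta(hours=1)

deadline = sla_deadline(period_end, sla)
on_time = missed_sla(period_end, sla, datetime(2024, 1, 2, 0, 45))
late = missed_sla(period_end, sla, datetime(2024, 1, 2, 1, 30))
print(deadline, on_time, late)  # 2024-01-02 01:00:00 False True
```

A task that finishes at 00:45 is fine even if it ran for most of that time, while one that finishes at 01:30 is flagged even if it only ran for a few minutes after starting late.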

SLA misses don't fail the task. Airflow logs the miss, shows it in the UI, and can trigger a callback to notify someone. SLAs are only checked for scheduled runs; manually triggered runs are not checked.
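A minimal sketch of such a callback, assuming Airflow 2.x, where the DAG accepts an sla_miss_callback argument and calls it with the five arguments shown. The function name on_sla_miss, the DAG id, and the stand-in objects are hypothetical, and task_list is treated as plain text naming the late tasks, which is an assumption worth checking against your Airflow version.

```python
from types import SimpleNamespace


def on_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Build and print a short alert naming the late tasks."""
    message = f"SLA MISS in {dag.dag_id}: {task_list}"
    print(message)
    # In real life, post to Slack, send an email, page someone
    return message


# Stand-in for the DAG object, so the callback can be tried without Airflow.
fake_dag = SimpleNamespace(dag_id="daily_report_dag")
result = on_sla_miss(fake_dag, "daily_report", "", [], [])
```

In a real DAG file you would pass the function as sla_miss_callback=on_sla_miss when constructing the DAG, not on the individual task.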

Step 3: Failure notifications

Retries handle the easy stuff automatically. When the retries run out, someone needs to know.

Airflow's on_failure_callback parameter runs a Python function when a task fails:

def on_failure(context):
    dag_id = context["dag"].dag_id
    task_id = context["task_instance"].task_id
    print(f"ALERT: {dag_id}.{task_id} failed!")
    # In real life, post to Slack, send an email, page someone

task = PythonOperator(
    task_id="critical_task",
    python_callable=my_function,
    retries=3,
    retry_delay=timedelta(minutes=2),
    on_failure_callback=on_failure
)
  • on_failure_callback=on_failure — Airflow calls this function when the task ultimately fails (after all retries)
  • The callback receives a context dictionary with details about the failed task

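To apply the callback to every task without repeating yourself, put on_failure_callback in the DAG's default_args. Passing on_failure_callback directly to DAG(...) does something different: that callback runs once when the DAG run as a whole fails, not once per failed task.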
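Because the callback is an ordinary function that takes a dictionary, you can try it out without running Airflow at all. The sketch below feeds it a hand-built context; the SimpleNamespace objects and the example_dag name are stand-ins for the real objects Airflow passes, and the return value is added only to make the result easy to inspect.

```python
from types import SimpleNamespace


def on_failure(context):
    dag_id = context["dag"].dag_id
    task_id = context["task_instance"].task_id
    message = f"ALERT: {dag_id}.{task_id} failed!"
    print(message)
    return message


# Hand-built stand-in for the context Airflow would supply.
fake_context = {
    "dag": SimpleNamespace(dag_id="example_dag"),
    "task_instance": SimpleNamespace(task_id="critical_task"),
}
alert = on_failure(fake_context)  # prints: ALERT: example_dag.critical_task failed!
```

Checking the callback this way catches typos in the context keys before a real failure depends on it.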

Step 4: Create the DAG file

Click VS Code in the environment panel. Right-click the dags folder and create a new file called reliability_dag.py.

Step 5: Add the imports and the failing function

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import random

random is just so we can simulate intermittent failures.

Now add a function that fails about 70% of the time:

def flaky_task():
    if random.random() < 0.7:
        print("Simulating a flaky API")
        raise Exception("Simulated failure")
    print("Task succeeded!")
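It helps to know what to expect before triggering the DAG. Assuming each attempt fails independently with probability 0.7, and using the retries=3 setting this tutorial puts in default_args, the arithmetic looks like this:

```python
FAILURE_PROBABILITY = 0.7  # matches the 0.7 threshold in flaky_task
RETRIES = 3                # matches the retries value used in default_args

attempts = 1 + RETRIES                            # initial attempt plus retries
p_all_fail = FAILURE_PROBABILITY ** attempts      # every attempt fails
p_eventual_success = 1 - p_all_fail

print(f"all {attempts} attempts fail: {p_all_fail:.4f}")       # 0.2401
print(f"task eventually succeeds: {p_eventual_success:.4f}")   # 0.7599
```

So roughly one run in four should exhaust its retries and fire the failure callback, while the rest succeed on some attempt. If your first run succeeds, trigger the DAG a few more times.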

Step 6: Add the failure callback

def on_failure(context):
    dag_id = context["dag"].dag_id
    task_id = context["task_instance"].task_id
    print(f"ALERT: {dag_id}.{task_id} failed after all retries!")

Step 7: Define the DAG

with DAG(
    dag_id="reliability_dag",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
    default_args={
        "retries": 3,
        "retry_delay": timedelta(seconds=10),
        "on_failure_callback": on_failure,
        "sla": timedelta(minutes=5)
    }
) as dag:

    task = PythonOperator(
        task_id="flaky_task",
        python_callable=flaky_task
    )
  • retries=3: every task in this DAG retries up to 3 times
  • retry_delay=timedelta(seconds=10): wait 10 seconds between attempts (short for the demo)
  • on_failure_callback=on_failure: call our alert function when a task ultimately fails
  • sla=timedelta(minutes=5): flag tasks that haven't succeeded within 5 minutes of the scheduled period closing. Because this DAG has schedule=None and is triggered manually, Airflow won't check the SLA on these runs; it's here to show where the setting goes
  • default_args is a dictionary that applies to every task in the DAG. It's cleaner than repeating the same settings on each operator.
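An argument passed directly to an operator takes precedence over the same key in default_args. A simplified way to picture that is a dictionary merge; effective_settings is a hypothetical helper for illustration, not how Airflow implements it internally.

```python
from datetime import timedelta

default_args = {"retries": 3, "retry_delay": timedelta(seconds=10)}


def effective_settings(default_args, **task_kwargs):
    """Simplified model: explicit task arguments win over default_args."""
    return {**default_args, **task_kwargs}


plain = effective_settings(default_args)
patient = effective_settings(default_args, retries=5)

print(plain["retries"], patient["retries"])  # 3 5
```

A task that needs more patience than the rest can set retries=5 on its own operator and still inherit retry_delay from default_args.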

Save with Ctrl+S.

Step 8: Trigger and watch the retries

Open the Airflow UI from the environment panel. Find reliability_dag on the DAGs page and trigger it with the play button.

Open the Graph view. The task will start running. If it fails, you'll see it turn orange/yellow with a retry icon. Airflow is going to wait 10 seconds and try again.

Click on the task → Logs. Each attempt has its own log section, marked as Attempt 1, Attempt 2, and so on.

After hibernation

If the VM hibernates, reconnect and run in the VS Code terminal:

cd ~/airflow
docker compose up -d
