Retries, SLAs, and failure notifications in Airflow
Real pipelines fail: APIs time out, networks blip, and databases get slow. Airflow has three built-in tools to handle this: retries, SLAs, and failure notifications. Here you'll configure all three on a real DAG.
What we're doing
You'll learn how retries, SLAs, and failure notifications work, then build a DAG that intentionally fails so you can watch all three in action.
Step 1: Retries and backoff
Retries are the simplest reliability feature: you tell Airflow how many times to retry a failed task and how long to wait between attempts.
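from datetime import timedelta
from airflow.operators.python import PythonOperator

# my_function stands in for your own Python callable
task = PythonOperator(
    task_id="flaky_task",
    python_callable=my_function,
    retries=3,
    retry_delay=timedelta(minutes=2),
)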
- retries=3: if the task fails, retry it up to 3 more times before giving up (4 attempts in total)
- retry_delay=timedelta(minutes=2): wait 2 minutes between attempts
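To make the counting concrete, here is a plain-Python sketch of the retry semantics. It is a simplified model, not Airflow's scheduler code: run_with_retries is a hypothetical helper, and its sleep argument is injectable so the demo doesn't actually wait. The point it shows is that retries counts extra attempts after the first run.

```python
import time
from datetime import timedelta


def run_with_retries(func, retries, retry_delay, sleep=time.sleep):
    """Simplified model of Airflow's retry behavior (hypothetical helper).

    Runs func once, then up to `retries` more times on failure, waiting
    retry_delay between attempts. Returns (result, attempts_used).
    """
    attempt = 0
    while True:
        attempt += 1
        try:
            return func(), attempt
        except Exception as exc:
            if attempt > retries:
                # Out of retries: this is where Airflow would mark the task
                # failed and call on_failure_callback.
                raise RuntimeError(f"failed after {attempt} attempts") from exc
            print(f"Attempt {attempt} failed ({exc}); retrying in {retry_delay}")
            sleep(retry_delay.total_seconds())


if __name__ == "__main__":
    calls = {"n": 0}

    def fails_twice():
        # Fails on the first two calls, succeeds on the third.
        calls["n"] += 1
        if calls["n"] <= 2:
            raise ValueError("simulated timeout")
        return "ok"

    # A no-op sleep keeps the demo instant.
    result, attempts = run_with_retries(
        fails_twice, retries=3, retry_delay=timedelta(minutes=2), sleep=lambda s: None
    )
    print(result, attempts)  # ok 3
```

With retries=3, a task that never succeeds runs four times and waits three times before the final failure.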
Exponential backoff
For services that get more upset the harder you hammer them, use exponential backoff:
task = PythonOperator(
    task_id="flaky_task",
    python_callable=my_function,
    retries=5,
    retry_delay=timedelta(seconds=30),
    retry_exponential_backoff=True,
    max_retry_delay=timedelta(minutes=10),
)
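- retry_exponential_backoff=True: roughly double the wait after each failed attempt (about 30s, 1m, 2m, 4m...). Airflow also adds some jitter, so the real waits can run somewhat longer.
- max_retry_delay=timedelta(minutes=10): a cap so the delay doesn't grow forever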
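A simplified sketch of how the waits grow: the base delay doubles on each retry and is clipped at max_retry_delay. It deliberately leaves out the jitter Airflow adds, so read the numbers as approximate minimum waits rather than exact timings. backoff_delays is a hypothetical helper, not part of Airflow.

```python
from datetime import timedelta


def backoff_delays(retries, retry_delay, max_retry_delay):
    """Approximate wait before each retry under exponential backoff.

    Simplified model: the base delay doubles per retry and is capped at
    max_retry_delay. Airflow's real calculation also adds jitter, omitted here.
    """
    delays = []
    for retry_number in range(retries):
        delay = retry_delay * (2 ** retry_number)  # 30s, 1m, 2m, 4m, ...
        delays.append(min(delay, max_retry_delay))
    return delays


for i, d in enumerate(backoff_delays(5, timedelta(seconds=30), timedelta(minutes=10)), start=1):
    print(f"retry {i}: wait {d}")
```

With the settings above, all five waits (30s through 8m) stay under the cap. With retries=7, the sixth and seventh waits would be clipped to 10 minutes instead of growing to 16 and 32.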
Step 2: SLAs and knowing when something is late
A task can succeed and still be a problem if it took too long. Say the daily report DAG needs to finish by 8 AM: if it's still running at 10 AM, that's a problem even though no task has failed.
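SLAs (Service Level Agreements) tell Airflow when a task is expected to have finished. The clock starts at the DAG run's scheduled time (the end of its data interval), not when the task itself starts, so time spent queued or waiting on upstream tasks counts too.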
task = PythonOperator(
    task_id="daily_report",
    python_callable=generate_report,
    sla=timedelta(hours=1),
)
sla=timedelta(hours=1): if this task hasn't succeeded within 1 hour of the DAG run's scheduled time, Airflow records an SLA miss
SLA misses don't fail the task. Airflow records the miss, shows it in the UI, and can call a DAG-level sla_miss_callback to notify someone. SLAs are only checked for scheduled runs, not for manually triggered ones.
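In Airflow 2 the SLA clock starts at the DAG run's scheduled time, not when the task starts, so a short task can still miss its SLA if it sat in the queue. The plain-Python model below illustrates that; sla_deadline and is_sla_miss are hypothetical helpers, not Airflow APIs, and the times are made up for the example.

```python
from datetime import datetime, timedelta


def sla_deadline(scheduled_time, sla):
    """Latest moment the task may succeed without an SLA miss (simplified model)."""
    return scheduled_time + sla


def is_sla_miss(scheduled_time, sla, succeeded_at):
    """True if the task had not succeeded by the deadline.

    succeeded_at=None means the task still hasn't succeeded.
    """
    deadline = sla_deadline(scheduled_time, sla)
    if succeeded_at is None:
        return datetime.now() > deadline
    return succeeded_at > deadline


# A daily run scheduled for 07:00 with sla=timedelta(hours=1) has an 08:00 deadline.
scheduled = datetime(2024, 1, 2, 7, 0)
# The task waits in the queue until 07:50 and runs for only 20 minutes...
task_start = datetime(2024, 1, 2, 7, 50)
finished = task_start + timedelta(minutes=20)
# ...but it finishes at 08:10, after the deadline, so it's still an SLA miss.
print(is_sla_miss(scheduled, timedelta(hours=1), finished))  # True
```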
Step 3: Failure notifications
Retries handle transient problems automatically. When a task still fails after its last retry, though, someone needs to know.
Airflow's on_failure_callback parameter runs a Python function when that happens:
def on_failure(context):
    dag_id = context["dag"].dag_id
    task_id = context["task_instance"].task_id
    print(f"ALERT: {dag_id}.{task_id} failed!")
    # In real life, post to Slack, send an email, page someone

task = PythonOperator(
    task_id="critical_task",
    python_callable=my_function,
    retries=3,
    retry_delay=timedelta(minutes=2),
    on_failure_callback=on_failure,
)
- on_failure_callback=on_failure: Airflow calls this function when the task ultimately fails (after all retries)
- The callback receives a context dictionary with details about the failed task
You can also put on_failure_callback in the DAG's default_args so it applies to every task without repeating yourself. Passing it directly to DAG() is different: that callback fires once when the whole DAG run fails.
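The on_failure example above only prints. The "post to Slack" idea could look something like this standard-library sketch, which sends a message to a Slack incoming webhook. SLACK_WEBHOOK_URL is a placeholder you'd replace with your own, format_alert and build_slack_request are hypothetical helper names, and reading context.get("exception") assumes the failure context carries the raised exception (Airflow 2 failure callbacks normally do; .get keeps it safe if not).

```python
import json
import urllib.request
from types import SimpleNamespace

# Placeholder: replace with your own Slack incoming-webhook URL.
SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/XXX/YYY/ZZZ"


def format_alert(context):
    """Build a one-line alert from the Airflow callback context."""
    dag_id = context["dag"].dag_id
    task_id = context["task_instance"].task_id
    error = context.get("exception")  # may be absent; .get() avoids a KeyError
    return f"ALERT: {dag_id}.{task_id} failed after all retries: {error}"


def build_slack_request(text, url=SLACK_WEBHOOK_URL):
    """Prepare (but don't send) a JSON POST for a Slack incoming webhook."""
    payload = json.dumps({"text": text}).encode("utf-8")
    return urllib.request.Request(
        url, data=payload, headers={"Content-Type": "application/json"}, method="POST"
    )


def on_failure(context):
    request = build_slack_request(format_alert(context))
    # Network call; a short timeout keeps a slow webhook from hanging the callback.
    with urllib.request.urlopen(request, timeout=10) as response:
        response.read()


if __name__ == "__main__":
    # A fake context lets you check the message without running Airflow.
    fake_context = {
        "dag": SimpleNamespace(dag_id="reliability_dag"),
        "task_instance": SimpleNamespace(task_id="flaky_task"),
        "exception": Exception("Simulated failure"),
    }
    print(format_alert(fake_context))
```

Keeping message formatting separate from sending makes the formatting testable offline with a fake context, as in the __main__ block.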
Step 4: Create the DAG file
Click VS Code in the environment panel. Right-click the dags folder and create a new file called reliability_dag.py.
Step 5: Add the imports and the failing function
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import random
The random module is only there to simulate intermittent failures.
Now a function that fails most of the time:
def flaky_task():
    if random.random() < 0.7:
        print("Simulating a flaky API")
        raise Exception("Simulated failure")
    print("Task succeeded!")
Step 6: Add the failure callback
def on_failure(context):
    dag_id = context["dag"].dag_id
    task_id = context["task_instance"].task_id
    print(f"ALERT: {dag_id}.{task_id} failed after all retries!")
Step 7: Define the DAG
with DAG(
    dag_id="reliability_dag",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
    default_args={
        "retries": 3,
        "retry_delay": timedelta(seconds=10),
        "on_failure_callback": on_failure,
        "sla": timedelta(minutes=5),
    },
) as dag:
    task = PythonOperator(
        task_id="flaky_task",
        python_callable=flaky_task,
    )
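- retries=3: every task in this DAG retries up to 3 times
- retry_delay=timedelta(seconds=10): wait 10 seconds between attempts (short for the demo)
- on_failure_callback=on_failure: call our alert function when a task ultimately fails
- sla=timedelta(minutes=5): flag tasks that haven't succeeded within 5 minutes of the run's scheduled time. Airflow only checks SLAs on scheduled runs, so with schedule=None and a manual trigger this one won't fire, but it shows where the setting goes.

default_args is a dictionary that applies to every task in the DAG. It's cleaner than repeating the same settings on each operator.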
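Arguments passed directly to an operator take precedence over default_args, so a single task can opt out of the shared settings (for example, passing retries=0 to one PythonOperator). The sketch below models that precedence with a plain dict merge. It is a simplification, not Airflow's internal code, and no_retry_task is a hypothetical task.

```python
from datetime import timedelta

default_args = {
    "retries": 3,
    "retry_delay": timedelta(seconds=10),
    "sla": timedelta(minutes=5),
}


def effective_args(default_args, **operator_kwargs):
    """Model of the precedence rule: explicit operator arguments win over default_args."""
    return {**default_args, **operator_kwargs}


# A hypothetical task that should fail fast instead of retrying.
fast_fail = effective_args(default_args, task_id="no_retry_task", retries=0)
print(fast_fail["retries"], fast_fail["retry_delay"])  # 0 0:00:10
```

Everything the task doesn't override (here retry_delay and sla) still comes from default_args.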
Save with Ctrl+S.
Step 8: Trigger and watch the retries
Open the Airflow UI from the environment panel. Find reliability_dag on the DAGs page and trigger it with the play button.
Open the Graph view. The task will start running. If an attempt fails, you'll see the task turn orange/yellow (the up_for_retry state) with a retry icon, and Airflow waits 10 seconds before trying again.
Click on the task → Logs. Each attempt has its own log section, marked as Attempt 1, Attempt 2, and so on.
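flaky_task fails about 70% of the time and gets four attempts (the first run plus retries=3). All four fail only about 0.7^4 ≈ 24% of the time, so most runs eventually succeed and you may need to trigger the DAG a few times before on_failure fires. A quick check of that arithmetic, with a seeded simulation next to the exact value:

```python
import random

FAILURE_RATE = 0.7  # matches random.random() < 0.7 in flaky_task
ATTEMPTS = 1 + 3    # first run plus retries=3

exact = FAILURE_RATE ** ATTEMPTS


def run_fails_completely(rng):
    """True if every attempt of one simulated DAG run fails.

    all() stops at the first success, just as Airflow stops retrying.
    """
    return all(rng.random() < FAILURE_RATE for _ in range(ATTEMPTS))


rng = random.Random(42)  # seeded so the simulation is repeatable
trials = 100_000
simulated = sum(run_fails_completely(rng) for _ in range(trials)) / trials

print(f"exact: {exact:.4f}, simulated: {simulated:.4f}")
```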
After hibernation
If the VM hibernates, reconnect and run in the VS Code terminal:
cd ~/airflow
docker compose up -d
What's next
Start Airflow