AIRFLOW Connections, hooks and providers in Airflow
DAGs need to talk to real systems such as databases, APIs, S3. Connections store the credentials, hooks let you use them in Python, providers ship pre-built integrations for everything common. Connect a DAG to a real Postgres database on the VM.
What we're doing
You'll learn what connections, hooks, and providers are, set up a Postgres connection in the UI, and write a DAG that inserts and queries data using a hook.
Step 1: Hardcoded credentials
When your DAG needs to talk to a database, the wrong way looks like this:
import psycopg2
def insert_data():
conn = psycopg2.connect(
host="db.example.com",
port=5432,
user="admin",
password="supersecret123",
database="prod"
)
Three problems with this:
- Passwords in code — anyone with repo access has your production password
- Hardcoded for one environment — different passwords for dev, staging, and prod mean different code
- Reinventing the wheel — you write the same connection logic for every DAG that talks to that database
Airflow's right way:
- Connections — store credentials outside the code, in Airflow's database
- Hooks — Python wrappers that use those connections
- Providers — pre-built packages that ship hooks and operators for common systems
Step 2: What connections, hooks, and providers are
Connections
A connection is a saved set of credentials with a name. You store it once in Airflow — host, port, login, password, etc. — and refer to it by name from any DAG.
You can create connections in the UI under Admin → Connections, or with the CLI:
airflow connections add my_postgres \
--conn-type postgres \
--conn-host postgres \
--conn-login airflow \
--conn-password airflow \
--conn-port 5432
Hooks
A hook is a Python class that uses a connection to talk to an external system. You give it the connection name, it gives you a working client.
from airflow.providers.postgres.hooks.postgres import PostgresHook
hook = PostgresHook(postgres_conn_id="my_postgres")
records = hook.get_records("SELECT * FROM my_table")
Hooks expose simple methods — get_records, get_first, run, insert_rows — so you don't write raw cursor.execute and conn.commit calls.
Providers
A provider is a Python package that ships hooks and operators for a specific service. The Postgres provider gives you PostgresHook and PostgresOperator. The AWS provider gives you S3Hook, RedshiftHook, and dozens more. Same pattern for Snowflake, Slack, MongoDB, etc.
Step 3: Create a Postgres connection in the UI
Open the Airflow UI from the environment panel. Go to Admin → Connections and click the + button.
Fill in:
- Connection Id:
my_postgres - Connection Type:
Postgres - Host:
postgres - Database:
airflow - Login:
airflow - Password:
airflow - Port:
5432
Click Test to verify, then Save.
Step 4: Create the DAG file
Click VS Code in the environment panel. Right click on the dags folder and create a new file called postgres_dag.py.
Step 5: Add the imports
from airflow.decorators import dag, task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime
Step 6: Write the create-table task
@task
def create_table():
hook = PostgresHook(postgres_conn_id="my_postgres")
hook.run("""
CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
name VARCHAR(100),
created_at TIMESTAMP DEFAULT NOW()
)
""")
print("Table ready")
PostgresHook(postgres_conn_id="my_postgres")— instantiates the hook using the connection we just created. The name has to match exactlyhook.run(...)— executes a SQL statement. No cursor, no commit — the hook handles all of thatCREATE TABLE IF NOT EXISTS— safe to run repeatedly, won't error if the table already exists
Step 7: Write the insert and read tasks
@task
def insert_users():
hook = PostgresHook(postgres_conn_id="my_postgres")
hook.run("INSERT INTO users (name) VALUES ('Alice'), ('Bob'), ('Charlie')")
print("Inserted 3 users")
@task
def read_users():
hook = PostgresHook(postgres_conn_id="my_postgres")
records = hook.get_records("SELECT id, name FROM users")
for row in records:
print(f"User: id={row[0]}, name={row[1]}")
hook.run(INSERT ...)— executes an insert. No return value neededhook.get_records(SELECT ...)— returns a list of rows. Each row is a tuple of column values
Step 8: Define the DAG
@dag(
dag_id="postgres_dag",
start_date=datetime(2024, 1, 1),
schedule=None,
catchup=False
)
def postgres_pipeline():
create_table() >> insert_users() >> read_users()
postgres_pipeline()
schedule=None — manual triggers only
create_table() >> insert_users() >> read_users() is a TaskFlow with the classic >> syntax. This is fine and common when tasks don't pass return values to each other.
Save with Ctrl+S.
Step 10: Trigger it and verify
Open the Airflow UI, find postgres_dag on the DAGs page, and trigger it with the play button.
Open the Graph view and wait for all three tasks to go green. Then check the logs:
create_table:Table readyinsert_users:Inserted 3 usersread_users:User: id=1, name=Alice(and so on)
Step 11: Verify directly in the database
In the VS Code terminal:
docker exec -it airflow-postgres-1 psql -U airflow -d airflow -c "SELECT * FROM users;"
You'll see the same three rows. The data your DAG inserted is real, persisted in Postgres.
After hibernation
If the VM hibernates, reconnect and run in the VS Code terminal:
cd ~/airflow
docker compose up -d
What's next
Start Airflow