airflow-dag-patterns: Build production Apache Airflow DAGs with best practices for operators, sensors, testing, and deployment. Use when creating data pipelines, orchestrating workflows, or scheduling batch jobs.
Use the skills CLI to install this skill with one command. It auto-detects all installed AI assistants.
Method 1 - skills CLI:
npx skills i wshobson/agents/plugins/data-engineering/skills/airflow-dag-patterns
Method 2 - openskills (supports sync & update):
npx openskills install wshobson/agents
Both methods auto-detect Claude Code, Cursor, Codex CLI, Gemini CLI, and more. One install, works everywhere.
Installation Path
To install manually, download and extract the skill into your assistant's skills directory (Claude Code, Cursor, OpenCode, Gemini CLI, and Codex CLI are supported). For Claude Code:
~/.claude/skills/airflow-dag-patterns/
Apache Airflow DAG Patterns
Production-ready patterns for Apache Airflow including DAG design, operators, sensors, testing, and deployment strategies.
When to Use This Skill
Creating data pipeline orchestration with Airflow
Designing DAG structures and dependencies
Implementing custom operators and sensors
Testing Airflow DAGs locally
Setting up Airflow in production
Debugging failed DAG runs
Core Concepts
1. DAG Design Principles
Principle     Description
Idempotent    Running twice produces the same result
Atomic        Tasks succeed or fail completely
Incremental   Process only new/changed data
Observable    Logs, metrics, alerts at every step
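In practice these principles usually mean keying each run to its logical date and overwriting (not appending to) that run's output. The sketch below is a minimal illustration rather than part of the original skill: the bucket paths and the event_id column are hypothetical placeholders, and get_current_context is used to read the run's ds.

# Hypothetical sketch: an idempotent, incremental TaskFlow task keyed on the logical date
from airflow.decorators import task
from airflow.operators.python import get_current_context

@task()
def load_daily_partition() -> int:
    """Safe to re-run: reads only one day's data and overwrites the same output partition."""
    import pandas as pd

    ds = get_current_context()['ds']  # logical date, e.g. '2024-01-01'
    df = pd.read_parquet(f's3://raw-bucket/events/{ds}/')  # incremental: only this run's slice
    df = df.drop_duplicates(subset=['event_id'])
    # Overwrite, don't append, so a retry or backfill produces the same result (idempotent)
    df.to_parquet(f's3://curated-bucket/events/dt={ds}/data.parquet')
    return len(df)  # observable: row count is logged and pushed to XCom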
2. Task Dependencies
# Linear
task1 >> task2 >> task3
# Fan-out
task1 >> [task2, task3, task4]
# Fan-in
[task1, task2, task3] >> task4
# Complex
task1 >> task2 >> task4
task1 >> task3 >> task4
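For wider graphs, the same wiring can also be expressed with Airflow's dependency helpers instead of long >> chains; a small sketch using the placeholder task variables from above:

# Helper functions from airflow.models.baseoperator
from airflow.models.baseoperator import chain, cross_downstream

# Equivalent to task1 >> task2 >> task3 >> task4
chain(task1, task2, task3, task4)

# Every task in the first list feeds every task in the second list
cross_downstream([task1, task2], [task3, task4])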
Quick Start
# dags/example_dag.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(hours=1),
}

with DAG(
    dag_id='example_etl',
    default_args=default_args,
    description='Example ETL pipeline',
    schedule='0 6 * * *',  # Daily at 6 AM
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'example'],
    max_active_runs=1,
) as dag:
    start = EmptyOperator(task_id='start')

    def extract_data(**context):
        execution_date = context['ds']
        # Extract logic here
        return {'records': 1000}

    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
    )

    end = EmptyOperator(task_id='end')

    start >> extract >> end
Patterns
Pattern 1: TaskFlow API (Airflow 2.0+)
# dags/taskflow_example.py
from datetime import datetime

from airflow.decorators import dag, task
from airflow.models import Variable


@dag(
    dag_id='taskflow_etl',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'taskflow'],
)
def taskflow_etl():
    """ETL pipeline using TaskFlow API"""

    @task()
    def extract(source: str) -> dict:
        """Extract data from source"""
        import pandas as pd
        df = pd.read_csv(f's3://bucket/{source}/{{ ds }}.csv')
        return {'data': df.to_dict(), 'rows': len(df)}

    @task()
    def transform(extracted: dict) -> dict:
        """Transform extracted data"""
        import pandas as pd
        df = pd.DataFrame(extracted['data'])
        df['processed_at'] = datetime.now()
        df = df.dropna()
        return {'data': df.to_dict(), 'rows': len(df)}

    @task()
    def load(transformed: dict, target: str):
        """Load data to target"""
        import pandas as pd
        df = pd.DataFrame(transformed['data'])
        df.to_parquet(f's3://bucket/{target}/{{ ds }}.parquet')
        return transformed['rows']

    @task()
    def notify(rows_loaded: int):
        """Send notification"""
        print(f'Loaded {rows_loaded} rows')

    # Define dependencies with XCom passing
    extracted = extract(source='raw_data')
    transformed = transform(extracted)
    loaded = load(transformed, target='processed_data')
    notify(loaded)


# Instantiate the DAG
taskflow_etl()
Pattern 2: Dynamic DAG Generation
# dags/dynamic_dag_factory.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
import json

# Configuration for multiple similar pipelines
PIPELINE_CONFIGS = [
    {'name': 'customers', 'schedule': '@daily', 'source': 's3://raw/customers'},
    {'name': 'orders', 'schedule': '@hourly', 'source': 's3://raw/orders'},
    {'name': 'products', 'schedule': '@weekly', 'source': 's3://raw/products'},
]


def create_dag(config: dict) -> DAG:
    """Factory function to create DAGs from config"""
    dag_id = f"etl_{config['name']}"

    default_args = {
        'owner': 'data-team',
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
    }

    dag = DAG(
        dag_id=dag_id,
        default_args=default_args,
        schedule=config['schedule'],
        start_date=datetime(2024, 1, 1),
        catchup=False,
        tags=['etl', 'dynamic', config['name']],
    )

    with dag:
        def extract_fn(source, **context):
            print(f"Extracting from {source} for {context['ds']}")

        def transform_fn(**context):
            print(f"Transforming data for {context['ds']}")

        def load_fn(table_name, **context):
            print(f"Loading to {table_name} for {context['ds']}")

        extract = PythonOperator(
            task_id='extract',
            python_callable=extract_fn,
            op_kwargs={'source': config['source']},
        )
        transform = PythonOperator(
            task_id='transform',
            python_callable=transform_fn,
        )
        load = PythonOperator(
            task_id='load',
            python_callable=load_fn,
            op_kwargs={'table_name': config['name']},
        )

        extract >> transform >> load

    return dag


# Generate DAGs
for config in PIPELINE_CONFIGS:
    globals()[f"dag_{config['name']}"] = create_dag(config)
Pattern 3: Branching and Conditional Logic
# dags/branching_example.py
from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule


@dag(
    dag_id='branching_pipeline',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def branching_pipeline():
    @task()
    def check_data_quality() -> dict:
        """Check data quality and return metrics"""
        quality_score = 0.95  # Simulated
        return {'score': quality_score, 'rows': 10000}

    def choose_branch(**context) -> str:
        """Determine which branch to execute"""
        ti = context['ti']
        metrics = ti.xcom_pull(task_ids='check_data_quality')
        if metrics['score'] >= 0.9:
            return 'high_quality_path'
        elif metrics['score'] >= 0.7:
            return 'medium_quality_path'
        else:
            return 'low_quality_path'

    quality_check = check_data_quality()

    branch = BranchPythonOperator(
        task_id='branch',
        python_callable=choose_branch,
    )

    high_quality = EmptyOperator(task_id='high_quality_path')
    medium_quality = EmptyOperator(task_id='medium_quality_path')
    low_quality = EmptyOperator(task_id='low_quality_path')

    # Join point - runs after any branch completes
    join = EmptyOperator(
        task_id='join',
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
    )

    quality_check >> branch >> [high_quality, medium_quality, low_quality] >> join


branching_pipeline()
Pattern 4: Sensors and External Dependencies
# dags/sensor_patterns.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task
from airflow.sensors.base import PokeReturnValue
from airflow.sensors.filesystem import FileSensor
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.operators.python import PythonOperator

with DAG(
    dag_id='sensor_example',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    # Wait for file on S3
    wait_for_file = S3KeySensor(
        task_id='wait_for_s3_file',
        bucket_name='data-lake',
        bucket_key='raw/{{ ds }}/data.parquet',
        aws_conn_id='aws_default',
        timeout=60 * 60 * 2,   # 2 hours
        poke_interval=60 * 5,  # Check every 5 minutes
        mode='reschedule',     # Free up worker slot while waiting
    )

    # Wait for another DAG to complete
    wait_for_upstream = ExternalTaskSensor(
        task_id='wait_for_upstream_dag',
        external_dag_id='upstream_etl',
        external_task_id='final_task',
        execution_date_fn=lambda dt: dt,  # Same execution date
        timeout=60 * 60 * 3,
        mode='reschedule',
    )

    # Custom sensor using @task.sensor decorator
    @task.sensor(poke_interval=60, timeout=3600, mode='reschedule')
    def wait_for_api() -> PokeReturnValue:
        """Custom sensor for API availability"""
        import requests
        response = requests.get('https://api.example.com/health')
        is_done = response.status_code == 200
        return PokeReturnValue(is_done=is_done, xcom_value=response.json())

    api_ready = wait_for_api()

    def process_data(**context):
        api_result = context['ti'].xcom_pull(task_ids='wait_for_api')
        print(f"API returned: {api_result}")

    process = PythonOperator(
        task_id='process',
        python_callable=process_data,
    )

    [wait_for_file, wait_for_upstream, api_ready] >> process
Pattern 5: Error Handling and Alerts
# dags/error_handling.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.models import Variable


def task_failure_callback(context):
    """Callback on task failure"""
    task_instance = context['task_instance']
    exception = context.get('exception')

    # Send to Slack/PagerDuty/etc
    message = f"""
    Task Failed!
    DAG: {task_instance.dag_id}
    Task: {task_instance.task_id}
    Execution Date: {context['ds']}
    Error: {exception}
    Log URL: {task_instance.log_url}
    """
    # send_slack_alert(message)
    print(message)


def dag_failure_callback(context):
    """Callback on DAG failure"""
    # Aggregate failures, send summary
    pass


with DAG(
    dag_id='error_handling_example',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    on_failure_callback=dag_failure_callback,
    default_args={
        'on_failure_callback': task_failure_callback,
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
    },
) as dag:
    def might_fail(**context):
        import random
        if random.random() < 0.3:
            raise ValueError("Random failure!")
        return "Success"

    risky_task = PythonOperator(
        task_id='risky_task',
        python_callable=might_fail,
    )

    def cleanup(**context):
        """Cleanup runs regardless of upstream failures"""
        print("Cleaning up...")

    cleanup_task = PythonOperator(
        task_id='cleanup',
        python_callable=cleanup,
        trigger_rule=TriggerRule.ALL_DONE,  # Run even if upstream fails
    )

    def notify_success(**context):
        """Only runs if all upstream succeeded"""
        print("All tasks succeeded!")

    success_notification = PythonOperator(
        task_id='notify_success',
        python_callable=notify_success,
        trigger_rule=TriggerRule.ALL_SUCCESS,
    )

    risky_task >> [cleanup_task, success_notification]
Pattern 6: Testing DAGs
# tests/test_dags.py
import pytest
from datetime import datetime
from airflow.models import DagBag


@pytest.fixture
def dagbag():
    return DagBag(dag_folder='dags/', include_examples=False)


def test_dag_loaded(dagbag):
    """Test that all DAGs load without errors"""
    assert len(dagbag.import_errors) == 0, f"DAG import errors: {dagbag.import_errors}"


def test_dag_structure(dagbag):
    """Test specific DAG structure"""
    dag = dagbag.get_dag('example_etl')
    assert dag is not None
    assert len(dag.tasks) == 3
    assert dag.schedule_interval == '0 6 * * *'


def test_task_dependencies(dagbag):
    """Test task dependencies are correct"""
    dag = dagbag.get_dag('example_etl')
    extract_task = dag.get_task('extract')
    assert 'start' in [t.task_id for t in extract_task.upstream_list]
    assert 'end' in [t.task_id for t in extract_task.downstream_list]


def test_dag_integrity(dagbag):
    """Test DAGs contain no cycles"""
    from airflow.utils.dag_cycle_tester import check_cycle

    for dag_id, dag in dagbag.dags.items():
        check_cycle(dag)  # raises AirflowDagCycleException if dag_id contains a cycle


# Test individual task logic
def test_extract_function():
    """Unit test for extract function"""
    from dags.example_dag import extract_data

    result = extract_data(ds='2024-01-01')
    assert 'records' in result
    assert isinstance(result['records'], int)
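For interactive debugging without a running scheduler, Airflow 2.5+ can also execute a whole DAG in a single process with dag.test(); a minimal sketch, assuming the Quick Start DAG module is importable as dags.example_dag:

# scripts/debug_dag.py (Airflow 2.5+)
from dags.example_dag import dag  # the Quick Start DAG object

if __name__ == '__main__':
    dag.test()  # runs all tasks in-process, honoring dependencies, without scheduler or workers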
Project Structure
airflow/
├── dags/
│ ├── __init__.py
│ ├── common/
│ │ ├── __init__.py
│ │ ├── operators.py # Custom operators
│ │ ├── sensors.py # Custom sensors
│ │ └── callbacks.py # Alert callbacks
│ ├── etl/
│ │ ├── customers.py
│ │ └── orders.py
│ └── ml/
│ └── training.py
├── plugins/
│ └── custom_plugin.py
├── tests/
│ ├── __init__.py
│ ├── test_dags.py
│ └── test_operators.py
├── docker-compose.yml
└── requirements.txt
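This layout keeps DAG files thin: a DAG module mostly wires together operators and callbacks imported from common/. A minimal sketch, assuming common/callbacks.py exposes the task_failure_callback from Pattern 5 (Airflow adds the dags/ folder to the import path):

# dags/etl/customers.py - thin DAG file importing shared code
from datetime import datetime

from airflow import DAG
from common.callbacks import task_failure_callback  # shared alerting code, not inline logic

with DAG(
    dag_id='etl_customers',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args={'on_failure_callback': task_failure_callback},
) as dag:
    ...  # tasks built from common.operators / common.sensors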
Best Practices
Do's
Use TaskFlow API - Cleaner code, automatic XCom
Set timeouts - Prevent zombie tasks (see the sketch after this list)
Use mode='reschedule' - For sensors, free up workers
Test DAGs - Unit tests and integration tests
Idempotent tasks - Safe to retry
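A minimal sketch of the timeout recommendation, reusing the Quick Start's extract_data callable; the 30-minute limit is an arbitrary example:

# Give every task an upper bound so a hung task fails (and retries) instead of running forever
from datetime import timedelta
from airflow.operators.python import PythonOperator

extract = PythonOperator(
    task_id='extract',
    python_callable=extract_data,
    execution_timeout=timedelta(minutes=30),  # task is killed and marked failed past this limit
)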
Don'ts
Don't use depends_on_past=True - Creates bottlenecks
Don't hardcode dates - Use {{ ds }} macros (see the sketch after this list)
Don't use global state - Tasks should be stateless
Don't skip catchup blindly - Understand implications
Don't put heavy logic in DAG file - Import from modules
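A minimal sketch of the hardcoded-dates point; load.py is a placeholder script:

from airflow.operators.bash import BashOperator

# Don't: a date computed at parse time drifts from the run's logical date and breaks backfills
# load = BashOperator(task_id='load', bash_command=f"python load.py --date {date.today()}")

# Do: let Airflow template the logical date into the command at run time
load = BashOperator(
    task_id='load',
    bash_command='python load.py --date {{ ds }}',  # rendered per DAG run, backfill-safe
)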
Resources