
Airflow orchestration

Advanced · 16 min read · 📅 Updated: 2026-02-17

Introduction

Imagine you're a restaurant manager 👨‍🍳. Every morning:

  • 6 AM: Order vegetables
  • 7 AM: Start kitchen prep
  • 8 AM: Assign staff
  • 9 AM: Open the restaurant

You do this manually every day — then one day you're on sick leave. Who handles it? 😱


Apache Airflow solves exactly this problem — but for data pipelines! Airbnb created it in 2014, and today it's the world's most popular workflow orchestrator 🌍


Real example — E-commerce company:

  • Every night 12 AM: Extract sales data from database
  • 12:30 AM: Clean and transform data
  • 1 AM: Load into data warehouse
  • 1:30 AM: Generate reports
  • 2 AM: Email reports to stakeholders

Would you run this manually every day? What if one step fails — do you stop the next steps by hand? Airflow handles all of it automatically! 🚀

What is Orchestration?

Orchestration = coordinating multiple tasks so they run in the correct order, at the correct time 🎼


Think of a music orchestra:

  • 🎻 The violin plays first
  • 🥁 The drums join in
  • 🎺 The trumpet enters at a specific moment
  • 🎵 The conductor (Airflow) controls everything

Without orchestration:

  • Tasks run in random order ❌
  • If one task fails, downstream tasks can't be stopped ❌
  • Retry logic must be handled manually ❌
  • No monitoring or alerting ❌

With Airflow orchestration:

  • Tasks run in the defined order ✅
  • Failure handling automatic ✅
  • Retry with backoff built-in ✅
  • Beautiful UI for monitoring ✅
  • Email/Slack alerts on failure ✅

DAG — The Core Concept

DAG = Directed Acyclic Graph — the heart of Airflow ❤️


| Term | Meaning | Example |
|------|---------|---------|
| **Directed** | Tasks have direction (A → B) | Extract → Transform |
| **Acyclic** | No circular loops | A → B → A ❌ not allowed |
| **Graph** | Network of connected tasks | Visual pipeline |

Simple DAG example:

code
Extract_Data → Clean_Data → Transform_Data → Load_to_Warehouse → Generate_Report

Complex DAG — parallel tasks:

code
                    ┌→ Clean_Sales ──┐
Extract_Sales ──→   │                │→ Join_Data → Load_DW
Extract_Users ──→   └→ Clean_Users ──┘
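The ordering logic a scheduler applies to such a graph can be sketched with the standard library's `graphlib`. Task names come from the diagram above; this is an illustration of topological ordering, not Airflow's actual scheduler code.

```python
# Sketch: how a scheduler derives a valid run order from DAG edges,
# using graphlib (Python 3.9+ standard library).
from graphlib import TopologicalSorter

# Each key maps a task to its upstream dependencies.
dag = {
    "Clean_Sales": {"Extract_Sales"},
    "Clean_Users": {"Extract_Users"},
    "Join_Data": {"Clean_Sales", "Clean_Users"},
    "Load_DW": {"Join_Data"},
}

ts = TopologicalSorter(dag)
ts.prepare()
batches = []
while ts.is_active():
    ready = sorted(ts.get_ready())  # tasks whose upstreams are all done
    batches.append(ready)           # each batch could run in parallel
    ts.done(*ready)

print(batches)
```

Note how the two Extract tasks form one batch and the two Clean tasks the next: that's exactly the parallelism the diagram shows.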

DAGs are defined in Python:

python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

# extract_function, transform_function and load_function are assumed
# to be defined elsewhere in this file.
with DAG(
    'ecommerce_etl',
    start_date=datetime(2026, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:

    extract = PythonOperator(
        task_id='extract_data',
        python_callable=extract_function,
    )

    transform = PythonOperator(
        task_id='transform_data',
        python_callable=transform_function,
    )

    load = PythonOperator(
        task_id='load_data',
        python_callable=load_function,
    )

    extract >> transform >> load  # Dependencies!

The `>>` operator defines "this runs after that". Clean and Pythonic! 🐍
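Under the hood, `>>` is just Python's `__rshift__` operator overloaded to record dependencies. A toy `Task` class (a simplification, not Airflow's implementation) shows the mechanism:

```python
# Toy sketch of how `>>` chains tasks: __rshift__ records the edge and
# returns the right-hand task so chains can continue.
# (Real Airflow operators also handle lists, task groups, etc.)
class Task:
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        self.downstream.append(other.task_id)
        return other  # returning `other` is what makes a >> b >> c work

extract = Task("extract")
transform = Task("transform")
load = Task("load")

extract >> transform >> load  # same shape as in the DAG above

print(extract.downstream)    # ['transform']
print(transform.downstream)  # ['load']
```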

Airflow Operators

💡 Tip

🔧 Operators = Pre-built task types. Common ones:

- PythonOperator — Run any Python function

- BashOperator — Run shell commands

- PostgresOperator — Execute SQL queries

- S3ToRedshiftOperator — AWS data transfer

- EmailOperator — Send notification emails

- SlackWebhookOperator — Slack alerts

- DockerOperator — Run Docker containers

- KubernetesPodOperator — K8s pod launch

Need a custom operator? Extend BaseOperator and write your own! The community maintains provider packages with 1000+ operators and hooks 🌐

Airflow Architecture

🏗️ Architecture Diagram
How Airflow works internally:

```
┌─────────────────────────────────────────────────┐
│                  AIRFLOW SYSTEM                  │
│                                                  │
│  ┌──────────┐    ┌──────────┐    ┌──────────┐  │
│  │   Web    │    │ Scheduler│    │  Metadata │  │
│  │  Server  │◄──►│          │◄──►│    DB     │  │
│  │  (UI)    │    │ (Brain)  │    │(PostgreSQL│  │
│  └──────────┘    └────┬─────┘    └──────────┘  │
│                       │                          │
│              ┌────────┼────────┐                 │
│              ▼        ▼        ▼                 │
│         ┌────────┐┌────────┐┌────────┐          │
│         │Worker 1││Worker 2││Worker 3│          │
│         │(Celery)││(Celery)││(Celery)│          │
│         └────────┘└────────┘└────────┘          │
│              │        │        │                 │
│              ▼        ▼        ▼                 │
│     ┌─────────────────────────────────┐         │
│     │    Task Execution Layer          │         │
│     │  Python | Bash | SQL | API      │         │
│     └─────────────────────────────────┘         │
└─────────────────────────────────────────────────┘
```

**Components:**
- **Web Server** — UI dashboard, DAG visualization, log viewing
- **Scheduler** — parses DAGs and sends tasks to the queue (the brain 🧠)
- **Executor** — actually runs the tasks (Local, Celery, Kubernetes)
- **Metadata DB** — stores DAG runs, task states, and variables
- **Workers** — where the actual computation happens (in distributed setups)

Scheduling & Triggers

Airflow provides powerful scheduling options:


Cron-based schedules:

python
schedule_interval='0 6 * * *'    # Daily 6 AM
schedule_interval='0 */2 * * *'  # Every 2 hours
schedule_interval='0 9 * * 1'    # Every Monday 9 AM

Preset schedules:

| Preset | Meaning | Cron |
|--------|---------|------|
| **@daily** | Once a day, at midnight | 0 0 * * * |
| **@hourly** | Every hour | 0 * * * * |
| **@weekly** | Every Sunday at midnight | 0 0 * * 0 |
| **@monthly** | First of the month | 0 0 1 * * |
| **@once** | Run only once | — |

Timetable (Airflow 2.x):

python
from airflow.timetables.trigger import CronTriggerTimetable

dag = DAG(
    'business_hours_dag',
    timetable=CronTriggerTimetable(
        '0 9-17 * * 1-5',  # Business hours only!
        timezone='Asia/Kolkata',
    ),
)

Data-aware scheduling (Airflow 2.4+):

python
from airflow.datasets import Dataset
from airflow.decorators import dag, task

# DAG B runs only after DAG A produces the dataset
my_dataset = Dataset('s3://bucket/sales_data/')

# Producer DAG
@dag(schedule='@daily')
def producer():
    @task(outlets=[my_dataset])
    def produce():
        # Write data...
        pass

# Consumer DAG — auto-triggered!
@dag(schedule=[my_dataset])
def consumer():
    @task
    def consume():
        # Read data...
        pass

XComs — Task Communication

Tasks share data with each other using XComs (cross-communications):


python
# Task 1: Push data
def extract(**context):
    data = {'rows': 1000, 'source': 'postgres'}
    context['ti'].xcom_push(key='extract_meta', value=data)

# Task 2: Pull data
def transform(**context):
    meta = context['ti'].xcom_pull(
        task_ids='extract_data',
        key='extract_meta'
    )
    print(f"Processing {meta['rows']} rows from {meta['source']}")
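The push/pull mechanics above can be emulated with a plain dict keyed by (task_id, key), playing the role of the metadata DB. `FakeTaskInstance` is a toy class for illustration, not Airflow's real `TaskInstance`:

```python
# Minimal stand-in for Airflow's XCom store: a dict keyed by
# (task_id, key). In real Airflow this lives in the metadata DB.
class FakeTaskInstance:
    _store = {}

    def __init__(self, task_id):
        self.task_id = task_id

    def xcom_push(self, key, value):
        self._store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids, key):
        return self._store.get((task_ids, key))

ti = FakeTaskInstance("extract_data")
ti.xcom_push(key="extract_meta", value={"rows": 1000, "source": "postgres"})

ti2 = FakeTaskInstance("transform_data")
meta = ti2.xcom_pull(task_ids="extract_data", key="extract_meta")
print(meta["rows"])  # 1000
```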

TaskFlow API (Airflow 2.x) — much cleaner:

python
from airflow.decorators import dag, task
from datetime import datetime

@dag(schedule='@daily', start_date=datetime(2026, 1, 1))
def ecommerce_pipeline():

    @task
    def extract():
        return {'rows': 1000, 'source': 'postgres'}

    @task
    def transform(data: dict):
        print(f"Processing {data['rows']} rows")
        return {'processed': data['rows']}

    @task
    def load(result: dict):
        print(f"Loading {result['processed']} rows to warehouse")

    data = extract()
    result = transform(data)  # Auto XCom!
    load(result)

ecommerce_pipeline()

⚠️ Warning: XComs are stored in the metadata DB — never pass large data through them! Pass file paths or S3 keys, not the actual data.
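A sketch of that rule: the producer writes its payload to storage and returns only the location. `tempfile` stands in for S3 here, and `extract`/`transform` are hypothetical task callables:

```python
# "Pass paths, not data": write the payload to storage, exchange only
# its location. tempfile plays the role of S3 in this sketch.
import json
import os
import tempfile

def extract():
    rows = [{"order_id": i, "amount": i * 10} for i in range(1000)]
    fd, path = tempfile.mkstemp(suffix=".json")
    with os.fdopen(fd, "w") as f:
        json.dump(rows, f)
    return path  # only this small string would travel via XCom

def transform(path):
    with open(path) as f:
        rows = json.load(f)
    return len(rows)

path = extract()
count = transform(path)
print(count)  # 1000
```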

Airflow Best Practices

⚠️ Warning

⚠️ Production Airflow Tips:

1. Idempotent tasks — re-running a task must produce the same result

2. No heavy computation in the DAG file — DAGs are re-parsed every ~30s, so slow top-level code hurts

3. Use Connections & Variables — never hardcode credentials!

4. Set retries and retry_delay — automatic retries for transient failures

5. Use pools — cap shared resources (e.g. max 10 DB connections)

6. Set SLAs — get alerted when a task takes too long

7. Atomic tasks — one task = one unit of work

8. Test DAGs locally — `airflow dags test my_dag 2026-01-01`
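Tip 1 (idempotency) deserves a concrete sketch: a delete-then-insert load for the run's date partition, so retries and backfills never duplicate rows. The in-memory `warehouse` list stands in for a real warehouse table:

```python
# Idempotent load: clear this execution date's partition before
# inserting, so re-running the same date gives the same final state.
warehouse = []  # stands in for a warehouse table

def load_partition(ds, rows):
    # delete-then-insert for this execution date only
    warehouse[:] = [r for r in warehouse if r["ds"] != ds]
    warehouse.extend({"ds": ds, **r} for r in rows)

rows = [{"order_id": 1}, {"order_id": 2}]
load_partition("2026-01-01", rows)
load_partition("2026-01-01", rows)  # retry / backfill: same result
print(len(warehouse))  # 2, not 4
```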

Error Handling & Monitoring

In production, error handling is critical:


Retry configuration:

python
from datetime import timedelta

default_args = {
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(minutes=30),
    'email_on_failure': True,
    'email': ['data-team@company.com'],
}
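The delays this configuration produces can be approximated in a few lines. Airflow's real backoff formula also adds jitter, so treat this as an illustration, not the exact schedule:

```python
# Approximate retry_exponential_backoff: the delay doubles on each
# attempt and is capped at max_retry_delay. (Airflow also adds jitter,
# so real delays vary slightly.)
from datetime import timedelta

def backoff_delays(retries, retry_delay, max_retry_delay):
    delays = []
    for attempt in range(retries):
        delays.append(min(retry_delay * (2 ** attempt), max_retry_delay))
    return delays

print(backoff_delays(4, timedelta(minutes=5), timedelta(minutes=30)))
# delays grow 5 → 10 → 20 minutes, then cap at 30
```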

Callbacks:

python
# send_slack_alert and send_metric are assumed helper functions.
def on_failure(context):
    dag_id = context['dag'].dag_id
    task_id = context['task_instance'].task_id
    send_slack_alert(f"❌ {dag_id}.{task_id} FAILED!")

def on_success(context):
    send_metric('task_success', 1)

task = PythonOperator(
    task_id='critical_task',
    python_callable=my_function,
    on_failure_callback=on_failure,
    on_success_callback=on_success,
)

SLA monitoring:

python
task = PythonOperator(
    task_id='must_finish_fast',
    python_callable=my_function,
    sla=timedelta(hours=2),  # Alert if takes > 2 hours
)

Monitoring stack:

  • 📊 Airflow UI — DAG runs, task logs, Gantt charts
  • 📈 Prometheus + Grafana — Metrics dashboard
  • 🔔 PagerDuty/OpsGenie — On-call alerting
  • 📝 ELK Stack — Centralized logging

Executor Types

Airflow supports several executors:


| Executor | Use Case | Scalability |
|----------|----------|-------------|
| **SequentialExecutor** | Development only | ❌ One task at a time |
| **LocalExecutor** | Small teams | ⚡ Multi-process |
| **CeleryExecutor** | Production standard | 🚀 Distributed workers |
| **KubernetesExecutor** | Cloud-native | 🌐 Auto-scaling pods |
| **CeleryKubernetesExecutor** | Hybrid | 🔥 Best of both |

KubernetesExecutor advantages:

  • Each task runs in isolated pod 🐳
  • Different resources per task (heavy task = more CPU/RAM)
  • Auto-scaling — idle pods terminate
  • No persistent workers — cost efficient 💰

python
# Task-specific K8s config (Airflow 2.x expects a pod_override)
from kubernetes.client import models as k8s

resource_config = k8s.V1ResourceRequirements(
    requests={'memory': '2Gi', 'cpu': '1'},
    limits={'memory': '4Gi', 'cpu': '2'},
)

task = PythonOperator(
    task_id='heavy_task',
    python_callable=process_large_data,
    executor_config={
        'pod_override': k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name='base',  # must be 'base' to override the task container
                        image='my-custom-image:latest',
                        resources=resource_config,
                    )
                ]
            )
        )
    },
)

Real-World DAG Example

Complete production DAG — E-commerce daily ETL:


python
from airflow.decorators import dag, task
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from datetime import datetime, timedelta
import pandas as pd

# run_query and send_slack are assumed helper functions defined elsewhere.

@dag(
    schedule='@daily',
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['ecommerce', 'etl'],
    default_args={
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
        'email_on_failure': True,
    },
)
def ecommerce_daily_etl():

    @task
    def extract_orders(ds=None):
        """Extract yesterday's orders from production DB"""
        query = f"SELECT * FROM orders WHERE date = '{ds}'"
        df = run_query(query)
        path = f"s3://data-lake/raw/orders/{ds}.parquet"
        df.to_parquet(path)
        return {'path': path, 'count': len(df)}

    @task
    def extract_users(ds=None):
        """Extract updated user profiles"""
        query = f"SELECT * FROM users WHERE updated_at = '{ds}'"
        df = run_query(query)
        path = f"s3://data-lake/raw/users/{ds}.parquet"
        df.to_parquet(path)
        return {'path': path, 'count': len(df)}

    @task
    def transform(orders_meta, users_meta):
        """Join and transform data"""
        orders = pd.read_parquet(orders_meta['path'])
        users = pd.read_parquet(users_meta['path'])
        result = orders.merge(users, on='user_id')
        path = "s3://data-lake/processed/daily_summary.parquet"
        result.to_parquet(path)
        return {'path': path, 'count': len(result)}

    @task
    def data_quality_check(meta):
        """Validate transformed data"""
        df = pd.read_parquet(meta['path'])
        assert len(df) > 0, "No data found!"
        assert df['revenue'].min() >= 0, "Negative revenue!"
        return True

    @task
    def notify(meta):
        """Send completion notification"""
        send_slack(f"✅ ETL complete! {meta['count']} rows processed")

    # DAG flow
    orders = extract_orders()
    users = extract_users()
    transformed = transform(orders, users)
    quality = data_quality_check(transformed)
    quality >> notify(transformed)

ecommerce_daily_etl()

Think About This

📋 Copy-Paste Prompt
🤔 **Scenario:** Your company has 50 DAGs that run daily. When one DAG fails:
- 10 downstream DAGs are blocked
- The data warehouse shows stale data
- The business team makes wrong decisions

**Questions:**
1. How would you prevent this?
2. How would you improve failure-detection speed?
3. Which executor would you choose — Celery or Kubernetes? Why?

Hint: Think about monitoring, alerting, circuit breakers, and data quality checks! 🛡️

Common DAG Patterns

Patterns used frequently in production:


1. Fan-out / Fan-in:

code
           ┌→ Process_Region_1 ──┐
Start ──→  ├→ Process_Region_2 ──┤→ Aggregate
           └→ Process_Region_3 ──┘
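The same shape in plain Python, with `concurrent.futures` as a stand-in for parallel workers (region names and numbers are invented for the sketch):

```python
# Fan-out / fan-in: process each region in parallel, then aggregate
# once all branches finish.
from concurrent.futures import ThreadPoolExecutor

def process_region(region):
    return {"region": region, "sales": len(region) * 100}

regions = ["north", "south", "east"]
with ThreadPoolExecutor() as pool:
    results = list(pool.map(process_region, regions))  # fan-out

total = sum(r["sales"] for r in results)  # fan-in / aggregate
print(total)  # 1400
```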

2. Branching:

python
@task.branch
def check_data_size(ti):
    count = ti.xcom_pull(task_ids='extract')
    if count > 1000000:
        return 'process_large'
    return 'process_small'

3. Dynamic task mapping (Airflow 2.3+):

python
@task
def get_files():
    return ['file1.csv', 'file2.csv', 'file3.csv']

@task
def process(file):
    print(f"Processing {file}")

# Dynamic! Tasks auto-created based on output
process.expand(file=get_files())

4. Sensor — wait for condition:

python
# Import path for newer provider versions; older releases used
# airflow.sensors.s3_key_sensor
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_for_file = S3KeySensor(
    task_id='wait_for_data',
    bucket_name='data-lake',
    bucket_key='raw/daily/{{ ds }}.parquet',
    timeout=3600,  # Wait max 1 hour
    poke_interval=300,  # Check every 5 minutes
)
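A sensor is essentially a poke loop: check the condition, sleep, repeat until the timeout expires. A sketch with a local file standing in for the S3 key, and intervals shrunk so it runs instantly:

```python
# What a sensor does under the hood: poke, sleep, repeat until the
# condition holds or the timeout expires.
import os
import tempfile
import time

def wait_for_file(path, timeout=1.0, poke_interval=0.05):
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if os.path.exists(path):  # the "poke"
            return True
        time.sleep(poke_interval)
    return False  # timeout: the sensor task would be marked failed

path = os.path.join(tempfile.gettempdir(), "daily_demo.parquet")
open(path, "w").close()  # pretend the upstream system wrote the file
print(wait_for_file(path))  # True
```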

Airflow vs Alternatives

There are other options on the market too:


| Feature | **Airflow** | **Prefect** | **Dagster** | **Luigi** |
|---------|-------------|-------------|-------------|-----------|
| **Maturity** | 🏆 Most mature | Growing | Growing | Legacy |
| **UI** | Good | Better | Best | Basic |
| **Python-native** | ✅ | ✅ | ✅ | ✅ |
| **Cloud managed** | AWS, GCP | Prefect Cloud | Dagster+ | — |
| **Community** | Largest | Medium | Medium | Small |
| **Learning curve** | Medium | Easy | Medium | Easy |
| **Dynamic pipelines** | Airflow 2.3+ | Built-in | Built-in | Limited |
| **Data lineage** | Datasets (2.4+) | Basic | Excellent | — |

When to choose Airflow:

  • Enterprise environment with existing investment 🏢
  • Need vast ecosystem of providers/operators
  • Team already knows Airflow
  • Battle-tested for mission-critical pipelines

🏁 🎮 Mini Challenge

Challenge: Design a multi-step Airflow DAG


Practice designing a production-like DAG:


Setup - 5 min:

bash
pip install apache-airflow
airflow db init  # use `airflow db migrate` on Airflow 2.7+

DAG Design - 20 min:

python
from airflow.decorators import dag, task
from datetime import datetime

@dag(schedule='@daily', start_date=datetime(2026, 1, 1))
def analytics_pipeline():

    @task
    def extract_data():
        print("Extracting from 3 sources...")
        return {'records': 100000}

    @task
    def validate_data(data: dict):
        print(f"Validating {data['records']} records...")
        assert data['records'] > 0
        return data

    @task
    def transform_data(data: dict):
        print(f"Transforming {data['records']} records...")
        # Complex transformation here
        return {'processed': data['records'] * 0.95}

    @task
    def load_data(data: dict):
        print(f"Loading {data['processed']} records...")
        # Load to warehouse

    @task
    def generate_report(data: dict):
        print(f"Generating report for {data['processed']} records...")

    # DAG Flow
    extracted = extract_data()
    validated = validate_data(extracted)
    transformed = transform_data(validated)

    # Parallel tasks: both depend on transform
    load_task = load_data(transformed)
    report_task = generate_report(transformed)

    # A final alert task could be chained after both:
    # [load_task, report_task] >> alert_task

analytics_pipeline()

Run & Monitor - 10 min:

bash
airflow scheduler &
airflow webserver
# Visit http://localhost:8080 to see the DAG visualization

Learning: DAG design = task modeling + dependency management. The foundation of clean pipeline architecture! 🏗️

💼 Interview Questions

Q1: Airflow DAG design – best practices?

A: Atomic tasks (one task = one unit). Idempotent (re-run safe). No heavy computation in DAG file (parsed every 30s). Dependencies clear. Retries and timeouts. Error handling callbacks. Monitoring. Documentation. Testing (airflow dags test). Production readiness!


Q2: TaskFlow API vs traditional operators?

A: TaskFlow (modern Airflow 2+) cleaner, Pythonic. XCom automatic. Traditional more explicit, boilerplate. TaskFlow preferred now. But team experience matters – learning curve if new. Adopt gradually!


Q3: SLA miss – a task exceeds 2 hours?

A: Alert trigger! Investigation: Network issue? Data volume spike? Resource bottleneck? Code inefficiency? Historical trends check. Tune: Add retry, increase timeout, parallelize, optimize code. Monitoring essential to catch trends early!


Q4: DAG run history – is it safe to delete old runs?

A: Carefully! Historical data useful – debugging, understanding patterns. Keep last 90 days minimum. Archive old (database → backup). Metadata DB optimize (cleanup old task logs). But NEVER delete current runs – SLA, audit trail needed!


Q5: Distributed execution (Celery vs Kubernetes)?

A: Celery: Simpler, persistent workers, cost-effective for predictable load. Kubernetes: Complex, auto-scaling, pods isolated, expensive but flexible. Choice: Team skills, load pattern, cost. Start Celery (smaller teams), scale Kubernetes (large enterprises). Either works!

Frequently Asked Questions

What is Airflow, in simple terms?
Airflow is a workflow orchestration tool. Put simply — cron jobs on steroids! It's a platform to define multiple tasks, set their dependencies, schedule them, and monitor them.
Airflow vs Luigi vs Prefect — what's the difference?
Airflow is the industry standard — huge community, mature ecosystem. Luigi is simpler but has fewer features. Prefect is a modern alternative with a better developer experience. Most companies choose Airflow for its enterprise support.
Is Airflow free?
Apache Airflow is open-source and free! Managed services like MWAA (AWS), Cloud Composer (GCP), and Astronomer cost money. Even self-hosting has server costs.
What are the prerequisites for learning Airflow?
Python is a must — DAGs are written in Python. SQL basics, Linux commands, and Docker knowledge help. Knowing data engineering concepts makes learning faster.
Can Airflow be used for real-time processing?
No! Airflow is for batch orchestration — scheduled workflows. For real-time processing use Kafka or Flink. Airflow's minimum schedule interval is 1 minute, but it's typically used for hourly/daily runs.

Summary

Airflow Orchestration — Key Takeaways: 🎯


  1. DAG = Directed Acyclic Graph — pipeline as code 📝
  2. Operators = Pre-built task types (Python, Bash, SQL, etc.)
  3. Scheduler = Brain — triggers tasks at right time ⏰
  4. XComs = Task-to-task data sharing (keep it small!)
  5. Executors = How tasks run (Local → Celery → Kubernetes)
  6. TaskFlow API = Modern, cleaner way to write DAGs 🐍
  7. Sensors = Wait for external conditions
  8. Dynamic task mapping = Runtime task generation
  9. Error handling = Retries, callbacks, SLAs — production must-haves! 🛡️
  10. Data-aware scheduling = Dataset-triggered DAGs (Airflow 2.4+)

Learning Airflow is a massive advantage for a data engineering career! Almost every data team uses it. Start with simple DAGs, then gradually learn the complex patterns 🚀

🧠 Knowledge Check

In Airflow, what happens when you define `extract >> [clean_sales, clean_users] >> join_data`?

0 of 1 answered