Dynamic Tasks in Apache Airflow
Using the TaskFlow API & @task Decorator
The Story: Workflows That Adapt
Imagine you are building a data pipeline for multiple clients:
- Client A → 3 tables
- Client B → 4 tables
- Client C → 2 tables
If you hardcode every task:
- DAG becomes huge
- Maintenance nightmare
- Adding a new client requires code changes
What if your DAG could automatically generate tasks based on input data?
That's where Dynamic Tasks shine.
What Are Dynamic Tasks?
Dynamic tasks are:
- Tasks generated at runtime
- Often based on input lists, configs, or database records
- Enabled by the TaskFlow API and the @task decorator
- Fully compatible with XComs, branching, and trigger rules
Think of it as a factory that creates tasks on demand.
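To make the factory idea concrete, here is a plain-Python model of what mapping does: constant arguments are fixed once, and one call is produced per combination of mapped values. The helper name model_expand is hypothetical and is not part of Airflow; it only imitates the behaviour of .partial() and .expand() so the idea can be run without a scheduler.

```python
from itertools import product


def model_expand(fn, partial_kwargs, mapped_kwargs):
    """Hypothetical stand-in for .partial(**partial_kwargs).expand(**mapped_kwargs).

    Calls fn once per combination of the mapped values (a cross product),
    with the constant keyword arguments added to every call.
    """
    names = list(mapped_kwargs)
    results = []
    for combo in product(*(mapped_kwargs[name] for name in names)):
        kwargs = {**partial_kwargs, **dict(zip(names, combo))}
        results.append(fn(**kwargs))
    return results


def process_table(client_name, table_name):
    return f"{client_name}-{table_name}-done"


# One "task instance" per table; client_name stays constant across all of them.
results = model_expand(
    process_table,
    partial_kwargs={"client_name": "Client C"},
    mapped_kwargs={"table_name": ["users", "activity"]},
)
print(results)  # ['Client C-users-done', 'Client C-activity-done']
```

In a real DAG the scheduler creates the instances when the upstream data is available, so the number of tasks does not have to be known when the DAG file is written.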
Why Dynamic Tasks Matter
- Scalable pipelines → no hardcoding
- Flexible DAGs → adapt to data or configs
- Maintainable workflows → reduce repetition
- Modern Airflow best practice → TaskFlow API friendly
Example Scenario: Process Multiple Clients
Input Data
clients = [
    {"name": "Client A", "tables": ["sales", "inventory", "customers"]},
    {"name": "Client B", "tables": ["orders", "payments", "products", "shipping"]},
    {"name": "Client C", "tables": ["users", "activity"]},
]
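A nested structure like this is usually flattened into one entry per (client, table) pair before fanning out, because each pair becomes one task instance. The sketch below does that in plain Python; the function name to_mapping_kwargs is an assumption made for illustration. A list of keyword-argument dicts in this shape is what Airflow's expand_kwargs (available from Airflow 2.4) accepts.

```python
clients = [
    {"name": "Client A", "tables": ["sales", "inventory", "customers"]},
    {"name": "Client B", "tables": ["orders", "payments", "products", "shipping"]},
    {"name": "Client C", "tables": ["users", "activity"]},
]


def to_mapping_kwargs(clients):
    """Return one kwargs dict per (client, table) pair, in input order."""
    return [
        {"client_name": client["name"], "table_name": table}
        for client in clients
        for table in client["tables"]
    ]


pairs = to_mapping_kwargs(clients)
print(len(pairs))  # 9
print(pairs[0])    # {'client_name': 'Client A', 'table_name': 'sales'}
```

Flattening first keeps the fan-out one level deep, which is easier to follow in the Airflow UI than nested groups of tasks.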
DAG Code Example (Dynamic Tasks)
from datetime import datetime

from airflow.decorators import dag, task


@dag(
    dag_id="dynamic_tasks_example",
    start_date=datetime(2024, 1, 1),
    schedule=None,  # Airflow 2.4+; older releases use schedule_interval
    catchup=False,
)
def dynamic_tasks_dag():
    @task
    def fetch_clients():
        return [
            {"name": "Client A", "tables": ["sales", "inventory", "customers"]},
            {"name": "Client B", "tables": ["orders", "payments", "products", "shipping"]},
            {"name": "Client C", "tables": ["users", "activity"]},
        ]

    @task
    def flatten_tables(clients):
        # Runs at execution time: one kwargs dict per (client, table) pair
        return [
            {"client_name": client["name"], "table_name": table}
            for client in clients
            for table in client["tables"]
        ]

    @task
    def process_table(client_name, table_name):
        print(f"Processing {table_name} for {client_name}")
        return f"{client_name}-{table_name}-done"

    # fetch_clients() returns an XComArg, not a list, so it cannot be looped
    # over while the DAG file is parsed. The fan-out is declared with mapping.
    pairs = flatten_tables(fetch_clients())
    # expand_kwargs (Airflow 2.4+) creates one process_table instance per dict
    process_table.expand_kwargs(pairs)


dynamic_tasks_dag()
Input & Output Example
Input (from fetch_clients)
[
    {"name": "Client A", "tables": ["sales", "inventory", "customers"]},
    {"name": "Client B", "tables": ["orders", "payments", "products", "shipping"]},
    {"name": "Client C", "tables": ["users", "activity"]}
]
Output (example logs; each line comes from a separate task instance, so the order may vary)
Processing sales for Client A
Processing inventory for Client A
Processing customers for Client A
Processing orders for Client B
Processing payments for Client B
Processing products for Client B
Processing shipping for Client B
Processing users for Client C
Processing activity for Client C
Using XComs with Dynamic Tasks
Dynamic tasks return values automatically via XComs, enabling downstream aggregation.
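Example: Aggregating Results
@task
def aggregate_results(results):
    # results resolves to a lazy sequence holding every mapped return value
    print("Aggregated results:", list(results))
- Pass the output of the mapped task (the value returned by its .expand() call) to aggregate_results as an ordinary argument. Airflow then runs a single aggregation task after all mapped instances finish. Calling .expand() on aggregate_results itself would create one aggregation task per result instead.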
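As an illustration of what an aggregation step might do with the collected values, the sketch below counts finished tables per client. It assumes the "<client>-<table>-done" string format returned by process_table; the function name summarize is hypothetical, and the logic is plain Python that could sit inside the body of an aggregation task.

```python
from collections import Counter


def summarize(results):
    """Count completed tables per client from '<client>-<table>-done' strings."""
    counts = Counter()
    for result in results:
        # Split from the right so hyphens inside a client name stay intact.
        client, _table, _status = result.rsplit("-", 2)
        counts[client] += 1
    return dict(counts)


results = [
    "Client A-sales-done",
    "Client A-inventory-done",
    "Client C-users-done",
]
summary = summarize(results)
print(summary)  # {'Client A': 2, 'Client C': 1}
```

Returning a small summary like this keeps the XCom payload compact for whatever task runs next.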
Best Practices for Dynamic Tasks
- Use the @task decorator for clarity
- Avoid deeply nested loops → prefer a flat .expand()
- Keep task logic small and focused
- Leverage XComs for downstream communication
- Combine with trigger rules and branching when needed
Common Mistakes
- Hardcoding tasks that could be generated dynamically
- Forgetting to return values that downstream tasks need via XCom
- Creating an excessively large DAG (too many tasks)
- Ignoring task dependencies
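One common way to keep the task count in check is to map over batches of inputs instead of single items, so each task instance handles several tables. The sketch below shows the batching step in plain Python; the helper name chunk and the batch size are assumptions chosen for illustration. Airflow also enforces an upper bound on mapped instances through the max_map_length setting, which defaults to 1024.

```python
def chunk(items, size):
    """Split items into consecutive batches of at most `size` elements."""
    if size < 1:
        raise ValueError("size must be at least 1")
    return [items[i:i + size] for i in range(0, len(items), size)]


tables = [f"table_{i}" for i in range(10)]
batches = chunk(tables, 4)

# 10 tables become 3 batches, so 3 mapped task instances instead of 10.
print(len(batches))               # 3
print([len(b) for b in batches])  # [4, 4, 2]
```

The trade-off is coarser retries: if one table in a batch fails, the whole batch is retried, so batch size is worth tuning against how expensive each item is.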
When to Use Dynamic Tasks
- Multi-client or multi-table pipelines
- ETL pipelines with varying inputs
- Scalable task generation
- Modern Airflow DAGs using the TaskFlow API
When NOT to Use Dynamic Tasks
- For simple, linear DAGs
- When inputs are static and fixed
- For small pipelines → static tasks are simpler
Summary
- Dynamic tasks allow runtime task generation
- The TaskFlow API + @task makes this clean and Pythonic
- Fully compatible with XComs, branching, and trigger rules
- Key to building scalable, maintainable pipelines
Key Takeaways
- Dynamic tasks = adaptive workflows
- Use .partial() + .expand() for multiple task instances
- Keep logic modular and small
- Combine with modern Airflow features for maximum efficiency
What's Next?
Dynamic DAG Generation at Runtime
Learn how to create entire DAGs dynamically based on configs, databases, or external systems.