Orchestration¶
Generate production-ready workflow definitions for Apache Airflow and Dagster from your Odibi pipelines.
Overview¶
Odibi's orchestration module provides:
- Airflow Integration: Generate DAG files with proper task dependencies
- Dagster Integration: Create asset definitions with dependency graphs
- Automatic Dependency Mapping: Node dependencies become task/asset dependencies
- CLI Execution: Each node runs via odibi run for isolation
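Because every node runs through the same CLI entry point, the command each orchestrator task executes follows one fixed pattern. A minimal sketch of that shape (the `build_node_command` helper is hypothetical, for illustration only; it is not part of Odibi):

```python
def build_node_command(config_path: str, pipeline: str, node: str) -> str:
    """Build the `odibi run` invocation that executes a single node in isolation."""
    return f"odibi run {config_path} --pipeline {pipeline} --node {node}"


cmd = build_node_command("odibi.yaml", "process_orders", "validate_orders")
print(cmd)
# odibi run odibi.yaml --pipeline process_orders --node validate_orders
```

Running each node as a separate process keeps failures isolated and lets the orchestrator retry a single node without re-running the whole pipeline.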
Airflow Integration¶
AirflowExporter Class¶
The AirflowExporter generates Airflow DAG Python files from Odibi pipeline configurations.
```python
from odibi.config import load_config
from odibi.orchestration.airflow import AirflowExporter

config = load_config("odibi.yaml")
exporter = AirflowExporter(config)

# Generate DAG code for a specific pipeline
dag_code = exporter.generate_code("process_orders")

# Write to the Airflow DAGs folder
with open("/airflow/dags/odibi_process_orders.py", "w") as f:
    f.write(dag_code)
```
Generated DAG Structure¶
The exporter creates a DAG with:
- BashOperator tasks for each node
- Proper upstream/downstream dependencies
- Configurable retries from your Odibi config
- Tags for filtering (odibi, layer name)
```python
# Generated DAG example
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'odibi_process_orders',
    default_args=default_args,
    description='Process incoming orders',
    schedule_interval=None,
    catchup=False,
    tags=['odibi', 'silver'],
) as dag:
    ingest_orders = BashOperator(
        task_id='ingest_orders',
        bash_command='odibi run odibi.yaml --pipeline process_orders --node ingest_orders',
    )
    validate_orders = BashOperator(
        task_id='validate_orders',
        bash_command='odibi run odibi.yaml --pipeline process_orders --node validate_orders',
    )

    # Dependencies
    [ingest_orders] >> validate_orders
```
Airflow Configuration Options¶
| Option | Source | Description |
|---|---|---|
| `dag_id` | Auto-generated | `odibi_{pipeline_name}` |
| `owner` | `config.owner` | DAG owner shown in the Airflow UI |
| `retries` | `config.retry.max_attempts` | Retry count (0 if disabled) |
| `tags` | `pipeline.layer` | Includes `odibi` and the layer name |
| `description` | `pipeline.description` | Pipeline description |
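The table's mapping can be expressed as a small pure-Python function. This is a hypothetical sketch of the derivation logic for illustration, not the exporter's actual source:

```python
def derive_dag_settings(pipeline_name: str, layer: str, description: str,
                        owner: str, retry_enabled: bool, max_attempts: int) -> dict:
    """Derive Airflow DAG-level settings from Odibi config fields, per the table above."""
    return {
        "dag_id": f"odibi_{pipeline_name}",          # auto-generated from the pipeline name
        "owner": owner,                              # config.owner
        "retries": max_attempts if retry_enabled else 0,
        "tags": ["odibi", layer],                    # always 'odibi' plus the layer
        "description": description,
    }


settings = derive_dag_settings("process_orders", "silver", "Process incoming orders",
                               "data-team", True, 3)
print(settings["dag_id"])  # odibi_process_orders
```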
Dagster Integration¶
DagsterFactory Class¶
The DagsterFactory creates Dagster asset definitions directly from your Odibi configuration.
```python
# definitions.py
from odibi.config import load_config
from odibi.orchestration.dagster import DagsterFactory

config = load_config("odibi.yaml")
defs = DagsterFactory(config).create_definitions()
```
Asset Features¶
Each Odibi node becomes a Dagster asset with:
- Dependency tracking: depends_on becomes asset dependencies
- Grouping: Assets grouped by pipeline name
- Compute kind: Tagged as odibi for UI identification
- Op tags: Pipeline and node names for filtering
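The attributes listed above can be assembled from a node's config fields alone. A self-contained sketch of that per-node mapping (the `asset_metadata` helper is hypothetical; the factory's real internals may differ):

```python
def asset_metadata(pipeline: str, node: str, depends_on: list, description: str = "") -> dict:
    """Assemble the Dagster asset attributes for one Odibi node."""
    return {
        "name": node,
        "deps": list(depends_on),        # depends_on becomes asset dependencies
        "group_name": pipeline,          # assets grouped by pipeline name
        "description": description,
        "compute_kind": "odibi",         # tagged for UI identification
        "op_tags": {"odibi/pipeline": pipeline, "odibi/node": node},
    }


meta = asset_metadata("process_orders", "validate_orders",
                      ["ingest_orders"], "Validate order data quality")
print(meta["op_tags"])
# {'odibi/pipeline': 'process_orders', 'odibi/node': 'validate_orders'}
```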
Generated Assets¶
```python
# Dagster creates assets like:
@asset(
    name="validate_orders",
    deps=["ingest_orders"],
    group_name="process_orders",
    description="Validate order data quality",
    compute_kind="odibi",
    op_tags={"odibi/pipeline": "process_orders", "odibi/node": "validate_orders"},
)
def validate_orders(context: AssetExecutionContext):
    # Runs: odibi run odibi.yaml --pipeline process_orders --node validate_orders
    ...
```
Dagster Installation¶
Dagster is an optional dependency; install it in the same environment as Odibi before using the `DagsterFactory` (for example, `pip install dagster`).
Configuration¶
Project Configuration for Orchestration¶
```yaml
project: DataPipeline
owner: data-team        # Used as Airflow DAG owner

retry:
  enabled: true
  max_attempts: 3       # Airflow retry count

pipelines:
  - pipeline: process_orders
    layer: silver       # Used as Airflow tag
    description: "Process incoming orders"
    nodes:
      - name: ingest_orders
        # ...
      - name: validate_orders
        depends_on:
          - ingest_orders
        # ...
      - name: transform_orders
        depends_on:
          - validate_orders
        # ...
```
Dependency Mapping¶
Node dependencies in Odibi map directly to orchestrator dependencies:
| Odibi Config | Airflow | Dagster |
|---|---|---|
| `depends_on: [node_a]` | `[node_a] >> node_b` | `deps=["node_a"]` |
| `depends_on: [a, b]` | `[a, b] >> node_c` | `deps=["a", "b"]` |
| No dependencies | First task | No deps |
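Both adapters ultimately derive a valid execution order from the same `depends_on` graph. A self-contained sketch using Python's standard-library `graphlib` (for illustration only; Odibi's internals may differ):

```python
from graphlib import TopologicalSorter

# depends_on lists from the config, expressed as node -> predecessors
depends_on = {
    "ingest_orders": [],
    "validate_orders": ["ingest_orders"],
    "transform_orders": ["validate_orders"],
}

# static_order() yields each node only after all of its predecessors
order = list(TopologicalSorter(depends_on).static_order())
print(order)
# ['ingest_orders', 'validate_orders', 'transform_orders']
```

`TopologicalSorter` also raises `CycleError` on circular `depends_on` chains, which is the same failure mode both orchestrators would reject.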
Examples¶
Complete Airflow Integration¶
```python
# scripts/generate_dags.py
from pathlib import Path

from odibi.config import load_config
from odibi.orchestration.airflow import AirflowExporter


def generate_all_dags(config_path: str, output_dir: str):
    config = load_config(config_path)
    exporter = AirflowExporter(config)
    output = Path(output_dir)

    for pipeline in config.pipelines:
        dag_code = exporter.generate_code(pipeline.pipeline)
        dag_file = output / f"odibi_{pipeline.pipeline}.py"
        dag_file.write_text(dag_code)
        print(f"Generated: {dag_file}")


if __name__ == "__main__":
    generate_all_dags("odibi.yaml", "/opt/airflow/dags")
```
Complete Dagster Integration¶
```python
# definitions.py
from odibi.config import load_config
from odibi.orchestration.dagster import DagsterFactory

# Load the Odibi configuration
config = load_config("odibi.yaml")

# Create Dagster definitions
defs = DagsterFactory(config).create_definitions()

# Run with: dagster dev -f definitions.py
```
Multi-Pipeline Setup¶
```yaml
# odibi.yaml
project: DataWarehouse
owner: platform-team

pipelines:
  - pipeline: bronze_ingestion
    layer: bronze
    nodes:
      - name: ingest_customers
        source:
          connection: raw_db
          path: customers
      - name: ingest_orders
        source:
          connection: raw_db
          path: orders

  - pipeline: silver_transformation
    layer: silver
    nodes:
      - name: clean_customers
        depends_on: []
        source:
          connection: bronze
          path: customers
      - name: clean_orders
        depends_on: []
        source:
          connection: bronze
          path: orders
      - name: join_customer_orders
        depends_on:
          - clean_customers
          - clean_orders
```
```python
# Generate DAGs for all pipelines
from odibi.config import load_config
from odibi.orchestration.airflow import AirflowExporter

config = load_config("odibi.yaml")
exporter = AirflowExporter(config)

# Generates separate DAGs:
# - odibi_bronze_ingestion
# - odibi_silver_transformation
for pipeline in config.pipelines:
    code = exporter.generate_code(pipeline.pipeline)
    print(f"--- {pipeline.pipeline} ---")
    print(code)
```
Best Practices¶
- Use CLI execution: both adapters run each node via `odibi run`, so every task executes in its own process
- Set owner: configure `owner` in your YAML so Airflow displays the correct DAG owner
- Enable retries: configure retry settings in your Odibi config
- Layer tags: use the `layer` field to organize DAGs in Airflow
- Generate on deploy: regenerate DAG files during CI/CD deployment
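For the "generate on deploy" practice, a CI job can regenerate DAG files on every push to the main branch. A sketch as a GitHub Actions workflow; the file path, package name (`odibi` on PyPI), and script location are assumptions to adapt to your setup:

```yaml
# .github/workflows/deploy-dags.yml (illustrative; adjust paths, versions, and runners)
name: deploy-dags
on:
  push:
    branches: [main]
jobs:
  generate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - run: pip install odibi
      # Regenerate all DAG files from odibi.yaml (see the generate_all_dags example above)
      - run: python scripts/generate_dags.py
```

Regenerating on deploy keeps the DAG files in the Airflow folder in lockstep with the YAML config, rather than letting hand-edited copies drift.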
Related¶
- Pipeline Configuration: YAML schema reference
- CLI Reference: `odibi run` command details
- Retry configuration: defined in your YAML config under the `retry` section