
ODIBI Configuration System Explained

Last Updated: 2025-11-21
Author: Henry Odibi
Purpose: Demystify how YAML configs, Python classes, and execution flow together


📑 Table of Contents

  1. The Big Picture
  2. Configuration Flow (Concept → Execution)
  3. Three Layers of Configuration
  4. Example: Tracing a Pipeline from YAML to Execution
  5. Key Concepts Explained
  6. Common Confusion Points
  7. Decision Trees
  8. Quick Reference

The Big Picture

ODIBI has three distinct layers that work together:

┌─────────────────────────────────────────────────────────────┐
│  LAYER 1: YAML Configuration (What You Write)              │
│  - Declarative syntax (project.yaml, pipelines/)            │
│  - Human-readable, version-controlled                       │
│  - Defines WHAT to do, not HOW                              │
└──────────────────────┬──────────────────────────────────────┘
                       │ Parsed by
┌─────────────────────────────────────────────────────────────┐
│  LAYER 2: Pydantic Models (config.py)                      │
│  - Validates YAML structure                                 │
│  - Enforces required fields, types, constraints             │
│  - Converts YAML → Python objects                           │
└──────────────────────┬──────────────────────────────────────┘
                       │ Used by
┌─────────────────────────────────────────────────────────────┐
│  LAYER 3: Runtime Classes (pipeline.py, node.py, etc.)     │
│  - Executes the actual work                                 │
│  - Manages context, engines, connections                    │
│  - Generates stories, handles errors                        │
└─────────────────────────────────────────────────────────────┘

Key Insight:
You interact with Layer 1 (YAML), but under the hood, Pydantic (Layer 2) ensures correctness before Runtime (Layer 3) does the work.


Configuration Flow (Concept → Execution)

Let's trace a real pipeline from file to execution:

Step 1: You Write YAML (User)

# examples/templates/example_local.yaml
project: My Pipeline
engine: pandas
connections:
  local:
    type: local
    base_path: ./data
pipelines:
  - pipeline: bronze_to_silver
    nodes:
      - name: load_raw_sales
        read:
          connection: local
          path: bronze/sales.csv
          format: csv

Step 2: YAML → Pydantic Models (Automatic)

When you call Pipeline.from_yaml("examples/templates/example_local.yaml"):

# pipeline.py (line 317)
with open(yaml_path, "r") as f:
    config = yaml.safe_load(f)  # Loads YAML as Python dict

# Parse into Pydantic models (config.py)
project_config = ProjectConfig(**config)  # Validates project/engine/connections
pipeline_config = PipelineConfig(**config['pipelines'][0])  # Validates pipeline structure

What Pydantic does:
- Checks that project, engine, connections exist
- Ensures engine is "pandas", "spark", or "polars" (not "panda" or "pands")
- Validates each node has required fields (name, at least one operation)
- Converts strings to enums where needed (e.g., engine: "pandas" → EngineType.PANDAS)

If validation fails:

ValidationError: 1 validation error for PipelineConfig
nodes.0.read.connection
  field required (type=value_error.missing)

This is Layer 2 catching errors before execution.
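
To make the fail-fast idea concrete, here is a minimal, stdlib-only sketch of the kind of check Layer 2 performs. It is not ODIBI's code: the real models are Pydantic classes in config.py, and `MiniProjectConfig` and `parse_project` are hypothetical names used only for illustration. The enum members are assumed from the three engine names listed above.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict


class EngineType(str, Enum):
    # Assumed members, taken from the engine names mentioned in this guide.
    PANDAS = "pandas"
    SPARK = "spark"
    POLARS = "polars"


@dataclass(frozen=True)
class MiniProjectConfig:
    """Hypothetical stand-in for a validated project config."""
    project: str
    engine: EngineType
    connections: Dict[str, Dict[str, Any]]


def parse_project(raw: Dict[str, Any]) -> MiniProjectConfig:
    """Validate a raw YAML-style dict and convert it to a typed object."""
    for key in ("project", "connections"):
        if key not in raw:
            raise ValueError(f"{key}: field required")
    # Enum lookup by value rejects typos such as "panda" with a ValueError.
    engine = EngineType(raw.get("engine", "pandas"))
    return MiniProjectConfig(raw["project"], engine, raw["connections"])


good = parse_project({"project": "My Pipeline", "engine": "pandas", "connections": {}})

try:
    parse_project({"project": "My Pipeline", "engine": "panda", "connections": {}})
    typo_rejected = False
except ValueError:
    typo_rejected = True
```

The point of the sketch is the ordering: the typo is rejected while the config is being parsed, before any data is read.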

Step 3: Create Connections (Automatic)

# pipeline.py (lines 342-351) - PipelineManager.from_yaml()
connections = {}
for conn_name, conn_config in config.get("connections", {}).items():
    if conn_config.get("type") == "local":
        connections[conn_name] = LocalConnection(
            base_path=conn_config.get("base_path", "./data")
        )

Result: Python objects ready to use:

connections = {
    "local": LocalConnection(base_path="./data")
}
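
The loop above handles only the local type. One way such a step could be generalized is a registry that maps the YAML `type` value to a factory. The sketch below is an assumption about design, not ODIBI's implementation: `CONNECTION_FACTORIES`, `build_connections`, and this stripped-down `LocalConnection` are hypothetical.

```python
import posixpath
from typing import Any, Callable, Dict


class LocalConnection:
    """Hypothetical minimal local connection: joins paths under base_path."""

    def __init__(self, base_path: str = "./data") -> None:
        self.base_path = base_path

    def get_path(self, relative_path: str) -> str:
        return posixpath.join(self.base_path, relative_path)


# Registry mapping the YAML `type` value to a factory (assumed design).
CONNECTION_FACTORIES: Dict[str, Callable[[Dict[str, Any]], Any]] = {
    "local": lambda cfg: LocalConnection(base_path=cfg.get("base_path", "./data")),
}


def build_connections(config: Dict[str, Any]) -> Dict[str, Any]:
    """Turn the `connections` section of a config dict into objects."""
    connections = {}
    for name, conn_config in config.get("connections", {}).items():
        conn_type = conn_config.get("type")
        if conn_type not in CONNECTION_FACTORIES:
            raise ValueError(f"Unsupported connection type for '{name}': {conn_type}")
        connections[name] = CONNECTION_FACTORIES[conn_type](conn_config)
    return connections


connections = build_connections(
    {"connections": {"local": {"type": "local", "base_path": "./data"}}}
)
full_path = connections["local"].get_path("bronze/sales.csv")
```

With a registry, supporting another connection type means adding one entry rather than another `if` branch.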

Step 4: Create PipelineManager (Automatic)

# pipeline.py (lines 280-314)
manager = PipelineManager(
    yaml_config=config,
    engine="pandas",
    connections=connections,
    story_config=config.get("story", {})
)

What PipelineManager does:
- Creates a Pipeline object for each pipeline in the YAML
- Stores them in self._pipelines dictionary keyed by pipeline name
- Example: manager._pipelines = {"bronze_to_silver": Pipeline(...), "silver_to_gold": Pipeline(...)}

Step 5: Run Pipelines (User)

# You call:
results = manager.run()  # Runs ALL pipelines

# Or:
result = manager.run('bronze_to_silver')  # Runs specific pipeline

What happens:
  1. PipelineManager.run() looks up the pipeline by name
  2. Calls Pipeline.run() (line 134)
  3. Pipeline.run() orchestrates node execution:
     - Builds dependency graph
     - Topologically sorts nodes
     - Executes each node in order
     - Passes data via Context
     - Generates story
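
The "build a dependency graph, then sort it topologically" step can be sketched with Python's standard `graphlib` module. This only illustrates the technique: ODIBI's own DependencyGraph class may be implemented differently, and the node names are borrowed from the three-node example traced later in this guide.

```python
from graphlib import TopologicalSorter

# Node name -> names it depends on (mirrors `depends_on` in the YAML).
depends_on = {
    "load_data": [],
    "clean_data": ["load_data"],
    "save_data": ["clean_data"],
}

# static_order() yields each node only after all of its dependencies.
execution_order = list(TopologicalSorter(depends_on).static_order())
```

`graphlib` raises `CycleError` when the dependencies are circular, which is the kind of problem a pipeline should report before any node runs.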


Three Layers of Configuration

Layer 1: YAML Files (Declarative)

Purpose: Human-readable, version-controlled pipeline definitions

Key Files:
- Project-level: project.yaml or any YAML with project + connections + pipelines
- Pipeline-level: Individual YAML files with specific pipelines

What You Define:

| Section     | Purpose                                | Required |
|-------------|----------------------------------------|----------|
| project     | Project name                           | ✅ Yes   |
| engine      | Execution engine (pandas/spark/polars) | ✅ Yes   |
| connections | Where data lives                       | ✅ Yes   |
| pipelines   | List of pipelines                      | ✅ Yes   |
| story       | Story generation config                | ✅ Yes   |

Example:

project: Sales Analytics
engine: pandas
connections:
  warehouse:
    type: local
    base_path: /data/warehouse
pipelines:
  - pipeline: daily_sales
    nodes: [...]


Layer 2: Pydantic Models (Validation)

Purpose: Type-safe, validated Python objects

Key File: odibi/config.py

Main Models:

ProjectConfig (Line 266)

class ProjectConfig(BaseModel):
    project: str  # Required
    version: str = "1.0.0"  # Default
    engine: EngineType = EngineType.PANDAS  # Default
    connections: Dict[str, Dict[str, Any]]  # Required
    story: StoryConfig  # Required
    pipelines: List[PipelineConfig]  # Required
    retry: RetryConfig = RetryConfig()  # Default
    logging: LoggingConfig = LoggingConfig()  # Default

Maps to YAML:

project: My Pipeline      # → project
version: "2.0.0"          # → version
engine: pandas            # → engine (validated as EngineType.PANDAS)
connections:
  data:                   # → connections["data"]
    type: local
    base_path: ./data
  outputs:                # → connections["outputs"]
    type: local
    base_path: ./outputs
  api_source:             # → connections["api_source"]
    type: http
    base_url: "https://api.example.com/v1"
    headers:
      Authorization: "Bearer ${API_TOKEN}"
story:
  connection: outputs     # → story.connection (required)
  path: stories/          # → story.path
  auto_generate: true     # → story.auto_generate
  max_sample_rows: 10     # → story.max_sample_rows
  retention_days: 30      # → story.retention_days
  retention_count: 100    # → story.retention_count
retry:
  enabled: true           # → retry.enabled
  max_attempts: 3         # → retry.max_attempts
  backoff: exponential    # → retry.backoff
logging:
  level: INFO             # → logging.level
pipelines:                # → pipelines (list of pipeline configs)
  - pipeline: example
    nodes: [...]

PipelineConfig (Line 203)

class PipelineConfig(BaseModel):
    pipeline: str  # Required (pipeline name)
    description: Optional[str] = None
    layer: Optional[str] = None
    nodes: List[NodeConfig]  # Required (at least one node)

Maps to YAML:

pipelines:
  - pipeline: bronze_to_silver  # → pipeline
    layer: transformation       # → layer
    description: "Clean data"   # → description
    nodes: [...]               # → nodes

NodeConfig (Line 172)

class NodeConfig(BaseModel):
    name: str  # Required (unique within pipeline)
    depends_on: List[str] = []
    read: Optional[ReadConfig] = None
    inputs: Optional[Dict[str, Union[str, Dict[str, Any]]]] = None  # Cross-pipeline dependencies
    transform: Optional[TransformConfig] = None
    write: Optional[WriteConfig] = None
    cache: bool = False
    sensitive: Union[bool, List[str]] = False  # PII Masking

Maps to YAML:

nodes:
  - name: load_raw_sales         # → name
    depends_on: [prev_node]      # → depends_on
    sensitive: ["email"]         # → sensitive (Redact email in reports)
    read:                        # → read (ReadConfig)
      connection: local
      path: bronze/sales.csv
      format: csv
    cache: true                  # → cache

API Read Configuration:

nodes:
  - name: fetch_api_data
    read:
      connection: my_api         # HTTP connection
      format: api
      path: /v1/records
      options:
        method: POST             # GET (default), POST, PUT, PATCH, DELETE
        params:                  # URL params (GET) or merged into body (POST)
          limit: 100
        request_body:            # JSON body for POST/PUT/PATCH
          filters:
            status: ["active"]
        pagination:
          type: offset_limit
          start_offset: 1        # For 1-indexed APIs
        response:
          items_path: results
          add_fields:
            _fetched_at: "${date:now}"

Cross-Pipeline Dependencies (inputs block):

For multi-input nodes that read from other pipelines, use the inputs block instead of read:

nodes:
  - name: enriched_data
    inputs:
      events: $read_bronze.shift_events      # Cross-pipeline reference
      calendar:                               # Explicit read config
        connection: goat_prod
        path: "bronze/calendar"
        format: delta
    transform:
      steps:
        - operation: join
          left: events
          right: calendar
          on: [date_id]

Reference Syntax: $pipeline_name.node_name
- The $ prefix indicates a cross-pipeline reference
- References are resolved via the Odibi Catalog (meta_outputs table)
- The referenced node must have a write block and the pipeline must have run previously
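
As a small illustration of the reference syntax, the sketch below splits a `$pipeline_name.node_name` string into its two parts. `parse_reference` is a hypothetical helper, not part of ODIBI. The catalog lookup that resolves the reference to stored data is not shown, because its API does not appear in this guide.

```python
from typing import Tuple


def parse_reference(ref: str) -> Tuple[str, str]:
    """Split '$pipeline_name.node_name' into (pipeline_name, node_name)."""
    if not ref.startswith("$"):
        raise ValueError(f"Not a cross-pipeline reference: {ref!r}")
    pipeline_name, sep, node_name = ref[1:].partition(".")
    if not (pipeline_name and sep and node_name):
        raise ValueError(f"Expected '$pipeline.node', got: {ref!r}")
    return pipeline_name, node_name


pipeline_name, node_name = parse_reference("$read_bronze.shift_events")
```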

Validation Rules:
- Node must have at least one of: read, inputs, transform, write
- All node names must be unique within a pipeline
- Connections referenced in read.connection or write.connection should exist (warned, not enforced)
- Cannot have both read and inputs: use read for single-source nodes or inputs for multi-source cross-pipeline dependencies
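
Three of these rules can be written as a short check over plain dicts. This is a sketch under stated assumptions: ODIBI enforces the rules inside its Pydantic models, while `validate_nodes` here is a hypothetical standalone function that collects violations instead of raising.

```python
from typing import Any, Dict, List

OPERATIONS = ("read", "inputs", "transform", "write")


def validate_nodes(nodes: List[Dict[str, Any]]) -> List[str]:
    """Return a list of rule violations (an empty list means the nodes pass)."""
    errors = []
    seen = set()
    for node in nodes:
        name = node.get("name", "<unnamed>")
        # Rule: node names are unique within a pipeline.
        if name in seen:
            errors.append(f"{name}: duplicate node name")
        seen.add(name)
        # Rule: at least one operation per node.
        if not any(node.get(op) for op in OPERATIONS):
            errors.append(f"{name}: needs at least one of {', '.join(OPERATIONS)}")
        # Rule: read and inputs are mutually exclusive.
        if node.get("read") and node.get("inputs"):
            errors.append(f"{name}: cannot combine read and inputs")
    return errors


ok = validate_nodes([{"name": "load", "read": {"connection": "local"}}])
bad = validate_nodes([
    {"name": "a"},
    {"name": "a", "read": {"path": "x"}, "inputs": {"e": "$p.n"}},
])
```

Here `ok` is empty, while `bad` reports three problems: a node with no operation, a duplicate name, and a node that combines read with inputs.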


Layer 3: Runtime Classes (Execution)

Purpose: Execute the actual work

Key Files:
- odibi/pipeline.py - Pipeline orchestration
- odibi/node.py - Individual node execution
- odibi/context.py - Data passing between nodes
- odibi/engine/pandas_engine.py - Actual read/write/transform logic

Main Classes:

PipelineManager (Line 280)

class PipelineManager:
    def __init__(self, yaml_config, engine, connections, story_config):
        self._pipelines = {}  # Dict[pipeline_name -> Pipeline]
        for pipeline_config_dict in yaml_config["pipelines"]:
            pipeline_config = PipelineConfig(**pipeline_config_dict)
            self._pipelines[pipeline_config.pipeline] = Pipeline(...)

    def run(self, pipelines=None):
        # Run all or specific pipelines
        ...

Responsibilities:
- Load and validate YAML
- Instantiate connections
- Create Pipeline objects for each pipeline
- Orchestrate multi-pipeline execution
- Return results

Pipeline (Line 63)

class Pipeline:
    def __init__(self, pipeline_config, engine, connections, ...):
        self.config = pipeline_config  # PipelineConfig from Layer 2
        self.engine = PandasEngine()   # Or SparkEngine
        self.context = create_context(engine)
        self.graph = DependencyGraph(pipeline_config.nodes)

    def run(self):
        execution_order = self.graph.topological_sort()
        for node_name in execution_order:
            node = Node(...)
            node_result = node.execute()

Responsibilities:
- Build dependency graph
- Determine execution order
- Execute nodes sequentially (or parallel in future)
- Manage context for data passing
- Generate stories

Node (odibi/node.py)

class Node:
    def execute(self):
        # 1. Read data (if read config exists)
        if self.config.read:
            data = self.engine.read(...)
            self.context.register(self.config.name, data)

        # 2. Transform data (if transform config exists)
        if self.config.transform:
            data = self.engine.execute_transform(...)
            self.context.register(self.config.name, data)

        # 3. Write data (if write config exists)
        if self.config.write:
            self.engine.write(...)

Responsibilities:
- Execute read → transform → write for a single node
- Use engine for actual operations
- Register results in context
- Return NodeResult


Example: Tracing a Pipeline from YAML to Execution

Let's trace this simple YAML:

project: Simple Pipeline
engine: pandas
connections:
  local:
    type: local
    base_path: ./data
  outputs:
    type: local
    base_path: ./outputs
story:
  connection: outputs
  path: stories/
  enabled: true
pipelines:
  - pipeline: example
    nodes:
      - name: load_data
        read:
          connection: local
          path: input.csv
          format: csv
        cache: true

      - name: clean_data
        depends_on: [load_data]
        transform:
          steps:
            - "SELECT * FROM load_data WHERE amount > 0"

      - name: save_data
        depends_on: [clean_data]
        write:
          connection: local
          path: output.parquet
          format: parquet
          mode: overwrite

Execution Trace

1. User calls:

from odibi.pipeline import Pipeline
manager = Pipeline.from_yaml("simple.yaml")

2. Pipeline.from_yaml() delegates to PipelineManager.from_yaml() (line 109)

3. PipelineManager.from_yaml() (line 317):

# Load YAML
with open("simple.yaml") as f:
    config = yaml.safe_load(f)
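# config = {
#     "project": "Simple Pipeline",
#     "engine": "pandas",
#     "connections": {
#         "local": {"type": "local", "base_path": "./data"},
#         "outputs": {"type": "local", "base_path": "./outputs"},
#     },
#     "story": {"connection": "outputs", "path": "stories/", "enabled": True},
#     "pipelines": [{"pipeline": "example", "nodes": [...]}]
# }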

# Validate project config (entire YAML - single source of truth)
project_config = ProjectConfig(**config)
# ✅ Validation passed - checks:
#    - Required fields: project, connections, story, pipelines
#    - story.connection exists in connections
#    - engine is valid (pandas, spark, or polars)

# Create connections
connections = {
    "local": LocalConnection(base_path="./data"),
    "outputs": LocalConnection(base_path="./outputs")
}

# Create PipelineManager
manager = PipelineManager(
    project_config=project_config,
    engine="pandas",
    connections=connections
)

4. PipelineManager.__init__() (line 283):

# Loop through pipelines in YAML
for pipeline_config_dict in config["pipelines"]:
    # Validate pipeline config
    pipeline_config = PipelineConfig(
        pipeline="example",
        nodes=[
            NodeConfig(name="load_data", read=ReadConfig(...), cache=True),
            NodeConfig(name="clean_data", depends_on=["load_data"], transform=TransformConfig(...)),
            NodeConfig(name="save_data", depends_on=["clean_data"], write=WriteConfig(...))
        ]
    )
    # ✅ Validation passed (all nodes have unique names, at least one operation each)

    # Create Pipeline instance
    self._pipelines["example"] = Pipeline(
        pipeline_config=pipeline_config,
        engine="pandas",
        connections={"local": LocalConnection(...)},
        story_config={}
    )

5. User runs:

results = manager.run()  # Run all pipelines

6. PipelineManager.run() (line 363):

# pipelines=None means run all
pipeline_names = list(self._pipelines.keys())  # ["example"]

# Run each pipeline
for name in pipeline_names:
    results[name] = self._pipelines[name].run()

7. Pipeline.run() (line 134):

# Get execution order from dependency graph
execution_order = self.graph.topological_sort()
# Returns: ["load_data", "clean_data", "save_data"]

# Execute nodes in order
for node_name in execution_order:  # first iteration: "load_data"
    node_config = self.graph.nodes[node_name]
    node = Node(
        config=node_config,
        context=self.context,
        engine=self.engine,  # PandasEngine
        connections={"local": LocalConnection(...)}
    )
    node_result = node.execute()

8. Node.execute() for "load_data" (odibi/node.py):

# Node has read config
if self.config.read:
    # Get connection
    conn = self.connections["local"]  # LocalConnection(base_path="./data")
    full_path = conn.get_path("input.csv")  # "./data/input.csv"

    # Read using engine
    data = self.engine.read(
        path=full_path,
        format="csv",
        options={}
    )
    # data = pandas.DataFrame(...)

    # Register in context
    self.context.register("load_data", data)

9. Node.execute() for "clean_data":

# Node has transform config
if self.config.transform:
    sql = "SELECT * FROM load_data WHERE amount > 0"
    data = self.engine.execute_sql(sql, self.context)
    # Engine gets "load_data" DataFrame from context
    # Executes SQL using pandasql or duckdb
    # Returns filtered DataFrame

    self.context.register("clean_data", data)

10. Node.execute() for "save_data":

# Node has write config
if self.config.write:
    data = self.context.get("clean_data")  # Get from previous node
    conn = self.connections["local"]
    full_path = conn.get_path("output.parquet")  # "./data/output.parquet"

    self.engine.write(
        data=data,
        path=full_path,
        format="parquet",
        mode="overwrite",
        options={}
    )
    # Writes DataFrame to ./data/output.parquet

11. Story generation (if enabled):

story_path = self.story_generator.generate(
    node_results={...},
    completed=["load_data", "clean_data", "save_data"],
    failed=[],
    ...
)
# Generates markdown story in ./outputs/stories/ (story.connection base_path + story.path)

12. Return results to user:

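# The YAML defines a single pipeline, so manager.run() returns its
# PipelineResults directly (a dict keyed by pipeline name is returned
# only when several pipelines run)
results = PipelineResults(
    pipeline_name="example",
    completed=["load_data", "clean_data", "save_data"],
    failed=[],
    duration=2.3,
    story_path="./outputs/stories/example_20251107_143025.md"
)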


Key Concepts Explained

1. Config vs Runtime

Config (Layer 1 + 2):
- What you declare in YAML
- Validated by Pydantic
- Immutable once loaded
- Example: ReadConfig(connection="local", path="input.csv", format="csv")

Runtime (Layer 3):
- What executes the work
- Uses config to make decisions
- Mutable state (context, results)
- Example: PandasEngine.read(path="./data/input.csv", format="csv") → returns DataFrame

Why separate?
- Validation happens early (before execution)
- Config is reusable (can run same config multiple times)
- Easier testing (mock runtime, test config separately)
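
The "immutable once loaded" property can be illustrated with a frozen dataclass. This is a stdlib stand-in chosen for the example: the real ReadConfig is a Pydantic model, and whether ODIBI actually freezes its models is an assumption here.

```python
from dataclasses import FrozenInstanceError, dataclass


@dataclass(frozen=True)
class ReadConfig:
    """Hypothetical frozen stand-in for the validated read config."""
    connection: str
    path: str
    format: str


read_config = ReadConfig(connection="local", path="input.csv", format="csv")

try:
    read_config.path = "other.csv"  # a loaded config should not change
    mutated = True
except FrozenInstanceError:
    mutated = False
```

Because the config object cannot change, the same instance can safely drive several runs, while the runtime keeps its own mutable state.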

2. Connection: Config vs Object

Connection Config (YAML):

connections:
  local:
    type: local
    base_path: ./data

Connection Object (Python):

connections = {
    "local": LocalConnection(base_path="./data")
}

What's the difference?
- Config is declarative (YAML dict)
- Object is executable (Python class with methods like .get_path())

Why both?
- YAML is portable (version controlled, shareable)
- Objects are functional (can call methods, maintain state)

3. Pipeline vs PipelineManager

Pipeline:
- Represents one pipeline
- Has nodes, dependencies, execution logic
- Example: bronze_to_silver pipeline

PipelineManager:
- Manages multiple pipelines
- Loads YAML, creates connections, instantiates Pipelines
- Provides unified API: manager.run() runs all, manager.run('bronze_to_silver') runs one

Why does Pipeline.from_yaml() return a PipelineManager?
- Convenience: Most YAMLs have multiple pipelines
- Backward compatible: Users can still call Pipeline.from_yaml()
- Unified API: manager.run() works for 1 or 10 pipelines

4. from_yaml() - The Boilerplate Eliminator

Before (manual setup):

import yaml
from odibi.pipeline import Pipeline
from odibi.config import PipelineConfig
from odibi.connections import LocalConnection

with open("config.yaml") as f:
    config = yaml.safe_load(f)

pipeline_config = PipelineConfig(**config['pipelines'][0])
connections = {
    'local': LocalConnection(base_path=config['connections']['local']['base_path'])
}
pipeline = Pipeline(
    pipeline_config=pipeline_config,
    engine="pandas",
    connections=connections
)
results = pipeline.run()

After (from_yaml()):

from odibi.pipeline import Pipeline

manager = Pipeline.from_yaml("config.yaml")
results = manager.run()

What from_yaml() does:
  1. Load YAML
  2. Validate with Pydantic
  3. Create connection objects
  4. Instantiate PipelineManager
  5. Return ready-to-run manager

Result: 2 lines instead of 15!

5. Context - The Data Bus

Purpose: Pass data between nodes without explicit function calls

How it works:

# Node 1: load_data
data = engine.read(...)
context.register("load_data", data)  # Store DataFrame

# Node 2: clean_data (depends_on: [load_data])
data = context.get("load_data")  # Retrieve DataFrame
cleaned = engine.execute_sql("SELECT * FROM load_data WHERE ...", context)
context.register("clean_data", cleaned)

# Node 3: save_data (depends_on: [clean_data])
data = context.get("clean_data")
engine.write(data, ...)

Why not return values?
- Nodes execute sequentially but independently
- SQL transforms reference DataFrames by name (not variable)
- Context provides unified API across Pandas and Spark
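
A context of this kind can be very small. The class below is a hypothetical minimal version with only the `register` and `get` methods used in this guide. ODIBI's real Context lives in odibi/context.py and may do more. Plain lists of dicts stand in for DataFrames so the sketch runs without pandas.

```python
from typing import Any, Dict


class Context:
    """Hypothetical minimal data bus keyed by node name."""

    def __init__(self) -> None:
        self._data: Dict[str, Any] = {}

    def register(self, name: str, data: Any) -> None:
        self._data[name] = data

    def get(self, name: str) -> Any:
        if name not in self._data:
            raise KeyError(f"No data registered for node '{name}'")
        return self._data[name]


context = Context()
context.register("load_data", [{"amount": 5}, {"amount": -1}])

# A downstream node looks up its input by the upstream node's name.
cleaned = [row for row in context.get("load_data") if row["amount"] > 0]
context.register("clean_data", cleaned)
```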


Common Confusion Points

Confusion #1: "Why do I see both pipeline and name?"

Answer: Different levels of abstraction!

pipelines:                   # List of pipelines
  - pipeline: bronze_to_silver  # ← Pipeline NAME (identifies the pipeline)
    nodes:                     # List of nodes in THIS pipeline
      - name: load_data        # ← Node NAME (identifies the node)

Analogy:
- pipeline is like a book title ("Harry Potter")
- name is like a chapter name ("The Boy Who Lived")

In code:
- PipelineConfig.pipeline → pipeline name
- NodeConfig.name → node name

Confusion #2: "What's the difference between connection: local and type: local?"

Answer: Different contexts!

In connections section (defining connections):

connections:
  local:           # ← Connection NAME (you choose this)
    type: local    # ← Connection TYPE (system type: local, azure_adls, etc.)
    base_path: ./data

In read/write section (using connections):

nodes:
  - name: load_data
    read:
      connection: local  # ← References the CONNECTION NAME from above
      path: input.csv

Analogy:
- connections section: "Define a car named 'my_car' of type 'sedan'"
- read.connection: "Use the car named 'my_car' to drive somewhere"

Confusion #3: "Why does from_yaml() return a manager instead of a pipeline?"

Answer: YAML files typically have multiple pipelines!

pipelines:
  - pipeline: bronze_to_silver  # Pipeline 1
    nodes: [...]
  - pipeline: silver_to_gold    # Pipeline 2
    nodes: [...]

If it returned a single Pipeline:
- Which one? The first? All?
- How to run specific pipelines?

By returning PipelineManager:
- Access all pipelines
- Run all: manager.run()
- Run one: manager.run('bronze_to_silver')
- Run some: manager.run(['bronze_to_silver', 'silver_to_gold'])

For single pipeline YAMLs:

result = manager.run()  # If only 1 pipeline, returns PipelineResults (not dict)
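
The return-shape convention described here (one pipeline gives a bare result, several give a dict) can be sketched with a toy manager. `MiniManager` is hypothetical and returns strings in place of PipelineResults. In particular, how ODIBI treats a one-element list is not stated in this guide, so unwrapping it here is an assumption of the sketch.

```python
from typing import Dict, List, Optional, Union


class MiniManager:
    """Hypothetical manager illustrating only the return-shape convention."""

    def __init__(self, pipeline_names: List[str]) -> None:
        # Each "pipeline" is a stand-in callable returning a result string.
        self._pipelines = {
            name: (lambda n=name: f"results:{n}") for name in pipeline_names
        }

    def run(
        self, pipelines: Optional[Union[str, List[str]]] = None
    ) -> Union[str, Dict[str, str]]:
        if pipelines is None:
            names = list(self._pipelines)  # run everything
        elif isinstance(pipelines, str):
            names = [pipelines]  # run one pipeline by name
        else:
            names = list(pipelines)  # run a chosen subset
        results = {name: self._pipelines[name]() for name in names}
        # A single pipeline is unwrapped; several come back as a dict.
        return results[names[0]] if len(names) == 1 else results


single = MiniManager(["example"]).run()
multi = MiniManager(["bronze_to_silver", "silver_to_gold"]).run()
one_of_many = MiniManager(["bronze_to_silver", "silver_to_gold"]).run("bronze_to_silver")
```

Callers that must handle both shapes can check `isinstance(result, dict)` before iterating.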

Confusion #4: "What's the difference between options and params?"

Answer: Different operation types!

options (in read/write):

read:
  connection: local
  path: data.csv
  format: csv
  options:           # ← Format-specific options (passed to pandas.read_csv())
    header: 0
    dtype:
      id: str

Maps to: pandas.read_csv(path, header=0, dtype={"id": str})

params (in transform):

transform:
  steps:
    - function: my_custom_function
      params:        # ← Function arguments
        threshold: 0.5
        mode: strict

Maps to: my_custom_function(context, threshold=0.5, mode='strict')

Key difference:
- options → passed to engine (Pandas/Spark I/O functions)
- params → passed to your function
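
Both mappings come down to keyword-argument unpacking. The sketch below uses the standard `csv` module in place of pandas so that it runs without extra dependencies. `read_csv_text` and `filter_above` are hypothetical, and the user function receives rows directly rather than a context to keep the example short.

```python
import csv
import io
from typing import Any, Dict, List


def read_csv_text(text: str, **options: Any) -> List[Dict[str, str]]:
    """I/O step: `options` are forwarded untouched to the reader."""
    return list(csv.DictReader(io.StringIO(text), **options))


def filter_above(rows: List[Dict[str, str]], threshold: float) -> List[Dict[str, str]]:
    """User function: `params` become its keyword arguments."""
    return [row for row in rows if float(row["score"]) > threshold]


# Dicts shaped like the YAML blocks above.
read_step = {"options": {"delimiter": ";"}}
transform_step = {"function": filter_above, "params": {"threshold": 0.5}}

rows = read_csv_text("id;score\n1;0.9\n2;0.1\n", **read_step["options"])
kept = transform_step["function"](rows, **transform_step["params"])
```

The framework never interprets either dict. It only decides which callable receives it.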

Confusion #5: "Where do stories get written?"

Answer: Stories use the connection pattern, just like data!

Story configuration (required):

connections:
  outputs:
    type: local
    base_path: ./outputs

story:
  connection: outputs  # ← References connection name
  path: stories/       # ← Path within connection
  enabled: true

Resolved path: ./outputs/stories/pipeline_name_20251107_143025.md

Why this pattern?
- Explicit: Clear where stories are written (no hidden defaults)
- Traceable: The output location follows directly from the connection definition
- Consistent: Same pattern as read.connection and write.connection
- Flexible: Stories can go to ADLS, DBFS, or local storage
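
The resolved path shown above is the connection's base_path, the story path, and a timestamped file name joined together. A minimal sketch of that resolution follows. `resolve_story_path` is a hypothetical helper, and the file-name pattern is inferred from the example path in this guide rather than taken from ODIBI's source.

```python
import posixpath
from datetime import datetime
from typing import Any, Dict


def resolve_story_path(
    connections: Dict[str, Dict[str, Any]],
    story: Dict[str, Any],
    pipeline_name: str,
    run_time: datetime,
) -> str:
    """Join connection base_path + story path + timestamped file name."""
    conn_name = story["connection"]
    if conn_name not in connections:
        raise KeyError(f"story.connection '{conn_name}' is not defined in connections")
    base_path = connections[conn_name]["base_path"]
    file_name = f"{pipeline_name}_{run_time:%Y%m%d_%H%M%S}.md"
    return posixpath.join(base_path, story["path"], file_name)


story_path = resolve_story_path(
    connections={"outputs": {"type": "local", "base_path": "./outputs"}},
    story={"connection": "outputs", "path": "stories/"},
    pipeline_name="example",
    run_time=datetime(2025, 11, 7, 14, 30, 25),
)
```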

Before v1.1 (confusing):

# Story path was implicit - where is "stories/" relative to?
connections:
  local:
    type: local
    base_path: ./data

After v1.1 (explicit):

connections:
  outputs:
    type: local
    base_path: ./outputs

story:
  connection: outputs  # Required - must exist in connections
  path: stories/

Confusion #6: "How does SQL find the DataFrames?"

Answer: The engine looks them up in the context!

Your SQL:

SELECT * FROM load_data WHERE amount > 0

What the engine does (simplified):

# PandasEngine.execute_sql() (simplified sketch)
import duckdb

def execute_sql(self, sql: str, context: Context):
    # 1. Find all table references in SQL
    tables = extract_table_names(sql)  # ["load_data"]

    # 2. Register each referenced DataFrame under its node name
    con = duckdb.connect()
    for table in tables:
        con.register(table, context.get(table))  # DataFrame from earlier node

    # 3. Execute the SQL (duckdb shown here; pandasql is the alternative)
    return con.execute(sql).df()

Key insight: Table names in SQL must match node names in the pipeline!
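
The name-matching rule can be demonstrated end to end with the standard `sqlite3` module, used here purely as a stand-in for pandasql or duckdb so that the example has no third-party dependencies. The context is a plain dict of row lists, and this `execute_sql` is a hypothetical helper, not the engine's method.

```python
import sqlite3
from typing import Any, Dict, List

# Stand-in for the context: node name -> rows produced by that node.
context: Dict[str, List[Dict[str, Any]]] = {
    "load_data": [{"id": 1, "amount": 10.0}, {"id": 2, "amount": -3.0}],
}


def execute_sql(sql: str, context: Dict[str, List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
    """Expose every context entry as a table named after its node, then run the SQL."""
    con = sqlite3.connect(":memory:")
    con.row_factory = sqlite3.Row
    for node_name, rows in context.items():
        columns = list(rows[0].keys())  # assumes at least one row per node
        col_list = ", ".join(f'"{c}"' for c in columns)
        placeholders = ", ".join("?" for _ in columns)
        con.execute(f'CREATE TABLE "{node_name}" ({col_list})')
        con.executemany(
            f'INSERT INTO "{node_name}" VALUES ({placeholders})',
            [tuple(row[c] for c in columns) for row in rows],
        )
    result = [dict(row) for row in con.execute(sql).fetchall()]
    con.close()
    return result


clean_data = execute_sql("SELECT * FROM load_data WHERE amount > 0", context)
```

If the SQL named a table that no node had registered, SQLite would raise an error about the missing table, which mirrors what happens when a query references a node name that does not exist in the pipeline.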


Decision Trees

"Which class do I use?"

┌─ Need to load and run a YAML file?
│  ├─ YES → Use `Pipeline.from_yaml("config.yaml")`
│  │         Returns PipelineManager
│  │
│  └─ NO → Are you building custom integrations?
│     ├─ YES → Use `PipelineManager(...)` or `Pipeline(...)` directly
│     └─ NO → Use `Pipeline.from_yaml()` (recommended)

"How do I run my pipelines?"

┌─ How many pipelines in YAML?
│  ├─ ONE → `manager.run()` returns PipelineResults
│  ├─ MANY → `manager.run()` returns Dict[name -> PipelineResults]
│  │
│  └─ Want to run specific pipeline(s)?
│     ├─ ONE → `manager.run('pipeline_name')` returns PipelineResults
│     └─ MULTIPLE → `manager.run(['pipe1', 'pipe2'])` returns Dict

"Where does my configuration live?"

┌─ Is it about the OVERALL project?
│  ├─ YES → Top level (project, engine, connections, story)
│  │
│  └─ NO → Is it about a PIPELINE?
│     ├─ YES → Under `pipelines:` (pipeline, layer, nodes)
│     │
│     └─ NO → Is it about a NODE?
│        ├─ YES → Under `nodes:` (name, read, transform, write)
│        │
│        └─ NO → Is it about an OPERATION?
│           ├─ READ → Under `read:` (connection, path, format, options)
│           ├─ TRANSFORM → Under `transform:` (steps)
│           └─ WRITE → Under `write:` (connection, path, format, mode, options)

Quick Reference

YAML Structure

# PROJECT LEVEL (required)
project: string               # Project name
engine: pandas|spark|polars   # Execution engine

# GLOBAL SETTINGS (optional)
retry:
  enabled: bool
  max_attempts: int
  backoff: exponential|linear|constant
logging:
  level: DEBUG|INFO|WARNING|ERROR
  structured: bool
  metadata: dict

# CONNECTIONS (required, at least one)
connections:
  <connection_name>:          # Your choice of name
    type: local|azure_blob|delta|sql_server|http
    validation_mode: lazy|eager   # optional, defaults to 'lazy'
    <type-specific-config>

# ENVIRONMENTS (optional)
environments:
  <env_name>:
    <overrides>: ...
    # Or use external file: env.<env_name>.yaml

# STORY (required)
story:
  connection: string        # Name of connection to write stories
  path: string              # Relative path under that connection
  auto_generate: bool
  max_sample_rows: int
  retention_days: int (optional)
  retention_count: int (optional)

# PIPELINES (required, at least one)
pipelines:
  - pipeline: string          # Pipeline name
    layer: string (optional)
    description: string (optional)
    nodes:                    # At least one node
      - name: string          # Unique node name
        depends_on: [string]  # List of node names (optional)
        cache: bool (optional)

        # At least ONE of these:
        read:
          connection: string  # Connection name
          path: string        # Relative to connection base_path (Required unless 'query' used)
          table: string       # Table name (alternative to path)
          format: csv|parquet|json|excel|avro|delta|api|sql_server
          options: dict       # Format-specific (optional)
            query: string     # SQL query (substitutes for path/table in sql_server)

        transform:
          steps:              # List of SQL strings or function calls
            - string (SQL)
            - function: string
              params: dict

        write:
          connection: string
          path: string
          table: string       # Table name (alternative to path)
          register_table: string # Register file output as external table (Spark/Delta only)
          format: csv|parquet|json|excel|avro|delta
          mode: overwrite|append
          options: dict (optional)

Python API Quick Reference

# === RECOMMENDED: Simple Usage ===
from odibi.pipeline import Pipeline

# Load and run all pipelines
manager = Pipeline.from_yaml("examples/templates/template_full.yaml")
results = manager.run()  # Dict[name -> PipelineResults] or single PipelineResults

# Run specific pipeline
result = manager.run('bronze_to_silver')

# List available pipelines
print(manager.list_pipelines())  # ['bronze_to_silver', 'silver_to_gold']

# === ADVANCED: Direct PipelineManager ===
from odibi.pipeline import PipelineManager

manager = PipelineManager.from_yaml("config.yaml")
results = manager.run()

# Access specific pipeline
pipeline = manager.get_pipeline('bronze_to_silver')
result = pipeline.run()

# === ADVANCED: Manual Construction ===
from odibi.pipeline import Pipeline
from odibi.config import PipelineConfig
from odibi.connections import LocalConnection

pipeline_config = PipelineConfig(
    pipeline="my_pipeline",
    nodes=[...]
)
connections = {
    "local": LocalConnection(base_path="./data")
}
pipeline = Pipeline(
    pipeline_config=pipeline_config,
    engine="pandas",
    connections=connections
)
result = pipeline.run()

Common Patterns

Pattern 1: Single pipeline in YAML

manager = Pipeline.from_yaml("simple.yaml")
result = manager.run()  # Returns PipelineResults (not dict)
print(f"Completed: {result.completed}")

Pattern 2: Multiple pipelines, run all

manager = Pipeline.from_yaml("multi.yaml")
results = manager.run()  # Returns Dict[name -> PipelineResults]
for name, result in results.items():
    print(f"{name}: {len(result.completed)} nodes")

Pattern 3: Multiple pipelines, run specific

manager = Pipeline.from_yaml("multi.yaml")
result = manager.run('bronze_to_silver')  # Returns PipelineResults
print(result.to_dict())


Summary

The Three Layers:
  1. YAML (Layer 1): What you write (declarative)
  2. Pydantic Models (Layer 2): Validation (automatic)
  3. Runtime Classes (Layer 3): Execution (automatic)

The Flow:

YAML file
  → yaml.safe_load()
  → Pydantic validation
  → PipelineManager/Pipeline creation
  → manager.run()
  → Node execution
  → Results

Key Takeaways:
- Pipeline.from_yaml() returns PipelineManager (not Pipeline)
- manager.run() runs all pipelines (or specific ones by name)
- Configs are validated before execution (fail fast)
- Context passes data between nodes (SQL references node names)
- Connections are defined once, referenced many times


Questions? Confusion?
Open an issue on GitHub or check the examples in the examples/ directory for complete YAML references.


This document evolves with the framework. Last updated: 2025-11-20