
Configuration API

odibi.config

Configuration models for ODIBI framework.

ConnectionConfig = Union[LocalConnectionConfig, AzureBlobConnectionConfig, DeltaConnectionConfig, SQLServerConnectionConfig, HttpConnectionConfig, CustomConnectionConfig] module-attribute

EngineType

Bases: str, Enum

Supported execution engines.

Source code in odibi/config.py
class EngineType(str, Enum):
    """Supported execution engines."""

    SPARK = "spark"
    PANDAS = "pandas"
    POLARS = "polars"

ConnectionType

Bases: str, Enum

Supported connection types.

Source code in odibi/config.py
class ConnectionType(str, Enum):
    """Supported connection types."""

    LOCAL = "local"
    AZURE_BLOB = "azure_blob"
    DELTA = "delta"
    SQL_SERVER = "sql_server"
    HTTP = "http"

WriteMode

Bases: str, Enum

Write modes for output operations.

Values:

* overwrite - Replace all existing data. Use for full refresh, dimensions.
* append - Add rows without checking for duplicates. Use for true append-only logs.
* upsert - Update existing rows by key, insert new. Use for Silver/Gold with updates.
* append_once - Insert only rows where keys don't exist (idempotent). Recommended for Bronze ingestion. Requires keys in write options. Safe to retry/rerun without creating duplicates.
* merge - SQL Server MERGE via staging table + T-SQL MERGE statement.

Choosing the right mode:

| Mode | Existing Keys | New Keys | Use Case |
|------|---------------|----------|----------|
| overwrite | Deleted | Inserted | Full refresh, dimensions |
| append | Duplicated | Inserted | True append-only logs |
| upsert | Updated | Inserted | Silver/Gold with updates |
| append_once | Skipped | Inserted | Idempotent Bronze ingestion |
| merge | Updated | Inserted | SQL Server targets |
Source code in odibi/config.py
class WriteMode(str, Enum):
    """Write modes for output operations.

    Values:
    * `overwrite` - Replace all existing data. Use for full refresh, dimensions.
    * `append` - Add rows without checking for duplicates. Use for true append-only logs.
    * `upsert` - Update existing rows by key, insert new. Use for Silver/Gold with updates.
    * `append_once` - Insert only rows where keys don't exist (idempotent). **Recommended for Bronze ingestion.** Requires `keys` in write options. Safe to retry/rerun without creating duplicates.
    * `merge` - SQL Server MERGE via staging table + T-SQL MERGE statement.

    **Choosing the right mode:**

    | Mode | Existing Keys | New Keys | Use Case |
    |------|--------------|----------|----------|
    | overwrite | Deleted | Inserted | Full refresh, dimensions |
    | append | Duplicated | Inserted | True append-only logs |
    | upsert | Updated | Inserted | Silver/Gold with updates |
    | append_once | Skipped | Inserted | Idempotent Bronze ingestion |
    | merge | Updated | Inserted | SQL Server targets |
    """

    OVERWRITE = "overwrite"
    APPEND = "append"
    UPSERT = "upsert"
    APPEND_ONCE = "append_once"
    MERGE = "merge"
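
The mode semantics can be pictured with a minimal sketch. This is not ODIBI's implementation; it models each mode as a plain-Python operation on lists of row dicts, with `key` standing in for the `keys` write option:

```python
def apply_write(existing, incoming, mode, key="id"):
    """Model WriteMode semantics on lists of row dicts. Illustration only."""
    if mode == "overwrite":
        return list(incoming)                       # replace all existing data
    if mode == "append":
        return existing + list(incoming)            # no key check: may duplicate
    existing_keys = {row[key] for row in existing}
    if mode == "append_once":
        # idempotent: insert only rows whose key is not already present
        return existing + [r for r in incoming if r[key] not in existing_keys]
    if mode in ("upsert", "merge"):
        updates = {r[key]: r for r in incoming}
        merged = [updates.pop(r[key], r) for r in existing]  # update matches
        return merged + list(updates.values())               # insert the rest
    raise ValueError(f"unknown mode: {mode}")
```

Rerunning `append_once` with the same batch is a no-op, which is why it is safe to retry Bronze ingestion.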

AlertConfig

Bases: BaseModel

Configuration for alerts with throttling support.

Supports Slack, Teams, and generic webhooks with event-specific payloads.

Available Events:

- on_start - Pipeline started
- on_success - Pipeline completed successfully
- on_failure - Pipeline failed
- on_quarantine - Rows were quarantined
- on_gate_block - Quality gate blocked the pipeline
- on_threshold_breach - A threshold was exceeded

Example:

alerts:
  - type: slack
    url: "${SLACK_WEBHOOK_URL}"
    on_events:
      - on_failure
      - on_quarantine
      - on_gate_block
    metadata:
      throttle_minutes: 15
      max_per_hour: 10
      channel: "#data-alerts"

Source code in odibi/config.py
class AlertConfig(BaseModel):
    """
    Configuration for alerts with throttling support.

    Supports Slack, Teams, and generic webhooks with event-specific payloads.

    **Available Events:**
    - `on_start` - Pipeline started
    - `on_success` - Pipeline completed successfully
    - `on_failure` - Pipeline failed
    - `on_quarantine` - Rows were quarantined
    - `on_gate_block` - Quality gate blocked the pipeline
    - `on_threshold_breach` - A threshold was exceeded

    Example:
    ```yaml
    alerts:
      - type: slack
        url: "${SLACK_WEBHOOK_URL}"
        on_events:
          - on_failure
          - on_quarantine
          - on_gate_block
        metadata:
          throttle_minutes: 15
          max_per_hour: 10
          channel: "#data-alerts"
    ```
    """

    type: AlertType
    url: str = Field(description="Webhook URL")
    on_events: List[AlertEvent] = Field(
        default=[AlertEvent.ON_FAILURE],
        description="Events to trigger alert: on_start, on_success, on_failure, on_quarantine, on_gate_block, on_threshold_breach",
    )
    metadata: Dict[str, Any] = Field(
        default_factory=dict,
        description="Extra metadata: throttle_minutes, max_per_hour, channel, etc.",
    )
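
The `throttle_minutes` and `max_per_hour` metadata above imply rate limiting of alert delivery. A hypothetical sketch of such a throttle (ODIBI's actual throttling logic may differ):

```python
import time


class AlertThrottle:
    """Suppress alerts sent too close together or too often per hour.

    Hypothetical sketch of the throttle_minutes / max_per_hour metadata.
    """

    def __init__(self, throttle_minutes=15, max_per_hour=10):
        self.min_gap = throttle_minutes * 60
        self.max_per_hour = max_per_hour
        self.sent = []  # timestamps of delivered alerts

    def allow(self, now=None):
        now = time.time() if now is None else now
        # keep only deliveries within the last hour
        self.sent = [t for t in self.sent if now - t < 3600]
        if len(self.sent) >= self.max_per_hour:
            return False                      # hourly cap reached
        if self.sent and now - self.sent[-1] < self.min_gap:
            return False                      # too soon after the last alert
        self.sent.append(now)
        return True
```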

TransformConfig

Bases: BaseModel

Configuration for transformation steps within a node.

When to Use: Custom business logic, data cleaning, SQL transformations.

Key Concepts:

- steps: Ordered list of operations (SQL, functions, or both)
- Each step receives the DataFrame from the previous step
- Steps execute in order: step1 → step2 → step3

See Also: Transformer Catalog

Transformer vs Transform:

- transformer: Single heavy operation (scd2, merge, deduplicate)
- transform.steps: Chain of lighter operations

🔧 "Transformation Pipeline" Guide

Business Problem: "I have complex logic that mixes SQL for speed and Python for complex calculations."

The Solution: Chain multiple steps together. Output of Step 1 becomes input of Step 2.

Function Registry: The function step type looks up functions registered with @transform (or @register). This allows you to use the same registered functions as both top-level Transformers and steps in a chain.

Recipe: The Mix-and-Match

transform:
  steps:
    # Step 1: SQL Filter (Fast)
    - sql: "SELECT * FROM df WHERE status = 'ACTIVE'"

    # Step 2: Custom Python Function (Complex Logic)
    # Looks up 'calculate_lifetime_value' in the registry
    - function: "calculate_lifetime_value"
      params: { discount_rate: 0.05 }

    # Step 3: Built-in Operation (Standard)
    - operation: "drop_duplicates"
      params: { subset: ["user_id"] }

Source code in odibi/config.py
class TransformConfig(BaseModel):
    """
    Configuration for transformation steps within a node.

    **When to Use:** Custom business logic, data cleaning, SQL transformations.

    **Key Concepts:**
    - `steps`: Ordered list of operations (SQL, functions, or both)
    - Each step receives the DataFrame from the previous step
    - Steps execute in order: step1 → step2 → step3

    **See Also:** [Transformer Catalog](#nodeconfig)

    **Transformer vs Transform:**
    - `transformer`: Single heavy operation (scd2, merge, deduplicate)
    - `transform.steps`: Chain of lighter operations

    ### 🔧 "Transformation Pipeline" Guide

    **Business Problem:**
    "I have complex logic that mixes SQL for speed and Python for complex calculations."

    **The Solution:**
    Chain multiple steps together. Output of Step 1 becomes input of Step 2.

    **Function Registry:**
    The `function` step type looks up functions registered with `@transform` (or `@register`).
    This allows you to use the *same* registered functions as both top-level Transformers and steps in a chain.

    **Recipe: The Mix-and-Match**
    ```yaml
    transform:
      steps:
        # Step 1: SQL Filter (Fast)
        - sql: "SELECT * FROM df WHERE status = 'ACTIVE'"

        # Step 2: Custom Python Function (Complex Logic)
        # Looks up 'calculate_lifetime_value' in the registry
        - function: "calculate_lifetime_value"
          params: { discount_rate: 0.05 }

        # Step 3: Built-in Operation (Standard)
        - operation: "drop_duplicates"
          params: { subset: ["user_id"] }
    ```
    """

    steps: List[Union[str, TransformStep]] = Field(
        description="List of transformation steps (SQL strings or TransformStep configs)"
    )
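
The chaining behavior described above (output of step N becomes input of step N+1) is essentially a left fold over the step list. A minimal sketch, with plain Python callables and a dict standing in for the `@transform` registry:

```python
def run_steps(df, steps, registry):
    """Chain transform steps: each step receives the previous step's output.

    Sketch only: `registry` stands in for functions registered with
    @transform, and a dict step names a registered function plus params.
    """
    for step in steps:
        if callable(step):
            df = step(df)
        else:  # {"function": name, "params": {...}}
            fn = registry[step["function"]]
            df = fn(df, **step.get("params", {}))
    return df


registry = {"scale": lambda rows, factor=1: [r * factor for r in rows]}
steps = [
    lambda rows: [r for r in rows if r > 0],          # like the SQL filter step
    {"function": "scale", "params": {"factor": 10}},  # registry lookup step
]
```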

ValidationConfig

Bases: BaseModel

Configuration for data validation (post-transform checks).

When to Use: Output data quality checks that run after transformation but before writing.

See Also: Validation Guide, Quarantine Guide, Contracts Overview (pre-transform checks)

🛡️ "The Indestructible Pipeline" Pattern

Business Problem: "Bad data polluted our Gold reports, causing executives to make wrong decisions. We need to stop it before it lands."

The Solution: A Quality Gate that runs after transformation but before writing.

Recipe: The Quality Gate

validation:
  mode: "fail"          # fail (stop pipeline) or warn (log only)
  on_fail: "alert"      # alert or ignore

  tests:
    # 1. Completeness
    - type: "not_null"
      columns: ["transaction_id", "customer_id"]

    # 2. Integrity
    - type: "unique"
      columns: ["transaction_id"]

    - type: "accepted_values"
      column: "status"
      values: ["PENDING", "COMPLETED", "FAILED"]

    # 3. Ranges & Patterns
    - type: "range"
      column: "age"
      min: 18
      max: 120

    - type: "regex_match"
      column: "email"
      pattern: "^[\w\.-]+@[\w\.-]+\.\w+$"

    # 4. Business Logic (SQL)
    - type: "custom_sql"
      name: "dates_ordered"
      condition: "created_at <= completed_at"
      threshold: 0.01   # Allow 1% failure

Recipe: Quarantine + Gate

validation:
  tests:
    - type: not_null
      columns: [customer_id]
      on_fail: quarantine
  quarantine:
    connection: silver
    path: customers_quarantine
  gate:
    require_pass_rate: 0.95
    on_fail: abort

Source code in odibi/config.py
class ValidationConfig(BaseModel):
    """
    Configuration for data validation (post-transform checks).

    **When to Use:** Output data quality checks that run after transformation but before writing.

    **See Also:** Validation Guide, Quarantine Guide, Contracts Overview (pre-transform checks)

    ### 🛡️ "The Indestructible Pipeline" Pattern

    **Business Problem:**
    "Bad data polluted our Gold reports, causing executives to make wrong decisions. We need to stop it *before* it lands."

    **The Solution:**
    A Quality Gate that runs *after* transformation but *before* writing.

    **Recipe: The Quality Gate**
    ```yaml
    validation:
      mode: "fail"          # fail (stop pipeline) or warn (log only)
      on_fail: "alert"      # alert or ignore

      tests:
        # 1. Completeness
        - type: "not_null"
          columns: ["transaction_id", "customer_id"]

        # 2. Integrity
        - type: "unique"
          columns: ["transaction_id"]

        - type: "accepted_values"
          column: "status"
          values: ["PENDING", "COMPLETED", "FAILED"]

        # 3. Ranges & Patterns
        - type: "range"
          column: "age"
          min: 18
          max: 120

        - type: "regex_match"
          column: "email"
          pattern: "^[\\w\\.-]+@[\\w\\.-]+\\.\\w+$"

        # 4. Business Logic (SQL)
        - type: "custom_sql"
          name: "dates_ordered"
          condition: "created_at <= completed_at"
          threshold: 0.01   # Allow 1% failure
    ```

    **Recipe: Quarantine + Gate**
    ```yaml
    validation:
      tests:
        - type: not_null
          columns: [customer_id]
          on_fail: quarantine
      quarantine:
        connection: silver
        path: customers_quarantine
      gate:
        require_pass_rate: 0.95
        on_fail: abort
    ```
    """

    mode: ValidationAction = Field(
        default=ValidationAction.FAIL,
        description="Execution mode: 'fail' (stop pipeline) or 'warn' (log only)",
    )
    on_fail: OnFailAction = Field(
        default=OnFailAction.ALERT,
        description="Action on failure: 'alert' (send notification) or 'ignore'",
    )
    tests: List[TestConfig] = Field(default_factory=list, description="List of validation tests")
    quarantine: Optional[QuarantineConfig] = Field(
        default=None,
        description="Quarantine configuration for failed rows",
    )
    gate: Optional[GateConfig] = Field(
        default=None,
        description="Quality gate configuration for batch-level validation",
    )
    fail_fast: bool = Field(
        default=False,
        description="Stop validation on first failure. Skips remaining tests for faster feedback.",
    )
    cache_df: bool = Field(
        default=False,
        description="Cache DataFrame before validation (Spark only). Improves performance with many tests.",
    )

    @model_validator(mode="after")
    def validate_quarantine_config(self):
        """Warn if quarantine config exists but no tests use on_fail: quarantine."""
        import warnings

        if self.quarantine and self.tests:
            has_quarantine_tests = any(t.on_fail == ContractSeverity.QUARANTINE for t in self.tests)
            if not has_quarantine_tests:
                warnings.warn(
                    "Quarantine config is defined but no tests have 'on_fail: quarantine'. "
                    "Quarantine will not be used. Add 'on_fail: quarantine' to tests that "
                    "should route failed rows to quarantine.",
                    UserWarning,
                    stacklevel=2,
                )
        return self

validate_quarantine_config()

Warn if quarantine config exists but no tests use on_fail: quarantine.

Source code in odibi/config.py
@model_validator(mode="after")
def validate_quarantine_config(self):
    """Warn if quarantine config exists but no tests use on_fail: quarantine."""
    import warnings

    if self.quarantine and self.tests:
        has_quarantine_tests = any(t.on_fail == ContractSeverity.QUARANTINE for t in self.tests)
        if not has_quarantine_tests:
            warnings.warn(
                "Quarantine config is defined but no tests have 'on_fail: quarantine'. "
                "Quarantine will not be used. Add 'on_fail: quarantine' to tests that "
                "should route failed rows to quarantine.",
                UserWarning,
                stacklevel=2,
            )
    return self
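
The gate recipe above (`require_pass_rate: 0.95`, `on_fail: abort`) amounts to a batch-level pass-rate check. A minimal sketch of that decision, not ODIBI's implementation:

```python
def evaluate_gate(total_rows, failed_rows, require_pass_rate=0.95, on_fail="abort"):
    """Batch-level quality gate sketch (illustration only).

    Returns 'proceed' when the pass rate meets the threshold,
    otherwise the configured on_fail action.
    """
    if total_rows == 0:
        return "proceed"                      # nothing to judge
    pass_rate = (total_rows - failed_rows) / total_rows
    return "proceed" if pass_rate >= require_pass_rate else on_fail
```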

PipelineConfig

Bases: BaseModel

Configuration for a pipeline.

Example:

pipelines:
  - pipeline: "user_onboarding"
    description: "Ingest and process new users"
    layer: "silver"
    owner: "data-team@example.com"
    freshness_sla: "6h"
    nodes:
      - name: "node1"
        ...

Source code in odibi/config.py
class PipelineConfig(BaseModel):
    """
    Configuration for a pipeline.

    Example:
    ```yaml
    pipelines:
      - pipeline: "user_onboarding"
        description: "Ingest and process new users"
        layer: "silver"
        owner: "data-team@example.com"
        freshness_sla: "6h"
        nodes:
          - name: "node1"
            ...
    ```
    """

    pipeline: str = Field(description="Pipeline name")
    description: Optional[str] = Field(default=None, description="Pipeline description")
    layer: Optional[str] = Field(default=None, description="Logical layer (bronze/silver/gold)")
    owner: Optional[str] = Field(
        default=None,
        description="Pipeline owner (email or name)",
    )
    freshness_sla: Optional[str] = Field(
        default=None,
        description=(
            "Expected data freshness SLA. "
            "Format: number followed by unit — s (seconds), m (minutes), h (hours), or d (days). "
            "Examples: '6h', '1d', '30m'."
        ),
    )
    freshness_anchor: Literal["run_completion", "table_max_timestamp", "watermark_state"] = Field(
        default="run_completion",
        description="What defines freshness. Only 'run_completion' implemented initially.",
    )
    nodes: List[NodeConfig] = Field(description="List of nodes in this pipeline")
    auto_cache_threshold: Optional[int] = Field(
        default=3,
        ge=2,
        description=(
            "Auto-cache nodes with N or more downstream dependencies. "
            "Prevents redundant ADLS re-reads when a node is used by multiple downstream nodes. "
            "Default: 3. Set to null to disable auto-caching. "
            "Individual nodes can override with explicit cache: true/false."
        ),
    )

    @field_validator("nodes")
    @classmethod
    def check_unique_node_names(cls, nodes: List[NodeConfig]) -> List[NodeConfig]:
        """Ensure all node names are unique within the pipeline."""
        names = [node.name for node in nodes]
        if len(names) != len(set(names)):
            duplicates = [name for name in names if names.count(name) > 1]
            raise ValueError(f"Duplicate node names found: {set(duplicates)}")
        return nodes

    @model_validator(mode="after")
    def auto_populate_depends_on_from_inputs(self):
        """
        Auto-populate depends_on for same-pipeline references in inputs.

        If a node has inputs like $silver.other_node and this is the silver pipeline,
        automatically add 'other_node' to depends_on for correct execution order.
        """
        node_names = {node.name for node in self.nodes}

        for node in self.nodes:
            if not node.inputs:
                continue

            for input_name, ref in node.inputs.items():
                if not isinstance(ref, str) or not ref.startswith("$"):
                    continue

                # Parse $pipeline.node reference
                parts = ref[1:].split(".", 1)
                if len(parts) != 2:
                    continue

                ref_pipeline, ref_node = parts

                # Check if reference is to same pipeline
                if ref_pipeline == self.pipeline and ref_node in node_names:
                    # Add to depends_on if not already there
                    if ref_node not in node.depends_on:
                        node.depends_on.append(ref_node)

        return self

auto_populate_depends_on_from_inputs()

Auto-populate depends_on for same-pipeline references in inputs.

If a node has inputs like $silver.other_node and this is the silver pipeline, automatically add 'other_node' to depends_on for correct execution order.

Source code in odibi/config.py
@model_validator(mode="after")
def auto_populate_depends_on_from_inputs(self):
    """
    Auto-populate depends_on for same-pipeline references in inputs.

    If a node has inputs like $silver.other_node and this is the silver pipeline,
    automatically add 'other_node' to depends_on for correct execution order.
    """
    node_names = {node.name for node in self.nodes}

    for node in self.nodes:
        if not node.inputs:
            continue

        for input_name, ref in node.inputs.items():
            if not isinstance(ref, str) or not ref.startswith("$"):
                continue

            # Parse $pipeline.node reference
            parts = ref[1:].split(".", 1)
            if len(parts) != 2:
                continue

            ref_pipeline, ref_node = parts

            # Check if reference is to same pipeline
            if ref_pipeline == self.pipeline and ref_node in node_names:
                # Add to depends_on if not already there
                if ref_node not in node.depends_on:
                    node.depends_on.append(ref_node)

    return self
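
The `$pipeline.node` parsing in the validator above can be exercised standalone. A small sketch mirroring its split logic:

```python
def parse_ref(ref):
    """Parse a '$pipeline.node' input reference.

    Returns (pipeline, node) or None if `ref` is not a reference,
    mirroring the split logic in auto_populate_depends_on_from_inputs.
    """
    if not isinstance(ref, str) or not ref.startswith("$"):
        return None
    parts = ref[1:].split(".", 1)
    if len(parts) != 2:
        return None
    return tuple(parts)
```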

check_unique_node_names(nodes) classmethod

Ensure all node names are unique within the pipeline.

Source code in odibi/config.py
@field_validator("nodes")
@classmethod
def check_unique_node_names(cls, nodes: List[NodeConfig]) -> List[NodeConfig]:
    """Ensure all node names are unique within the pipeline."""
    names = [node.name for node in nodes]
    if len(names) != len(set(names)):
        duplicates = [name for name in names if names.count(name) > 1]
        raise ValueError(f"Duplicate node names found: {set(duplicates)}")
    return nodes
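
The `freshness_sla` format documented in PipelineConfig (a number followed by s, m, h, or d) can be parsed with a small helper. This is a hypothetical sketch; ODIBI's actual SLA parser is not shown on this page:

```python
def parse_sla_seconds(sla: str) -> int:
    """Parse 'number + unit' SLA strings (s/m/h/d) into seconds.

    Hypothetical helper illustrating the documented format.
    """
    units = {"s": 1, "m": 60, "h": 3600, "d": 86400}
    value, unit = sla[:-1], sla[-1]
    if unit not in units or not value.isdigit():
        raise ValueError(f"Invalid SLA format: {sla!r} (expected e.g. '6h', '30m')")
    return int(value) * units[unit]
```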

StoryConfig

Bases: BaseModel

Story generation configuration.

Stories are ODIBI's core value - execution reports with lineage. They must use a connection for consistent, traceable output.

Example:

story:
  connection: "local_data"
  path: "stories/"
  retention_days: 30
  failure_sample_size: 100
  max_failure_samples: 500
  max_sampled_validations: 5

Failure Sample Settings:

- failure_sample_size: Number of failed rows to capture per validation (default: 100)
- max_failure_samples: Total failed rows across all validations (default: 500)
- max_sampled_validations: After this many validations, show only counts (default: 5)

Source code in odibi/config.py
class StoryConfig(BaseModel):
    """
    Story generation configuration.

    Stories are ODIBI's core value - execution reports with lineage.
    They must use a connection for consistent, traceable output.

    Example:
    ```yaml
    story:
      connection: "local_data"
      path: "stories/"
      retention_days: 30
      failure_sample_size: 100
      max_failure_samples: 500
      max_sampled_validations: 5
    ```

    **Failure Sample Settings:**
    - `failure_sample_size`: Number of failed rows to capture per validation (default: 100)
    - `max_failure_samples`: Total failed rows across all validations (default: 500)
    - `max_sampled_validations`: After this many validations, show only counts (default: 5)
    """

    connection: str = Field(
        description="Connection name for story output (uses connection's path resolution)"
    )
    path: str = Field(description="Path for stories (relative to connection base_path)")
    max_sample_rows: int = Field(
        default=10,
        ge=0,
        le=100,
        description=(
            "Maximum rows to include in data samples within story reports. "
            "Higher values give more debugging context but increase file size. "
            "Set to 0 to disable data sampling."
        ),
    )
    auto_generate: bool = True
    retention_days: Optional[int] = Field(default=30, ge=1, description="Days to keep stories")
    retention_count: Optional[int] = Field(
        default=100, ge=1, description="Max number of stories to keep"
    )

    # Failure sample settings (troubleshooting)
    failure_sample_size: int = Field(
        default=100,
        ge=0,
        le=1000,
        description="Number of failed rows to capture per validation rule",
    )
    max_failure_samples: int = Field(
        default=500,
        ge=0,
        le=5000,
        description="Maximum total failed rows across all validations",
    )
    max_sampled_validations: int = Field(
        default=5,
        ge=1,
        le=20,
        description="After this many validations, show only counts (no samples)",
    )

    # Performance settings
    async_generation: bool = Field(
        default=False,
        description=(
            "Generate stories asynchronously (fire-and-forget). "
            "Pipeline returns immediately while story writes in background. "
            "Improves multi-pipeline performance by ~5-10s per pipeline."
        ),
    )

    # Lineage settings
    generate_lineage: bool = Field(
        default=True,
        description=(
            "Generate combined lineage graph from all stories. "
            "Creates a unified view of data flow across pipelines."
        ),
    )

    # Documentation generation
    docs: Optional["DocsConfig"] = Field(
        default=None,
        description=(
            "Documentation generation settings. "
            "Generates README.md, TECHNICAL_DETAILS.md, NODE_CARDS/*.md from Story data."
        ),
    )

    @model_validator(mode="after")
    def check_retention_policy(self):
        if self.retention_days is None and self.retention_count is None:
            raise ValueError(
                "StoryConfig validation failed: No retention policy specified. "
                "Provide at least one of: 'retention_days' (e.g., 30) or 'retention_count' (e.g., 100). "
                "This controls how long/many story files are kept before cleanup."
            )
        return self
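
The retention policy that check_retention_policy enforces can be pictured with a pruning sketch. This is hypothetical: it assumes stories are (unix_timestamp, name) pairs and applies both limits when both are set; ODIBI's actual cleanup logic may differ.

```python
def prune_stories(stories, now, retention_days=30, retention_count=100):
    """Keep stories that satisfy both limits (illustrative policy choice).

    `stories` is a list of (unix_timestamp, name) tuples. Either limit may
    be None, mirroring StoryConfig's optional fields; the validator above
    guarantees at least one is set.
    """
    kept = sorted(stories, reverse=True)          # newest first
    if retention_days is not None:
        cutoff = now - retention_days * 86400
        kept = [s for s in kept if s[0] >= cutoff]
    if retention_count is not None:
        kept = kept[:retention_count]             # cap the total count
    return kept
```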