
Odibi Configuration Reference

This manual details the YAML configuration schema for Odibi projects. Auto-generated from Pydantic models.

Project Structure

ProjectConfig

Complete project configuration from YAML.

🏢 "Enterprise Setup" Guide

Business Problem: "We need a robust production environment with alerts, retries, and proper logging."

Recipe: Production Ready

project: "Customer360"
engine: "spark"

# 1. Resilience
retry:
    enabled: true
    max_attempts: 3
    backoff: "exponential"

# 2. Observability
logging:
    level: "INFO"
    structured: true  # JSON logs for Splunk/Datadog

# 3. Alerting
alerts:
    - type: "slack"
      url: "${SLACK_WEBHOOK_URL}"
      on_events: ["on_failure"]

# ... connections and pipelines ...

Field Type Required Default Description
project str Yes - Project name
engine EngineType No EngineType.PANDAS Execution engine
connections Dict[str, ConnectionConfig] Yes - Named connections (at least one required)
Options: LocalConnectionConfig, AzureBlobConnectionConfig, DeltaConnectionConfig, SQLServerConnectionConfig, HttpConnectionConfig, CustomConnectionConfig
pipelines List[PipelineConfig] Yes - Pipeline definitions (at least one required)
story StoryConfig Yes - Story generation configuration (mandatory)
system SystemConfig Yes - System Catalog configuration (mandatory)
lineage Optional[LineageConfig] No - OpenLineage configuration
description Optional[str] No - Project description
version str No 1.0.0 Project version
owner Optional[str] No - Project owner/contact
vars Dict[str, Any] No PydanticUndefined Global variables for substitution (e.g. ${vars.env})
retry RetryConfig No PydanticUndefined Retry configuration for transient failures. Applies to all nodes unless overridden. Default: enabled with 3 attempts, exponential backoff.
logging LoggingConfig No PydanticUndefined Logging configuration for pipeline execution. Set level (DEBUG/INFO/WARNING/ERROR), enable structured JSON logs, add metadata.
alerts List[AlertConfig] No PydanticUndefined Alert configurations
performance PerformanceConfig No PydanticUndefined Performance tuning
environments Optional[Dict[str, Dict[str, Any]]] No - Structure: same as ProjectConfig but with only overridden fields. Not yet validated strictly.
semantic Optional[Dict[str, Any]] No - Semantic layer configuration. Can be inline or reference external file. Contains metrics, dimensions, and materializations for self-service analytics. Example: semantic: { config: 'semantic_config.yaml' } or inline definitions.
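The environments block overrides top-level project fields per environment. A minimal sketch, assuming the common pattern of overriding vars and logging (names here are illustrative; as the table notes, these overrides are not yet strictly validated):

```yaml
project: "Customer360"
engine: "spark"
vars:
  env: "dev"

environments:
  prod:
    vars:
      env: "prod"
    logging:
      level: "WARNING"
      structured: true
```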

PipelineConfig

Used in: ProjectConfig

Configuration for a pipeline.

Example:

pipelines:
  - pipeline: "user_onboarding"
    description: "Ingest and process new users"
    layer: "silver"
    owner: "data-team@example.com"
    freshness_sla: "6h"
    nodes:
      - name: "node1"
        ...

Field Type Required Default Description
pipeline str Yes - Pipeline name
description Optional[str] No - Pipeline description
layer Optional[str] No - Logical layer (bronze/silver/gold)
owner Optional[str] No - Pipeline owner (email or name)
freshness_sla Optional[str] No - Expected data freshness SLA. Format: number followed by unit — s (seconds), m (minutes), h (hours), or d (days). Examples: '6h', '1d', '30m'.
freshness_anchor Literal['run_completion', 'table_max_timestamp', 'watermark_state'] No run_completion What defines freshness. Only 'run_completion' implemented initially.
nodes List[NodeConfig] Yes - List of nodes in this pipeline
auto_cache_threshold Optional[int] No 3 Auto-cache nodes with N or more downstream dependencies. Prevents redundant ADLS re-reads when a node is used by multiple downstream nodes. Default: 3. Set to null to disable auto-caching. Individual nodes can override with explicit cache: true/false.
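The interplay between auto_cache_threshold and per-node cache overrides can be sketched as follows (pipeline and node names are illustrative):

```yaml
pipelines:
  - pipeline: "customer_metrics"
    auto_cache_threshold: 2   # cache any node with 2+ downstream consumers
    nodes:
      - name: "base_customers"     # used by both nodes below -> auto-cached
        read: { connection: "silver", format: "delta", table: "customers" }
      - name: "heavy_features"
        depends_on: ["base_customers"]
        cache: true                # explicit per-node setting always wins
        transform:
          steps:
            - sql: "SELECT * FROM df WHERE status = 'active'"
      - name: "daily_counts"
        depends_on: ["base_customers"]
        transform:
          steps:
            - sql: "SELECT count(*) AS total FROM df"
```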

NodeConfig

Used in: PipelineConfig

Configuration for a single node.

🧠 "The Smart Node" Pattern

Business Problem: "We need complex dependencies, caching for heavy computations, and the ability to run only specific parts of the pipeline."

The Solution: Nodes are the building blocks. They handle dependencies (depends_on), execution control (tags, enabled), and performance (cache).

🕸️ DAG & Dependencies

The Glue of the Pipeline. Nodes don't run in isolation. They form a Directed Acyclic Graph (DAG).

  • depends_on: Critical! If Node B reads from Node A (in memory), you MUST list ["Node A"].
    • Implicit Data Flow: If a node has no read block, it automatically picks up the DataFrame from its first dependency.

🧠 Smart Read & Incremental Loading

Automated History Management.

Odibi intelligently determines whether to perform a Full Load or an Incremental Load based on the state of the target.

The "Smart Read" Logic:

  1. First Run (Full Load): If the target table (defined in write) does not exist:
    • Incremental filtering rules are ignored.
    • The entire source dataset is read.
    • Use write.first_run_query (optional) to override the read query for this initial bootstrap (e.g., to backfill only 1 year of history instead of all time).
  2. Subsequent Runs (Incremental Load): If the target table exists:
    • Rolling Window: Filters source data where column >= NOW() - lookback.
    • Stateful: Filters source data where column > last_high_water_mark.

This ensures you don't need separate "init" and "update" pipelines. One config handles both lifecycle states.
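The one-config lifecycle described above can be sketched as a single node (table and column names are illustrative):

```yaml
- name: "fact_orders"
  read:
    connection: "enterprise_dw"
    format: "sql"
    table: "orders"
    incremental:
      mode: "stateful"
      column: "updated_at"
  write:
    connection: "silver"
    format: "delta"
    table: "fact_orders"
    # Used only on the first run, when silver.fact_orders does not exist yet
    first_run_query: "SELECT * FROM orders WHERE updated_at >= '2023-01-01'"
```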

🏷️ Orchestration Tags

Run What You Need. Tags allow you to execute slices of your pipeline.

  • odibi run --tag daily -> Runs all nodes with the "daily" tag.
  • odibi run --tag critical -> Runs high-priority nodes.

🤖 Choosing Your Logic: Transformer vs. Transform

1. The "Transformer" (Top-Level)
  • What it is: A pre-packaged, heavy-duty operation that defines the entire purpose of the node.
  • When to use: When applying a standard Data Engineering pattern (e.g., SCD2, Merge, Deduplicate).
  • Analogy: "Run this App."
  • Syntax: transformer: "scd2" + params: {...}

2. The "Transform Steps" (Process Chain)
  • What it is: A sequence of smaller steps (SQL, functions, operations) executed in order.
  • When to use: For custom business logic, data cleaning, or feature engineering pipelines.
  • Analogy: "Run this Script."
  • Syntax: transform: { steps: [...] }

Note: You can use both! The transformer runs first, then transform steps refine the result.

🔗 Chaining Operations

You can mix and match! The execution order is always:

  1. Read (or Dependency Injection)
  2. Transformer (The "App" logic, e.g., Deduplicate)
  3. Transform Steps (The "Script" logic, e.g., cleanup)
  4. Validation
  5. Write

Constraint: You must define at least one of read, transformer, transform, or write.

⚡ Example: App vs. Script

Scenario 1: The Full ETL Flow (Chained) Shows explicit Read, Transform Chain, and Write.

# 1. Ingest (The Dependency)
- name: "load_raw_users"
  read: { connection: "s3_landing", format: "json", path: "users/*.json" }
  write: { connection: "bronze", format: "parquet", path: "users_raw" }

# 2. Process (The Consumer)
- name: "clean_users"
  depends_on: ["load_raw_users"]

  # "clean_text" is a registered function from the Transformer Catalog
  transform:
    steps:
      - sql: "SELECT * FROM df WHERE email IS NOT NULL"
      - function: "clean_text"
        params: { columns: ["email"], case: "lower" }

  write: { connection: "silver", format: "delta", table: "dim_users" }

Scenario 2: The "App" Node (Top-Level Transformer) Shows a node that applies a pattern (Deduplicate) to incoming data.

- name: "deduped_users"
  depends_on: ["clean_users"]

  # The "App": Deduplication (From Transformer Catalog)
  transformer: "deduplicate"
  params:
    keys: ["user_id"]
    order_by: "updated_at DESC"

  write: { connection: "gold", format: "delta", table: "users_unique" }

Scenario 3: The Tagged Runner (Reporting) Shows how tags allow running specific slices (e.g., odibi run --tag daily).

- name: "daily_report"
  tags: ["daily", "reporting"]
  depends_on: ["deduped_users"]

  # Ad-hoc aggregation script
  transform:
    steps:
      - sql: "SELECT date_trunc('day', updated_at) as day, count(*) as total FROM df GROUP BY 1"

  write: { connection: "local_data", format: "csv", path: "reports/daily_stats.csv" }

Scenario 4: The "Kitchen Sink" (All Operations) Shows Read -> Transformer -> Transform -> Write execution order.

Why this works:

  1. Internal Chaining (df): In every step (Transformer or SQL), df refers to the output of the previous step.
  2. External Access (depends_on): If you added depends_on: ["other_node"], you could also run SELECT * FROM other_node in your SQL steps!

- name: "complex_flow"
  # 1. Read -> Creates initial 'df'
  read: { connection: "bronze", format: "parquet", path: "users" }

  # 2. Transformer (The "App": Deduplicate first)
  # Takes 'df' (from Read), dedups it, returns new 'df'
  transformer: "deduplicate"
  params: { keys: ["user_id"], order_by: "updated_at DESC" }

  # 3. Transform Steps (The "Script": Filter AFTER deduplication)
  # SQL sees the deduped data as 'df'
  transform:
    steps:
      - sql: "SELECT * FROM df WHERE status = 'active'"

  # 4. Write -> Saves the final filtered 'df'
  write: { connection: "silver", format: "delta", table: "active_unique_users" }

📚 Transformer Catalog

These are the built-in functions you can use in two ways:

  1. As a Top-Level Transformer: transformer: "name" (Defines the node's main logic)
  2. As a Step in a Chain: transform: { steps: [{ function: "name" }] } (Part of a sequence)

Note: merge and scd2 are special "Heavy Lifters" and should generally be used as Top-Level Transformers.
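The two usages can be sketched with deduplicate (per the note above, heavy lifters like merge and scd2 are better kept top-level):

```yaml
# 1. As the node's main logic (top-level transformer)
- name: "users_unique"
  depends_on: ["clean_users"]
  transformer: "deduplicate"
  params: { keys: ["user_id"], order_by: "updated_at DESC" }

# 2. As one step in a chain
- name: "users_cleaned"
  depends_on: ["load_raw_users"]
  transform:
    steps:
      - sql: "SELECT * FROM df WHERE email IS NOT NULL"
      - function: "deduplicate"
        params: { keys: ["user_id"], order_by: "updated_at DESC" }
```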

Data Engineering Patterns
  • merge: Upsert/Merge into target (Delta/SQL).
  • scd2: Slowly Changing Dimensions Type 2.
  • deduplicate: Remove duplicates using window functions.

Relational Algebra
  • join: Join two datasets.
  • union: Stack datasets vertically.
  • pivot: Rotate rows to columns.
  • unpivot: Rotate columns to rows (melt).
  • aggregate: Group by and sum/count/avg.

Data Quality & Cleaning
  • validate_and_flag: Check rules and flag invalid rows.
  • clean_text: Trim and normalize case.
  • filter_rows: SQL-based filtering.
  • fill_nulls: Replace NULLs with defaults.

Feature Engineering
  • derive_columns: Create new cols via SQL expressions.
  • case_when: Conditional logic (if-else).
  • generate_surrogate_key: Create MD5 keys from columns.
  • date_diff, date_add, date_trunc: Date arithmetic.

Scenario 5: Pre/Post SQL Hooks Setup and cleanup with SQL statements.

- name: "optimize_sales"
  depends_on: ["load_sales"]
  pre_sql:
    - "SET spark.sql.shuffle.partitions = 200"
    - "CREATE TEMP VIEW staging AS SELECT * FROM bronze.raw_sales"
  transform:
    steps:
      - sql: "SELECT * FROM staging WHERE amount > 0"
  post_sql:
    - "OPTIMIZE gold.fact_sales ZORDER BY (customer_id)"
    - "VACUUM gold.fact_sales RETAIN 168 HOURS"
  write:
    connection: "gold"
    format: "delta"
    table: "fact_sales"

Scenario 6: Materialization Strategies Choose how output is persisted.

# Option 1: View (no physical storage, logical model)
- name: "vw_active_customers"
  materialized: "view"  # Creates SQL view instead of table
  transform:
    steps:
      - sql: "SELECT * FROM customers WHERE status = 'active'"
  write:
    connection: "gold"
    table: "vw_active_customers"

# Option 2: Incremental (append to existing Delta table)
- name: "fact_events"
  materialized: "incremental"  # Uses APPEND mode
  read:
    connection: "bronze"
    table: "raw_events"
    incremental:
      mode: "stateful"
      column: "event_time"
  write:
    connection: "silver"
    format: "delta"
    table: "fact_events"

# Option 3: Table (default - full overwrite)
- name: "dim_products"
  materialized: "table"  # Default behavior
  # ...

Field Type Required Default Description
name str Yes - Unique node name
description Optional[str] No - Human-readable description
explanation Optional[str] No - Markdown-formatted explanation of the node's transformation logic. Rendered in the Data Story HTML report. Supports tables, code blocks, and rich formatting. Use to document business rules, data mappings, and transformation rationale for stakeholder communication. Mutually exclusive with 'explanation_file'.
explanation_file Optional[str] No - Path to external Markdown file containing the explanation, relative to the YAML file. Use for longer documentation to keep YAML files clean. Mutually exclusive with 'explanation'.
runbook_url Optional[str] No - URL to troubleshooting guide or runbook. Shown as 'Troubleshooting guide →' link on failures.
enabled bool No True If False, node is skipped during execution
tags List[str] No PydanticUndefined Operational tags for selective execution (e.g., 'daily', 'critical'). Use with odibi run --tag.
depends_on List[str] No PydanticUndefined List of parent nodes that must complete before this node runs. The output of these nodes is available for reading.
columns Dict[str, ColumnMetadata] No PydanticUndefined Data Dictionary defining the output schema. Used for documentation, PII tagging, and validation.
read Optional[ReadConfig] No - Input operation (Load). If missing, data is taken from the first dependency.
inputs Optional[Dict[str, str | Dict[str, Any]]] No - Multi-input support for cross-pipeline dependencies. Map input names to either: (a) $pipeline.node reference (e.g., '$read_bronze.shift_events') (b) Explicit read config dict. Cannot be used with 'read'. Example: inputs: {events: '$read_bronze.events', calendar: {connection: 'goat', path: 'cal'}}
transform Optional[TransformConfig] No - Chain of fine-grained transformation steps (SQL, functions). Runs after 'transformer' if both are present.
write Optional[WriteConfig] No - Output operation (Save to file/table).
streaming bool No False Enable streaming execution for this node (Spark only)
transformer Optional[str] No - Name of the 'App' logic to run (e.g., 'deduplicate', 'scd2'). See Transformer Catalog for options.
params Dict[str, Any] No PydanticUndefined Parameters for transformer
pre_sql List[str] No PydanticUndefined List of SQL statements to execute before node runs. Use for setup: temp tables, variable initialization, grants. Example: ['SET spark.sql.shuffle.partitions=200', 'CREATE TEMP VIEW src AS SELECT * FROM raw']
post_sql List[str] No PydanticUndefined List of SQL statements to execute after node completes. Use for cleanup, optimization, or audit logging. Example: ['OPTIMIZE gold.fact_sales', 'VACUUM gold.fact_sales RETAIN 168 HOURS']
materialized Optional[Literal['table', 'view', 'incremental']] No - Materialization strategy. Options: 'table' (default physical write), 'view' (creates SQL view instead of table), 'incremental' (uses append mode for Delta tables). Views are useful for Gold layer logical models.
cache bool No False Cache result for reuse
log_level Optional[LogLevel] No - Override log level for this node
on_error ErrorStrategy No ErrorStrategy.FAIL_LATER Failure handling strategy
validation Optional[ValidationConfig] No - -
contracts List[TestConfig] No PydanticUndefined Pre-condition contracts (Circuit Breakers). Runs on input data before transformation.
Options: NotNullTest, UniqueTest, AcceptedValuesTest, RowCountTest, CustomSQLTest, RangeTest, RegexMatchTest, VolumeDropTest, SchemaContract, DistributionContract, FreshnessContract
schema_policy Optional[SchemaPolicyConfig] No - Schema drift handling policy
privacy Optional[PrivacyConfig] No - Privacy Suite: PII anonymization settings
sensitive bool | List[str] No False If true or list of columns, masks sample data in stories
source_yaml Optional[str] No - Internal: source YAML file path for sql_file resolution
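Contracts run on the input data before transformation. A sketch, assuming test entries follow a type-plus-params shape (the exact keys come from the Test configs listed above, e.g. NotNullTest, RowCountTest; the field names below are illustrative):

```yaml
- name: "load_orders_checked"
  read: { connection: "bronze", format: "delta", table: "raw_orders" }
  contracts:
    - type: "not_null"      # assumed key names; check NotNullTest for exact schema
      columns: ["order_id"]
    - type: "row_count"     # circuit breaker: abort if input is empty
      min: 1
  write: { connection: "silver", format: "delta", table: "orders" }
```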

ColumnMetadata

Used in: NodeConfig

Metadata for a column in the data dictionary.

Field Type Required Default Description
description Optional[str] No - Column description
pii bool No False Contains PII?
tags List[str] No PydanticUndefined Tags (e.g. 'business_key', 'measure')
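In a node, the data dictionary is keyed by column name (column names below are illustrative):

```yaml
- name: "dim_users"
  columns:
    user_id:
      description: "Surrogate key"
      tags: ["business_key"]
    email:
      description: "Contact email"
      pii: true
```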

SystemConfig

Used in: ProjectConfig

Configuration for the Odibi System Catalog (The Brain).

Stores metadata, state, and pattern configurations. The primary connection must be a storage connection (blob/local) that supports Delta tables.

Example:

system:
  connection: adls_bronze        # Primary - must be blob/local storage
  path: _odibi_system
  environment: dev

With sync to SQL Server (for dashboards/queries):

system:
  connection: adls_prod          # Primary - Delta tables
  environment: prod
  sync_to:
    connection: sql_server_prod  # Secondary - SQL for visibility
    schema_name: odibi_system

With sync to another blob (cross-region backup):

system:
  connection: adls_us_east
  sync_to:
    connection: adls_us_west
    path: _odibi_system_replica

Field Type Required Default Description
connection str Yes - Connection for primary system tables. Must be blob storage (azure_blob) or local filesystem - NOT SQL Server. Delta tables require storage backends.
path str No _odibi_system Path relative to connection root
environment Optional[str] No - Environment tag (e.g., 'dev', 'qat', 'prod'). Written to all system table records for cross-environment querying.
schema_name Optional[str] No - Deprecated. Use sync_to.schema_name for SQL Server targets.
sync_to Optional[SyncToConfig] No - Secondary destination to sync system catalog data to. Use for SQL Server dashboards or cross-region Delta replication.
sync_from Optional[SyncFromConfig] No - Source to sync system data from. Enables pushing local development data to centralized system tables.
cost_per_compute_hour Optional[float] No - Estimated cost per compute hour (USD) for cost tracking
databricks_billing_enabled bool No False Attempt to query Databricks billing tables for actual costs
retention_days Optional[RetentionConfig] No - Retention periods for system tables
optimize_catalog bool No False Run OPTIMIZE + VACUUM on all system catalog Delta tables after each pipeline run. Compacts small files created by frequent MERGE operations. Adds ~15-20s but prevents accumulation of small files that degrade read performance over time. Set to false to skip.
sync_timeout_seconds float No 30.0 Maximum time (seconds) to wait for async catalog sync to complete. Reduced from 300s default to 30s for better performance. Sync is incremental, so incomplete syncs will catch up on next run.
async_derived_updates bool No True Run derived table updates (meta_daily_stats, meta_pipeline_health, etc.) asynchronously in background thread. Saves ~20-30s per pipeline. Updates complete eventually - safe for reporting tables.
async_lineage bool No True Build lineage incrementally as each pipeline completes, then merge at end. Saves ~40s by parallelizing lineage construction with pipeline execution. Lineage still generated, just built in background.
skip_sync_wait_in_databricks bool No True Skip waiting for catalog sync to complete when running in Databricks. Databricks clusters stay alive, so background sync threads complete safely. Saves ~90s overhead. Set to false to always wait for sync completion.
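Putting the performance flags together, a conservative configuration might look like this (values are illustrative, not recommendations):

```yaml
system:
  connection: adls_prod
  environment: prod
  optimize_catalog: true               # compact small files after each run
  sync_timeout_seconds: 60.0           # allow more time for catalog sync
  async_derived_updates: true
  async_lineage: true
  skip_sync_wait_in_databricks: false  # always wait for sync completion
```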

SyncFromConfig

Used in: SystemConfig

Configuration for syncing system data from a source location.

Used to pull system data (runs, state) from another backend into the target.

Example:

sync_from:
  connection: local_parquet
  path: .odibi/system/

Field Type Required Default Description
connection str Yes - Connection name for the source system data
path Optional[str] No - Path to source system data (for file-based sources)
schema_name Optional[str] No - Schema name for SQL Server source (if applicable)

Connections

LocalConnectionConfig

Used in: ProjectConfig

Local filesystem connection.

When to Use: Development, testing, small datasets, local processing.

See Also: AzureBlobConnectionConfig for cloud alternatives.

Example:

local_data:
  type: "local"
  base_path: "./data"

Field Type Required Default Description
type Literal['local'] No ConnectionType.LOCAL -
validation_mode ValidationMode No ValidationMode.LAZY -
base_path str No ./data Base directory path

DeltaConnectionConfig

Used in: ProjectConfig

Delta Lake connection for ACID-compliant data lakes.

When to Use:
  • Production data lakes on Azure/AWS/GCP
  • Need time travel, ACID transactions, schema evolution
  • Upsert/merge operations

See Also: WriteConfig for Delta write options

Scenario 1: Delta via metastore

delta_silver:
  type: "delta"
  catalog: "spark_catalog"
  schema: "silver_db"

Scenario 2: Direct path + Node usage

delta_local:
  type: "local"
  base_path: "dbfs:/mnt/delta"

# In pipeline:
# read:
#   connection: "delta_local"
#   format: "delta"
#   path: "bronze/orders"

Field Type Required Default Description
type Literal['delta'] No ConnectionType.DELTA -
validation_mode ValidationMode No ValidationMode.LAZY -
catalog str Yes - Spark catalog name (e.g. 'spark_catalog')
schema_name str Yes - Database/schema name
table Optional[str] No - Optional default table name for this connection (used by story/pipeline helpers)

AzureBlobConnectionConfig

Used in: ProjectConfig

Azure Blob Storage / ADLS Gen2 connection.

When to Use: Azure-based data lakes, landing zones, raw data storage.

See Also: DeltaConnectionConfig for Delta-specific options

Scenario 1: Prod with Key Vault-managed key

adls_bronze:
  type: "azure_blob"
  account_name: "myaccount"
  container: "bronze"
  auth:
    mode: "key_vault"
    key_vault: "kv-data"
    secret: "adls-account-key"

Scenario 2: Local dev with inline account key

adls_dev:
  type: "azure_blob"
  account_name: "devaccount"
  container: "sandbox"
  auth:
    mode: "account_key"
    account_key: "${ADLS_ACCOUNT_KEY}"

Scenario 3: MSI (no secrets)

adls_msi:
  type: "azure_blob"
  account_name: "myaccount"
  container: "bronze"
  auth:
    mode: "aad_msi"
    # optional: client_id for user-assigned identity
    client_id: "00000000-0000-0000-0000-000000000000"

Field Type Required Default Description
type Literal['azure_blob'] No ConnectionType.AZURE_BLOB -
validation_mode ValidationMode No ValidationMode.LAZY -
account_name str Yes - -
container str Yes - -
auth AzureBlobAuthConfig No PydanticUndefined Authentication configuration. Choose one mode: 'account_key' (storage key), 'sas' (SAS token), 'connection_string', 'key_vault' (Azure Key Vault), or 'aad_msi' (Managed Identity, default). For production, prefer key_vault or aad_msi to avoid storing secrets in config.
Options: AzureBlobKeyVaultAuth, AzureBlobAccountKeyAuth, AzureBlobSasAuth, AzureBlobConnectionStringAuth, AzureBlobMsiAuth

SQLServerConnectionConfig

Used in: ProjectConfig

SQL Server / Azure SQL Database connection.

When to Use: Reading from SQL Server sources, Azure SQL DB, Azure Synapse.

See Also: ReadConfig for query options

Scenario 1: Managed identity (AAD MSI)

sql_dw_msi:
  type: "sql_server"
  host: "server.database.windows.net"
  database: "dw"
  auth:
    mode: "aad_msi"

Scenario 2: SQL login

sql_dw_login:
  type: "sql_server"
  host: "server.database.windows.net"
  database: "dw"
  port: 1433
  driver: "ODBC Driver 17 for SQL Server"
  auth:
    mode: "sql_login"
    username: "dw_writer"
    password: "${DW_PASSWORD}"

Field Type Required Default Description
type Literal['sql_server'] No ConnectionType.SQL_SERVER -
validation_mode ValidationMode No ValidationMode.LAZY -
host str Yes - -
database str Yes - -
port int No 1433 -
driver str No ODBC Driver 18 for SQL Server -
auth SQLServerAuthConfig No PydanticUndefined Authentication configuration. Choose one mode: 'sql_login' (username/password), 'aad_password' (Azure AD service principal), 'aad_msi' (Managed Identity, default), or 'connection_string' (full JDBC string). For Databricks/Azure, prefer aad_msi for passwordless auth.
Options: SQLLoginAuth, SQLAadPasswordAuth, SQLMsiAuth, SQLConnectionStringAuth

HttpConnectionConfig

Used in: ProjectConfig

HTTP connection.

Scenario: Bearer token via env var

api_source:
  type: "http"
  base_url: "https://api.example.com"
  headers:
    User-Agent: "odibi-pipeline"
  auth:
    mode: "bearer"
    token: "${API_TOKEN}"

Field Type Required Default Description
type Literal['http'] No ConnectionType.HTTP -
validation_mode ValidationMode No ValidationMode.LAZY -
base_url str Yes - Base URL for all API requests (e.g., 'https://api.example.com/v1')
headers Dict[str, str] No PydanticUndefined Default HTTP headers included in all requests. Example: {'User-Agent': 'odibi-pipeline', 'Accept': 'application/json'}. Auth headers are typically set via the 'auth' block instead.
auth HttpAuthConfig No PydanticUndefined Authentication configuration. Choose one mode: 'none' (no auth), 'basic' (username/password), 'bearer' (token), or 'api_key' (custom header). Tokens can use env vars: '${API_TOKEN}'.
Options: HttpNoAuth, HttpBasicAuth, HttpBearerAuth, HttpApiKeyAuth
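For the 'api_key' mode, a sketch (the header and key field names below are assumptions for illustration; check the HttpApiKeyAuth model for the exact schema):

```yaml
api_keyed:
  type: "http"
  base_url: "https://api.example.com"
  auth:
    mode: "api_key"
    header: "X-API-Key"   # assumed field name
    key: "${API_KEY}"     # assumed field name
```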

Node Operations

ReadConfig

Used in: NodeConfig

Configuration for reading data into a node.

When to Use: First node in a pipeline, or any node that reads from storage.

Key Concepts:
  • connection: References a named connection from the connections: section
  • format: File format (csv, parquet, delta, json, sql)
  • incremental: Enable incremental loading (only new data)

See Also:
  • Incremental Loading (HWM-based loading)
  • IncrementalConfig (incremental loading options)

📖 "Universal Reader" Guide

Business Problem: "I need to read from files, databases, streams, and even travel back in time to see how data looked yesterday."

Recipe 1: The Time Traveler (Delta/Iceberg) Reproduce a bug by seeing the data exactly as it was.

read:
  connection: "silver_lake"
  format: "delta"
  table: "fact_sales"
  time_travel:
    as_of_timestamp: "2023-10-25T14:00:00Z"

Recipe 2: The Streamer Process data in real-time.

read:
  connection: "event_hub"
  format: "json"
  streaming: true
  # Streaming JSON reads require an explicit schema (see schema_ddl below);
  # column names here are illustrative
  schema_ddl: "event_id STRING, payload STRING, event_time TIMESTAMP"

Recipe 3: The SQL Query Push down filtering to the source database.

read:
  connection: "enterprise_dw"
  format: "sql"
  # Use the query option to filter at source!
  query: "SELECT * FROM huge_table WHERE date >= '2024-01-01'"

Recipe 4: Archive Bad Records (Spark) Capture malformed records for later inspection.

read:
  connection: "landing"
  format: "json"
  path: "events/*.json"
  archive_options:
    badRecordsPath: "/mnt/quarantine/bad_records"

Recipe 5: Optimize JDBC Parallelism (Spark) Control partition count for SQL sources to reduce task overhead.

read:
  connection: "enterprise_dw"
  format: "sql"
  table: "small_lookup_table"
  options:
    numPartitions: 1  # Single partition for small tables

Performance Tip: For small tables (<100K rows), use numPartitions: 1 to avoid excessive Spark task scheduling overhead. For large tables, increase partitions to enable parallel reads (requires partitionColumn, lowerBound, upperBound).

Field Type Required Default Description
connection Optional[str] No - Connection name from project.yaml (null for synthetic/simulation sources)
format ReadFormat Yes - Data format: csv, parquet, delta, json, sql, api, excel, avro, cloudFiles
table Optional[str] No - Table name for SQL/Delta
path Optional[str] No - Path for file-based sources
streaming bool No False Enable streaming read (Spark only)
schema_ddl Optional[str] No - Schema for streaming reads from file sources (required for Avro, JSON, CSV). Use Spark DDL format: 'col1 STRING, col2 INT, col3 TIMESTAMP'. Not required for Delta (schema is inferred from table metadata).
query Optional[str] No - SQL query to filter at source (pushdown). Mutually exclusive with table/path if supported by connector.
sql_file Optional[str] No - Path to external .sql file containing the query, relative to the YAML file defining the node. Mutually exclusive with 'query'.
filter Optional[str] No - SQL WHERE clause filter (pushed down to source for SQL formats). Example: "DAY > '2022-12-31'"
incremental Optional[IncrementalConfig] No - Automatic incremental loading strategy (CDC-like). If set, generates query based on target state (HWM).
time_travel Optional[TimeTravelConfig] No - Time travel options (Delta only)
archive_options Dict[str, Any] No PydanticUndefined Options for archiving bad records (e.g. badRecordsPath for Spark)
options Dict[str, Any] No PydanticUndefined Format-specific options

IncrementalConfig

Used in: ReadConfig

Configuration for automatic incremental loading.

When to Use: Load only new/changed data instead of full table scans.

See Also: ReadConfig

Modes:

  1. Rolling Window (Default): Uses a time-based lookback from NOW(). Good for: Stateless loading where you just want "recent" data. Args: lookback, unit
  2. Stateful: Tracks the High-Water Mark (HWM) of the key column. Good for: Exact incremental ingestion (e.g. CDC-like). Args: state_key (optional), watermark_lag (optional)

Generates SQL:

  • Rolling: WHERE column >= NOW() - lookback
  • Stateful: WHERE column > :last_hwm

Example (Rolling Window):

incremental:
  mode: "rolling_window"
  column: "updated_at"
  lookback: 3
  unit: "day"

Example (Stateful HWM):

incremental:
  mode: "stateful"
  column: "id"
  # Optional: track separate column for HWM state
  state_key: "last_processed_id"

Example (Stateful with Watermark Lag):

incremental:
  mode: "stateful"
  column: "updated_at"
  # Handle late-arriving data: look back 2 hours from HWM
  watermark_lag: "2h"

Example (Oracle Date Format):

incremental:
  mode: "rolling_window"
  column: "EVENT_START"
  lookback: 3
  unit: "day"
  # For string columns with Oracle format (DD-MON-YY)
  date_format: "oracle"

Supported date_format values:

  • oracle: DD-MON-YY for Oracle databases (uses TO_TIMESTAMP)
  • oracle_sqlserver: DD-MON-YY format stored in SQL Server (uses TRY_CONVERT)
  • sql_server: Uses CONVERT with style 120
  • us: MM/DD/YYYY format
  • eu: DD/MM/YYYY format
  • iso: YYYY-MM-DDTHH:MM:SS format

Field Type Required Default Description
mode IncrementalMode No IncrementalMode.ROLLING_WINDOW Incremental strategy: 'rolling_window' or 'stateful'
column str Yes - Primary column to filter on (e.g., updated_at)
fallback_column Optional[str] No - Backup column if primary is NULL (e.g., created_at). Generates COALESCE(col, fallback) >= ...
lookback Optional[int] No - Time units to look back (Rolling Window only)
unit Optional[IncrementalUnit] No - Time unit for lookback (Rolling Window only). Options: 'hour', 'day', 'month', 'year'
state_key Optional[str] No - Unique ID for state tracking. Defaults to node name if not provided.
watermark_lag Optional[str] No - Safety buffer for late-arriving data in stateful mode. Subtracts this duration from the stored HWM when filtering. Format: number followed by unit — s (seconds), m (minutes), h (hours), or d (days). Examples: '2h' (2 hours), '30m' (30 minutes), '1d' (1 day). Use when source has replication lag or eventual consistency.
date_format Optional[str] No - Source date format when the column is stored as a string. Options: 'oracle' (DD-MON-YY for Oracle DB), 'oracle_sqlserver' (DD-MON-YY format in SQL Server), 'sql_server' (uses CONVERT with style 120), 'us' (MM/DD/YYYY), 'eu' (DD/MM/YYYY), 'iso' (YYYY-MM-DDTHH:MM:SS). When set, SQL pushdown will use appropriate CONVERT/TO_TIMESTAMP functions.
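The fallback_column behavior from the table above can be sketched as:

```yaml
incremental:
  mode: "rolling_window"
  # Generates: COALESCE(updated_at, created_at) >= NOW() - 7 days
  column: "updated_at"
  fallback_column: "created_at"
  lookback: 7
  unit: "day"
```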

TimeTravelConfig

Used in: ReadConfig

Configuration for time travel reading (Delta/Iceberg).

Example:

time_travel:
  as_of_version: 10
  # OR
  as_of_timestamp: "2023-10-01T12:00:00Z"

Field Type Required Default Description
as_of_version Optional[int] No - Version number to time travel to
as_of_timestamp Optional[str] No - Timestamp string to time travel to

TransformConfig

Used in: NodeConfig

Configuration for transformation steps within a node.

When to Use: Custom business logic, data cleaning, SQL transformations.

Key Concepts:

  • steps: Ordered list of operations (SQL, functions, or both)
  • Each step receives the DataFrame from the previous step
  • Steps execute in order: step1 → step2 → step3

See Also: Transformer Catalog

Transformer vs Transform:

  • transformer: Single heavy operation (scd2, merge, deduplicate)
  • transform.steps: Chain of lighter operations
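
The contrast can be sketched roughly as follows; the exact syntax of the top-level transformer value is illustrative here and may differ from the actual NodeConfig schema:

# One heavy, registered operation for the whole node
transformer: "deduplicate"

# vs. a chain of lighter steps
transform:
  steps:
    - sql: "SELECT * FROM df WHERE amount > 0"
    - operation: "drop_duplicates"
      params: { subset: ["order_id"] }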

🔧 "Transformation Pipeline" Guide

Business Problem: "I have complex logic that mixes SQL for speed and Python for complex calculations."

The Solution: Chain multiple steps together. Output of Step 1 becomes input of Step 2.

Function Registry: The function step type looks up functions registered with @transform (or @register). This allows you to use the same registered functions as both top-level Transformers and steps in a chain.

Recipe: The Mix-and-Match

transform:
  steps:
    # Step 1: SQL Filter (Fast)
    - sql: "SELECT * FROM df WHERE status = 'ACTIVE'"

    # Step 2: Custom Python Function (Complex Logic)
    # Looks up 'calculate_lifetime_value' in the registry
    - function: "calculate_lifetime_value"
      params: { discount_rate: 0.05 }

    # Step 3: Built-in Operation (Standard)
    - operation: "drop_duplicates"
      params: { subset: ["user_id"] }

Field Type Required Default Description
steps List[str | TransformStep] Yes - List of transformation steps (SQL strings or TransformStep configs)

DeleteDetectionConfig

Configuration for delete detection in Silver layer.

🔍 "CDC Without CDC" Guide

Business Problem: "Records are deleted in our Azure SQL source, but our Silver tables still show them."

The Solution: Use delete detection to identify and flag records that no longer exist in the source.

Recipe 1: SQL Compare (Recommended for HWM)

transform:
  steps:
    - operation: detect_deletes
      params:
        mode: sql_compare
        keys: [customer_id]
        source_connection: azure_sql
        source_table: dbo.Customers

Recipe 2: Snapshot Diff (For Full Snapshot Sources) Use ONLY with full snapshot ingestion, NOT with HWM incremental. Requires connection and path to specify the target Delta table for comparison.

transform:
  steps:
    - operation: detect_deletes
      params:
        mode: snapshot_diff
        keys: [customer_id]
        connection: silver_conn    # Required: connection to target Delta table
        path: "silver/customers"   # Required: path to target Delta table

Recipe 3: Conservative Threshold

transform:
  steps:
    - operation: detect_deletes
      params:
        mode: sql_compare
        keys: [customer_id]
        source_connection: erp
        source_table: dbo.Customers
        max_delete_percent: 20.0
        on_threshold_breach: error

Recipe 4: Hard Delete (Remove Rows)

transform:
  steps:
    - operation: detect_deletes
      params:
        mode: sql_compare
        keys: [customer_id]
        source_connection: azure_sql
        source_table: dbo.Customers
        soft_delete_col: null  # removes rows instead of flagging

Field Type Required Default Description
mode DeleteDetectionMode No DeleteDetectionMode.NONE Delete detection strategy: none, snapshot_diff, sql_compare
keys List[str] No PydanticUndefined Business key columns for comparison
connection Optional[str] No - For snapshot_diff: connection name to target Delta table (required for snapshot_diff)
path Optional[str] No - For snapshot_diff: path to target Delta table (required for snapshot_diff)
soft_delete_col Optional[str] No _is_deleted Column to flag deletes (True = deleted). Set to null for hard-delete (removes rows).
source_connection Optional[str] No - For sql_compare: connection name to query live source
source_table Optional[str] No - For sql_compare: table to query for current keys
source_query Optional[str] No - For sql_compare: custom SQL query for keys (overrides source_table)
snapshot_column Optional[str] No - For snapshot_diff on non-Delta: column to identify snapshots. If None, uses Delta time travel (default).
on_first_run FirstRunBehavior No FirstRunBehavior.SKIP Behavior when no previous version exists for snapshot_diff
max_delete_percent Optional[float] No 50.0 Safety threshold: warn/error if more than X% of rows would be deleted
on_threshold_breach ThresholdBreachAction No ThresholdBreachAction.WARN Behavior when delete percentage exceeds max_delete_percent

ValidationConfig

Used in: NodeConfig

Configuration for data validation (post-transform checks).

When to Use: Output data quality checks that run after transformation but before writing.

See Also: Validation Guide, Quarantine Guide, Contracts Overview (pre-transform checks)

🛡️ "The Indestructible Pipeline" Pattern

Business Problem: "Bad data polluted our Gold reports, causing executives to make wrong decisions. We need to stop it before it lands."

The Solution: A Quality Gate that runs after transformation but before writing.

Recipe: The Quality Gate

validation:
  mode: "fail"          # fail (stop pipeline) or warn (log only)
  on_fail: "alert"      # alert or ignore

  tests:
    # 1. Completeness
    - type: "not_null"
      columns: ["transaction_id", "customer_id"]

    # 2. Integrity
    - type: "unique"
      columns: ["transaction_id"]

    - type: "accepted_values"
      column: "status"
      values: ["PENDING", "COMPLETED", "FAILED"]

    # 3. Ranges & Patterns
    - type: "range"
      column: "age"
      min: 18
      max: 120

    - type: "regex_match"
      column: "email"
      pattern: '^[\w\.-]+@[\w\.-]+\.\w+$'  # single quotes: \w is not a valid YAML double-quote escape

    # 4. Business Logic (SQL)
    - type: "custom_sql"
      name: "dates_ordered"
      condition: "created_at <= completed_at"
      threshold: 0.01   # Allow 1% failure

Recipe: Quarantine + Gate

validation:
  tests:
    - type: not_null
      columns: [customer_id]
      on_fail: quarantine
  quarantine:
    connection: silver
    path: customers_quarantine
  gate:
    require_pass_rate: 0.95
    on_fail: abort

Field Type Required Default Description
mode ValidationAction No ValidationAction.FAIL Execution mode: 'fail' (stop pipeline) or 'warn' (log only)
on_fail OnFailAction No OnFailAction.ALERT Action on failure: 'alert' (send notification) or 'ignore'
tests List[TestConfig] No PydanticUndefined List of validation tests
Options: NotNullTest, UniqueTest, AcceptedValuesTest, RowCountTest, CustomSQLTest, RangeTest, RegexMatchTest, VolumeDropTest, SchemaContract, DistributionContract, FreshnessContract
quarantine Optional[QuarantineConfig] No - Quarantine configuration for failed rows
gate Optional[GateConfig] No - Quality gate configuration for batch-level validation
fail_fast bool No False Stop validation on first failure. Skips remaining tests for faster feedback.
cache_df bool No False Cache DataFrame before validation (Spark only). Improves performance with many tests.
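
For wide Spark tables with many tests, the two performance flags in the table above can be combined. A minimal sketch based on those field descriptions (test columns are illustrative):

validation:
  mode: "fail"
  fail_fast: true   # stop at the first failing test for faster feedback
  cache_df: true    # cache the DataFrame once, reuse across tests (Spark only)
  tests:
    - type: "not_null"
      columns: ["order_id"]
    - type: "unique"
      columns: ["order_id"]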

QuarantineConfig

Used in: ValidationConfig

Configuration for quarantine table routing.

When to Use: Capture invalid records for review/reprocessing instead of failing the pipeline.

See Also: Quarantine Guide, ValidationConfig

Routes rows that fail validation tests to a quarantine table with rejection metadata for later analysis/reprocessing.

Example:

validation:
  tests:
    - type: not_null
      columns: [customer_id]
      on_fail: quarantine
  quarantine:
    connection: silver
    path: customers_quarantine
    add_columns:
      _rejection_reason: true
      _rejected_at: true
    max_rows: 10000
    sample_fraction: 0.1

Field Type Required Default Description
connection str Yes - Connection for quarantine writes
path Optional[str] No - Path for quarantine data
table Optional[str] No - Table name for quarantine
add_columns QuarantineColumnsConfig No PydanticUndefined Metadata columns to add to quarantined rows
retention_days Optional[int] No 90 Days to retain quarantined data (auto-cleanup)
max_rows Optional[int] No - Maximum number of rows to quarantine per run. Limits storage for high-failure batches.
sample_fraction Optional[float] No - Sample fraction of invalid rows to quarantine (0.0-1.0). Use for sampling large invalid sets.

QuarantineColumnsConfig

Used in: QuarantineConfig

Columns added to quarantined rows for debugging and reprocessing.

Example:

quarantine:
  connection: silver
  path: customers_quarantine
  add_columns:
    _rejection_reason: true
    _rejected_at: true
    _source_batch_id: true
    _failed_tests: true
    _original_node: false

Field Type Required Default Description
rejection_reason bool No True Add _rejection_reason column with test failure description
rejected_at bool No True Add _rejected_at column with UTC timestamp
source_batch_id bool No True Add _source_batch_id column with run ID for traceability
failed_tests bool No True Add _failed_tests column with comma-separated list of failed test names
original_node bool No False Add _original_node column with source node name

GateConfig

Used in: ValidationConfig

Quality gate configuration for batch-level validation.

When to Use: Pipeline-level pass/fail thresholds, row count limits, change detection.

See Also: Quality Gates, ValidationConfig

Gates evaluate the entire batch before writing, ensuring data quality thresholds are met.

Example:

gate:
  require_pass_rate: 0.95
  on_fail: abort
  thresholds:
    - test: not_null
      min_pass_rate: 0.99
  row_count:
    min: 100
    change_threshold: 0.5

Field Type Required Default Description
require_pass_rate float No 0.95 Minimum percentage of rows passing ALL tests
on_fail GateOnFail No GateOnFail.ABORT Action when gate fails
thresholds List[GateThreshold] No PydanticUndefined Per-test thresholds (overrides global require_pass_rate)
row_count Optional[RowCountGate] No - Row count anomaly detection

GateThreshold

Used in: GateConfig

Per-test threshold configuration for quality gates.

Allows setting different pass rate requirements for specific tests.

Example:

gate:
  thresholds:
    - test: not_null
      min_pass_rate: 0.99
    - test: unique
      min_pass_rate: 1.0

Field Type Required Default Description
test str Yes - Test name or type to apply threshold to
min_pass_rate float Yes - Minimum pass rate required (0.0-1.0, e.g., 0.99 = 99%)

RowCountGate

Used in: GateConfig

Row count anomaly detection for quality gates.

Validates that batch size falls within expected bounds and detects significant changes from previous runs.

Example:

gate:
  row_count:
    min: 100
    max: 1000000
    change_threshold: 0.5

Field Type Required Default Description
min Optional[int] No - Minimum expected row count
max Optional[int] No - Maximum expected row count
change_threshold Optional[float] No - Max allowed change vs previous run (e.g., 0.5 = 50% change triggers failure)

WriteConfig

Used in: NodeConfig

Configuration for writing data from a node.

When to Use: Any node that persists data to storage.

Key Concepts:

  • mode: How to handle existing data (overwrite, append, upsert)
  • keys: Required for upsert mode - columns that identify unique records
  • partition_by: Columns to partition output by (improves query performance)

See Also:

  • Performance Tuning - Partitioning strategies

🚀 "Big Data Performance" Guide

Business Problem: "My dashboards are slow because the query scans terabytes of data just to find one day's sales."

The Solution: Use Partitioning for coarse filtering (skipping huge chunks) and Z-Ordering for fine-grained skipping (colocating related data).

Recipe: Lakehouse Optimized

write:
  connection: "gold_lake"
  format: "delta"
  table: "fact_sales"
  mode: "append"

  # 1. Partitioning: Physical folders.
  # Use for low-cardinality columns often used in WHERE clauses.
  # WARNING: Do NOT partition by high-cardinality cols like ID or Timestamp!
  partition_by: ["country_code", "txn_year_month"]

  # 2. Z-Ordering: Data clustering.
  # Use for high-cardinality columns often used in JOINs or predicates.
  zorder_by: ["customer_id", "product_id"]

  # 3. Table Properties: Engine tuning.
  table_properties:
    "delta.autoOptimize.optimizeWrite": "true"
    "delta.autoOptimize.autoCompact": "true"

Field Type Required Default Description
connection str Yes - Connection name from project.yaml
format ReadFormat Yes - Output format: csv, parquet, delta, json, sql, api, excel, avro, cloudFiles
table Optional[str] No - Table name for SQL/Delta
path Optional[str] No - Path for file-based outputs
register_table Optional[str] No - Register file output as external table (Spark/Delta only)
mode WriteMode No WriteMode.OVERWRITE Write mode. Options: 'overwrite', 'append', 'upsert', 'append_once', 'merge'. Use 'append_once' for idempotent Bronze ingestion (requires 'keys' in options). See WriteMode enum for details.
partition_by List[str] No PydanticUndefined List of columns to physically partition the output by (folder structure). Use for low-cardinality columns (e.g. date, country).
zorder_by List[str] No PydanticUndefined List of columns to Z-Order by. Improves read performance for high-cardinality columns used in filters/joins (Delta only).
table_properties Dict[str, str] No PydanticUndefined Delta table properties. Overrides global performance.delta_table_properties. Example: {'delta.columnMapping.mode': 'name'} to allow special characters in column names.
merge_schema bool No False Allow schema evolution (mergeSchema option in Delta)
overwrite_schema bool No False Allow schema overwrite on mode=overwrite (overwriteSchema option in Delta). Use when the incoming schema differs from the existing table schema.
first_run_query Optional[str] No - SQL query for full-load on first run (High Water Mark pattern). If set, uses this query when target table doesn't exist, then switches to incremental. Only applies to SQL reads.
options Dict[str, Any] No PydanticUndefined Format-specific options
auto_optimize bool | AutoOptimizeConfig No - Auto-run OPTIMIZE and VACUUM after write (Delta only)
add_metadata bool | WriteMetadataConfig No - Add metadata columns for Bronze layer lineage. Set to true to add all applicable columns, or provide a WriteMetadataConfig for selective columns. Columns: _extracted_at, _source_file (file sources), _source_connection, _source_table (SQL sources).
skip_if_unchanged bool No False Skip write if DataFrame content is identical to previous write. Computes SHA256 hash of entire DataFrame and compares to stored hash in Delta table metadata. Useful for snapshot tables without timestamps to avoid redundant appends. Only supported for Delta format.
skip_hash_columns Optional[List[str]] No - Columns to include in hash computation for skip_if_unchanged. If None, all columns are used. Specify a subset to ignore volatile columns like timestamps.
skip_hash_sort_columns Optional[List[str]] No - Columns to sort by before hashing for deterministic comparison. Required if row order may vary between runs. Typically your business key columns.
streaming Optional[StreamingWriteConfig] No - Streaming write configuration for Spark Structured Streaming. When set, uses writeStream instead of batch write. Requires a streaming DataFrame from a streaming read source.
merge_keys Optional[List[str]] No - Key columns for SQL Server MERGE operations. Required when mode='merge'. These columns form the ON clause of the MERGE statement.
merge_options Optional[SqlServerMergeOptions] No - Options for SQL Server MERGE operations (conditions, staging, audit cols)
overwrite_options Optional[SqlServerOverwriteOptions] No - Options for SQL Server overwrite operations (strategy, audit cols)
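
The three skip_* fields above work together. For a daily snapshot table with no reliable timestamp column, a sketch based on the field descriptions might look like this (table and column names are illustrative):

write:
  connection: "silver_lake"
  format: "delta"               # skip_if_unchanged is Delta-only
  table: "dim_product_snapshot"
  mode: "append"
  skip_if_unchanged: true
  # Hash only the business columns, ignoring volatile ones like load timestamps
  skip_hash_columns: ["product_id", "name", "price"]
  # Sort before hashing so row order doesn't change the hash
  skip_hash_sort_columns: ["product_id"]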

WriteMetadataConfig

Used in: WriteConfig

Configuration for metadata columns added during Bronze writes.

📋 Bronze Metadata Guide

Business Problem: "We need lineage tracking and debugging info for our Bronze layer data."

The Solution: Add metadata columns during ingestion for traceability.

Recipe 1: Add All Metadata (Recommended)

write:
  connection: bronze
  table: customers
  mode: append
  add_metadata: true  # adds all applicable columns

Recipe 2: Selective Metadata

write:
  connection: bronze
  table: customers
  mode: append
  add_metadata:
    extracted_at: true
    source_file: true
    source_connection: false
    source_table: false

Available Columns:

  • _extracted_at: Pipeline execution timestamp (all sources)
  • _source_file: Source filename/path (file sources only)
  • _source_connection: Connection name used (all sources)
  • _source_table: Table or query name (SQL sources only)

Field Type Required Default Description
extracted_at bool No True Add _extracted_at column with pipeline execution timestamp
source_file bool No True Add _source_file column with source filename (file sources only)
source_connection bool No False Add _source_connection column with connection name
source_table bool No False Add _source_table column with table/query name (SQL sources only)

StreamingWriteConfig

Used in: WriteConfig

Configuration for Spark Structured Streaming writes.

🚀 "Real-Time Pipeline" Guide

Business Problem: "I need to process data continuously as it arrives from Kafka/Event Hubs and write it to Delta Lake in near real-time."

The Solution: Configure streaming write with checkpoint location for fault tolerance and trigger interval for processing frequency.

Recipe: Streaming Ingestion

write:
  connection: "silver_lake"
  format: "delta"
  table: "events_stream"
  streaming:
    output_mode: append
    checkpoint_location: "/checkpoints/events_stream"
    trigger:
      processing_time: "10 seconds"

Recipe: One-Time Streaming (Batch-like)

write:
  connection: "silver_lake"
  format: "delta"
  table: "events_batch"
  streaming:
    output_mode: append
    checkpoint_location: "/checkpoints/events_batch"
    trigger:
      available_now: true

Field Type Required Default Description
output_mode Literal['append', 'update', 'complete'] No append Output mode for streaming writes. 'append' - Only new rows. 'update' - Updated rows only. 'complete' - Entire result table (requires aggregation).
checkpoint_location str Yes - Path for streaming checkpoints. Required for fault tolerance. Must be a reliable storage location (e.g., cloud storage, DBFS).
trigger Optional[TriggerConfig] No - Trigger configuration. If not specified, processes data as fast as possible. Use 'processing_time' for micro-batch intervals, 'once' for single batch, 'available_now' for processing all available data then stopping.
query_name Optional[str] No - Name for the streaming query (useful for monitoring and debugging)
await_termination Optional[bool] No False Wait for the streaming query to terminate. Set to True for batch-like streaming with 'once' or 'available_now' triggers.
timeout_seconds Optional[int] No - Timeout in seconds when await_termination is True. If None, waits indefinitely.
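
Per the await_termination and timeout_seconds fields above, a batch-like streaming write can be made blocking so the pipeline waits for completion (a sketch):

write:
  connection: "silver_lake"
  format: "delta"
  table: "events_batch"
  streaming:
    output_mode: append
    checkpoint_location: "/checkpoints/events_batch"
    trigger:
      available_now: true
    await_termination: true   # block until the streaming query finishes
    timeout_seconds: 3600     # give up if still running after 1 hour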

TriggerConfig

Used in: StreamingWriteConfig

Configuration for streaming trigger intervals.

Specify exactly one of the trigger options.

Example:

trigger:
  processing_time: "10 seconds"

Or for one-time processing:

trigger:
  once: true

Field Type Required Default Description
processing_time Optional[str] No - Trigger interval as duration string (e.g., '10 seconds', '1 minute')
once Optional[bool] No - Process all available data once and stop
available_now Optional[bool] No - Process all available data in multiple batches, then stop
continuous Optional[str] No - Continuous processing with checkpoint interval (e.g., '1 second')

AutoOptimizeConfig

Used in: WriteConfig

Configuration for Delta Lake automatic optimization.

Example:

auto_optimize:
  enabled: true
  vacuum_retention_hours: 168

Field Type Required Default Description
enabled bool No True Enable auto optimization
vacuum_retention_hours int No 168 Hours to retain history for VACUUM (default 7 days). Set to 0 to disable VACUUM.

ApiOptionsConfig

Complete options configuration for API data sources (format: api).

When to Use: Pull data from REST APIs with pagination, retry, and rate limiting.

See Also: API Data Sources Guide

Example:

nodes:
  - name: api_data
    read:
      connection: my_api
      format: api
      path: /v1/records
      options:
        pagination:
          type: offset_limit
          limit: 1000
          max_pages: 100
        response:
          items_path: data.records
          add_fields:
            _source: "my_api"
            _fetched_at: "$now"
        retry:
          max_retries: 3
        rate_limit:
          requests_per_second: 5

Field Type Required Default Description
pagination Optional[ApiPaginationConfig] No - Pagination configuration
response Optional[ApiResponseConfig] No - Response parsing configuration
retry Optional[ApiRetryConfig] No - Retry configuration
rate_limit Optional[ApiRateLimitConfig] No - Rate limiting configuration
method str No GET HTTP method (GET, POST)
headers Optional[Dict[str, str]] No - Additional HTTP headers
params Optional[Dict[str, str]] No - Additional query parameters
json_body Optional[Dict[str, Any]] No - JSON body for POST requests
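
The method, headers, and json_body fields support POST-style search APIs. A hedged sketch (the endpoint path and body fields are illustrative, not part of any real API):

nodes:
  - name: api_search
    read:
      connection: my_api
      format: api
      path: /v1/search
      options:
        method: POST
        headers:
          Content-Type: "application/json"
        json_body:
          query: "status:ACTIVE"
          page_size: 500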

ApiPaginationConfig

Used in: ApiOptionsConfig

Pagination configuration for API data sources.

When to Use: Configure how to paginate through API results.

Example (offset/limit pagination):

read:
  format: api
  options:
    pagination:
      type: offset_limit
      offset_param: skip
      limit_param: limit
      limit: 1000
      max_pages: 100

Example (cursor-based pagination):

read:
  format: api
  options:
    pagination:
      type: cursor
      cursor_path: meta.next_cursor
      cursor_param: cursor

Example (link header pagination - GitHub style):

read:
  format: api
  options:
    pagination:
      type: link_header
      limit: 100

Field Type Required Default Description
type ApiPaginationType No ApiPaginationType.OFFSET_LIMIT Pagination strategy to use
offset_param str No offset Query param name for offset (offset_limit type)
limit_param str No limit Query param name for limit
limit int No 100 Number of records per page
max_pages Optional[int] No - Maximum pages to fetch (None = unlimited)
cursor_path Optional[str] No - Dotted path to cursor in response (cursor type)
cursor_param Optional[str] No - Query param name for cursor (cursor type)
page_param str No page Query param name for page number (page_number type)
start_page int No 1 Starting page number (page_number type)

ApiRateLimitConfig

Used in: ApiOptionsConfig

Rate limiting configuration for API requests.

Example:

read:
  format: api
  options:
    rate_limit:
      requests_per_second: 10

Field Type Required Default Description
requests_per_second Optional[float] No - Max requests per second (None = no limit)

ApiResponseConfig

Used in: ApiOptionsConfig

Response parsing configuration for API data sources.

When to Use: Configure how to extract data from API responses.

Date Variables: Use in add_fields OR params for dynamic dates:

Variable Format Example Value
$now ISO timestamp 2024-01-15T10:30:00+00:00
$today YYYY-MM-DD 2024-01-15
$yesterday YYYY-MM-DD 2024-01-14
$date YYYY-MM-DD 2024-01-15
$7_days_ago YYYY-MM-DD 2024-01-08
$30_days_ago YYYY-MM-DD 2023-12-16
$90_days_ago YYYY-MM-DD 2023-10-17
$start_of_week YYYY-MM-DD 2024-01-15
$start_of_month YYYY-MM-DD 2024-01-01
$start_of_year YYYY-MM-DD 2024-01-01
$today_compact YYYYMMDD 20240115
$yesterday_compact YYYYMMDD 20240114
$7_days_ago_compact YYYYMMDD 20240108
$30_days_ago_compact YYYYMMDD 20231216
$90_days_ago_compact YYYYMMDD 20231017

Example (add_fields):

read:
  format: api
  options:
    response:
      items_path: results
      add_fields:
        _fetched_at: "$now"
        _load_date: "$today"

Example (params with compact dates for openFDA):

read:
  format: api
  options:
    params:
      search: "report_date:[$30_days_ago_compact+TO+$today_compact]"

Field Type Required Default Description
items_path str No - Dotted path to items array in response (e.g., 'results', 'data.items'). Empty = response is the array.
dict_to_list bool No False If True and items_path resolves to a dict, extract dict values as rows with keys preserved in '_key' field. Useful for APIs like ddragon that return {'Aatrox': {...}, 'Ahri': {...}}.
add_fields Optional[Dict[str, Any]] No - Fields to add to each record. Supports date variables: $now, $today, $yesterday, $7_days_ago, etc.

ApiRetryConfig

Used in: ApiOptionsConfig

Retry configuration for API requests.

Example:

read:
  format: api
  options:
    retry:
      max_retries: 3
      backoff_factor: 2.0
      retry_codes: [429, 500, 502, 503]

Field Type Required Default Description
max_retries int No 3 Maximum retry attempts
backoff_factor float No 2.0 Exponential backoff multiplier
retry_codes List[int] No [429, 500, 502, 503, 504] HTTP status codes to retry on

PrivacyConfig

Used in: NodeConfig

Configuration for PII anonymization.

🔐 Privacy & PII Protection

How It Works:

  1. Mark columns as pii: true in the columns metadata
  2. Configure a privacy block with the anonymization method
  3. During node execution, all columns marked as PII (and inherited from dependencies) are anonymized
  4. Upstream PII markings are inherited by downstream nodes

Example:

columns:
  customer_email:
    pii: true  # Mark as PII
  customer_id:
    pii: false

privacy:
  method: hash       # hash, mask, or redact
  salt: "secret_key" # Optional: makes hash unique/secure
  declassify: []     # Remove columns from PII protection

Methods:

  • hash: SHA256 hash (length 64). With a salt, prevents pre-computed rainbow table attacks.
  • mask: Show only the last 4 chars, replace the rest with *. Example: john@email.com → **********.com
  • redact: Replace the entire value with [REDACTED]

Important:

  • pii: true alone does NOTHING. You must set a privacy.method to actually mask data.
  • PII inheritance: if a dependency outputs PII columns, this node inherits them unless declassified.
  • Salt is optional but recommended for hash to prevent attacks.

Field Type Required Default Description
method PrivacyMethod Yes - Anonymization method: 'hash' (SHA256), 'mask' (show last 4), or 'redact' ([REDACTED])
salt Optional[str] No - Salt for hashing (optional but recommended). Appended before hashing to create unique hashes. Example: 'company_secret_key_2025'
declassify List[str] No PydanticUndefined List of columns to remove from PII protection (stops inheritance from upstream). Example: ['customer_id']
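
Example (stopping PII inheritance): per the declassify field above, a downstream node can opt a column out of inherited protection (column and salt values are illustrative):

privacy:
  method: "hash"
  salt: "company_secret_key_2025"
  # customer_id was marked PII upstream but is needed in clear text here
  declassify: ["customer_id"]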

SqlServerAuditColsConfig

Used in: SqlServerMergeOptions, SqlServerOverwriteOptions

Audit column configuration for SQL Server merge operations.

These columns are automatically populated with GETUTCDATE() during merge:

  • created_col: Set on INSERT only
  • updated_col: Set on INSERT and UPDATE

Example:

audit_cols:
  created_col: created_ts
  updated_col: updated_ts

Field Type Required Default Description
created_col Optional[str] No - Column name for creation timestamp (set on INSERT)
updated_col Optional[str] No - Column name for update timestamp (set on INSERT and UPDATE)

SqlServerMergeOptions

Used in: WriteConfig

Options for SQL Server MERGE operations (Phase 1).

Enables incremental sync from Spark to SQL Server using T-SQL MERGE. Data is written to a staging table, then merged into the target.

Basic Usage

write:
  connection: azure_sql
  format: sql_server
  table: sales.fact_orders
  mode: merge
  merge_keys: [DateId, store_id]
  merge_options:
    update_condition: "source._hash_diff != target._hash_diff"
    exclude_columns: [_hash_diff]
    audit_cols:
      created_col: created_ts
      updated_col: updated_ts

Conditions

  • update_condition: Only update rows matching this condition (e.g., hash diff)
  • delete_condition: Delete rows matching this condition (soft delete pattern)
  • insert_condition: Only insert rows matching this condition

Field Type Required Default Description
update_condition Optional[str] No - SQL condition for WHEN MATCHED UPDATE. Use 'source.' and 'target.' prefixes. Example: 'source._hash_diff != target._hash_diff'
delete_condition Optional[str] No - SQL condition for WHEN MATCHED DELETE. Example: 'source._is_deleted = 1'
insert_condition Optional[str] No - SQL condition for WHEN NOT MATCHED INSERT. Example: 'source.is_valid = 1'
exclude_columns List[str] No PydanticUndefined Columns to exclude from MERGE (not written to target table)
staging_schema str No staging Schema for staging table. Table name: {staging_schema}.{table}_staging
audit_cols Optional[SqlServerAuditColsConfig] No - Audit columns for created/updated timestamps
validations Optional[SqlServerMergeValidationConfig] No - Validation checks before merge (null keys, duplicate keys)
auto_create_schema bool No False Auto-create schema if it doesn't exist (Phase 4). Runs CREATE SCHEMA IF NOT EXISTS.
auto_create_table bool No False Auto-create target table if it doesn't exist (Phase 4). Infers schema from DataFrame.
schema_evolution Optional[SqlServerSchemaEvolutionConfig] No - Schema evolution configuration (Phase 4). Controls handling of schema differences.
batch_size Optional[int] No - Batch size for staging table writes (Phase 4). Chunks large DataFrames for memory efficiency.
primary_key_on_merge_keys bool No False Create a clustered primary key on merge_keys when auto-creating table. Enforces uniqueness.
index_on_merge_keys bool No False Create a nonclustered index on merge_keys. Use if primary key already exists elsewhere.
incremental bool No False Enable incremental merge optimization. When True, reads target table's keys and hashes to determine which rows changed, then only writes changed rows to staging. Significantly faster when few rows change between runs.
hash_column Optional[str] No - Name of pre-computed hash column in DataFrame for change detection. Used when incremental=True. If not specified, will auto-detect '_hash_diff' column.
change_detection_columns Optional[List[str]] No - Columns to use for computing change detection hash. Used when incremental=True and no hash_column is specified. If None, uses all non-key columns.
bulk_copy bool No False Enable bulk copy mode for fast staging table loads. Writes data to ADLS as staging file, then uses BULK INSERT. 10-50x faster than JDBC for large datasets. Requires staging_connection.
staging_connection Optional[str] No - Connection name for staging files (ADLS/Blob storage). Required when bulk_copy=True. The connection must have write access.
staging_path Optional[str] No - Path prefix for staging files. Defaults to 'odibi_staging/bulk'. Files are automatically cleaned up after successful load.
external_data_source Optional[str] No - SQL Server external data source name for BULK INSERT. If not specified and auto_setup=True, will be auto-generated as 'odibi_{staging_connection}'.
keep_staging_files bool No False Keep staging files after load (for debugging). Default deletes after success.
auto_setup bool No False Auto-create SQL Server external data source and credential if they don't exist. Reads auth credentials from staging_connection and creates matching SQL objects. Requires elevated SQL permissions (ALTER ANY EXTERNAL DATA SOURCE, CONTROL).
force_recreate bool No False Force recreation of external data source and credential even if they exist. Use when you've rotated SAS tokens or storage keys and need to update SQL Server. Has no effect if auto_setup=False.
csv_options Optional[Dict[str, str]] No - Custom CSV options for bulk copy when writing to Azure SQL Database. Passed to Spark CSV writer. Defaults: quote='"', escape='"', escapeQuotes='true', nullValue='', emptyValue='', encoding='UTF-8'. Override any option here.
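
Sketch (bulk copy staging), combining the bulk_copy fields from the table above; the connection names and staging path are illustrative:

write:
  connection: azure_sql
  format: sql_server
  table: sales.fact_orders
  mode: merge
  merge_keys: [order_id]
  merge_options:
    bulk_copy: true                    # stage via ADLS + BULK INSERT instead of JDBC
    staging_connection: adls_staging   # ADLS/Blob connection with write access
    staging_path: "odibi_staging/bulk"
    auto_setup: true                   # auto-create external data source + credential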

SqlServerMergeValidationConfig

Used in: SqlServerMergeOptions, SqlServerOverwriteOptions

Validation configuration for SQL Server merge/overwrite operations.

Validates source data before writing to SQL Server.

Example:

merge_options:
  validations:
    check_null_keys: true
    check_duplicate_keys: true
    fail_on_validation_error: true

Field Type Required Default Description
check_null_keys bool No True Fail if merge_keys contain NULL values
check_duplicate_keys bool No True Fail if merge_keys have duplicate combinations
fail_on_validation_error bool No True If False, log warning instead of failing on validation errors

SqlServerOverwriteOptions

Used in: WriteConfig

Options for SQL Server overwrite operations (Phase 2).

Enhanced overwrite with multiple strategies for different use cases.

Strategies

  • truncate_insert: TRUNCATE TABLE then INSERT (fastest, requires TRUNCATE permission)
  • drop_create: DROP TABLE, CREATE TABLE, INSERT (refreshes schema)
  • delete_insert: DELETE FROM then INSERT (works with limited permissions)

Example

write:
  connection: azure_sql
  format: sql_server
  table: fact.combined_downtime
  mode: overwrite
  overwrite_options:
    strategy: truncate_insert
    audit_cols:
      created_col: created_ts
      updated_col: updated_ts
Field Type Required Default Description
strategy SqlServerOverwriteStrategy No SqlServerOverwriteStrategy.TRUNCATE_INSERT Overwrite strategy: truncate_insert, drop_create, delete_insert
audit_cols Optional[SqlServerAuditColsConfig] No - Audit columns for created/updated timestamps
validations Optional[SqlServerMergeValidationConfig] No - Validation checks before overwrite
auto_create_schema bool No False Auto-create schema if it doesn't exist (Phase 4). Runs CREATE SCHEMA IF NOT EXISTS.
auto_create_table bool No False Auto-create target table if it doesn't exist (Phase 4). Infers schema from DataFrame.
schema_evolution Optional[SqlServerSchemaEvolutionConfig] No - Schema evolution configuration (Phase 4). Controls handling of schema differences.
batch_size Optional[int] No - Batch size for writes (Phase 4). Chunks large DataFrames for memory efficiency.
bulk_copy bool No False Enable bulk copy mode for fast writes. Writes data to ADLS as staging file, then uses BULK INSERT. 10-50x faster than JDBC for large datasets. Requires staging_connection.
staging_connection Optional[str] No - Connection name for staging files (ADLS/Blob storage). Required when bulk_copy=True. The connection must have write access.
staging_path Optional[str] No - Path prefix for staging files. Defaults to 'odibi_staging/bulk'. Files are automatically cleaned up after successful load.
external_data_source Optional[str] No - SQL Server external data source name for BULK INSERT. If not specified and auto_setup=True, will be auto-generated as 'odibi_{staging_connection}'.
keep_staging_files bool No False Keep staging files after load (for debugging). Default deletes after success.
auto_setup bool No False Auto-create SQL Server external data source and credential if they don't exist. Reads auth credentials from staging_connection and creates matching SQL objects. Requires elevated SQL permissions (ALTER ANY EXTERNAL DATA SOURCE, CONTROL).
force_recreate bool No False Force recreation of external data source and credential even if they exist. Use when you've rotated SAS tokens or storage keys and need to update SQL Server. Has no effect if auto_setup=False.
csv_options Optional[Dict[str, str]] No - Custom CSV options for bulk copy when writing to Azure SQL Database. Passed to Spark CSV writer. Defaults: quote='"', escape='"', escapeQuotes='true', nullValue='', emptyValue='', encoding='UTF-8'. Override any option here.
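For large full refreshes, the bulk-copy fields can extend the overwrite example above. A sketch, assuming placeholder connection names `azure_sql` and `adls_staging`:

```yaml
write:
  connection: azure_sql
  format: sql_server
  table: fact.combined_downtime
  mode: overwrite
  overwrite_options:
    strategy: truncate_insert
    bulk_copy: true                  # stage to ADLS, then BULK INSERT
    staging_connection: adls_staging
    staging_path: odibi_staging/bulk # default, shown explicitly
    auto_setup: true                 # create external data source/credential
    keep_staging_files: false        # delete staging files after success
```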

SqlServerSchemaEvolutionConfig

Used in: SqlServerMergeOptions, SqlServerOverwriteOptions

Schema evolution configuration for SQL Server operations (Phase 4).

Controls automatic schema changes when DataFrame schema differs from target table.

Example:

merge_options:
  schema_evolution:
    mode: evolve
    add_columns: true

Field Type Required Default Description
mode SqlServerSchemaEvolutionMode No SqlServerSchemaEvolutionMode.STRICT Schema evolution mode: strict (fail), evolve (add columns), ignore (skip mismatched)
add_columns bool No False If mode='evolve', automatically add new columns via ALTER TABLE ADD COLUMN

TransformStep

Used in: TransformConfig

Single transformation step.

Supports four step types (exactly one required):

  • sql - Inline SQL query string
  • sql_file - Path to external .sql file (relative to the YAML file defining the node)
  • function - Registered Python function name
  • operation - Built-in operation (e.g., drop_duplicates)

sql_file Example:

If your project structure is:

project.yaml              # imports pipelines/silver/silver.yaml
pipelines/
  silver/
    silver.yaml           # defines the node
    sql/
      transform.sql       # your SQL file

In silver.yaml, use a path relative to silver.yaml:

transform:
  steps:
    - sql_file: sql/transform.sql   # relative to silver.yaml

Important: The path is resolved relative to the YAML file where the node is defined, NOT relative to the project.yaml that imports it. Do NOT use absolute paths like /pipelines/silver/sql/....

Field Type Required Default Description
sql Optional[str] No - Inline SQL query. Use df to reference the current DataFrame.
sql_file Optional[str] No - Path to external .sql file, relative to the YAML file defining the node. Example: 'sql/transform.sql' resolves relative to the node's source YAML.
function Optional[str] No - Name of a registered Python function (@transform or @register).
operation Optional[str] No - Built-in operation name (e.g., drop_duplicates, fill_na).
params Dict[str, Any] No PydanticUndefined Parameters to pass to function or operation.
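The four step types can be mixed in a single chain, with each step operating on the previous step's output. A sketch (the function name, SQL file, and column names are illustrative):

```yaml
transform:
  steps:
    - operation: drop_duplicates                  # built-in operation
    - sql: "SELECT * FROM df WHERE amount > 0"    # inline SQL; df = current DataFrame
    - sql_file: sql/enrich.sql                    # relative to this YAML file
    - function: add_audit_columns                 # registered @transform function
      params:
        source: "orders"
```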

Contracts (Data Quality Gates)

Contracts (Pre-Transform Checks)

Contracts are fail-fast data quality checks that run on input data before transformation. By default they halt execution on failure (each check's on_fail setting can downgrade this to a warning) - use them to prevent bad data from entering the pipeline.

Contracts vs Validation vs Quality Gates:

Feature When it Runs On Failure Use Case
Contracts Before transform Fails by default (per-check on_fail) Input data quality (not-null, unique keys)
Validation After transform Configurable (fail/warn/quarantine) Output data quality (ranges, formats)
Quality Gates After validation Configurable (abort/warn) Pipeline-level thresholds (pass rate, row counts)
Quarantine With validation Routes bad rows Capture invalid records for review

See Also:

  • Validation Guide - full validation configuration
  • Quarantine Guide - quarantine setup and review
  • Getting Started: Validation

Example:

- name: "process_orders"
  contracts:
    - type: not_null
      columns: [order_id, customer_id]
    - type: row_count
      min: 100
    - type: freshness
      column: created_at
      max_age: "24h"
  read:
    source: raw_orders


AcceptedValuesTest

Used in: NodeConfig, ValidationConfig

Ensures a column only contains values from an allowed list.

When to Use: Enum-like fields, status columns, categorical data validation.

See Also: Contracts Overview

contracts:
  - type: accepted_values
    column: status
    values: [pending, approved, rejected]
Field Type Required Default Description
type Literal['accepted_values'] No TestType.ACCEPTED_VALUES -
name Optional[str] No - Optional name for the check
on_fail ContractSeverity No ContractSeverity.FAIL Action on failure
column str Yes - Column to check
values List[Any] Yes - Allowed values

CustomSQLTest

Used in: NodeConfig, ValidationConfig

Runs a custom SQL condition and fails if too many rows violate it.

contracts:
  - type: custom_sql
    condition: "amount > 0"
    threshold: 0.01  # Allow up to 1% failures
Field Type Required Default Description
type Literal['custom_sql'] No TestType.CUSTOM_SQL -
name Optional[str] No - Optional name for the check
on_fail ContractSeverity No ContractSeverity.FAIL Action on failure
condition str Yes - SQL condition that should be true for valid rows
threshold float No 0.0 Failure rate threshold (0.0 = strictly no failures allowed)
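Because threshold is a failure rate, the 1% example above scales with batch size. Roughly (exact inclusivity at the boundary is not specified here):

```yaml
contracts:
  - type: custom_sql
    condition: "amount > 0"   # rows must satisfy this condition
    threshold: 0.01           # in a 10,000-row batch, ~100 violating rows tolerated
```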

DistributionContract

Used in: NodeConfig, ValidationConfig

Checks if a column's statistical distribution is within expected bounds.

When to Use: Detect data drift, anomaly detection, statistical monitoring.

See Also: Contracts Overview

contracts:
  - type: distribution
    column: price
    metric: mean
    threshold: ">100"  # Mean must be > 100
    on_fail: warn
Field Type Required Default Description
type Literal['distribution'] No TestType.DISTRIBUTION -
name Optional[str] No - Optional name for the check
on_fail ContractSeverity No ContractSeverity.WARN -
column str Yes - Column to analyze
metric Literal['mean', 'min', 'max', 'null_percentage'] Yes - Statistical metric to check
threshold str Yes - Threshold expression (e.g., '>100', '<0.05')

FreshnessContract

Used in: NodeConfig, ValidationConfig

Validates that data is not stale by checking a timestamp column.

When to Use: Source systems that should update regularly, SLA monitoring.

See Also: Contracts Overview

contracts:
  - type: freshness
    column: updated_at
    max_age: "24h"  # Fail if no data newer than 24 hours
Field Type Required Default Description
type Literal['freshness'] No TestType.FRESHNESS -
name Optional[str] No - Optional name for the check
on_fail ContractSeverity No ContractSeverity.FAIL -
column str No updated_at Timestamp column to check
max_age str Yes - Maximum allowed age (e.g., '24h', '7d')

NotNullTest

Used in: NodeConfig, ValidationConfig

Ensures specified columns contain no NULL values.

When to Use: Primary keys, required fields, foreign keys that must resolve.

See Also: Contracts Overview

contracts:
  - type: not_null
    columns: [order_id, customer_id, created_at]
Field Type Required Default Description
type Literal['not_null'] No TestType.NOT_NULL -
name Optional[str] No - Optional name for the check
on_fail ContractSeverity No ContractSeverity.FAIL Action on failure
columns List[str] Yes - Columns that must not contain nulls

RangeTest

Used in: NodeConfig, ValidationConfig

Ensures column values fall within a specified range.

When to Use: Numeric bounds validation (ages, prices, quantities), date ranges.

See Also: Contracts Overview

contracts:
  - type: range
    column: age
    min: 0
    max: 150
Field Type Required Default Description
type Literal['range'] No TestType.RANGE -
name Optional[str] No - Optional name for the check
on_fail ContractSeverity No ContractSeverity.FAIL Action on failure
column str Yes - Column to check
min int | float | str No - Minimum value (inclusive)
max int | float | str No - Maximum value (inclusive)

RegexMatchTest

Used in: NodeConfig, ValidationConfig

Ensures column values match a regex pattern.

When to Use: Format validation (emails, phone numbers, IDs, codes).

See Also: Contracts Overview

contracts:
  - type: regex_match
    column: email
    pattern: '^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$'
Field Type Required Default Description
type Literal['regex_match'] No TestType.REGEX_MATCH -
name Optional[str] No - Optional name for the check
on_fail ContractSeverity No ContractSeverity.FAIL Action on failure
column str Yes - Column to check
pattern str Yes - Regex pattern to match

RowCountTest

Used in: NodeConfig, ValidationConfig

Validates that row count falls within expected bounds.

When to Use: Ensure minimum data completeness, detect truncated loads, cap batch sizes.

See Also: Contracts Overview, GateConfig

contracts:
  - type: row_count
    min: 1000
    max: 100000
Field Type Required Default Description
type Literal['row_count'] No TestType.ROW_COUNT -
name Optional[str] No - Optional name for the check
on_fail ContractSeverity No ContractSeverity.FAIL Action on failure
min Optional[int] No - Minimum row count
max Optional[int] No - Maximum row count

SchemaContract

Used in: NodeConfig, ValidationConfig

Validates that the DataFrame schema matches expected columns.

When to Use: Enforce schema stability, detect upstream schema drift, ensure column presence.

See Also: Contracts Overview, SchemaPolicyConfig

Uses the columns metadata from NodeConfig to verify schema.

contracts:
  - type: schema
    strict: true  # Fail if extra columns present
Field Type Required Default Description
type Literal['schema'] No TestType.SCHEMA -
name Optional[str] No - Optional name for the check
on_fail ContractSeverity No ContractSeverity.FAIL -
strict bool No True If true, fail on unexpected columns

UniqueTest

Used in: NodeConfig, ValidationConfig

Ensures specified columns (or combination) contain unique values.

When to Use: Primary keys, natural keys, deduplication verification.

See Also: Contracts Overview

contracts:
  - type: unique
    columns: [order_id]  # Single column
  # OR composite key:
  - type: unique
    columns: [customer_id, order_date]  # Composite uniqueness
Field Type Required Default Description
type Literal['unique'] No TestType.UNIQUE -
name Optional[str] No - Optional name for the check
on_fail ContractSeverity No ContractSeverity.FAIL Action on failure
columns List[str] Yes - Columns that must be unique (composite key if multiple)

VolumeDropTest

Used in: NodeConfig, ValidationConfig

Checks if row count dropped significantly compared to history.

When to Use: Detect source outages, partial loads, or data pipeline issues.

See Also: Contracts Overview, RowCountTest

Formula: (current - avg) / avg < -threshold

contracts:
  - type: volume_drop
    threshold: 0.5  # Fail if > 50% drop from 7-day average
    lookback_days: 7
Field Type Required Default Description
type Literal['volume_drop'] No TestType.VOLUME_DROP -
name Optional[str] No - Optional name for the check
on_fail ContractSeverity No ContractSeverity.FAIL Action on failure
threshold float No 0.5 Max allowed drop (0.5 = 50% drop)
lookback_days int No 7 Days of history to average
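To make the formula concrete, suppose the 7-day average is 1,000 rows:

```yaml
# threshold: 0.5  ->  fail when (current - avg) / avg < -0.5
#   current = 400  ->  (400 - 1000) / 1000 = -0.60  ->  FAIL (more than 50% drop)
#   current = 600  ->  (600 - 1000) / 1000 = -0.40  ->  PASS
contracts:
  - type: volume_drop
    threshold: 0.5
    lookback_days: 7
```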

Global Settings

LineageConfig

Used in: ProjectConfig

Configuration for OpenLineage integration.

Example:

lineage:
  url: "http://localhost:5000"
  namespace: "my_project"

Field Type Required Default Description
url Optional[str] No - OpenLineage API URL
namespace str No odibi Namespace for jobs
api_key Optional[str] No - API Key

AlertConfig

Used in: ProjectConfig

Configuration for alerts with throttling support.

Supports Slack, Teams, and generic webhooks with event-specific payloads.

Available Events:

  • on_start - Pipeline started
  • on_success - Pipeline completed successfully
  • on_failure - Pipeline failed
  • on_quarantine - Rows were quarantined
  • on_gate_block - Quality gate blocked the pipeline
  • on_threshold_breach - A threshold was exceeded

Example:

alerts:
  - type: slack
    url: "${SLACK_WEBHOOK_URL}"
    on_events:
      - on_failure
      - on_quarantine
      - on_gate_block
    metadata:
      throttle_minutes: 15
      max_per_hour: 10
      channel: "#data-alerts"

Field Type Required Default Description
type AlertType Yes - -
url str Yes - Webhook URL
on_events List[AlertEvent] No ['on_failure'] Events to trigger alert: on_start, on_success, on_failure, on_quarantine, on_gate_block, on_threshold_breach
metadata Dict[str, Any] No PydanticUndefined Extra metadata: throttle_minutes, max_per_hour, channel, etc.

LoggingConfig

Used in: ProjectConfig

Logging configuration.

Example:

logging:
  level: "INFO"
  structured: true

Field Type Required Default Description
level LogLevel No LogLevel.INFO -
structured bool No False Output JSON logs
metadata Dict[str, Any] No PydanticUndefined Extra metadata in logs

PerformanceConfig

Used in: ProjectConfig

Performance tuning configuration.

Example:

performance:
  use_arrow: true
  spark_config:
    "spark.sql.shuffle.partitions": "200"
    "spark.sql.adaptive.enabled": "true"
    "spark.databricks.delta.optimizeWrite.enabled": "true"
  delta_table_properties:
    "delta.columnMapping.mode": "name"

Spark Config Notes:

  • Configs are applied via spark.conf.set() at runtime
  • For existing sessions (e.g., Databricks), only runtime-settable configs will take effect
  • Session-level configs (e.g., spark.executor.memory) require a session restart
  • Common runtime-safe configs: shuffle partitions, adaptive query execution, Delta optimizations

Field Type Required Default Description
use_arrow bool No True Use Apache Arrow-backed DataFrames (Pandas only). Reduces memory and speeds up I/O.
spark_config Dict[str, str] No PydanticUndefined Spark configuration settings applied at runtime via spark.conf.set(). Example: {'spark.sql.shuffle.partitions': '200', 'spark.sql.adaptive.enabled': 'true'}. Note: Some configs require session restart and cannot be set at runtime.
delta_table_properties Dict[str, str] No PydanticUndefined Default table properties applied to all Delta writes. Example: {'delta.columnMapping.mode': 'name'} to allow special characters in column names.
skip_null_profiling bool No False Skip null profiling in metadata collection phase. Reduces execution time for large DataFrames by avoiding an additional Spark job.
skip_catalog_writes bool No False Skip catalog metadata writes (register_asset, track_schema, log_pattern, record_lineage) after each node write. Significantly improves performance for high-throughput pipelines like Bronze layer ingestion. Set to true when catalog tracking is not needed.
skip_run_logging bool No False Skip batch catalog writes at pipeline end (log_runs_batch, register_outputs_batch). Saves 10-20s per pipeline run. Enable when you don't need run history in the catalog. Stories are still generated and contain full execution details.

RetryConfig

Used in: ProjectConfig

Retry configuration for transient failures.

Automatically retries failed operations (database timeouts, network issues, rate limits) with configurable backoff strategy.

Example:

retry:
  enabled: true
  max_attempts: 3
  backoff: "exponential"

Field Type Required Default Description
enabled bool No True Enable automatic retry on transient failures (timeouts, connection errors)
max_attempts int No 3 Maximum number of retry attempts before failing. Total attempts = 1 + max_attempts.
backoff BackoffStrategy No BackoffStrategy.EXPONENTIAL Wait strategy between retries. 'exponential' (2^n seconds, recommended), 'linear' (n seconds), or 'constant' (fixed 1 second).
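Assuming the 2^n schedule described above, the defaults give four total attempts with waits of roughly 2s, 4s, then 8s between them before the error is surfaced:

```yaml
retry:
  enabled: true
  max_attempts: 3        # total attempts = 1 initial + 3 retries
  backoff: exponential   # waits of roughly 2s, 4s, 8s between attempts
```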

StoryConfig

Used in: ProjectConfig

Story generation configuration.

Stories are ODIBI's core value - execution reports with lineage. They must use a connection for consistent, traceable output.

Example:

story:
  connection: "local_data"
  path: "stories/"
  retention_days: 30
  failure_sample_size: 100
  max_failure_samples: 500
  max_sampled_validations: 5

Failure Sample Settings:

  • failure_sample_size: Number of failed rows to capture per validation (default: 100)
  • max_failure_samples: Total failed rows across all validations (default: 500)
  • max_sampled_validations: After this many validations, show only counts (default: 5)

Field Type Required Default Description
connection str Yes - Connection name for story output (uses connection's path resolution)
path str Yes - Path for stories (relative to connection base_path)
max_sample_rows int No 10 Maximum rows to include in data samples within story reports. Higher values give more debugging context but increase file size. Set to 0 to disable data sampling.
auto_generate bool No True -
retention_days Optional[int] No 30 Days to keep stories
retention_count Optional[int] No 100 Max number of stories to keep
failure_sample_size int No 100 Number of failed rows to capture per validation rule
max_failure_samples int No 500 Maximum total failed rows across all validations
max_sampled_validations int No 5 After this many validations, show only counts (no samples)
async_generation bool No False Generate stories asynchronously (fire-and-forget). Pipeline returns immediately while story writes in background. Improves multi-pipeline performance by ~5-10s per pipeline.
generate_lineage bool No True Generate combined lineage graph from all stories. Creates a unified view of data flow across pipelines.
docs Optional[ForwardRef('DocsConfig')] No - Documentation generation settings. Generates README.md, TECHNICAL_DETAILS.md, NODE_CARDS/*.md from Story data.

Transformation Reference

How to Use Transformers

You can use any transformer in two ways:

1. As a Top-Level Transformer ("The App") Use this for major operations that define the node's purpose (e.g. Merge, SCD2).

- name: "my_node"
  transformer: "<transformer_name>"
  params:
    <param_name>: <value>

2. As a Step in a Chain ("The Script") Use this for smaller operations within a transform block (e.g. clean_text, filter).

- name: "my_node"
  transform:
    steps:
      - function: "<transformer_name>"
        params:
          <param_name>: <value>

Available Transformers: The models below describe the params required for each transformer.


📂 Common Operations

CaseWhenCase

Back to Catalog

Field Type Required Default Description
condition str Yes - -
value str Yes - -

add_prefix (AddPrefixParams)

Adds a prefix to column names.

Configuration for adding a prefix to column names.

Example - All columns:

add_prefix:
  prefix: "src_"

Example - Specific columns:

add_prefix:
  prefix: "raw_"
  columns: ["id", "name", "value"]

Back to Catalog

Field Type Required Default Description
prefix str Yes - Prefix to add to column names
columns Optional[List[str]] No - Columns to prefix (default: all columns)
exclude Optional[List[str]] No - Columns to exclude from prefixing

add_suffix (AddSuffixParams)

Adds a suffix to column names.

Configuration for adding a suffix to column names.

Example - All columns:

add_suffix:
  suffix: "_raw"

Example - Specific columns:

add_suffix:
  suffix: "_v2"
  columns: ["id", "name", "value"]

Back to Catalog

Field Type Required Default Description
suffix str Yes - Suffix to add to column names
columns Optional[List[str]] No - Columns to suffix (default: all columns)
exclude Optional[List[str]] No - Columns to exclude from suffixing

case_when (CaseWhenParams)

Implements structured CASE WHEN logic.

Configuration for conditional logic.

Example:

case_when:
  output_col: "age_group"
  default: "'Adult'"
  cases:
    - condition: "age < 18"
      value: "'Minor'"
    - condition: "age > 65"
      value: "'Senior'"

Back to Catalog

Field Type Required Default Description
cases List[CaseWhenCase] Yes - List of conditional branches
default str No NULL Default value if no condition met
output_col str Yes - Name of the resulting column

cast_columns (CastColumnsParams)

Casts specific columns to new types while keeping others intact.

Normalizes common type aliases (int->INTEGER, str->STRING, float->DOUBLE) and passes through complex SQL types as-is (e.g., ARRAY).

Args:

  • context: Current execution context with DataFrame
  • params: Cast configuration with column-to-type mapping

Returns: New context with columns cast to specified types

Example: Cast columns using simple type aliases and complex SQL types:

cast_columns:
  casts:
    age: "int"
    salary: "double"
    tags: "ARRAY<STRING>"

Configuration for column type casting.

Example:

cast_columns:
  casts:
    age: "int"
    salary: "DOUBLE"
    created_at: "TIMESTAMP"
    tags: "ARRAY<STRING>"  # Raw SQL types allowed

Back to Catalog

Field Type Required Default Description
casts Dict[str, SimpleType | str] Yes - Map of column to target SQL type

clean_text (CleanTextParams)

Applies string cleaning operations (Trim/Case) via SQL.

Configuration for text cleaning.

Example:

clean_text:
  columns: ["email", "username"]
  trim: true
  case: "lower"

Back to Catalog

Field Type Required Default Description
columns List[str] Yes - List of columns to clean
trim bool No True Apply TRIM()
case Literal['lower', 'upper', 'preserve'] No preserve Case conversion

coalesce_columns (CoalesceColumnsParams)

Returns the first non-null value from a list of columns.

Useful for fallback/priority scenarios where you want to use the first available value from multiple columns (e.g., primary phone, backup phone, home phone).

Args:

  • context: Current execution context with DataFrame
  • params: Coalesce configuration (columns list, output name, drop source option)

Returns: New context with coalesced column added

Example: Create a primary phone column from multiple fallback options:

coalesce_columns:
  columns: ["mobile_phone", "work_phone", "home_phone"]
  output_col: "primary_phone"
  drop_source: true

Configuration for coalescing columns (first non-null value).

Example - Phone number fallback:

coalesce_columns:
  columns: ["mobile_phone", "work_phone", "home_phone"]
  output_col: "primary_phone"

Example - Timestamp fallback:

coalesce_columns:
  columns: ["updated_at", "modified_at", "created_at"]
  output_col: "last_change_at"

Back to Catalog

Field Type Required Default Description
columns List[str] Yes - List of columns to coalesce (in priority order)
output_col str Yes - Name of the output column
drop_source bool No False Drop the source columns after coalescing

concat_columns (ConcatColumnsParams)

Concatenates multiple columns into one string. NULLs are skipped (treated as empty string) using CONCAT_WS behavior.

Configuration for string concatenation.

Example:

concat_columns:
  columns: ["first_name", "last_name"]
  separator: " "
  output_col: "full_name"

Back to Catalog

Field Type Required Default Description
columns List[str] Yes - Columns to concatenate
separator str No - Separator string
output_col str Yes - Resulting column name

convert_timezone (ConvertTimezoneParams)

Converts a timestamp from one timezone to another. Assumes the input column is a naive timestamp representing time in source_tz, or a timestamp with timezone.

Configuration for timezone conversion.

Example:

convert_timezone:
  col: "utc_time"
  source_tz: "UTC"
  target_tz: "America/New_York"

Back to Catalog

Field Type Required Default Description
col str Yes - Timestamp column to convert
source_tz str No UTC Source timezone (e.g., 'UTC', 'America/New_York')
target_tz str Yes - Target timezone (e.g., 'America/Los_Angeles')
output_col Optional[str] No - Name of the result column (default: {col}_{target_tz})

date_add (DateAddParams)

Adds an interval to a date/timestamp column.

Configuration for date addition.

Example:

date_add:
  col: "created_at"
  value: 1
  unit: "day"

Back to Catalog

Field Type Required Default Description
col str Yes - -
value int Yes - -
unit Literal['day', 'month', 'year', 'hour', 'minute', 'second'] Yes - -

date_diff (DateDiffParams)

Calculates difference between two dates/timestamps. Returns the elapsed time in the specified unit (as float for sub-day units).

Configuration for date difference.

Example:

date_diff:
  start_col: "created_at"
  end_col: "updated_at"
  unit: "day"

Back to Catalog

Field Type Required Default Description
start_col str Yes - -
end_col str Yes - -
unit Literal['day', 'hour', 'minute', 'second'] No day -

date_trunc (DateTruncParams)

Truncates a date/timestamp to the specified precision.

Configuration for date truncation.

Example:

date_trunc:
  col: "created_at"
  unit: "month"

Back to Catalog

Field Type Required Default Description
col str Yes - -
unit Literal['year', 'month', 'day', 'hour', 'minute', 'second'] Yes - -

derive_columns (DeriveColumnsParams)

Appends new columns based on SQL expressions.

Design:

  • Uses projection to add fields.
  • Keeps all existing columns via *.

Configuration for derived columns.

Example:

derive_columns:
  derivations:
    total_price: "quantity * unit_price"
    full_name: "concat(first_name, ' ', last_name)"

Note: Engine will fail if expressions reference non-existent columns.

Back to Catalog

Field Type Required Default Description
derivations Dict[str, str] Yes - Map of column name to SQL expression

distinct (DistinctParams)

Return unique rows from the dataset using SQL DISTINCT.

Parameters

context : EngineContext
    The engine context containing the DataFrame to deduplicate.
params : DistinctParams
    Parameters specifying which columns to consider for uniqueness. If None, all columns are used.

Returns

EngineContext
    The updated engine context with duplicate rows removed.

Configuration for distinct rows.

Example:

distinct:
  columns: ["category", "status"]

Back to Catalog

Field Type Required Default Description
columns Optional[List[str]] No - Columns to project (if None, keeps all columns unique)

drop_columns (DropColumnsParams)

Removes the specified columns from the DataFrame.

Configuration for dropping specific columns (blacklist).

Example:

drop_columns:
  columns: ["_internal_id", "_temp_flag", "_processing_date"]

Back to Catalog

Field Type Required Default Description
columns List[str] Yes - List of column names to drop

extract_date_parts (ExtractDateParams)

Extracts date parts using ANSI SQL extract/functions.

Configuration for extracting date parts.

Example:

extract_date_parts:
  source_col: "created_at"
  prefix: "created"
  parts: ["year", "month"]

Back to Catalog

Field Type Required Default Description
source_col str Yes - -
prefix Optional[str] No - -
parts List[Literal['year', 'month', 'day', 'hour']] No ['year', 'month', 'day'] -

fill_nulls (FillNullsParams)

Replaces null values with specified defaults using COALESCE.

Configuration for filling null values.

Example:

fill_nulls:
  values:
    count: 0
    description: "N/A"

Back to Catalog

Field Type Required Default Description
values Dict[str, str | int | float | bool] Yes - Map of column to fill value

filter_rows (FilterRowsParams)

Filters rows using a standard SQL WHERE clause.

Design:

  • SQL-First: Pushes filtering to the engine's optimizer.
  • Zero-Copy: No data movement to Python.

Configuration for filtering rows.

Example:

filter_rows:
  condition: "age > 18 AND status = 'active'"

Example (Null Check):

filter_rows:
  condition: "email IS NOT NULL AND email != ''"

Back to Catalog

Field Type Required Default Description
condition str Yes - SQL WHERE clause (e.g., 'age > 18 AND status = "active"')

limit (LimitParams)

Limit the number of rows returned from the dataset.

Parameters

context : EngineContext
    The engine context containing the DataFrame to limit.
params : LimitParams
    Parameters specifying the number of rows to return and the offset.

Returns

EngineContext
    The updated engine context with the limited DataFrame.

Configuration for result limiting.

Example:

limit:
  n: 100
  offset: 0

Back to Catalog

Field Type Required Default Description
n int Yes - Number of rows to return
offset int No 0 Number of rows to skip

normalize_column_names (NormalizeColumnNamesParams)

Normalizes column names to a consistent style.

Useful for cleaning up messy source data with spaces, mixed case, or special characters. Converts names like "First Name" or "firstName" to "first_name".

Args:

  • context: Current execution context with DataFrame
  • params: Normalization configuration (style, lowercase, special char handling)

Returns: New context with standardized column names

Example: Convert all columns to lowercase snake_case:

normalize_column_names:
  style: "snake_case"
  lowercase: true
  remove_special: true

Configuration for normalizing column names.

Example:

normalize_column_names:
  style: "snake_case"
  lowercase: true

Back to Catalog

Field Type Required Default Description
style Literal['snake_case', 'none'] No snake_case Naming style: 'snake_case' converts spaces/special chars to underscores
lowercase bool No True Convert names to lowercase
remove_special bool No True Remove special characters except underscores

normalize_schema (NormalizeSchemaParams)

Structural transformation to rename, drop, and reorder columns.

Note: This is one of the few transformers that might run faster through the native DataFrame API in some cases, but SQL projection handles it correctly and keeps behavior consistent across engines.

Configuration for schema normalization.

Example:

normalize_schema:
  rename:
    old_col: "new_col"
  drop: ["unused_col"]
  select_order: ["id", "new_col", "created_at"]

Back to Catalog

Field Type Required Default Description
rename Optional[Dict[str, str]] No PydanticUndefined old_name -> new_name
drop Optional[List[str]] No PydanticUndefined Columns to remove; ignored if not present
select_order Optional[List[str]] No - Final column order; any columns not listed are appended afterward

rename_columns (RenameColumnsParams)

Renames columns according to the provided mapping. Columns not in the mapping are kept unchanged.

Configuration for bulk column renaming.

Example:

rename_columns:
  mapping:
    customer_id: cust_id
    order_date: date
    total_amount: amount

Back to Catalog

Field Type Required Default Description
mapping Dict[str, str] Yes - Map of old column name to new column name

replace_values (ReplaceValuesParams)

Replaces values in specified columns according to the mapping.

Supports replacing specific values with new values or NULL. Useful for data standardization (e.g., "N/A" -> NULL, "Unknown" -> NULL).

Args:
    context: Current execution context with dataframe
    params: Replacement configuration (columns and value mapping)

Returns:
    New context with replaced values

Example: Standardize missing value indicators:

replace_values:
  columns: ["status", "category"]
  mapping:
    "N/A": null
    "Unknown": null
    "PENDING": "pending"

Configuration for bulk value replacement.

Example - Standardize nulls:

replace_values:
  columns: ["status", "category"]
  mapping:
    "N/A": null
    "": null
    "Unknown": null

Example - Code replacement:

replace_values:
  columns: ["country_code"]
  mapping:
    "US": "USA"
    "UK": "GBR"

Back to Catalog

Field Type Required Default Description
columns List[str] Yes - Columns to apply replacements to
mapping Dict[str, Optional[str]] Yes - Map of old value to new value (use null for NULL)
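A rough pandas sketch of what this replacement does, assuming the mapping applies only to the listed columns and `null` in YAML maps to Python `None`. The `replace_values` function name here is illustrative, not Odibi's implementation.

```python
import pandas as pd

def replace_values(df, columns, mapping):
    # Apply the value mapping only to the listed columns; None stands in for SQL NULL
    out = df.copy()
    for col in columns:
        out[col] = out[col].apply(lambda v: mapping[v] if v in mapping else v)
    return out
```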

sample (SampleParams)

Return a random sample of rows from the dataset.

Parameters

context : EngineContext
    The engine context containing the DataFrame to sample from.
params : SampleParams
    Parameters specifying the fraction of rows to return and the random seed.

Returns

EngineContext The updated engine context with the sampled DataFrame.

Configuration for random sampling.

Example:

sample:
  fraction: 0.1
  seed: 42

Back to Catalog

Field Type Required Default Description
fraction float Yes - Fraction of rows to return (0.0 to 1.0)
seed Optional[int] No - Random seed for reproducible sampling

select_columns (SelectColumnsParams)

Keeps only the specified columns, dropping all others.

Configuration for selecting specific columns (whitelist).

Example:

select_columns:
  columns: ["id", "name", "created_at"]

Back to Catalog

Field Type Required Default Description
columns List[str] Yes - List of column names to keep

sort (SortParams)

Sort the dataset by one or more columns.

Parameters

context : EngineContext
    The engine context containing the DataFrame to sort.
params : SortParams
    Parameters specifying columns to sort by and sort order.

Returns

EngineContext The updated engine context with the sorted DataFrame.

Configuration for sorting.

Example:

sort:
  by: ["created_at", "id"]
  ascending: false

Back to Catalog

Field Type Required Default Description
by str | List[str] Yes - Column(s) to sort by
ascending bool No True Sort order

split_part (SplitPartParams)

Extracts the Nth part of a string after splitting by a delimiter.

Configuration for splitting strings.

Example:

split_part:
  col: "email"
  delimiter: "@"
  index: 2  # Extracts domain

Back to Catalog

Field Type Required Default Description
col str Yes - Column to split
delimiter str Yes - Delimiter to split by
index int Yes - 1-based index of the token to extract
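The 1-based indexing matches SQL's SPLIT_PART semantics. A minimal Python sketch (the `split_part` function here is illustrative; behavior for an out-of-range index is an assumption):

```python
def split_part(value: str, delimiter: str, index: int) -> str:
    # index is 1-based, matching SQL SPLIT_PART; out-of-range returns "" (assumption)
    parts = value.split(delimiter)
    return parts[index - 1] if 1 <= index <= len(parts) else ""
```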

trim_whitespace (TrimWhitespaceParams)

Trims leading and trailing whitespace from string columns.

If no columns are specified, applies to all columns (SQL TRIM handles non-strings gracefully). Useful for cleaning up data from sources with inconsistent spacing.

Args:
    context: Current execution context with dataframe
    params: Trim configuration (optional column list)

Returns:
    New context with trimmed string columns

Example: Trim specific columns:

trim_whitespace:
  columns: ["name", "address", "city"]

Trim all string columns:
trim_whitespace: {}

Configuration for trimming whitespace from string columns.

Example - All string columns:

trim_whitespace: {}

Example - Specific columns:

trim_whitespace:
  columns: ["name", "address", "city"]

Back to Catalog

Field Type Required Default Description
columns Optional[List[str]] No - Columns to trim (default: all string columns detected at runtime)

📂 Relational Algebra

aggregate (AggregateParams)

Performs grouping and aggregation via SQL.

Configuration for aggregation.

Example:

aggregate:
  group_by: ["department", "region"]
  aggregations:
    salary: "sum"
    employee_id: "count"
    age: "avg"

Back to Catalog

Field Type Required Default Description
group_by List[str] Yes - Columns to group by
aggregations Dict[str, AggFunc] Yes - Map of column to aggregation function (sum, avg, min, max, count)

join (JoinParams)

Joins the current dataset with another dataset from the context.

Configuration for joining datasets.

Scenario 1: Simple Left Join

join:
  right_dataset: "customers"
  on: "customer_id"
  how: "left"

Scenario 2: Join with Prefix (avoid collisions)

join:
  right_dataset: "orders"
  on: ["user_id"]
  how: "inner"
  prefix: "ord"  # Result cols: ord_date, ord_amount...

Back to Catalog

Field Type Required Default Description
right_dataset str Yes - Name of the node/dataset to join with
on str | List[str] Yes - Column(s) to join on
how Literal['inner', 'left', 'right', 'full', 'cross', 'anti', 'semi'] No left Join type
prefix Optional[str] No - Prefix for columns from right dataset to avoid collisions

pivot (PivotParams)

Pivots row values into columns.

Configuration for pivoting data.

Example:

pivot:
  group_by: ["product_id", "region"]
  pivot_col: "month"
  agg_col: "sales"
  agg_func: "sum"

Example (Optimized for Spark):

pivot:
  group_by: ["id"]
  pivot_col: "category"
  values: ["A", "B", "C"]  # Explicit values avoid extra pass
  agg_col: "amount"

Back to Catalog

Field Type Required Default Description
group_by List[str] Yes - Columns to group by
pivot_col str Yes - Column whose distinct values become new columns
agg_col str Yes - Column whose values are aggregated
agg_func Literal['sum', 'count', 'avg', 'max', 'min', 'first'] No sum Aggregation function applied to agg_col
values Optional[List[str]] No - Specific values to pivot (for Spark optimization)

union (UnionParams)

Unions current dataset with others.

Configuration for unioning datasets.

Example (By Name - Default):

union:
  datasets: ["sales_2023", "sales_2024"]
  by_name: true

Example (By Position):

union:
  datasets: ["legacy_data"]
  by_name: false

Back to Catalog

Field Type Required Default Description
datasets List[str] Yes - List of node names to union with current
by_name bool No True Match columns by name (UNION ALL BY NAME)

unpivot (UnpivotParams)

Unpivots columns into rows (Melt/Stack).

Configuration for unpivoting (melting) data.

Example:

unpivot:
  id_cols: ["product_id"]
  value_vars: ["jan_sales", "feb_sales", "mar_sales"]
  var_name: "month"
  value_name: "sales"

Back to Catalog

Field Type Required Default Description
id_cols List[str] Yes - Identifier columns to keep as-is
value_vars List[str] Yes - Columns to unpivot into rows
var_name str No variable Name of the output column holding the former column names
value_name str No value Name of the output column holding the values
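This operation maps directly onto a melt. A small pandas sketch of the example above (column and value names taken from the YAML example; not Odibi's implementation):

```python
import pandas as pd

# Wide monthly columns melted into (month, sales) rows
df = pd.DataFrame({"product_id": [1], "jan_sales": [10], "feb_sales": [20]})
long = df.melt(id_vars=["product_id"], value_vars=["jan_sales", "feb_sales"],
               var_name="month", value_name="sales")
```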

📂 Data Quality

cross_check (CrossCheckParams)

Perform cross-node validation checks.

Does not return a DataFrame (returns None). Raises ValidationError on failure.

Configuration for cross-node validation checks.

Example (Row Count Mismatch):

transformer: "cross_check"
params:
  type: "row_count_diff"
  inputs: ["node_a", "node_b"]
  threshold: 0.05  # Allow 5% difference

Example (Schema Match):

transformer: "cross_check"
params:
  type: "schema_match"
  inputs: ["staging_orders", "prod_orders"]

Back to Catalog

Field Type Required Default Description
type str Yes - Check type: 'row_count_diff', 'schema_match'
inputs List[str] Yes - List of node names to compare
threshold float No 0.0 Threshold for diff (0.0-1.0)
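A sketch of how a row_count_diff check could evaluate the threshold, assuming the relative difference is measured against the larger of the two counts (the exact formula is an assumption; the function name is hypothetical):

```python
def row_count_diff_ok(count_a: int, count_b: int, threshold: float = 0.0) -> bool:
    # Relative difference against the larger count, compared to the allowed threshold
    if max(count_a, count_b) == 0:
        return True
    return abs(count_a - count_b) / max(count_a, count_b) <= threshold
```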

📂 Warehousing Patterns

AuditColumnsConfig

Configuration for automatic audit timestamp columns during merge operations.

Attributes:
    created_col: Column to set only on first insert (e.g., "created_at")
    updated_col: Column to update on every merge operation (e.g., "updated_at")

At least one of created_col or updated_col must be specified.

Example:

audit_cols:
  created_col: "created_at"
  updated_col: "updated_at"

Back to Catalog

Field Type Required Default Description
created_col Optional[str] No - Column to set only on first insert
updated_col Optional[str] No - Column to update on every merge

merge (MergeParams)

Merge transformer implementation. Handles Upsert, Append-Only, and Delete-Match strategies.

Args:
    context: EngineContext (preferred) or legacy PandasContext/SparkContext
    params: MergeParams object (when called via function step) or DataFrame (legacy)
    current: DataFrame (legacy positional arg, deprecated)
    **kwargs: Parameters when not using MergeParams

Configuration for Merge transformer (Upsert/Append).

⚖️ "GDPR & Compliance" Guide

Business Problem: "A user exercised their 'Right to be Forgotten'. We need to remove them from our Silver tables immediately."

The Solution: Use the delete_match strategy. The source dataframe contains the IDs to be deleted, and the transformer removes them from the target.

Recipe 1: Right to be Forgotten (Delete)

transformer: "merge"
params:
  target: "silver.customers"
  keys: ["customer_id"]
  strategy: "delete_match"

Recipe 2: Conditional Update (SCD Type 1) "Only update if the source record is newer than the target record."

transformer: "merge"
params:
  target: "silver.products"
  keys: ["product_id"]
  strategy: "upsert"
  update_condition: "source.updated_at > target.updated_at"

Recipe 3: Safe Insert (Filter Bad Records) "Only insert records that are not marked as deleted."

transformer: "merge"
params:
  target: "silver.orders"
  keys: ["order_id"]
  strategy: "append_only"
  insert_condition: "source.is_deleted = false"

Recipe 4: Audit Columns "Track when records were created or updated."

transformer: "merge"
params:
  target: "silver.users"
  keys: ["user_id"]
  audit_cols:
    created_col: "dw_created_at"
    updated_col: "dw_updated_at"

Recipe 5: Full Sync (Insert + Update + Delete) "Sync target with source: insert new, update changed, and remove soft-deleted."

transformer: "merge"
params:
  target: "silver.customers"
  keys: ["id"]
  strategy: "upsert"
  # 1. Delete if source says so
  delete_condition: "source.is_deleted = true"
  # 2. Update if changed (and not deleted)
  update_condition: "source.hash != target.hash"
  # 3. Insert new (and not deleted)
  insert_condition: "source.is_deleted = false"

Recipe 6: Connection-based Path Resolution (ADLS) "Use a connection to resolve paths, just like write config."

transform:
  steps:
    - function: merge
      params:
        connection: goat_prod
        path: OEE/silver/customers
        register_table: silver.customers
        keys: ["customer_id"]
        strategy: "upsert"
        audit_cols:
          created_col: "_created_at"
          updated_col: "_updated_at"

Strategies:
- upsert (Default): Update existing records, insert new ones.
- append_only: Ignore duplicates, only insert new keys.
- delete_match: Delete records in target that match keys in source.

Back to Catalog

Field Type Required Default Description
target Optional[str] No - Target table name or full path (use this OR connection+path)
connection Optional[str] No - Connection name to resolve path (use with 'path' param)
path Optional[str] No - Relative path within connection (e.g., 'OEE/silver/customers')
register_table Optional[str] No - Register as Unity Catalog/metastore table after merge (e.g., 'silver.customers')
keys List[str] Yes - List of join keys
strategy MergeStrategy No MergeStrategy.UPSERT Merge behavior: 'upsert', 'append_only', 'delete_match'
audit_cols Optional[AuditColumnsConfig] No - {'created_col': '...', 'updated_col': '...'}
optimize_write bool No False Run OPTIMIZE after write (Spark)
vacuum_hours Optional[int] No - Hours to retain for VACUUM after merge (Spark only). Set to 168 for 7 days. None disables VACUUM.
zorder_by Optional[List[str]] No - Columns to Z-Order by
cluster_by Optional[List[str]] No - Columns to Liquid Cluster by (Delta)
update_condition Optional[str] No - SQL condition for update clause (e.g. 'source.ver > target.ver')
insert_condition Optional[str] No - SQL condition for insert clause (e.g. 'source.status != "deleted"')
delete_condition Optional[str] No - SQL condition for delete clause (e.g. 'source.status = "deleted"')
table_properties Optional[dict] No - Delta table properties for initial table creation (e.g., column mapping)
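For intuition, the upsert strategy can be sketched in a few lines of pandas: source rows win on key collision, unmatched target rows survive, and brand-new keys are appended. This is a simplification (the real transformer operates via engine-specific MERGE logic, conditions, and audit columns); the `upsert` function name is hypothetical.

```python
import pandas as pd

def upsert(target, source, keys):
    # Source first, so drop_duplicates(keep="first") lets source rows win on key collision
    combined = pd.concat([source, target], ignore_index=True)
    return combined.drop_duplicates(subset=keys, keep="first")
```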

scd2 (SCD2Params)

Implements SCD Type 2 Logic.

SCD2 is self-contained: it writes directly to the target table on all engines and code paths. No separate write: block is needed.

On Spark with use_delta_merge=True (default), uses an optimized Delta MERGE that only touches changed rows. The legacy path reads the full target, computes the union, and overwrites. Both write directly.

On Pandas, writes directly to the target file (parquet or CSV).

Parameters for SCD Type 2 (Slowly Changing Dimensions) transformer.

🕰️ The "Time Machine" Pattern

Business Problem: "I need to know what the customer's address was last month, not just where they live now."

The Solution: SCD Type 2 tracks the full history of changes. Each record has an "effective window" (start/end dates) and a flag indicating if it is the current version.

Recipe 1: Using table name

transformer: "scd2"
params:
  target: "silver.dim_customers"   # Registered table name
  keys: ["customer_id"]
  track_cols: ["address", "tier"]
  effective_time_col: "txn_date"

Recipe 2: Using connection + path (ADLS)

transformer: "scd2"
params:
  connection: adls_prod            # Connection name
  path: OEE/silver/dim_customers   # Relative path
  keys: ["customer_id"]
  track_cols: ["address", "tier"]
  effective_time_col: "txn_date"

How it works:
1. Match: Finds existing records using keys.
2. Compare: Checks track_cols to see if data changed.
3. Close: If changed, updates the old record's end_time_col to the new effective_time_col.
4. Insert: Adds a new record with start_time_col (renamed from effective_time_col) as the version start, open-ended end_time_col, and is_current = true.

The effective_time_col value is copied into a new start_time_col column (default: valid_from) in the target, giving each version a complete time window: [valid_from, valid_to). The original source column is preserved.

Note: SCD2 is self-contained — it writes directly to the target table on all engines. No separate write: block is needed in your pipeline YAML.

On Spark with Delta targets, uses an optimized Delta MERGE by default (use_delta_merge: true). Set use_delta_merge: false to use the legacy full-overwrite approach (still self-contained, just slower for large tables).

On Pandas, writes directly to the target file (parquet or CSV).

Back to Catalog

Field Type Required Default Description
target Optional[str] No - Target table name or full path (use this OR connection+path)
connection Optional[str] No - Connection name to resolve path (use with 'path' param)
path Optional[str] No - Relative path within connection (e.g., 'OEE/silver/dim_customers')
keys List[str] Yes - Natural keys to identify unique entities
track_cols List[str] Yes - Columns to monitor for changes
effective_time_col str Yes - Source column indicating when the change occurred.
start_time_col str No valid_from Name of the start timestamp column in the target. The effective_time_col value is copied to this column.
end_time_col str No valid_to Name of the end timestamp column
current_flag_col str No is_current Name of the current record flag column
delete_col Optional[str] No - Column indicating soft deletion (boolean)
use_delta_merge bool No True Use Delta Lake MERGE for Spark engine (faster for large tables). Falls back to full overwrite if target is not Delta format.
register_table Optional[str] No - Register as Unity Catalog/metastore table after write (e.g., 'silver.dim_customers'). Spark only.
vacuum_hours Optional[int] No - Hours to retain for VACUUM after SCD2 write (Spark only). Set to 168 for 7 days. None disables VACUUM.
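The Match/Compare/Close/Insert cycle can be sketched in pandas for a single natural key. This is an illustrative simplification with hard-coded default column names (valid_from, valid_to, is_current), not Odibi's implementation; the `scd2_apply` function is hypothetical.

```python
import pandas as pd

def scd2_apply(target, source, key, track_cols, eff_col):
    # Single-key sketch of the Match/Compare/Close/Insert cycle (illustrative only)
    out = target.copy()
    new_rows = []
    for _, row in source.iterrows():
        cur = out[(out[key] == row[key]) & (out["is_current"])]
        changed = cur.empty or any(cur.iloc[0][c] != row[c] for c in track_cols)
        if changed:
            if not cur.empty:
                # Close: end the old version at the new effective time
                out.loc[cur.index, ["valid_to", "is_current"]] = [row[eff_col], False]
            # Insert: open a new version with an open-ended window
            new_rows.append({key: row[key], **{c: row[c] for c in track_cols},
                             "valid_from": row[eff_col], "valid_to": None,
                             "is_current": True})
    return pd.concat([out, pd.DataFrame(new_rows)], ignore_index=True)
```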

📂 Manufacturing & IoT

PhaseConfig

Configuration for a single phase.

Back to Catalog

Field Type Required Default Description
timer_col str Yes - Timer column name for this phase
start_threshold Optional[int] No - Override default start threshold for this phase (seconds)

detect_sequential_phases (DetectSequentialPhasesParams)

Detect and analyze sequential manufacturing phases.

For each group (e.g., batch), this transformer:
1. Processes phases sequentially (each starts after previous ends)
2. Detects phase start by finding first valid timer reading and back-calculating
3. Detects phase end by finding first repeated (plateaued) timer value
4. Calculates time spent in each status during each phase
5. Aggregates specified metrics within each phase window
6. Outputs one summary row per group

Output columns per phase:
- {phase}_start: Phase start timestamp
- {phase}_end: Phase end timestamp
- {phase}_max_minutes: Maximum timer value converted to minutes
- {phase}_{status}_minutes: Time in each status (if status_col provided)
- {phase}_{metric}: Aggregated metrics (if phase_metrics provided)

Detect and analyze sequential manufacturing phases from timer columns.

This transformer processes raw sensor/PLC data where timer columns increment during each phase. It detects phase boundaries, calculates durations, and tracks time spent in each equipment status.

Common use cases:
- Batch reactor cycle analysis
- CIP (Clean-in-Place) phase timing
- Food processing (cook, cool, package cycles)
- Any multi-step batch process with PLC timers

Scenario: Analyze FBR cycle times

detect_sequential_phases:
  group_by: BatchID
  timestamp_col: ts
  phases:
    - timer_col: LoadTime
    - timer_col: AcidTime
    - timer_col: DryTime
    - timer_col: CookTime
    - timer_col: CoolTime
    - timer_col: UnloadTime
  start_threshold: 240
  status_col: Status
  status_mapping:
    1: idle
    2: active
    3: hold
    4: faulted
  phase_metrics:
    Level: max
  metadata:
    ProductCode: first_after_start
    Weight: max

Scenario: Group by multiple columns

detect_sequential_phases:
  group_by:
    - BatchID
    - AssetID
  phases: [LoadTime, CookTime]

Back to Catalog

Field Type Required Default Description
group_by str | List[str] Yes - Column(s) to group by. Can be a single column name or list of columns. E.g., 'BatchID' or ['BatchID', 'AssetID']
timestamp_col str No ts Timestamp column for ordering events
phases List[str | PhaseConfig] Yes - List of phase timer columns (strings) or PhaseConfig objects. Phases are processed sequentially - each phase starts after the previous ends.
start_threshold int No 240 Default max timer value (seconds) to consider as valid phase start. Filters out late readings where timer already shows large elapsed time.
status_col Optional[str] No - Column containing equipment status codes
status_mapping Optional[Dict[int, str]] No - Mapping of status codes to names. E.g., {1: 'idle', 2: 'active'}
phase_metrics Optional[Dict[str, str]] No - Columns to aggregate within each phase window. E.g., {Level: max, Pressure: max}. Outputs {Phase}_{Column} columns.
metadata Optional[Dict[str, str]] No - Columns to include in output with aggregation method. Options: 'first', 'last', 'first_after_start', 'max', 'min', 'mean', 'sum'. E.g., {ProductCode: 'first_after_start'}
output_time_format str No %Y-%m-%d %H:%M:%S Format for output timestamp columns
fill_null_minutes bool No False If True, fill null numeric columns (_max_minutes, _status_minutes, _metrics) with 0. Timestamp columns remain null for skipped phases.
spark_native bool No False If True, use native Spark window functions. If False (default), use applyInPandas which is often faster for datasets with many batches.

📂 Advanced & Feature Engineering

ShiftDefinition

Definition of a single shift.

Back to Catalog

Field Type Required Default Description
name str Yes - Name of the shift (e.g., 'Day', 'Night')
start str Yes - Start time in HH:MM format (e.g., '06:00')
end str Yes - End time in HH:MM format (e.g., '14:00')

deduplicate (DeduplicateParams)

Deduplicates data using Window functions.

Configuration for deduplication.

Scenario: Keep latest record

deduplicate:
  keys: ["id"]
  order_by: "updated_at DESC"

Back to Catalog

Field Type Required Default Description
keys List[str] Yes - List of columns to partition by (columns that define uniqueness)
order_by Optional[str] No - SQL Order by clause (e.g. 'updated_at DESC') to determine which record to keep (first one is kept)
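A pandas sketch of the "keep latest record" scenario, mirroring ROW_NUMBER() OVER (PARTITION BY keys ORDER BY order_by DESC) = 1. The `deduplicate` function here is illustrative, not Odibi's implementation, and takes a single order column rather than a full SQL ORDER BY clause.

```python
import pandas as pd

def deduplicate(df, keys, order_col=None, ascending=False):
    # Sort so the record to keep comes first per key, then drop the rest
    if order_col is not None:
        df = df.sort_values(order_col, ascending=ascending)
    return df.drop_duplicates(subset=keys, keep="first")
```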

dict_based_mapping (DictMappingParams)

Maps values in a column using a provided dictionary.

For each value in the specified column, replaces it with the mapped value. If 'default' is provided, uses it for values not found in the mapping. Supports Spark and Pandas engines.

Configuration for dictionary mapping.

Scenario: Map status codes to labels

dict_based_mapping:
  column: "status_code"
  mapping:
    "1": "Active"
    "0": "Inactive"
  default: "Unknown"
  output_column: "status_desc"

Back to Catalog

Field Type Required Default Description
column str Yes - Column to map values from
mapping Dict[str, str | int | float | bool] Yes - Dictionary of source value -> target value
default str | int | float | bool No - Default value if source value is not found in mapping
output_column Optional[str] No - Name of output column. If not provided, overwrites source column.

explode_list_column (ExplodeParams)

Explodes a list/array column into multiple rows.

For each element in the specified list column, creates a new row. If 'outer' is True, keeps rows with empty lists (like explode_outer). Supports Spark and Pandas engines.

Configuration for exploding lists.

Scenario: Flatten list of items per order

explode_list_column:
  column: "items"
  outer: true  # Keep orders with empty items list

Back to Catalog

Field Type Required Default Description
column str Yes - Column containing the list/array to explode
outer bool No False If True, keep rows with empty lists (explode_outer behavior). If False, drops them.

generate_numeric_key (NumericKeyParams)

Generates a deterministic BIGINT surrogate key from a hash of columns.

This is useful when:
- Unioning data from multiple sources
- Some sources have IDs, some don't
- You need stable numeric IDs for gold layer

The key is generated by:
1. Concatenating columns with separator
2. Computing MD5 hash
3. Converting first 15 hex chars to BIGINT

If coalesce_with is specified, keeps the existing value when not null. If output_col == coalesce_with, the original column is replaced.

Configuration for numeric surrogate key generation.

Generates a deterministic BIGINT key from a hash of specified columns. Useful when unioning data from multiple sources where some have IDs and others don't.

Example:

- function: generate_numeric_key
  params:
    columns: [DateID, store_id, reason_id, duration_min, notes]
    output_col: ID
    coalesce_with: ID  # Keep existing ID if not null

The generated key is:
- Deterministic: same input data = same ID every time
- BIGINT: large numeric space to avoid collisions
- Stable: safe for gold layer / incremental loads

Back to Catalog

Field Type Required Default Description
columns List[str] Yes - Columns to combine for the key
separator str No | Separator between values
output_col str No numeric_key Name of the output column
coalesce_with Optional[str] No - Existing column to coalesce with (keep existing value if not null)
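The three-step generation above fits in a few lines of Python. This sketch follows the documented steps (concatenate, MD5, first 15 hex chars to BIGINT); the NULL-as-empty-string handling is an assumption borrowed from the surrogate key transformer, and the `numeric_key` function name is hypothetical. Fifteen hex characters give a maximum of 2^60 - 1, which fits comfortably in a signed 64-bit BIGINT.

```python
import hashlib

def numeric_key(values, separator="|"):
    # NULLs treated as empty strings for determinism (assumption)
    joined = separator.join("" if v is None else str(v) for v in values)
    digest = hashlib.md5(joined.encode()).hexdigest()
    # First 15 hex chars: max value 16**15 - 1 = 2**60 - 1, safe for signed BIGINT
    return int(digest[:15], 16)
```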

generate_surrogate_key (SurrogateKeyParams)

Generates a deterministic surrogate key (MD5) from a combination of columns. Handles NULLs by treating them as empty strings to ensure consistency.

Configuration for surrogate key generation.

Example:

generate_surrogate_key:
  columns: ["region", "product_id"]
  separator: "-"
  output_col: "unique_id"

Back to Catalog

Field Type Required Default Description
columns List[str] Yes - Columns to combine for the key
separator str No - Separator between values
output_col str No surrogate_key Name of the output column
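A minimal sketch of the MD5-based key with the documented NULL handling (NULLs treated as empty strings). The `surrogate_key` function name is illustrative:

```python
import hashlib

def surrogate_key(values, separator="-"):
    # NULLs become empty strings so the key stays consistent across runs
    joined = separator.join("" if v is None else str(v) for v in values)
    return hashlib.md5(joined.encode()).hexdigest()
```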

geocode (GeocodeParams)

Geocoding Stub.

Configuration for geocoding.

Example:

geocode:
  address_col: "full_address"
  output_col: "lat_long"

Back to Catalog

Field Type Required Default Description
address_col str Yes - Column containing the address to geocode
output_col str No lat_long Name of the output column for coordinates

hash_columns (HashParams)

Hashes columns for PII/Anonymization.

Configuration for column hashing.

Example:

hash_columns:
  columns: ["email", "ssn"]
  algorithm: "sha256"

Back to Catalog

Field Type Required Default Description
columns List[str] Yes - List of columns to hash
algorithm HashAlgorithm No HashAlgorithm.SHA256 Hashing algorithm. Options: 'sha256', 'md5'

normalize_json (NormalizeJsonParams)

Flattens a nested JSON/Struct column.

Configuration for JSON normalization.

Example:

normalize_json:
  column: "json_data"
  sep: "_"

Back to Catalog

Field Type Required Default Description
column str Yes - Column containing nested JSON/Struct
sep str No _ Separator for nested fields (e.g., 'parent_child')

parse_json (ParseJsonParams)

Parses a JSON string column into a Struct/Map column.

Configuration for JSON parsing.

Example:

parse_json:
  column: "raw_json"
  json_schema: "id INT, name STRING"
  output_col: "parsed_struct"

Back to Catalog

Field Type Required Default Description
column str Yes - String column containing JSON
json_schema str Yes - DDL schema string (e.g. 'a INT, b STRING') or Spark StructType DDL
output_col Optional[str] No - -

regex_replace (RegexReplaceParams)

Applies a regex replacement to a column.

Uses SQL-based REGEXP_REPLACE to replace all matches of the pattern in the specified column with the given replacement string. Works on both Spark and DuckDB/Pandas engines.

Configuration for regex replacement.

Example:

regex_replace:
  column: "phone"
  pattern: "[^0-9]"
  replacement: ""

Back to Catalog

Field Type Required Default Description
column str Yes - Column to apply regex replacement on
pattern str Yes - Regex pattern to match
replacement str Yes - String to replace matches with

sessionize (SessionizeParams)

Assigns session IDs based on inactivity threshold.

Configuration for sessionization.

Example:

sessionize:
  timestamp_col: "event_time"
  user_col: "user_id"
  threshold_seconds: 1800

Back to Catalog

Field Type Required Default Description
timestamp_col str Yes - Timestamp column to calculate session duration from
user_col str Yes - User identifier to partition sessions by
threshold_seconds int No 1800 Inactivity threshold in seconds (default: 30 minutes). If gap > threshold, new session starts.
session_col str No session_id Output column name for the generated session ID
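The gap-based session logic can be sketched in pandas: a session starts at a user's first event, or whenever the gap to the previous event exceeds the threshold. Illustrative only (the `sessionize` name and global session numbering are assumptions; the real transformer may number sessions per user):

```python
import pandas as pd

def sessionize(df, timestamp_col, user_col, threshold_seconds=1800,
               session_col="session_id"):
    out = df.sort_values([user_col, timestamp_col]).copy()
    gap = out.groupby(user_col)[timestamp_col].diff().dt.total_seconds()
    # New session at each user's first event or when the inactivity gap is exceeded
    out[session_col] = (gap.isna() | (gap > threshold_seconds)).cumsum()
    return out
```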

split_events_by_period (SplitEventsByPeriodParams)

Splits events that span multiple time periods into individual segments.

For events spanning multiple days/hours/shifts, this creates separate rows for each period with adjusted start/end times and recalculated durations.

Configuration for splitting events that span multiple time periods.

Splits events that span multiple days, hours, or shifts into individual segments per period. Useful for OEE/downtime analysis, billing, and time-based aggregations.

Example - Split by day:

split_events_by_period:
  start_col: "Shutdown_Start_Time"
  end_col: "Shutdown_End_Time"
  period: "day"
  duration_col: "Shutdown_Duration_Min"

Example - Split by shift:

split_events_by_period:
  start_col: "event_start"
  end_col: "event_end"
  period: "shift"
  duration_col: "duration_minutes"
  shifts:
    - name: "Day"
      start: "06:00"
      end: "14:00"
    - name: "Swing"
      start: "14:00"
      end: "22:00"
    - name: "Night"
      start: "22:00"
      end: "06:00"

Back to Catalog

Field Type Required Default Description
start_col str Yes - Column containing the event start timestamp
end_col str Yes - Column containing the event end timestamp
period str No day Period type to split by: 'day', 'hour', or 'shift'
duration_col Optional[str] No - Output column name for duration in minutes. If not set, no duration column is added.
shifts Optional[List[ShiftDefinition]] No - List of shift definitions (required when period='shift')
shift_col Optional[str] No shift_name Output column name for shift name (only used when period='shift')
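The day-split case can be sketched with stdlib datetimes: walk from the event start, cutting a segment at each midnight until the event end is reached. The `split_by_day` function is hypothetical and ignores duration/shift columns:

```python
from datetime import datetime, timedelta

def split_by_day(start: datetime, end: datetime):
    # One (segment_start, segment_end) pair per calendar day the event touches
    segments = []
    cur = start
    while cur < end:
        next_midnight = datetime(cur.year, cur.month, cur.day) + timedelta(days=1)
        seg_end = min(end, next_midnight)
        segments.append((cur, seg_end))
        cur = seg_end
    return segments
```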

unpack_struct (UnpackStructParams)

Flattens a struct/dict column into top-level columns.

Configuration for unpacking structs.

Example:

unpack_struct:
  column: "user_info"

Back to Catalog

Field Type Required Default Description
column str Yes - Struct/Dictionary column to unpack/flatten into individual columns

validate_and_flag (ValidateAndFlagParams)

Validates rules and appends a column with a list/string of failed rule names.

Configuration for validation flagging.

Example:

validate_and_flag:
  flag_col: "data_issues"
  rules:
    age_check: "age >= 0"
    email_format: "email LIKE '%@%'"

Back to Catalog

Field Type Required Default Description
rules Dict[str, str] Yes - Map of rule name to SQL condition (must be TRUE)
flag_col str No _issues Name of the column to store failed rules
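A pandas sketch of the flagging behavior, using DataFrame.eval for rule evaluation (a simplification: the real transformer accepts SQL conditions, so constructs like LIKE are out of scope here). The `validate_and_flag` name mirrors the transformer but this is not its implementation:

```python
import pandas as pd

def validate_and_flag(df, rules, flag_col="_issues"):
    # Evaluate each rule to a boolean mask, then record failed rule names per row
    masks = {name: df.eval(expr) for name, expr in rules.items()}
    out = df.copy()
    out[flag_col] = [",".join(n for n, m in masks.items() if not bool(m.iloc[i]))
                     for i in range(len(df))]
    return out
```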

window_calculation (WindowCalculationParams)

Generic wrapper for Window functions.

Configuration for window functions.

Example:

window_calculation:
  target_col: "cumulative_sales"
  function: "sum(sales)"
  partition_by: ["region"]
  order_by: "date ASC"

Back to Catalog

Field Type Required Default Description
target_col str Yes - Name of the output column holding the window result
function str Yes - Window function e.g. 'sum(amount)', 'rank()'
partition_by List[str] No PydanticUndefined Columns to partition by
order_by Optional[str] No - SQL ORDER BY clause (e.g. 'date ASC')
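The cumulative-sum example above corresponds to a partitioned running total. A pandas sketch (the `cumulative_sum` helper is illustrative and covers only the sum case, not arbitrary window functions):

```python
import pandas as pd

def cumulative_sum(df, value_col, partition_by, order_by, target_col):
    # Roughly sum(value) OVER (PARTITION BY ... ORDER BY ...) as a running total
    out = df.sort_values(order_by, kind="stable").copy()
    out[target_col] = out.groupby(partition_by)[value_col].cumsum()
    return out
```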

📂 Other Transformers

ConversionSpec

Specification for a single unit conversion.

Back to Catalog

Field Type Required Default Description
from_unit str Yes - Source unit (e.g., 'psig', 'degF', 'BTU/lb')
to str Yes - Target unit (e.g., 'bar', 'degC', 'kJ/kg')
output Optional[str] No - Output column name. If not specified, overwrites the source column.

PropertyOutputConfig

Configuration for a single output property.

Back to Catalog

Field Type Required Default Description
property str Yes - CoolProp property key: H (enthalpy), S (entropy), D (density), C (specific heat Cp), CVMASS (Cv), V (viscosity), L (conductivity), T (temperature), P (pressure), Q (quality)
unit Optional[str] No - Output unit for this property. If not specified, uses SI units.
output_column Optional[str] No - Custom output column name. Defaults to {prefix}_{property}.

PsychrometricOutputConfig

Configuration for psychrometric output property.

Back to Catalog

Field Type Required Default Description
property str Yes - Property: W (humidity ratio), B (wet bulb), D (dew point), H (enthalpy), V (specific volume), R (relative humidity)
unit Optional[str] No - Output unit
output_column Optional[str] No - Custom output column name

fluid_properties (FluidPropertiesParams)

Calculate thermodynamic properties for any fluid using CoolProp.

Supports 122+ fluids including Water, Ammonia, R134a, Air, CO2, etc. Uses IAPWS-IF97 formulation for water/steam calculations.

Engine parity: Pandas, Spark (via Pandas UDF), Polars

Configuration for fluid property calculations using CoolProp.

Supports 122+ fluids including Water, Ammonia, R134a, Air, CO2, etc. See CoolProp documentation for full list: http://www.coolprop.org/fluid_properties/PurePseudoPure.html

State is defined by two independent properties. Common combinations:
- P + T (pressure + temperature): subcooled/superheated regions
- P + Q (pressure + quality): two-phase region
- P + H (pressure + enthalpy): when enthalpy is known

Scenario: Calculate steam properties from pressure and temperature

fluid_properties:
  fluid: Water
  pressure_col: steam_pressure
  temperature_col: steam_temp
  pressure_unit: psig
  temperature_unit: degF
  gauge_offset: 14.696
  outputs:
    - property: H
      unit: BTU/lb
      output_column: steam_enthalpy
    - property: S
      unit: BTU/(lb·R)
      output_column: steam_entropy

Scenario: Calculate saturated steam properties from pressure only

fluid_properties:
  fluid: Water
  pressure_col: steam_pressure
  quality: 1.0  # Saturated vapor
  pressure_unit: psia
  outputs:
    - property: H
      unit: BTU/lb
    - property: T
      unit: degF

Back to Catalog

Field Type Required Default Description
fluid str No Water CoolProp fluid name (e.g., Water, Ammonia, R134a, Air, CO2)
pressure_col Optional[str] No - Column containing pressure values
temperature_col Optional[str] No - Column containing temperature values
enthalpy_col Optional[str] No - Column containing enthalpy values
quality_col Optional[str] No - Column containing quality values (0-1)
pressure Optional[float] No - Fixed pressure value
temperature Optional[float] No - Fixed temperature value
enthalpy Optional[float] No - Fixed enthalpy value (in input unit)
quality Optional[float] No - Fixed quality value (0=sat liquid, 1=sat vapor)
pressure_unit str No Pa Pressure unit: Pa, kPa, MPa, bar, psia, psig, atm
temperature_unit str No K Temperature unit: K, degC, degF, degR
enthalpy_unit str No J/kg Input enthalpy unit: J/kg, kJ/kg, BTU/lb
gauge_offset float No 14.696 Atmospheric pressure offset for psig (default 14.696 psia)
outputs List[PropertyOutputConfig] No PydanticUndefined List of properties to calculate with their units
prefix str No - Prefix for output column names (e.g., 'steam' -> 'steam_H')

psychrometrics (PsychrometricsParams)

Calculate psychrometric (humid air) properties using CoolProp HAPropsSI.

Engine parity: Pandas, Spark (via Pandas UDF), Polars

Calculates moist air properties from dry bulb temperature plus one other property (typically relative humidity or humidity ratio).

CoolProp property keys for humid air:

- W: Humidity ratio (kg water / kg dry air)
- B: Wet bulb temperature
- D: Dew point temperature
- H: Enthalpy (per kg dry air)
- V: Specific volume (per kg dry air)
- R: Relative humidity (0-1)
- T: Dry bulb temperature
- P: Total pressure

Scenario: Calculate humidity ratio from T and RH

psychrometrics:
  dry_bulb_col: ambient_temp
  relative_humidity_col: rh_percent
  temperature_unit: degF
  rh_is_percent: true
  pressure_unit: psia
  elevation_ft: 875
  outputs:
    - property: W
      unit: lb/lb
      output_column: humidity_ratio
    - property: B
      unit: degF
      output_column: wet_bulb
    - property: D
      unit: degF
      output_column: dew_point

Scenario: Calculate RH from temperature and humidity ratio

psychrometrics:
  dry_bulb_col: temp_f
  humidity_ratio_col: w
  temperature_unit: degF
  pressure: 14.696
  pressure_unit: psia
  outputs:
    - property: R
      output_column: relative_humidity

Back to Catalog

Field Type Required Default Description
dry_bulb_col str Yes - Column containing dry bulb temperature
relative_humidity_col Optional[str] No - Column containing relative humidity
humidity_ratio_col Optional[str] No - Column containing humidity ratio (kg/kg or lb/lb)
wet_bulb_col Optional[str] No - Column containing wet bulb temperature
dew_point_col Optional[str] No - Column containing dew point temperature
relative_humidity Optional[float] No - Fixed relative humidity
humidity_ratio Optional[float] No - Fixed humidity ratio
pressure_col Optional[str] No - Column containing pressure
pressure Optional[float] No - Fixed pressure value
elevation_ft Optional[float] No - Elevation in feet (used to estimate pressure if not provided)
elevation_m Optional[float] No - Elevation in meters (used to estimate pressure if not provided)
temperature_unit str No K Temperature unit for all temps
pressure_unit str No Pa Pressure unit
humidity_ratio_unit str No kg/kg Humidity ratio unit (kg/kg, lb/lb, g/kg)
rh_is_percent bool No False If True, RH input is 0-100%, otherwise 0-1
outputs List[PsychrometricOutputConfig] No PydanticUndefined Properties to calculate
prefix str No - Prefix for output columns

saturation_properties (SaturationPropertiesParams)

Calculate saturated liquid or vapor properties.

Convenience wrapper that sets Q=0 (saturated liquid) or Q=1 (saturated vapor) automatically.

This is a simplified interface for common saturation calculations.

Scenario: Get saturated steam properties at a given pressure

saturation_properties:
  fluid: Water
  pressure_col: steam_pressure
  pressure_unit: psig
  phase: vapor
  outputs:
    - property: H
      unit: BTU/lb
      output_column: hg
    - property: T
      unit: degF
      output_column: sat_temp

Scenario: Get saturated liquid enthalpy (hf)

saturation_properties:
  pressure_col: pressure_psia
  pressure_unit: psia
  phase: liquid
  outputs:
    - property: H
      unit: BTU/lb
      output_column: hf

Back to Catalog

Field Type Required Default Description
fluid str No Water CoolProp fluid name
pressure_col Optional[str] No - Column containing pressure values
pressure Optional[float] No - Fixed pressure value
temperature_col Optional[str] No - Column containing saturation temp
temperature Optional[float] No - Fixed saturation temperature
pressure_unit str No Pa Pressure unit
temperature_unit str No K Temperature unit
gauge_offset float No 14.696 Gauge pressure offset for psig
phase Literal['liquid', 'vapor'] No vapor Phase: 'liquid' (Q=0) or 'vapor' (Q=1)
outputs List[PropertyOutputConfig] No PydanticUndefined Properties to calculate
prefix str No - Prefix for output columns

unit_convert (UnitConvertParams)

Convert columns from one unit to another.

Uses Pint for comprehensive unit conversion support. Handles all SI units, imperial units, and common engineering units out of the box.

Engine parity: Pandas, Spark, Polars

Configuration for unit conversion transformer.

Scenario: Normalize sensor data to SI units

unit_convert:
  conversions:
    pressure_psig:
      from: psig
      to: bar
      output: pressure_bar
    temperature_f:
      from: degF
      to: degC
      output: temperature_c
    flow_gpm:
      from: gpm
      to: m³/s
      output: flow_si

Scenario: Convert in-place (overwrite original columns)

unit_convert:
  conversions:
    pressure:
      from: psia
      to: kPa
    temperature:
      from: degF
      to: K

Scenario: Handle gauge pressure with custom atmospheric reference

unit_convert:
  gauge_pressure_offset: 14.696 psia
  conversions:
    steam_pressure:
      from: psig
      to: psia
      output: steam_pressure_abs

Scenario: Complex engineering units

unit_convert:
  conversions:
    heat_transfer_coeff:
      from: BTU/(hr * ft² * degF)
      to: W/(m² * K)
      output: htc_si
    thermal_conductivity:
      from: BTU/(hr * ft * degF)
      to: W/(m * K)
      output: k_si
    specific_heat:
      from: BTU/(lb * degF)
      to: kJ/(kg * K)
      output: cp_si

Back to Catalog

Field Type Required Default Description
conversions Dict[str, ConversionSpec] Yes - Mapping of source column names to conversion specifications
gauge_pressure_offset Optional[str] No 14.696 psia Atmospheric pressure for gauge-to-absolute conversions. Default is sea level (14.696 psia).
errors str No null How to handle conversion errors: 'null' (default), 'raise', or 'ignore'

Simulation Engine

Generate realistic time-series data for testing pipelines, dashboards, and analytics. Simulation is configured as format: simulation in a node's read section.

Core building blocks:

- SimulationScope: Time boundaries, timestep, row count
- EntityConfig: Who generates data (sensors, machines, pumps)
- ColumnGeneratorConfig: What data each entity produces
- ScheduledEvent: Maintenance windows, setpoint changes, condition-based triggers
- ChaosConfig: Outliers, duplicates, downtime for realistic imperfections

Duration/interval format (used by timestep, recurrence, duration, jitter, cooldown, sustain): number + unit, where unit is s (seconds), m (minutes), h (hours), or d (days). Examples: 5m, 1h, 30s, 2d.

Example:

read:
  connection: null
  format: simulation
  options:
    simulation:
      scope:
        start_time: "2026-01-01T00:00:00Z"
        timestep: "5m"
        row_count: 288
        seed: 42
      entities:
        count: 3
        id_prefix: pump_
      columns:
        - name: temperature
          data_type: float
          generator:
            type: random_walk
            start: 75.0
            min: 60.0
            max: 100.0
            volatility: 1.0
            mean_reversion: 0.1

See Also: Simulation Docs, Generator Reference


SimulationConfig

Complete simulation configuration.

Example:

read:
  connection: null
  format: simulation
  options:
    simulation:
      scope:
        start_time: "2026-01-01T00:00:00Z"
        timestep: "5m"
        row_count: 10000
        seed: 42
      entities:
        count: 10
        id_prefix: pump_
      columns:
        - name: entity_id
          data_type: string
          generator:
            type: constant
            value: "{entity_id}"
        - name: timestamp
          data_type: timestamp
          generator:
            type: timestamp
        - name: temperature
          data_type: float
          generator:
            type: range
            min: 60.0
            max: 80.0
            distribution: normal
          null_rate: 0.02
      chaos:
        outlier_rate: 0.01
        outlier_factor: 3.0

Field Type Required Default Description
scope SimulationScope Yes - Simulation scope and boundaries
entities EntityConfig Yes - Entity configuration
columns List[ColumnGeneratorConfig] Yes - Column definitions
chaos Optional[ChaosConfig] No - Chaos parameters
scheduled_events List[ScheduledEvent] No PydanticUndefined Scheduled events that modify simulation behavior

SimulationScope

Used in: SimulationConfig

Simulation scope and boundaries.

Example (row count):

scope:
  start_time: "2026-01-01T00:00:00Z"
  timestep: "5m"
  row_count: 10000
  seed: 42

Example (time range):

scope:
  start_time: "2026-01-01T00:00:00Z"
  end_time: "2026-01-02T00:00:00Z"
  timestep: "10m"
  seed: 42

Field Type Required Default Description
start_time str Yes - Simulation start timestamp in ISO8601 Zulu format (e.g., '2026-01-01T00:00:00Z')
timestep str Yes - Time between rows. Format: number followed by unit — s (seconds), m (minutes), h (hours), or d (days). Examples: '30s', '5m', '1h', '2d'.
row_count Optional[int] No - Total rows to generate (mutually exclusive with end_time)
end_time Optional[str] No - Simulation end timestamp in ISO8601 Zulu format (e.g., '2026-01-02T00:00:00Z'). Mutually exclusive with row_count
seed int No 42 Random seed for deterministic generation

EntityConfig

Used in: SimulationConfig

Entity declaration for simulation.

Example (auto-generated):

entities:
  count: 10
  id_prefix: pump_

Example (explicit names):

entities:
  names: [pump_01, pump_02, reactor_01]

Field Type Required Default Description
count Optional[int] No - Number of entities to generate
names Optional[List[str]] No - Explicit entity names
id_prefix str No entity_ Prefix for auto-generated entity IDs
id_format Literal['sequential', 'uuid'] No sequential ID format: sequential or uuid

ColumnGeneratorConfig

Used in: SimulationConfig

Configuration for a simulated column.

Example:

- name: temperature
  data_type: float
  generator:
    type: range
    min: 60.0
    max: 100.0
    distribution: normal
  null_rate: 0.02
  entity_overrides:
    pump_01:
      type: range
      min: 80.0
      max: 120.0

Field Type Required Default Description
name str Yes - Column name
data_type SimulationDataType Yes - Data type
generator RangeGeneratorConfig | CategoricalGeneratorConfig | BooleanGeneratorConfig | TimestampGeneratorConfig | SequentialGeneratorConfig | ConstantGeneratorConfig | UUIDGeneratorConfig | EmailGeneratorConfig | IPGeneratorConfig | GeoGeneratorConfig | RandomWalkGeneratorConfig | DailyProfileGeneratorConfig | DerivedGeneratorConfig Yes - Generator configuration
null_rate float No 0.0 Probability of NULL values
entity_overrides Dict[str, RangeGeneratorConfig | CategoricalGeneratorConfig | BooleanGeneratorConfig | TimestampGeneratorConfig | SequentialGeneratorConfig | ConstantGeneratorConfig | UUIDGeneratorConfig | EmailGeneratorConfig | IPGeneratorConfig | GeoGeneratorConfig | RandomWalkGeneratorConfig | DailyProfileGeneratorConfig | DerivedGeneratorConfig] No PydanticUndefined Per-entity generator overrides

ScheduledEvent

Used in: SimulationConfig

Scheduled event that modifies simulation behavior at specific times or conditions.

Enables realistic process simulation with:

- Maintenance windows (forced power=0)
- Grid curtailment (forced output reduction)
- Setpoint changes (scheduled process changes)
- Cleaning cycles (efficiency restoration)
- Recurring events (e.g., weekly maintenance)
- Condition-based triggers (e.g., degrade when efficiency drops)

Example (maintenance window):

scheduled_events:
  - type: forced_value
    entity: Turbine_01
    column: power_kw
    value: 0.0
    start_time: "2026-03-11T14:00:00Z"
    end_time: "2026-03-11T18:00:00Z"

Example (grid curtailment - all entities):

scheduled_events:
  - type: forced_value
    entity: null  # Applies to all entities
    column: max_output_pct
    value: 80.0
    start_time: "2026-03-11T16:00:00Z"
    end_time: "2026-03-11T19:00:00Z"

Example (permanent setpoint change):

scheduled_events:
  - type: setpoint_change
    entity: Reactor_01
    column: temp_setpoint_c
    value: 370.0
    start_time: "2026-03-11T12:00:00Z"
    # No end_time = permanent change

Example (recurring maintenance every 30 days):

scheduled_events:
  - type: forced_value
    entity: Turbine_01
    column: power_kw
    value: 0.0
    start_time: "2026-01-15T06:00:00Z"
    recurrence: "30d"
    duration: "4h"
    jitter: "2d"
    max_occurrences: 12

Example (condition-based event):

scheduled_events:
  - type: parameter_override
    entity: Pump_05
    column: flow_rate_lpm
    value: 50.0
    condition: "actual_efficiency_pct < 70"
    cooldown: "7d"
    sustain: "24h"

Example (ramped setpoint change):

scheduled_events:
  - type: setpoint_change
    entity: Reactor_01
    column: temp_setpoint_c
    value: 370.0
    start_time: "2026-03-11T12:00:00Z"
    duration: "2h"
    transition: ramp

Field Type Required Default Description
type ScheduledEventType Yes - Event type
entity Optional[str] No - Entity name (must match a name from entities.names) or None to apply to all entities
column str Yes - Column name to modify
value Any Yes - Value to apply during event
start_time Optional[str] No - Event start timestamp in ISO8601 Zulu format (e.g., '2026-01-15T06:00:00Z'). Required for time-based events.
end_time Optional[str] No - Event end timestamp in ISO8601 Zulu format (e.g., '2026-01-15T18:00:00Z'). None = permanent change.
priority int No 0 Priority for overlapping events (higher = applied last)
recurrence Optional[str] No - Repeat interval. Event recurs at this interval from start_time. Format: number followed by unit — s (seconds), m (minutes), h (hours), or d (days). Examples: '30d', '7d', '4h'.
duration Optional[str] No - Duration of each occurrence. Alternative to specifying end_time. Format: number followed by unit — s (seconds), m (minutes), h (hours), or d (days). Examples: '4h', '30m', '2d'.
jitter Optional[str] No - Random offset ± applied to each recurrence start (deterministic per seed). Format: number followed by unit — s (seconds), m (minutes), h (hours), or d (days). Examples: '2d', '6h'.
max_occurrences Optional[int] No - Stop repeating after N occurrences.
condition Optional[str] No - Sandboxed Python expression evaluated against current row columns. Supports comparison operators, compound logic (and, or, not), and safe functions (abs, round, min, max). E.g., 'actual_efficiency_pct < 70 and pressure > 50'. Triggers event when true.
cooldown Optional[str] No - Minimum gap between condition triggers. Prevents rapid re-triggering. Format: number followed by unit — s (seconds), m (minutes), h (hours), or d (days). Examples: '7d', '12h'.
sustain Optional[str] No - Condition must be continuously true for this duration before triggering. Prevents spurious triggers from momentary spikes. Format: number followed by unit — s (seconds), m (minutes), h (hours), or d (days). Examples: '24h', '30m'.
transition str No instant How value is applied: 'instant' (default, jump to value) or 'ramp' (linear interpolation over duration).

ChaosConfig

Used in: SimulationConfig

Chaos engineering parameters for realistic data imperfections.

Example:

chaos:
  outlier_rate: 0.01
  outlier_factor: 3.0
  duplicate_rate: 0.005
  downtime_events:
    - entity: pump_01
      start_time: "2026-01-01T10:00:00Z"
      end_time: "2026-01-01T12:00:00Z"

Field Type Required Default Description
outlier_rate float No 0.0 Probability of outlier values
outlier_factor float No 3.0 Multiplier for outlier values
duplicate_rate float No 0.0 Probability of duplicating rows
downtime_events List[DowntimeEvent] No PydanticUndefined Time periods with no data generation

DowntimeEvent

Used in: ChaosConfig

Downtime period where no data is generated.

Example:

entity: pump_05
start_time: "2026-01-01T12:00:00Z"
end_time: "2026-01-01T14:00:00Z"

Field Type Required Default Description
entity Optional[str] No - Entity affected (None = all)
start_time str Yes - Downtime start timestamp (ISO8601)
end_time str Yes - Downtime end timestamp (ISO8601)

RangeGeneratorConfig

Used in: ColumnGeneratorConfig

Range generator for numeric values.

Example:

type: range
min: 50.0
max: 100.0
distribution: normal

Field Type Required Default Description
type Literal['range'] Yes - -
min float Yes - Minimum value
max float Yes - Maximum value
distribution DistributionType No DistributionType.UNIFORM Distribution type: uniform or normal
mean Optional[float] No - Mean for normal distribution (defaults to midpoint)
std_dev Optional[float] No - Standard deviation for normal (defaults to range/6)

RandomWalkGeneratorConfig

Used in: ColumnGeneratorConfig

Random walk generator for realistic time-series data.

Produces values where each row depends on the previous row's value, creating smooth, realistic process data instead of independent random values.

Uses an Ornstein-Uhlenbeck process with optional trend for realistic simulation of controlled process variables (temperatures, pressures, flow rates).

Example (static setpoint):

type: random_walk
start: 350.0
min: 300.0
max: 400.0
volatility: 0.5
mean_reversion: 0.1
trend: 0.001
precision: 1
shock_rate: 0.02
shock_magnitude: 30.0
shock_bias: 1.0

Example (dynamic setpoint - temperature tracking ambient):

- name: ambient_temp_c
  generator:
    type: random_walk
    start: 25.0
    min: 15.0
    max: 35.0
    volatility: 0.3
    mean_reversion: 0.05

- name: battery_temp_c
  generator:
    type: random_walk
    start: 28.0
    min: 20.0
    max: 40.0
    volatility: 0.4
    mean_reversion: 0.1
    mean_reversion_to: ambient_temp_c  # Drifts toward ambient

Field Type Required Default Description
type Literal['random_walk'] Yes - -
start float Yes - Initial value / setpoint
min float Yes - Hard lower bound (physical limit)
max float Yes - Hard upper bound (physical limit)
volatility float No 1.0 Standard deviation of step-to-step changes. Controls noise magnitude.
mean_reversion float No 0.0 Strength of pull back toward start value or mean_reversion_to column (0 = pure random walk, 1 = snap back immediately). Simulates PID-like control.
mean_reversion_to Optional[str] No - Column name to use as dynamic setpoint for mean reversion instead of static 'start' value. Enables realistic process simulation where the walk tracks a time-varying reference. Example: PV that drifts toward a changing SP column, or temperature following ambient. If specified, must reference a column defined earlier in dependency order.
trend float No 0.0 Drift per timestep. Positive = gradual increase, negative = gradual decrease. Simulates fouling, degradation, or slow process drift.
precision Optional[int] No - Round values to N decimal places. None = no rounding.
shock_rate float No 0.0 Probability of a sudden shock at each timestep (0.0 = never, 1.0 = every step). Simulates process upsets like valve sticks, feed disruptions, or sensor glitches. The shock perturbs the walk's internal state, so mean_reversion naturally recovers over subsequent steps — just like a real PID-controlled process.
shock_magnitude float No 10.0 Maximum absolute size of a shock event. The actual shock is drawn uniformly from [0, shock_magnitude]. Use values relative to your min/max range — e.g., if range is 300-400, a magnitude of 30 means shocks up to 30% of range.
shock_bias float No 0.0 Directional tendency for shocks. +1.0 = shocks always go UP (e.g., exothermic runaway, pressure buildup). -1.0 = shocks always go DOWN (e.g., pump cavitation, flow drop). 0.0 = shocks go either direction with equal probability. Values between give partial bias (e.g., 0.7 = mostly upward).

DailyProfileGeneratorConfig

Used in: ColumnGeneratorConfig

Daily profile generator for time-of-day patterns.

Produces values that follow a repeating daily curve defined by anchor points. The engine interpolates between anchor points, adds noise, and clamps to [min, max]. Ideal for simulating occupancy, energy demand, traffic, call volume, or any metric with a predictable intraday shape.

Example (building occupancy):

type: daily_profile
min: 0
max: 25
precision: 0
noise: 1.5
interpolation: linear
profile:
  "00:00": 1
  "06:00": 3
  "08:00": 19
  "12:00": 15
  "13:00": 22
  "17:00": 14
  "22:00": 2

Example (network traffic with weekend scaling):

type: daily_profile
min: 0.0
max: 1000.0
noise: 50.0
interpolation: linear
weekend_scale: 0.3
profile:
  "00:00": 50.0
  "06:00": 100.0
  "09:00": 800.0
  "12:00": 650.0
  "13:00": 750.0
  "17:00": 900.0
  "20:00": 400.0
  "23:00": 100.0

Field Type Required Default Description
type Literal['daily_profile'] Yes - -
profile Dict[str, float] Yes - Anchor points mapping time-of-day (HH:MM) to target values. The engine interpolates between these points to produce a smooth daily curve.
min float Yes - Hard lower bound (physical limit)
max float Yes - Hard upper bound (physical limit)
noise float No 0.0 Random noise amplitude (±noise added to interpolated value).
interpolation InterpolationType No InterpolationType.LINEAR Interpolation method between anchor points: linear or step.
precision Optional[int] No - Round values to N decimal places. None = no rounding. 0 = integers.
volatility float No 0.0 Day-to-day variation in anchor point targets. Each day, every anchor point is independently shifted by a random amount drawn from a normal distribution (mean = profile value, std_dev = volatility). This makes each day's curve slightly different while preserving the overall shape. 0.0 = identical curve every day. Higher values = more day-to-day variation.
weekend_scale Optional[float] No - Scale factor for weekends (Saturday/Sunday). 0.0 = zero on weekends, 1.0 = same as weekday. None = no weekend adjustment.

DerivedGeneratorConfig

Used in: ColumnGeneratorConfig

Derived column generator (calculated from other columns).

Example:

type: derived
expression: "temperature * 1.8 + 32"  # Celsius to Fahrenheit

Supported operators:

- Arithmetic: +, -, *, /, //, %, **
- Comparison: ==, !=, <, <=, >, >=
- Logical: and, or, not
- Functions: abs(), round(), min(), max()

Example (conditional):

type: derived
expression: "100 if temperature > 80 else 0"

Field Type Required Default Description
type Literal['derived'] Yes - -
expression str Yes - Sandboxed Python expression referencing column names. Supports context variables (_row_index, entity_id, _timestamp), safe math functions (abs, round, min, max, coalesce, safe_div), and stateful functions (prev, ema, pid, delay).
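The expression field also accepts the stateful helpers named above; a hypothetical sketch (exact helper signatures may differ; check the Generator Reference):

```yaml
- name: temp_delta
  data_type: float
  generator:
    type: derived
    expression: "temperature - prev(temperature)"  # assumes prev() returns the prior row's value
```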

CategoricalGeneratorConfig

Used in: ColumnGeneratorConfig

Categorical generator for discrete values.

Example:

type: categorical
values: [Running, Idle, Error]
weights: [0.8, 0.15, 0.05]

Field Type Required Default Description
type Literal['categorical'] Yes - -
values List[Any] Yes - List of possible values
weights Optional[List[float]] No - Probability weights (must sum to 1.0)

SequentialGeneratorConfig

Used in: ColumnGeneratorConfig

Sequential number generator.

By default, generates globally unique IDs across all entities by offsetting each entity's sequence range. Entity 0 gets IDs [start, start + rows), entity 1 gets [start + rows, start + 2*rows), etc.

Set unique_across_entities: false for per-entity sequences (all entities share the same ID range).

Example:

type: sequential
start: 1
step: 1
unique_across_entities: true  # default

Field Type Required Default Description
type Literal['sequential'] Yes - -
start int No 1 Starting value
step int No 1 Increment step
unique_across_entities bool No True When true, each entity gets a non-overlapping ID range

ConstantGeneratorConfig

Used in: ColumnGeneratorConfig

Constant value generator.

Example:

type: constant
value: "production"

Field Type Required Default Description
type Literal['constant'] Yes - -
value Any Yes - Constant value for all rows

BooleanGeneratorConfig

Used in: ColumnGeneratorConfig

Boolean generator.

Example:

type: boolean
true_probability: 0.95

Field Type Required Default Description
type Literal['boolean'] Yes - -
true_probability float No 0.5 Probability of True

TimestampGeneratorConfig

Used in: ColumnGeneratorConfig

Timestamp generator (uses simulation scope).

Example:

type: timestamp

Field Type Required Default Description
type Literal['timestamp'] Yes - -

UUIDGeneratorConfig

Used in: ColumnGeneratorConfig

UUID/GUID generator.

Example:

type: uuid
version: 4  # UUID4 (random) or UUID5 (deterministic from namespace)

Field Type Required Default Description
type Literal['uuid'] Yes - -
version Literal[4, 5] No 4 UUID version. Options: 4 (random, default) or 5 (deterministic/namespace-based). Only these values are supported.
namespace Optional[str] No - Namespace seed for UUID5 generation. Arbitrary string; default 'DNS' uses the standard DNS namespace UUID.
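A deterministic UUID5 variant might look like this (namespace string illustrative):

```yaml
type: uuid
version: 5
namespace: "sensors.example.com"  # same namespace + input -> stable UUIDs across runs
```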

EmailGeneratorConfig

Used in: ColumnGeneratorConfig

Email address generator.

Example:

type: email
domain: example.com

Field Type Required Default Description
type Literal['email'] Yes - -
domain str No example.com Email domain
pattern str No {entity}_{index} Username template. Available placeholders: {entity}, {index}, {row}. Default produces usernames like 'entity_01'.

IPGeneratorConfig

Used in: ColumnGeneratorConfig

IPv4 address generator.

Example:

type: ipv4
subnet: "192.168.0.0/16"

Field Type Required Default Description
type Literal['ipv4'] Yes - -
subnet Optional[str] No - CIDR subnet (e.g., '192.168.0.0/16'). If None, uses full range.

GeoGeneratorConfig

Used in: ColumnGeneratorConfig

Geographic coordinate generator.

Example:

type: geo
bbox: [-90, -180, 90, 180]  # [min_lat, min_lon, max_lat, max_lon]

Field Type Required Default Description
type Literal['geo'] Yes - -
bbox List[float] Yes - Bounding box [min_lat, min_lon, max_lat, max_lon]
format Literal['tuple', 'lat_lon_separate'] No tuple Output format: 'tuple' for (lat,lon) or 'lat_lon_separate' for separate columns
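To emit latitude and longitude as separate columns instead of a tuple (bounding box values illustrative):

```yaml
type: geo
bbox: [40.5, -74.3, 41.0, -73.6]
format: lat_lon_separate
```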

Semantic Layer

The semantic layer provides a unified interface for defining and querying business metrics. Define metrics once, query them by name across dimensions.

Core Components:

- MetricDefinition: Define aggregation expressions (SUM, COUNT, AVG)
- DimensionDefinition: Define grouping attributes with hierarchies
- MaterializationConfig: Pre-compute metrics at specific grain
- SemanticQuery: Execute queries like "revenue BY region, month"
- Project: Unified API that connects pipelines and semantic layer

Unified Project API (Recommended):

from odibi import Project

project = Project.load("odibi.yaml")
result = project.query("revenue BY region")
print(result.df)

YAML Configuration:

project: my_warehouse
engine: pandas

connections:
  gold:
    type: delta
    path: /mnt/data/gold

# Semantic layer at project level
semantic:
  metrics:
    - name: revenue
      expr: "SUM(total_amount)"
      source: gold.fact_orders    # connection.table notation
      filters:
        - "status = 'completed'"

  dimensions:
    - name: region
      source: gold.dim_customer
      column: region

  materializations:
    - name: monthly_revenue
      metrics: [revenue]
      dimensions: [region, month]
      output: gold/agg_monthly_revenue

The source: gold.fact_orders notation resolves paths automatically from connections.


DimensionDefinition

Used in: SemanticLayerConfig

Definition of a semantic dimension.

A dimension represents an attribute for grouping and filtering metrics (e.g., date, product, region).

Attributes:

- name: Unique dimension identifier
- label: Display name for column alias in generated views. Defaults to name.
- source: Source table reference. Supports three formats: $pipeline.node (recommended, e.g., $build_warehouse.dim_customer); connection.path (e.g., gold.dim_customer or gold.dims/customer); or a bare table_name, which uses the default connection
- column: Column name in source (defaults to name)
- expr: Custom SQL expression. If provided, overrides column and grain. Example: "YEAR(DATEADD(month, 6, Date))" for fiscal year
- hierarchy: Optional ordered list of columns for drill-down
- description: Human-readable description
- grain: Time grain transformation (day, week, month, quarter, year). Ignored if expr is provided.

Field Type Required Default Description
name str Yes - Unique dimension identifier
label Optional[str] No - Display name for column alias (defaults to name)
source Optional[str] No - Source table reference. Formats: $pipeline.node (e.g., $build_warehouse.dim_customer), connection.path (e.g., gold.dim_customer or gold.dims/customer), or bare table_name
column Optional[str] No - Column name (defaults to name)
expr Optional[str] No - Custom SQL expression. Overrides column and grain. Example: YEAR(DATEADD(month, 6, Date)) for fiscal year
hierarchy List[str] No PydanticUndefined Drill-down hierarchy
description Optional[str] No - Human-readable description
grain Optional[TimeGrain] No - Time grain transformation
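A sketch combining these fields (table and column names illustrative):

```yaml
dimensions:
  - name: month
    source: gold.fact_orders
    column: order_date
    grain: month
  - name: region
    source: $build_warehouse.dim_customer
    hierarchy: [country, region, city]
    description: Customer sales region
```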

MaterializationConfig

Used in: SemanticLayerConfig

Configuration for materializing metrics to a table.

Materialization pre-computes aggregated metrics at a specific grain and persists them for faster querying.

Attributes:

- name: Unique materialization identifier
- metrics: List of metric names to include
- dimensions: List of dimension names (determines grain)
- output: Output table path
- schedule: Optional cron schedule for refresh
- incremental: Configuration for incremental refresh

Field Type Required Default Description
name str Yes - Unique materialization identifier
metrics List[str] Yes - Metrics to materialize
dimensions List[str] Yes - Dimensions for grouping
output str Yes - Output table path
schedule Optional[str] No - Cron schedule
incremental Optional[Dict[str, Any]] No - Incremental refresh config
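For example, pre-computing monthly revenue per region (cron string illustrative):

```yaml
materializations:
  - name: monthly_revenue
    metrics: [revenue]
    dimensions: [region, month]
    output: gold/agg_monthly_revenue
    schedule: "0 2 * * *"   # optional: refresh nightly at 02:00
```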

MetricDefinition

Used in: SemanticLayerConfig

Definition of a semantic metric.

A metric represents a measurable value that can be aggregated across dimensions (e.g., revenue, order_count, avg_order_value).

Attributes:

- name: Unique metric identifier
- label: Display name for column alias in generated views. Defaults to name.
- description: Human-readable description
- expr: SQL aggregation expression (e.g., "SUM(total_amount)"). Optional for derived metrics.
- source: Source table reference. Supports three formats: $pipeline.node (recommended, e.g., $build_warehouse.fact_orders); connection.path (e.g., gold.fact_orders or gold.oee/plant_a/metrics); or a bare table_name, which uses the default connection
- filters: Optional WHERE conditions to apply
- type: "simple" (direct aggregation) or "derived" (references other metrics)
- components: List of component metric names (required for derived metrics). These metrics must be additive (e.g., SUM-based) for correct recalculation at different grains.
- formula: Calculation formula using component names (required for derived). Example: "(total_revenue - total_cost) / total_revenue"

Field Type Required Default Description
name str Yes - Unique metric identifier
label Optional[str] No - Display name for column alias (defaults to name)
description Optional[str] No - Human-readable description
expr Optional[str] No - SQL aggregation expression
source Optional[str] No - Source table reference. Formats: $pipeline.node (e.g., $build_warehouse.fact_orders), connection.path (e.g., gold.fact_orders or gold.oee/plant_a/table), or bare table_name
filters List[str] No [] WHERE conditions
type MetricType No MetricType.SIMPLE Metric type
components Optional[List[str]] No - Component metric names for derived metrics
formula Optional[str] No - Calculation formula using component names
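
A derived metric references other metrics via components and formula. A sketch under assumed names (total_revenue and total_cost are illustrative SUM-based simple metrics, as derived components must be additive):

```yaml
metrics:
  - name: total_revenue
    expr: "SUM(total_amount)"
    source: $build_warehouse.fact_orders
  - name: total_cost
    expr: "SUM(cost_amount)"
    source: $build_warehouse.fact_orders
  - name: margin_pct
    type: derived
    components: [total_revenue, total_cost]
    formula: "(total_revenue - total_cost) / total_revenue"
```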

SemanticLayerConfig

Complete semantic layer configuration.

Contains all metrics, dimensions, materializations, and views for a semantic layer deployment.

Attributes:
- metrics: List of metric definitions
- dimensions: List of dimension definitions
- materializations: List of materialization configurations
- views: List of view configurations

Field Type Required Default Description
metrics List[MetricDefinition] No [] Metric definitions
dimensions List[DimensionDefinition] No [] Dimension definitions
materializations List[MaterializationConfig] No [] Materialization configs
views List[ViewConfig] No [] View configurations
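
Putting the sections together, a minimal semantic layer might look like the sketch below (all names are illustrative, and the top-level semantic_layer key and dimension fields are assumptions, not confirmed by this reference):

```yaml
semantic_layer:
  metrics:
    - name: order_count
      expr: "COUNT(*)"
      source: $build_warehouse.fact_orders
  dimensions:
    - name: region
  materializations:
    - name: orders_by_region
      metrics: [order_count]
      dimensions: [region]
      output: gold.orders_by_region
```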

FK Validation

Declare and validate referential integrity between fact and dimension tables.

Features:
- Declare relationships in YAML
- Validate FK constraints on fact load
- Detect orphan records
- Generate lineage from relationships

Example:

relationships:
  - name: orders_to_customers
    fact: fact_orders
    dimension: dim_customer
    fact_key: customer_sk
    dimension_key: customer_sk
    on_violation: error


RelationshipConfig

Used in: RelationshipRegistry

Configuration for a foreign key relationship.

Attributes:
- name: Unique relationship identifier
- fact: Fact table name
- dimension: Dimension table name
- fact_key: Foreign key column in fact table
- dimension_key: Primary/surrogate key column in dimension
- nullable: Whether nulls are allowed in fact_key
- on_violation: Action on violation ("warn", "error", "quarantine")

Field Type Required Default Description
name str Yes - Unique relationship identifier
fact str Yes - Fact table name
dimension str Yes - Dimension table name
fact_key str Yes - FK column in fact table
dimension_key str Yes - PK/SK column in dimension
nullable bool No False Allow nulls in fact_key
on_violation str No error Action on violation: "warn", "error", or "quarantine"

RelationshipRegistry

Registry of all declared relationships.

Attributes:
- relationships: List of relationship configurations

Field Type Required Default Description
relationships List[RelationshipConfig] No [] Relationship definitions

Data Patterns

Declarative patterns for common data warehouse building blocks. Patterns encapsulate best practices for dimensional modeling, ensuring consistent implementation across your data warehouse.


DimensionPattern

Build complete dimension tables with surrogate keys and SCD (Slowly Changing Dimension) support.

When to Use:
- Building dimension tables from source systems (customers, products, locations)
- Need surrogate keys for star schema joins
- Need to track historical changes (SCD Type 2)

Beginner Note: Dimensions are the "who, what, where, when" of your data warehouse. A customer dimension has customer_id (natural key) and customer_sk (surrogate key). Fact tables join to dimensions via surrogate keys.

See Also: FactPattern, DateDimensionPattern

Features:
- Auto-generate integer surrogate keys (MAX(existing) + ROW_NUMBER)
- SCD Type 0 (static), 1 (overwrite), 2 (history tracking)
- Optional unknown member row (SK=0) for orphan FK handling
- Audit columns (load_timestamp, source_system)

Params:

Parameter Type Required Description
natural_key str Yes Natural/business key column name
surrogate_key str Yes Surrogate key column name to generate
scd_type int No 0=static, 1=overwrite, 2=history (default: 1)
track_cols list SCD 1/2 only Columns to track for change detection
target str SCD 2 only Target table path to read existing history
unknown_member bool No Insert row with SK=0 for orphan handling
audit.load_timestamp bool No Add load_timestamp column
audit.source_system str No Add source_system column with value

Supported Target Formats:
- Spark: catalog.table, Delta paths, .parquet, .csv, .json, .orc
- Pandas: .parquet, .csv, .json, .xlsx, .feather, .pickle

Example:

pattern:
  type: dimension
  params:
    natural_key: customer_id
    surrogate_key: customer_sk
    scd_type: 2
    track_cols: [name, email, address, city]
    target: warehouse.dim_customer
    unknown_member: true
    audit:
      load_timestamp: true
      source_system: "crm"


DateDimensionPattern

Generate a complete date dimension table with pre-calculated attributes for BI/reporting.

When to Use:
- Every data warehouse needs a date dimension for time-based analytics
- Enable date filtering, grouping by week/month/quarter, fiscal year reporting

Beginner Note: The date dimension is foundational for any BI/reporting system. It lets you query "sales by month" or "orders in fiscal Q2" without complex date calculations.

See Also: DimensionPattern

Features:
- Generates all dates in a range with rich attributes
- Calendar and fiscal year support
- ISO week numbering
- Weekend/month-end flags

Params:

Parameter Type Required Description
start_date str Yes Start date (YYYY-MM-DD)
end_date str Yes End date (YYYY-MM-DD)
date_key_format str No Format for date_sk (default: yyyyMMdd)
fiscal_year_start_month int No Month fiscal year starts (1-12, default: 1)
unknown_member bool No Add unknown date row with date_sk=0

Generated Columns: date_sk, full_date, day_of_week, day_of_week_num, day_of_month, day_of_year, is_weekend, week_of_year, month, month_name, quarter, quarter_name, year, fiscal_year, fiscal_quarter, is_month_start, is_month_end, is_year_start, is_year_end

Example:

pattern:
  type: date_dimension
  params:
    start_date: "2020-01-01"
    end_date: "2030-12-31"
    fiscal_year_start_month: 7
    unknown_member: true


FactPattern

Build fact tables with automatic surrogate key lookups from dimensions.

When to Use:
- Building fact tables from transactional data (orders, events, transactions)
- Need to look up surrogate keys from dimension tables
- Need to handle orphan records (missing dimension matches)

Beginner Note: Facts are the "how much, how many" of your data warehouse. An orders fact has measures (quantity, revenue) and dimension keys (customer_sk, product_sk). The pattern automatically looks up SKs from dimensions.

See Also: DimensionPattern, QuarantineConfig

Features:
- Automatic SK lookups from dimension tables (with SCD2 current-record filtering)
- Orphan handling: unknown (SK=0), reject (error), quarantine (route to table)
- Grain validation (detect duplicates)
- Calculated measures and column renaming
- Audit columns

Params:

Parameter Type Required Description
grain list No Columns defining uniqueness (validates no duplicates)
dimensions list No Dimension lookup configurations (see below)
orphan_handling str No "unknown" | "reject" | "quarantine" (default: unknown)
quarantine dict Conditional Quarantine config, required when orphan_handling is quarantine (see below)
measures list No Measure definitions (passthrough, rename, or calculated)
deduplicate bool No Remove duplicates before processing
keys list Conditional Keys for deduplication, required when deduplicate is true
audit.load_timestamp bool No Add load_timestamp column
audit.source_system str No Add source_system column

Dimension Lookup Config:

dimensions:
  - source_column: customer_id      # Column in source fact
    dimension_table: dim_customer   # Dimension in context
    dimension_key: customer_id      # Natural key in dimension
    surrogate_key: customer_sk      # SK to retrieve
    scd2: true                      # Filter is_current=true

Quarantine Config (for orphan_handling: quarantine):

quarantine:
  connection: silver                # Required: connection name
  path: fact_orders_orphans         # OR table: quarantine_table
  add_columns:
    _rejection_reason: true         # Add rejection reason
    _rejected_at: true              # Add rejection timestamp
    _source_dimension: true         # Add dimension name

Example:

pattern:
  type: fact
  params:
    grain: [order_id]
    dimensions:
      - source_column: customer_id
        dimension_table: dim_customer
        dimension_key: customer_id
        surrogate_key: customer_sk
        scd2: true
      - source_column: product_id
        dimension_table: dim_product
        dimension_key: product_id
        surrogate_key: product_sk
    orphan_handling: unknown
    measures:
      - quantity
      - revenue: "quantity * unit_price"
    audit:
      load_timestamp: true
      source_system: "pos"


AggregationPattern

Declarative aggregation with GROUP BY and optional incremental merge.

When to Use:
- Building summary/aggregate tables (daily sales, monthly metrics)
- Need incremental aggregation (update existing aggregates)
- Gold layer reporting tables

Beginner Note: Aggregations summarize facts at a higher grain. Example: daily_sales aggregates orders by date with SUM(revenue).

See Also: FactPattern

Features:
- Declare grain (GROUP BY columns)
- Define measures with SQL aggregation expressions
- Optional HAVING filter
- Audit columns

Params:

Parameter Type Required Description
grain list Yes Columns to GROUP BY (defines uniqueness)
measures list Yes Measure definitions with name and expr
having str No HAVING clause for filtering aggregates
incremental.timestamp_column str No Column to identify new data
incremental.merge_strategy str No "replace", "sum", "min", or "max"
audit.load_timestamp bool No Add load_timestamp column
audit.source_system str No Add source_system column

Example:

pattern:
  type: aggregation
  params:
    grain: [date_sk, product_sk, region]
    measures:
      - name: total_revenue
        expr: "SUM(total_amount)"
      - name: order_count
        expr: "COUNT(*)"
      - name: avg_order_value
        expr: "AVG(total_amount)"
    having: "COUNT(*) > 0"
    audit:
      load_timestamp: true
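
The incremental options are not exercised in the example above. A hedged sketch of an incremental refresh that folds new rows into existing aggregates (column names are illustrative):

```yaml
pattern:
  type: aggregation
  params:
    grain: [date_sk, product_sk]
    measures:
      - name: total_revenue
        expr: "SUM(total_amount)"
    incremental:
      timestamp_column: updated_at   # identifies newly arrived data
      merge_strategy: sum            # add new aggregates onto existing rows
```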


AuditConfig

Configuration for audit columns.

Field Type Required Default Description
load_timestamp bool No True Add load_timestamp column
source_system Optional[str] No - Source system name for source_system column
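
As used inside a pattern's params, the audit block looks like the following (the "erp" value is illustrative):

```yaml
audit:
  load_timestamp: true    # adds load_timestamp column (default: true)
  source_system: "erp"    # adds source_system column with this value
```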