Odibi Configuration Reference¶
This manual details the YAML configuration schema for Odibi projects. Auto-generated from Pydantic models.
Project Structure¶
ProjectConfig¶
Complete project configuration from YAML.
🏢 "Enterprise Setup" Guide¶
Business Problem: "We need a robust production environment with alerts, retries, and proper logging."
Recipe: Production Ready
```yaml
project: "Customer360"
engine: "spark"

# 1. Resilience
retry:
  enabled: true
  max_attempts: 3
  backoff: "exponential"

# 2. Observability
logging:
  level: "INFO"
  structured: true  # JSON logs for Splunk/Datadog

# 3. Alerting
alerts:
  - type: "slack"
    url: "${SLACK_WEBHOOK_URL}"
    on_events: ["on_failure"]

# ... connections and pipelines ...
```
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| project | str | Yes | - | Project name |
| engine | EngineType | No | EngineType.PANDAS | Execution engine |
| connections | Dict[str, ConnectionConfig] | Yes | - | Named connections (at least one required). Options: LocalConnectionConfig, AzureBlobConnectionConfig, DeltaConnectionConfig, SQLServerConnectionConfig, HttpConnectionConfig, CustomConnectionConfig |
| pipelines | List[PipelineConfig] | Yes | - | Pipeline definitions (at least one required) |
| story | StoryConfig | Yes | - | Story generation configuration (mandatory) |
| system | SystemConfig | Yes | - | System Catalog configuration (mandatory) |
| lineage | Optional[LineageConfig] | No | - | OpenLineage configuration |
| description | Optional[str] | No | - | Project description |
| version | str | No | 1.0.0 | Project version |
| owner | Optional[str] | No | - | Project owner/contact |
| vars | Dict[str, Any] | No | - | Global variables for substitution (e.g. ${vars.env}) |
| retry | RetryConfig | No | - | Retry configuration for transient failures. Applies to all nodes unless overridden. Default: enabled with 3 attempts, exponential backoff. |
| logging | LoggingConfig | No | - | Logging configuration for pipeline execution. Set level (DEBUG/INFO/WARNING/ERROR), enable structured JSON logs, add metadata. |
| alerts | List[AlertConfig] | No | - | Alert configurations |
| performance | PerformanceConfig | No | - | Performance tuning |
| environments | Optional[Dict[str, Dict[str, Any]]] | No | - | Structure: same as ProjectConfig but with only overridden fields. Not yet validated strictly. |
| semantic | Optional[Dict[str, Any]] | No | - | Semantic layer configuration. Can be inline or reference an external file. Contains metrics, dimensions, and materializations for self-service analytics. Example: semantic: { config: 'semantic_config.yaml' } or inline definitions. |
PipelineConfig¶
Used in: ProjectConfig
Configuration for a pipeline.
Example:
```yaml
pipelines:
  - pipeline: "user_onboarding"
    description: "Ingest and process new users"
    layer: "silver"
    owner: "data-team@example.com"
    freshness_sla: "6h"
    nodes:
      - name: "node1"
        ...
```
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| pipeline | str | Yes | - | Pipeline name |
| description | Optional[str] | No | - | Pipeline description |
| layer | Optional[str] | No | - | Logical layer (bronze/silver/gold) |
| owner | Optional[str] | No | - | Pipeline owner (email or name) |
| freshness_sla | Optional[str] | No | - | Expected data freshness SLA. Format: number followed by unit — s (seconds), m (minutes), h (hours), or d (days). Examples: '6h', '1d', '30m'. |
| freshness_anchor | Literal['run_completion', 'table_max_timestamp', 'watermark_state'] | No | run_completion | What defines freshness. Only 'run_completion' is implemented initially. |
| nodes | List[NodeConfig] | Yes | - | List of nodes in this pipeline |
| auto_cache_threshold | Optional[int] | No | 3 | Auto-cache nodes with N or more downstream dependencies. Prevents redundant ADLS re-reads when a node is used by multiple downstream nodes. Set to null to disable auto-caching. Individual nodes can override with explicit cache: true/false. |
NodeConfig¶
Used in: PipelineConfig
Configuration for a single node.
🧠 "The Smart Node" Pattern¶
Business Problem: "We need complex dependencies, caching for heavy computations, and the ability to run only specific parts of the pipeline."
The Solution:
Nodes are the building blocks. They handle dependencies (depends_on), execution control (tags, enabled), and performance (cache).
🕸️ DAG & Dependencies¶
The Glue of the Pipeline. Nodes don't run in isolation. They form a Directed Acyclic Graph (DAG).
- depends_on: Critical! If Node B reads from Node A (in memory), you MUST list ["Node A"] in Node B's depends_on.
- Implicit Data Flow: If a node has no read block, it automatically picks up the DataFrame from its first dependency.
🧠 Smart Read & Incremental Loading¶
Automated History Management.
Odibi intelligently determines whether to perform a Full Load or an Incremental Load based on the state of the target.
The "Smart Read" Logic:
1. First Run (Full Load): If the target table (defined in write) does not exist:
* Incremental filtering rules are ignored.
* The entire source dataset is read.
* Use write.first_run_query (optional) to override the read query for this initial bootstrap (e.g., to backfill only 1 year of history instead of all time).
2. Subsequent Runs (Incremental Load): If the target table exists:
    * Rolling Window: Filters source data where column >= NOW() - lookback.
    * Stateful: Filters source data where column > last_high_water_mark.
This ensures you don't need separate "init" and "update" pipelines. One config handles both lifecycle states.
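A minimal sketch of the first-run override described above, assuming a Delta target (the connection and table names are illustrative):

```yaml
write:
  connection: "silver"
  format: "delta"
  table: "fact_events"
  # Bootstrap only: replaces the read query when the target does not exist yet,
  # e.g. backfill one year of history instead of all time
  first_run_query: "SELECT * FROM src_events WHERE event_time >= DATEADD(year, -1, GETDATE())"
```

On every later run the target exists, so the incremental rules apply and first_run_query is ignored.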
🏷️ Orchestration Tags¶
Run What You Need.
Tags allow you to execute slices of your pipeline.
* odibi run --tag daily -> Runs all nodes with "daily" tag.
* odibi run --tag critical -> Runs high-priority nodes.
🤖 Choosing Your Logic: Transformer vs. Transform¶
1. The "Transformer" (Top-Level)
* What it is: A pre-packaged, heavy-duty operation that defines the entire purpose of the node.
* When to use: When applying a standard Data Engineering pattern (e.g., SCD2, Merge, Deduplicate).
* Analogy: "Run this App."
* Syntax: transformer: "scd2" + params: {...}
2. The "Transform Steps" (Process Chain)
* What it is: A sequence of smaller steps (SQL, functions, operations) executed in order.
* When to use: For custom business logic, data cleaning, or feature engineering pipelines.
* Analogy: "Run this Script."
* Syntax: transform: { steps: [...] }
Note: You can use both! The transformer runs first, then transform steps refine the result.
🔗 Chaining Operations¶
You can mix and match! The execution order is always:
1. Read (or Dependency Injection)
2. Transformer (The "App" logic, e.g., Deduplicate)
3. Transform Steps (The "Script" logic, e.g., cleanup)
4. Validation
5. Write
Constraint: You must define at least one of read, transformer, transform, or write.
⚡ Example: App vs. Script¶
Scenario 1: The Full ETL Flow (Chained) Shows explicit Read, Transform Chain, and Write.
```yaml
# 1. Ingest (The Dependency)
- name: "load_raw_users"
  read: { connection: "s3_landing", format: "json", path: "users/*.json" }
  write: { connection: "bronze", format: "parquet", path: "users_raw" }

# 2. Process (The Consumer)
- name: "clean_users"
  depends_on: ["load_raw_users"]
  # "clean_text" is a registered function from the Transformer Catalog
  transform:
    steps:
      - sql: "SELECT * FROM df WHERE email IS NOT NULL"
      - function: "clean_text"
        params: { columns: ["email"], case: "lower" }
  write: { connection: "silver", format: "delta", table: "dim_users" }
```
Scenario 2: The "App" Node (Top-Level Transformer) Shows a node that applies a pattern (Deduplicate) to incoming data.
```yaml
- name: "deduped_users"
  depends_on: ["clean_users"]
  # The "App": Deduplication (from the Transformer Catalog)
  transformer: "deduplicate"
  params:
    keys: ["user_id"]
    order_by: "updated_at DESC"
  write: { connection: "gold", format: "delta", table: "users_unique" }
```
Scenario 3: The Tagged Runner (Reporting)
Shows how tags allow running specific slices (e.g., odibi run --tag daily).
```yaml
- name: "daily_report"
  tags: ["daily", "reporting"]
  depends_on: ["deduped_users"]
  # Ad-hoc aggregation script
  transform:
    steps:
      - sql: "SELECT date_trunc('day', updated_at) as day, count(*) as total FROM df GROUP BY 1"
  write: { connection: "local_data", format: "csv", path: "reports/daily_stats.csv" }
```
Scenario 4: The "Kitchen Sink" (All Operations) Shows Read -> Transformer -> Transform -> Write execution order.
Why this works:
1. Internal Chaining (df): In every step (Transformer or SQL), df refers to the output of the previous step.
2. External Access (depends_on): If you added depends_on: ["other_node"], you could also run SELECT * FROM other_node in your SQL steps!
```yaml
- name: "complex_flow"
  # 1. Read -> Creates initial 'df'
  read: { connection: "bronze", format: "parquet", path: "users" }
  # 2. Transformer (The "App": Deduplicate first)
  #    Takes 'df' (from Read), dedups it, returns new 'df'
  transformer: "deduplicate"
  params: { keys: ["user_id"], order_by: "updated_at DESC" }
  # 3. Transform Steps (The "Script": Filter AFTER deduplication)
  #    SQL sees the deduped data as 'df'
  transform:
    steps:
      - sql: "SELECT * FROM df WHERE status = 'active'"
  # 4. Write -> Saves the final filtered 'df'
  write: { connection: "silver", format: "delta", table: "active_unique_users" }
```
📚 Transformer Catalog¶
These are the built-in functions you can use in two ways:
- As a Top-Level Transformer: transformer: "name" (defines the node's main logic)
- As a Step in a Chain: transform: { steps: [{ function: "name" }] } (part of a sequence)
Note: merge and scd2 are special "Heavy Lifters" and should generally be used as Top-Level Transformers.
Data Engineering Patterns
* merge: Upsert/Merge into target (Delta/SQL). (Params)
* scd2: Slowly Changing Dimensions Type 2. (Params)
* deduplicate: Remove duplicates using window functions. (Params)
Relational Algebra
* join: Join two datasets. (Params)
* union: Stack datasets vertically. (Params)
* pivot: Rotate rows to columns. (Params)
* unpivot: Rotate columns to rows (melt). (Params)
* aggregate: Group by and sum/count/avg. (Params)
Data Quality & Cleaning
* validate_and_flag: Check rules and flag invalid rows. (Params)
* clean_text: Trim and normalize case. (Params)
* filter_rows: SQL-based filtering. (Params)
* fill_nulls: Replace NULLs with defaults. (Params)
Feature Engineering
* derive_columns: Create new cols via SQL expressions. (Params)
* case_when: Conditional logic (if-else). (Params)
* generate_surrogate_key: Create MD5 keys from columns. (Params)
* date_diff, date_add, date_trunc: Date arithmetic.
Scenario 3: The Tagged Runner
Run only this with odibi run --tag daily
Scenario 4: Pre/Post SQL Hooks Setup and cleanup with SQL statements.
```yaml
- name: "optimize_sales"
  depends_on: ["load_sales"]
  pre_sql:
    - "SET spark.sql.shuffle.partitions = 200"
    - "CREATE TEMP VIEW staging AS SELECT * FROM bronze.raw_sales"
  transform:
    steps:
      - sql: "SELECT * FROM staging WHERE amount > 0"
  post_sql:
    - "OPTIMIZE gold.fact_sales ZORDER BY (customer_id)"
    - "VACUUM gold.fact_sales RETAIN 168 HOURS"
  write:
    connection: "gold"
    format: "delta"
    table: "fact_sales"
```
Scenario 5: Materialization Strategies Choose how output is persisted.
```yaml
# Option 1: View (no physical storage, logical model)
- name: "vw_active_customers"
  materialized: "view"  # Creates SQL view instead of table
  transform:
    steps:
      - sql: "SELECT * FROM customers WHERE status = 'active'"
  write:
    connection: "gold"
    table: "vw_active_customers"

# Option 2: Incremental (append to existing Delta table)
- name: "fact_events"
  materialized: "incremental"  # Uses APPEND mode
  read:
    connection: "bronze"
    table: "raw_events"
    incremental:
      mode: "stateful"
      column: "event_time"
  write:
    connection: "silver"
    format: "delta"
    table: "fact_events"

# Option 3: Table (default - full overwrite)
- name: "dim_products"
  materialized: "table"  # Default behavior
  # ...
```
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| name | str | Yes | - | Unique node name |
| description | Optional[str] | No | - | Human-readable description |
| explanation | Optional[str] | No | - | Markdown-formatted explanation of the node's transformation logic. Rendered in the Data Story HTML report. Supports tables, code blocks, and rich formatting. Use to document business rules, data mappings, and transformation rationale for stakeholder communication. Mutually exclusive with 'explanation_file'. |
| explanation_file | Optional[str] | No | - | Path to an external Markdown file containing the explanation, relative to the YAML file. Use for longer documentation to keep YAML files clean. Mutually exclusive with 'explanation'. |
| runbook_url | Optional[str] | No | - | URL to a troubleshooting guide or runbook. Shown as a 'Troubleshooting guide →' link on failures. |
| enabled | bool | No | True | If False, node is skipped during execution |
| tags | List[str] | No | - | Operational tags for selective execution (e.g., 'daily', 'critical'). Use with odibi run --tag. |
| depends_on | List[str] | No | - | List of parent nodes that must complete before this node runs. The output of these nodes is available for reading. |
| columns | Dict[str, ColumnMetadata] | No | - | Data Dictionary defining the output schema. Used for documentation, PII tagging, and validation. |
| read | Optional[ReadConfig] | No | - | Input operation (Load). If missing, data is taken from the first dependency. |
| inputs | Optional[Dict[str, str \| Dict[str, Any]]] | No | - | Multi-input support for cross-pipeline dependencies. Map input names to either: (a) a $pipeline.node reference (e.g., '$read_bronze.shift_events') or (b) an explicit read config dict. Cannot be used with 'read'. Example: inputs: {events: '$read_bronze.events', calendar: {connection: 'goat', path: 'cal'}} |
| transform | Optional[TransformConfig] | No | - | Chain of fine-grained transformation steps (SQL, functions). Runs after 'transformer' if both are present. |
| write | Optional[WriteConfig] | No | - | Output operation (Save to file/table). |
| streaming | bool | No | False | Enable streaming execution for this node (Spark only) |
| transformer | Optional[str] | No | - | Name of the 'App' logic to run (e.g., 'deduplicate', 'scd2'). See Transformer Catalog for options. |
| params | Dict[str, Any] | No | - | Parameters for transformer |
| pre_sql | List[str] | No | - | List of SQL statements to execute before the node runs. Use for setup: temp tables, variable initialization, grants. Example: ['SET spark.sql.shuffle.partitions=200', 'CREATE TEMP VIEW src AS SELECT * FROM raw'] |
| post_sql | List[str] | No | - | List of SQL statements to execute after the node completes. Use for cleanup, optimization, or audit logging. Example: ['OPTIMIZE gold.fact_sales', 'VACUUM gold.fact_sales RETAIN 168 HOURS'] |
| materialized | Optional[Literal['table', 'view', 'incremental']] | No | - | Materialization strategy. Options: 'table' (default physical write), 'view' (creates SQL view instead of table), 'incremental' (uses append mode for Delta tables). Views are useful for Gold layer logical models. |
| cache | bool | No | False | Cache result for reuse |
| log_level | Optional[LogLevel] | No | - | Override log level for this node |
| on_error | ErrorStrategy | No | ErrorStrategy.FAIL_LATER | Failure handling strategy |
| validation | Optional[ValidationConfig] | No | - | - |
| contracts | List[TestConfig] | No | - | Pre-condition contracts (Circuit Breakers). Run on input data before transformation. Options: NotNullTest, UniqueTest, AcceptedValuesTest, RowCountTest, CustomSQLTest, RangeTest, RegexMatchTest, VolumeDropTest, SchemaContract, DistributionContract, FreshnessContract |
| schema_policy | Optional[SchemaPolicyConfig] | No | - | Schema drift handling policy |
| privacy | Optional[PrivacyConfig] | No | - | Privacy Suite: PII anonymization settings |
| sensitive | bool \| List[str] | No | False | If true, or a list of columns, masks sample data in stories |
| source_yaml | Optional[str] | No | - | Internal: source YAML file path for sql_file resolution |
ColumnMetadata¶
Used in: NodeConfig
Metadata for a column in the data dictionary.
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| description | Optional[str] | No | - | Column description |
| pii | bool | No | False | Contains PII? |
| tags | List[str] | No | - | Tags (e.g. 'business_key', 'measure') |
SystemConfig¶
Used in: ProjectConfig
Configuration for the Odibi System Catalog (The Brain).
Stores metadata, state, and pattern configurations. The primary connection must be a storage connection (blob/local) that supports Delta tables.
Example:
```yaml
system:
  connection: adls_bronze  # Primary - must be blob/local storage
  path: _odibi_system
  environment: dev
```
With sync to SQL Server (for dashboards/queries):
```yaml
system:
  connection: adls_prod  # Primary - Delta tables
  environment: prod
  sync_to:
    connection: sql_server_prod  # Secondary - SQL for visibility
    schema_name: odibi_system
```
With sync to another blob (cross-region backup):
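The example body for this case is missing from the source docstring. A plausible sketch, assuming SyncToConfig accepts a path the same way SyncFromConfig does (connection names and the path value are illustrative assumptions):

```yaml
system:
  connection: adls_primary        # primary Delta tables
  sync_to:
    connection: adls_dr           # secondary blob connection in another region
    path: _odibi_system_replica   # assumed field, mirroring SyncFromConfig.path
```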
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| connection | str | Yes | - | Connection for primary system tables. Must be blob storage (azure_blob) or local filesystem - NOT SQL Server. Delta tables require storage backends. |
| path | str | No | _odibi_system | Path relative to connection root |
| environment | Optional[str] | No | - | Environment tag (e.g., 'dev', 'qat', 'prod'). Written to all system table records for cross-environment querying. |
| schema_name | Optional[str] | No | - | Deprecated. Use sync_to.schema_name for SQL Server targets. |
| sync_to | Optional[SyncToConfig] | No | - | Secondary destination to sync system catalog data to. Use for SQL Server dashboards or cross-region Delta replication. |
| sync_from | Optional[SyncFromConfig] | No | - | Source to sync system data from. Enables pushing local development data to centralized system tables. |
| cost_per_compute_hour | Optional[float] | No | - | Estimated cost per compute hour (USD) for cost tracking |
| databricks_billing_enabled | bool | No | False | Attempt to query Databricks billing tables for actual costs |
| retention_days | Optional[RetentionConfig] | No | - | Retention periods for system tables |
| optimize_catalog | bool | No | False | Run OPTIMIZE + VACUUM on all system catalog Delta tables after each pipeline run. Compacts small files created by frequent MERGE operations. Adds ~15-20s but prevents accumulation of small files that degrade read performance over time. Set to false to skip. |
| sync_timeout_seconds | float | No | 30.0 | Maximum time (seconds) to wait for async catalog sync to complete. Reduced from the 300s default to 30s for better performance. Sync is incremental, so incomplete syncs will catch up on the next run. |
| async_derived_updates | bool | No | True | Run derived table updates (meta_daily_stats, meta_pipeline_health, etc.) asynchronously in a background thread. Saves ~20-30s per pipeline. Updates complete eventually - safe for reporting tables. |
| async_lineage | bool | No | True | Build lineage incrementally as each pipeline completes, then merge at the end. Saves ~40s by parallelizing lineage construction with pipeline execution. Lineage is still generated, just built in the background. |
| skip_sync_wait_in_databricks | bool | No | True | Skip waiting for catalog sync to complete when running in Databricks. Databricks clusters stay alive, so background sync threads complete safely. Saves ~90s overhead. Set to false to always wait for sync completion. |
SyncFromConfig¶
Used in: SystemConfig
Configuration for syncing system data from a source location.
Used to pull system data (runs, state) from another backend into the target.
Example:
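The example body is missing from the source docstring. A minimal sketch built from the fields documented below (connection names are illustrative):

```yaml
system:
  connection: adls_prod
  sync_from:
    connection: local_dev   # pull runs/state from this backend
    path: _odibi_system
```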
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| connection | str | Yes | - | Connection name for the source system data |
| path | Optional[str] | No | - | Path to source system data (for file-based sources) |
| schema_name | Optional[str] | No | - | Schema name for SQL Server source (if applicable) |
Connections¶
LocalConnectionConfig¶
Used in: ProjectConfig
Local filesystem connection.
When to Use: Development, testing, small datasets, local processing.
See Also: AzureBlobConnectionConfig for cloud alternatives.
Example:
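The example body is missing from the source docstring. A minimal sketch using the fields below (the connection name is illustrative):

```yaml
connections:
  local_data:
    type: "local"
    base_path: "./data"
```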
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| type | Literal['local'] | No | ConnectionType.LOCAL | - |
| validation_mode | ValidationMode | No | ValidationMode.LAZY | - |
| base_path | str | No | ./data | Base directory path |
DeltaConnectionConfig¶
Used in: ProjectConfig
Delta Lake connection for ACID-compliant data lakes.
When to Use: - Production data lakes on Azure/AWS/GCP - Need time travel, ACID transactions, schema evolution - Upsert/merge operations
See Also: WriteConfig for Delta write options
Scenario 1: Delta via metastore
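The example body for this scenario is missing from the source docstring. A minimal sketch using the required fields from the table below (names are illustrative):

```yaml
delta_silver:
  type: "delta"
  catalog: "spark_catalog"
  schema_name: "silver"
  table: "fact_sales"   # optional default table for this connection
```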
Scenario 2: Direct path + Node usage
```yaml
delta_local:
  type: "local"
  base_path: "dbfs:/mnt/delta"

# In pipeline:
# read:
#   connection: "delta_local"
#   format: "delta"
#   path: "bronze/orders"
```
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| type | Literal['delta'] | No | ConnectionType.DELTA | - |
| validation_mode | ValidationMode | No | ValidationMode.LAZY | - |
| catalog | str | Yes | - | Spark catalog name (e.g. 'spark_catalog') |
| schema_name | str | Yes | - | Database/schema name |
| table | Optional[str] | No | - | Optional default table name for this connection (used by story/pipeline helpers) |
AzureBlobConnectionConfig¶
Used in: ProjectConfig
Azure Blob Storage / ADLS Gen2 connection.
When to Use: Azure-based data lakes, landing zones, raw data storage.
See Also: DeltaConnectionConfig for Delta-specific options
Scenario 1: Prod with Key Vault-managed key
```yaml
adls_bronze:
  type: "azure_blob"
  account_name: "myaccount"
  container: "bronze"
  auth:
    mode: "key_vault"
    key_vault: "kv-data"
    secret: "adls-account-key"
```
Scenario 2: Local dev with inline account key
```yaml
adls_dev:
  type: "azure_blob"
  account_name: "devaccount"
  container: "sandbox"
  auth:
    mode: "account_key"
    account_key: "${ADLS_ACCOUNT_KEY}"
```
Scenario 3: MSI (no secrets)
```yaml
adls_msi:
  type: "azure_blob"
  account_name: "myaccount"
  container: "bronze"
  auth:
    mode: "aad_msi"
    # optional: client_id for user-assigned identity
    client_id: "00000000-0000-0000-0000-000000000000"
```
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| type | Literal['azure_blob'] | No | ConnectionType.AZURE_BLOB | - |
| validation_mode | ValidationMode | No | ValidationMode.LAZY | - |
| account_name | str | Yes | - | - |
| container | str | Yes | - | - |
| auth | AzureBlobAuthConfig | No | - | Authentication configuration. Choose one mode: 'account_key' (storage key), 'sas' (SAS token), 'connection_string', 'key_vault' (Azure Key Vault), or 'aad_msi' (Managed Identity, default). For production, prefer key_vault or aad_msi to avoid storing secrets in config. Options: AzureBlobKeyVaultAuth, AzureBlobAccountKeyAuth, AzureBlobSasAuth, AzureBlobConnectionStringAuth, AzureBlobMsiAuth |
SQLServerConnectionConfig¶
Used in: ProjectConfig
SQL Server / Azure SQL Database connection.
When to Use: Reading from SQL Server sources, Azure SQL DB, Azure Synapse.
See Also: ReadConfig for query options
Scenario 1: Managed identity (AAD MSI)
```yaml
sql_dw_msi:
  type: "sql_server"
  host: "server.database.windows.net"
  database: "dw"
  auth:
    mode: "aad_msi"
```
Scenario 2: SQL login
```yaml
sql_dw_login:
  type: "sql_server"
  host: "server.database.windows.net"
  database: "dw"
  port: 1433
  driver: "ODBC Driver 17 for SQL Server"
  auth:
    mode: "sql_login"
    username: "dw_writer"
    password: "${DW_PASSWORD}"
```
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| type | Literal['sql_server'] | No | ConnectionType.SQL_SERVER | - |
| validation_mode | ValidationMode | No | ValidationMode.LAZY | - |
| host | str | Yes | - | - |
| database | str | Yes | - | - |
| port | int | No | 1433 | - |
| driver | str | No | ODBC Driver 18 for SQL Server | - |
| auth | SQLServerAuthConfig | No | - | Authentication configuration. Choose one mode: 'sql_login' (username/password), 'aad_password' (Azure AD service principal), 'aad_msi' (Managed Identity, default), or 'connection_string' (full JDBC string). For Databricks/Azure, prefer aad_msi for passwordless auth. Options: SQLLoginAuth, SQLAadPasswordAuth, SQLMsiAuth, SQLConnectionStringAuth |
HttpConnectionConfig¶
Used in: ProjectConfig
HTTP connection.
Scenario: Bearer token via env var
```yaml
api_source:
  type: "http"
  base_url: "https://api.example.com"
  headers:
    User-Agent: "odibi-pipeline"
  auth:
    mode: "bearer"
    token: "${API_TOKEN}"
```
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| type | Literal['http'] | No | ConnectionType.HTTP | - |
| validation_mode | ValidationMode | No | ValidationMode.LAZY | - |
| base_url | str | Yes | - | Base URL for all API requests (e.g., 'https://api.example.com/v1') |
| headers | Dict[str, str] | No | - | Default HTTP headers included in all requests. Example: {'User-Agent': 'odibi-pipeline', 'Accept': 'application/json'}. Auth headers are typically set via the 'auth' block instead. |
| auth | HttpAuthConfig | No | - | Authentication configuration. Choose one mode: 'none' (no auth), 'basic' (username/password), 'bearer' (token), or 'api_key' (custom header). Tokens can use env vars: '${API_TOKEN}'. Options: HttpNoAuth, HttpBasicAuth, HttpBearerAuth, HttpApiKeyAuth |
Node Operations¶
ReadConfig¶
Used in: NodeConfig
Configuration for reading data into a node.
When to Use: First node in a pipeline, or any node that reads from storage.
Key Concepts:
- connection: References a named connection from connections: section
- format: File format (csv, parquet, delta, json, sql)
- incremental: Enable incremental loading (only new data)
See Also:
- Incremental Loading (HWM-based loading)
- IncrementalConfig (incremental loading options)
📖 "Universal Reader" Guide¶
Business Problem: "I need to read from files, databases, streams, and even travel back in time to see how data looked yesterday."
Recipe 1: The Time Traveler (Delta/Iceberg) Reproduce a bug by seeing the data exactly as it was.
```yaml
read:
  connection: "silver_lake"
  format: "delta"
  table: "fact_sales"
  time_travel:
    as_of_timestamp: "2023-10-25T14:00:00Z"
```
Recipe 2: The Streamer Process data in real-time.
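The recipe body is missing from the source docstring. A minimal sketch, assuming a Spark streaming read over JSON files (connection and path are illustrative; per the field table below, schema_ddl is required for streaming reads from non-Delta file sources):

```yaml
read:
  connection: "landing"
  format: "json"
  path: "events/"
  streaming: true
  schema_ddl: "event_id STRING, event_time TIMESTAMP, payload STRING"
```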
Recipe 3: The SQL Query Push down filtering to the source database.
```yaml
read:
  connection: "enterprise_dw"
  format: "sql"
  # Use the query option to filter at source!
  query: "SELECT * FROM huge_table WHERE date >= '2024-01-01'"
```
Recipe 4: Archive Bad Records (Spark) Capture malformed records for later inspection.
```yaml
read:
  connection: "landing"
  format: "json"
  path: "events/*.json"
  archive_options:
    badRecordsPath: "/mnt/quarantine/bad_records"
```
Recipe 5: Optimize JDBC Parallelism (Spark) Control partition count for SQL sources to reduce task overhead.
```yaml
read:
  connection: "enterprise_dw"
  format: "sql"
  table: "small_lookup_table"
  options:
    numPartitions: 1  # Single partition for small tables
```
Performance Tip: For small tables (<100K rows), use numPartitions: 1 to avoid
excessive Spark task scheduling overhead. For large tables, increase partitions
to enable parallel reads (requires partitionColumn, lowerBound, upperBound).
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| connection | Optional[str] | No | - | Connection name from project.yaml (null for synthetic/simulation sources) |
| format | ReadFormat | Yes | - | Data format: csv, parquet, delta, json, sql, api, excel, avro, cloudFiles |
| table | Optional[str] | No | - | Table name for SQL/Delta |
| path | Optional[str] | No | - | Path for file-based sources |
| streaming | bool | No | False | Enable streaming read (Spark only) |
| schema_ddl | Optional[str] | No | - | Schema for streaming reads from file sources (required for Avro, JSON, CSV). Use Spark DDL format: 'col1 STRING, col2 INT, col3 TIMESTAMP'. Not required for Delta (schema is inferred from table metadata). |
| query | Optional[str] | No | - | SQL query to filter at source (pushdown). Mutually exclusive with table/path if supported by the connector. |
| sql_file | Optional[str] | No | - | Path to an external .sql file containing the query, relative to the YAML file defining the node. Mutually exclusive with 'query'. |
| filter | Optional[str] | No | - | SQL WHERE clause filter (pushed down to source for SQL formats). Example: "DAY > '2022-12-31'" |
| incremental | Optional[IncrementalConfig] | No | - | Automatic incremental loading strategy (CDC-like). If set, generates a query based on target state (HWM). |
| time_travel | Optional[TimeTravelConfig] | No | - | Time travel options (Delta only) |
| archive_options | Dict[str, Any] | No | - | Options for archiving bad records (e.g. badRecordsPath for Spark) |
| options | Dict[str, Any] | No | - | Format-specific options |
IncrementalConfig¶
Used in: ReadConfig
Configuration for automatic incremental loading.
When to Use: Load only new/changed data instead of full table scans.
See Also: ReadConfig
Modes:
1. Rolling Window (Default): Uses a time-based lookback from NOW().
   Good for: Stateless loading where you just want "recent" data.
   Args: lookback, unit
2. Stateful: Tracks the High-Water Mark (HWM) of the key column.
   Good for: Exact incremental ingestion (e.g. CDC-like).
   Args: state_key (optional), watermark_lag (optional)
Generates SQL:
- Rolling: WHERE column >= NOW() - lookback
- Stateful: WHERE column > :last_hwm
Example (Rolling Window):
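The example body is missing from the source docstring. A minimal sketch using the documented fields (the column name and window size are illustrative):

```yaml
incremental:
  mode: "rolling_window"
  column: "updated_at"
  lookback: 7
  unit: "day"
```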
Example (Stateful HWM):
```yaml
incremental:
  mode: "stateful"
  column: "id"
  # Optional: track separate column for HWM state
  state_key: "last_processed_id"
```
Example (Stateful with Watermark Lag):
```yaml
incremental:
  mode: "stateful"
  column: "updated_at"
  # Handle late-arriving data: look back 2 hours from HWM
  watermark_lag: "2h"
```
Example (Oracle Date Format):
```yaml
incremental:
  mode: "rolling_window"
  column: "EVENT_START"
  lookback: 3
  unit: "day"
  # For string columns with Oracle format (DD-MON-YY)
  date_format: "oracle"
```
Supported date_format values:
- oracle: DD-MON-YY for Oracle databases (uses TO_TIMESTAMP)
- oracle_sqlserver: DD-MON-YY format stored in SQL Server (uses TRY_CONVERT)
- sql_server: Uses CONVERT with style 120
- us: MM/DD/YYYY format
- eu: DD/MM/YYYY format
- iso: YYYY-MM-DDTHH:MM:SS format
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| mode | IncrementalMode | No | IncrementalMode.ROLLING_WINDOW |
Incremental strategy: 'rolling_window' or 'stateful' |
| column | str | Yes | - | Primary column to filter on (e.g., updated_at) |
| fallback_column | Optional[str] | No | - | Backup column if primary is NULL (e.g., created_at). Generates COALESCE(col, fallback) >= ... |
| lookback | Optional[int] | No | - | Time units to look back (Rolling Window only) |
| unit | Optional[IncrementalUnit] | No | - | Time unit for lookback (Rolling Window only). Options: 'hour', 'day', 'month', 'year' |
| state_key | Optional[str] | No | - | Unique ID for state tracking. Defaults to node name if not provided. |
| watermark_lag | Optional[str] | No | - | Safety buffer for late-arriving data in stateful mode. Subtracts this duration from the stored HWM when filtering. Format: number followed by unit — s (seconds), m (minutes), h (hours), or d (days). Examples: '2h' (2 hours), '30m' (30 minutes), '1d' (1 day). Use when source has replication lag or eventual consistency. |
| date_format | Optional[str] | No | - | Source date format when the column is stored as a string. Options: 'oracle' (DD-MON-YY for Oracle DB), 'oracle_sqlserver' (DD-MON-YY format in SQL Server), 'sql_server' (uses CONVERT with style 120), 'us' (MM/DD/YYYY), 'eu' (DD/MM/YYYY), 'iso' (YYYY-MM-DDTHH:MM:SS). When set, SQL pushdown will use appropriate CONVERT/TO_TIMESTAMP functions. |
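The watermark_lag duration format above ('2h', '30m', '1d') can be parsed with a few lines of standard Python. This is an illustrative sketch of the documented format, not odibi's internal code:

```python
import re
from datetime import timedelta

_UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}

def parse_lag(lag: str) -> timedelta:
    """Parse a watermark_lag string like '2h' into a timedelta."""
    match = re.fullmatch(r"(\d+)([smhd])", lag)
    if not match:
        raise ValueError(f"Invalid watermark_lag: {lag!r}")
    value, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(value)})

# The effective stateful filter then becomes:
# WHERE column > stored_hwm - parse_lag("2h")
```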
TimeTravelConfig¶
Used in: ReadConfig
Configuration for time travel reading (Delta/Iceberg).
Example:
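An illustrative version-based read (the nesting of a time_travel block under read is an assumption based on "Used in: ReadConfig"):

```yaml
read:
  connection: lake
  format: delta
  table: customers
  time_travel:
    as_of_version: 42
```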
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| as_of_version | Optional[int] | No | - | Version number to time travel to |
| as_of_timestamp | Optional[str] | No | - | Timestamp string to time travel to |
TransformConfig¶
Used in: NodeConfig
Configuration for transformation steps within a node.
When to Use: Custom business logic, data cleaning, SQL transformations.
Key Concepts:
- steps: Ordered list of operations (SQL, functions, or both)
- Each step receives the DataFrame from the previous step
- Steps execute in order: step1 → step2 → step3
See Also: Transformer Catalog
Transformer vs Transform:
- transformer: Single heavy operation (scd2, merge, deduplicate)
- transform.steps: Chain of lighter operations
🔧 "Transformation Pipeline" Guide¶
Business Problem: "I have complex logic that mixes SQL for speed and Python for complex calculations."
The Solution: Chain multiple steps together. Output of Step 1 becomes input of Step 2.
Function Registry:
The function step type looks up functions registered with @transform (or @register).
This allows you to use the same registered functions as both top-level Transformers and steps in a chain.
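Conceptually, the registry is a name-to-callable mapping populated by the decorator. The sketch below illustrates the mechanism using plain lists of dicts as a DataFrame stand-in; the decorator name matches the docs, but the exact signature is an assumption:

```python
REGISTRY = {}

def transform(fn):
    """Register fn under its name so 'function' steps can look it up."""
    REGISTRY[fn.__name__] = fn
    return fn

@transform
def calculate_lifetime_value(rows, discount_rate=0.05):
    # Hypothetical step logic on a list-of-dicts stand-in for a DataFrame
    return [{**r, "ltv": r["revenue"] * (1 - discount_rate)} for r in rows]

# A step `function: "calculate_lifetime_value"` with params {discount_rate: 0.05}
# resolves roughly as: REGISTRY["calculate_lifetime_value"](df, discount_rate=0.05)
```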
Recipe: The Mix-and-Match
transform:
  steps:
    # Step 1: SQL Filter (Fast)
    - sql: "SELECT * FROM df WHERE status = 'ACTIVE'"
    # Step 2: Custom Python Function (Complex Logic)
    # Looks up 'calculate_lifetime_value' in the registry
    - function: "calculate_lifetime_value"
      params: { discount_rate: 0.05 }
    # Step 3: Built-in Operation (Standard)
    - operation: "drop_duplicates"
      params: { subset: ["user_id"] }
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| steps | List[str | TransformStep] | Yes | - | List of transformation steps (SQL strings or TransformStep configs) |
DeleteDetectionConfig¶
Configuration for delete detection in Silver layer.
🔍 "CDC Without CDC" Guide¶
Business Problem: "Records are deleted in our Azure SQL source, but our Silver tables still show them."
The Solution: Use delete detection to identify and flag records that no longer exist in the source.
Recipe 1: SQL Compare (Recommended for HWM)
transform:
  steps:
    - operation: detect_deletes
      params:
        mode: sql_compare
        keys: [customer_id]
        source_connection: azure_sql
        source_table: dbo.Customers
Recipe 2: Snapshot Diff (For Full Snapshot Sources)
Use ONLY with full snapshot ingestion, NOT with HWM incremental.
Requires connection and path to specify the target Delta table for comparison.
transform:
  steps:
    - operation: detect_deletes
      params:
        mode: snapshot_diff
        keys: [customer_id]
        connection: silver_conn # Required: connection to target Delta table
        path: "silver/customers" # Required: path to target Delta table
Recipe 3: Conservative Threshold
transform:
steps:
- operation: detect_deletes
params:
mode: sql_compare
keys: [customer_id]
source_connection: erp
source_table: dbo.Customers
max_delete_percent: 20.0
on_threshold_breach: error
Recipe 4: Hard Delete (Remove Rows)
transform:
  steps:
    - operation: detect_deletes
      params:
        mode: sql_compare
        keys: [customer_id]
        source_connection: azure_sql
        source_table: dbo.Customers
        soft_delete_col: null # removes rows instead of flagging
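Under the hood, sql_compare amounts to an anti-join of target keys against the live source keys. A minimal stand-in using plain Python lists of dicts instead of DataFrames (not odibi's implementation):

```python
def detect_deletes(rows, source_key_rows, keys, soft_delete_col="_is_deleted"):
    # Business-key tuples still present in the live source
    live = {tuple(r[k] for k in keys) for r in source_key_rows}
    # Flag any target row whose key vanished from the source
    return [{**r, soft_delete_col: tuple(r[k] for k in keys) not in live}
            for r in rows]
```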
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| mode | DeleteDetectionMode | No | DeleteDetectionMode.NONE | Delete detection strategy: none, snapshot_diff, sql_compare |
| keys | List[str] | No | PydanticUndefined | Business key columns for comparison |
| connection | Optional[str] | No | - | For snapshot_diff: connection name to target Delta table (required for snapshot_diff) |
| path | Optional[str] | No | - | For snapshot_diff: path to target Delta table (required for snapshot_diff) |
| soft_delete_col | Optional[str] | No | _is_deleted | Column to flag deletes (True = deleted). Set to null for hard-delete (removes rows). |
| source_connection | Optional[str] | No | - | For sql_compare: connection name to query live source |
| source_table | Optional[str] | No | - | For sql_compare: table to query for current keys |
| source_query | Optional[str] | No | - | For sql_compare: custom SQL query for keys (overrides source_table) |
| snapshot_column | Optional[str] | No | - | For snapshot_diff on non-Delta: column to identify snapshots. If None, uses Delta time travel (default). |
| on_first_run | FirstRunBehavior | No | FirstRunBehavior.SKIP | Behavior when no previous version exists for snapshot_diff |
| max_delete_percent | Optional[float] | No | 50.0 | Safety threshold: warn/error if more than X% of rows would be deleted |
| on_threshold_breach | ThresholdBreachAction | No | ThresholdBreachAction.WARN | Behavior when delete percentage exceeds max_delete_percent |
ValidationConfig¶
Used in: NodeConfig
Configuration for data validation (post-transform checks).
When to Use: Output data quality checks that run after transformation but before writing.
See Also: Validation Guide, Quarantine Guide, Contracts Overview (pre-transform checks)
🛡️ "The Indestructible Pipeline" Pattern¶
Business Problem: "Bad data polluted our Gold reports, causing executives to make wrong decisions. We need to stop it before it lands."
The Solution: A Quality Gate that runs after transformation but before writing.
Recipe: The Quality Gate
validation:
  mode: "fail" # fail (stop pipeline) or warn (log only)
  on_fail: "alert" # alert or ignore
  tests:
    # 1. Completeness
    - type: "not_null"
      columns: ["transaction_id", "customer_id"]
    # 2. Integrity
    - type: "unique"
      columns: ["transaction_id"]
    - type: "accepted_values"
      column: "status"
      values: ["PENDING", "COMPLETED", "FAILED"]
    # 3. Ranges & Patterns
    - type: "range"
      column: "age"
      min: 18
      max: 120
    - type: "regex_match"
      column: "email"
      pattern: '^[\w\.-]+@[\w\.-]+\.\w+$' # single-quoted: \w is not a valid escape in YAML double quotes
    # 4. Business Logic (SQL)
    - type: "custom_sql"
      name: "dates_ordered"
      condition: "created_at <= completed_at"
      threshold: 0.01 # Allow 1% failure
Recipe: Quarantine + Gate
validation:
  tests:
    - type: not_null
      columns: [customer_id]
      on_fail: quarantine
  quarantine:
    connection: silver
    path: customers_quarantine
  gate:
    require_pass_rate: 0.95
    on_fail: abort
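The gate's core check is a simple ratio against require_pass_rate. An illustrative sketch, not odibi's code:

```python
def gate_passes(rows_total: int, rows_failed: int,
                require_pass_rate: float = 0.95) -> bool:
    """Return True when the batch meets the minimum pass rate."""
    if rows_total == 0:
        return True  # nothing to evaluate on an empty batch
    return (rows_total - rows_failed) / rows_total >= require_pass_rate
```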
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| mode | ValidationAction | No | ValidationAction.FAIL | Execution mode: 'fail' (stop pipeline) or 'warn' (log only) |
| on_fail | OnFailAction | No | OnFailAction.ALERT | Action on failure: 'alert' (send notification) or 'ignore' |
| tests | List[TestConfig] | No | PydanticUndefined | List of validation tests. Options: NotNullTest, UniqueTest, AcceptedValuesTest, RowCountTest, CustomSQLTest, RangeTest, RegexMatchTest, VolumeDropTest, SchemaContract, DistributionContract, FreshnessContract |
| quarantine | Optional[QuarantineConfig] | No | - | Quarantine configuration for failed rows |
| gate | Optional[GateConfig] | No | - | Quality gate configuration for batch-level validation |
| fail_fast | bool | No | False | Stop validation on first failure. Skips remaining tests for faster feedback. |
| cache_df | bool | No | False | Cache DataFrame before validation (Spark only). Improves performance with many tests. |
QuarantineConfig¶
Used in: ValidationConfig
Configuration for quarantine table routing.
When to Use: Capture invalid records for review/reprocessing instead of failing the pipeline.
See Also: Quarantine Guide, ValidationConfig
Routes rows that fail validation tests to a quarantine table with rejection metadata for later analysis/reprocessing.
Example:
validation:
  tests:
    - type: not_null
      columns: [customer_id]
      on_fail: quarantine
  quarantine:
    connection: silver
    path: customers_quarantine
    add_columns:
      _rejection_reason: true
      _rejected_at: true
    max_rows: 10000
    sample_fraction: 0.1
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| connection | str | Yes | - | Connection for quarantine writes |
| path | Optional[str] | No | - | Path for quarantine data |
| table | Optional[str] | No | - | Table name for quarantine |
| add_columns | QuarantineColumnsConfig | No | PydanticUndefined | Metadata columns to add to quarantined rows |
| retention_days | Optional[int] | No | 90 | Days to retain quarantined data (auto-cleanup) |
| max_rows | Optional[int] | No | - | Maximum number of rows to quarantine per run. Limits storage for high-failure batches. |
| sample_fraction | Optional[float] | No | - | Sample fraction of invalid rows to quarantine (0.0-1.0). Use for sampling large invalid sets. |
QuarantineColumnsConfig¶
Used in: QuarantineConfig
Columns added to quarantined rows for debugging and reprocessing.
Example:
quarantine:
  connection: silver
  path: customers_quarantine
  add_columns:
    _rejection_reason: true
    _rejected_at: true
    _source_batch_id: true
    _failed_tests: true
    _original_node: false
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| rejection_reason | bool | No | True | Add _rejection_reason column with test failure description |
| rejected_at | bool | No | True | Add _rejected_at column with UTC timestamp |
| source_batch_id | bool | No | True | Add _source_batch_id column with run ID for traceability |
| failed_tests | bool | No | True | Add _failed_tests column with comma-separated list of failed test names |
| original_node | bool | No | False | Add _original_node column with source node name |
GateConfig¶
Used in: ValidationConfig
Quality gate configuration for batch-level validation.
When to Use: Pipeline-level pass/fail thresholds, row count limits, change detection.
See Also: Quality Gates, ValidationConfig
Gates evaluate the entire batch before writing, ensuring data quality thresholds are met.
Example:
gate:
  require_pass_rate: 0.95
  on_fail: abort
  thresholds:
    - test: not_null
      min_pass_rate: 0.99
  row_count:
    min: 100
    change_threshold: 0.5
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| require_pass_rate | float | No | 0.95 | Minimum percentage of rows passing ALL tests |
| on_fail | GateOnFail | No | GateOnFail.ABORT | Action when gate fails |
| thresholds | List[GateThreshold] | No | PydanticUndefined | Per-test thresholds (overrides global require_pass_rate) |
| row_count | Optional[RowCountGate] | No | - | Row count anomaly detection |
GateThreshold¶
Used in: GateConfig
Per-test threshold configuration for quality gates.
Allows setting different pass rate requirements for specific tests.
Example:
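An illustrative per-test override (the unique threshold value is a placeholder):

```yaml
gate:
  require_pass_rate: 0.95 # global default
  thresholds:
    - test: not_null
      min_pass_rate: 0.99 # stricter for this test
    - test: unique
      min_pass_rate: 0.95
```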
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| test | str | Yes | - | Test name or type to apply threshold to |
| min_pass_rate | float | Yes | - | Minimum pass rate required (0.0-1.0, e.g., 0.99 = 99%) |
RowCountGate¶
Used in: GateConfig
Row count anomaly detection for quality gates.
Validates that batch size falls within expected bounds and detects significant changes from previous runs.
Example:
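An illustrative row-count gate (bounds are placeholders):

```yaml
gate:
  row_count:
    min: 100
    max: 1000000
    change_threshold: 0.5 # fail if count swings more than 50% vs previous run
```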
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| min | Optional[int] | No | - | Minimum expected row count |
| max | Optional[int] | No | - | Maximum expected row count |
| change_threshold | Optional[float] | No | - | Max allowed change vs previous run (e.g., 0.5 = 50% change triggers failure) |
WriteConfig¶
Used in: NodeConfig
Configuration for writing data from a node.
When to Use: Any node that persists data to storage.
Key Concepts:
- mode: How to handle existing data (overwrite, append, upsert)
- keys: Required for upsert mode - columns that identify unique records
- partition_by: Columns to partition output by (improves query performance)
See Also: Performance Tuning (partitioning strategies)
🚀 "Big Data Performance" Guide¶
Business Problem: "My dashboards are slow because the query scans terabytes of data just to find one day's sales."
The Solution: Use Partitioning for coarse filtering (skipping huge chunks) and Z-Ordering for fine-grained skipping (colocating related data).
Recipe: Lakehouse Optimized
write:
  connection: "gold_lake"
  format: "delta"
  table: "fact_sales"
  mode: "append"
  # 1. Partitioning: Physical folders.
  # Use for low-cardinality columns often used in WHERE clauses.
  # WARNING: Do NOT partition by high-cardinality cols like ID or Timestamp!
  partition_by: ["country_code", "txn_year_month"]
  # 2. Z-Ordering: Data clustering.
  # Use for high-cardinality columns often used in JOINs or predicates.
  zorder_by: ["customer_id", "product_id"]
  # 3. Table Properties: Engine tuning.
  table_properties:
    "delta.autoOptimize.optimizeWrite": "true"
    "delta.autoOptimize.autoCompact": "true"
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| connection | str | Yes | - | Connection name from project.yaml |
| format | ReadFormat | Yes | - | Output format: csv, parquet, delta, json, sql, api, excel, avro, cloudFiles |
| table | Optional[str] | No | - | Table name for SQL/Delta |
| path | Optional[str] | No | - | Path for file-based outputs |
| register_table | Optional[str] | No | - | Register file output as external table (Spark/Delta only) |
| mode | WriteMode | No | WriteMode.OVERWRITE | Write mode. Options: 'overwrite', 'append', 'upsert', 'append_once', 'merge'. Use 'append_once' for idempotent Bronze ingestion (requires 'keys' in options). See WriteMode enum for details. |
| partition_by | List[str] | No | PydanticUndefined | List of columns to physically partition the output by (folder structure). Use for low-cardinality columns (e.g. date, country). |
| zorder_by | List[str] | No | PydanticUndefined | List of columns to Z-Order by. Improves read performance for high-cardinality columns used in filters/joins (Delta only). |
| table_properties | Dict[str, str] | No | PydanticUndefined | Delta table properties. Overrides global performance.delta_table_properties. Example: {'delta.columnMapping.mode': 'name'} to allow special characters in column names. |
| merge_schema | bool | No | False | Allow schema evolution (mergeSchema option in Delta) |
| overwrite_schema | bool | No | False | Allow schema overwrite on mode=overwrite (overwriteSchema option in Delta). Use when the incoming schema differs from the existing table schema. |
| first_run_query | Optional[str] | No | - | SQL query for full-load on first run (High Water Mark pattern). If set, uses this query when target table doesn't exist, then switches to incremental. Only applies to SQL reads. |
| options | Dict[str, Any] | No | PydanticUndefined | Format-specific options |
| auto_optimize | bool | AutoOptimizeConfig | No | - | Auto-run OPTIMIZE and VACUUM after write (Delta only) |
| add_metadata | bool | WriteMetadataConfig | No | - | Add metadata columns for Bronze layer lineage. Set to true to add all applicable columns, or provide a WriteMetadataConfig for selective columns. Columns: _extracted_at, _source_file (file sources), _source_connection, _source_table (SQL sources). |
| skip_if_unchanged | bool | No | False | Skip write if DataFrame content is identical to previous write. Computes SHA256 hash of entire DataFrame and compares to stored hash in Delta table metadata. Useful for snapshot tables without timestamps to avoid redundant appends. Only supported for Delta format. |
| skip_hash_columns | Optional[List[str]] | No | - | Columns to include in hash computation for skip_if_unchanged. If None, all columns are used. Specify a subset to ignore volatile columns like timestamps. |
| skip_hash_sort_columns | Optional[List[str]] | No | - | Columns to sort by before hashing for deterministic comparison. Required if row order may vary between runs. Typically your business key columns. |
| streaming | Optional[StreamingWriteConfig] | No | - | Streaming write configuration for Spark Structured Streaming. When set, uses writeStream instead of batch write. Requires a streaming DataFrame from a streaming read source. |
| merge_keys | Optional[List[str]] | No | - | Key columns for SQL Server MERGE operations. Required when mode='merge'. These columns form the ON clause of the MERGE statement. |
| merge_options | Optional[SqlServerMergeOptions] | No | - | Options for SQL Server MERGE operations (conditions, staging, audit cols) |
| overwrite_options | Optional[SqlServerOverwriteOptions] | No | - | Options for SQL Server overwrite operations (strategy, audit cols) |
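skip_if_unchanged relies on a deterministic content hash. A sketch of the idea using plain Python rows (odibi hashes the actual DataFrame; the column subset and sort behavior mirror skip_hash_columns and skip_hash_sort_columns):

```python
import hashlib

def content_hash(rows, hash_columns, sort_columns):
    # Sort by business keys so varying row order yields the same hash
    ordered = sorted(rows, key=lambda r: tuple(r[c] for c in sort_columns))
    payload = "\n".join(
        "|".join(str(r[c]) for c in hash_columns) for r in ordered
    )
    return hashlib.sha256(payload.encode()).hexdigest()
```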
WriteMetadataConfig¶
Used in: WriteConfig
Configuration for metadata columns added during Bronze writes.
📋 Bronze Metadata Guide¶
Business Problem: "We need lineage tracking and debugging info for our Bronze layer data."
The Solution: Add metadata columns during ingestion for traceability.
Recipe 1: Add All Metadata (Recommended)
write:
  connection: bronze
  table: customers
  mode: append
  add_metadata: true # adds all applicable columns
Recipe 2: Selective Metadata
write:
  connection: bronze
  table: customers
  mode: append
  add_metadata:
    extracted_at: true
    source_file: true
    source_connection: false
    source_table: false
Available Columns:
- _extracted_at: Pipeline execution timestamp (all sources)
- _source_file: Source filename/path (file sources only)
- _source_connection: Connection name used (all sources)
- _source_table: Table or query name (SQL sources only)
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| extracted_at | bool | No | True | Add _extracted_at column with pipeline execution timestamp |
| source_file | bool | No | True | Add _source_file column with source filename (file sources only) |
| source_connection | bool | No | False | Add _source_connection column with connection name |
| source_table | bool | No | False | Add _source_table column with table/query name (SQL sources only) |
StreamingWriteConfig¶
Used in: WriteConfig
Configuration for Spark Structured Streaming writes.
🚀 "Real-Time Pipeline" Guide¶
Business Problem: "I need to process data continuously as it arrives from Kafka/Event Hubs and write it to Delta Lake in near real-time."
The Solution: Configure streaming write with checkpoint location for fault tolerance and trigger interval for processing frequency.
Recipe: Streaming Ingestion
write:
  connection: "silver_lake"
  format: "delta"
  table: "events_stream"
  streaming:
    output_mode: append
    checkpoint_location: "/checkpoints/events_stream"
    trigger:
      processing_time: "10 seconds"
Recipe: One-Time Streaming (Batch-like)
write:
  connection: "silver_lake"
  format: "delta"
  table: "events_batch"
  streaming:
    output_mode: append
    checkpoint_location: "/checkpoints/events_batch"
    trigger:
      available_now: true
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| output_mode | Literal['append', 'update', 'complete'] | No | append | Output mode for streaming writes. 'append' - Only new rows. 'update' - Updated rows only. 'complete' - Entire result table (requires aggregation). |
| checkpoint_location | str | Yes | - | Path for streaming checkpoints. Required for fault tolerance. Must be a reliable storage location (e.g., cloud storage, DBFS). |
| trigger | Optional[TriggerConfig] | No | - | Trigger configuration. If not specified, processes data as fast as possible. Use 'processing_time' for micro-batch intervals, 'once' for single batch, 'available_now' for processing all available data then stopping. |
| query_name | Optional[str] | No | - | Name for the streaming query (useful for monitoring and debugging) |
| await_termination | Optional[bool] | No | False | Wait for the streaming query to terminate. Set to True for batch-like streaming with 'once' or 'available_now' triggers. |
| timeout_seconds | Optional[int] | No | - | Timeout in seconds when await_termination is True. If None, waits indefinitely. |
TriggerConfig¶
Used in: StreamingWriteConfig
Configuration for streaming trigger intervals.
Specify exactly one of the trigger options.
Example:
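An illustrative micro-batch trigger (the interval is a placeholder):

```yaml
trigger:
  processing_time: "10 seconds"
```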
Or for one-time processing:
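An illustrative one-time trigger that drains all available data in batches, then stops:

```yaml
trigger:
  available_now: true
```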
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| processing_time | Optional[str] | No | - | Trigger interval as duration string (e.g., '10 seconds', '1 minute') |
| once | Optional[bool] | No | - | Process all available data once and stop |
| available_now | Optional[bool] | No | - | Process all available data in multiple batches, then stop |
| continuous | Optional[str] | No | - | Continuous processing with checkpoint interval (e.g., '1 second') |
AutoOptimizeConfig¶
Used in: WriteConfig
Configuration for Delta Lake automatic optimization.
Example:
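An illustrative write with auto optimization enabled (the nesting under write follows the WriteConfig auto_optimize field):

```yaml
write:
  connection: gold_lake
  format: delta
  table: fact_sales
  auto_optimize:
    enabled: true
    vacuum_retention_hours: 168 # 7 days; 0 disables VACUUM
```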
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| enabled | bool | No | True |
Enable auto optimization |
| vacuum_retention_hours | int | No | 168 |
Hours to retain history for VACUUM (default 7 days). Set to 0 to disable VACUUM. |
ApiOptionsConfig¶
Complete options configuration for API data sources (format: api).
When to Use: Pull data from REST APIs with pagination, retry, and rate limiting.
See Also: API Data Sources Guide
Example:
nodes:
  - name: api_data
    read:
      connection: my_api
      format: api
      path: /v1/records
      options:
        pagination:
          type: offset_limit
          limit: 1000
          max_pages: 100
        response:
          items_path: data.records
          add_fields:
            _source: "my_api"
            _fetched_at: "$now"
        retry:
          max_retries: 3
        rate_limit:
          requests_per_second: 5
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| pagination | Optional[ApiPaginationConfig] | No | - | Pagination configuration |
| response | Optional[ApiResponseConfig] | No | - | Response parsing configuration |
| retry | Optional[ApiRetryConfig] | No | - | Retry configuration |
| rate_limit | Optional[ApiRateLimitConfig] | No | - | Rate limiting configuration |
| method | str | No | GET | HTTP method (GET, POST) |
| headers | Optional[Dict[str, str]] | No | - | Additional HTTP headers |
| params | Optional[Dict[str, str]] | No | - | Additional query parameters |
| json_body | Optional[Dict[str, Any]] | No | - | JSON body for POST requests |
ApiPaginationConfig¶
Used in: ApiOptionsConfig
Pagination configuration for API data sources.
When to Use: Configure how to paginate through API results.
Example (offset/limit pagination):
read:
  format: api
  options:
    pagination:
      type: offset_limit
      offset_param: skip
      limit_param: limit
      limit: 1000
      max_pages: 100
Example (cursor-based pagination):
read:
  format: api
  options:
    pagination:
      type: cursor
      cursor_path: meta.next_cursor
      cursor_param: cursor
Example (link header pagination - GitHub style):
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| type | ApiPaginationType | No | ApiPaginationType.OFFSET_LIMIT | Pagination strategy to use |
| offset_param | str | No | offset | Query param name for offset (offset_limit type) |
| limit_param | str | No | limit | Query param name for limit |
| limit | int | No | 100 | Number of records per page |
| max_pages | Optional[int] | No | - | Maximum pages to fetch (None = unlimited) |
| cursor_path | Optional[str] | No | - | Dotted path to cursor in response (cursor type) |
| cursor_param | Optional[str] | No | - | Query param name for cursor (cursor type) |
| page_param | str | No | page | Query param name for page number (page_number type) |
| start_page | int | No | 1 | Starting page number (page_number type) |
ApiRateLimitConfig¶
Used in: ApiOptionsConfig
Rate limiting configuration for API requests.
Example:
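An illustrative rate limit (the value is a placeholder):

```yaml
read:
  format: api
  options:
    rate_limit:
      requests_per_second: 5
```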
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| requests_per_second | Optional[float] | No | - | Max requests per second (None = no limit) |
ApiResponseConfig¶
Used in: ApiOptionsConfig
Response parsing configuration for API data sources.
When to Use: Configure how to extract data from API responses.
Date Variables: Use in add_fields OR params for dynamic dates:
| Variable | Format | Example Value |
|---|---|---|
| $now | ISO timestamp | 2024-01-15T10:30:00+00:00 |
| $today | YYYY-MM-DD | 2024-01-15 |
| $yesterday | YYYY-MM-DD | 2024-01-14 |
| $date | YYYY-MM-DD | 2024-01-15 |
| $7_days_ago | YYYY-MM-DD | 2024-01-08 |
| $30_days_ago | YYYY-MM-DD | 2023-12-16 |
| $90_days_ago | YYYY-MM-DD | 2023-10-17 |
| $start_of_week | YYYY-MM-DD | 2024-01-15 |
| $start_of_month | YYYY-MM-DD | 2024-01-01 |
| $start_of_year | YYYY-MM-DD | 2024-01-01 |
| $today_compact | YYYYMMDD | 20240115 |
| $yesterday_compact | YYYYMMDD | 20240114 |
| $7_days_ago_compact | YYYYMMDD | 20240108 |
| $30_days_ago_compact | YYYYMMDD | 20231216 |
| $90_days_ago_compact | YYYYMMDD | 20231017 |
Example (add_fields):
read:
  format: api
  options:
    response:
      items_path: results
      add_fields:
        _fetched_at: "$now"
        _load_date: "$today"
Example (params with compact dates for openFDA):
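An illustrative sketch: the endpoint path and search syntax below are assumptions about openFDA's public API, not part of odibi; only the compact date variables come from the table above.

```yaml
read:
  format: api
  path: /drug/event.json # assumed openFDA endpoint
  options:
    params:
      search: "receivedate:[$30_days_ago_compact TO $today_compact]"
      limit: "100"
```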
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| items_path | str | No | - | Dotted path to items array in response (e.g., 'results', 'data.items'). Empty = response is the array. |
| dict_to_list | bool | No | False | If True and items_path resolves to a dict, extract dict values as rows with keys preserved in '_key' field. Useful for APIs like ddragon that return {'Aatrox': {...}, 'Ahri': {...}}. |
| add_fields | Optional[Dict[str, Any]] | No | - | Fields to add to each record. Supports date variables: $now, $today, $yesterday, $7_days_ago, etc. |
ApiRetryConfig¶
Used in: ApiOptionsConfig
Retry configuration for API requests.
Example:
read:
  format: api
  options:
    retry:
      max_retries: 3
      backoff_factor: 2.0
      retry_codes: [429, 500, 502, 503]
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| max_retries | int | No | 3 | Maximum retry attempts |
| backoff_factor | float | No | 2.0 | Exponential backoff multiplier |
| retry_codes | List[int] | No | [429, 500, 502, 503, 504] | HTTP status codes to retry on |
PrivacyConfig¶
Used in: NodeConfig
Configuration for PII anonymization.
🔐 Privacy & PII Protection¶
How It Works:
1. Mark columns as pii: true in the columns metadata
2. Configure a privacy block with the anonymization method
3. During node execution, all columns marked as PII (and inherited from dependencies) are anonymized
4. Upstream PII markings are inherited by downstream nodes
Example:
columns:
  customer_email:
    pii: true # Mark as PII
  customer_id:
    pii: false
privacy:
  method: hash # hash, mask, or redact
  salt: "secret_key" # Optional: makes hash unique/secure
  declassify: [] # Remove columns from PII protection
Methods:
- hash: SHA256 hash (length 64). With salt, prevents pre-computed rainbow tables.
- mask: Show only the last 4 chars, replacing the rest with *. Example: john@email.com → **********.com
- redact: Replace entire value with [REDACTED]
Important:
- pii: true alone does NOTHING. You must set a privacy.method to actually mask data.
- PII inheritance: If dependency outputs PII columns, this node inherits them unless declassified.
- Salt is optional but recommended for hash to prevent attacks.
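The hash method described above is a salted SHA256. A minimal sketch (the value+salt concatenation order is an assumption):

```python
import hashlib

def hash_pii(value: str, salt: str = "") -> str:
    """Salted SHA256, producing the 64-char hex digest noted above."""
    return hashlib.sha256((value + salt).encode()).hexdigest()
```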
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| method | PrivacyMethod | Yes | - | Anonymization method: 'hash' (SHA256), 'mask' (show last 4), or 'redact' ([REDACTED]) |
| salt | Optional[str] | No | - | Salt for hashing (optional but recommended). Appended before hashing to create unique hashes. Example: 'company_secret_key_2025' |
| declassify | List[str] | No | PydanticUndefined | List of columns to remove from PII protection (stops inheritance from upstream). Example: ['customer_id'] |
SqlServerAuditColsConfig¶
Audit column configuration for SQL Server merge operations.
These columns are automatically populated with GETUTCDATE() during merge:
- created_col: Set on INSERT only
- updated_col: Set on INSERT and UPDATE
Example:
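An illustrative audit-column config (column names are placeholders):

```yaml
merge_options:
  audit_cols:
    created_col: created_ts # stamped on INSERT only
    updated_col: updated_ts # stamped on INSERT and UPDATE
```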
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| created_col | Optional[str] | No | - | Column name for creation timestamp (set on INSERT) |
| updated_col | Optional[str] | No | - | Column name for update timestamp (set on INSERT and UPDATE) |
SqlServerMergeOptions¶
Used in: WriteConfig
Options for SQL Server MERGE operations (Phase 1).
Enables incremental sync from Spark to SQL Server using T-SQL MERGE. Data is written to a staging table, then merged into the target.
Basic Usage¶
write:
  connection: azure_sql
  format: sql_server
  table: sales.fact_orders
  mode: merge
  merge_keys: [DateId, store_id]
  merge_options:
    update_condition: "source._hash_diff != target._hash_diff"
    exclude_columns: [_hash_diff]
    audit_cols:
      created_col: created_ts
      updated_col: updated_ts
Conditions¶
- update_condition: Only update rows matching this condition (e.g., hash diff)
- delete_condition: Delete rows matching this condition (soft delete pattern)
- insert_condition: Only insert rows matching this condition
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| update_condition | Optional[str] | No | - | SQL condition for WHEN MATCHED UPDATE. Use 'source.' and 'target.' prefixes. Example: 'source._hash_diff != target._hash_diff' |
| delete_condition | Optional[str] | No | - | SQL condition for WHEN MATCHED DELETE. Example: 'source._is_deleted = 1' |
| insert_condition | Optional[str] | No | - | SQL condition for WHEN NOT MATCHED INSERT. Example: 'source.is_valid = 1' |
| exclude_columns | List[str] | No | PydanticUndefined | Columns to exclude from MERGE (not written to target table) |
| staging_schema | str | No | staging | Schema for staging table. Table name: {staging_schema}.{table}_staging |
| audit_cols | Optional[SqlServerAuditColsConfig] | No | - | Audit columns for created/updated timestamps |
| validations | Optional[ForwardRef('SqlServerMergeValidationConfig')] | No | - | Validation checks before merge (null keys, duplicate keys) |
| auto_create_schema | bool | No | False | Auto-create schema if it doesn't exist (Phase 4). Runs CREATE SCHEMA IF NOT EXISTS. |
| auto_create_table | bool | No | False | Auto-create target table if it doesn't exist (Phase 4). Infers schema from DataFrame. |
| schema_evolution | Optional[ForwardRef('SqlServerSchemaEvolutionConfig')] | No | - | Schema evolution configuration (Phase 4). Controls handling of schema differences. |
| batch_size | Optional[int] | No | - | Batch size for staging table writes (Phase 4). Chunks large DataFrames for memory efficiency. |
| primary_key_on_merge_keys | bool | No | False | Create a clustered primary key on merge_keys when auto-creating table. Enforces uniqueness. |
| index_on_merge_keys | bool | No | False | Create a nonclustered index on merge_keys. Use if primary key already exists elsewhere. |
| incremental | bool | No | False | Enable incremental merge optimization. When True, reads target table's keys and hashes to determine which rows changed, then only writes changed rows to staging. Significantly faster when few rows change between runs. |
| hash_column | Optional[str] | No | - | Name of pre-computed hash column in DataFrame for change detection. Used when incremental=True. If not specified, will auto-detect '_hash_diff' column. |
| change_detection_columns | Optional[List[str]] | No | - | Columns to use for computing change detection hash. Used when incremental=True and no hash_column is specified. If None, uses all non-key columns. |
| bulk_copy | bool | No | False | Enable bulk copy mode for fast staging table loads. Writes data to ADLS as staging file, then uses BULK INSERT. 10-50x faster than JDBC for large datasets. Requires staging_connection. |
| staging_connection | Optional[str] | No | - | Connection name for staging files (ADLS/Blob storage). Required when bulk_copy=True. The connection must have write access. |
| staging_path | Optional[str] | No | - | Path prefix for staging files. Defaults to 'odibi_staging/bulk'. Files are automatically cleaned up after successful load. |
| external_data_source | Optional[str] | No | - | SQL Server external data source name for BULK INSERT. If not specified and auto_setup=True, will be auto-generated as 'odibi_{staging_connection}'. |
| keep_staging_files | bool | No | False |
Keep staging files after load (for debugging). Default deletes after success. |
| auto_setup | bool | No | False |
Auto-create SQL Server external data source and credential if they don't exist. Reads auth credentials from staging_connection and creates matching SQL objects. Requires elevated SQL permissions (ALTER ANY EXTERNAL DATA SOURCE, CONTROL). |
| force_recreate | bool | No | False |
Force recreation of external data source and credential even if they exist. Use when you've rotated SAS tokens or storage keys and need to update SQL Server. Has no effect if auto_setup=False. |
| csv_options | Optional[Dict[str, str]] | No | - | Custom CSV options for bulk copy when writing to Azure SQL Database. Passed to Spark CSV writer. Defaults: quote='"', escape='"', escapeQuotes='true', nullValue='', emptyValue='', encoding='UTF-8'. Override any option here. |
SqlServerMergeValidationConfig¶
Validation configuration for SQL Server merge/overwrite operations.
Validates source data before writing to SQL Server.
Example:
merge_options:
validations:
check_null_keys: true
check_duplicate_keys: true
fail_on_validation_error: true
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| check_null_keys | bool | No | True |
Fail if merge_keys contain NULL values |
| check_duplicate_keys | bool | No | True |
Fail if merge_keys have duplicate combinations |
| fail_on_validation_error | bool | No | True |
If False, log warning instead of failing on validation errors |
SqlServerOverwriteOptions¶
Used in: WriteConfig
Options for SQL Server overwrite operations (Phase 2).
Enhanced overwrite with multiple strategies for different use cases.
Strategies¶
- truncate_insert - TRUNCATE TABLE then INSERT (fastest, requires TRUNCATE permission)
- drop_create - DROP TABLE, CREATE TABLE, INSERT (refreshes schema)
- delete_insert - DELETE FROM then INSERT (works with limited permissions)
Example¶
write:
connection: azure_sql
format: sql_server
table: fact.combined_downtime
mode: overwrite
overwrite_options:
strategy: truncate_insert
audit_cols:
created_col: created_ts
updated_col: updated_ts
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| strategy | SqlServerOverwriteStrategy | No | SqlServerOverwriteStrategy.TRUNCATE_INSERT |
Overwrite strategy: truncate_insert, drop_create, delete_insert |
| audit_cols | Optional[SqlServerAuditColsConfig] | No | - | Audit columns for created/updated timestamps |
| validations | Optional[SqlServerMergeValidationConfig] | No | - | Validation checks before overwrite |
| auto_create_schema | bool | No | False |
Auto-create schema if it doesn't exist (Phase 4). Runs CREATE SCHEMA IF NOT EXISTS. |
| auto_create_table | bool | No | False |
Auto-create target table if it doesn't exist (Phase 4). Infers schema from DataFrame. |
| schema_evolution | Optional[SqlServerSchemaEvolutionConfig] | No | - | Schema evolution configuration (Phase 4). Controls handling of schema differences. |
| batch_size | Optional[int] | No | - | Batch size for writes (Phase 4). Chunks large DataFrames for memory efficiency. |
| bulk_copy | bool | No | False |
Enable bulk copy mode for fast writes. Writes data to ADLS as staging file, then uses BULK INSERT. 10-50x faster than JDBC for large datasets. Requires staging_connection. |
| staging_connection | Optional[str] | No | - | Connection name for staging files (ADLS/Blob storage). Required when bulk_copy=True. The connection must have write access. |
| staging_path | Optional[str] | No | - | Path prefix for staging files. Defaults to 'odibi_staging/bulk'. Files are automatically cleaned up after successful load. |
| external_data_source | Optional[str] | No | - | SQL Server external data source name for BULK INSERT. If not specified and auto_setup=True, will be auto-generated as 'odibi_{staging_connection}'. |
| keep_staging_files | bool | No | False |
Keep staging files after load (for debugging). Default deletes after success. |
| auto_setup | bool | No | False |
Auto-create SQL Server external data source and credential if they don't exist. Reads auth credentials from staging_connection and creates matching SQL objects. Requires elevated SQL permissions (ALTER ANY EXTERNAL DATA SOURCE, CONTROL). |
| force_recreate | bool | No | False |
Force recreation of external data source and credential even if they exist. Use when you've rotated SAS tokens or storage keys and need to update SQL Server. Has no effect if auto_setup=False. |
| csv_options | Optional[Dict[str, str]] | No | - | Custom CSV options for bulk copy when writing to Azure SQL Database. Passed to Spark CSV writer. Defaults: quote='"', escape='"', escapeQuotes='true', nullValue='', emptyValue='', encoding='UTF-8'. Override any option here. |
SqlServerSchemaEvolutionConfig¶
Schema evolution configuration for SQL Server operations (Phase 4).
Controls automatic schema changes when DataFrame schema differs from target table.
Example:
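A minimal illustrative snippet (field names from the table below; values are placeholders):

```yaml
merge_options:
  schema_evolution:
    mode: evolve        # strict | evolve | ignore
    add_columns: true   # add new DataFrame columns via ALTER TABLE
```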
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| mode | SqlServerSchemaEvolutionMode | No | SqlServerSchemaEvolutionMode.STRICT |
Schema evolution mode: strict (fail), evolve (add columns), ignore (skip mismatched) |
| add_columns | bool | No | False |
If mode='evolve', automatically add new columns via ALTER TABLE ADD COLUMN |
TransformStep¶
Used in: TransformConfig
Single transformation step.
Supports four step types (exactly one required):
- sql - Inline SQL query string
- sql_file - Path to external .sql file (relative to the YAML file defining the node)
- function - Registered Python function name
- operation - Built-in operation (e.g., drop_duplicates)
sql_file Example:
If your project structure is:
project.yaml                 # imports pipelines/silver/silver.yaml
pipelines/
  silver/
    silver.yaml              # defines the node
    sql/
      transform.sql          # your SQL file
In silver.yaml, use a path relative to silver.yaml:
Important: The path is resolved relative to the YAML file where the node is defined,
NOT the project.yaml that imports it. Do NOT use absolute paths like /pipelines/silver/sql/....
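For the layout above, the step in silver.yaml would look like this (a sketch; the step syntax follows the TransformStep fields below):

```yaml
transform:
  steps:
    - sql_file: "sql/transform.sql"   # resolved relative to silver.yaml
```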
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| sql | Optional[str] | No | - | Inline SQL query. Use df to reference the current DataFrame. |
| sql_file | Optional[str] | No | - | Path to external .sql file, relative to the YAML file defining the node. Example: 'sql/transform.sql' resolves relative to the node's source YAML. |
| function | Optional[str] | No | - | Name of a registered Python function (@transform or @register). |
| operation | Optional[str] | No | - | Built-in operation name (e.g., drop_duplicates, fill_na). |
| params | Dict[str, Any] | No | PydanticUndefined |
Parameters to pass to function or operation. |
Contracts (Data Quality Gates)¶
Contracts (Pre-Transform Checks)¶
Contracts are fail-fast data quality checks that run on input data before transformation. They always halt execution on failure - use them to prevent bad data from entering the pipeline.
Contracts vs Validation vs Quality Gates:
| Feature | When it Runs | On Failure | Use Case |
|---|---|---|---|
| Contracts | Before transform | Always fails | Input data quality (not-null, unique keys) |
| Validation | After transform | Configurable (fail/warn/quarantine) | Output data quality (ranges, formats) |
| Quality Gates | After validation | Configurable (abort/warn) | Pipeline-level thresholds (pass rate, row counts) |
| Quarantine | With validation | Routes bad rows | Capture invalid records for review |
See Also: - Validation Guide - Full validation configuration - Quarantine Guide - Quarantine setup and review - Getting Started: Validation
Example:
- name: "process_orders"
contracts:
- type: not_null
columns: [order_id, customer_id]
- type: row_count
min: 100
- type: freshness
column: created_at
max_age: "24h"
read:
source: raw_orders
AcceptedValuesTest¶
Used in: NodeConfig, ValidationConfig
Ensures a column only contains values from an allowed list.
When to Use: Enum-like fields, status columns, categorical data validation.
See Also: Contracts Overview
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| type | Literal['accepted_values'] | No | TestType.ACCEPTED_VALUES |
- |
| name | Optional[str] | No | - | Optional name for the check |
| on_fail | ContractSeverity | No | ContractSeverity.FAIL |
Action on failure |
| column | str | Yes | - | Column to check |
| values | List[Any] | Yes | - | Allowed values |
CustomSQLTest¶
Used in: NodeConfig, ValidationConfig
Runs a custom SQL condition and fails if too many rows violate it.
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| type | Literal['custom_sql'] | No | TestType.CUSTOM_SQL |
- |
| name | Optional[str] | No | - | Optional name for the check |
| on_fail | ContractSeverity | No | ContractSeverity.FAIL |
Action on failure |
| condition | str | Yes | - | SQL condition that should be true for valid rows |
| threshold | float | No | 0.0 |
Failure rate threshold (0.0 = strictly no failures allowed) |
DistributionContract¶
Used in: NodeConfig, ValidationConfig
Checks if a column's statistical distribution is within expected bounds.
When to Use: Detect data drift, anomaly detection, statistical monitoring.
See Also: Contracts Overview
contracts:
- type: distribution
column: price
metric: mean
threshold: ">100" # Mean must be > 100
on_fail: warn
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| type | Literal['distribution'] | No | TestType.DISTRIBUTION |
- |
| name | Optional[str] | No | - | Optional name for the check |
| on_fail | ContractSeverity | No | ContractSeverity.WARN |
- |
| column | str | Yes | - | Column to analyze |
| metric | Literal['mean', 'min', 'max', 'null_percentage'] | Yes | - | Statistical metric to check |
| threshold | str | Yes | - | Threshold expression (e.g., '>100', '<0.05') |
FreshnessContract¶
Used in: NodeConfig, ValidationConfig
Validates that data is not stale by checking a timestamp column.
When to Use: Source systems that should update regularly, SLA monitoring.
See Also: Contracts Overview
contracts:
- type: freshness
column: updated_at
max_age: "24h" # Fail if no data newer than 24 hours
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| type | Literal['freshness'] | No | TestType.FRESHNESS |
- |
| name | Optional[str] | No | - | Optional name for the check |
| on_fail | ContractSeverity | No | ContractSeverity.FAIL |
- |
| column | str | No | updated_at |
Timestamp column to check |
| max_age | str | Yes | - | Maximum allowed age (e.g., '24h', '7d') |
NotNullTest¶
Used in: NodeConfig, ValidationConfig
Ensures specified columns contain no NULL values.
When to Use: Primary keys, required fields, foreign keys that must resolve.
See Also: Contracts Overview
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| type | Literal['not_null'] | No | TestType.NOT_NULL |
- |
| name | Optional[str] | No | - | Optional name for the check |
| on_fail | ContractSeverity | No | ContractSeverity.FAIL |
Action on failure |
| columns | List[str] | Yes | - | Columns that must not contain nulls |
RangeTest¶
Used in: NodeConfig, ValidationConfig
Ensures column values fall within a specified range.
When to Use: Numeric bounds validation (ages, prices, quantities), date ranges.
See Also: Contracts Overview
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| type | Literal['range'] | No | TestType.RANGE |
- |
| name | Optional[str] | No | - | Optional name for the check |
| on_fail | ContractSeverity | No | ContractSeverity.FAIL |
Action on failure |
| column | str | Yes | - | Column to check |
| min | int \| float \| str | No | - | Minimum value (inclusive) |
| max | int \| float \| str | No | - | Maximum value (inclusive) |
RegexMatchTest¶
Used in: NodeConfig, ValidationConfig
Ensures column values match a regex pattern.
When to Use: Format validation (emails, phone numbers, IDs, codes).
See Also: Contracts Overview
contracts:
- type: regex_match
column: email
pattern: "^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| type | Literal['regex_match'] | No | TestType.REGEX_MATCH |
- |
| name | Optional[str] | No | - | Optional name for the check |
| on_fail | ContractSeverity | No | ContractSeverity.FAIL |
Action on failure |
| column | str | Yes | - | Column to check |
| pattern | str | Yes | - | Regex pattern to match |
RowCountTest¶
Used in: NodeConfig, ValidationConfig
Validates that row count falls within expected bounds.
When to Use: Ensure minimum data completeness, detect truncated loads, cap batch sizes.
See Also: Contracts Overview, GateConfig
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| type | Literal['row_count'] | No | TestType.ROW_COUNT |
- |
| name | Optional[str] | No | - | Optional name for the check |
| on_fail | ContractSeverity | No | ContractSeverity.FAIL |
Action on failure |
| min | Optional[int] | No | - | Minimum row count |
| max | Optional[int] | No | - | Maximum row count |
SchemaContract¶
Used in: NodeConfig, ValidationConfig
Validates that the DataFrame schema matches expected columns.
When to Use: Enforce schema stability, detect upstream schema drift, ensure column presence.
See Also: Contracts Overview, SchemaPolicyConfig
Uses the columns metadata from NodeConfig to verify schema.
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| type | Literal['schema'] | No | TestType.SCHEMA |
- |
| name | Optional[str] | No | - | Optional name for the check |
| on_fail | ContractSeverity | No | ContractSeverity.FAIL |
- |
| strict | bool | No | True |
If true, fail on unexpected columns |
UniqueTest¶
Used in: NodeConfig, ValidationConfig
Ensures specified columns (or combination) contain unique values.
When to Use: Primary keys, natural keys, deduplication verification.
See Also: Contracts Overview
contracts:
- type: unique
columns: [order_id] # Single column
# OR composite key:
- type: unique
columns: [customer_id, order_date] # Composite uniqueness
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| type | Literal['unique'] | No | TestType.UNIQUE |
- |
| name | Optional[str] | No | - | Optional name for the check |
| on_fail | ContractSeverity | No | ContractSeverity.FAIL |
Action on failure |
| columns | List[str] | Yes | - | Columns that must be unique (composite key if multiple) |
VolumeDropTest¶
Used in: NodeConfig, ValidationConfig
Checks if row count dropped significantly compared to history.
When to Use: Detect source outages, partial loads, or data pipeline issues.
See Also: Contracts Overview, RowCountTest
Formula: (current - avg) / avg < -threshold
contracts:
- type: volume_drop
threshold: 0.5 # Fail if > 50% drop from 7-day average
lookback_days: 7
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| type | Literal['volume_drop'] | No | TestType.VOLUME_DROP |
- |
| name | Optional[str] | No | - | Optional name for the check |
| on_fail | ContractSeverity | No | ContractSeverity.FAIL |
Action on failure |
| threshold | float | No | 0.5 |
Max allowed drop (0.5 = 50% drop) |
| lookback_days | int | No | 7 |
Days of history to average |
Global Settings¶
LineageConfig¶
Used in: ProjectConfig
Configuration for OpenLineage integration.
Example:
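A minimal illustrative snippet (the URL is a placeholder for your OpenLineage-compatible endpoint):

```yaml
lineage:
  url: "${OPENLINEAGE_URL}"
  namespace: "odibi"
  api_key: "${OPENLINEAGE_API_KEY}"
```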
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| url | Optional[str] | No | - | OpenLineage API URL |
| namespace | str | No | odibi |
Namespace for jobs |
| api_key | Optional[str] | No | - | API Key |
AlertConfig¶
Used in: ProjectConfig
Configuration for alerts with throttling support.
Supports Slack, Teams, and generic webhooks with event-specific payloads.
Available Events:
- on_start - Pipeline started
- on_success - Pipeline completed successfully
- on_failure - Pipeline failed
- on_quarantine - Rows were quarantined
- on_gate_block - Quality gate blocked the pipeline
- on_threshold_breach - A threshold was exceeded
Example:
alerts:
- type: slack
url: "${SLACK_WEBHOOK_URL}"
on_events:
- on_failure
- on_quarantine
- on_gate_block
metadata:
throttle_minutes: 15
max_per_hour: 10
channel: "#data-alerts"
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| type | AlertType | Yes | - | - |
| url | str | Yes | - | Webhook URL |
| on_events | List[AlertEvent] | No | ['on_failure'] |
Events to trigger alert: on_start, on_success, on_failure, on_quarantine, on_gate_block, on_threshold_breach |
| metadata | Dict[str, Any] | No | PydanticUndefined |
Extra metadata: throttle_minutes, max_per_hour, channel, etc. |
LoggingConfig¶
Used in: ProjectConfig
Logging configuration.
Example:
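A minimal illustrative snippet (metadata keys are placeholders):

```yaml
logging:
  level: "INFO"
  structured: true      # JSON logs for log aggregators
  metadata:
    team: "data-platform"
```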
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| level | LogLevel | No | LogLevel.INFO |
- |
| structured | bool | No | False |
Output JSON logs |
| metadata | Dict[str, Any] | No | PydanticUndefined |
Extra metadata in logs |
PerformanceConfig¶
Used in: ProjectConfig
Performance tuning configuration.
Example:
performance:
use_arrow: true
spark_config:
"spark.sql.shuffle.partitions": "200"
"spark.sql.adaptive.enabled": "true"
"spark.databricks.delta.optimizeWrite.enabled": "true"
delta_table_properties:
"delta.columnMapping.mode": "name"
Spark Config Notes:
- Configs are applied via spark.conf.set() at runtime
- For existing sessions (e.g., Databricks), only runtime-settable configs will take effect
- Session-level configs (e.g., spark.executor.memory) require session restart
- Common runtime-safe configs: shuffle partitions, adaptive query execution, Delta optimizations
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| use_arrow | bool | No | True |
Use Apache Arrow-backed DataFrames (Pandas only). Reduces memory and speeds up I/O. |
| spark_config | Dict[str, str] | No | PydanticUndefined |
Spark configuration settings applied at runtime via spark.conf.set(). Example: {'spark.sql.shuffle.partitions': '200', 'spark.sql.adaptive.enabled': 'true'}. Note: Some configs require session restart and cannot be set at runtime. |
| delta_table_properties | Dict[str, str] | No | PydanticUndefined |
Default table properties applied to all Delta writes. Example: {'delta.columnMapping.mode': 'name'} to allow special characters in column names. |
| skip_null_profiling | bool | No | False |
Skip null profiling in metadata collection phase. Reduces execution time for large DataFrames by avoiding an additional Spark job. |
| skip_catalog_writes | bool | No | False |
Skip catalog metadata writes (register_asset, track_schema, log_pattern, record_lineage) after each node write. Significantly improves performance for high-throughput pipelines like Bronze layer ingestion. Set to true when catalog tracking is not needed. |
| skip_run_logging | bool | No | False |
Skip batch catalog writes at pipeline end (log_runs_batch, register_outputs_batch). Saves 10-20s per pipeline run. Enable when you don't need run history in the catalog. Stories are still generated and contain full execution details. |
RetryConfig¶
Used in: ProjectConfig
Retry configuration for transient failures.
Automatically retries failed operations (database timeouts, network issues, rate limits) with configurable backoff strategy.
Example:
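A minimal snippet using the defaults described below:

```yaml
retry:
  enabled: true
  max_attempts: 3
  backoff: "exponential"   # exponential | linear | constant
```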
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| enabled | bool | No | True |
Enable automatic retry on transient failures (timeouts, connection errors) |
| max_attempts | int | No | 3 |
Maximum number of retry attempts before failing. Total attempts = 1 + max_attempts. |
| backoff | BackoffStrategy | No | BackoffStrategy.EXPONENTIAL |
Wait strategy between retries. 'exponential' (2^n seconds, recommended), 'linear' (n seconds), or 'constant' (fixed 1 second). |
StoryConfig¶
Used in: ProjectConfig
Story generation configuration.
Stories are ODIBI's core value - execution reports with lineage. They must use a connection for consistent, traceable output.
Example:
story:
connection: "local_data"
path: "stories/"
retention_days: 30
failure_sample_size: 100
max_failure_samples: 500
max_sampled_validations: 5
Failure Sample Settings:
- failure_sample_size: Number of failed rows to capture per validation (default: 100)
- max_failure_samples: Total failed rows across all validations (default: 500)
- max_sampled_validations: After this many validations, show only counts (default: 5)
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| connection | str | Yes | - | Connection name for story output (uses connection's path resolution) |
| path | str | Yes | - | Path for stories (relative to connection base_path) |
| max_sample_rows | int | No | 10 |
Maximum rows to include in data samples within story reports. Higher values give more debugging context but increase file size. Set to 0 to disable data sampling. |
| auto_generate | bool | No | True |
- |
| retention_days | Optional[int] | No | 30 |
Days to keep stories |
| retention_count | Optional[int] | No | 100 |
Max number of stories to keep |
| failure_sample_size | int | No | 100 |
Number of failed rows to capture per validation rule |
| max_failure_samples | int | No | 500 |
Maximum total failed rows across all validations |
| max_sampled_validations | int | No | 5 |
After this many validations, show only counts (no samples) |
| async_generation | bool | No | False |
Generate stories asynchronously (fire-and-forget). Pipeline returns immediately while story writes in background. Improves multi-pipeline performance by ~5-10s per pipeline. |
| generate_lineage | bool | No | True |
Generate combined lineage graph from all stories. Creates a unified view of data flow across pipelines. |
| docs | Optional[ForwardRef('DocsConfig')] | No | - | Documentation generation settings. Generates README.md, TECHNICAL_DETAILS.md, NODE_CARDS/*.md from Story data. |
Transformation Reference¶
How to Use Transformers¶
You can use any transformer in two ways:
1. As a Top-Level Transformer ("The App"). Use this for major operations that define the node's purpose (e.g. Merge, SCD2).
2. As a Step in a Chain ("The Script"). Use this for smaller operations within a transform block (e.g. clean_text, filter).
Available Transformers:
The models below describe the params required for each transformer.
📂 Common Operations¶
CaseWhenCase¶
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| condition | str | Yes | - | - |
| value | str | Yes | - | - |
add_prefix (AddPrefixParams)¶
Adds a prefix to column names.
Configuration for adding a prefix to column names.
Example - All columns:
Example - Specific columns:
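Illustrative snippets (column names are placeholders):

```yaml
# All columns
add_prefix:
  prefix: "src_"

# Specific columns, excluding keys
add_prefix:
  prefix: "src_"
  columns: [name, email]
  exclude: [id]
```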
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| prefix | str | Yes | - | Prefix to add to column names |
| columns | Optional[List[str]] | No | - | Columns to prefix (default: all columns) |
| exclude | Optional[List[str]] | No | - | Columns to exclude from prefixing |
add_suffix (AddSuffixParams)¶
Adds a suffix to column names.
Configuration for adding a suffix to column names.
Example - All columns:
Example - Specific columns:
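Illustrative snippets (column names are placeholders):

```yaml
# All columns
add_suffix:
  suffix: "_raw"

# Specific columns only
add_suffix:
  suffix: "_raw"
  columns: [amount, quantity]
```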
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| suffix | str | Yes | - | Suffix to add to column names |
| columns | Optional[List[str]] | No | - | Columns to suffix (default: all columns) |
| exclude | Optional[List[str]] | No | - | Columns to exclude from suffixing |
case_when (CaseWhenParams)¶
Implements structured CASE WHEN logic.
Configuration for conditional logic.
Example:
case_when:
output_col: "age_group"
default: "'Adult'"
cases:
- condition: "age < 18"
value: "'Minor'"
- condition: "age > 65"
value: "'Senior'"
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| cases | List[CaseWhenCase] | Yes | - | List of conditional branches |
| default | str | No | NULL |
Default value if no condition met |
| output_col | str | Yes | - | Name of the resulting column |
cast_columns (CastColumnsParams)¶
Casts specific columns to new types while keeping others intact.
Normalizes common type aliases (int -> INTEGER, str -> STRING, float -> DOUBLE) and passes through complex SQL types as-is (e.g., ARRAY<STRING>).
Args: context (current execution context with dataframe); params (cast configuration with column-to-type mapping).
Returns: new context with columns cast to specified types.
Example: Cast columns using simple type aliases and complex SQL types:
Configuration for column type casting.
Example:
cast_columns:
casts:
age: "int"
salary: "DOUBLE"
created_at: "TIMESTAMP"
tags: "ARRAY<STRING>" # Raw SQL types allowed
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| casts | Dict[str, SimpleType \| str] | Yes | - | Map of column to target SQL type |
clean_text (CleanTextParams)¶
Applies string cleaning operations (Trim/Case) via SQL.
Configuration for text cleaning.
Example:
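A minimal illustrative snippet (column names are placeholders):

```yaml
clean_text:
  columns: [name, city]
  trim: true
  case: "lower"   # lower | upper | preserve
```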
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| columns | List[str] | Yes | - | List of columns to clean |
| trim | bool | No | True |
Apply TRIM() |
| case | Literal['lower', 'upper', 'preserve'] | No | preserve |
Case conversion |
coalesce_columns (CoalesceColumnsParams)¶
Returns the first non-null value from a list of columns.
Useful for fallback/priority scenarios where you want to use the first available value from multiple columns (e.g., primary phone, backup phone, home phone).
Args: context (current execution context with dataframe); params (coalesce configuration: columns list, output name, drop source option).
Returns: new context with coalesced column added.
Example: Create a primary phone column from multiple fallback options:
coalesce_columns:
columns: ["mobile_phone", "work_phone", "home_phone"]
output_col: "primary_phone"
drop_source: true
Configuration for coalescing columns (first non-null value).
Example - Phone number fallback:
Example - Timestamp fallback:
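Illustrative snippets (column names are placeholders):

```yaml
# Phone number fallback
coalesce_columns:
  columns: [mobile_phone, work_phone, home_phone]
  output_col: "primary_phone"
  drop_source: true

# Timestamp fallback
coalesce_columns:
  columns: [updated_at, created_at]
  output_col: "last_modified"
```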
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| columns | List[str] | Yes | - | List of columns to coalesce (in priority order) |
| output_col | str | Yes | - | Name of the output column |
| drop_source | bool | No | False |
Drop the source columns after coalescing |
concat_columns (ConcatColumnsParams)¶
Concatenates multiple columns into one string. NULLs are skipped (treated as empty string) using CONCAT_WS behavior.
Configuration for string concatenation.
Example:
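A minimal illustrative snippet (column names are placeholders):

```yaml
concat_columns:
  columns: [first_name, last_name]
  separator: " "
  output_col: "full_name"
```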
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| columns | List[str] | Yes | - | Columns to concatenate |
| separator | str | No | - | Separator string |
| output_col | str | Yes | - | Resulting column name |
convert_timezone (ConvertTimezoneParams)¶
Converts a timestamp from one timezone to another. Assumes the input column is a naive timestamp representing time in source_tz, or a timestamp with timezone.
Configuration for timezone conversion.
Example:
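A minimal illustrative snippet (column names are placeholders; timezone names are IANA identifiers):

```yaml
convert_timezone:
  col: "event_ts"
  source_tz: "UTC"
  target_tz: "America/Los_Angeles"
  output_col: "event_ts_local"
```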
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| col | str | Yes | - | Timestamp column to convert |
| source_tz | str | No | UTC |
Source timezone (e.g., 'UTC', 'America/New_York') |
| target_tz | str | Yes | - | Target timezone (e.g., 'America/Los_Angeles') |
| output_col | Optional[str] | No | - | Name of the result column (default: {col}_{target_tz}) |
date_add (DateAddParams)¶
Adds an interval to a date/timestamp column.
Configuration for date addition.
Example:
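A minimal illustrative snippet (column name is a placeholder):

```yaml
date_add:
  col: "order_date"
  value: 30
  unit: "day"
```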
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| col | str | Yes | - | - |
| value | int | Yes | - | - |
| unit | Literal['day', 'month', 'year', 'hour', 'minute', 'second'] | Yes | - | - |
date_diff (DateDiffParams)¶
Calculates difference between two dates/timestamps. Returns the elapsed time in the specified unit (as float for sub-day units).
Configuration for date difference.
Example:
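A minimal illustrative snippet (column names are placeholders):

```yaml
date_diff:
  start_col: "order_date"
  end_col: "ship_date"
  unit: "day"
```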
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| start_col | str | Yes | - | - |
| end_col | str | Yes | - | - |
| unit | Literal['day', 'hour', 'minute', 'second'] | No | day |
- |
date_trunc (DateTruncParams)¶
Truncates a date/timestamp to the specified precision.
Configuration for date truncation.
Example:
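A minimal illustrative snippet (column name is a placeholder):

```yaml
date_trunc:
  col: "created_at"
  unit: "month"
```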
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| col | str | Yes | - | - |
| unit | Literal['year', 'month', 'day', 'hour', 'minute', 'second'] | Yes | - | - |
derive_columns (DeriveColumnsParams)¶
Appends new columns based on SQL expressions.
Design:
- Uses projection to add fields.
- Keeps all existing columns via *.
Configuration for derived columns.
Example:
derive_columns:
derivations:
total_price: "quantity * unit_price"
full_name: "concat(first_name, ' ', last_name)"
Note: Engine will fail if expressions reference non-existent columns.
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| derivations | Dict[str, str] | Yes | - | Map of column name to SQL expression |
distinct (DistinctParams)¶
Return unique rows from the dataset using SQL DISTINCT.
Parameters¶
- context : EngineContext - the engine context containing the DataFrame to deduplicate.
- params : DistinctParams - parameters specifying which columns to consider for uniqueness. If None, all columns are used.
Returns¶
EngineContext The updated engine context with duplicate rows removed.
Configuration for distinct rows.
Example:
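A minimal illustrative snippet (column names are placeholders):

```yaml
distinct:
  columns: [customer_id, order_date]
```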
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| columns | Optional[List[str]] | No | - | Columns to project (if None, keeps all columns unique) |
drop_columns (DropColumnsParams)¶
Removes the specified columns from the DataFrame.
Configuration for dropping specific columns (blacklist).
Example:
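A minimal illustrative snippet (column names are placeholders):

```yaml
drop_columns:
  columns: [_temp_col, raw_payload]
```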
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| columns | List[str] | Yes | - | List of column names to drop |
extract_date_parts (ExtractDateParams)¶
Extracts date parts using ANSI SQL extract/functions.
Configuration for extracting date parts.
Example:
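A minimal illustrative snippet (column name and prefix are placeholders):

```yaml
extract_date_parts:
  source_col: "created_at"
  prefix: "created"
  parts: [year, month, day]
```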
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| source_col | str | Yes | - | - |
| prefix | Optional[str] | No | - | - |
| parts | List[Literal['year', 'month', 'day', 'hour']] | No | ['year', 'month', 'day'] |
- |
fill_nulls (FillNullsParams)¶
Replaces null values with specified defaults using COALESCE.
Configuration for filling null values.
Example:
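A minimal illustrative snippet (column names and fill values are placeholders):

```yaml
fill_nulls:
  values:
    status: "unknown"
    quantity: 0
```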
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| values | Dict[str, str \| int \| float \| bool] | Yes | - | Map of column to fill value |
filter_rows (FilterRowsParams)¶
Filters rows using a standard SQL WHERE clause.
Design:
- SQL-First: pushes filtering to the engine's optimizer.
- Zero-Copy: no data movement to Python.
Configuration for filtering rows.
Example:
Example (Null Check):
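Illustrative snippets (column names are placeholders):

```yaml
filter_rows:
  condition: "age > 18 AND status = 'active'"

# Null check
filter_rows:
  condition: "email IS NOT NULL"
```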
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| condition | str | Yes | - | SQL WHERE clause (e.g., 'age > 18 AND status = "active"') |
limit (LimitParams)¶
Limit the number of rows returned from the dataset.
Parameters¶
- context : EngineContext - the engine context containing the DataFrame to limit.
- params : LimitParams - parameters specifying the number of rows to return and the offset.
Returns¶
EngineContext The updated engine context with the limited DataFrame.
Configuration for result limiting.
Example:
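A minimal illustrative snippet:

```yaml
limit:
  n: 100
  offset: 0
```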
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| n | int | Yes | - | Number of rows to return |
| offset | int | No | 0 |
Number of rows to skip |
normalize_column_names (NormalizeColumnNamesParams)¶
Normalizes column names to a consistent style.
Useful for cleaning up messy source data with spaces, mixed case, or special characters. Converts names like "First Name" or "firstName" to "first_name".
Args:
- context: Current execution context with dataframe
- params: Normalization configuration (style, lowercase, special char handling)
Returns: New context with standardized column names
Example: Convert all columns to lowercase snake_case:
Configuration for normalizing column names.
Example:
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| style | Literal['snake_case', 'none'] | No | snake_case |
Naming style: 'snake_case' converts spaces/special chars to underscores |
| lowercase | bool | No | True |
Convert names to lowercase |
| remove_special | bool | No | True |
Remove special characters except underscores |
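A minimal sketch converting all columns to lowercase snake_case:

```yaml
normalize_column_names:
  style: "snake_case"
  lowercase: true
  remove_special: true
```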
normalize_schema (NormalizeSchemaParams)¶
Structural transformation to rename, drop, and reorder columns.
Note: This is one of the few transforms that can sometimes perform better via the native DataFrame API, but SQL projection handles it correctly and behaves consistently across engines.
Configuration for schema normalization.
Example:
normalize_schema:
rename:
old_col: "new_col"
drop: ["unused_col"]
select_order: ["id", "new_col", "created_at"]
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| rename | Optional[Dict[str, str]] | No | PydanticUndefined |
old_name -> new_name |
| drop | Optional[List[str]] | No | PydanticUndefined |
Columns to remove; ignored if not present |
| select_order | Optional[List[str]] | No | - | Final column order; any missing columns appended after |
rename_columns (RenameColumnsParams)¶
Renames columns according to the provided mapping. Columns not in the mapping are kept unchanged.
Configuration for bulk column renaming.
Example:
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| mapping | Dict[str, str] | Yes | - | Map of old column name to new column name |
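A minimal sketch (column names are illustrative):

```yaml
rename_columns:
  mapping:
    custID: "customer_id"
    "Order Date": "order_date"
```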
replace_values (ReplaceValuesParams)¶
Replaces values in specified columns according to the mapping.
Supports replacing specific values with new values or NULL. Useful for data standardization (e.g., "N/A" -> NULL, "Unknown" -> NULL).
Args:
- context: Current execution context with dataframe
- params: Replacement configuration (columns and value mapping)
Returns: New context with replaced values
Example: Standardize missing value indicators:
replace_values:
columns: ["status", "category"]
mapping:
"N/A": null
"Unknown": null
"PENDING": "pending"
Configuration for bulk value replacement.
Example - Standardize nulls:
Example - Code replacement:
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| columns | List[str] | Yes | - | Columns to apply replacements to |
| mapping | Dict[str, Optional[str]] | Yes | - | Map of old value to new value (use null for NULL) |
sample (SampleParams)¶
Return a random sample of rows from the dataset.
Parameters¶
- context (EngineContext): The engine context containing the DataFrame to sample from.
- params (SampleParams): Parameters specifying the fraction of rows to return and the random seed.
Returns¶
EngineContext: The updated engine context with the sampled DataFrame.
Configuration for random sampling.
Example:
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| fraction | float | Yes | - | Fraction of rows to return (0.0 to 1.0) |
| seed | Optional[int] | No | - | - |
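A minimal sketch taking a reproducible 10% sample:

```yaml
sample:
  fraction: 0.1
  seed: 42
```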
select_columns (SelectColumnsParams)¶
Keeps only the specified columns, dropping all others.
Configuration for selecting specific columns (whitelist).
Example:
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| columns | List[str] | Yes | - | List of column names to keep |
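A minimal sketch (column names are illustrative):

```yaml
select_columns:
  columns: ["id", "name", "created_at"]
```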
sort (SortParams)¶
Sort the dataset by one or more columns.
Parameters¶
- context (EngineContext): The engine context containing the DataFrame to sort.
- params (SortParams): Parameters specifying columns to sort by and sort order.
Returns¶
EngineContext: The updated engine context with the sorted DataFrame.
Configuration for sorting.
Example:
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| by | str | List[str] | Yes | - | Column(s) to sort by |
| ascending | bool | No | True |
Sort order |
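A minimal sketch (column names are illustrative):

```yaml
sort:
  by: ["region", "sales"]
  ascending: false
```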
split_part (SplitPartParams)¶
Extracts the Nth part of a string after splitting by a delimiter.
Configuration for splitting strings.
Example:
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| col | str | Yes | - | Column to split |
| delimiter | str | Yes | - | Delimiter to split by |
| index | int | Yes | - | 1-based index of the token to extract |
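A sketch extracting the domain from an email address (remember the index is 1-based):

```yaml
split_part:
  col: "email"
  delimiter: "@"
  index: 2
```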
trim_whitespace (TrimWhitespaceParams)¶
Trims leading and trailing whitespace from string columns.
If no columns are specified, applies to all columns (SQL TRIM handles non-strings gracefully). Useful for cleaning up data from sources with inconsistent spacing.
Args:
- context: Current execution context with dataframe
- params: Trim configuration (optional column list)
Returns: New context with trimmed string columns
Example: Trim specific columns:
Trim all string columns:
```yaml
trim_whitespace: {}
```
Configuration for trimming whitespace from string columns.
Example - All string columns:
Example - Specific columns:
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| columns | Optional[List[str]] | No | - | Columns to trim (default: all string columns detected at runtime) |
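A sketch of the specific-columns case (column names are illustrative):

```yaml
trim_whitespace:
  columns: ["first_name", "last_name"]
```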
📂 Relational Algebra¶
aggregate (AggregateParams)¶
Performs grouping and aggregation via SQL.
Configuration for aggregation.
Example:
aggregate:
group_by: ["department", "region"]
aggregations:
salary: "sum"
employee_id: "count"
age: "avg"
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| group_by | List[str] | Yes | - | Columns to group by |
| aggregations | Dict[str, AggFunc] | Yes | - | Map of column to aggregation function (sum, avg, min, max, count) |
join (JoinParams)¶
Joins the current dataset with another dataset from the context.
Configuration for joining datasets.
Scenario 1: Simple Left Join
Scenario 2: Join with Prefix (avoid collisions)
join:
right_dataset: "orders"
on: ["user_id"]
how: "inner"
prefix: "ord" # Result cols: ord_date, ord_amount...
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| right_dataset | str | Yes | - | Name of the node/dataset to join with |
| on | str | List[str] | Yes | - | Column(s) to join on |
| how | Literal['inner', 'left', 'right', 'full', 'cross', 'anti', 'semi'] | No | left |
Join type |
| prefix | Optional[str] | No | - | Prefix for columns from right dataset to avoid collisions |
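A sketch of the simple left-join case (dataset and column names are illustrative):

```yaml
join:
  right_dataset: "customers"
  on: ["customer_id"]
  how: "left"
```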
pivot (PivotParams)¶
Pivots row values into columns.
Configuration for pivoting data.
Example:
Example (Optimized for Spark):
pivot:
group_by: ["id"]
pivot_col: "category"
values: ["A", "B", "C"] # Explicit values avoid extra pass
agg_col: "amount"
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| group_by | List[str] | Yes | - | - |
| pivot_col | str | Yes | - | - |
| agg_col | str | Yes | - | - |
| agg_func | Literal['sum', 'count', 'avg', 'max', 'min', 'first'] | No | sum |
- |
| values | Optional[List[str]] | No | - | Specific values to pivot (for Spark optimization) |
union (UnionParams)¶
Unions current dataset with others.
Configuration for unioning datasets.
Example (By Name - Default):
Example (By Position):
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| datasets | List[str] | Yes | - | List of node names to union with current |
| by_name | bool | No | True |
Match columns by name (UNION ALL BY NAME) |
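Illustrative sketches for both modes (node names are assumptions):

```yaml
union:
  datasets: ["sales_2023", "sales_2024"]
  by_name: true
```

```yaml
union:
  datasets: ["sales_2023", "sales_2024"]
  by_name: false  # match columns by position
```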
unpivot (UnpivotParams)¶
Unpivots columns into rows (Melt/Stack).
Configuration for unpivoting (melting) data.
Example:
unpivot:
id_cols: ["product_id"]
value_vars: ["jan_sales", "feb_sales", "mar_sales"]
var_name: "month"
value_name: "sales"
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| id_cols | List[str] | Yes | - | - |
| value_vars | List[str] | Yes | - | - |
| var_name | str | No | variable |
- |
| value_name | str | No | value |
- |
📂 Data Quality¶
cross_check (CrossCheckParams)¶
Perform cross-node validation checks.
Does not return a DataFrame (returns None). Raises ValidationError on failure.
Configuration for cross-node validation checks.
Example (Row Count Mismatch):
transformer: "cross_check"
params:
type: "row_count_diff"
inputs: ["node_a", "node_b"]
threshold: 0.05 # Allow 5% difference
Example (Schema Match):
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| type | str | Yes | - | Check type: 'row_count_diff', 'schema_match' |
| inputs | List[str] | Yes | - | List of node names to compare |
| threshold | float | No | 0.0 |
Threshold for diff (0.0-1.0) |
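A sketch of the schema-match case (node names are illustrative):

```yaml
transformer: "cross_check"
params:
  type: "schema_match"
  inputs: ["staging_orders", "silver_orders"]
```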
📂 Warehousing Patterns¶
AuditColumnsConfig¶
Configuration for automatic audit timestamp columns during merge operations.
Attributes:
- created_col: Column to set only on first insert (e.g., "created_at")
- updated_col: Column to update on every merge operation (e.g., "updated_at")
At least one of created_col or updated_col must be specified.
Example:
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| created_col | Optional[str] | No | - | Column to set only on first insert |
| updated_col | Optional[str] | No | - | Column to update on every merge |
merge (MergeParams)¶
Merge transformer implementation. Handles Upsert, Append-Only, and Delete-Match strategies.
Args:
- context: EngineContext (preferred) or legacy PandasContext/SparkContext
- params: MergeParams object (when called via function step) or DataFrame (legacy)
- current: DataFrame (legacy positional arg, deprecated)
- **kwargs: Parameters when not using MergeParams
Configuration for Merge transformer (Upsert/Append).
⚖️ "GDPR & Compliance" Guide¶
Business Problem: "A user exercised their 'Right to be Forgotten'. We need to remove them from our Silver tables immediately."
The Solution:
Use the delete_match strategy. The source dataframe contains the IDs to be deleted, and the transformer removes them from the target.
Recipe 1: Right to be Forgotten (Delete)
transformer: "merge"
params:
target: "silver.customers"
keys: ["customer_id"]
strategy: "delete_match"
Recipe 2: Conditional Update (SCD Type 1) "Only update if the source record is newer than the target record."
transformer: "merge"
params:
target: "silver.products"
keys: ["product_id"]
strategy: "upsert"
update_condition: "source.updated_at > target.updated_at"
Recipe 3: Safe Insert (Filter Bad Records) "Only insert records that are not marked as deleted."
transformer: "merge"
params:
target: "silver.orders"
keys: ["order_id"]
strategy: "append_only"
insert_condition: "source.is_deleted = false"
Recipe 4: Audit Columns "Track when records were created or updated."
transformer: "merge"
params:
target: "silver.users"
keys: ["user_id"]
audit_cols:
created_col: "dw_created_at"
updated_col: "dw_updated_at"
Recipe 5: Full Sync (Insert + Update + Delete) "Sync target with source: insert new, update changed, and remove soft-deleted."
transformer: "merge"
params:
target: "silver.customers"
keys: ["id"]
strategy: "upsert"
# 1. Delete if source says so
delete_condition: "source.is_deleted = true"
# 2. Update if changed (and not deleted)
update_condition: "source.hash != target.hash"
# 3. Insert new (and not deleted)
insert_condition: "source.is_deleted = false"
Recipe 6: Connection-based Path Resolution (ADLS) "Use a connection to resolve paths, just like write config."
transform:
steps:
- function: merge
params:
connection: goat_prod
path: OEE/silver/customers
register_table: silver.customers
keys: ["customer_id"]
strategy: "upsert"
audit_cols:
created_col: "_created_at"
updated_col: "_updated_at"
Strategies:
- upsert (default): Update existing records, insert new ones.
- append_only: Ignore duplicates, only insert new keys.
- delete_match: Delete records in target that match keys in source.
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| target | Optional[str] | No | - | Target table name or full path (use this OR connection+path) |
| connection | Optional[str] | No | - | Connection name to resolve path (use with 'path' param) |
| path | Optional[str] | No | - | Relative path within connection (e.g., 'OEE/silver/customers') |
| register_table | Optional[str] | No | - | Register as Unity Catalog/metastore table after merge (e.g., 'silver.customers') |
| keys | List[str] | Yes | - | List of join keys |
| strategy | MergeStrategy | No | MergeStrategy.UPSERT |
Merge behavior: 'upsert', 'append_only', 'delete_match' |
| audit_cols | Optional[AuditColumnsConfig] | No | - | {'created_col': '...', 'updated_col': '...'} |
| optimize_write | bool | No | False |
Run OPTIMIZE after write (Spark) |
| vacuum_hours | Optional[int] | No | - | Hours to retain for VACUUM after merge (Spark only). Set to 168 for 7 days. None disables VACUUM. |
| zorder_by | Optional[List[str]] | No | - | Columns to Z-Order by |
| cluster_by | Optional[List[str]] | No | - | Columns to Liquid Cluster by (Delta) |
| update_condition | Optional[str] | No | - | SQL condition for update clause (e.g. 'source.ver > target.ver') |
| insert_condition | Optional[str] | No | - | SQL condition for insert clause (e.g. 'source.status != "deleted"') |
| delete_condition | Optional[str] | No | - | SQL condition for delete clause (e.g. 'source.status = "deleted"') |
| table_properties | Optional[dict] | No | - | Delta table properties for initial table creation (e.g., column mapping) |
scd2 (SCD2Params)¶
Implements SCD Type 2 Logic.
SCD2 is self-contained: it writes directly to the target table on all engines and code paths. No separate write: block is needed.
On Spark with use_delta_merge=True (default), uses an optimized Delta MERGE that only touches changed rows. The legacy path reads the full target, computes the union, and overwrites. Both write directly.
On Pandas, writes directly to the target file (parquet or CSV).
Parameters for SCD Type 2 (Slowly Changing Dimensions) transformer.
🕰️ The "Time Machine" Pattern¶
Business Problem: "I need to know what the customer's address was last month, not just where they live now."
The Solution: SCD Type 2 tracks the full history of changes. Each record has an "effective window" (start/end dates) and a flag indicating if it is the current version.
Recipe 1: Using table name
transformer: "scd2"
params:
target: "silver.dim_customers" # Registered table name
keys: ["customer_id"]
track_cols: ["address", "tier"]
effective_time_col: "txn_date"
Recipe 2: Using connection + path (ADLS)
transformer: "scd2"
params:
connection: adls_prod # Connection name
path: OEE/silver/dim_customers # Relative path
keys: ["customer_id"]
track_cols: ["address", "tier"]
effective_time_col: "txn_date"
How it works:
1. Match: Finds existing records using keys.
2. Compare: Checks track_cols to see if data changed.
3. Close: If changed, updates the old record's end_time_col to the new effective_time_col.
4. Insert: Adds a new record with start_time_col (renamed from effective_time_col)
as the version start, open-ended end_time_col, and is_current = true.
The effective_time_col value is copied into a new start_time_col column
(default: valid_from) in the target, giving each version a complete time window:
[valid_from, valid_to). The original source column is preserved.
Note: SCD2 is self-contained — it writes directly to the target table on all
engines. No separate write: block is needed in your pipeline YAML.
On Spark with Delta targets, uses an optimized Delta MERGE by default
(use_delta_merge: true). Set use_delta_merge: false to use the legacy
full-overwrite approach (still self-contained, just slower for large tables).
On Pandas, writes directly to the target file (parquet or CSV).
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| target | Optional[str] | No | - | Target table name or full path (use this OR connection+path) |
| connection | Optional[str] | No | - | Connection name to resolve path (use with 'path' param) |
| path | Optional[str] | No | - | Relative path within connection (e.g., 'OEE/silver/dim_customers') |
| keys | List[str] | Yes | - | Natural keys to identify unique entities |
| track_cols | List[str] | Yes | - | Columns to monitor for changes |
| effective_time_col | str | Yes | - | Source column indicating when the change occurred. |
| start_time_col | str | No | valid_from |
Name of the start timestamp column in the target. The effective_time_col value is copied to this column. |
| end_time_col | str | No | valid_to |
Name of the end timestamp column |
| current_flag_col | str | No | is_current |
Name of the current record flag column |
| delete_col | Optional[str] | No | - | Column indicating soft deletion (boolean) |
| use_delta_merge | bool | No | True |
Use Delta Lake MERGE for Spark engine (faster for large tables). Falls back to full overwrite if target is not Delta format. |
| register_table | Optional[str] | No | - | Register as Unity Catalog/metastore table after write (e.g., 'silver.dim_customers'). Spark only. |
| vacuum_hours | Optional[int] | No | - | Hours to retain for VACUUM after SCD2 write (Spark only). Set to 168 for 7 days. None disables VACUUM. |
📂 Manufacturing & IoT¶
PhaseConfig¶
Configuration for a single phase.
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| timer_col | str | Yes | - | Timer column name for this phase |
| start_threshold | Optional[int] | No | - | Override default start threshold for this phase (seconds) |
detect_sequential_phases (DetectSequentialPhasesParams)¶
Detect and analyze sequential manufacturing phases.
For each group (e.g., batch), this transformer:
1. Processes phases sequentially (each starts after the previous ends)
2. Detects phase start by finding the first valid timer reading and back-calculating
3. Detects phase end by finding the first repeated (plateaued) timer value
4. Calculates time spent in each status during each phase
5. Aggregates specified metrics within each phase window
6. Outputs one summary row per group
Output columns per phase:
- {phase}_start: Phase start timestamp
- {phase}_end: Phase end timestamp
- {phase}_max_minutes: Maximum timer value converted to minutes
- {phase}_{status}_minutes: Time in each status (if status_col provided)
- {phase}_{metric}: Aggregated metrics (if phase_metrics provided)
Detect and analyze sequential manufacturing phases from timer columns.
This transformer processes raw sensor/PLC data where timer columns increment during each phase. It detects phase boundaries, calculates durations, and tracks time spent in each equipment status.
Common use cases:
- Batch reactor cycle analysis
- CIP (Clean-in-Place) phase timing
- Food processing (cook, cool, package cycles)
- Any multi-step batch process with PLC timers
Scenario: Analyze FBR cycle times
detect_sequential_phases:
group_by: BatchID
timestamp_col: ts
phases:
- timer_col: LoadTime
- timer_col: AcidTime
- timer_col: DryTime
- timer_col: CookTime
- timer_col: CoolTime
- timer_col: UnloadTime
start_threshold: 240
status_col: Status
status_mapping:
1: idle
2: active
3: hold
4: faulted
phase_metrics:
Level: max
metadata:
ProductCode: first_after_start
Weight: max
Scenario: Group by multiple columns
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| group_by | str | List[str] | Yes | - | Column(s) to group by. Can be a single column name or list of columns. E.g., 'BatchID' or ['BatchID', 'AssetID'] |
| timestamp_col | str | No | ts |
Timestamp column for ordering events |
| phases | List[str | PhaseConfig] | Yes | - | List of phase timer columns (strings) or PhaseConfig objects. Phases are processed sequentially - each phase starts after the previous ends. |
| start_threshold | int | No | 240 |
Default max timer value (seconds) to consider as valid phase start. Filters out late readings where timer already shows large elapsed time. |
| status_col | Optional[str] | No | - | Column containing equipment status codes |
| status_mapping | Optional[Dict[int, str]] | No | - | Mapping of status codes to names, e.g. {1: 'idle', 2: 'active'} |
| phase_metrics | Optional[Dict[str, str]] | No | - | Columns to aggregate within each phase window. E.g., {Level: max, Pressure: max}. Outputs {Phase}_{Column} columns. |
| metadata | Optional[Dict[str, str]] | No | - | Columns to include in output with aggregation method. Options: 'first', 'last', 'first_after_start', 'max', 'min', 'mean', 'sum'. E.g., {ProductCode: first_after_start, Weight: max} |
| output_time_format | str | No | %Y-%m-%d %H:%M:%S |
Format for output timestamp columns |
| fill_null_minutes | bool | No | False |
If True, fill null numeric columns (_max_minutes, _status_minutes, _metrics) with 0. Timestamp columns remain null for skipped phases. |
| spark_native | bool | No | False |
If True, use native Spark window functions. If False (default), use applyInPandas which is often faster for datasets with many batches. |
📂 Advanced & Feature Engineering¶
ShiftDefinition¶
Definition of a single shift.
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| name | str | Yes | - | Name of the shift (e.g., 'Day', 'Night') |
| start | str | Yes | - | Start time in HH:MM format (e.g., '06:00') |
| end | str | Yes | - | End time in HH:MM format (e.g., '14:00') |
deduplicate (DeduplicateParams)¶
Deduplicates data using Window functions.
Configuration for deduplication.
Scenario: Keep latest record
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| keys | List[str] | Yes | - | List of columns to partition by (columns that define uniqueness) |
| order_by | Optional[str] | No | - | SQL Order by clause (e.g. 'updated_at DESC') to determine which record to keep (first one is kept) |
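A sketch of the keep-latest-record scenario (column names are illustrative):

```yaml
deduplicate:
  keys: ["customer_id"]
  order_by: "updated_at DESC"
```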
dict_based_mapping (DictMappingParams)¶
Maps values in a column using a provided dictionary.
For each value in the specified column, replaces it with the mapped value. If 'default' is provided, uses it for values not found in the mapping. Supports Spark and Pandas engines.
Configuration for dictionary mapping.
Scenario: Map status codes to labels
dict_based_mapping:
column: "status_code"
mapping:
"1": "Active"
"0": "Inactive"
default: "Unknown"
output_column: "status_desc"
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| column | str | Yes | - | Column to map values from |
| mapping | Dict[str, str | int | float | bool] | Yes | - | Dictionary of source value -> target value |
| default | str | int | float | bool | No | - | Default value if source value is not found in mapping |
| output_column | Optional[str] | No | - | Name of output column. If not provided, overwrites source column. |
explode_list_column (ExplodeParams)¶
Explodes a list/array column into multiple rows.
For each element in the specified list column, creates a new row. If 'outer' is True, keeps rows with empty lists (like explode_outer). Supports Spark and Pandas engines.
Configuration for exploding lists.
Scenario: Flatten list of items per order
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| column | str | Yes | - | Column containing the list/array to explode |
| outer | bool | No | False |
If True, keep rows with empty lists (explode_outer behavior). If False, drops them. |
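A sketch flattening a list of items per order (column name is illustrative):

```yaml
explode_list_column:
  column: "items"
  outer: true  # keep rows whose list is empty
```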
generate_numeric_key (NumericKeyParams)¶
Generates a deterministic BIGINT surrogate key from a hash of columns.
This is useful when:
- Unioning data from multiple sources
- Some sources have IDs, some don't
- You need stable numeric IDs for the gold layer
The key is generated by:
1. Concatenating columns with the separator
2. Computing an MD5 hash
3. Converting the first 15 hex characters to a BIGINT
If coalesce_with is specified, keeps the existing value when not null. If output_col == coalesce_with, the original column is replaced.
Configuration for numeric surrogate key generation.
Generates a deterministic BIGINT key from a hash of specified columns. Useful when unioning data from multiple sources where some have IDs and others don't.
Example:
- function: generate_numeric_key
params:
columns: [DateID, store_id, reason_id, duration_min, notes]
output_col: ID
coalesce_with: ID # Keep existing ID if not null
The generated key is:
- Deterministic: same input data = same ID every time
- BIGINT: large numeric space to avoid collisions
- Stable: safe for gold layer / incremental loads
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| columns | List[str] | Yes | - | Columns to combine for the key |
| separator | str | No | \| |
Separator between values |
| output_col | str | No | numeric_key |
Name of the output column |
| coalesce_with | Optional[str] | No | - | Existing column to coalesce with (keep existing value if not null) |
generate_surrogate_key (SurrogateKeyParams)¶
Generates a deterministic surrogate key (MD5) from a combination of columns. Handles NULLs by treating them as empty strings to ensure consistency.
Configuration for surrogate key generation.
Example:
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| columns | List[str] | Yes | - | Columns to combine for the key |
| separator | str | No | - |
Separator between values |
| output_col | str | No | surrogate_key |
Name of the output column |
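A minimal sketch (column names are illustrative):

```yaml
generate_surrogate_key:
  columns: ["customer_id", "order_date"]
  output_col: "order_sk"
```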
geocode (GeocodeParams)¶
Geocoding Stub.
Configuration for geocoding.
Example:
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| address_col | str | Yes | - | Column containing the address to geocode |
| output_col | str | No | lat_long |
Name of the output column for coordinates |
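A minimal sketch (column names are illustrative):

```yaml
geocode:
  address_col: "shipping_address"
  output_col: "lat_long"
```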
hash_columns (HashParams)¶
Hashes columns for PII/Anonymization.
Configuration for column hashing.
Example:
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| columns | List[str] | Yes | - | List of columns to hash |
| algorithm | HashAlgorithm | No | HashAlgorithm.SHA256 |
Hashing algorithm. Options: 'sha256', 'md5' |
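A minimal sketch hashing PII columns (column names are illustrative):

```yaml
hash_columns:
  columns: ["email", "phone"]
  algorithm: "sha256"
```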
normalize_json (NormalizeJsonParams)¶
Flattens a nested JSON/Struct column.
Configuration for JSON normalization.
Example:
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| column | str | Yes | - | Column containing nested JSON/Struct |
| sep | str | No | _ |
Separator for nested fields (e.g., 'parent_child') |
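A minimal sketch (column name is illustrative):

```yaml
normalize_json:
  column: "payload"
  sep: "_"
```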
parse_json (ParseJsonParams)¶
Parses a JSON string column into a Struct/Map column.
Configuration for JSON parsing.
Example:
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| column | str | Yes | - | String column containing JSON |
| json_schema | str | Yes | - | DDL schema string (e.g. 'a INT, b STRING') or Spark StructType DDL |
| output_col | Optional[str] | No | - | - |
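A minimal sketch (column names and schema are illustrative):

```yaml
parse_json:
  column: "raw_event"
  json_schema: "id INT, name STRING"
  output_col: "event"
```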
regex_replace (RegexReplaceParams)¶
Applies a regex replacement to a column.
Uses SQL-based REGEXP_REPLACE to replace all matches of the pattern in the specified column with the given replacement string. Works on both Spark and DuckDB/Pandas engines.
Configuration for regex replacement.
Example:
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| column | str | Yes | - | Column to apply regex replacement on |
| pattern | str | Yes | - | Regex pattern to match |
| replacement | str | Yes | - | String to replace matches with |
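A sketch stripping non-digit characters from a phone column (column name is illustrative):

```yaml
regex_replace:
  column: "phone"
  pattern: "[^0-9]"
  replacement: ""
```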
sessionize (SessionizeParams)¶
Assigns session IDs based on inactivity threshold.
Configuration for sessionization.
Example:
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| timestamp_col | str | Yes | - | Timestamp column to calculate session duration from |
| user_col | str | Yes | - | User identifier to partition sessions by |
| threshold_seconds | int | No | 1800 |
Inactivity threshold in seconds (default: 30 minutes). If gap > threshold, new session starts. |
| session_col | str | No | session_id |
Output column name for the generated session ID |
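A minimal sketch using the default 30-minute inactivity window (column names are illustrative):

```yaml
sessionize:
  timestamp_col: "event_time"
  user_col: "user_id"
  threshold_seconds: 1800
  session_col: "session_id"
```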
split_events_by_period (SplitEventsByPeriodParams)¶
Splits events that span multiple time periods into individual segments.
For events spanning multiple days/hours/shifts, this creates separate rows for each period with adjusted start/end times and recalculated durations.
Configuration for splitting events that span multiple time periods.
Splits events that span multiple days, hours, or shifts into individual segments per period. Useful for OEE/downtime analysis, billing, and time-based aggregations.
Example - Split by day:
split_events_by_period:
start_col: "Shutdown_Start_Time"
end_col: "Shutdown_End_Time"
period: "day"
duration_col: "Shutdown_Duration_Min"
Example - Split by shift:
split_events_by_period:
start_col: "event_start"
end_col: "event_end"
period: "shift"
duration_col: "duration_minutes"
shifts:
- name: "Day"
start: "06:00"
end: "14:00"
- name: "Swing"
start: "14:00"
end: "22:00"
- name: "Night"
start: "22:00"
end: "06:00"
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| start_col | str | Yes | - | Column containing the event start timestamp |
| end_col | str | Yes | - | Column containing the event end timestamp |
| period | str | No | day |
Period type to split by: 'day', 'hour', or 'shift' |
| duration_col | Optional[str] | No | - | Output column name for duration in minutes. If not set, no duration column is added. |
| shifts | Optional[List[ShiftDefinition]] | No | - | List of shift definitions (required when period='shift') |
| shift_col | Optional[str] | No | shift_name |
Output column name for shift name (only used when period='shift') |
unpack_struct (UnpackStructParams)¶
Flattens a struct/dict column into top-level columns.
Configuration for unpacking structs.
Example:
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| column | str | Yes | - | Struct/Dictionary column to unpack/flatten into individual columns |
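A minimal sketch (column name is illustrative):

```yaml
unpack_struct:
  column: "address"
```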
validate_and_flag (ValidateAndFlagParams)¶
Validates rules and appends a column with a list/string of failed rule names.
Configuration for validation flagging.
Example:
validate_and_flag:
flag_col: "data_issues"
rules:
age_check: "age >= 0"
email_format: "email LIKE '%@%'"
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| rules | Dict[str, str] | Yes | - | Map of rule name to SQL condition (must be TRUE) |
| flag_col | str | No | _issues |
Name of the column to store failed rules |
window_calculation (WindowCalculationParams)¶
Generic wrapper for Window functions.
Configuration for window functions.
Example:
window_calculation:
target_col: "cumulative_sales"
function: "sum(sales)"
partition_by: ["region"]
order_by: "date ASC"
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| target_col | str | Yes | - | - |
| function | str | Yes | - | Window function e.g. 'sum(amount)', 'rank()' |
| partition_by | List[str] | No | PydanticUndefined |
- |
| order_by | Optional[str] | No | - | - |
📂 Other Transformers¶
ConversionSpec¶
Specification for a single unit conversion.
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| from_unit | str | Yes | - | Source unit (e.g., 'psig', 'degF', 'BTU/lb') |
| to | str | Yes | - | Target unit (e.g., 'bar', 'degC', 'kJ/kg') |
| output | Optional[str] | No | - | Output column name. If not specified, overwrites the source column. |
PropertyOutputConfig¶
Configuration for a single output property.
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| property | str | Yes | - | CoolProp property key: H (enthalpy), S (entropy), D (density), C (specific heat Cp), CVMASS (Cv), V (viscosity), L (conductivity), T (temperature), P (pressure), Q (quality) |
| unit | Optional[str] | No | - | Output unit for this property. If not specified, uses SI units. |
| output_column | Optional[str] | No | - | Custom output column name. Defaults to {prefix}_{property}. |
PsychrometricOutputConfig¶
Configuration for psychrometric output property.
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| property | str | Yes | - | Property: W (humidity ratio), B (wet bulb), D (dew point), H (enthalpy), V (specific volume), R (relative humidity) |
| unit | Optional[str] | No | - | Output unit |
| output_column | Optional[str] | No | - | Custom output column name |
fluid_properties (FluidPropertiesParams)¶
Calculate thermodynamic properties for any fluid using CoolProp. Supports 122+ fluids including Water, Ammonia, R134a, Air, and CO2; see the CoolProp documentation for the full list: http://www.coolprop.org/fluid_properties/PurePseudoPure.html. Uses the IAPWS-IF97 formulation for water/steam calculations.
Engine parity: Pandas, Spark (via Pandas UDF), Polars
State is defined by two independent properties. Common combinations:
- P + T (pressure + temperature): subcooled/superheated regions
- P + Q (pressure + quality): two-phase region
- P + H (pressure + enthalpy): when enthalpy is known
Scenario: Calculate steam properties from pressure and temperature
fluid_properties:
  fluid: Water
  pressure_col: steam_pressure
  temperature_col: steam_temp
  pressure_unit: psig
  temperature_unit: degF
  gauge_offset: 14.696
  outputs:
    - property: H
      unit: BTU/lb
      output_column: steam_enthalpy
    - property: S
      unit: BTU/(lb·R)
      output_column: steam_entropy
Scenario: Calculate saturated steam properties from pressure only
fluid_properties:
  fluid: Water
  pressure_col: steam_pressure
  quality: 1.0  # Saturated vapor
  pressure_unit: psia
  outputs:
    - property: H
      unit: BTU/lb
    - property: T
      unit: degF
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| fluid | str | No | Water | CoolProp fluid name (e.g., Water, Ammonia, R134a, Air, CO2) |
| pressure_col | Optional[str] | No | - | Column containing pressure values |
| temperature_col | Optional[str] | No | - | Column containing temperature values |
| enthalpy_col | Optional[str] | No | - | Column containing enthalpy values |
| quality_col | Optional[str] | No | - | Column containing quality values (0-1) |
| pressure | Optional[float] | No | - | Fixed pressure value |
| temperature | Optional[float] | No | - | Fixed temperature value |
| enthalpy | Optional[float] | No | - | Fixed enthalpy value (in input unit) |
| quality | Optional[float] | No | - | Fixed quality value (0=sat liquid, 1=sat vapor) |
| pressure_unit | str | No | Pa | Pressure unit: Pa, kPa, MPa, bar, psia, psig, atm |
| temperature_unit | str | No | K | Temperature unit: K, degC, degF, degR |
| enthalpy_unit | str | No | J/kg | Input enthalpy unit: J/kg, kJ/kg, BTU/lb |
| gauge_offset | float | No | 14.696 | Atmospheric pressure offset for psig (default 14.696 psia) |
| outputs | List[PropertyOutputConfig] | No | - | List of properties to calculate with their units |
| prefix | str | No | - | Prefix for output column names (e.g., 'steam' -> 'steam_H') |
psychrometrics (PsychrometricsParams)¶
Calculate psychrometric (humid air) properties using CoolProp HAPropsSI.
Engine parity: Pandas, Spark (via Pandas UDF), Polars
Calculates moist air properties from dry bulb temperature and one other property (typically relative humidity or humidity ratio).
CoolProp property keys for humid air:
- W: Humidity ratio (kg water / kg dry air)
- B: Wet bulb temperature
- D: Dew point temperature
- H: Enthalpy (per kg dry air)
- V: Specific volume (per kg dry air)
- R: Relative humidity (0-1)
- T: Dry bulb temperature
- P: Total pressure
Scenario: Calculate humidity ratio from T and RH
psychrometrics:
  dry_bulb_col: ambient_temp
  relative_humidity_col: rh_percent
  temperature_unit: degF
  rh_is_percent: true
  pressure_unit: psia
  elevation_ft: 875
  outputs:
    - property: W
      unit: lb/lb
      output_column: humidity_ratio
    - property: B
      unit: degF
      output_column: wet_bulb
    - property: D
      unit: degF
      output_column: dew_point
Scenario: Calculate RH from temperature and humidity ratio
psychrometrics:
  dry_bulb_col: temp_f
  humidity_ratio_col: w
  temperature_unit: degF
  pressure: 14.696
  pressure_unit: psia
  outputs:
    - property: R
      output_column: relative_humidity
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| dry_bulb_col | str | Yes | - | Column containing dry bulb temperature |
| relative_humidity_col | Optional[str] | No | - | Column containing relative humidity |
| humidity_ratio_col | Optional[str] | No | - | Column containing humidity ratio (kg/kg or lb/lb) |
| wet_bulb_col | Optional[str] | No | - | Column containing wet bulb temperature |
| dew_point_col | Optional[str] | No | - | Column containing dew point temperature |
| relative_humidity | Optional[float] | No | - | Fixed relative humidity |
| humidity_ratio | Optional[float] | No | - | Fixed humidity ratio |
| pressure_col | Optional[str] | No | - | Column containing pressure |
| pressure | Optional[float] | No | - | Fixed pressure value |
| elevation_ft | Optional[float] | No | - | Elevation in feet (used to estimate pressure if not provided) |
| elevation_m | Optional[float] | No | - | Elevation in meters (used to estimate pressure if not provided) |
| temperature_unit | str | No | K | Temperature unit for all temperature columns |
| pressure_unit | str | No | Pa | Pressure unit |
| humidity_ratio_unit | str | No | kg/kg | Humidity ratio unit (kg/kg, lb/lb, g/kg) |
| rh_is_percent | bool | No | False | If True, RH input is 0-100%; otherwise 0-1 |
| outputs | List[PsychrometricOutputConfig] | No | - | Properties to calculate |
| prefix | str | No | - | Prefix for output columns |
saturation_properties (SaturationPropertiesParams)¶
Calculate saturated liquid or vapor properties.
Convenience wrapper that automatically sets Q=0 (saturated liquid) or Q=1 (saturated vapor). This is a simplified interface for common saturation calculations.
Scenario: Get saturated steam properties at a given pressure
saturation_properties:
  fluid: Water
  pressure_col: steam_pressure
  pressure_unit: psig
  phase: vapor
  outputs:
    - property: H
      unit: BTU/lb
      output_column: hg
    - property: T
      unit: degF
      output_column: sat_temp
Scenario: Get saturated liquid enthalpy (hf)
saturation_properties:
  pressure_col: pressure_psia
  pressure_unit: psia
  phase: liquid
  outputs:
    - property: H
      unit: BTU/lb
      output_column: hf
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| fluid | str | No | Water | CoolProp fluid name |
| pressure_col | Optional[str] | No | - | Column containing pressure values |
| pressure | Optional[float] | No | - | Fixed pressure value |
| temperature_col | Optional[str] | No | - | Column containing saturation temp |
| temperature | Optional[float] | No | - | Fixed saturation temperature |
| pressure_unit | str | No | Pa | Pressure unit |
| temperature_unit | str | No | K | Temperature unit |
| gauge_offset | float | No | 14.696 | Gauge pressure offset for psig |
| phase | Literal['liquid', 'vapor'] | No | vapor | Phase: 'liquid' (Q=0) or 'vapor' (Q=1) |
| outputs | List[PropertyOutputConfig] | No | - | Properties to calculate |
| prefix | str | No | - | Prefix for output columns |
unit_convert (UnitConvertParams)¶
Convert columns from one unit to another.
Uses Pint's comprehensive unit database for conversion. Handles all SI units, imperial units, and common engineering units out of the box.
Engine parity: Pandas, Spark, Polars
Scenario: Normalize sensor data to SI units
unit_convert:
  conversions:
    pressure_psig:
      from: psig
      to: bar
      output: pressure_bar
    temperature_f:
      from: degF
      to: degC
      output: temperature_c
    flow_gpm:
      from: gpm
      to: m³/s
      output: flow_si
Scenario: Convert in-place (overwrite original columns)
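An in-place conversion can be sketched as follows; per the ConversionSpec table, omitting output overwrites the source column (column name is illustrative):

```yaml
unit_convert:
  conversions:
    temperature_f:
      from: degF
      to: degC
      # no output key: the temperature_f column is overwritten in place
```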
Scenario: Handle gauge pressure with custom atmospheric reference
unit_convert:
  gauge_pressure_offset: 14.696 psia
  conversions:
    steam_pressure:
      from: psig
      to: psia
      output: steam_pressure_abs
Scenario: Complex engineering units
unit_convert:
  conversions:
    heat_transfer_coeff:
      from: BTU/(hr * ft² * degF)
      to: W/(m² * K)
      output: htc_si
    thermal_conductivity:
      from: BTU/(hr * ft * degF)
      to: W/(m * K)
      output: k_si
    specific_heat:
      from: BTU/(lb * degF)
      to: kJ/(kg * K)
      output: cp_si
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| conversions | Dict[str, ConversionSpec] | Yes | - | Mapping of source column names to conversion specifications |
| gauge_pressure_offset | Optional[str] | No | 14.696 psia | Atmospheric pressure for gauge-to-absolute conversions. Default is sea level (14.696 psia). |
| errors | str | No | null | How to handle conversion errors: 'null' (default), 'raise', or 'ignore' |
Simulation Engine¶
Generate realistic time-series data for testing pipelines, dashboards, and analytics.
Simulation is configured as format: simulation in a node's read section.
Core building blocks:
- SimulationScope: Time boundaries, timestep, row count
- EntityConfig: Who generates data (sensors, machines, pumps)
- ColumnGeneratorConfig: What data each entity produces
- ScheduledEvent: Maintenance windows, setpoint changes, condition-based triggers
- ChaosConfig: Outliers, duplicates, downtime for realistic imperfections
Duration/interval format (used by timestep, recurrence, duration, jitter, cooldown, sustain):
number + unit, where unit is s (seconds), m (minutes), h (hours), or d (days).
Examples: 5m, 1h, 30s, 2d.
Example:
read:
  connection: null
  format: simulation
  options:
    simulation:
      scope:
        start_time: "2026-01-01T00:00:00Z"
        timestep: "5m"
        row_count: 288
        seed: 42
      entities:
        count: 3
        id_prefix: pump_
      columns:
        - name: temperature
          data_type: float
          generator:
            type: random_walk
            start: 75.0
            min: 60.0
            max: 100.0
            volatility: 1.0
            mean_reversion: 0.1
See Also: Simulation Docs, Generator Reference
SimulationConfig¶
Complete simulation configuration.
Example:
read:
  connection: null
  format: simulation
  options:
    simulation:
      scope:
        start_time: "2026-01-01T00:00:00Z"
        timestep: "5m"
        row_count: 10000
        seed: 42
      entities:
        count: 10
        id_prefix: pump_
      columns:
        - name: entity_id
          data_type: string
          generator:
            type: constant
            value: "{entity_id}"
        - name: timestamp
          data_type: timestamp
          generator:
            type: timestamp
        - name: temperature
          data_type: float
          generator:
            type: range
            min: 60.0
            max: 80.0
            distribution: normal
          null_rate: 0.02
      chaos:
        outlier_rate: 0.01
        outlier_factor: 3.0
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| scope | SimulationScope | Yes | - | Simulation scope and boundaries |
| entities | EntityConfig | Yes | - | Entity configuration |
| columns | List[ColumnGeneratorConfig] | Yes | - | Column definitions |
| chaos | Optional[ChaosConfig] | No | - | Chaos parameters |
| scheduled_events | List[ScheduledEvent] | No | - | Scheduled events that modify simulation behavior |
SimulationScope¶
Used in: SimulationConfig
Simulation scope and boundaries.
Example (row count):
Example (time range):
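The two mutually exclusive scope modes above can be sketched as follows (timestamps and counts are illustrative):

```yaml
# Row count: stop after a fixed number of rows
scope:
  start_time: "2026-01-01T00:00:00Z"
  timestep: "5m"
  row_count: 288

# Time range: generate rows until end_time (mutually exclusive with row_count)
scope:
  start_time: "2026-01-01T00:00:00Z"
  end_time: "2026-01-02T00:00:00Z"
  timestep: "1h"
```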
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| start_time | str | Yes | - | Simulation start timestamp in ISO8601 Zulu format (e.g., '2026-01-01T00:00:00Z') |
| timestep | str | Yes | - | Time between rows. Format: number followed by unit — s (seconds), m (minutes), h (hours), or d (days). Examples: '30s', '5m', '1h', '2d'. |
| row_count | Optional[int] | No | - | Total rows to generate (mutually exclusive with end_time) |
| end_time | Optional[str] | No | - | Simulation end timestamp in ISO8601 Zulu format (e.g., '2026-01-02T00:00:00Z'). Mutually exclusive with row_count |
| seed | int | No | 42 | Random seed for deterministic generation |
EntityConfig¶
Used in: SimulationConfig
Entity declaration for simulation.
Example (auto-generated):
Example (explicit names):
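The two entity declaration styles above can be sketched as follows (names and counts are illustrative):

```yaml
# Auto-generated: sequential IDs with the given prefix
entities:
  count: 3
  id_prefix: pump_

# Explicit names
entities:
  names: [Turbine_01, Turbine_02, Turbine_03]
```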
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| count | Optional[int] | No | - | Number of entities to generate |
| names | Optional[List[str]] | No | - | Explicit entity names |
| id_prefix | str | No | entity_ | Prefix for auto-generated entity IDs |
| id_format | Literal['sequential', 'uuid'] | No | sequential | ID format: sequential or uuid |
ColumnGeneratorConfig¶
Used in: SimulationConfig
Configuration for a simulated column.
Example:
- name: temperature
  data_type: float
  generator:
    type: range
    min: 60.0
    max: 100.0
    distribution: normal
  null_rate: 0.02
  entity_overrides:
    pump_01:
      type: range
      min: 80.0
      max: 120.0
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| name | str | Yes | - | Column name |
| data_type | SimulationDataType | Yes | - | Data type |
| generator | RangeGeneratorConfig | CategoricalGeneratorConfig | BooleanGeneratorConfig | TimestampGeneratorConfig | SequentialGeneratorConfig | ConstantGeneratorConfig | UUIDGeneratorConfig | EmailGeneratorConfig | IPGeneratorConfig | GeoGeneratorConfig | RandomWalkGeneratorConfig | DailyProfileGeneratorConfig | DerivedGeneratorConfig | Yes | - | Generator configuration |
| null_rate | float | No | 0.0 | Probability of NULL values |
| entity_overrides | Dict[str, RangeGeneratorConfig | CategoricalGeneratorConfig | BooleanGeneratorConfig | TimestampGeneratorConfig | SequentialGeneratorConfig | ConstantGeneratorConfig | UUIDGeneratorConfig | EmailGeneratorConfig | IPGeneratorConfig | GeoGeneratorConfig | RandomWalkGeneratorConfig | DailyProfileGeneratorConfig | DerivedGeneratorConfig] | No | - | Per-entity generator overrides |
ScheduledEvent¶
Used in: SimulationConfig
Scheduled event that modifies simulation behavior at specific times or conditions.
Enables realistic process simulation with:
- Maintenance windows (forced power=0)
- Grid curtailment (forced output reduction)
- Setpoint changes (scheduled process changes)
- Cleaning cycles (efficiency restoration)
- Recurring events (e.g., weekly maintenance)
- Condition-based triggers (e.g., degrade when efficiency drops)
Example (maintenance window):
scheduled_events:
  - type: forced_value
    entity: Turbine_01
    column: power_kw
    value: 0.0
    start_time: "2026-03-11T14:00:00Z"
    end_time: "2026-03-11T18:00:00Z"
Example (grid curtailment - all entities):
scheduled_events:
  - type: forced_value
    entity: null  # Applies to all entities
    column: max_output_pct
    value: 80.0
    start_time: "2026-03-11T16:00:00Z"
    end_time: "2026-03-11T19:00:00Z"
Example (permanent setpoint change):
scheduled_events:
  - type: setpoint_change
    entity: Reactor_01
    column: temp_setpoint_c
    value: 370.0
    start_time: "2026-03-11T12:00:00Z"
    # No end_time = permanent change
Example (recurring maintenance every 30 days):
scheduled_events:
  - type: forced_value
    entity: Turbine_01
    column: power_kw
    value: 0.0
    start_time: "2026-01-15T06:00:00Z"
    recurrence: "30d"
    duration: "4h"
    jitter: "2d"
    max_occurrences: 12
Example (condition-based event):
scheduled_events:
  - type: parameter_override
    entity: Pump_05
    column: flow_rate_lpm
    value: 50.0
    condition: "actual_efficiency_pct < 70"
    cooldown: "7d"
    sustain: "24h"
Example (ramped setpoint change):
scheduled_events:
  - type: setpoint_change
    entity: Reactor_01
    column: temp_setpoint_c
    value: 370.0
    start_time: "2026-03-11T12:00:00Z"
    duration: "2h"
    transition: ramp
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| type | ScheduledEventType | Yes | - | Event type |
| entity | Optional[str] | No | - | Entity name (must match a name from entities.names) or None to apply to all entities |
| column | str | Yes | - | Column name to modify |
| value | Any | Yes | - | Value to apply during event |
| start_time | Optional[str] | No | - | Event start timestamp in ISO8601 Zulu format (e.g., '2026-01-15T06:00:00Z'). Required for time-based events. |
| end_time | Optional[str] | No | - | Event end timestamp in ISO8601 Zulu format (e.g., '2026-01-15T18:00:00Z'). None = permanent change. |
| priority | int | No | 0 | Priority for overlapping events (higher = applied last) |
| recurrence | Optional[str] | No | - | Repeat interval. Event recurs at this interval from start_time. Format: number followed by unit — s (seconds), m (minutes), h (hours), or d (days). Examples: '30d', '7d', '4h'. |
| duration | Optional[str] | No | - | Duration of each occurrence. Alternative to specifying end_time. Format: number followed by unit — s (seconds), m (minutes), h (hours), or d (days). Examples: '4h', '30m', '2d'. |
| jitter | Optional[str] | No | - | Random offset ± applied to each recurrence start (deterministic per seed). Format: number followed by unit — s (seconds), m (minutes), h (hours), or d (days). Examples: '2d', '6h'. |
| max_occurrences | Optional[int] | No | - | Stop repeating after N occurrences. |
| condition | Optional[str] | No | - | Sandboxed Python expression evaluated against current row columns. Supports comparison operators, compound logic (and, or, not), and safe functions (abs, round, min, max). E.g., 'actual_efficiency_pct < 70 and pressure > 50'. Triggers event when true. |
| cooldown | Optional[str] | No | - | Minimum gap between condition triggers. Prevents rapid re-triggering. Format: number followed by unit — s (seconds), m (minutes), h (hours), or d (days). Examples: '7d', '12h'. |
| sustain | Optional[str] | No | - | Condition must be continuously true for this duration before triggering. Prevents spurious triggers from momentary spikes. Format: number followed by unit — s (seconds), m (minutes), h (hours), or d (days). Examples: '24h', '30m'. |
| transition | str | No | instant | How value is applied: 'instant' (default, jump to value) or 'ramp' (linear interpolation over duration). |
ChaosConfig¶
Used in: SimulationConfig
Chaos engineering parameters for realistic data imperfections.
Example:
chaos:
  outlier_rate: 0.01
  outlier_factor: 3.0
  duplicate_rate: 0.005
  downtime_events:
    - entity: pump_01
      start_time: "2026-01-01T10:00:00Z"
      end_time: "2026-01-01T12:00:00Z"
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| outlier_rate | float | No | 0.0 | Probability of outlier values |
| outlier_factor | float | No | 3.0 | Multiplier for outlier values |
| duplicate_rate | float | No | 0.0 | Probability of duplicating rows |
| downtime_events | List[DowntimeEvent] | No | - | Time periods with no data generation |
DowntimeEvent¶
Used in: ChaosConfig
Downtime period where no data is generated.
Example:
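A downtime entry can be sketched as follows, mirroring the ChaosConfig example above (entity and times are illustrative):

```yaml
downtime_events:
  - entity: pump_01  # null (or omitted) = all entities
    start_time: "2026-01-01T10:00:00Z"
    end_time: "2026-01-01T12:00:00Z"
```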
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| entity | Optional[str] | No | - | Entity affected (None = all) |
| start_time | str | Yes | - | Downtime start timestamp (ISO8601) |
| end_time | str | Yes | - | Downtime end timestamp (ISO8601) |
RangeGeneratorConfig¶
Used in: ColumnGeneratorConfig
Range generator for numeric values.
Example:
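A minimal range generator sketch (bounds are illustrative):

```yaml
generator:
  type: range
  min: 60.0
  max: 100.0
  distribution: normal  # optional; default is uniform
```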
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| type | Literal['range'] | Yes | - | - |
| min | float | Yes | - | Minimum value |
| max | float | Yes | - | Maximum value |
| distribution | DistributionType | No | uniform | Distribution type: uniform or normal |
| mean | Optional[float] | No | - | Mean for normal distribution (defaults to midpoint) |
| std_dev | Optional[float] | No | - | Standard deviation for normal (defaults to range/6) |
RandomWalkGeneratorConfig¶
Used in: ColumnGeneratorConfig
Random walk generator for realistic time-series data.
Produces values where each row depends on the previous row's value, creating smooth, realistic process data instead of independent random values.
Uses an Ornstein-Uhlenbeck process with optional trend for realistic simulation of controlled process variables (temperatures, pressures, flow rates).
Example (static setpoint):
type: random_walk
start: 350.0
min: 300.0
max: 400.0
volatility: 0.5
mean_reversion: 0.1
trend: 0.001
precision: 1
shock_rate: 0.02
shock_magnitude: 30.0
shock_bias: 1.0
Example (dynamic setpoint - temperature tracking ambient):
- name: ambient_temp_c
  generator:
    type: random_walk
    start: 25.0
    min: 15.0
    max: 35.0
    volatility: 0.3
    mean_reversion: 0.05
- name: battery_temp_c
  generator:
    type: random_walk
    start: 28.0
    min: 20.0
    max: 40.0
    volatility: 0.4
    mean_reversion: 0.1
    mean_reversion_to: ambient_temp_c  # Drifts toward ambient
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| type | Literal['random_walk'] | Yes | - | - |
| start | float | Yes | - | Initial value / setpoint |
| min | float | Yes | - | Hard lower bound (physical limit) |
| max | float | Yes | - | Hard upper bound (physical limit) |
| volatility | float | No | 1.0 | Standard deviation of step-to-step changes. Controls noise magnitude. |
| mean_reversion | float | No | 0.0 | Strength of pull back toward start value or mean_reversion_to column (0 = pure random walk, 1 = snap back immediately). Simulates PID-like control. |
| mean_reversion_to | Optional[str] | No | - | Column name to use as dynamic setpoint for mean reversion instead of static 'start' value. Enables realistic process simulation where the walk tracks a time-varying reference. Example: PV that drifts toward a changing SP column, or temperature following ambient. If specified, must reference a column defined earlier in dependency order. |
| trend | float | No | 0.0 | Drift per timestep. Positive = gradual increase, negative = gradual decrease. Simulates fouling, degradation, or slow process drift. |
| precision | Optional[int] | No | - | Round values to N decimal places. None = no rounding. |
| shock_rate | float | No | 0.0 | Probability of a sudden shock at each timestep (0.0 = never, 1.0 = every step). Simulates process upsets like valve sticks, feed disruptions, or sensor glitches. The shock perturbs the walk's internal state, so mean_reversion naturally recovers over subsequent steps — just like a real PID-controlled process. |
| shock_magnitude | float | No | 10.0 | Maximum absolute size of a shock event. The actual shock is drawn uniformly from [0, shock_magnitude]. Use values relative to your min/max range — e.g., if range is 300-400, a magnitude of 30 means shocks up to 30% of range. |
| shock_bias | float | No | 0.0 | Directional tendency for shocks. +1.0 = shocks always go UP (e.g., exothermic runaway, pressure buildup). -1.0 = shocks always go DOWN (e.g., pump cavitation, flow drop). 0.0 = shocks go either direction with equal probability. Values between give partial bias (e.g., 0.7 = mostly upward). |
DailyProfileGeneratorConfig¶
Used in: ColumnGeneratorConfig
Daily profile generator for time-of-day patterns.
Produces values that follow a repeating daily curve defined by anchor points. The engine interpolates between anchor points, adds noise, and clamps to [min, max]. Ideal for simulating occupancy, energy demand, traffic, call volume, or any metric with a predictable intraday shape.
Example (building occupancy):
type: daily_profile
min: 0
max: 25
precision: 0
noise: 1.5
interpolation: linear
profile:
  "00:00": 1
  "06:00": 3
  "08:00": 19
  "12:00": 15
  "13:00": 22
  "17:00": 14
  "22:00": 2
Example (network traffic with weekend scaling):
type: daily_profile
min: 0.0
max: 1000.0
noise: 50.0
interpolation: linear
weekend_scale: 0.3
profile:
  "00:00": 50.0
  "06:00": 100.0
  "09:00": 800.0
  "12:00": 650.0
  "13:00": 750.0
  "17:00": 900.0
  "20:00": 400.0
  "23:00": 100.0
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| type | Literal['daily_profile'] | Yes | - | - |
| profile | Dict[str, float] | Yes | - | Anchor points mapping time-of-day (HH:MM) to target values. The engine interpolates between these points to produce a smooth daily curve. |
| min | float | Yes | - | Hard lower bound (physical limit) |
| max | float | Yes | - | Hard upper bound (physical limit) |
| noise | float | No | 0.0 | Random noise amplitude (±noise added to interpolated value). |
| interpolation | InterpolationType | No | linear | Interpolation method between anchor points: linear or step. |
| precision | Optional[int] | No | - | Round values to N decimal places. None = no rounding. 0 = integers. |
| volatility | float | No | 0.0 | Day-to-day variation in anchor point targets. Each day, every anchor point is independently shifted by a random amount drawn from a normal distribution (mean = profile value, std_dev = volatility). This makes each day's curve slightly different while preserving the overall shape. 0.0 = identical curve every day. Higher values = more day-to-day variation. |
| weekend_scale | Optional[float] | No | - | Scale factor for weekends (Saturday/Sunday). 0.0 = zero on weekends, 1.0 = same as weekday. None = no weekend adjustment. |
DerivedGeneratorConfig¶
Used in: ColumnGeneratorConfig
Derived column generator (calculated from other columns).
Example:
Supported operators:
- Arithmetic: +, -, *, /, //, %, **
- Comparison: ==, !=, <, <=, >, >=
- Logical: and, or, not
- Functions: abs(), round(), min(), max()
Example (conditional):
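The example slots above can be sketched as follows (column names are illustrative):

```yaml
# Arithmetic on other columns
generator:
  type: derived
  expression: "flow_lpm * 60"

# Conditional flag built from comparison and logical operators
generator:
  type: derived
  expression: "temperature > 90 and pressure < 50"
```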
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| type | Literal['derived'] | Yes | - | - |
| expression | str | Yes | - | Sandboxed Python expression referencing column names. Supports context variables (_row_index, entity_id, _timestamp), safe math functions (abs, round, min, max, coalesce, safe_div), and stateful functions (prev, ema, pid, delay). |
CategoricalGeneratorConfig¶
Used in: ColumnGeneratorConfig
Categorical generator for discrete values.
Example:
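A sketch with hypothetical machine-status values:

```yaml
generator:
  type: categorical
  values: ["running", "idle", "fault"]
  weights: [0.7, 0.25, 0.05]  # optional; must sum to 1.0
```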
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| type | Literal['categorical'] | Yes | - | - |
| values | List[Any] | Yes | - | List of possible values |
| weights | Optional[List[float]] | No | - | Probability weights (must sum to 1.0) |
SequentialGeneratorConfig¶
Used in: ColumnGeneratorConfig
Sequential number generator.
By default, generates globally unique IDs across all entities by offsetting each entity's sequence range. Entity 0 gets IDs [start, start + rows), entity 1 gets [start + rows, start + 2*rows), etc.
Set unique_across_entities: false for per-entity sequences (all entities share the same ID range).
Example:
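A minimal sketch (start value is illustrative):

```yaml
generator:
  type: sequential
  start: 1000
  step: 1
  unique_across_entities: true  # default; each entity gets a non-overlapping range
```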
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| type | Literal['sequential'] | Yes | - | - |
| start | int | No | 1 |
Starting value |
| step | int | No | 1 |
Increment step |
| unique_across_entities | bool | No | True |
When true, each entity gets a non-overlapping ID range |
ConstantGeneratorConfig¶
Used in: ColumnGeneratorConfig
Constant value generator.
Example:
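A minimal sketch, using the {entity_id} placeholder shown in the SimulationConfig example above:

```yaml
generator:
  type: constant
  value: "{entity_id}"
```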
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| type | Literal['constant'] | Yes | - | - |
| value | Any | Yes | - | Constant value for all rows |
BooleanGeneratorConfig¶
Used in: ColumnGeneratorConfig
Boolean generator.
Example:
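A minimal sketch (probability is illustrative):

```yaml
generator:
  type: boolean
  true_probability: 0.1  # roughly 10% of rows are True
```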
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| type | Literal['boolean'] | Yes | - | - |
| true_probability | float | No | 0.5 |
Probability of True |
TimestampGeneratorConfig¶
Used in: ColumnGeneratorConfig
Timestamp generator (uses simulation scope).
Example:
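A minimal sketch; per the field table, the generator takes no parameters beyond its type and derives values from the simulation scope:

```yaml
generator:
  type: timestamp
```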
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| type | Literal['timestamp'] | Yes | - | - |
UUIDGeneratorConfig¶
Used in: ColumnGeneratorConfig
UUID/GUID generator.
Example:
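A minimal sketch (the namespace string is illustrative and only used for version 5):

```yaml
generator:
  type: uuid
  version: 5
  namespace: "orders"
```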
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| type | Literal['uuid'] | Yes | - | - |
| version | Literal[4, 5] | No | 4 | UUID version. Options: 4 (random, default) or 5 (deterministic/namespace-based). Only these values are supported. |
| namespace | Optional[str] | No | - | Namespace seed for UUID5 generation. Arbitrary string; default 'DNS' uses the standard DNS namespace UUID. |
EmailGeneratorConfig¶
Used in: ColumnGeneratorConfig
Email address generator.
Example:
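A minimal sketch using the documented defaults:

```yaml
generator:
  type: email
  domain: example.com
  pattern: "{entity}_{index}"
```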
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| type | Literal['email'] | Yes | - | - |
| domain | str | No | example.com | Email domain |
| pattern | str | No | {entity}_{index} | Username template. Available placeholders: {entity}, {index}, {row}. Default produces usernames like 'entity_01'. |
IPGeneratorConfig¶
Used in: ColumnGeneratorConfig
IPv4 address generator.
Example:
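A minimal sketch (subnet is illustrative):

```yaml
generator:
  type: ipv4
  subnet: "10.0.0.0/8"  # omit to draw from the full IPv4 range
```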
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| type | Literal['ipv4'] | Yes | - | - |
| subnet | Optional[str] | No | - | CIDR subnet (e.g., '192.168.0.0/16'). If None, uses full range. |
GeoGeneratorConfig¶
Used in: ColumnGeneratorConfig
Geographic coordinate generator.
Example:
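A minimal sketch (the bounding box is illustrative):

```yaml
generator:
  type: geo
  bbox: [40.5, -74.3, 41.0, -73.7]  # [min_lat, min_lon, max_lat, max_lon]
  format: tuple
```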
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| type | Literal['geo'] | Yes | - | - |
| bbox | List[float] | Yes | - | Bounding box [min_lat, min_lon, max_lat, max_lon] |
| format | Literal['tuple', 'lat_lon_separate'] | No | tuple | Output format: 'tuple' for (lat,lon) or 'lat_lon_separate' for separate columns |
Semantic Layer¶
The semantic layer provides a unified interface for defining and querying business metrics. Define metrics once, query them by name across dimensions.
Core Components:
- MetricDefinition: Define aggregation expressions (SUM, COUNT, AVG)
- DimensionDefinition: Define grouping attributes with hierarchies
- MaterializationConfig: Pre-compute metrics at specific grain
- SemanticQuery: Execute queries like "revenue BY region, month"
- Project: Unified API that connects pipelines and semantic layer
Unified Project API (Recommended):
from odibi import Project
project = Project.load("odibi.yaml")
result = project.query("revenue BY region")
print(result.df)
YAML Configuration:
project: my_warehouse
engine: pandas
connections:
  gold:
    type: delta
    path: /mnt/data/gold
# Semantic layer at project level
semantic:
  metrics:
    - name: revenue
      expr: "SUM(total_amount)"
      source: gold.fact_orders  # connection.table notation
      filters:
        - "status = 'completed'"
  dimensions:
    - name: region
      source: gold.dim_customer
      column: region
  materializations:
    - name: monthly_revenue
      metrics: [revenue]
      dimensions: [region, month]
      output: gold/agg_monthly_revenue
The source: gold.fact_orders notation resolves paths automatically from connections.
DimensionDefinition¶
Used in: SemanticLayerConfig
Definition of a semantic dimension.
A dimension represents an attribute for grouping and filtering metrics (e.g., date, product, region).
Attributes:
- name: Unique dimension identifier
- label: Display name for column alias in generated views. Defaults to name.
- source: Source table reference. Supports three formats: $pipeline.node (recommended, e.g., $build_warehouse.dim_customer); connection.path (e.g., gold.dim_customer or gold.dims/customer); or a bare table_name, which uses the default connection.
- column: Column name in source (defaults to name)
- expr: Custom SQL expression. If provided, overrides column and grain. Example: "YEAR(DATEADD(month, 6, Date))" for fiscal year.
- hierarchy: Optional ordered list of columns for drill-down
- description: Human-readable description
- grain: Time grain transformation (day, week, month, quarter, year). Ignored if expr is provided.
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| name | str | Yes | - | Unique dimension identifier |
| label | Optional[str] | No | - | Display name for column alias (defaults to name) |
| source | Optional[str] | No | - | Source table reference. Formats: $pipeline.node (e.g., $build_warehouse.dim_customer), connection.path (e.g., gold.dim_customer or gold.dims/customer), or bare table_name |
| column | Optional[str] | No | - | Column name (defaults to name) |
| expr | Optional[str] | No | - | Custom SQL expression. Overrides column and grain. Example: YEAR(DATEADD(month, 6, Date)) for fiscal year |
| hierarchy | List[str] | No | - | Drill-down hierarchy |
| description | Optional[str] | No | - | Human-readable description |
| grain | Optional[TimeGrain] | No | - | Time grain transformation |
MaterializationConfig¶
Used in: SemanticLayerConfig
Configuration for materializing metrics to a table.
Materialization pre-computes aggregated metrics at a specific grain and persists them for faster querying.
Attributes:
- name: Unique materialization identifier
- metrics: List of metric names to include
- dimensions: List of dimension names (determines grain)
- output: Output table path
- schedule: Optional cron schedule for refresh
- incremental: Configuration for incremental refresh
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| name | str | Yes | - | Unique materialization identifier |
| metrics | List[str] | Yes | - | Metrics to materialize |
| dimensions | List[str] | Yes | - | Dimensions for grouping |
| output | str | Yes | - | Output table path |
| schedule | Optional[str] | No | - | Cron schedule |
| incremental | Optional[Dict[str, Any]] | No | - | Incremental refresh config |
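A sketch of a materialization entry, assuming the metric and dimension names were defined elsewhere in the semantic layer (the output path and schedule are illustrative):

```yaml
materializations:
  - name: daily_revenue_by_region
    metrics: [total_revenue, order_count]
    dimensions: [order_date, region]       # determines the grain
    output: gold.daily_revenue_by_region   # hypothetical output path
    schedule: "0 6 * * *"                  # cron: refresh daily at 06:00
```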
MetricDefinition¶
Used in: SemanticLayerConfig
Definition of a semantic metric.
A metric represents a measurable value that can be aggregated across dimensions (e.g., revenue, order_count, avg_order_value).
Attributes:
name: Unique metric identifier
label: Display name for column alias in generated views. Defaults to name.
description: Human-readable description
expr: SQL aggregation expression (e.g., "SUM(total_amount)").
Optional for derived metrics.
source: Source table reference. Supports three formats:
- $pipeline.node (recommended): e.g., $build_warehouse.fact_orders
- connection.path: e.g., gold.fact_orders or gold.oee/plant_a/metrics
- table_name: Uses default connection
filters: Optional WHERE conditions to apply
type: "simple" (direct aggregation) or "derived" (references other metrics)
components: List of component metric names (required for derived metrics).
These metrics must be additive (e.g., SUM-based) for correct
recalculation at different grains.
formula: Calculation formula using component names (required for derived).
Example: "(total_revenue - total_cost) / total_revenue"
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| name | str | Yes | - | Unique metric identifier |
| label | Optional[str] | No | - | Display name for column alias (defaults to name) |
| description | Optional[str] | No | - | Human-readable description |
| expr | Optional[str] | No | - | SQL aggregation expression |
| source | Optional[str] | No | - | Source table reference. Formats: $pipeline.node (e.g., $build_warehouse.fact_orders), connection.path (e.g., gold.fact_orders or gold.oee/plant_a/table), or bare table_name |
| filters | List[str] | No | PydanticUndefined | WHERE conditions |
| type | MetricType | No | MetricType.SIMPLE | Metric type |
| components | Optional[List[str]] | No | - | Component metric names for derived metrics |
| formula | Optional[str] | No | - | Calculation formula using component names |
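A sketch showing two simple metrics and a derived metric built from them, following the formula example in the docstring above (table and column names are hypothetical):

```yaml
metrics:
  - name: total_revenue
    expr: "SUM(total_amount)"
    source: $build_warehouse.fact_orders
    filters: ["status = 'completed'"]      # optional WHERE conditions
  - name: total_cost
    expr: "SUM(cost_amount)"
    source: $build_warehouse.fact_orders
  - name: margin_pct
    type: derived
    components: [total_revenue, total_cost]  # must be additive (SUM-based)
    formula: "(total_revenue - total_cost) / total_revenue"
```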
SemanticLayerConfig¶
Complete semantic layer configuration.
Contains all metrics, dimensions, materializations, and views for a semantic layer deployment.
Attributes:
metrics: List of metric definitions
dimensions: List of dimension definitions
materializations: List of materialization configurations
views: List of view configurations
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| metrics | List[MetricDefinition] | No | PydanticUndefined | Metric definitions |
| dimensions | List[DimensionDefinition] | No | PydanticUndefined | Dimension definitions |
| materializations | List[MaterializationConfig] | No | PydanticUndefined | Materialization configs |
| views | List[ViewConfig] | No | PydanticUndefined | View configurations |
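A minimal end-to-end skeleton combining the four sections; the top-level `semantic_layer` key and all names are assumptions for illustration:

```yaml
semantic_layer:
  metrics:
    - name: order_count
      expr: "COUNT(*)"
      source: gold.fact_orders
  dimensions:
    - name: order_date
      source: gold.fact_orders
      grain: day
  materializations: []   # optional
  views: []              # optional
```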
FK Validation¶
Declare and validate referential integrity between fact and dimension tables.
Features:
- Declare relationships in YAML
- Validate FK constraints on fact load
- Detect orphan records
- Generate lineage from relationships
Example:
relationships:
- name: orders_to_customers
fact: fact_orders
dimension: dim_customer
fact_key: customer_sk
dimension_key: customer_sk
on_violation: error
RelationshipConfig¶
Used in: RelationshipRegistry
Configuration for a foreign key relationship.
Attributes:
name: Unique relationship identifier
fact: Fact table name
dimension: Dimension table name
fact_key: Foreign key column in fact table
dimension_key: Primary/surrogate key column in dimension
nullable: Whether nulls are allowed in fact_key
on_violation: Action on violation ("warn", "error", "quarantine")
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| name | str | Yes | - | Unique relationship identifier |
| fact | str | Yes | - | Fact table name |
| dimension | str | Yes | - | Dimension table name |
| fact_key | str | Yes | - | FK column in fact table |
| dimension_key | str | Yes | - | PK/SK column in dimension |
| nullable | bool | No | False | Allow nulls in fact_key |
| on_violation | str | No | error | Action on violation ("warn", "error", "quarantine") |
RelationshipRegistry¶
Registry of all declared relationships.
Attributes:
relationships: List of relationship configurations
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| relationships | List[RelationshipConfig] | No | PydanticUndefined | Relationship definitions |
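A registry can hold multiple relationships with different violation policies. A sketch with an optional (nullable) foreign key alongside a strict one; the promotion tables are hypothetical:

```yaml
relationships:
  - name: orders_to_customers
    fact: fact_orders
    dimension: dim_customer
    fact_key: customer_sk
    dimension_key: customer_sk
    on_violation: error       # fail the load on orphans
  - name: orders_to_promotions
    fact: fact_orders
    dimension: dim_promotion
    fact_key: promo_sk
    dimension_key: promo_sk
    nullable: true            # not every order has a promotion
    on_violation: warn        # log orphans, do not fail
```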
Data Patterns¶
Declarative patterns for common data warehouse building blocks. Patterns encapsulate best practices for dimensional modeling, ensuring consistent implementation across your data warehouse.
DimensionPattern¶
Build complete dimension tables with surrogate keys and SCD (Slowly Changing Dimension) support.
When to Use:
- Building dimension tables from source systems (customers, products, locations)
- Need surrogate keys for star schema joins
- Need to track historical changes (SCD Type 2)
Beginner Note: Dimensions are the "who, what, where, when" of your data warehouse. A customer dimension has customer_id (natural key) and customer_sk (surrogate key). Fact tables join to dimensions via surrogate keys.
See Also: FactPattern, DateDimensionPattern
Features:
- Auto-generate integer surrogate keys (MAX(existing) + ROW_NUMBER)
- SCD Type 0 (static), 1 (overwrite), 2 (history tracking)
- Optional unknown member row (SK=0) for orphan FK handling
- Audit columns (load_timestamp, source_system)
Params:
| Parameter | Type | Required | Description |
|---|---|---|---|
| natural_key | str | Yes | Natural/business key column name |
| surrogate_key | str | Yes | Surrogate key column name to generate |
| scd_type | int | No | 0=static, 1=overwrite, 2=history (default: 1) |
| track_cols | list | SCD1/2 | Columns to track for change detection |
| target | str | SCD2 | Target table path to read existing history |
| unknown_member | bool | No | Insert row with SK=0 for orphan handling |
| audit.load_timestamp | bool | No | Add load_timestamp column |
| audit.source_system | str | No | Add source_system column with value |
Supported Target Formats:
- Spark: catalog.table, Delta paths, .parquet, .csv, .json, .orc
- Pandas: .parquet, .csv, .json, .xlsx, .feather, .pickle
Example:
pattern:
type: dimension
params:
natural_key: customer_id
surrogate_key: customer_sk
scd_type: 2
track_cols: [name, email, address, city]
target: warehouse.dim_customer
unknown_member: true
audit:
load_timestamp: true
source_system: "crm"
DateDimensionPattern¶
Generate a complete date dimension table with pre-calculated attributes for BI/reporting.
When to Use:
- Every data warehouse needs a date dimension for time-based analytics
- Enable date filtering, grouping by week/month/quarter, and fiscal year reporting
Beginner Note: The date dimension is foundational for any BI/reporting system. It lets you query "sales by month" or "orders in fiscal Q2" without complex date calculations.
See Also: DimensionPattern
Features:
- Generates all dates in a range with rich attributes
- Calendar and fiscal year support
- ISO week numbering
- Weekend/month-end flags
Params:
| Parameter | Type | Required | Description |
|---|---|---|---|
| start_date | str | Yes | Start date (YYYY-MM-DD) |
| end_date | str | Yes | End date (YYYY-MM-DD) |
| date_key_format | str | No | Format for date_sk (default: yyyyMMdd) |
| fiscal_year_start_month | int | No | Month fiscal year starts (1-12, default: 1) |
| unknown_member | bool | No | Add unknown date row with date_sk=0 |
Generated Columns:
date_sk, full_date, day_of_week, day_of_week_num, day_of_month,
day_of_year, is_weekend, week_of_year, month, month_name, quarter,
quarter_name, year, fiscal_year, fiscal_quarter, is_month_start,
is_month_end, is_year_start, is_year_end
Example:
pattern:
type: date_dimension
params:
start_date: "2020-01-01"
end_date: "2030-12-31"
fiscal_year_start_month: 7
unknown_member: true
FactPattern¶
Build fact tables with automatic surrogate key lookups from dimensions.
When to Use:
- Building fact tables from transactional data (orders, events, transactions)
- Need to look up surrogate keys from dimension tables
- Need to handle orphan records (missing dimension matches)
Beginner Note: Facts are the "how much, how many" of your data warehouse. An orders fact has measures (quantity, revenue) and dimension keys (customer_sk, product_sk). The pattern automatically looks up SKs from dimensions.
See Also: DimensionPattern, QuarantineConfig
Features:
- Automatic SK lookups from dimension tables (with SCD2 current-record filtering)
- Orphan handling: unknown (SK=0), reject (error), quarantine (route to table)
- Grain validation (detect duplicates)
- Calculated measures and column renaming
- Audit columns
Params:
| Parameter | Type | Required | Description |
|---|---|---|---|
| grain | list | No | Columns defining uniqueness (validates no duplicates) |
| dimensions | list | No | Dimension lookup configurations (see below) |
| orphan_handling | str | No | "unknown", "reject", or "quarantine" (default: unknown) |
| quarantine | dict | If quarantine | Quarantine config (see below) |
| measures | list | No | Measure definitions (passthrough, rename, or calculated) |
| deduplicate | bool | No | Remove duplicates before processing |
| keys | list | If dedupe | Keys for deduplication |
| audit.load_timestamp | bool | No | Add load_timestamp column |
| audit.source_system | str | No | Add source_system column |
Dimension Lookup Config:
dimensions:
- source_column: customer_id # Column in source fact
dimension_table: dim_customer # Dimension in context
dimension_key: customer_id # Natural key in dimension
surrogate_key: customer_sk # SK to retrieve
scd2: true # Filter is_current=true
Quarantine Config (for orphan_handling: quarantine):
quarantine:
connection: silver # Required: connection name
path: fact_orders_orphans # OR table: quarantine_table
add_columns:
_rejection_reason: true # Add rejection reason
_rejected_at: true # Add rejection timestamp
_source_dimension: true # Add dimension name
Example:
pattern:
type: fact
params:
grain: [order_id]
dimensions:
- source_column: customer_id
dimension_table: dim_customer
dimension_key: customer_id
surrogate_key: customer_sk
scd2: true
- source_column: product_id
dimension_table: dim_product
dimension_key: product_id
surrogate_key: product_sk
orphan_handling: unknown
measures:
- quantity
- revenue: "quantity * unit_price"
audit:
load_timestamp: true
source_system: "pos"
AggregationPattern¶
Declarative aggregation with GROUP BY and optional incremental merge.
When to Use:
- Building summary/aggregate tables (daily sales, monthly metrics)
- Need incremental aggregation (update existing aggregates)
- Gold layer reporting tables
Beginner Note: Aggregations summarize facts at a higher grain. Example: daily_sales aggregates orders by date with SUM(revenue).
See Also: FactPattern
Features:
- Declare grain (GROUP BY columns)
- Define measures with SQL aggregation expressions
- Optional HAVING filter
- Audit columns
Params:
| Parameter | Type | Required | Description |
|---|---|---|---|
| grain | list | Yes | Columns to GROUP BY (defines uniqueness) |
| measures | list | Yes | Measure definitions with name and expr |
| having | str | No | HAVING clause for filtering aggregates |
| incremental.timestamp_column | str | No | Column to identify new data |
| incremental.merge_strategy | str | No | "replace", "sum", "min", or "max" |
| audit.load_timestamp | bool | No | Add load_timestamp column |
| audit.source_system | str | No | Add source_system column |
Example:
pattern:
type: aggregation
params:
grain: [date_sk, product_sk, region]
measures:
- name: total_revenue
expr: "SUM(total_amount)"
- name: order_count
expr: "COUNT(*)"
- name: avg_order_value
expr: "AVG(total_amount)"
having: "COUNT(*) > 0"
audit:
load_timestamp: true
AuditConfig¶
Configuration for audit columns.
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| load_timestamp | bool | No | True | Add load_timestamp column |
| source_system | Optional[str] | No | - | Source system name for source_system column |
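A minimal audit block as it appears inside a pattern's params; the source system name is illustrative:

```yaml
audit:
  load_timestamp: true     # default; adds load_timestamp column
  source_system: "erp"     # hypothetical value for the source_system column
```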