Configuration API
odibi.config
Configuration models for the ODIBI framework.
ConnectionConfig = Union[LocalConnectionConfig, AzureBlobConnectionConfig, DeltaConnectionConfig, SQLServerConnectionConfig, HttpConnectionConfig, CustomConnectionConfig]
module-attribute
EngineType
ConnectionType
WriteMode
Bases: str, Enum
Write modes for output operations.
Values:
* overwrite - Replace all existing data. Use for full refresh, dimensions.
* append - Add rows without checking for duplicates. Use for true append-only logs.
* upsert - Update existing rows by key, insert new. Use for Silver/Gold with updates.
* append_once - Insert only rows where keys don't exist (idempotent). Recommended for Bronze ingestion. Requires keys in write options. Safe to retry/rerun without creating duplicates.
* merge - SQL Server only: loads rows into a staging table, then runs a T-SQL MERGE statement against the target.
Choosing the right mode:
| Mode | Existing Keys | New Keys | Use Case |
|---|---|---|---|
| overwrite | Deleted | Inserted | Full refresh, dimensions |
| append | Duplicated | Inserted | True append-only logs |
| upsert | Updated | Inserted | Silver/Gold with updates |
| append_once | Skipped | Inserted | Idempotent Bronze ingestion |
| merge | Updated | Inserted | SQL Server targets |
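The table's row-level outcomes can be checked with a small in-memory simulation. This is a sketch, not odibi's implementation. The `WriteMode` enum below only mirrors the documented values, and rows are plain dicts with a single key column (an assumption). `merge` is modeled with the same outcome as `upsert`, because its staging-table and T-SQL MERGE steps happen on the SQL Server side.

```python
from enum import Enum


class WriteMode(str, Enum):
    """Sketch mirroring the documented values; not the library source."""
    OVERWRITE = "overwrite"
    APPEND = "append"
    UPSERT = "upsert"
    APPEND_ONCE = "append_once"
    MERGE = "merge"


def apply_write(existing: list[dict], incoming: list[dict], mode: WriteMode, key: str) -> list[dict]:
    """Simulate each write mode on in-memory rows identified by one key column."""
    if mode is WriteMode.OVERWRITE:
        # Full refresh: every existing row is discarded.
        return list(incoming)
    if mode is WriteMode.APPEND:
        # No key check: incoming rows with existing keys become duplicates.
        return existing + incoming
    if mode is WriteMode.APPEND_ONCE:
        # Insert only keys not seen yet, so rerunning the same batch changes nothing.
        known = {row[key] for row in existing}
        result = list(existing)
        for row in incoming:
            if row[key] not in known:
                result.append(row)
                known.add(row[key])
        return result
    if mode in (WriteMode.UPSERT, WriteMode.MERGE):
        # Update rows whose key exists, insert the rest. MERGE is assumed to have
        # the same row-level outcome; the real mode runs as T-SQL on SQL Server.
        by_key = {row[key]: row for row in existing}
        for row in incoming:
            by_key[row[key]] = row
        return list(by_key.values())
    raise ValueError(f"unsupported mode: {mode}")
```

Running `append_once` twice with the same input returns the same rows, which is why the mode is safe to retry.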
Source code in odibi/config.py
AlertConfig
Bases: BaseModel
Configuration for alerts with throttling support.
Supports Slack, Teams, and generic webhooks with event-specific payloads.
Available Events:
- on_start - Pipeline started
- on_success - Pipeline completed successfully
- on_failure - Pipeline failed
- on_quarantine - Rows were quarantined
- on_gate_block - Quality gate blocked the pipeline
- on_threshold_breach - A threshold was exceeded
Example:
```yaml
alerts:
  - type: slack
    url: "${SLACK_WEBHOOK_URL}"
    on_events:
      - on_failure
      - on_quarantine
      - on_gate_block
    metadata:
      throttle_minutes: 15
      max_per_hour: 10
      channel: "#data-alerts"
```
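The docstring does not spell out how `throttle_minutes` and `max_per_hour` interact. This sketch assumes one plausible reading, not taken from odibi's source: a minimum gap between alerts for the same event, plus a rolling hourly cap across all events. `AlertThrottle` is a hypothetical class.

```python
from collections import deque
from datetime import datetime, timedelta


class AlertThrottle:
    """Hypothetical throttle; the real AlertConfig semantics may differ."""

    def __init__(self, throttle_minutes: int = 15, max_per_hour: int = 10) -> None:
        self.min_gap = timedelta(minutes=throttle_minutes)
        self.max_per_hour = max_per_hour
        self.last_sent: dict[str, datetime] = {}   # event name -> last send time
        self.sent_times: deque[datetime] = deque()  # every send, oldest first

    def should_send(self, event: str, now: datetime) -> bool:
        # Forget sends that fell out of the rolling one-hour window.
        while self.sent_times and now - self.sent_times[0] >= timedelta(hours=1):
            self.sent_times.popleft()
        last = self.last_sent.get(event)
        if last is not None and now - last < self.min_gap:
            return False  # same event fired too recently
        if len(self.sent_times) >= self.max_per_hour:
            return False  # hourly cap reached
        self.last_sent[event] = now
        self.sent_times.append(now)
        return True
```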
Source code in odibi/config.py
TransformConfig
Bases: BaseModel
Configuration for transformation steps within a node.
When to Use: Custom business logic, data cleaning, SQL transformations.
Key Concepts:
- steps: Ordered list of operations (SQL, functions, or both)
- Each step receives the DataFrame from the previous step
- Steps execute in order: step1 → step2 → step3
See Also: Transformer Catalog
Transformer vs Transform:
- transformer: Single heavy operation (scd2, merge, deduplicate)
- transform.steps: Chain of lighter operations
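The rule "each step receives the DataFrame from the previous step" is a left fold over the step list. This sketch uses lists of dicts as a stand-in for DataFrames. `run_steps`, `only_active` and `dedupe_users` are hypothetical names used for illustration.

```python
from functools import reduce
from typing import Callable

Frame = list[dict]               # stand-in for a DataFrame in this sketch
Step = Callable[[Frame], Frame]  # a step maps one frame to the next


def run_steps(df: Frame, steps: list[Step]) -> Frame:
    """Feed each step the output of the previous one: step1 -> step2 -> step3."""
    return reduce(lambda acc, step: step(acc), steps, df)


def only_active(df: Frame) -> Frame:
    # Comparable to: SELECT * FROM df WHERE status = 'ACTIVE'
    return [row for row in df if row["status"] == "ACTIVE"]


def dedupe_users(df: Frame) -> Frame:
    # Keep the first row seen for each user_id.
    seen, out = set(), []
    for row in df:
        if row["user_id"] not in seen:
            seen.add(row["user_id"])
            out.append(row)
    return out
```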
🔧 "Transformation Pipeline" Guide
Business Problem: "I have complex logic that mixes SQL for speed and Python for complex calculations."
The Solution: Chain multiple steps together. Output of Step 1 becomes input of Step 2.
Function Registry:
The function step type looks up functions registered with @transform (or @register).
This allows you to use the same registered functions as both top-level Transformers and steps in a chain.
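A name-based registry is a common way to let config refer to Python functions by string. This sketch assumes that pattern. `REGISTRY`, `register` and `run_function_step` are hypothetical stand-ins, not odibi's `@transform` / `@register` decorators. The lifetime-value logic is also invented for the example.

```python
from typing import Any, Callable

# Hypothetical registry; odibi's real decorators live in the library itself.
REGISTRY: dict[str, Callable[..., Any]] = {}


def register(func: Callable[..., Any]) -> Callable[..., Any]:
    """Record a function under its own name so config can refer to it by string."""
    REGISTRY[func.__name__] = func
    return func


@register
def calculate_lifetime_value(df: list[dict], discount_rate: float = 0.0) -> list[dict]:
    # Illustrative logic only: discount each row's revenue.
    return [{**row, "ltv": row["revenue"] * (1 - discount_rate)} for row in df]


def run_function_step(df: list[dict], step: dict) -> list[dict]:
    """Resolve a {'function': name, 'params': {...}} step and call it with the frame."""
    func = REGISTRY[step["function"]]
    return func(df, **step.get("params", {}))
```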
Recipe: The Mix-and-Match
```yaml
transform:
  steps:
    # Step 1: SQL Filter (Fast)
    - sql: "SELECT * FROM df WHERE status = 'ACTIVE'"

    # Step 2: Custom Python Function (Complex Logic)
    # Looks up 'calculate_lifetime_value' in the registry
    - function: "calculate_lifetime_value"
      params: { discount_rate: 0.05 }

    # Step 3: Built-in Operation (Standard)
    - operation: "drop_duplicates"
      params: { subset: ["user_id"] }
```
Source code in odibi/config.py
ValidationConfig
Bases: BaseModel
Configuration for data validation (post-transform checks).
When to Use: Output data quality checks that run after transformation but before writing.
See Also: Validation Guide, Quarantine Guide, Contracts Overview (pre-transform checks)
🛡️ "The Indestructible Pipeline" Pattern
Business Problem: "Bad data polluted our Gold reports, causing executives to make wrong decisions. We need to stop it before it lands."
The Solution: A Quality Gate that runs after transformation but before writing.
Recipe: The Quality Gate
```yaml
validation:
  mode: "fail"      # fail (stop pipeline) or warn (log only)
  on_fail: "alert"  # alert or ignore
  tests:
    # 1. Completeness
    - type: "not_null"
      columns: ["transaction_id", "customer_id"]

    # 2. Integrity
    - type: "unique"
      columns: ["transaction_id"]

    - type: "accepted_values"
      column: "status"
      values: ["PENDING", "COMPLETED", "FAILED"]

    # 3. Ranges & Patterns
    - type: "range"
      column: "age"
      min: 18
      max: 120

    - type: "regex_match"
      column: "email"
      pattern: '^[\w\.-]+@[\w\.-]+\.\w+$'  # single quotes: YAML double quotes reject \w and \.

    # 4. Business Logic (SQL)
    - type: "custom_sql"
      name: "dates_ordered"
      condition: "created_at <= completed_at"
      threshold: 0.01  # Allow 1% failure
```
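To make the test types concrete, here is a pure-Python sketch that evaluates test dicts shaped like the recipe against a list of row dicts. It is an approximation built on stated assumptions:

- `failed_rows` and `passes` are hypothetical helpers.
- `unique` flags only the second and later occurrences of a key.
- `custom_sql` takes a Python predicate in place of a SQL condition.

```python
import re
from typing import Callable


def failed_rows(rows: list[dict], test: dict) -> list[dict]:
    """Return the rows that fail one test dict (sketch, not odibi's engine)."""
    kind = test["type"]
    if kind == "not_null":
        return [r for r in rows if any(r.get(c) is None for c in test["columns"])]
    if kind == "unique":
        seen, dupes = set(), []
        for r in rows:
            k = tuple(r.get(c) for c in test["columns"])
            if k in seen:
                dupes.append(r)  # second and later occurrences fail
            seen.add(k)
        return dupes
    if kind == "accepted_values":
        return [r for r in rows if r.get(test["column"]) not in test["values"]]
    if kind == "range":
        col = test["column"]
        return [r for r in rows if not (test["min"] <= r[col] <= test["max"])]
    if kind == "regex_match":
        pattern = re.compile(test["pattern"])
        return [r for r in rows if not pattern.match(str(r.get(test["column"], "")))]
    if kind == "custom_sql":
        # Stand-in: a Python predicate replaces the SQL condition in this sketch.
        predicate: Callable[[dict], bool] = test["condition"]
        return [r for r in rows if not predicate(r)]
    raise ValueError(f"unknown test type: {kind}")


def passes(rows: list[dict], test: dict) -> bool:
    """A test passes when its failing fraction is within its threshold (default 0)."""
    if not rows:
        return True
    return len(failed_rows(rows, test)) / len(rows) <= test.get("threshold", 0.0)
```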
Recipe: Quarantine + Gate
```yaml
validation:
  tests:
    - type: not_null
      columns: [customer_id]
      on_fail: quarantine
  quarantine:
    connection: silver
    path: customers_quarantine
  gate:
    require_pass_rate: 0.95
    on_fail: abort
```
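The quarantine and gate steps can be read as two stages. First, failing rows are set aside. Second, the share of surviving rows is compared with `require_pass_rate`. The sketch assumes that reading and implements only the `not_null` test. `quarantine_and_gate` and `GateResult` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass
class GateResult:
    passed: list[dict]
    quarantined: list[dict]
    pass_rate: float
    aborted: bool


def quarantine_and_gate(rows: list[dict], not_null_columns: list[str], require_pass_rate: float) -> GateResult:
    """Sketch: route not_null failures to quarantine, then apply the pass-rate gate."""
    passed, quarantined = [], []
    for row in rows:
        # on_fail: quarantine -> failing rows are set aside instead of stopping the run
        if any(row.get(col) is None for col in not_null_columns):
            quarantined.append(row)
        else:
            passed.append(row)
    pass_rate = len(passed) / len(rows) if rows else 1.0
    # gate on_fail: abort -> stop before writing when the pass rate is too low
    return GateResult(passed, quarantined, pass_rate, aborted=pass_rate < require_pass_rate)
```

With 19 valid rows out of 20, the pass rate equals 0.95 and the write proceeds. With 18 out of 20, the pass rate is 0.9 and the gate aborts.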
Source code in odibi/config.py
validate_quarantine_config()
Warn if quarantine config exists but no tests use on_fail: quarantine.
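Here is a minimal sketch of the check this validator describes. It operates on a plain dict rather than the Pydantic model (an assumption), and it emits the warning through the standard `warnings` module. `check_quarantine_usage` is a hypothetical name.

```python
import warnings


def check_quarantine_usage(config: dict) -> bool:
    """Warn when quarantine is configured but no test routes rows to it.

    Returns True when a warning was issued (hypothetical helper).
    """
    has_quarantine = config.get("quarantine") is not None
    uses_quarantine = any(t.get("on_fail") == "quarantine" for t in config.get("tests", []))
    if has_quarantine and not uses_quarantine:
        warnings.warn("quarantine is configured but no test uses on_fail: quarantine", UserWarning)
        return True
    return False
```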
Source code in odibi/config.py
PipelineConfig
Bases: BaseModel
Configuration for a pipeline.
Example:
```yaml
pipelines:
  - pipeline: "user_onboarding"
    description: "Ingest and process new users"
    layer: "silver"
    owner: "data-team@example.com"
    freshness_sla: "6h"
    nodes:
      - name: "node1"
        ...
```
Source code in odibi/config.py
auto_populate_depends_on_from_inputs()
Auto-populate depends_on for same-pipeline references in inputs.
If a node's inputs include a reference such as $silver.other_node and the current pipeline is silver, 'other_node' is added to depends_on automatically so the nodes execute in the correct order.
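The depends_on rule can be sketched as follows: parse each input reference, and keep its node part when its pipeline part matches the current pipeline. Two things are assumptions drawn from the `$silver.other_node` example: the `$<pipeline>.<node>` grammar and the list-of-strings shape of `inputs`. `auto_depends_on` is a hypothetical helper.

```python
import re

# Assumed reference syntax: "$<pipeline>.<node>", as in "$silver.other_node".
REF = re.compile(r"^\$(?P<pipeline>\w+)\.(?P<node>\w+)$")


def auto_depends_on(pipeline: str, inputs: list[str], depends_on: list[str]) -> list[str]:
    """Add same-pipeline node references from inputs to depends_on (sketch)."""
    result = list(depends_on)
    for ref in inputs:
        m = REF.match(ref)
        # Cross-pipeline references (e.g. $bronze.raw) are left alone.
        if m and m["pipeline"] == pipeline and m["node"] not in result:
            result.append(m["node"])
    return result
```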
Source code in odibi/config.py
check_unique_node_names(nodes)
classmethod
Ensure all node names are unique within the pipeline.
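Duplicate detection comes down to counting names. `ensure_unique_names` is a hypothetical stand-alone version that takes a list of strings rather than node models.

```python
from collections import Counter


def ensure_unique_names(names: list[str]) -> list[str]:
    """Raise ValueError listing every name used more than once (sketch)."""
    dupes = sorted(name for name, count in Counter(names).items() if count > 1)
    if dupes:
        raise ValueError(f"Duplicate node names in pipeline: {dupes}")
    return names
```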
Source code in odibi/config.py
validate_pipeline_name_format(v)
classmethod
Ensure pipeline names are valid identifiers (alphanumeric + underscore).
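A regex `fullmatch` captures the "alphanumeric + underscore" rule. This sketch assumes ASCII letters and allows a leading digit, since the docstring does not say otherwise. `validate_name` is a hypothetical helper.

```python
import re

# Letters, digits and underscores only; ASCII-only is an assumption of this sketch.
PIPELINE_NAME = re.compile(r"[A-Za-z0-9_]+")


def validate_name(name: str) -> str:
    """Return the name unchanged, or raise ValueError if it has other characters."""
    if not PIPELINE_NAME.fullmatch(name):
        raise ValueError(f"Invalid pipeline name {name!r}: use letters, digits and underscores only")
    return name
```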
Source code in odibi/config.py
StoryConfig
Bases: BaseModel
Story generation configuration.
Stories are ODIBI's core value: execution reports with lineage. They must be written through a connection so that their output is consistent and traceable.
Example:
```yaml
story:
  connection: "local_data"
  path: "stories/"
  retention_days: 30
  failure_sample_size: 100
  max_failure_samples: 500
  max_sampled_validations: 5
```
Failure Sample Settings:
- failure_sample_size: Number of failed rows to capture per validation (default: 100)
- max_failure_samples: Total failed rows across all validations (default: 500)
- max_sampled_validations: Number of validations that get row samples; any further validations show only failure counts (default: 5)
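The three settings act as nested caps. The sketch assumes validations are processed in order and that the shared `max_failure_samples` budget is consumed first-come, first-served. `sample_failures` is a hypothetical helper, not the story generator.

```python
def sample_failures(failures_by_validation: dict[str, list[dict]],
                    failure_sample_size: int = 100,
                    max_failure_samples: int = 500,
                    max_sampled_validations: int = 5) -> dict[str, dict]:
    """Apply the per-validation, total, and validation-count caps (sketch)."""
    report: dict[str, dict] = {}
    total = 0
    for i, (name, rows) in enumerate(failures_by_validation.items()):
        entry = {"count": len(rows), "samples": []}
        if i < max_sampled_validations:
            # Per-validation cap first, then whatever is left of the shared budget.
            take = min(failure_sample_size, max_failure_samples - total, len(rows))
            entry["samples"] = rows[:take]
            total += take
        # Validations past max_sampled_validations keep only their count.
        report[name] = entry
    return report
```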
Source code in odibi/config.py