The Definitive Guide to Odibi¶
Version: 3.4.3
Audience: Data Engineers, Analytics Engineers, Architects
Prerequisites: Basic Python and SQL knowledge
Goal: From "Hello World" to Enterprise Production
Table of Contents¶
- Introduction & Philosophy
- Architecture Deep Dive
- The Object Model (Python API)
- Context, State, and Dependencies
- Transformation Engineering
- The Multi-Engine Architecture: Pandas, Spark & Polars
- Declarative Configuration (YAML)
- Observability: Data Stories
- Reliability & Governance
- Production Deployment Patterns
- Comprehensive End-to-End Example
1. Introduction & Philosophy¶
Odibi is a Declarative Data Framework designed to solve the "Two Language Problem" in data engineering: 1. Local Development is often done in Python/Pandas on a laptop (fast iteration, small data). 2. Production runs on distributed clusters like Spark/Databricks (high latency, massive data).
Traditionally, this requires rewriting code or wrapping Spark in complex local docker containers. Odibi provides a unified abstraction layer that allows the exact same pipeline definition to run on your MacBook (using Pandas) and on a 100-node Databricks cluster (using Spark).
Core Principles¶
- Declarative over Imperative: Define what you want (Input -> Transform -> Output), not how to loop through files.
- Code First, Configuration Second: Learn the Python API to understand the system; use YAML for deployment.
- Engine Agnostic: Logic written for Odibi runs on Pandas (Local) and Spark (Scale) without code changes.
- Observability by Default: Every run generates a rich HTML "Data Story" with lineage, profile, and logs.
2. Architecture Deep Dive¶
Odibi is built on a "Three Layer" Architecture. Understanding these layers helps you debug and extend the framework.
graph TD
subgraph "Layer 1: Declarative (User Interface)"
A[odibi.yaml] -->|Parsed by| B(ProjectConfig)
CLI[odibi run] -->|Invokes| B
end
subgraph "Layer 2: Object Model (The Brain)"
B --> C[PipelineConfig]
C --> D[NodeConfig]
D -->|Validates| E[DependencyGraph]
E -->|Orchestrates| F[Pipeline Executor]
end
subgraph "Layer 3: Runtime Engine (The Muscle)"
F -->|Delegates to| G{Engine Interface}
G -->|Local Mode| H[Pandas Engine]
G -->|Scale Mode| I[Spark Engine]
H --> J[(Local Files / DuckDB)]
I --> K[(Delta Lake / ADLS / Databricks)]
end
- Layer 1 (YAML): Simple configuration files.
- Layer 2 (Pydantic Objects): The
odibi.configmodule. This is where validation happens. If your graph has a cycle, or you miss a parameter, Layer 2 catches it before execution starts. - Layer 3 (Engines): The
odibi.enginemodule. This adapts your abstract "Read CSV" command intopd.read_csv(...)orspark.read.csv(...).
3. The Object Model (Python API)¶
The most effective way to learn Odibi is to build a pipeline using the Python classes directly. This demystifies the YAML tags.
3.1 The Hierarchy¶
ProjectConfig: Top-level container. Holds global settings (Engine type, Retry policy, Connections).PipelineConfig: A logical grouping of tasks (e.g., "Daily Sales Batch").NodeConfig: A single step in the pipeline.- Operations:
ReadConfig,TransformConfig,WriteConfig.
3.2 Building a Complex Node Programmatically¶
Let's look at a feature-rich node configuration.
from odibi.config import NodeConfig, ReadConfig, TransformConfig, WriteConfig, ErrorStrategy
node = NodeConfig(
name="process_orders",
description="Cleans orders and calculates tax",
tags=["daily", "critical"],
# 1. Dependency Management
depends_on=["raw_orders", "ref_tax_rates"],
# 2. Transformation Logic
transform=TransformConfig(
steps=[
# Step A: Filter (SQL)
"SELECT * FROM raw_orders WHERE status != 'CANCELLED'",
# Step B: Custom Python Function
{"function": "calculate_tax", "params": {"rate_source": "ref_tax_rates"}}
]
),
# 3. Output Configuration
write=WriteConfig(
connection="silver_db",
format="delta",
table="orders_cleaned",
mode="append"
),
# 4. Reliability & Governance
on_error=ErrorStrategy.FAIL_FAST, # Stop whole pipeline if this fails
sensitive=["customer_email"], # Mask this column in logs/stories
cache=True # Cache result in memory for downstream nodes
)
3.3 Node Operations Guide¶
| Operation | Description | Key Fields |
|---|---|---|
| Read | Ingests data from a connection. | connection, format (csv, parquet, delta, sql), path, options |
| Transform | Runs a sequence of steps. | steps (List of SQL strings or {function: name, params: {}} dicts) |
| Transformer | Runs a single "App-like" transformation. | transformer (Name string), params (Dict). Used for complex logic like SCD2 or Deduplication. |
| Write | Saves the result. | connection, format, path/table, mode (overwrite, append, upsert) |
4. Context, State, and Dependencies¶
In Odibi, you don't pass variables between functions. You rely on the Dependency Graph.
4.1 The Global Context (odibi.context.Context)¶
The Pipeline Executor maintains a Global Context.
* Registry: When a node named raw_orders finishes, its result (DataFrame) is registered in the context under the key "raw_orders".
* Access: Downstream nodes request data from the context by name.
* Memory Management: In the Pandas engine, this is a dictionary of DataFrames in RAM. In Spark, these are Temp Views registered in the Spark Session.
4.2 Dependency Resolution¶
When you define depends_on=["node_A"] for node_B:
1. Graph Construction: Odibi builds a DAG (Directed Acyclic Graph).
2. Topological Sort: It determines the execution order (A -> B).
3. Execution:
* node_A runs. Output registered as "node_A".
* node_B starts. It can now execute SELECT * FROM node_A.
4.3 The EngineContext (odibi.context.EngineContext)¶
When writing custom transformations, you interact with EngineContext, not the Global Context directly. This wraps the global state and provides uniform APIs.
| Method | Description |
|---|---|
context.df |
The DataFrame resulting from the previous step in the current node. |
context.get(name) |
Retrieve a DataFrame from an upstream node (Global Context). |
context.sql(query) |
Run SQL on context.df. Returns a new Context with updated df. |
context.register_temp_view(name, df) |
Register a DF manually for complex SQL joins. |
5. Transformation Engineering¶
While SQL is great for filtering and projection, complex logic belongs in Python. Odibi uses a registry pattern.
5.1 The @transform Decorator¶
You must register functions so Odibi can find them by name from the YAML configuration.
from odibi.registry import transform
@transform
def categorize_users(context, params: dict):
"""
Categorizes users based on spend.
YAML:
function: categorize_users
params:
threshold: 1000
"""
threshold = params.get("threshold", 1000)
# 1. Get Input
df = context.df
# 2. Use Engine-Native commands (Pandas/Spark agnostic via SQL)
# Or check context.engine_type if you need specific optimization
return context.sql(f"""
SELECT
*,
CASE WHEN total_spend > {threshold} THEN 'VIP' ELSE 'Regular' END as category
FROM df
""").df
5.2 Transformer vs. Transform Steps¶
There are two ways to define logic in a node:
-
Top-Level Transformer (
transformer: "name"):- The node acts as a specific "App" (e.g.,
deduplicate,scd2). - It usually takes the dependencies as input and produces one output.
- Use this for heavy, reusable logic (Slowly Changing Dimensions, Merges).
- The node acts as a specific "App" (e.g.,
-
Transform Steps (
transform: { steps: [...] }):- A "Script" of sequential operations.
- Mixes SQL and lightweight Python functions.
- Use this for business logic specific to that pipeline (Filter -> Join -> Map -> Reduce).
6. The Multi-Engine Architecture: Pandas, Spark & Polars¶
Odibi abstracts the engine, but knowing how they differ is crucial for performance.
6.1 Comparison¶
| Feature | Pandas Engine | Spark Engine | Polars Engine |
|---|---|---|---|
| Compute | Single Node (CPU/RAM bound) | Distributed (Cluster) | Single Node (Multi-threaded) |
| SQL | DuckDB / PandasQL | Spark SQL (Catalyst) | DuckDB |
| IO | Local FS, S3/Blob (via fsspec) | HDFS, S3, ADLS (Native) | Local FS, S3/Blob |
| Setup | pip install odibi |
pip install odibi[spark] |
pip install odibi[polars] |
| Best For | Dev, Testing, Small Data (<10GB) | Prod, Big Data (TB/PB) | High-performance single-node |
6.2 Spark-Specific Features¶
The Spark engine enables advanced Data Lakehouse features via odibi.engine.spark_engine.SparkEngine.
Delta Lake Integration¶
Odibi treats Delta Lake as a first-class citizen.
-
Time Travel:
-
Upserts (MERGE):
-
Optimization (Write Options):
7. Declarative Configuration (YAML)¶
The YAML configuration is the deployment artifact. It maps 1:1 to the Pydantic models.
7.1 Project Structure¶
Recommended folder structure for an Odibi project:
my_project/
├── odibi.yaml # Main entry point
├── transforms.py # Custom python logic (@transform)
├── data/ # Local data (for dev)
├── stories/ # Generated reports
└── .env # Secrets (API keys)
7.2 Advanced YAML Features¶
Environment Variables¶
You can inject secrets or environment-specific paths using ${VAR_NAME}.
connections:
snowflake:
type: "sql_server"
host: "${DB_HOST}"
password: "${DB_PASSWORD}" # Redacted in logs automatically
YAML Anchors & Aliases¶
Reuse configuration blocks to keep YAML DRY (Don't Repeat Yourself).
# Define a template
.default_write: &default_write
connection: "datalake"
format: "delta"
mode: "overwrite"
nodes:
- name: "customers"
write:
<<: *default_write
table: "dim_customers"
- name: "orders"
write:
<<: *default_write
table: "fact_orders"
8. Observability: Data Stories¶
Running a pipeline blindly is dangerous. Odibi generates Data Stories.
8.1 What is a Story?¶
A Story is an HTML file generated at the end of a run. It answers: 1. Lineage: "Where did this data come from?" (Visual Graph) 2. Profile: "How many rows? How many nulls?" (Schema & Stats) 3. Sample: "What does the data look like?" (Preview rows) 4. Logic: "What code actually ran?" (SQL/Python snippet)
8.2 Configuration¶
Enable story generation in ProjectConfig.
story:
connection: "local_data" # Where to save the HTML
path: "stories/"
max_sample_rows: 20 # Number of rows to preview
retention_days: 30 # Auto-cleanup old reports
8.3 OpenLineage¶
Odibi supports the OpenLineage standard for integration with tools like Marquez or Atlan.
9. Reliability & Governance¶
9.1 Retries & Backoff¶
Network blips happen. Configure retries globally or per node.
# Global setting in project config
retry:
enabled: true
max_attempts: 3
backoff: "exponential" # linear, constant
9.2 Alerting¶
Send notifications when pipelines fail or succeed.
alerts:
- type: "slack"
url: "${SLACK_WEBHOOK}"
on_events: ["on_failure"] # on_start, on_success
metadata:
env: "production"
9.3 PII Protection¶
Prevent sensitive data from leaking into logs or Data Stories.
nodes:
- name: "load_users"
# Masks columns in the 'Sample Data' section of the Story
sensitive: ["email", "ssn", "phone_number"]
10. Production Deployment Patterns¶
10.1 The "Hybrid" Pattern¶
- Develop Locally: Use
engine: pandasand local CSVs. Iterate fast. - Deploy to Cloud:
- Switch
engine: spark. - Change connections to ADLS/S3.
- Use
odibi runvia a job scheduler (Airflow, Databricks Workflows).
- Switch
10.2 CI/CD¶
Since Odibi logic is Python code and YAML config:
1. Linting: Run black and ruff on your transforms.py.
2. Validation: Run odibi validate odibi.yaml to validate the graph structure before deployment.
3. Testing: Use the Python API to run single nodes with mock data (Unit Tests).
11. Comprehensive End-to-End Example¶
This script demonstrates a complete workflow: 1. Mocking data generation. 2. Defining custom logic. 3. Configuring a multi-stage pipeline (Bronze -> Silver -> Gold). 4. Executing with error handling.
import pandas as pd
import os
import logging
from odibi.config import (
PipelineConfig, NodeConfig, ReadConfig, TransformConfig, WriteConfig,
RetryConfig, ErrorStrategy
)
from odibi.pipeline import Pipeline
from odibi.registry import transform
from odibi.connections import LocalConnection
# ==========================================
# 0. Setup Environment
# ==========================================
# Create dummy data for the example
os.makedirs("data/landing", exist_ok=True)
os.makedirs("data/silver", exist_ok=True)
os.makedirs("data/gold", exist_ok=True)
# Generate Raw JSON Data (Simulating an API dump)
pd.DataFrame([
{"id": 1, "user": "Alice", "tx_amount": 150.0, "ts": "2023-10-01T10:00:00"},
{"id": 2, "user": "Bob", "tx_amount": 20.0, "ts": "2023-10-01T10:05:00"},
{"id": 3, "user": "Alice", "tx_amount": -50.0, "ts": "2023-10-01T10:10:00"}, # Invalid
{"id": 4, "user": "Eve", "tx_amount": 1000.0, "ts": "2023-10-01T10:15:00"},
]).to_json("data/landing/transactions.json", orient="records")
# ==========================================
# 1. Custom Logic (Business Rules)
# ==========================================
@transform
def anomaly_detection(context, params):
"""
Flags transactions that are 3 std devs above the mean.
"""
df = context.df
# Using SQL for statistical window function
# This works in DuckDB (Pandas) and Spark SQL
query = """
SELECT
*,
AVG(tx_amount) OVER () as mean_amount,
STDDEV(tx_amount) OVER () as std_amount,
CASE
WHEN tx_amount > (AVG(tx_amount) OVER () + 3 * STDDEV(tx_amount) OVER ())
THEN true
ELSE false
END as is_anomaly
FROM df
"""
return context.sql(query).df
# ==========================================
# 2. Pipeline Definition
# ==========================================
pipeline_conf = PipelineConfig(
pipeline="fraud_detection_batch",
description="Ingests transactions and flags anomalies",
nodes=[
# --- Bronze Layer: Ingest Raw ---
NodeConfig(
name="bronze_tx",
read=ReadConfig(
connection="landing",
format="json",
path="transactions.json"
),
# Keep raw data safe, minimal transform
write=WriteConfig(
connection="silver",
format="parquet",
path="bronze_tx.parquet",
mode="overwrite"
)
),
# --- Silver Layer: Clean & Enrich ---
NodeConfig(
name="silver_tx",
depends_on=["bronze_tx"],
transform=TransformConfig(
steps=[
# 1. Filter invalid amounts
"SELECT * FROM bronze_tx WHERE tx_amount > 0",
# 2. Run Anomaly Detection
{"function": "anomaly_detection", "params": {}}
]
),
write=WriteConfig(
connection="silver",
format="parquet",
path="silver_tx.parquet",
mode="overwrite"
)
),
# --- Gold Layer: Business Aggregates ---
NodeConfig(
name="gold_user_summary",
depends_on=["silver_tx"],
transform=TransformConfig(
steps=[
"""
SELECT
user,
COUNT(*) as tx_count,
SUM(tx_amount) as total_volume,
SUM(CASE WHEN is_anomaly THEN 1 ELSE 0 END) as suspicious_tx_count
FROM silver_tx
GROUP BY user
"""
]
),
write=WriteConfig(
connection="gold",
format="csv",
path="user_risk_report.csv",
mode="overwrite"
),
# Fail fast if report generation breaks
on_error=ErrorStrategy.FAIL_FAST
)
]
)
# ==========================================
# 3. Execution Wrapper
# ==========================================
def run_pipeline():
print("🚀 Starting Fraud Detection Pipeline...")
# Connections Definition
connections = {
"landing": LocalConnection(base_path="./data/landing"),
"silver": LocalConnection(base_path="./data/silver"),
"gold": LocalConnection(base_path="./data/gold")
}
# Initialize Pipeline
pipeline = Pipeline(
pipeline_conf,
connections=connections,
generate_story=True,
story_config={
"output_path": "data/stories",
"max_sample_rows": 50
},
retry_config=RetryConfig(enabled=True, max_attempts=2)
)
# Run
results = pipeline.run()
# Logging Results
print(f"\n⏱️ Duration: {results.duration:.2f}s")
if results.failed:
print(f"❌ Failed Nodes: {results.failed}")
# Inspect specific error
for node in results.failed:
res = results.get_node_result(node)
print(f" Reason ({node}): {res.error}")
else:
print(f"✅ Success! Report generated at: {results.story_path}")
# Verify output
print("\n--- Risk Report ---")
df = pd.read_csv("data/gold/user_risk_report.csv")
print(df.to_string(index=False))
if __name__ == "__main__":
run_pipeline()
This guide provides a solid foundation for using Odibi. Start with the Python API to understand the mechanics, and transition to YAML for production operations.