Odibi Cookbook: Recipes for Common Patterns
This guide provides copy-pasteable solutions to real-world data engineering problems.
Recipe 1: The "Unstable API" Ingestion 🌪️
Problem: "My source JSON adds new fields constantly and is deeply nested. My pipeline breaks whenever the schema changes."
Solution:
Use schema_policy: { mode: "evolve" } to automatically adapt to new columns, and normalize_json to flatten the structure.
- name: "ingest_unstable_api"
  read:
    connection: "api_source"
    format: "json"
    path: "events/v1/*.json"

  # 1. Handle Drift: Automatically add new columns as NULLable
  schema_policy:
    mode: "evolve"
    on_new_columns: "add_nullable"

  # 2. Flatten: Convert nested JSON into columns (e.g. payload.id -> payload_id)
  transformer: "normalize_json"
  params:
    column: "payload"
    sep: "_"

  write:
    connection: "silver"
    format: "delta"
    table: "events_flat"
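To make the two steps concrete, here is a minimal plain-Python sketch of what flattening and "add as nullable" amount to. It is not Odibi's implementation: the helper names `flatten` and `evolve` and the sample events are hypothetical, and the real normalize_json transformer may treat lists, name collisions, and types differently.

```python
from __future__ import annotations

from typing import Any


def flatten(record: dict[str, Any], parent: str = "", sep: str = "_") -> dict[str, Any]:
    """Flatten nested dicts into one level, joining keys with `sep`."""
    flat: dict[str, Any] = {}
    for key, value in record.items():
        name = f"{parent}{sep}{key}" if parent else key
        if isinstance(value, dict):
            flat.update(flatten(value, name, sep))
        else:
            flat[name] = value
    return flat


def evolve(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Give every row the union of all columns, filling gaps with None."""
    columns: list[str] = []
    for row in rows:
        for col in row:
            if col not in columns:
                columns.append(col)
    return [{col: row.get(col) for col in columns} for row in rows]


# Hypothetical sample data: the second event introduces a new field.
events = [
    {"id": 1, "payload": {"id": "a", "user": {"name": "Ada"}}},
    {"id": 2, "payload": {"id": "b", "device": "ios"}},
]

flat_rows = evolve([flatten(e) for e in events])
for row in flat_rows:
    print(row)
```

Both rows end up with the same four columns (`id`, `payload_id`, `payload_user_name`, `payload_device`), with `None` wherever an event did not carry the field. This is the behavior the recipe relies on to keep downstream readers from breaking.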
Recipe 2: The "Privacy-First" Customer Table 🔒
Problem: "I need to ingest customer data, but I must hash emails and mask credit card numbers for compliance (GDPR/CCPA)."
Solution:
Use the privacy block to anonymize every column flagged pii: true, a regex_replace step to mask credit card numbers, and the sensitive list to hide values in stories. You can also mix methods with the hash_columns transformer.
- name: "load_secure_customers"
  read:
    connection: "s3_raw"
    format: "parquet"
    path: "customers/"

  # 1. Global Privacy Policy (Applies to PII columns)
  privacy:
    method: "hash"
    salt: "${PRIVACY_SALT}" # Load from env var

  # 2. Mark Columns as PII (Triggers Privacy Policy)
  columns:
    email:
      pii: true
    phone:
      pii: true

  # 3. Explicit Masking for Credit Cards (Transformers run before Write)
  transform:
    steps:
      # Mask CCNs (keep last 4)
      - function: "regex_replace"
        params:
          column: "credit_card"
          pattern: ".(?=.{4})" # Regex to match all except last 4
          replacement: "*"

  # 4. Hide from Stories (Documentation)
  sensitive: ["email", "credit_card", "phone"]

  write:
    connection: "silver"
    format: "delta"
    table: "dim_customers_anonymized"
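The masking pattern can be checked outside the pipeline. The sketch below applies the recipe's regex with Python's `re` module and pairs it with a salted SHA-256 hash. The hashing scheme is an assumption for illustration only, since the recipe does not say how Odibi combines the salt and the value. `hash_value`, `mask_card`, and the sample record are hypothetical names.

```python
import hashlib
import re

# Same pattern as the recipe: any character that still has
# at least four characters after it.
MASK_PATTERN = re.compile(r".(?=.{4})")


def hash_value(value: str, salt: str) -> str:
    """Salted SHA-256 hex digest (assumed scheme; Odibi's may differ)."""
    return hashlib.sha256((salt + value).encode("utf-8")).hexdigest()


def mask_card(number: str) -> str:
    """Replace every character except the last four with '*'."""
    return MASK_PATTERN.sub("*", number)


# Hypothetical record and salt, for illustration only.
customer = {"email": "ada@example.com", "credit_card": "4111111111111111"}
salt = "demo-salt"

secured = {
    "email": hash_value(customer["email"], salt),
    "credit_card": mask_card(customer["credit_card"]),
}
print(secured["credit_card"])  # ************1111
```

Two properties of the pattern are worth knowing: values of four characters or fewer pass through unchanged, and separators such as dashes or spaces are masked along with the digits.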
Recipe 3: Sessionizing Clickstream Data ⏱️
Problem: "I have raw click events. I need to group them into user sessions (30-minute timeout) and load them incrementally."
Solution:
Combine the sessionize transformer with incremental: { mode: "stateful" } to process only new data while maintaining session logic.
- name: "clickstream_sessions"
  read:
    connection: "kafka_landing"
    format: "json"
    path: "clicks/"

    # 1. Incremental Loading (Stateful)
    # Tracks the last processed timestamp to only read new events
    incremental:
      mode: "stateful"
      column: "event_time"
      state_key: "clickstream_hwm"
      watermark_lag: "1h" # Handle late arriving data

  # 2. Session Logic (30 min timeout)
  transformer: "sessionize"
  params:
    timestamp_col: "event_time"
    user_col: "user_id"
    threshold_seconds: 1800 # 30 minutes
    session_col: "session_id"

  write:
    connection: "gold"
    format: "delta"
    table: "fact_sessions"
    mode: "append"
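The session rule and the watermark lag can be sketched in plain Python. This illustrates the idea and is not Odibi's code. It assumes that a gap strictly greater than the threshold starts a new session, and that the lag is subtracted from the stored high-water mark before the next read. The `sessionize` and `read_from` helpers, the session id format, and the sample events are all hypothetical.

```python
from datetime import datetime, timedelta


def sessionize(events, threshold_seconds=1800):
    """Assign a session_id per user; a gap above the threshold opens a new session."""
    threshold = timedelta(seconds=threshold_seconds)
    last_seen = {}  # user_id -> timestamp of that user's previous event
    counters = {}   # user_id -> running session number
    out = []
    for event in sorted(events, key=lambda e: (e["user_id"], e["event_time"])):
        user, ts = event["user_id"], event["event_time"]
        if user not in last_seen or ts - last_seen[user] > threshold:
            counters[user] = counters.get(user, 0) + 1
        last_seen[user] = ts
        out.append({**event, "session_id": f"{user}-{counters[user]}"})
    return out


def read_from(high_water_mark, lag=timedelta(hours=1)):
    """Lower bound for the next incremental read: stored mark minus the lag."""
    return high_water_mark - lag


# Hypothetical events for two users.
t0 = datetime(2024, 1, 1, 9, 0)
events = [
    {"user_id": "u1", "event_time": t0},
    {"user_id": "u1", "event_time": t0 + timedelta(minutes=10)},
    {"user_id": "u1", "event_time": t0 + timedelta(minutes=55)},  # 45-minute gap
    {"user_id": "u2", "event_time": t0 + timedelta(minutes=5)},
]

sessions = sessionize(events)
for row in sessions:
    print(row["user_id"], row["event_time"].time(), row["session_id"])

# With the last event as the high-water mark, the next read starts one hour earlier.
print(read_from(t0 + timedelta(minutes=55)))  # 2024-01-01 08:55:00
```

Here the first two `u1` events share session `u1-1`, the third opens `u1-2` after the 45-minute gap, and `u2` gets its own `u2-1`.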