# Loading Patterns

Pre-built execution patterns for common data warehouse loading scenarios, including Dimension, Fact, SCD2, Merge, Aggregation, and Date Dimension operations.
## Overview

Odibi's pattern system provides:

- **Declarative loading**: configure complex loading logic via YAML
- **Engine agnostic**: works with Spark, Pandas, and Polars engines
- **Built-in validation**: patterns validate required parameters before execution
- **Extensible**: create custom patterns by extending the `Pattern` base class
## Available Patterns

| Pattern | Description | Use Case |
|---|---|---|
| `dimension` | Builds dimension tables with surrogate keys and SCD support | Star schema dimensions with SK generation |
| `fact` | Builds fact tables with automatic surrogate key lookups | Star schema facts with dimension FK resolution |
| `date_dimension` | Generates a complete date dimension table | Calendar/fiscal date dimensions |
| `aggregation` | Declarative aggregation with time-grain rollups | Summary tables, KPI rollups |
| `scd2` | Slowly Changing Dimension Type 2 | Historical tracking of dimension changes |
| `merge` | Upsert/merge operations | Incremental updates, CDC |
Note: For simple append or overwrite operations, use `write.mode: append` or `write.mode: overwrite` directly; no pattern is needed.
## Configuration

Patterns are configured via the `pattern:` block in node configuration:

```yaml
nodes:
  - name: dim_customer
    read:
      connection: source
      path: customers.csv
    pattern:
      type: dimension
      params:
        natural_key: customer_id
        surrogate_key: customer_sk
        scd_type: 1
        track_cols: [name, email, tier]
```
### Config Options

| Field | Type | Required | Description |
|---|---|---|---|
| `pattern.type` | string | Yes | Pattern name: `dimension`, `fact`, `date_dimension`, `aggregation`, `scd2`, `merge` |
| `pattern.params` | object | Yes | Pattern-specific parameters (see below) |
## Pattern Parameters

### Dimension Pattern

Builds complete dimension tables with auto-generated surrogate keys and SCD support (Type 0, 1, or 2). Includes an optional unknown member row (SK=0) for orphan FK handling, plus audit columns.

| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| `natural_key` | string | Yes | - | Natural/business key column name |
| `surrogate_key` | string | Yes | - | Surrogate key column name to generate |
| `scd_type` | int | No | `1` | `0` = static, `1` = overwrite, `2` = history tracking |
| `track_cols` | list | Conditional | - | Columns to monitor for changes (required for SCD Types 1 and 2) |
| `target` | string | Conditional | - | Target table path (required for SCD Type 2 to read existing history) |
| `unknown_member` | bool | No | `false` | If true, insert a row with SK=0 for orphan FK handling |
| `audit` | object | No | `{}` | Audit config: `{load_timestamp: true, source_system: "name"}` |
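To make the surrogate key and unknown member mechanics concrete, here is a minimal pandas sketch of the underlying idea. This is not odibi's implementation; the `build_dimension` helper and its dense SK assignment are illustrative assumptions.

```python
import pandas as pd

def build_dimension(source, natural_key, surrogate_key, unknown_member=False):
    """Deduplicate on the natural key, assign surrogate keys, optionally add an SK=0 row."""
    dim = source.drop_duplicates(subset=[natural_key]).reset_index(drop=True)
    dim[surrogate_key] = range(1, len(dim) + 1)   # real members start at SK=1
    if unknown_member:
        unknown = {col: None for col in dim.columns}
        unknown[natural_key] = "UNKNOWN"
        unknown[surrogate_key] = 0                # SK=0 reserved for orphan FKs
        dim = pd.concat([pd.DataFrame([unknown]), dim], ignore_index=True)
    return dim

customers = pd.DataFrame({
    "customer_id": ["C1", "C2", "C1"],
    "name": ["Ada", "Bo", "Ada"],
})
dim_customer = build_dimension(customers, "customer_id", "customer_sk", unknown_member=True)
```

With `unknown_member: true`, fact rows whose FK has no dimension match can later be mapped to SK=0 instead of producing a broken join.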
### Fact Pattern

Builds fact tables with automatic surrogate key lookups from dimension tables, grain validation, orphan handling, and measure calculations.

| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| `grain` | list | No | - | Columns that define uniqueness (validates no duplicates) |
| `dimensions` | list | No | `[]` | Dimension lookup configurations (see below) |
| `orphan_handling` | string | No | `unknown` | `unknown` (SK=0), `reject` (error), or `quarantine` |
| `quarantine` | object | No | - | Quarantine config (required when `orphan_handling: quarantine`) |
| `measures` | list | No | `[]` | Measure definitions (passthrough column names or calculated expressions) |
| `audit` | object | No | `{}` | Audit config: `{load_timestamp: true, source_system: "name"}` |
| `deduplicate` | bool | No | `false` | If true, removes duplicates before insert |
| `keys` | list | Conditional | - | Keys for deduplication (required when `deduplicate: true`) |
#### Dimension Lookup Config

Each entry in the `dimensions` list requires:

| Field | Type | Required | Description |
|---|---|---|---|
| `source_column` | string | Yes | Column in the source data to look up |
| `dimension_table` | string | Yes | Name of the dimension node in context |
| `dimension_key` | string | Yes | Natural key column in the dimension table |
| `surrogate_key` | string | Yes | Surrogate key column to retrieve |
| `scd2` | bool | No | If true, filters `is_current = true` before the lookup |
#### Orphan Handling Strategies

| Strategy | Description |
|---|---|
| `unknown` | Map orphans to surrogate key 0 (the unknown member row) |
| `reject` | Raise an error if orphan records are found |
| `quarantine` | Write orphan records to a separate quarantine location |
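The `unknown` strategy amounts to a left join followed by a fill. The pandas sketch below illustrates that logic under stated assumptions; `resolve_fk` is a hypothetical helper, not part of odibi's API.

```python
import pandas as pd

def resolve_fk(fact, dim, source_column, dimension_key, surrogate_key):
    """Left-join a dimension and map orphan rows (no match) to surrogate key 0."""
    lookup = dim[[dimension_key, surrogate_key]].rename(
        columns={dimension_key: source_column}
    )
    out = fact.merge(lookup, on=source_column, how="left")
    # Orphans come back with a null SK; map them to the unknown member (SK=0)
    out[surrogate_key] = out[surrogate_key].fillna(0).astype(int)
    return out

orders = pd.DataFrame({"customer_id": ["C1", "C9"], "amount": [10.0, 5.0]})
dim_customer = pd.DataFrame({"customer_id": ["C1"], "customer_sk": [1]})
resolved = resolve_fk(orders, dim_customer, "customer_id", "customer_id", "customer_sk")
```

Here `C9` has no dimension match, so it resolves to `customer_sk = 0` rather than dropping the row or failing the load.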
### Date Dimension Pattern

Generates a complete date dimension table with pre-calculated attributes useful for BI/reporting. Does not require a `read:` block; the data is generated from the date range parameters.

| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| `start_date` | string | Yes | - | Start date in `YYYY-MM-DD` format |
| `end_date` | string | Yes | - | End date in `YYYY-MM-DD` format |
| `date_key_format` | string | No | `yyyyMMdd` | Format for `date_sk` (e.g., `20240115`) |
| `fiscal_year_start_month` | int | No | `1` | Month when the fiscal year starts (1–12) |
| `unknown_member` | bool | No | `false` | If true, add an unknown date row with `date_sk = 0` |
#### Generated Columns

| Column | Description |
|---|---|
| `date_sk` | Integer surrogate key (`YYYYMMDD` format) |
| `full_date` | The actual date |
| `day_of_week` | Day name (Monday, Tuesday, etc.) |
| `day_of_week_num` | Day number (1 = Monday, 7 = Sunday) |
| `day_of_month` | Day of month (1–31) |
| `day_of_year` | Day of year (1–366) |
| `is_weekend` | Boolean flag |
| `week_of_year` | ISO week number (1–53) |
| `month` | Month number (1–12) |
| `month_name` | Month name (January, February, etc.) |
| `quarter` | Calendar quarter (1–4) |
| `quarter_name` | Q1, Q2, Q3, Q4 |
| `year` | Calendar year |
| `fiscal_year` | Fiscal year (based on `fiscal_year_start_month`) |
| `fiscal_quarter` | Fiscal quarter (1–4) |
| `is_month_start` | First day of the month |
| `is_month_end` | Last day of the month |
| `is_year_start` | First day of the year |
| `is_year_end` | Last day of the year |
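A few of these columns can be derived directly from a date range, as the pandas sketch below shows. This is illustrative only: `build_date_dim` is hypothetical, and the fiscal year convention shown (fiscal year named for the calendar year in which it ends) is an assumption, since conventions vary.

```python
import pandas as pd

def build_date_dim(start_date, end_date, fiscal_year_start_month=1):
    """Generate a subset of the date dimension columns from a date range."""
    dates = pd.date_range(start_date, end_date, freq="D")
    df = pd.DataFrame({"full_date": dates})
    df["date_sk"] = dates.strftime("%Y%m%d").astype(int)
    df["day_of_week"] = dates.day_name()
    df["day_of_week_num"] = dates.dayofweek + 1          # 1 = Monday, 7 = Sunday
    df["is_weekend"] = df["day_of_week_num"] >= 6
    df["quarter"] = dates.quarter
    # Assumed convention: months on/after the fiscal start month roll into the next FY
    rolls_over = (dates.month >= fiscal_year_start_month) & (fiscal_year_start_month > 1)
    df["fiscal_year"] = dates.year + rolls_over.astype(int)
    return df

dim_date = build_date_dim("2025-01-01", "2025-01-07", fiscal_year_start_month=7)
```

Pre-computing these attributes once is what makes a date dimension cheap to join against at query time.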
### Aggregation Pattern

Declarative aggregation with SQL expressions, an optional HAVING clause, and incremental merge strategies.

| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| `grain` | list | Yes | - | Columns to GROUP BY (defines uniqueness) |
| `measures` | list | Yes | - | Measure definitions with `name` and `expr` (see below) |
| `having` | string | No | - | Optional HAVING clause for filtering aggregates |
| `target` | string | No | - | Target table path (required for incremental mode) |
| `incremental` | object | No | - | Incremental merge config (see below) |
| `audit` | object | No | `{}` | Audit config: `{load_timestamp: true, source_system: "name"}` |
#### Measure Definition

Each entry in the `measures` list is a dict:

| Field | Type | Required | Description |
|---|---|---|---|
| `name` | string | Yes | Output column name |
| `expr` | string | Yes | SQL aggregation expression (e.g., `SUM(amount)`) |
#### Incremental Config

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| `timestamp_column` | string | Yes | - | Column used to identify new data |
| `merge_strategy` | string | No | `replace` | `replace`, `sum`, `min`, or `max` |
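One way to read these strategies: when a freshly computed aggregate overlaps an existing grain, `replace` keeps the new value, while `sum`/`min`/`max` combine old and new. The pandas sketch below illustrates that interpretation; it is an assumption about the semantics, not odibi's code, and `merge_aggregate` is a hypothetical helper.

```python
import pandas as pd

def merge_aggregate(existing, fresh, grain, measure, strategy="replace"):
    """Combine a freshly computed aggregate with the existing target at the same grain."""
    combined = pd.concat([existing, fresh], ignore_index=True)
    if strategy == "replace":
        # The freshly computed value wins for any grain that already exists
        return combined.drop_duplicates(subset=grain, keep="last")
    agg_fn = {"sum": "sum", "min": "min", "max": "max"}[strategy]
    return combined.groupby(grain, as_index=False)[measure].agg(agg_fn)

existing = pd.DataFrame({"date_sk": [20250101], "revenue": [100.0]})
fresh = pd.DataFrame({"date_sk": [20250101, 20250102], "revenue": [40.0, 70.0]})
summed = merge_aggregate(existing, fresh, ["date_sk"], "revenue", strategy="sum")
replaced = merge_aggregate(existing, fresh, ["date_sk"], "revenue", strategy="replace")
```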
### SCD2 Pattern

Tracks history by creating new rows for updates. When a tracked column changes, the old record is closed and a new record is inserted.

| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| `target` | string | Yes | - | Target table name or path containing history |
| `keys` | list | Yes | - | Natural keys that identify unique entities |
| `track_cols` | list | No | - | Columns to monitor for changes |
| `time_col` | string | No | - | Timestamp column for versioning (default: current time) |
| `valid_from_col` | string | No | `valid_from` | Name of the start timestamp column |
| `valid_to_col` | string | No | `valid_to` | Name of the end timestamp column |
| `is_current_col` | string | No | `is_current` | Name of the current record flag column |
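The close-then-insert mechanics can be sketched in pandas as below. This is a simplified, single-key illustration of the general Type 2 technique (using the default column names), not odibi's implementation; `apply_scd2` is hypothetical.

```python
import pandas as pd

def apply_scd2(target, source, key, track_cols, now):
    """Close current rows whose tracked columns changed, then insert the new versions."""
    target = target.copy()
    current = target[target["is_current"]]
    # Compare current target rows against incoming source rows on the natural key
    cmp = current.merge(source, on=key, suffixes=("_old", ""))
    changed = cmp.loc[
        (cmp[[f"{c}_old" for c in track_cols]].values != cmp[track_cols].values).any(axis=1),
        key,
    ]
    # Close the old versions of changed entities
    closing = target[key].isin(changed) & target["is_current"]
    target.loc[closing, "valid_to"] = now
    target.loc[closing, "is_current"] = False
    # Insert the new versions as current rows
    new_rows = source[source[key].isin(changed)].copy()
    new_rows["valid_from"] = now
    new_rows["valid_to"] = pd.NaT
    new_rows["is_current"] = True
    return pd.concat([target, new_rows], ignore_index=True)

history = pd.DataFrame({
    "customer_id": ["C1", "C2"],
    "tier": ["gold", "silver"],
    "valid_from": pd.to_datetime(["2024-01-01", "2024-01-01"]),
    "valid_to": pd.NaT,
    "is_current": [True, True],
})
updates = pd.DataFrame({"customer_id": ["C1", "C2"], "tier": ["platinum", "silver"]})
result = apply_scd2(history, updates, "customer_id", ["tier"], pd.Timestamp("2025-06-01"))
```

`C1` changed tier, so its old row is closed and a new current row is added; `C2` is untouched because none of its tracked columns changed.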
### Merge Pattern

Upsert/merge logic with support for multiple strategies.

| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| `target` | string | Yes | - | Target table name or path |
| `keys` | list | Yes | - | Join keys for matching records |
| `strategy` | string | No | `upsert` | `upsert`, `append_only`, or `delete_match` |
| `audit_cols` | object | No | `null` | `{created_col: "...", updated_col: "..."}` |
| `update_condition` | string | No | `null` | SQL condition for the update clause |
| `insert_condition` | string | No | `null` | SQL condition for the insert clause |
| `delete_condition` | string | No | `null` | SQL condition for the delete clause |
| `optimize_write` | bool | No | `false` | Run OPTIMIZE after write (Spark only) |
| `zorder_by` | list | No | `null` | Columns to Z-Order by |
| `cluster_by` | list | No | `null` | Columns to Liquid Cluster by (Delta) |
#### Merge Strategies

| Strategy | Description |
|---|---|
| `upsert` | Update existing records, insert new ones |
| `append_only` | Ignore duplicates, only insert new keys |
| `delete_match` | Delete records in the target that match keys in the source |
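The table above can be made concrete with a small pandas sketch of two of the strategies. These helpers are hypothetical illustrations of the general upsert/delete-match semantics, not odibi's merge engine.

```python
import pandas as pd

def merge_upsert(target, source, keys):
    """upsert: update rows whose keys match, insert rows whose keys are new."""
    combined = pd.concat([target, source], ignore_index=True)
    return combined.drop_duplicates(subset=keys, keep="last")

def merge_delete_match(target, source, keys):
    """delete_match: drop target rows whose keys appear in the source."""
    matched = target.set_index(keys).index.isin(source.set_index(keys).index)
    return target[~matched]

target = pd.DataFrame({"customer_id": ["C1", "C2"], "tier": ["gold", "silver"]})
source = pd.DataFrame({"customer_id": ["C2", "C3"], "tier": ["bronze", "gold"]})
upserted = merge_upsert(target, source, ["customer_id"])
trimmed = merge_delete_match(target, source, ["customer_id"])
```

In the upsert, `C2` is updated and `C3` inserted; in the delete-match, every key present in the source is removed from the target.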
## Pattern API

All patterns extend the `Pattern` base class:

```python
from abc import ABC, abstractmethod
from typing import Any, ClassVar, Dict, List

from odibi.config import NodeConfig
from odibi.context import EngineContext
from odibi.engine.base import Engine


class Pattern(ABC):
    """Base class for Execution Patterns."""

    required_params: ClassVar[List[str]] = []
    optional_params: ClassVar[List[str]] = []
    param_aliases: ClassVar[Dict[str, List[str]]] = {}

    def __init__(self, engine: Engine, config: NodeConfig):
        self.engine = engine
        self.config = config
        self.params = config.params

    @abstractmethod
    def execute(self, context: EngineContext) -> Any:
        """Execute the pattern logic."""
        pass

    def validate(self) -> None:
        """Validate pattern configuration. Raises ValueError if invalid."""
        pass
```
## Creating Custom Patterns

- Extend the `Pattern` base class
- Define the `required_params` and `optional_params` class variables
- Implement the `execute()` method
- Optionally override `validate()` for parameter validation

```python
from odibi.patterns.base import Pattern
from odibi.context import EngineContext


class MyCustomPattern(Pattern):
    required_params = ["required_param"]
    optional_params = ["optional_param"]

    def validate(self) -> None:
        if not self.params.get("required_param"):
            raise ValueError("MyCustomPattern: 'required_param' is required.")

    def execute(self, context: EngineContext):
        df = context.df
        # Custom transformation logic
        return df
```
## Examples

### Dimension: Customer Dimension with SCD1

Build a customer dimension with surrogate keys and an unknown member row:

```yaml
project: CustomerDW

connections:
  source:
    type: local
    base_path: ./data/source
  gold:
    type: local
    base_path: ./data/gold

pipelines:
  - pipeline: build_dimensions
    nodes:
      - name: dim_customer
        read:
          connection: source
          format: csv
          path: customers.csv
          options:
            header: true
        pattern:
          type: dimension
          params:
            natural_key: customer_id
            surrogate_key: customer_sk
            scd_type: 1
            track_cols:
              - name
              - email
              - tier
              - city
            unknown_member: true
        write:
          connection: gold
          format: parquet
          path: dim_customer.parquet
          mode: overwrite
```
### Fact: Sales Fact with Dimension Lookups

Build a fact table with automatic FK resolution from dimensions:

```yaml
pipelines:
  - pipeline: build_facts
    nodes:
      # Load dimensions into context for FK lookups
      - name: dim_customer
        read:
          connection: gold
          format: parquet
          path: dim_customer.parquet
      - name: dim_product
        read:
          connection: gold
          format: parquet
          path: dim_product.parquet
      - name: dim_date
        read:
          connection: gold
          format: parquet
          path: dim_date.parquet

      # Build the fact table
      - name: fact_sales
        depends_on: [dim_customer, dim_product, dim_date]
        read:
          connection: source
          format: csv
          path: orders.csv
          options:
            header: true
        pattern:
          type: fact
          params:
            grain:
              - order_id
              - line_item_id
            dimensions:
              - source_column: customer_id
                dimension_table: dim_customer
                dimension_key: customer_id
                surrogate_key: customer_sk
              - source_column: product_id
                dimension_table: dim_product
                dimension_key: product_id
                surrogate_key: product_sk
              - source_column: order_date
                dimension_table: dim_date
                dimension_key: full_date
                surrogate_key: date_sk
            orphan_handling: unknown
            measures:
              - quantity
              - amount
            audit:
              load_timestamp: true
              source_system: "orders_csv"
        write:
          connection: gold
          format: parquet
          path: fact_sales.parquet
          mode: overwrite
```
### Date Dimension: Generated Calendar

Generate a full year of dates with fiscal year support:

```yaml
pipelines:
  - pipeline: build_dimensions
    nodes:
      - name: dim_date
        pattern:
          type: date_dimension
          params:
            start_date: "2025-01-01"
            end_date: "2025-12-31"
            fiscal_year_start_month: 7
            unknown_member: true
        write:
          connection: gold
          format: parquet
          path: dim_date.parquet
          mode: overwrite
```
### Aggregation: Daily Sales Summary

Aggregate sales by date and product with incremental merge:

```yaml
pipelines:
  - pipeline: build_aggregates
    nodes:
      - name: agg_daily_sales
        read:
          connection: gold
          path: fact_sales.parquet
          format: parquet
        pattern:
          type: aggregation
          params:
            grain: [date_sk, product_sk]
            measures:
              - name: total_revenue
                expr: "SUM(amount)"
              - name: order_count
                expr: "COUNT(*)"
              - name: avg_order_value
                expr: "AVG(amount)"
            having: "COUNT(*) > 0"
            audit:
              load_timestamp: true
        write:
          connection: gold
          format: parquet
          path: agg_daily_sales.parquet
          mode: overwrite
```
### SCD2: Customer Dimension with History

Track customer address and tier changes over time:

```yaml
project: CustomerDW
engine: spark

connections:
  gold:
    type: delta
    path: /mnt/gold

pipelines:
  - pipeline: load_customer_dim
    nodes:
      - name: customer_scd2
        read:
          connection: bronze
          path: customers
        pattern:
          type: scd2
          params:
            target: "gold/customers"
            keys: ["customer_id"]
            track_cols: ["address", "city", "state", "tier"]
            valid_from_col: "valid_from"
            valid_to_col: "valid_to"
            is_current_col: "is_active"
```
### Merge: Incremental Customer Updates

Upsert with audit columns:

```yaml
pipelines:
  - pipeline: sync_customers
    nodes:
      - name: customers_merge
        read:
          connection: bronze
          path: customer_updates
        pattern:
          type: merge
          params:
            target: "silver.customers"
            keys: ["customer_id"]
            strategy: upsert
            audit_cols:
              created_col: "dw_created_at"
              updated_col: "dw_updated_at"
```
### Merge: GDPR Delete Request

Delete records matching source keys:

```yaml
pipelines:
  - pipeline: gdpr_delete
    nodes:
      - name: delete_customers
        read:
          connection: compliance
          path: deletion_requests
        pattern:
          type: merge
          params:
            target: "silver.customers"
            keys: ["customer_id"]
            strategy: delete_match
```
### Merge: Conditional Update

Only update when the source record is newer:

```yaml
pipelines:
  - pipeline: sync_products
    nodes:
      - name: products_merge
        read:
          connection: bronze
          path: product_updates
        pattern:
          type: merge
          params:
            target: "silver.products"
            keys: ["product_id"]
            strategy: upsert
            update_condition: "source.updated_at > target.updated_at"
            insert_condition: "source.is_deleted = false"
```
### Simple Append (No Pattern Needed)

For event/fact data, just use the write mode:

```yaml
pipelines:
  - pipeline: load_events
    nodes:
      - name: events
        read:
          connection: bronze
          path: events
        write:
          connection: gold
          path: events
          mode: append
```
### Simple Overwrite (No Pattern Needed)

For a full refresh of small tables:

```yaml
pipelines:
  - pipeline: refresh_products
    nodes:
      - name: products
        read:
          connection: bronze
          path: products
        write:
          connection: gold
          path: products
          mode: overwrite
```
## Best Practices

- **Choose the right pattern**: use `dimension` for star schema dimensions with surrogate keys, `fact` for fact tables with FK lookups, `scd2` for standalone history tracking, and `merge` for incremental CDC
- **Use `date_dimension` for calendar tables**: it generates 18+ pre-calculated columns, including fiscal year support
- **Use `write.mode` for simple cases**: `append` for events, `overwrite` for full refresh
- **Define business keys**: ensure `keys` or `natural_key` uniquely identify records in your domain
- **Monitor tracked columns**: for SCD patterns, only track columns that represent meaningful business changes
- **Use audit columns**: track `load_timestamp` and `source_system` for debugging and lineage
- **Handle orphans**: use `unknown_member: true` on dimensions and `orphan_handling: unknown` on facts to gracefully handle missing FK references
- **Optimize large tables**: use `zorder_by` or `cluster_by` on merge patterns for frequently queried columns
## Related

- Transformers - built-in transformation functions
- Pipelines - pipeline configuration
- YAML Schema Reference - full schema documentation