Skip to content

Materializing Metrics Tutorial

In this tutorial, you'll learn how to pre-compute and persist metrics using the Materializer class. Materialization creates pre-aggregated tables for fast dashboard performance.

What You'll Learn: - Why materialize metrics - Defining materializations - Executing materializations - Scheduling with cron - Incremental strategies (replace vs sum)


Unified Project API Note

When using the unified Project API, materializations are defined in the semantic section of your odibi.yaml. Sources can reference pipeline nodes directly:

# odibi.yaml
project: retail_warehouse
engine: pandas

connections:
  gold:
    type: local
    base_path: /mnt/data/gold

pipelines:
  - pipeline: build_warehouse
    nodes:
      - name: fact_orders
        write: { connection: gold, table: fact_orders }
      - name: dim_customer
        write: { connection: gold, table: dim_customer }
      - name: dim_date
        write: { connection: gold, table: dim_date }

semantic:
  metrics:
    - name: revenue
      expr: "SUM(line_total)"
      source: $build_warehouse.fact_orders    # References node's write target
      filters: ["status = 'completed'"]

  dimensions:
    - name: region
      source: $build_warehouse.dim_customer   # No path duplication!
      column: region
    - name: month
      source: $build_warehouse.dim_date
      column: month_name

  materializations:
    - name: monthly_revenue_by_region
      metrics: [revenue]
      dimensions: [region, month]
      output: gold/agg_monthly_revenue
      schedule: "0 2 1 * *"

The $pipeline.node notation automatically reads from wherever the node writes. For full control over materialization execution, continue with the Materializer class below.


Why Materialize?

Ad-hoc queries are powerful but slow for dashboards:

Approach Query Time Use Case
Ad-hoc query 5-30 seconds Exploratory analysis
Materialized table 0.1-0.5 seconds Production dashboards

Materialization pre-computes metrics at a specific grain and saves them to a table. Dashboards query the pre-computed table instead of raw data.


Semantic Config with Materializations

Let's extend our semantic config to include materializations:

YAML Configuration

# semantic_config.yaml
metrics:
  - name: revenue
    description: "Total revenue from completed orders"
    expr: "SUM(line_total)"
    source: fact_orders
    filters: ["status = 'completed'"]

  - name: order_count
    expr: "COUNT(*)"
    source: fact_orders
    filters: ["status = 'completed'"]

  - name: unique_customers
    expr: "COUNT(DISTINCT customer_sk)"
    source: fact_orders
    filters: ["status = 'completed'"]

  - name: avg_order_value
    expr: "AVG(line_total)"
    source: fact_orders
    filters: ["status = 'completed'"]

dimensions:
  - name: region
    source: dim_customer
    column: region

  - name: category
    source: dim_product
    column: category

  - name: month
    source: dim_date
    column: month_name

  - name: date_sk
    source: dim_date
    column: date_sk

materializations:
  - name: monthly_revenue_by_region
    metrics: [revenue, order_count]
    dimensions: [region, month]
    output: gold/agg_monthly_revenue_region
    schedule: "0 2 1 * *"  # 2am on 1st of month

  - name: daily_revenue
    metrics: [revenue, order_count, unique_customers]
    dimensions: [date_sk]
    output: gold/agg_daily_revenue
    schedule: "0 3 * * *"  # 3am daily

  - name: category_summary
    metrics: [revenue, order_count, avg_order_value]
    dimensions: [category]
    output: gold/agg_category_summary

Step 1: Define a Materialization

A materialization specifies which metrics and dimensions to pre-compute:

Python Code

from odibi.semantics import MaterializationConfig

# Define a materialization
monthly_revenue = MaterializationConfig(
    name="monthly_revenue_by_region",
    metrics=["revenue", "order_count"],
    dimensions=["region", "month"],
    output="gold/agg_monthly_revenue_region",
    schedule="0 2 1 * *"
)

Understanding the Definition

Field Value Purpose
name "monthly_revenue_by_region" Unique identifier
metrics ["revenue", "order_count"] Which metrics to compute
dimensions ["region", "month"] Grain (GROUP BY)
output "gold/agg_monthly_revenue_region" Output table path
schedule "0 2 1 * *" Cron schedule (optional)

What Gets Generated

The materialization creates a table with: - One row per unique combination of region × month - Columns for each metric (revenue, order_count)

Output Table (12 rows - 4 regions × 3 months):

region month revenue order_count
North January 2,549.88 7
North February 3,120.50 9
North March 2,890.25 8
South January 2,349.93 7
South February 2,780.40 8
South March 3,050.75 9
East January 1,923.88 7
East February 2,450.60 7
East March 2,180.35 6
West January 2,129.87 7
West February 2,890.45 8
West March 2,650.90 7

Step 2: Execute a Materialization

Use the Materializer class to execute:

Python Code

from odibi.semantics import Materializer, parse_semantic_config
from odibi.context import EngineContext
from odibi.enums import EngineType
import yaml

# Load config
with open("semantic_config.yaml") as f:
    config = parse_semantic_config(yaml.safe_load(f))

# Create context
context = EngineContext(df=None, engine_type=EngineType.PANDAS)
context.register("fact_orders", fact_orders_df)
context.register("dim_customer", dim_customer_df)
context.register("dim_product", dim_product_df)
context.register("dim_date", dim_date_df)

# Create materializer
materializer = Materializer(config)

# Execute single materialization
result = materializer.execute("monthly_revenue_by_region", context)

# Check result
print(f"Name: {result.name}")
print(f"Output: {result.output}")
print(f"Row count: {result.row_count}")
print(f"Success: {result.success}")
print(f"Execution time: {result.elapsed_ms:.2f}ms")

MaterializationResult

Field Type Description
name str Materialization name
output str Output table path
row_count int Number of rows generated
elapsed_ms float Execution time in ms
success bool Whether it succeeded
error str Error message (if failed)
df DataFrame The computed data

Step 3: Write the Output

Use a callback to write the materialized data:

Python Code

# Define write callback
def write_to_parquet(df, output_path):
    """Write DataFrame to Parquet."""
    full_path = f"warehouse/{output_path}.parquet"
    df.to_parquet(full_path, index=False)
    print(f"Wrote {len(df)} rows to {full_path}")

# Execute with write callback
result = materializer.execute(
    "monthly_revenue_by_region",
    context,
    write_callback=write_to_parquet
)

Spark Example

def write_to_delta(df, output_path):
    """Write Spark DataFrame to Delta Lake."""
    df.write.format("delta").mode("overwrite").save(f"/mnt/warehouse/{output_path}")

result = materializer.execute(
    "monthly_revenue_by_region",
    context,
    write_callback=write_to_delta
)

Step 4: Understanding Schedules

The schedule field uses cron syntax:

┌───────── minute (0-59)
│ ┌─────── hour (0-23)
│ │ ┌───── day of month (1-31)
│ │ │ ┌─── month (1-12)
│ │ │ │ ┌─ day of week (0-6, Sun=0)
│ │ │ │ │
* * * * *

Common Schedules

Schedule Cron When
Daily at 2am 0 2 * * * Every day at 2:00 AM
Monthly at 2am 0 2 1 * * 1st of month at 2:00 AM
Weekly Sunday 0 3 * * 0 Sunday at 3:00 AM
Hourly 0 * * * * Every hour on the hour

Reading Schedules

# Get schedule for a materialization
mat_config = materializer.get_materialization("monthly_revenue_by_region")
print(f"Schedule: {mat_config.schedule}")
# Output: Schedule: 0 2 1 * *

# List all materializations with schedules
for mat in materializer.list_materializations():
    print(f"{mat['name']}: {mat['schedule'] or 'No schedule'}")

Step 5: Execute All Materializations

Execute all defined materializations at once:

Python Code

# Execute all materializations
results = materializer.execute_all(context, write_callback=write_to_parquet)

# Print summary
print("=" * 60)
print("Materialization Summary")
print("=" * 60)
for result in results:
    status = "SUCCESS" if result.success else f"FAILED: {result.error}"
    print(f"  {result.name}: {status}")
    if result.success:
        print(f"    Rows: {result.row_count}, Time: {result.elapsed_ms:.0f}ms")

Output

============================================================
Materialization Summary
============================================================
  monthly_revenue_by_region: SUCCESS
    Rows: 12, Time: 45ms
  daily_revenue: SUCCESS
    Rows: 14, Time: 32ms
  category_summary: SUCCESS
    Rows: 2, Time: 18ms

Step 6: Incremental Materialization

For large datasets, use incremental updates instead of full rebuilds.

Replace Strategy

New data replaces existing rows for matching grain keys:

materializations:
  - name: daily_revenue
    metrics: [revenue, order_count]
    dimensions: [date_sk]
    output: gold/agg_daily_revenue
    incremental:
      timestamp_column: load_timestamp
      merge_strategy: replace

How Replace Works

Existing Table:

date_sk revenue order_count
20240115 1,439.96 3
20240116 589.95 3
20240117 749.98 2

New Data Arrives (late order for Jan 15):

date_sk revenue order_count
20240115 1,539.96 4

After Replace Merge:

date_sk revenue order_count Note
20240115 1,539.96 4 Replaced
20240116 589.95 3 Unchanged
20240117 749.98 2 Unchanged

Sum Strategy (Use with Caution)

New measure values add to existing values:

materializations:
  - name: daily_order_count
    metrics: [order_count]  # Only COUNT metrics!
    dimensions: [date_sk]
    output: gold/agg_daily_count
    incremental:
      timestamp_column: created_at
      merge_strategy: sum

How Sum Works

Existing Table:

date_sk order_count
20240115 3
20240116 3

New Orders (2 new orders on Jan 15):

date_sk order_count
20240115 2

After Sum Merge:

date_sk order_count Note
20240115 5 3 + 2 = 5
20240116 3 Unchanged

When NOT to Use Sum

Never use sum for: - AVG() - Would become average of averages - COUNT(DISTINCT) - Would overcount - MIN() / MAX() - Would be wrong - Data with corrections/updates


Complete Python Example

from odibi.semantics import Materializer, parse_semantic_config
from odibi.context import EngineContext
from odibi.enums import EngineType
import pandas as pd
import yaml

# ===========================================
# 1. Load config with materializations
# ===========================================
config_dict = {
    "metrics": [
        {"name": "revenue", "expr": "SUM(line_total)", "source": "fact_orders",
         "filters": ["status = 'completed'"]},
        {"name": "order_count", "expr": "COUNT(*)", "source": "fact_orders",
         "filters": ["status = 'completed'"]},
        {"name": "avg_order_value", "expr": "AVG(line_total)", "source": "fact_orders",
         "filters": ["status = 'completed'"]}
    ],
    "dimensions": [
        {"name": "region", "source": "dim_customer", "column": "region"},
        {"name": "category", "source": "dim_product", "column": "category"},
        {"name": "date_sk", "source": "dim_date", "column": "date_sk"}
    ],
    "materializations": [
        {
            "name": "daily_summary",
            "metrics": ["revenue", "order_count"],
            "dimensions": ["date_sk"],
            "output": "gold/agg_daily_summary",
            "schedule": "0 3 * * *"
        },
        {
            "name": "region_summary",
            "metrics": ["revenue", "order_count", "avg_order_value"],
            "dimensions": ["region"],
            "output": "gold/agg_region_summary"
        },
        {
            "name": "category_by_region",
            "metrics": ["revenue", "order_count"],
            "dimensions": ["category", "region"],
            "output": "gold/agg_category_region"
        }
    ]
}

config = parse_semantic_config(config_dict)

# ===========================================
# 2. Setup context with data
# ===========================================
context = EngineContext(df=None, engine_type=EngineType.PANDAS)
context.register("fact_orders", pd.read_parquet("warehouse/fact_orders"))
context.register("dim_customer", pd.read_parquet("warehouse/dim_customer"))
context.register("dim_product", pd.read_parquet("warehouse/dim_product"))
context.register("dim_date", pd.read_parquet("warehouse/dim_date"))

# ===========================================
# 3. Define write callback
# ===========================================
def write_output(df, output_path):
    full_path = f"warehouse/{output_path}.parquet"
    df.to_parquet(full_path, index=False)
    print(f"  → Wrote {len(df)} rows to {full_path}")

# ===========================================
# 4. Execute all materializations
# ===========================================
materializer = Materializer(config)

print("=" * 60)
print("Executing Materializations")
print("=" * 60)

results = materializer.execute_all(context, write_callback=write_output)

# ===========================================
# 5. Show results
# ===========================================
print()
print("=" * 60)
print("Results Summary")
print("=" * 60)

for result in results:
    status = "✓ SUCCESS" if result.success else f"✗ FAILED: {result.error}"
    print(f"\n{result.name}:")
    print(f"  Status: {status}")
    print(f"  Output: {result.output}")
    print(f"  Rows: {result.row_count}")
    print(f"  Time: {result.elapsed_ms:.0f}ms")

    # Show sample data
    if result.success and result.df is not None:
        print(f"  Sample data:")
        print(result.df.head(5).to_string(index=False))

Output

============================================================
Executing Materializations
============================================================
  → Wrote 14 rows to warehouse/gold/agg_daily_summary.parquet
  → Wrote 4 rows to warehouse/gold/agg_region_summary.parquet
  → Wrote 8 rows to warehouse/gold/agg_category_region.parquet

============================================================
Results Summary
============================================================

daily_summary:
  Status: ✓ SUCCESS
  Output: gold/agg_daily_summary
  Rows: 14
  Time: 42ms
  Sample data:
   date_sk   revenue  order_count
  20240115  1439.96            3
  20240116   589.95            3
  20240117   749.98            2
  20240118   983.94            2
  20240119   269.98            2

region_summary:
  Status: ✓ SUCCESS
  Output: gold/agg_region_summary
  Rows: 4
  Time: 28ms
  Sample data:
  region   revenue  order_count  avg_order_value
   North  2549.88            7           364.27
   South  2349.93            7           335.70
    East  1923.88            7           274.84
    West  2129.87            7           304.27

category_by_region:
  Status: ✓ SUCCESS
  Output: gold/agg_category_region
  Rows: 8
  Time: 35ms
  Sample data:
     category  region   revenue  order_count
  Electronics   North  1549.94            4
  Electronics   South  1449.95            4
  Electronics    East  1323.91            4
  Electronics    West  1079.93            3
    Furniture   North   999.94            3

What You Learned

In this tutorial, you learned:

  • Materialization pre-computes metrics for fast dashboard queries
  • MaterializationConfig specifies metrics, dimensions, output, and schedule
  • Materializer.execute() runs a single materialization
  • Materializer.execute_all() runs all configured materializations
  • Schedules use cron syntax: "0 2 * * *" (daily at 2am)
  • Replace strategy overwrites matching grain keys (recommended)
  • Sum strategy adds to existing values (use with caution)

Next Steps

Now let's put everything together in a complete semantic layer example.

Next: Semantic Full Example


Previous Up Next
Querying Metrics Tutorials Semantic Full Example

Reference

For complete documentation, see: Materializing Reference