Materializing Metrics¶
This guide covers how to pre-compute and persist metrics using the Materializer class.
Source Notation: Metrics and dimensions use `source` to reference tables. Supports `$pipeline.node` (recommended), `connection.path`, or bare names. See Source Notation for details.
Overview¶
Materialization pre-computes aggregated metrics at a specific grain and persists them to an output table. This enables:
- Faster query response: Pre-computed aggregates vs. real-time calculation
- Scheduled refresh: Cron-based updates for dashboards
- Incremental updates: Merge new data without full recalculation
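To make "aggregating at a grain" concrete, here is a minimal plain-Python sketch. It is not how the library computes metrics. The sample rows and the `materialize_at_grain` helper are hypothetical, and the two metrics are assumed to behave like SUM(total_amount) and COUNT(*) grouped by region and month.

```python
from collections import defaultdict

# Hypothetical order rows; the column names are illustrative only.
orders = [
    {"region": "EU", "month": "Jan", "total_amount": 100.0},
    {"region": "EU", "month": "Jan", "total_amount": 50.0},
    {"region": "US", "month": "Jan", "total_amount": 75.0},
]

def materialize_at_grain(rows, grain):
    """Group rows by the grain columns and compute revenue and order_count."""
    totals = defaultdict(lambda: {"revenue": 0.0, "order_count": 0})
    for row in rows:
        key = tuple(row[col] for col in grain)  # one output row per distinct key
        totals[key]["revenue"] += row["total_amount"]
        totals[key]["order_count"] += 1
    return dict(totals)

aggregated = materialize_at_grain(orders, grain=("region", "month"))
```

Three input rows collapse to two output rows, one per distinct (region, month) pair. Persisting that smaller result is what makes later queries cheaper.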
MaterializationConfig¶
Define materializations in your semantic layer config:
materializations:
  - name: monthly_revenue_by_region   # Unique identifier
    metrics: [revenue, order_count]   # Metrics to include
    dimensions: [region, month]       # Grain (GROUP BY)
    output: gold/agg_monthly_revenue  # Output table path
    schedule: "0 2 1 * *"             # Optional: cron schedule
    incremental:                      # Optional: incremental config
      timestamp_column: order_date
      merge_strategy: replace
Fields¶
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| `name` | str | Yes | - | Unique materialization identifier |
| `metrics` | list | Yes | - | Metrics to materialize |
| `dimensions` | list | Yes | - | Dimensions for grouping |
| `output` | str | Yes | - | Output table path |
| `schedule` | str | No | - | Cron schedule for refresh |
| `incremental` | dict | No | - | Incremental refresh config |
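The required fields in the table can be checked before a config is handed to the library. The sketch below is one way to do that on a plain dict loaded from YAML. `validate_materialization` and `REQUIRED_FIELDS` are hypothetical names, not part of odibi, and the library may perform its own validation.

```python
# Fields the table marks as required (assumption: the table is complete).
REQUIRED_FIELDS = ("name", "metrics", "dimensions", "output")

def validate_materialization(entry):
    """Return a list of problems found in one materialization entry (empty if none)."""
    problems = [
        f"missing required field: {field}"
        for field in REQUIRED_FIELDS
        if field not in entry
    ]
    # metrics and dimensions are documented as lists.
    for list_field in ("metrics", "dimensions"):
        if list_field in entry and not isinstance(entry[list_field], list):
            problems.append(f"{list_field} must be a list")
    return problems

entry = {
    "name": "monthly_revenue_by_region",
    "metrics": ["revenue", "order_count"],
    "dimensions": ["region", "month"],
    "output": "gold/agg_monthly_revenue",
}
problems = validate_materialization(entry)
```

An empty list means the entry has every required field with the documented type.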
Materializer Class¶
Basic Usage¶
import yaml

from odibi.semantics import Materializer, parse_semantic_config
from odibi.context import EngineContext

# Load config
with open("semantic_layer.yaml") as f:
    config = parse_semantic_config(yaml.safe_load(f))

# Create materializer
materializer = Materializer(config)

# Execute single materialization
# (context is an EngineContext with the source tables registered; see Full Example)
result = materializer.execute("monthly_revenue_by_region", context)

print(result.name)        # 'monthly_revenue_by_region'
print(result.output)      # 'gold/agg_monthly_revenue'
print(result.row_count)   # Number of aggregated rows
print(result.elapsed_ms)  # Execution time
print(result.success)     # True if successful
print(result.error)       # Error message if failed
Execute All Materializations¶
# Execute all configured materializations
results = materializer.execute_all(context)
for result in results:
    status = "OK" if result.success else f"FAILED: {result.error}"
    print(f" {result.name}: {status} ({result.row_count} rows)")
Write Callback¶
Provide a callback to write the output:
def write_delta(df, output_path):
    """Write DataFrame to Delta Lake."""
    df.write.format("delta").mode("overwrite").save(output_path)

# Execute with write callback
result = materializer.execute(
    "monthly_revenue_by_region",
    context,
    write_callback=write_delta
)
Scheduling¶
Materializations can have cron schedules for automated refresh:
materializations:
  - name: daily_revenue
    metrics: [revenue]
    dimensions: [date_sk]
    output: gold/agg_daily_revenue
    schedule: "0 2 * * *"  # 2am daily
  - name: monthly_summary
    metrics: [revenue, order_count]
    dimensions: [region, month]
    output: gold/agg_monthly_summary
    schedule: "0 3 1 * *"  # 3am on 1st of month
Reading Schedules¶
# Get schedule for a materialization
schedule = materializer.get_schedule("daily_revenue")
print(schedule) # "0 2 * * *"
# List all materializations with schedules
for mat in materializer.list_materializations():
    print(f"{mat['name']}: {mat['schedule']}")
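The schedule strings above are standard five-field cron expressions: minute, hour, day of month, month, day of week. The sketch below labels each field of a schedule string. `describe_cron` is a hypothetical helper, not an odibi function, and it only splits the expression without validating value ranges.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")

def describe_cron(expr):
    """Split a five-field cron expression into a field-name -> value mapping."""
    parts = expr.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 cron fields, got {len(parts)}: {expr!r}")
    return dict(zip(CRON_FIELDS, parts))

daily = describe_cron("0 2 * * *")     # minute 0, hour 2, every day
monthly = describe_cron("0 3 1 * *")   # minute 0, hour 3, day 1 of every month
```

A check like this can catch a malformed schedule before it reaches the orchestrator.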
Integration with Orchestrators¶
Use schedules with your orchestrator (Airflow, Dagster, etc.):
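# Airflow example
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def run_materialization(name):
    materializer.execute(name, context, write_callback=write_delta)

# In Airflow the schedule belongs to the DAG, not the operator,
# so create one DAG per scheduled materialization.
for mat in materializer.list_materializations():
    if mat['schedule']:
        dag = DAG(
            dag_id=f"materialize_{mat['name']}",
            schedule_interval=mat['schedule'],  # named `schedule` in newer Airflow versions
            start_date=datetime(2024, 1, 1),    # placeholder start date
            catchup=False,
        )
        PythonOperator(
            task_id=f"materialize_{mat['name']}",
            python_callable=run_materialization,
            op_args=[mat['name']],
            dag=dag,
        )
        globals()[dag.dag_id] = dag  # expose the DAG at module level for discovery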
Incremental Materialization¶
Use IncrementalMaterializer for efficient updates:
from odibi.semantics.materialize import IncrementalMaterializer
# Create incremental materializer
inc_materializer = IncrementalMaterializer(config)
# Load existing materialized data
existing_df = spark.read.format("delta").load("gold/agg_monthly_revenue")
# Get last processed timestamp
last_timestamp = existing_df.agg({"load_timestamp": "max"}).collect()[0][0]
# Execute incremental update
result = inc_materializer.execute_incremental(
    name="monthly_revenue_by_region",
    context=context,
    existing_df=existing_df,
    timestamp_column="order_date",
    since_timestamp=last_timestamp,
    merge_strategy="replace"
)
Merge Strategies¶
Replace Strategy¶
New aggregates overwrite existing for matching grain keys:
result = inc_materializer.execute_incremental(
    name="monthly_revenue_by_region",
    context=context,
    existing_df=existing_df,
    timestamp_column="order_date",
    since_timestamp=last_processed,
    merge_strategy="replace"
)
Behavior:
1. Filter source to order_date > since_timestamp
2. Aggregate new data at grain
3. Remove matching grain keys from existing
4. Union remaining existing + new aggregates
Use case: Late-arriving data, corrections, any non-additive metrics
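Steps 3 and 4 of the replace behavior can be sketched with plain dictionaries keyed by grain. This only illustrates the merge semantics described above and is not the library's implementation. `merge_replace` and the sample values are hypothetical.

```python
def merge_replace(existing, new):
    """Replace strategy: new aggregates win for any grain key shared with existing."""
    # Step 3: drop existing rows whose grain key reappears in the new aggregates.
    kept = {key: row for key, row in existing.items() if key not in new}
    # Step 4: union the remaining existing rows with the new aggregates.
    return {**kept, **new}

existing = {
    ("EU", "Jan"): {"revenue": 150.0, "order_count": 2},
    ("US", "Jan"): {"revenue": 75.0, "order_count": 1},
}
new = {
    ("US", "Jan"): {"revenue": 90.0, "order_count": 2},  # corrected value
    ("US", "Feb"): {"revenue": 40.0, "order_count": 1},  # new grain key
}
merged = merge_replace(existing, new)
```

The untouched ("EU", "Jan") row survives, ("US", "Jan") takes the new value, and ("US", "Feb") is added. Under these semantics, a replaced row is only correct if the filtered source contains every row that belongs to that grain key.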
Sum Strategy¶
Add new measure values to existing aggregates:
result = inc_materializer.execute_incremental(
    name="daily_order_count",
    context=context,
    existing_df=existing_df,
    timestamp_column="created_at",
    since_timestamp=last_processed,
    merge_strategy="sum"
)
Behavior:
1. Filter source to created_at > since_timestamp
2. Aggregate new data at grain
3. Full outer join with existing on grain
4. Sum measure values
Use case: Purely additive metrics (counts, sums) where data is append-only
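Warning: Don't use the sum strategy for AVG, COUNT(DISTINCT), or ratio metrics. These are not additive, so adding a new partial result to the stored value produces a wrong answer.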
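The sum behavior, and the reason it breaks for averages, can be illustrated with plain dictionaries. This is a sketch of the semantics only, not the library's implementation. `merge_sum` and the sample numbers are hypothetical.

```python
def merge_sum(existing, new, measures):
    """Sum strategy: add measure values per grain key, treating a missing side as zero."""
    merged = {}
    for key in existing.keys() | new.keys():  # full outer join on the grain key
        old_row = existing.get(key, {})
        new_row = new.get(key, {})
        merged[key] = {m: old_row.get(m, 0) + new_row.get(m, 0) for m in measures}
    return merged

existing = {("2024-01-01",): {"order_count": 10, "revenue": 500.0}}
new = {
    ("2024-01-01",): {"order_count": 2, "revenue": 120.0},
    ("2024-01-02",): {"order_count": 5, "revenue": 250.0},
}
merged = merge_sum(existing, new, measures=("order_count", "revenue"))

# Why AVG is unsafe: adding two averages is not the average of the combined rows.
naive_avg = (500.0 / 10) + (120.0 / 2)       # sums two per-batch averages
true_avg = (500.0 + 120.0) / (10 + 2)        # average over all 12 orders
```

Counts and sums combine correctly because addition is associative over batches. For an average, store the underlying sum and count as separate additive measures and divide at query time.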
Full Example¶
Complete materialization pipeline:
# semantic_layer.yaml
semantic_layer:
  metrics:
    - name: revenue
      expr: "SUM(total_amount)"
      source: fact_orders
      filters: ["status = 'completed'"]
    - name: order_count
      expr: "COUNT(*)"
      source: fact_orders
    - name: unique_customers
      expr: "COUNT(DISTINCT customer_sk)"
      source: fact_orders
  dimensions:
    - name: region
      source: dim_customer
      column: region
    - name: month
      source: dim_date
      column: month_name
    - name: date_sk
      source: dim_date
      column: date_sk
  materializations:
    - name: daily_revenue
      metrics: [revenue, order_count]
      dimensions: [date_sk, region]
      output: gold/agg_daily_revenue
      schedule: "0 2 * * *"
    - name: monthly_summary
      metrics: [revenue, order_count, unique_customers]
      dimensions: [region, month]
      output: gold/agg_monthly_summary
      schedule: "0 3 1 * *"
import yaml

from odibi.semantics import Materializer, IncrementalMaterializer, parse_semantic_config
from odibi.context import EngineContext
# EngineType and an active SparkSession named `spark` are assumed to be in scope.

# Load config
with open("semantic_layer.yaml") as f:
    config = parse_semantic_config(yaml.safe_load(f)["semantic_layer"])

# Setup context
context = EngineContext(df=None, engine_type=EngineType.SPARK, spark=spark)
context.register("fact_orders", spark.table("silver.fact_orders"))
context.register("dim_customer", spark.table("gold.dim_customer"))
context.register("dim_date", spark.table("gold.dim_date"))

# Write callback
def write_to_delta(df, output_path):
    df.write.format("delta").mode("overwrite").save(f"/mnt/warehouse/{output_path}")

# Full refresh all materializations
materializer = Materializer(config)
results = materializer.execute_all(context, write_callback=write_to_delta)

# Print summary
for r in results:
    status = "SUCCESS" if r.success else f"FAILED: {r.error}"
    print(f"{r.name}: {status} - {r.row_count} rows in {r.elapsed_ms:.0f}ms")

# Incremental refresh for daily
inc_materializer = IncrementalMaterializer(config)
existing_daily = spark.read.format("delta").load("/mnt/warehouse/gold/agg_daily_revenue")
last_date = existing_daily.agg({"date_sk": "max"}).collect()[0][0]
result = inc_materializer.execute_incremental(
    name="daily_revenue",
    context=context,
    existing_df=existing_daily,
    timestamp_column="order_date",
    since_timestamp=last_date,
    merge_strategy="replace"
)

# Write incremental result
if result.success:
    write_to_delta(result.df, "gold/agg_daily_revenue")
    print(f"Updated daily_revenue: {result.row_count} rows")
MaterializationResult¶
| Field | Type | Description |
|---|---|---|
| `name` | str | Materialization name |
| `output` | str | Output table path |
| `row_count` | int | Number of aggregated rows |
| `elapsed_ms` | float | Execution time in milliseconds |
| `success` | bool | Whether execution succeeded |
| `error` | str | Error message if failed |
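When running many materializations, it can help to collect the failures from a list of results. The sketch below uses a stand-in dataclass that mirrors the fields in the table. `ResultStandIn` and `failed_names` are hypothetical, the sample values are invented for illustration, and the real MaterializationResult class may differ.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class ResultStandIn:
    """Hypothetical stand-in mirroring the documented fields; not the library class."""
    name: str
    output: str
    row_count: int
    elapsed_ms: float
    success: bool
    error: Optional[str] = None

def failed_names(results):
    """Return the names of materializations whose success flag is False."""
    return [r.name for r in results if not r.success]

# Illustrative sample results (values are made up).
results = [
    ResultStandIn("daily_revenue", "gold/agg_daily_revenue", 365, 1200.0, True),
    ResultStandIn("monthly_summary", "gold/agg_monthly_summary", 0, 15.0, False, "source not found"),
]
failures = failed_names(results)
if failures:
    print(f"Failed materializations: {', '.join(failures)}")
```

Because the helper only reads `name` and `success`, the same function should work on the list returned by `execute_all`, assuming those attributes exist as documented.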
Best Practices¶
Grain Selection¶
- Choose grain based on query patterns
- Finer grain = more rows, but more flexibility
- Coarser grain = faster queries, less flexibility
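The flexibility of a finer grain comes from the fact that additive metrics can be re-aggregated to a coarser grain later, while the reverse is impossible. A minimal sketch, assuming hypothetical daily revenue rows keyed by region and ISO date (`roll_up_to_month` is an illustrative helper, not an odibi function):

```python
from collections import defaultdict

# Hypothetical daily aggregates keyed by (region, ISO date).
daily = {
    ("EU", "2024-01-01"): 100.0,
    ("EU", "2024-01-02"): 50.0,
    ("EU", "2024-02-01"): 30.0,
    ("US", "2024-01-01"): 75.0,
}

def roll_up_to_month(daily_revenue):
    """Re-aggregate (region, date) revenue to the coarser (region, month) grain."""
    monthly = defaultdict(float)
    for (region, date), revenue in daily_revenue.items():
        monthly[(region, date[:7])] += revenue  # "YYYY-MM" prefix identifies the month
    return dict(monthly)

monthly = roll_up_to_month(daily)
```

Four daily rows become three monthly rows. This roll-up is only valid for additive metrics. A distinct count such as COUNT(DISTINCT customer_sk) cannot be rebuilt from daily values and has to be materialized at each grain it is queried at.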
Scheduling¶
- Schedule based on source data freshness
- Daily aggregates: run after nightly ETL
- Monthly: run after month close
Incremental Strategy¶
- Use `replace` for late-arriving data tolerance
- Use `sum` only for append-only sources
- Track `since_timestamp` in state store
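One simple way to track `since_timestamp` between runs is a small JSON file keyed by materialization name. The sketch below is an assumption about what a state store could look like. `load_watermark` and `save_watermark` are hypothetical helpers, and a production setup might use a database table or the orchestrator's own variable store instead.

```python
import json
from pathlib import Path

def load_watermark(path, name):
    """Return the stored since_timestamp for a materialization, or None on first run."""
    path = Path(path)
    if not path.exists():
        return None
    return json.loads(path.read_text()).get(name)

def save_watermark(path, name, timestamp):
    """Persist the latest processed timestamp (as an ISO string) for a materialization."""
    path = Path(path)
    state = json.loads(path.read_text()) if path.exists() else {}
    state[name] = timestamp
    path.write_text(json.dumps(state, indent=2))
```

A typical flow would read the watermark before calling `execute_incremental`, then save the new maximum timestamp only after the result has been written successfully, so a failed run is retried from the same point.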
Performance¶
- Partition output by time dimension
- Use Delta Lake for efficient updates
- Monitor execution times
See Also¶
- Defining Metrics - Create metric definitions
- Querying - Interactive metric queries
- Aggregation Pattern - Pattern-based aggregation