Cross-Pipeline Dependencies¶
Last Updated: 2025-12-03
Status: β
Implemented
Overview¶
Cross-pipeline dependencies enable pipelines to reference outputs from other pipelines using the $pipeline.node syntax. This is essential for implementing the medallion architecture pattern where silver nodes depend on bronze outputs, and gold nodes depend on silver outputs.
βββββββββββββββ βββββββββββββββ βββββββββββββββ
β Bronze ββββββΆβ Silver ββββββΆβ Gold β
β Pipeline β β Pipeline β β Pipeline β
βββββββββββββββ βββββββββββββββ βββββββββββββββ
β β β
βΌ βΌ βΌ
writes to reads from reads from
meta_outputs $read_bronze.* $transform_silver.*
Use Cases¶
1. Medallion Architecture (Bronze β Silver β Gold)¶
The most common pattern: ingest raw data in bronze, clean/enrich in silver, aggregate for business in gold.
# Bronze: Raw ingestion
pipeline: read_bronze
nodes:
- name: raw_orders
read:
connection: source_db
table: sales.orders
write:
connection: lakehouse
format: delta
path: "bronze/orders"
# Silver: Enriched data
pipeline: transform_silver
nodes:
- name: enriched_orders
inputs:
orders: $read_bronze.raw_orders # β Cross-pipeline reference
customers: $read_bronze.raw_customers
transform:
steps:
- operation: join
left: orders
right: customers
on: [customer_id]
write:
connection: lakehouse
format: delta
path: "silver/enriched_orders"
# Gold: Business aggregates
pipeline: build_gold
nodes:
- name: daily_sales
inputs:
orders: $transform_silver.enriched_orders
transform:
steps:
- sql: |
SELECT
DATE(order_date) as date,
SUM(amount) as total_sales
FROM orders
GROUP BY 1
write:
connection: lakehouse
format: delta
path: "gold/daily_sales"
2. Multi-Source Joins¶
When a node needs to join data from multiple sources:
- name: enriched_downtime
inputs:
events: $read_bronze.shift_events
calendar: $read_bronze.calendar_dim
store: $read_bronze.store_dim
transform:
steps:
- operation: join
left: events
right: calendar
on: [date_id]
- operation: join
right: store
on: [store_id]
3. Mixing References with Explicit Reads¶
You can combine cross-pipeline references with explicit read configs:
- name: combined_data
inputs:
# Cross-pipeline reference
events: $read_bronze.events
# Explicit read (for data not from another pipeline)
reference_data:
connection: static_files
path: "reference/lookup_table.csv"
format: csv
YAML Syntax¶
The inputs Block¶
nodes:
- name: node_name
inputs:
<input_name>: $<pipeline_name>.<node_name> # Cross-pipeline reference
<input_name>: # Explicit read config
connection: <connection_name>
path: <path>
format: <format>
Reference Syntax: $pipeline.node¶
| Component | Description |
|---|---|
$ |
Prefix indicating a cross-pipeline reference |
pipeline |
Name of the source pipeline (from pipeline: field) |
. |
Separator |
node |
Name of the source node (from name: field) |
Examples:
- $read_bronze.orders β Output from node orders in pipeline read_bronze
- $ingest_daily.customers β Output from node customers in pipeline ingest_daily
How the meta_outputs Catalog Table Works¶
When a node with a write block completes, its output metadata is recorded in the system catalog.
Schema¶
| Column | Type | Description |
|---|---|---|
pipeline_name |
STRING | Pipeline identifier |
node_name |
STRING | Node identifier |
output_type |
STRING | "external_table" or "managed_table" |
connection_name |
STRING | Connection used (for external tables) |
path |
STRING | Storage path |
format |
STRING | Data format (delta, parquet, etc.) |
table_name |
STRING | Registered table name (if any) |
last_run |
TIMESTAMP | Last execution time |
row_count |
LONG | Row count at last write |
updated_at |
TIMESTAMP | Record update time |
Resolution Flow¶
1. Silver node has: inputs: {events: $read_bronze.shift_events}
2. At load time, Odibi queries meta_outputs:
SELECT * FROM meta_outputs
WHERE pipeline_name = 'read_bronze' AND node_name = 'shift_events'
3. Returns: {connection: 'goat_prod', path: 'bronze/sales/shift_events', format: 'delta'}
4. At runtime: engine.read(connection='goat_prod', path='bronze/sales/shift_events', format='delta')
Performance Notes¶
Batch Writes Only¶
Output metadata is collected in-memory during pipeline execution and written to the catalog once at pipeline completion. This avoids per-node I/O overhead.
Before optimization: 17 nodes Γ ~2-3s = ~40s overhead
After optimization: Single batch MERGE = ~2s total
Caching¶
The get_node_output() method uses caching to avoid repeated catalog queries within the same session.
Validate Early¶
All $references are validated at pipeline load time (fail fast), not at execution time. This provides immediate feedback if a referenced pipeline hasn't run.
Troubleshooting¶
Error: "No output found for $pipeline.node"¶
ReferenceResolutionError: No output found for $read_bronze.shift_events.
Ensure pipeline 'read_bronze' has run and node 'shift_events' has a write block.
Causes:
1. The referenced pipeline hasn't been run yet
2. The referenced node doesn't have a write block
3. Typo in pipeline or node name
Solutions:
1. Run the source pipeline first: odibi run bronze.yaml
2. Add a write block to the source node
3. Check spelling matches exactly (case-sensitive)
Error: "Cannot have both 'read' and 'inputs'"¶
ValidationError: Node 'my_node': Cannot have both 'read' and 'inputs'.
Use 'read' for single-source nodes or 'inputs' for multi-source cross-pipeline dependencies.
Solution: Choose one approach:
- Use read for simple single-source reads
- Use inputs for multi-source or cross-pipeline reads
Error: "Invalid reference format"¶
Solution: Ensure the reference includes both pipeline and node names separated by a dot.
Engine Compatibility¶
| Feature | Spark | Pandas | Polars |
|---|---|---|---|
meta_outputs writes |
β | β | β |
$pipeline.node (path-based) |
β | β | β |
$pipeline.node (managed table) |
β | β | β |
inputs: block |
β | β | β |
Best Practice: Always use path: in write config for cross-engine compatibility.
Files Changed in Implementation¶
| File | Changes |
|---|---|
odibi/catalog.py |
Added meta_outputs table, register_outputs_batch(), get_node_output() |
odibi/config.py |
Added inputs field to NodeConfig |
odibi/node.py |
Added _execute_inputs_phase(), _create_output_record() |
odibi/pipeline.py |
Added batch output registration at pipeline end |
odibi/references.py |
New module for reference resolution |
tests/unit/test_cross_pipeline_dependencies.py |
26 new tests |
Related Documentation¶
- Configuration Reference - NodeConfig with
inputsfield - YAML Schema Reference - Full schema documentation
- Catalog Feature - System catalog details
- Pipelines - Pipeline execution flow