Skip to content

🐍 Odibi Python API: Zero to Hero

Ultimate Cheatsheet & Reference (v3.4.3)

Welcome to the Python API guide. While the CLI is great for running pipelines, the Python API allows you to automate, test, and extend Odibi deeply into your infrastructure.


🟢 Level 1: The Basics (Running Pipelines)

The core entry point is the PipelineManager. It reads your YAML configuration and manages execution.

1. Load and Run

from odibi.pipeline import PipelineManager

# 1. Load your project configuration
manager = PipelineManager.from_yaml("odibi.yaml")

# 2. Run EVERYTHING (All pipelines defined in yaml)
results = manager.run()

# 3. Check if it worked
if results['main_pipeline'].failed:
    print("❌ Pipeline Failed!")
else:
    print("✅ Success!")

2. Run a Specific Pipeline

If your YAML has multiple pipelines (e.g., ingest, transform, export), run just one:

# Returns a single PipelineResults object instead of a dict
result = manager.run("ingest")

print(f"Duration: {result.duration:.2f}s")

3. Dry Run (Simulation)

Check logic without moving data:

manager.run("ingest", dry_run=True)


🟡 Level 2: Intermediate (Inspection & Automation)

Once you have PipelineResults, you can inspect exactly what happened.

1. Inspect Node Results

result = manager.run("ingest")

for node_name, node_result in result.node_results.items():
    status = "✅" if node_result.success else "❌"
    print(f"{status} {node_name}: {node_result.duration:.2f}s")

    # See metadata (row counts, schema output, etc.)
    if node_result.metadata:
        print(f"   Rows: {node_result.metadata.get('rows_out', 0)}")

2. Resume from Failure

If a pipeline fails at step 5 of 10, you don't want to re-run steps 1-4.

# Automatically skips successfully completed nodes from the last run
manager.run("ingest", resume_from_failure=True)


🔴 Level 3: Hero (Advanced Usage)

This is where Odibi shines. You can unit test individual logic units without running the full pipeline.

1. Unit Testing Nodes

You don't need to run the whole pipeline to test one complex SQL transformation.

from odibi.pipeline import PipelineManager
import pandas as pd

# 1. Setup Manager
manager = PipelineManager.from_yaml("odibi.yaml")
pipeline = manager.get_pipeline("main_etl")

# 2. Mock Input Data (Inject test data into context)
mock_data = {
    "read_customers": pd.DataFrame([
        {"id": 1, "email": "BAD_EMAIL"},
        {"id": 2, "email": "good@test.com"}
    ])
}

# 3. Run ONE specific node with mocked input
result = pipeline.run_node("clean_customers", mock_data=mock_data)

# 4. Assertions
output_df = pipeline.context.get("clean_customers")
assert len(output_df) == 1  # Should have filtered bad email
assert output_df.iloc[0]['email'] == "good@test.com"
print("✅ Unit Test Passed")

2. Accessing Story Data

Want to send the pipeline report to Slack or Email programmatically?

result = manager.run("ingest")

if result.story_path:
    print(f"HTML Report generated at: {result.story_path}")

    # Read the HTML content
    with open(result.story_path, "r") as f:
        html_content = f.read()

    # send_email(to="team@company.com", subject="Pipeline Report", body=html_content)

3. Deep Diff (Pipeline Runs)

Programmatically detect changes between two pipeline runs (schema drift, row count changes, logic changes).

from odibi.diagnostics.diff import diff_runs
from odibi.story.metadata import PipelineStoryMetadata

# Load run metadata (generated by odibi run in odibi_stories/metadata/)
# Note: You need to know the paths to the JSON files
run_a = PipelineStoryMetadata.from_json("odibi_stories/metadata/run_20231027_120000.json")
run_b = PipelineStoryMetadata.from_json("odibi_stories/metadata/run_20231027_120500.json")

# Calculate differences
diff = diff_runs(run_a, run_b)

# Inspect Results
if diff.nodes_added:
    print(f"New Nodes: {diff.nodes_added}")

for node_name, node_diff in diff.node_diffs.items():
    if node_diff.has_drift:
        print(f"⚠️ DRIFT in {node_name}:")
        if node_diff.schema_change:
            print(f"   - Schema changed! Added: {node_diff.columns_added}")
        if node_diff.sql_changed:
            print(f"   - SQL Logic changed")
        if node_diff.rows_diff != 0:
            print(f"   - Row count changed by {node_diff.rows_diff}")

4. Deep Diff (Delta Lake)

Directly compare two versions of a Delta table to see what changed (rows added, removed, updated).

from odibi.diagnostics.delta import get_delta_diff

table_path = "data/delta_tables/silver/customers"

# Compare version 1 vs version 2
diff = get_delta_diff(
    table_path=table_path,
    version_a=1,
    version_b=2,
    deep=True,          # Perform row-by-row comparison
    keys=["id"]         # Primary key for detecting updates
)

print(f"Rows Added: {diff.rows_added}")
print(f"Rows Removed: {diff.rows_removed}")
print(f"Rows Updated: {diff.rows_updated}")

if diff.sample_updated:
    print("Sample Updates:", diff.sample_updated[0])

📚 Reference: Custom Transformations

To extend the Python API with your own functions, see the Writing Custom Transformations guide.

Quick Snippet:

from odibi import transform

@transform
def my_custom_logic(context, current, threshold=100):
    return current[current['value'] > threshold]