Stateful Incremental Loading

Stateful Incremental Loading is the "Auto-Pilot" mode for ingestion. Unlike Smart Read (Rolling Window), which blindly looks back X days on every run, Stateful Mode remembers exactly where it left off.

It tracks the High Water Mark (HWM), the maximum value of a column (e.g., updated_at or id) seen in the previous run, and only fetches records greater than that value.

When to use this?

  • CDC-like Ingestion: You want to sync a large table and only get new rows.
  • Exactness: You don't want to guess a lookback window (e.g., "3 days just to be safe").
  • Performance: You want to query the absolute minimum data required.

Configuration

Enable it by setting mode: stateful in the incremental block.

- name: "ingest_orders"
  read:
    connection: "postgres_prod"
    format: "sql"
    table: "public.orders"

    incremental:
      mode: "stateful"              # Enable State Tracking
      column: "updated_at"          # Column to track (max value is saved)
      fallback_column: "created_at" # Optional: Use this if column is NULL
      watermark_lag: "30m"          # Safety buffer (overlaps the window)
      state_key: "orders_ingest"    # Optional: Custom ID for the state file

  write:
    connection: "bronze"
    format: "delta"
    table: "orders_bronze"
    mode: "append"

How It Works

  1. First Run (Bootstrap)

    • Odibi checks the state backend (Delta table or local JSON).
    • No state found? → Full Load (SELECT * FROM table).
    • After success, it saves MAX(updated_at) as the HWM.
  2. Subsequent Runs (Incremental)

    • Odibi retrieves the last HWM (e.g., 2023-10-25 10:00:00).
    • It subtracts the watermark_lag (e.g., 30 mins) → 2023-10-25 09:30:00.
    • Generates query: SELECT * FROM table WHERE updated_at > '2023-10-25 09:30:00'.
    • After success, it updates the HWM with the new maximum from the fetched batch.
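
The bootstrap and incremental steps above can be sketched in a few lines of Python. This is a minimal illustration, not Odibi's actual implementation: the helper names parse_lag, build_query, and next_hwm are hypothetical, and the lag parser only understands a single number followed by s, m, h, or d.

```python
from datetime import datetime, timedelta
from typing import List, Optional


def parse_lag(lag: str) -> timedelta:
    """Parse a lag string such as '30m', '2h' or '1d' (hypothetical helper)."""
    units = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}
    value, unit = int(lag[:-1]), lag[-1]
    return timedelta(**{units[unit]: value})


def build_query(table: str, column: str, hwm: Optional[datetime], lag: str = "0m") -> str:
    """Return a full-load query when no state exists, otherwise a filtered one."""
    if hwm is None:
        # First run (bootstrap): nothing stored yet, so read everything.
        return f"SELECT * FROM {table}"
    # Subsequent runs: step back by the lag to create the safety overlap.
    lower_bound = hwm - parse_lag(lag)
    return f"SELECT * FROM {table} WHERE {column} > '{lower_bound:%Y-%m-%d %H:%M:%S}'"


def next_hwm(previous: Optional[datetime], batch_values: List[datetime]) -> Optional[datetime]:
    """New HWM is the max value in the batch; an empty batch keeps the old HWM."""
    if not batch_values:
        return previous
    newest = max(batch_values)
    return newest if previous is None else max(previous, newest)


print(build_query("public.orders", "updated_at", None))
print(build_query("public.orders", "updated_at", datetime(2023, 10, 25, 10, 0, 0), "30m"))
```

The string interpolation is only there to make the generated SQL readable; a real reader would normally pass the lower bound as a bind parameter.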

Key Features

🌊 Watermark Lag

Data often arrives late or out of order. If you run your pipeline at 10:00, you might miss a record timestamped 09:59 that gets committed at 10:01.

The watermark_lag creates a safety overlap.

  • Lag: "30m" implies: "Give me everything since the last run, but re-read the last 30 minutes just in case."
  • This ensures At-Least-Once delivery.
  • Note: This causes duplicates in the Bronze layer. This is expected! Your Silver layer (Merge/Upsert) handles deduplication.
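
To make the late-arrival problem concrete, here is a small simulation. It is only a sketch: the in-memory rows list and the fetch helper are made up for illustration and stand in for a real source table.

```python
from datetime import datetime, timedelta

# Rows as (id, updated_at, committed_at). Row 2 is timestamped 09:59 but
# only becomes visible to readers at 10:01.
rows = [
    (1, datetime(2024, 1, 1, 9, 50), datetime(2024, 1, 1, 9, 50)),
    (2, datetime(2024, 1, 1, 9, 59), datetime(2024, 1, 1, 10, 1)),
    (3, datetime(2024, 1, 1, 10, 0), datetime(2024, 1, 1, 10, 0)),
]


def fetch(run_at, lower_bound):
    """Return ids visible at run_at whose updated_at is after lower_bound."""
    return [
        rid
        for rid, updated_at, committed_at in rows
        if committed_at <= run_at and (lower_bound is None or updated_at > lower_bound)
    ]


# First run at 10:00: row 2 is not committed yet, so it is not seen.
first_run = fetch(datetime(2024, 1, 1, 10, 0), None)
hwm = datetime(2024, 1, 1, 10, 0)  # max updated_at among the fetched rows

# Second run at 11:00, with and without a 30 minute lag.
second_run_at = datetime(2024, 1, 1, 11, 0)
without_lag = fetch(second_run_at, hwm)
with_lag = fetch(second_run_at, hwm - timedelta(minutes=30))

print(first_run, without_lag, with_lag)
```

Without the lag, row 2 is never picked up because its timestamp is below the stored HWM. With the lag it is recovered, at the cost of re-reading rows 1 and 3, which is exactly the duplication the Silver layer is expected to clean up.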

🛡️ State Backends

Odibi automatically chooses the best backend:

  • Spark/Databricks: Uses a Delta table (odibi_meta.state) to track HWMs. This is robust and supports concurrency.
  • Pandas/Local: Uses a local JSON file (.odibi/state.json).
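
For intuition, a local JSON state store can be as small as the sketch below. The file layout (a flat mapping from state key to HWM string) and the function names load_hwm and save_hwm are assumptions made for illustration; the real format of .odibi/state.json may differ.

```python
import json
from pathlib import Path
from typing import Optional


def load_hwm(path: Path, state_key: str) -> Optional[str]:
    """Return the stored HWM for state_key, or None if nothing is stored."""
    if not path.exists():
        return None
    return json.loads(path.read_text()).get(state_key)


def save_hwm(path: Path, state_key: str, hwm: str) -> None:
    """Store the HWM for state_key, keeping any other keys in the file."""
    state = json.loads(path.read_text()) if path.exists() else {}
    state[state_key] = hwm
    path.parent.mkdir(parents=True, exist_ok=True)
    # Write to a temp file and rename it, so a crash cannot leave a
    # half-written state file behind.
    tmp = path.with_suffix(".tmp")
    tmp.write_text(json.dumps(state, indent=2))
    tmp.replace(path)
```

A single file like this has no protection against two pipelines writing at the same time, which is one reason a transactional store such as a Delta table is the better fit for concurrent runs.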

🔄 Resets

To reset the state and force a full reload:

  1. Delete the target table/file.
  2. Clear the state entry (manually or via CLI - CLI command coming soon).

Date Format for String Columns

If your date column is stored as a string (e.g., Oracle DD-MON-YY format), use the date_format option:

incremental:
  mode: "stateful"
  column: "EVENT_TIME"
  date_format: "oracle"  # Handles '20-APR-24 07:11:01.0'

Supported values:

  • oracle: DD-MON-YY (e.g., 20-APR-24 07:11:01.0)
  • sql_server: CONVERT with style 120 (YYYY-MM-DD HH:MM:SS)
  • us: MM/DD/YYYY
  • eu: DD/MM/YYYY
  • iso: YYYY-MM-DDTHH:MM:SS
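
The reason a format hint is needed is that date strings do not sort chronologically. The sketch below shows this with Python's datetime.strptime. The PATTERNS mapping is a hypothetical translation of the format names into strptime patterns, written for this example only; Odibi may instead push the conversion down into the SQL query. The month-name parsing assumes an English locale.

```python
from datetime import datetime

# Hypothetical mapping from date_format names to strptime patterns.
PATTERNS = {
    "oracle": "%d-%b-%y %H:%M:%S.%f",  # 20-APR-24 07:11:01.0
    "sql_server": "%Y-%m-%d %H:%M:%S",
    "us": "%m/%d/%Y",
    "eu": "%d/%m/%Y",
    "iso": "%Y-%m-%dT%H:%M:%S",
}


def parse_value(value: str, date_format: str) -> datetime:
    """Parse a string column value using the pattern for date_format."""
    return datetime.strptime(value, PATTERNS[date_format])


values = ["20-APR-24 07:11:01.0", "03-JAN-25 00:00:00.0"]

# Comparing raw strings picks the April 2024 row because '2' > '0'.
max_as_string = max(values)

# Comparing parsed dates correctly picks the January 2025 row.
max_as_date = max(values, key=lambda v: parse_value(v, "oracle"))

print(max_as_string)
print(max_as_date)
```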

See Smart Read for details.

Comparison: Rolling Window vs. Stateful

Feature      Rolling Window (smart_read)           Stateful (stateful)
Logic        NOW() - lookback                      > Last HWM
State        Stateless (Time-based)                Stateful (Persisted)
Best For     Reporting windows ("Last 30 days")    Ingestion / Replication ("Sync table")
Complexity   Low                                   Medium
Safety       Good (if lookback is large)           Excellent (Exact tracking)

Example: CDC Ingestion Pipeline

Here is a robust pattern for database replication:

nodes:
  # 1. Ingest (Bronze) - Accumulates history with duplicates
  - name: "ingest_users"
    read:
      connection: "db_prod"
      table: "users"
      incremental:
        mode: "stateful"
        column: "updated_at"
        watermark_lag: "15m"
    write:
      connection: "lake"
      format: "delta"
      table: "bronze_users"
      mode: "append"

  # 2. Merge (Silver) - Deduplicates and keeps current state
  - name: "dim_users"
    depends_on: ["ingest_users"] # Reads ONLY the new batch
    transformer: "merge"
    params:
      keys: ["user_id"]
      order_by: "updated_at DESC"
    write:
      connection: "lake"
      format: "delta"
      table: "silver_users"
      mode: "upsert"
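
The Silver step is what makes the Bronze duplicates harmless. Its effect can be sketched as "keep the newest row per key, then upsert". The code below is an illustration of that idea on plain dictionaries, not the merge transformer itself; latest_per_key and upsert are hypothetical names, and timestamps are ISO-style strings so that string comparison matches chronological order.

```python
from typing import Dict, List, Tuple

Row = Dict[str, object]


def latest_per_key(batch: List[Row], keys: List[str], order_col: str) -> Dict[Tuple, Row]:
    """Keep the newest row per key (same as ORDER BY order_col DESC, take first)."""
    latest: Dict[Tuple, Row] = {}
    for row in batch:
        key = tuple(row[k] for k in keys)
        if key not in latest or row[order_col] > latest[key][order_col]:
            latest[key] = row
    return latest


def upsert(target: Dict[Tuple, Row], batch: List[Row], keys: List[str], order_col: str) -> None:
    """Insert new keys; overwrite existing keys unless the target row is newer."""
    for key, row in latest_per_key(batch, keys, order_col).items():
        if key not in target or row[order_col] >= target[key][order_col]:
            target[key] = row


silver: Dict[Tuple, Row] = {}
batch_1 = [
    {"user_id": 1, "name": "Ann", "updated_at": "2024-01-01 09:50:00"},
    {"user_id": 2, "name": "Bob", "updated_at": "2024-01-01 10:00:00"},
]
# The second batch re-reads Bob (overlap from watermark_lag) and carries an update for user 1.
batch_2 = [
    batch_1[1],
    {"user_id": 1, "name": "Anna", "updated_at": "2024-01-01 10:30:00"},
]
upsert(silver, batch_1, ["user_id"], "updated_at")
upsert(silver, batch_2, ["user_id"], "updated_at")
print(sorted(row["name"] for row in silver.values()))
```

Because the upsert is idempotent for rows that are re-read unchanged, the overlap window can be made generous without affecting the Silver table.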