Skip to content

FK Validation

The FK Validation module declares and validates referential integrity between fact and dimension tables.

How It Fits Into Odibi

FK Validation is a Python API module that complements the fact pattern. While the fact pattern handles basic orphan handling (unknown member assignment), the FK validation module provides:

  • Detailed orphan reporting with sample values
  • Multiple validation strategies (error, warn, filter)
  • Relationship registry for documenting your data model
  • Lineage generation from relationships

It's typically used for: 1. Post-pipeline validation/auditing 2. Custom fact loading with advanced orphan handling 3. Documenting relationships for data governance


Quick Start

1. Define Relationships

from odibi.validation.fk import RelationshipConfig, RelationshipRegistry

relationships = [
    RelationshipConfig(
        name="orders_to_customers",
        fact="fact_orders",
        dimension="dim_customer",
        fact_key="customer_sk",
        dimension_key="customer_sk",
        on_violation="error"
    ),
    RelationshipConfig(
        name="orders_to_products",
        fact="fact_orders",
        dimension="dim_product",
        fact_key="product_sk",
        dimension_key="product_sk",
        on_violation="warn"
    )
]

registry = RelationshipRegistry(relationships=relationships)

2. Validate a Fact Table

from odibi.validation.fk import FKValidator
from odibi.context import EngineContext

# Setup context with dimension tables
context = EngineContext(df=None, engine_type=EngineType.SPARK, spark=spark)
context.register("dim_customer", spark.table("warehouse.dim_customer"))
context.register("dim_product", spark.table("warehouse.dim_product"))

# Load fact table
fact_df = spark.table("warehouse.fact_orders")

# Validate
validator = FKValidator(registry)
report = validator.validate_fact(fact_df, "fact_orders", context)

# Check results
if report.all_valid:
    print("All FK relationships valid!")
else:
    print(f"Found {len(report.orphan_records)} orphan records")
    for result in report.results:
        if not result.valid:
            print(f"  {result.relationship_name}: {result.orphan_count} orphans")

YAML Configuration (Optional)

You can define relationships in YAML and load them:

# relationships.yaml
relationships:
  - name: orders_to_customers
    fact: fact_orders
    dimension: dim_customer
    fact_key: customer_sk
    dimension_key: customer_sk
    nullable: false
    on_violation: error

  - name: orders_to_products
    fact: fact_orders
    dimension: dim_product
    fact_key: product_sk
    dimension_key: product_sk
    nullable: false
    on_violation: error

  - name: orders_to_dates
    fact: fact_orders
    dimension: dim_date
    fact_key: order_date_sk
    dimension_key: date_sk
    nullable: true  # Pending orders may not have date
    on_violation: warn
from odibi.validation.fk import parse_relationships_config
import yaml

with open("relationships.yaml") as f:
    config = yaml.safe_load(f)

registry = parse_relationships_config(config)

RelationshipConfig

Field Type Required Default Description
name str Yes - Unique relationship identifier
fact str Yes - Fact table name
dimension str Yes - Dimension table name
fact_key str Yes - FK column in fact table
dimension_key str Yes - PK/SK column in dimension
nullable bool No false Whether nulls are allowed in fact_key
on_violation str No "error" Action on violation: "error", "warn", "quarantine"

Validation Results

FKValidationResult (per relationship)

Field Type Description
relationship_name str Relationship identifier
valid bool Whether validation passed
total_rows int Total rows in fact table
orphan_count int Number of orphan records
null_count int Number of null FK values
orphan_values list Sample orphan values (up to 100)
elapsed_ms float Validation time

FKValidationReport (for entire fact table)

Field Type Description
fact_table str Fact table name
all_valid bool True if all relationships valid
total_relationships int Number of relationships checked
valid_relationships int Number that passed
results List[FKValidationResult] Individual results
orphan_records List[OrphanRecord] All orphan records

validate_fk_on_load

Convenience function for pipeline integration:

from odibi.validation.fk import validate_fk_on_load, RelationshipConfig

relationships = [
    RelationshipConfig(
        name="orders_to_customers",
        fact="fact_orders",
        dimension="dim_customer",
        fact_key="customer_sk",
        dimension_key="customer_sk"
    )
]

# Error on violation (default) - raises ValueError
validated_df = validate_fk_on_load(
    fact_df=fact_df,
    relationships=relationships,
    context=context,
    on_failure="error"
)

# Warn on violation - logs warning, returns original
validated_df = validate_fk_on_load(
    fact_df=fact_df,
    relationships=relationships,
    context=context,
    on_failure="warn"
)

# Filter orphans - removes orphan rows
validated_df = validate_fk_on_load(
    fact_df=fact_df,
    relationships=relationships,
    context=context,
    on_failure="filter"
)

Integration with Fact Pattern

The fact pattern already handles basic orphan handling. Use FK validation for additional auditing:

# odibi.yaml - Build fact with orphan handling
pipelines:
  - pipeline: build_facts
    nodes:
      - name: dim_customer
        read:
          connection: warehouse
          path: dim_customer

      - name: fact_orders
        depends_on: [dim_customer]
        read:
          connection: staging
          path: orders
        pattern:
          type: fact
          params:
            grain: [order_id]
            dimensions:
              - source_column: customer_id
                dimension_table: dim_customer
                dimension_key: customer_id
                surrogate_key: customer_sk
            orphan_handling: unknown  # Assigns SK=0 to orphans
        write:
          connection: warehouse
          path: fact_orders

Then run FK validation as a post-pipeline check:

# Post-pipeline audit
from odibi.validation.fk import FKValidator, RelationshipRegistry, RelationshipConfig

# Define expected relationships
registry = RelationshipRegistry(relationships=[
    RelationshipConfig(
        name="verify_customer_fk",
        fact="fact_orders",
        dimension="dim_customer",
        fact_key="customer_sk",
        dimension_key="customer_sk"
    )
])

validator = FKValidator(registry)
report = validator.validate_fact(
    spark.table("warehouse.fact_orders"),
    "fact_orders",
    context
)

# Report findings
if not report.all_valid:
    print(f"WARNING: {report.orphan_records} orphan records found")
    # Log to monitoring, send alert, etc.

Lineage Generation

Generate lineage graph from relationships:

registry = RelationshipRegistry(relationships=[
    RelationshipConfig(
        name="orders_to_customers",
        fact="fact_orders",
        dimension="dim_customer",
        fact_key="customer_sk",
        dimension_key="customer_sk"
    ),
    RelationshipConfig(
        name="orders_to_products",
        fact="fact_orders",
        dimension="dim_product",
        fact_key="product_sk",
        dimension_key="product_sk"
    ),
    RelationshipConfig(
        name="line_items_to_orders",
        fact="fact_line_items",
        dimension="fact_orders",
        fact_key="order_sk",
        dimension_key="order_sk"
    )
])

lineage = registry.generate_lineage()
# {
#     'fact_orders': ['dim_customer', 'dim_product'],
#     'fact_line_items': ['fact_orders']
# }

Full Example

Complete FK validation workflow:

from odibi.validation.fk import (
    RelationshipConfig,
    RelationshipRegistry,
    FKValidator,
    get_orphan_records
)
from odibi.context import EngineContext
from odibi.enums import EngineType

# Define relationships
relationships = [
    RelationshipConfig(
        name="orders_to_customers",
        fact="fact_orders",
        dimension="dim_customer",
        fact_key="customer_sk",
        dimension_key="customer_sk",
        nullable=False,
        on_violation="error"
    ),
    RelationshipConfig(
        name="orders_to_products",
        fact="fact_orders",
        dimension="dim_product",
        fact_key="product_sk",
        dimension_key="product_sk",
        nullable=False,
        on_violation="warn"
    )
]

registry = RelationshipRegistry(relationships=relationships)
validator = FKValidator(registry)

# Setup context
context = EngineContext(df=None, engine_type=EngineType.SPARK, spark=spark)
context.register("dim_customer", spark.table("warehouse.dim_customer"))
context.register("dim_product", spark.table("warehouse.dim_product"))

# Load and validate
fact_df = spark.table("warehouse.fact_orders")
report = validator.validate_fact(fact_df, "fact_orders", context)

# Report
print(f"Validation {'PASSED' if report.all_valid else 'FAILED'}")
print(f"Checked {report.total_relationships} relationships")

for result in report.results:
    status = "PASS" if result.valid else "FAIL"
    print(f"  {result.relationship_name}: {status}")
    if not result.valid:
        print(f"    Orphans: {result.orphan_count}")
        print(f"    Sample values: {result.orphan_values[:5]}")

# Generate lineage
lineage = registry.generate_lineage()
print(f"Lineage: {lineage}")


Pipeline Integration Patterns

Pattern 1: Pre-Load Validation

Validate FK relationships before writing to the warehouse:

from odibi.validation.fk import validate_fk_on_load, RelationshipConfig

@transform
def validate_and_load_orders(context, current):
    """Validate FKs before writing to warehouse."""
    relationships = [
        RelationshipConfig(
            name="orders_to_customers",
            fact="orders",
            dimension="dim_customer",
            fact_key="customer_id",
            dimension_key="customer_id"
        )
    ]

    validated_df = validate_fk_on_load(
        fact_df=current,
        relationships=relationships,
        context=context,
        on_failure="filter"  # Remove orphans
    )
    return validated_df

YAML configuration:

nodes:
  - name: dim_customer
    read:
      connection: warehouse
      path: dim_customer

  - name: validated_orders
    depends_on: [dim_customer]
    read:
      connection: staging
      path: orders
    transform:
      steps:
        - function: validate_and_load_orders
    write:
      connection: warehouse
      path: fact_orders

Pattern 2: Post-Pipeline Audit Job

Run FK validation as a separate audit pipeline:

pipelines:
  - pipeline: audit_referential_integrity
    description: "Nightly FK validation audit"
    nodes:
      - name: load_dimensions
        read:
          connection: warehouse
          tables:
            - dim_customer
            - dim_product
            - dim_date

      - name: validate_fact_orders
        depends_on: [load_dimensions]
        read:
          connection: warehouse
          path: fact_orders
        transform:
          steps:
            - function: run_fk_audit
              params:
                relationships_file: "config/fk_relationships.yaml"

Pattern 3: Integrated with Data Quality Gate

Use FK validation as a quality gate:

nodes:
  - name: fact_orders
    read:
      connection: staging
      path: orders
    pattern:
      type: fact
      params:
        grain: [order_id]
        dimensions:
          - source_column: customer_id
            dimension_table: dim_customer
            dimension_key: customer_id
            surrogate_key: customer_sk
    gate:
      - type: custom
        function: fk_validation_gate
        params:
          max_orphan_percent: 0.1  # Fail if > 0.1% orphans
    write:
      connection: warehouse
      path: fact_orders

See Also