Skip to content

Patterns API

odibi.patterns.base

odibi.patterns.scd2

SCD2Pattern

Bases: Pattern

SCD2 Pattern: Slowly Changing Dimension Type 2.

Tracks history by creating new rows for updates.

Configuration Options (via params dict):

- keys (list): Business keys.
- time_col (str): Timestamp column for versioning (default: current time).
- valid_from_col (str): Name of start date column (default: valid_from).
- valid_to_col (str): Name of end date column (default: valid_to).
- is_current_col (str): Name of current flag column (default: is_current).

Source code in odibi/patterns/scd2.py
class SCD2Pattern(Pattern):
    """
    SCD2 Pattern: Slowly Changing Dimension Type 2.

    Tracks history by creating new rows for updates instead of overwriting
    existing dimension records.

    Configuration Options (via params dict):
        - **keys** (list): Business keys. Required.
        - **target** (str): Existing dimension table/path to version against. Required.
        - **time_col** (str): Timestamp column for versioning (default: current time).
        - **valid_from_col** (str): Name of start date column (default: valid_from).
        - **valid_to_col** (str): Name of end date column (default: valid_to).
        - **is_current_col** (str): Name of current flag column (default: is_current).
        - **track_cols** (list): Columns to compare when detecting changes.
    """

    required_params: ClassVar[List[str]] = ["keys", "target"]
    optional_params: ClassVar[List[str]] = [
        "time_col",
        "valid_from_col",
        "valid_to_col",
        "is_current_col",
        # Read by execute(); listed here so it is recognized as a valid param.
        "track_cols",
    ]

    def validate(self) -> None:
        """Validate SCD2 pattern configuration parameters.

        Ensures that all required parameters are present for SCD Type 2 operations. Checks that:
        - keys are provided (business keys to track changes)
        - target is provided (existing dimension table for comparison)

        Raises:
            ValueError: If keys are missing or target is missing.
        """
        ctx = get_logging_context()
        ctx.debug(
            "SCD2Pattern validation starting",
            pattern="SCD2Pattern",
            keys=self.params.get("keys"),
            target=self.params.get("target"),
        )

        if not self.params.get("keys"):
            ctx.error(
                "SCD2Pattern validation failed: 'keys' parameter is required",
                pattern="SCD2Pattern",
                node=self.config.name,
            )
            raise ValueError(
                f"SCD2Pattern (node '{self.config.name}'): 'keys' parameter is required. "
                f"Expected a list of business key column names, but got: {self.params.get('keys')!r}. "
                f"Available params: {list(self.params.keys())}. "
                "Fix: Provide 'keys' as a list, e.g., keys=['customer_id']."
            )
        if not self.params.get("target"):
            ctx.error(
                "SCD2Pattern validation failed: 'target' parameter is required",
                pattern="SCD2Pattern",
                node=self.config.name,
            )
            raise ValueError(
                f"SCD2Pattern (node '{self.config.name}'): 'target' parameter is required. "
                f"Expected a table name or path string, but got: {self.params.get('target')!r}. "
                "Fix: Provide 'target' as a string, e.g., target='dim_customer'."
            )

        ctx.debug(
            "SCD2Pattern validation passed",
            pattern="SCD2Pattern",
            keys=self.params.get("keys"),
            target=self.params.get("target"),
        )

    def execute(self, context: EngineContext) -> Any:
        """Execute the SCD2 pattern to track dimension history with versioning.

        Implements Slowly Changing Dimension Type 2 logic by delegating to the scd2 transformer.
        The execution flow:
        1. Load existing target dimension table
        2. Compare source records with current dimension records using business keys
        3. Expire changed records (set valid_to date, is_current=false)
        4. Insert new versions for changed records (new valid_from, is_current=true)
        5. Insert new records for keys not in target

        The result includes version history with valid_from, valid_to, and is_current columns.

        Args:
            context: Engine context containing the source DataFrame and execution environment.

        Returns:
            DataFrame with SCD2 logic applied, including versioned records with valid_from,
            valid_to, and is_current columns.

        Raises:
            ValueError: If SCD2 parameters are invalid or cannot be parsed.
            Exception: If target loading fails, change detection fails, or versioning fails.
        """
        ctx = get_logging_context()
        start_time = time.time()

        keys = self.params.get("keys")
        target = self.params.get("target")
        valid_from_col = self.params.get("valid_from_col", "valid_from")
        valid_to_col = self.params.get("valid_to_col", "valid_to")
        is_current_col = self.params.get("is_current_col", "is_current")
        track_cols = self.params.get("track_cols")

        ctx.debug(
            "SCD2 pattern starting",
            pattern="SCD2Pattern",
            keys=keys,
            target=target,
            valid_from_col=valid_from_col,
            valid_to_col=valid_to_col,
            is_current_col=is_current_col,
            track_cols=track_cols,
        )

        # Row counts are best-effort observability only: counting may fail
        # (or be expensive) on some engines, so errors are logged and ignored.
        source_count = None
        try:
            if context.engine_type == "spark":
                source_count = context.df.count()
            else:
                source_count = len(context.df)
            ctx.debug("SCD2 source data loaded", pattern="SCD2Pattern", source_rows=source_count)
        except Exception:
            ctx.debug("SCD2 could not determine source row count", pattern="SCD2Pattern")

        # Drop params the transformer's model does not declare so that
        # SCD2Params construction does not fail on pattern-only extras.
        valid_keys = SCD2Params.model_fields.keys()
        filtered_params = {k: v for k, v in self.params.items() if k in valid_keys}

        try:
            scd_params = SCD2Params(**filtered_params)
        except Exception as e:
            ctx.error(
                f"SCD2 invalid parameters: {e}",
                pattern="SCD2Pattern",
                node=self.config.name,
                error_type=type(e).__name__,
                params=filtered_params,
            )
            # Chain the original validation error so the root cause survives.
            raise ValueError(
                f"Invalid SCD2 parameters in node '{self.config.name}': {e}. "
                f"Provided params: {filtered_params}. "
                f"Valid param names: {list(valid_keys)}."
            ) from e

        try:
            result_ctx = scd2(context, scd_params)
        except Exception as e:
            elapsed_ms = (time.time() - start_time) * 1000
            ctx.error(
                f"SCD2 pattern execution failed: {e}",
                pattern="SCD2Pattern",
                node=self.config.name,
                error_type=type(e).__name__,
                elapsed_ms=round(elapsed_ms, 2),
            )
            raise

        result_df = result_ctx.df
        elapsed_ms = (time.time() - start_time) * 1000

        # Result count is best-effort as well; None when it cannot be computed.
        result_count = None
        try:
            if context.engine_type == "spark":
                result_count = result_df.count()
            else:
                result_count = len(result_df)
        except Exception:
            pass

        ctx.info(
            "SCD2 pattern completed",
            pattern="SCD2Pattern",
            elapsed_ms=round(elapsed_ms, 2),
            source_rows=source_count,
            result_rows=result_count,
            keys=keys,
            target=target,
            valid_from_col=valid_from_col,
            valid_to_col=valid_to_col,
        )

        return result_df

execute(context)

Execute the SCD2 pattern to track dimension history with versioning.

Implements Slowly Changing Dimension Type 2 logic by delegating to the scd2 transformer. The execution flow:

1. Load existing target dimension table
2. Compare source records with current dimension records using business keys
3. Expire changed records (set valid_to date, is_current=false)
4. Insert new versions for changed records (new valid_from, is_current=true)
5. Insert new records for keys not in target

The result includes version history with valid_from, valid_to, and is_current columns.

Parameters:

Name Type Description Default
context EngineContext

Engine context containing the source DataFrame and execution environment.

required

Returns:

Type Description
Any

DataFrame with SCD2 logic applied, including versioned records with valid_from,

Any

valid_to, and is_current columns.

Raises:

Type Description
ValueError

If SCD2 parameters are invalid or cannot be parsed.

Exception

If target loading fails, change detection fails, or versioning fails.

Source code in odibi/patterns/scd2.py
def execute(self, context: EngineContext) -> Any:
    """Execute the SCD2 pattern to track dimension history with versioning.

    Implements Slowly Changing Dimension Type 2 logic by delegating to the scd2 transformer.
    The execution flow:
    1. Load existing target dimension table
    2. Compare source records with current dimension records using business keys
    3. Expire changed records (set valid_to date, is_current=false)
    4. Insert new versions for changed records (new valid_from, is_current=true)
    5. Insert new records for keys not in target

    The result includes version history with valid_from, valid_to, and is_current columns.

    Args:
        context: Engine context containing the source DataFrame and execution environment.

    Returns:
        DataFrame with SCD2 logic applied, including versioned records with valid_from,
        valid_to, and is_current columns.

    Raises:
        ValueError: If SCD2 parameters are invalid or cannot be parsed.
        Exception: If target loading fails, change detection fails, or versioning fails.
    """
    ctx = get_logging_context()
    start_time = time.time()

    keys = self.params.get("keys")
    target = self.params.get("target")
    valid_from_col = self.params.get("valid_from_col", "valid_from")
    valid_to_col = self.params.get("valid_to_col", "valid_to")
    is_current_col = self.params.get("is_current_col", "is_current")
    track_cols = self.params.get("track_cols")

    ctx.debug(
        "SCD2 pattern starting",
        pattern="SCD2Pattern",
        keys=keys,
        target=target,
        valid_from_col=valid_from_col,
        valid_to_col=valid_to_col,
        is_current_col=is_current_col,
        track_cols=track_cols,
    )

    # Row counts are best-effort observability only: counting may fail
    # (or be expensive) on some engines, so errors are logged and ignored.
    source_count = None
    try:
        if context.engine_type == "spark":
            source_count = context.df.count()
        else:
            source_count = len(context.df)
        ctx.debug("SCD2 source data loaded", pattern="SCD2Pattern", source_rows=source_count)
    except Exception:
        ctx.debug("SCD2 could not determine source row count", pattern="SCD2Pattern")

    # Drop params the transformer's model does not declare so that
    # SCD2Params construction does not fail on pattern-only extras.
    valid_keys = SCD2Params.model_fields.keys()
    filtered_params = {k: v for k, v in self.params.items() if k in valid_keys}

    try:
        scd_params = SCD2Params(**filtered_params)
    except Exception as e:
        ctx.error(
            f"SCD2 invalid parameters: {e}",
            pattern="SCD2Pattern",
            node=self.config.name,
            error_type=type(e).__name__,
            params=filtered_params,
        )
        # Chain the original validation error so the root cause survives.
        raise ValueError(
            f"Invalid SCD2 parameters in node '{self.config.name}': {e}. "
            f"Provided params: {filtered_params}. "
            f"Valid param names: {list(valid_keys)}."
        ) from e

    try:
        result_ctx = scd2(context, scd_params)
    except Exception as e:
        elapsed_ms = (time.time() - start_time) * 1000
        ctx.error(
            f"SCD2 pattern execution failed: {e}",
            pattern="SCD2Pattern",
            node=self.config.name,
            error_type=type(e).__name__,
            elapsed_ms=round(elapsed_ms, 2),
        )
        raise

    result_df = result_ctx.df
    elapsed_ms = (time.time() - start_time) * 1000

    # Result count is best-effort as well; None when it cannot be computed.
    result_count = None
    try:
        if context.engine_type == "spark":
            result_count = result_df.count()
        else:
            result_count = len(result_df)
    except Exception:
        pass

    ctx.info(
        "SCD2 pattern completed",
        pattern="SCD2Pattern",
        elapsed_ms=round(elapsed_ms, 2),
        source_rows=source_count,
        result_rows=result_count,
        keys=keys,
        target=target,
        valid_from_col=valid_from_col,
        valid_to_col=valid_to_col,
    )

    return result_df

validate()

Validate SCD2 pattern configuration parameters.

Ensures that all required parameters are present for SCD Type 2 operations. Checks that:

- keys are provided (business keys to track changes)
- target is provided (existing dimension table for comparison)

Raises:

Type Description
ValueError

If keys are missing or target is missing.

Source code in odibi/patterns/scd2.py
def validate(self) -> None:
    """Check that the SCD2 pattern has its mandatory configuration.

    Verifies the two parameters every SCD Type 2 run needs:
    - keys: business key columns used to detect changes
    - target: the existing dimension table to version against

    Raises:
        ValueError: If 'keys' or 'target' is absent.
    """
    log = get_logging_context()
    keys = self.params.get("keys")
    target = self.params.get("target")
    node_name = self.config.name

    log.debug(
        "SCD2Pattern validation starting",
        pattern="SCD2Pattern",
        keys=keys,
        target=target,
    )

    if not keys:
        log.error(
            "SCD2Pattern validation failed: 'keys' parameter is required",
            pattern="SCD2Pattern",
            node=node_name,
        )
        raise ValueError(
            f"SCD2Pattern (node '{node_name}'): 'keys' parameter is required. "
            f"Expected a list of business key column names, but got: {keys!r}. "
            f"Available params: {list(self.params.keys())}. "
            "Fix: Provide 'keys' as a list, e.g., keys=['customer_id']."
        )

    if not target:
        log.error(
            "SCD2Pattern validation failed: 'target' parameter is required",
            pattern="SCD2Pattern",
            node=node_name,
        )
        raise ValueError(
            f"SCD2Pattern (node '{node_name}'): 'target' parameter is required. "
            f"Expected a table name or path string, but got: {target!r}. "
            "Fix: Provide 'target' as a string, e.g., target='dim_customer'."
        )

    log.debug(
        "SCD2Pattern validation passed",
        pattern="SCD2Pattern",
        keys=keys,
        target=target,
    )

odibi.patterns.merge

MergePattern

Bases: Pattern

Merge Pattern: Upsert/Merge logic.

Configuration Options (via params dict):

- target (str): Target table/path.
- keys (list): Join keys.
- strategy (str): 'upsert', 'append_only', 'delete_match'.

Source code in odibi/patterns/merge.py
class MergePattern(Pattern):
    """
    Merge Pattern: Upsert/Merge logic.

    Configuration Options (via params dict):
        - **target** (str): Target table/path. May also be supplied as **path**
          (alias kept for compatibility with the merge transformer).
        - **keys** (list): Join keys.
        - **strategy** (str): 'upsert', 'append_only', 'delete_match'.
    """

    required_params: ClassVar[List[str]] = ["keys", "target"]
    optional_params: ClassVar[List[str]] = ["strategy"]
    param_aliases: ClassVar[Dict[str, List[str]]] = {"target": ["path"]}

    def validate(self) -> None:
        """Validate merge pattern configuration parameters.

        Ensures that all required parameters are present for merge operations. Checks that:
        - target (or path) is provided (destination table/path for merge)
        - keys are provided (columns to match source and target rows)

        Raises:
            ValueError: If target/path is missing or keys are missing.
        """
        ctx = get_logging_context()

        # Support both 'target' and 'path' for compatibility with merge transformer
        target = self.params.get("target") or self.params.get("path")

        ctx.debug(
            "MergePattern validation starting",
            pattern="MergePattern",
            target=target,
            keys=self.params.get("keys"),
            strategy=self.params.get("strategy"),
        )

        if not target:
            ctx.error(
                "MergePattern validation failed: 'target' or 'path' is required",
                pattern="MergePattern",
                node=self.config.name,
            )
            provided_params = {k: v for k, v in self.params.items() if v is not None}
            raise ValueError(
                f"MergePattern (node '{self.config.name}'): 'target' or 'path' is required. "
                f"Expected: A target table path string. "
                f"Provided params: {list(provided_params.keys())}. "
                f"Fix: Add 'target' or 'path' to your pattern configuration."
            )
        if not self.params.get("keys"):
            ctx.error(
                "MergePattern validation failed: 'keys' is required",
                pattern="MergePattern",
                node=self.config.name,
            )
            # Show source columns (when available) to help the user pick keys.
            source_columns = list(self.source.columns) if hasattr(self.source, "columns") else []
            raise ValueError(
                f"MergePattern (node '{self.config.name}'): 'keys' is required. "
                f"Expected: A list of column names to match source and target rows for merge. "
                f"Available source columns: {source_columns}. "
                f"Fix: Add 'keys' with columns that uniquely identify rows (e.g., keys=['id'])."
            )

        ctx.debug(
            "MergePattern validation passed",
            pattern="MergePattern",
            # Log the resolved value so the 'path' alias is reflected correctly.
            target=target,
            keys=self.params.get("keys"),
            strategy=self.params.get("strategy", "upsert"),
        )

    def execute(self, context: EngineContext) -> Any:
        """Execute the merge pattern to upsert data into the target table.

        Performs a merge/upsert operation by delegating to the merge transformer. The merge
        strategy determines the behavior:
        - 'upsert': Insert new rows, update matching rows
        - 'append_only': Insert new rows only, never update
        - 'delete_match': Delete rows matching the keys

        The merge transformer handles loading the target table, comparing keys, and applying
        the specified merge strategy.

        Args:
            context: Engine context containing the source DataFrame and execution environment.

        Returns:
            Source DataFrame (unchanged, as merge writes to target directly).

        Raises:
            Exception: If merge operation fails, target loading fails, or parameters are invalid.
        """
        ctx = get_logging_context()
        start_time = time.time()

        # Support both 'target' and 'path' for compatibility
        target = self.params.get("target") or self.params.get("path")
        keys = self.params.get("keys")
        strategy = self.params.get("strategy", "upsert")

        ctx.debug(
            "Merge pattern starting",
            pattern="MergePattern",
            target=target,
            keys=keys,
            strategy=strategy,
        )

        # Row count is best-effort observability; failures are logged and ignored.
        source_count = None
        try:
            if context.engine_type == "spark":
                source_count = context.df.count()
            else:
                source_count = len(context.df)
            ctx.debug(
                "Merge source data loaded",
                pattern="MergePattern",
                source_rows=source_count,
            )
        except Exception:
            ctx.debug("Merge could not determine source row count", pattern="MergePattern")

        # Pass only params the merge transformer's model declares.
        valid_keys = MergeParams.model_fields.keys()
        filtered_params = {k: v for k, v in self.params.items() if k in valid_keys}

        try:
            merge(context, context.df, **filtered_params)
        except Exception as e:
            elapsed_ms = (time.time() - start_time) * 1000
            ctx.error(
                f"Merge pattern execution failed: {e}",
                pattern="MergePattern",
                node=self.config.name,
                error_type=type(e).__name__,
                elapsed_ms=round(elapsed_ms, 2),
                target=target,
                keys=keys,
                strategy=strategy,
            )
            raise

        elapsed_ms = (time.time() - start_time) * 1000

        ctx.info(
            "Merge pattern completed",
            pattern="MergePattern",
            elapsed_ms=round(elapsed_ms, 2),
            source_rows=source_count,
            target=target,
            keys=keys,
            strategy=strategy,
        )

        # Merge writes to the target directly; the source frame is returned unchanged.
        return context.df

execute(context)

Execute the merge pattern to upsert data into the target table.

Performs a merge/upsert operation by delegating to the merge transformer. The merge strategy determines the behavior:

- 'upsert': Insert new rows, update matching rows
- 'append_only': Insert new rows only, never update
- 'delete_match': Delete rows matching the keys

The merge transformer handles loading the target table, comparing keys, and applying the specified merge strategy.

Parameters:

Name Type Description Default
context EngineContext

Engine context containing the source DataFrame and execution environment.

required

Returns:

Type Description
Any

Source DataFrame (unchanged, as merge writes to target directly).

Raises:

Type Description
Exception

If merge operation fails, target loading fails, or parameters are invalid.

Source code in odibi/patterns/merge.py
def execute(self, context: EngineContext) -> Any:
    """Run the merge/upsert against the configured target table.

    Delegates the actual work to the merge transformer. The configured
    strategy controls what happens on key matches:
    - 'upsert': Insert new rows, update matching rows
    - 'append_only': Insert new rows only, never update
    - 'delete_match': Delete rows matching the keys

    The transformer loads the target, compares keys, and applies the strategy.

    Args:
        context: Engine context carrying the source DataFrame and engine info.

    Returns:
        The unchanged source DataFrame (merge writes to the target directly).

    Raises:
        Exception: If the merge fails, the target cannot be loaded, or the
            parameters are invalid.
    """
    log = get_logging_context()
    started = time.time()

    # 'path' is accepted as an alias of 'target' for transformer compatibility.
    target = self.params.get("target") or self.params.get("path")
    keys = self.params.get("keys")
    strategy = self.params.get("strategy", "upsert")

    log.debug(
        "Merge pattern starting",
        pattern="MergePattern",
        target=target,
        keys=keys,
        strategy=strategy,
    )

    # Best-effort source row count for observability; never fatal.
    source_count = None
    try:
        source_count = (
            context.df.count() if context.engine_type == "spark" else len(context.df)
        )
        log.debug(
            "Merge source data loaded",
            pattern="MergePattern",
            source_rows=source_count,
        )
    except Exception:
        log.debug("Merge could not determine source row count", pattern="MergePattern")

    # Forward only the params the merge transformer's model declares.
    allowed = MergeParams.model_fields.keys()
    merge_kwargs = {name: value for name, value in self.params.items() if name in allowed}

    try:
        merge(context, context.df, **merge_kwargs)
    except Exception as exc:
        elapsed_ms = (time.time() - started) * 1000
        log.error(
            f"Merge pattern execution failed: {exc}",
            pattern="MergePattern",
            node=self.config.name,
            error_type=type(exc).__name__,
            elapsed_ms=round(elapsed_ms, 2),
            target=target,
            keys=keys,
            strategy=strategy,
        )
        raise

    elapsed_ms = (time.time() - started) * 1000

    log.info(
        "Merge pattern completed",
        pattern="MergePattern",
        elapsed_ms=round(elapsed_ms, 2),
        source_rows=source_count,
        target=target,
        keys=keys,
        strategy=strategy,
    )

    return context.df

validate()

Validate merge pattern configuration parameters.

Ensures that all required parameters are present for merge operations. Checks that:

- target (or path) is provided (destination table/path for merge)
- keys are provided (columns to match source and target rows)

Raises:

Type Description
ValueError

If target/path is missing or keys are missing.

Source code in odibi/patterns/merge.py
def validate(self) -> None:
    """Validate merge pattern configuration parameters.

    Ensures that all required parameters are present for merge operations. Checks that:
    - target (or path) is provided (destination table/path for merge)
    - keys are provided (columns to match source and target rows)

    Raises:
        ValueError: If target/path is missing or keys are missing.
    """
    ctx = get_logging_context()

    # Support both 'target' and 'path' for compatibility with merge transformer
    target = self.params.get("target") or self.params.get("path")

    ctx.debug(
        "MergePattern validation starting",
        pattern="MergePattern",
        target=target,
        keys=self.params.get("keys"),
        strategy=self.params.get("strategy"),
    )

    if not target:
        ctx.error(
            "MergePattern validation failed: 'target' or 'path' is required",
            pattern="MergePattern",
            node=self.config.name,
        )
        provided_params = {k: v for k, v in self.params.items() if v is not None}
        raise ValueError(
            f"MergePattern (node '{self.config.name}'): 'target' or 'path' is required. "
            f"Expected: A target table path string. "
            f"Provided params: {list(provided_params.keys())}. "
            f"Fix: Add 'target' or 'path' to your pattern configuration."
        )
    if not self.params.get("keys"):
        ctx.error(
            "MergePattern validation failed: 'keys' is required",
            pattern="MergePattern",
            node=self.config.name,
        )
        # Show source columns (when available) to help the user pick keys.
        source_columns = list(self.source.columns) if hasattr(self.source, "columns") else []
        raise ValueError(
            f"MergePattern (node '{self.config.name}'): 'keys' is required. "
            f"Expected: A list of column names to match source and target rows for merge. "
            f"Available source columns: {source_columns}. "
            f"Fix: Add 'keys' with columns that uniquely identify rows (e.g., keys=['id'])."
        )

    ctx.debug(
        "MergePattern validation passed",
        pattern="MergePattern",
        # Log the resolved value so the 'path' alias is reflected correctly
        # (previously this logged params['target'], showing None when 'path' was used).
        target=target,
        keys=self.params.get("keys"),
        strategy=self.params.get("strategy", "upsert"),
    )

odibi.patterns.dimension

AuditConfig

Bases: BaseModel

Configuration for audit columns.

Source code in odibi/patterns/dimension.py
class AuditConfig(BaseModel):
    """Configuration for audit columns.

    Controls which audit metadata columns are added to dimension output
    (see each field's description for the column it governs).
    """

    # When True, a load_timestamp column is added to the output.
    load_timestamp: bool = Field(default=True, description="Add load_timestamp column")
    # Optional source system identifier; when set, it populates a
    # source_system column. None means no source_system column is added
    # (presumed from the default — confirm against the pattern implementation).
    source_system: Optional[str] = Field(
        default=None, description="Source system name for source_system column"
    )

DimensionPattern

Bases: Pattern

Dimension Pattern: Builds complete dimension tables with surrogate keys and SCD support.

Features:

- Auto-generate integer surrogate keys (MAX(existing) + ROW_NUMBER for new rows)
- SCD Type 0 (static), 1 (overwrite), 2 (history tracking)
- Optional unknown member row (SK=0) for orphan FK handling
- Audit columns (load_timestamp, source_system)

Configuration Options (via params dict):

- natural_key (str): Natural/business key column name
- surrogate_key (str): Surrogate key column name to generate
- scd_type (int): 0=static, 1=overwrite, 2=history tracking (default: 1)
- track_cols (list): Columns to track for SCD1/2 changes
- target (str): Target table path (required for SCD2 to read existing history)
- unknown_member (bool): If true, insert a row with SK=0 for orphan FK handling
- audit (dict): Audit configuration with load_timestamp and source_system

Supported target formats:

- Spark: catalog tables (catalog.schema.table, warehouse.dim_customer), Delta paths (no extension), Parquet (.parquet), CSV (.csv), JSON (.json), ORC (.orc)
- Pandas: Parquet, CSV, JSON, Excel (.xlsx/.xls), Feather/Arrow (.feather/.arrow), Pickle (.pickle/.pkl), connection-prefixed (warehouse.dim_customer)
Source code in odibi/patterns/dimension.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
class DimensionPattern(Pattern):
    """
    Dimension Pattern: Builds complete dimension tables with surrogate keys and SCD support.

    Features:
    - Auto-generate integer surrogate keys (MAX(existing) + ROW_NUMBER for new rows)
    - SCD Type 0 (static), 1 (overwrite), 2 (history tracking)
    - Optional unknown member row (SK=0) for orphan FK handling
    - Audit columns (load_timestamp, source_system)

    Configuration Options (via params dict):
        - **natural_key** (str): Natural/business key column name
        - **surrogate_key** (str): Surrogate key column name to generate
        - **scd_type** (int): 0=static, 1=overwrite, 2=history tracking (default: 1)
        - **track_cols** (list): Columns to track for SCD1/2 changes
        - **target** (str): Target table path (required for SCD2 to read existing history)
        - **unknown_member** (bool): If true, insert a row with SK=0 for orphan FK handling
        - **audit** (dict): Audit configuration with load_timestamp and source_system

    Supported target formats:
        Spark:
            - Catalog tables: catalog.schema.table, warehouse.dim_customer
            - Delta paths: /path/to/delta (no extension)
            - Parquet: /path/to/file.parquet
            - CSV: /path/to/file.csv
            - JSON: /path/to/file.json
            - ORC: /path/to/file.orc
        Pandas:
            - Parquet: path/to/file.parquet (or directory)
            - CSV: path/to/file.csv
            - JSON: path/to/file.json
            - Excel: path/to/file.xlsx, path/to/file.xls
            - Feather/Arrow: path/to/file.feather, path/to/file.arrow
            - Pickle: path/to/file.pickle, path/to/file.pkl
            - Connection-prefixed: warehouse.dim_customer
    """

    required_params: ClassVar[List[str]] = ["natural_key", "surrogate_key"]
    optional_params: ClassVar[List[str]] = [
        "scd_type",
        "track_cols",
        "target",
        "unknown_member",
        "audit",
    ]

    def validate(self) -> None:
        """Validate dimension pattern configuration parameters.

        Ensures that all required parameters are present and valid. Checks that:
        - natural_key is provided (business key column(s))
        - surrogate_key is provided (auto-generated primary key column name)
        - scd_type is valid (0, 1, or 2)
        - target is provided for SCD Type 2 (required for history comparison)
        - track_cols is provided for SCD Type 1 and 2 (columns to monitor for changes)

        Raises:
            ValueError: If natural_key or surrogate_key is missing, scd_type is invalid,
                target is missing for SCD2, or track_cols is missing for SCD1/2.
        """
        ctx = get_logging_context()
        ctx.debug(
            "DimensionPattern validation starting",
            pattern="DimensionPattern",
            params=self.params,
        )

        if not self.params.get("natural_key"):
            ctx.error(
                "DimensionPattern validation failed: 'natural_key' is required",
                pattern="DimensionPattern",
                node=self.config.name,
            )
            raise ValueError(
                f"DimensionPattern (node '{self.config.name}'): 'natural_key' parameter is required. "
                "The natural_key identifies the business key column(s) that uniquely identify "
                "each dimension record in the source system. "
                "Provide natural_key as a string (single column) or list of strings (composite key)."
            )

        if not self.params.get("surrogate_key"):
            ctx.error(
                "DimensionPattern validation failed: 'surrogate_key' is required",
                pattern="DimensionPattern",
                node=self.config.name,
            )
            raise ValueError(
                f"DimensionPattern (node '{self.config.name}'): 'surrogate_key' parameter is required. "
                "The surrogate_key is the auto-generated primary key column for the dimension table, "
                "used to join with fact tables instead of the natural key. "
                "Provide surrogate_key as a string specifying the column name (e.g., 'customer_sk')."
            )

        scd_type = self.params.get("scd_type", 1)
        if scd_type not in (0, 1, 2):
            ctx.error(
                f"DimensionPattern validation failed: invalid scd_type {scd_type}",
                pattern="DimensionPattern",
                node=self.config.name,
            )
            raise ValueError(
                f"DimensionPattern (node '{self.config.name}'): 'scd_type' must be 0, 1, or 2. Got: {scd_type}. "
                "SCD Type 0: No changes tracked (static dimension). "
                "SCD Type 1: Overwrite changes (no history). "
                "SCD Type 2: Track full history with valid_from/valid_to dates."
            )

        if scd_type == 2 and not self.params.get("target"):
            ctx.error(
                "DimensionPattern validation failed: 'target' required for SCD2",
                pattern="DimensionPattern",
                node=self.config.name,
            )
            raise ValueError(
                f"DimensionPattern (node '{self.config.name}'): 'target' parameter is required for scd_type=2. "
                "SCD Type 2 compares incoming data against existing records to detect changes, "
                "so a target DataFrame containing current dimension data must be provided. "
                "Pass the existing dimension table as the 'target' parameter."
            )

        if scd_type in (1, 2) and not self.params.get("track_cols"):
            ctx.error(
                "DimensionPattern validation failed: 'track_cols' required for SCD1/2",
                pattern="DimensionPattern",
                node=self.config.name,
            )
            raise ValueError(
                f"DimensionPattern (node '{self.config.name}'): 'track_cols' parameter is required for scd_type 1 or 2. "
                "The track_cols specifies which columns to monitor for changes. "
                "When these columns change, SCD1 overwrites values or SCD2 creates new history records. "
                "Provide track_cols as a list of column names (e.g., ['address', 'phone', 'email'])."
            )

        ctx.debug(
            "DimensionPattern validation passed",
            pattern="DimensionPattern",
        )

    def execute(self, context: EngineContext) -> Any:
        """Execute the dimension pattern to build a dimension table with surrogate keys.

        Builds a complete dimension table with auto-generated surrogate keys and optional
        slowly changing dimension (SCD) support. The execution flow varies by SCD type:
        - SCD Type 0: Insert new records only, never update existing
        - SCD Type 1: Update existing records in-place, insert new records
        - SCD Type 2: Track full history with valid_from/valid_to dates and is_current flag

        All modes generate surrogate keys starting from MAX(existing_sk) + 1 for new records.
        Optionally adds audit columns and ensures unknown member row (SK=0) exists.

        Args:
            context: Engine context containing the source DataFrame and execution environment.

        Returns:
            DataFrame with surrogate keys assigned and SCD logic applied as configured.

        Raises:
            Exception: If target loading fails, SCD processing fails, or surrogate key
                generation encounters errors.
        """
        ctx = get_logging_context()
        start_time = time.time()

        natural_key = self.params.get("natural_key")
        surrogate_key = self.params.get("surrogate_key")
        scd_type = self.params.get("scd_type", 1)
        track_cols = self.params.get("track_cols", [])
        target = self.params.get("target")
        unknown_member = self.params.get("unknown_member", False)
        audit_config = self.params.get("audit", {})

        ctx.debug(
            "DimensionPattern starting",
            pattern="DimensionPattern",
            natural_key=natural_key,
            surrogate_key=surrogate_key,
            scd_type=scd_type,
            track_cols=track_cols,
            target=target,
            unknown_member=unknown_member,
        )

        source_count = self._get_row_count(context.df, context.engine_type)
        ctx.debug("Dimension source loaded", pattern="DimensionPattern", source_rows=source_count)

        try:
            # Dispatch on SCD type; validate() guarantees scd_type is 0, 1,
            # or 2, so the `else` branch is always SCD2.
            if scd_type == 0:
                result_df = self._execute_scd0(context, natural_key, surrogate_key, target)
            elif scd_type == 1:
                result_df = self._execute_scd1(
                    context, natural_key, surrogate_key, track_cols, target
                )
            else:
                result_df = self._execute_scd2(
                    context, natural_key, surrogate_key, track_cols, target
                )

            result_df = self._add_audit_columns(context, result_df, audit_config)

            # Unknown member is added after audit columns so the SK=0 row can
            # populate load_timestamp/source_system as well.
            if unknown_member:
                result_df = self._ensure_unknown_member(
                    context, result_df, natural_key, surrogate_key, audit_config
                )

            result_count = self._get_row_count(result_df, context.engine_type)
            elapsed_ms = (time.time() - start_time) * 1000

            ctx.info(
                "DimensionPattern completed",
                pattern="DimensionPattern",
                elapsed_ms=round(elapsed_ms, 2),
                source_rows=source_count,
                result_rows=result_count,
                scd_type=scd_type,
            )

            return result_df

        except Exception as e:
            elapsed_ms = (time.time() - start_time) * 1000
            ctx.error(
                f"DimensionPattern failed: {e}",
                pattern="DimensionPattern",
                node=self.config.name,
                error_type=type(e).__name__,
                elapsed_ms=round(elapsed_ms, 2),
            )
            raise

    def _get_max_sk(self, df, surrogate_key: str, engine_type) -> int:
        """Get the maximum surrogate key value from existing data.

        Args:
            df: Existing dimension DataFrame (Spark or pandas), or None when no
                target data was loaded.
            surrogate_key: Name of the surrogate key column.
            engine_type: Engine discriminator (EngineType.SPARK vs pandas).

        Returns:
            Current maximum surrogate key, or 0 when the DataFrame is None,
            the column is absent, the column is all-null, or the lookup fails.
        """
        if df is None:
            return 0
        try:
            if engine_type == EngineType.SPARK:
                from pyspark.sql import functions as F

                max_row = df.agg(F.max(surrogate_key)).collect()[0]
                max_val = max_row[0]
                return max_val if max_val is not None else 0
            else:
                if surrogate_key not in df.columns:
                    return 0
                max_val = df[surrogate_key].max()
                # `max_val != max_val` is a NaN check (NaN is the only value
                # not equal to itself); guards an all-null pandas column.
                return int(max_val) if max_val is not None and not (max_val != max_val) else 0
        except Exception:
            # Best-effort: any failure falls back to 0 so key generation can
            # still proceed from scratch.
            return 0

    def _generate_surrogate_keys(
        self,
        context: EngineContext,
        df,
        natural_key: str,
        surrogate_key: str,
        start_sk: int,
    ):
        """Generate surrogate keys starting from start_sk + 1.

        Rows are ordered by the natural key so key assignment is deterministic
        for a given input set.
        """
        if context.engine_type == EngineType.SPARK:
            from pyspark.sql import functions as F
            from pyspark.sql.window import Window

            # NOTE(review): an unpartitioned window collapses all rows onto a
            # single partition — acceptable for dimension-sized data, but
            # confirm for very large dimensions.
            window = Window.orderBy(natural_key)
            df = df.withColumn(
                surrogate_key, (F.row_number().over(window) + F.lit(start_sk)).cast("int")
            )
            return df
        else:
            df = df.copy()
            df = df.sort_values(by=natural_key).reset_index(drop=True)
            df[surrogate_key] = range(start_sk + 1, start_sk + 1 + len(df))
            df[surrogate_key] = df[surrogate_key].astype("int64")
            return df

    def _execute_scd0(
        self,
        context: EngineContext,
        natural_key: str,
        surrogate_key: str,
        target: Optional[str],
    ):
        """
        SCD Type 0: Static dimension - never update existing records.
        Only insert new records that don't exist in target.
        """
        existing_df = self._load_existing_target(context, target) if target else None
        source_df = context.df

        # First load (no target data yet): key the entire source from 1.
        if existing_df is None:
            return self._generate_surrogate_keys(
                context, source_df, natural_key, surrogate_key, start_sk=0
            )

        max_sk = self._get_max_sk(existing_df, surrogate_key, context.engine_type)

        # NOTE(review): assumes natural_key is a single column name — a
        # composite (list) key would break the isin/left_anti logic below.
        if context.engine_type == EngineType.SPARK:
            # left_anti keeps only source rows with no match in the target.
            existing_keys = existing_df.select(natural_key).distinct()
            new_records = source_df.join(existing_keys, on=natural_key, how="left_anti")
        else:
            existing_keys = set(existing_df[natural_key].unique())
            new_records = source_df[~source_df[natural_key].isin(existing_keys)].copy()

        if self._get_row_count(new_records, context.engine_type) == 0:
            return existing_df

        new_with_sk = self._generate_surrogate_keys(
            context, new_records, natural_key, surrogate_key, start_sk=max_sk
        )

        if context.engine_type == EngineType.SPARK:
            return existing_df.unionByName(new_with_sk, allowMissingColumns=True)
        else:
            import pandas as pd

            return pd.concat([existing_df, new_with_sk], ignore_index=True)

    def _execute_scd1(
        self,
        context: EngineContext,
        natural_key: str,
        surrogate_key: str,
        track_cols: List[str],
        target: Optional[str],
    ):
        """
        SCD Type 1: Overwrite changes - no history tracking.
        Update existing records in place, insert new records.
        """
        existing_df = self._load_existing_target(context, target) if target else None
        source_df = context.df

        # First load: no merge needed, just key the whole source.
        if existing_df is None:
            return self._generate_surrogate_keys(
                context, source_df, natural_key, surrogate_key, start_sk=0
            )

        max_sk = self._get_max_sk(existing_df, surrogate_key, context.engine_type)

        if context.engine_type == EngineType.SPARK:
            return self._execute_scd1_spark(
                context, source_df, existing_df, natural_key, surrogate_key, track_cols, max_sk
            )
        else:
            return self._execute_scd1_pandas(
                context, source_df, existing_df, natural_key, surrogate_key, track_cols, max_sk
            )

    def _execute_scd1_spark(
        self,
        context: EngineContext,
        source_df,
        existing_df,
        natural_key: str,
        surrogate_key: str,
        track_cols: List[str],
        max_sk: int,
    ):
        """SCD1 merge for Spark: overwrite matched rows, append new ones.

        NOTE(review): ``track_cols`` is accepted but not consulted — every
        record that matches on the natural key is rewritten with source values
        (keeping its existing surrogate key), whether or not tracked columns
        changed. Confirm this unconditional overwrite is intended.
        """
        from pyspark.sql import functions as F

        # Prefix-rename every existing column so the join produces no
        # ambiguous column references.
        t_prefix = "__existing_"
        renamed_existing = existing_df
        for c in existing_df.columns:
            renamed_existing = renamed_existing.withColumnRenamed(c, f"{t_prefix}{c}")

        joined = source_df.join(
            renamed_existing,
            source_df[natural_key] == renamed_existing[f"{t_prefix}{natural_key}"],
            "left",
        )

        # Null on the existing side means the source row has no match: insert.
        new_records = joined.filter(F.col(f"{t_prefix}{natural_key}").isNull()).select(
            source_df.columns
        )

        # Matched rows take all source values but retain the existing SK.
        update_records = joined.filter(F.col(f"{t_prefix}{natural_key}").isNotNull())
        update_cols = [F.col(f"{t_prefix}{surrogate_key}").alias(surrogate_key)] + [
            F.col(c) for c in source_df.columns
        ]
        updated_records = update_records.select(update_cols)

        # Existing rows with no source match are carried over untouched.
        unchanged_keys = update_records.select(F.col(f"{t_prefix}{natural_key}").alias(natural_key))
        unchanged = existing_df.join(unchanged_keys, on=natural_key, how="left_anti")

        new_with_sk = self._generate_surrogate_keys(
            context, new_records, natural_key, surrogate_key, start_sk=max_sk
        )

        result = unchanged.unionByName(updated_records, allowMissingColumns=True).unionByName(
            new_with_sk, allowMissingColumns=True
        )
        return result

    def _execute_scd1_pandas(
        self,
        context: EngineContext,
        source_df,
        existing_df,
        natural_key: str,
        surrogate_key: str,
        track_cols: List[str],
        max_sk: int,
    ):
        """SCD1 merge for pandas: overwrite matched rows, append new ones.

        NOTE(review): as in the Spark variant, ``track_cols`` is not consulted;
        every matched record is rewritten with source values.
        """
        import pandas as pd

        # Left merge pulls in the existing surrogate key; NaN SK marks a new
        # record.
        merged = pd.merge(
            source_df,
            existing_df[[natural_key, surrogate_key]],
            on=natural_key,
            how="left",
            suffixes=("", "_existing"),
        )

        # The "_existing" suffix only appears when the source already carried
        # a surrogate key column of the same name.
        has_existing_sk = f"{surrogate_key}_existing" in merged.columns
        if has_existing_sk:
            merged[surrogate_key] = merged[f"{surrogate_key}_existing"]
            merged = merged.drop(columns=[f"{surrogate_key}_existing"])

        new_mask = merged[surrogate_key].isna()
        new_records = merged[new_mask].drop(columns=[surrogate_key])
        existing_records = merged[~new_mask]

        if len(new_records) > 0:
            new_with_sk = self._generate_surrogate_keys(
                context, new_records, natural_key, surrogate_key, start_sk=max_sk
            )
        else:
            new_with_sk = pd.DataFrame()

        # Existing rows whose key does not appear in the source are kept as-is.
        unchanged = existing_df[~existing_df[natural_key].isin(source_df[natural_key])]

        result = pd.concat([unchanged, existing_records, new_with_sk], ignore_index=True)
        return result

    def _execute_scd2(
        self,
        context: EngineContext,
        natural_key: str,
        surrogate_key: str,
        track_cols: List[str],
        target: str,
    ):
        """
        SCD Type 2: History tracking - reuse existing scd2 transformer.
        Surrogate keys are generated for new/changed records.
        """
        existing_df = self._load_existing_target(context, target)

        valid_from_col = self.params.get("valid_from_col", "valid_from")
        valid_to_col = self.params.get("valid_to_col", "valid_to")
        is_current_col = self.params.get("is_current_col", "is_current")

        # Stamp the source with an effective-from timestamp for the scd2
        # transformer.
        if context.engine_type == EngineType.SPARK:
            from pyspark.sql import functions as F

            source_with_time = context.df.withColumn(valid_from_col, F.current_timestamp())
        else:
            source_df = context.df.copy()
            # Use timezone-aware timestamp for Delta Lake compatibility
            from datetime import timezone

            source_df[valid_from_col] = datetime.now(timezone.utc)
            source_with_time = source_df

        temp_context = context.with_df(source_with_time)

        scd_params = SCD2Params(
            target=target,
            keys=[natural_key],
            track_cols=track_cols,
            effective_time_col=valid_from_col,
            end_time_col=valid_to_col,
            current_flag_col=is_current_col,
        )

        # Delegate the close-out/insert history logic to the scd2 transformer;
        # this method only assigns surrogate keys afterwards.
        result_context = scd2(temp_context, scd_params)
        result_df = result_context.df

        max_sk = self._get_max_sk(existing_df, surrogate_key, context.engine_type)

        if context.engine_type == EngineType.SPARK:
            from pyspark.sql import functions as F
            from pyspark.sql.window import Window

            if surrogate_key not in result_df.columns:
                # No SK column at all: key every history row.
                window = Window.orderBy(natural_key, valid_from_col)
                result_df = result_df.withColumn(
                    surrogate_key, (F.row_number().over(window) + F.lit(max_sk)).cast("int")
                )
            else:
                # SK column exists: only fill rows the transformer left null
                # (new/changed versions), preserving keys of untouched history.
                null_sk_df = result_df.filter(F.col(surrogate_key).isNull())
                has_sk_df = result_df.filter(F.col(surrogate_key).isNotNull())

                if null_sk_df.count() > 0:
                    window = Window.orderBy(natural_key, valid_from_col)
                    null_sk_df = null_sk_df.withColumn(
                        surrogate_key, (F.row_number().over(window) + F.lit(max_sk)).cast("int")
                    )
                    result_df = has_sk_df.unionByName(null_sk_df)
        else:
            import pandas as pd

            if surrogate_key not in result_df.columns:
                result_df = result_df.sort_values([natural_key, valid_from_col]).reset_index(
                    drop=True
                )
                result_df[surrogate_key] = range(max_sk + 1, max_sk + 1 + len(result_df))
            else:
                # Fill only null surrogate keys; untouched history keeps its
                # existing keys.
                null_mask = result_df[surrogate_key].isna()
                if null_mask.any():
                    null_df = result_df[null_mask].copy()
                    null_df = null_df.sort_values([natural_key, valid_from_col]).reset_index(
                        drop=True
                    )
                    null_df[surrogate_key] = range(max_sk + 1, max_sk + 1 + len(null_df))
                    result_df = pd.concat([result_df[~null_mask], null_df], ignore_index=True)

        return result_df

    def _add_audit_columns(self, context: EngineContext, df, audit_config: dict):
        """Add audit columns with load_timestamp defaulting to True for dimensions."""
        config = dict(audit_config)
        config.setdefault("load_timestamp", True)
        return super()._add_audit_columns(context, df, config)

    def _ensure_unknown_member(
        self,
        context: EngineContext,
        df,
        natural_key: str,
        surrogate_key: str,
        audit_config: dict,
    ):
        """Ensure unknown member row exists with SK=0.

        The unknown member is a special dimension row used to handle orphan facts
        where the dimension lookup fails. It has a surrogate key of 0 and
        standard 'Unknown' placeholder values.

        Args:
            context: Engine context containing engine type and configuration.
            df: The dimension DataFrame to ensure unknown member in.
            natural_key: The natural key column name.
            surrogate_key: The surrogate key column name.
            audit_config: Audit configuration for source_system value.

        Returns:
            DataFrame with unknown member row added if not already present.
        """
        valid_from_col = self.params.get("valid_from_col", "valid_from")
        valid_to_col = self.params.get("valid_to_col", "valid_to")
        is_current_col = self.params.get("is_current_col", "is_current")

        if context.engine_type == EngineType.SPARK:
            from pyspark.sql import functions as F

            # Idempotent: skip if an SK=0 row already exists.
            existing_unknown = df.filter(F.col(surrogate_key) == 0)
            if existing_unknown.count() > 0:
                return df

            # Build placeholder values positionally, matching df's columns.
            columns = df.columns
            unknown_values = []
            for col in columns:
                if col == surrogate_key:
                    unknown_values.append(0)
                elif col == natural_key:
                    unknown_values.append("-1")
                elif col == valid_from_col:
                    unknown_values.append(datetime(1900, 1, 1, tzinfo=timezone.utc))
                elif col == valid_to_col:
                    unknown_values.append(None)
                elif col == is_current_col:
                    unknown_values.append(True)
                elif col == "load_timestamp":
                    # Use timezone-aware timestamp for Delta Lake compatibility
                    unknown_values.append(datetime.now(timezone.utc))
                elif col == "source_system":
                    unknown_values.append(audit_config.get("source_system", "Unknown"))
                else:
                    unknown_values.append("Unknown")

            # NOTE(review): schema is inferred from the row values here, not
            # taken from df — None (valid_to) relies on union coercion below;
            # confirm against target schemas with non-string attributes.
            unknown_row = context.spark.createDataFrame([unknown_values], columns)
            return unknown_row.unionByName(df)
        else:
            import pandas as pd

            # Idempotent: skip if an SK=0 row already exists.
            if (df[surrogate_key] == 0).any():
                return df

            unknown_row = {}
            for col in df.columns:
                if col == surrogate_key:
                    unknown_row[col] = 0
                elif col == natural_key:
                    unknown_row[col] = "-1"
                elif col == valid_from_col:
                    unknown_row[col] = datetime(1900, 1, 1, tzinfo=timezone.utc)
                elif col == valid_to_col:
                    unknown_row[col] = None
                elif col == is_current_col:
                    unknown_row[col] = True
                elif col == "load_timestamp":
                    # Use timezone-aware timestamp for Delta Lake compatibility
                    unknown_row[col] = datetime.now(timezone.utc)
                elif col == "source_system":
                    unknown_row[col] = audit_config.get("source_system", "Unknown")
                else:
                    # Type-aware placeholder: 0 for numeric columns, "Unknown"
                    # otherwise.
                    dtype = df[col].dtype
                    if pd.api.types.is_numeric_dtype(dtype):
                        unknown_row[col] = 0
                    else:
                        unknown_row[col] = "Unknown"

            unknown_df = pd.DataFrame([unknown_row])
            # Best-effort cast to the dimension's dtypes so concat does not
            # upcast columns; incompatible casts are silently skipped.
            for col in unknown_df.columns:
                if col in df.columns:
                    try:
                        unknown_df[col] = unknown_df[col].astype(df[col].dtype)
                    except (TypeError, ValueError):
                        pass
            return pd.concat([unknown_df, df], ignore_index=True)

execute(context)

Execute the dimension pattern to build a dimension table with surrogate keys.

Builds a complete dimension table with auto-generated surrogate keys and optional slowly changing dimension (SCD) support. The execution flow varies by SCD type:

- SCD Type 0: Insert new records only, never update existing
- SCD Type 1: Update existing records in-place, insert new records
- SCD Type 2: Track full history with valid_from/valid_to dates and is_current flag

All modes generate surrogate keys starting from MAX(existing_sk) + 1 for new records. Optionally adds audit columns and ensures unknown member row (SK=0) exists.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `context` | `EngineContext` | Engine context containing the source DataFrame and execution environment. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Any` | DataFrame with surrogate keys assigned and SCD logic applied as configured. |

Raises:

| Type | Description |
| --- | --- |
| `Exception` | If target loading fails, SCD processing fails, or surrogate key generation encounters errors. |

Source code in odibi/patterns/dimension.py
def execute(self, context: EngineContext) -> Any:
    """Execute the dimension pattern to build a dimension table with surrogate keys.

    Builds a complete dimension table with auto-generated surrogate keys and optional
    slowly changing dimension (SCD) support. The execution flow varies by SCD type:
    - SCD Type 0: Insert new records only, never update existing
    - SCD Type 1: Update existing records in-place, insert new records
    - SCD Type 2: Track full history with valid_from/valid_to dates and is_current flag

    All modes generate surrogate keys starting from MAX(existing_sk) + 1 for new records.
    Optionally adds audit columns and ensures unknown member row (SK=0) exists.

    Args:
        context: Engine context containing the source DataFrame and execution environment.

    Returns:
        DataFrame with surrogate keys assigned and SCD logic applied as configured.

    Raises:
        Exception: If target loading fails, SCD processing fails, or surrogate key
            generation encounters errors.
    """
    ctx = get_logging_context()
    start_time = time.time()

    natural_key = self.params.get("natural_key")
    surrogate_key = self.params.get("surrogate_key")
    scd_type = self.params.get("scd_type", 1)
    track_cols = self.params.get("track_cols", [])
    target = self.params.get("target")
    unknown_member = self.params.get("unknown_member", False)
    audit_config = self.params.get("audit", {})

    ctx.debug(
        "DimensionPattern starting",
        pattern="DimensionPattern",
        natural_key=natural_key,
        surrogate_key=surrogate_key,
        scd_type=scd_type,
        track_cols=track_cols,
        target=target,
        unknown_member=unknown_member,
    )

    source_count = self._get_row_count(context.df, context.engine_type)
    ctx.debug("Dimension source loaded", pattern="DimensionPattern", source_rows=source_count)

    try:
        # Dispatch on SCD type; validate() guarantees scd_type is 0, 1, or 2,
        # so the `else` branch is always SCD2.
        if scd_type == 0:
            result_df = self._execute_scd0(context, natural_key, surrogate_key, target)
        elif scd_type == 1:
            result_df = self._execute_scd1(
                context, natural_key, surrogate_key, track_cols, target
            )
        else:
            result_df = self._execute_scd2(
                context, natural_key, surrogate_key, track_cols, target
            )

        result_df = self._add_audit_columns(context, result_df, audit_config)

        # Unknown member is added after audit columns so the SK=0 row can
        # populate load_timestamp/source_system as well.
        if unknown_member:
            result_df = self._ensure_unknown_member(
                context, result_df, natural_key, surrogate_key, audit_config
            )

        result_count = self._get_row_count(result_df, context.engine_type)
        elapsed_ms = (time.time() - start_time) * 1000

        ctx.info(
            "DimensionPattern completed",
            pattern="DimensionPattern",
            elapsed_ms=round(elapsed_ms, 2),
            source_rows=source_count,
            result_rows=result_count,
            scd_type=scd_type,
        )

        return result_df

    except Exception as e:
        elapsed_ms = (time.time() - start_time) * 1000
        ctx.error(
            f"DimensionPattern failed: {e}",
            pattern="DimensionPattern",
            node=self.config.name,
            error_type=type(e).__name__,
            elapsed_ms=round(elapsed_ms, 2),
        )
        raise

validate()

Validate dimension pattern configuration parameters.

Ensures that all required parameters are present and valid. Checks that:

- natural_key is provided (business key column(s))
- surrogate_key is provided (auto-generated primary key column name)
- scd_type is valid (0, 1, or 2)
- target is provided for SCD Type 2 (required for history comparison)
- track_cols is provided for SCD Type 1 and 2 (columns to monitor for changes)

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If natural_key or surrogate_key is missing, scd_type is invalid, target is missing for SCD2, or track_cols is missing for SCD1/2. |

Source code in odibi/patterns/dimension.py
def validate(self) -> None:
    """Validate dimension pattern configuration parameters.

    Ensures that all required parameters are present and valid. Checks that:
    - natural_key is provided (business key column(s))
    - surrogate_key is provided (auto-generated primary key column name)
    - scd_type is valid (0, 1, or 2)
    - target is provided for SCD Type 2 (required for history comparison)
    - track_cols is provided for SCD Type 1 and 2 (columns to monitor for changes)

    Raises:
        ValueError: If natural_key or surrogate_key is missing, scd_type is invalid,
            target is missing for SCD2, or track_cols is missing for SCD1/2.
    """
    ctx = get_logging_context()
    node_name = self.config.name

    def _fail(reason: str, detail: str) -> None:
        # Every validation failure follows the same shape: log a short
        # reason, then raise with the full remediation message.
        ctx.error(
            f"DimensionPattern validation failed: {reason}",
            pattern="DimensionPattern",
            node=node_name,
        )
        raise ValueError(f"DimensionPattern (node '{node_name}'): {detail}")

    ctx.debug(
        "DimensionPattern validation starting",
        pattern="DimensionPattern",
        params=self.params,
    )

    if not self.params.get("natural_key"):
        _fail(
            "'natural_key' is required",
            "'natural_key' parameter is required. "
            "The natural_key identifies the business key column(s) that uniquely identify "
            "each dimension record in the source system. "
            "Provide natural_key as a string (single column) or list of strings (composite key).",
        )

    if not self.params.get("surrogate_key"):
        _fail(
            "'surrogate_key' is required",
            "'surrogate_key' parameter is required. "
            "The surrogate_key is the auto-generated primary key column for the dimension table, "
            "used to join with fact tables instead of the natural key. "
            "Provide surrogate_key as a string specifying the column name (e.g., 'customer_sk').",
        )

    scd_type = self.params.get("scd_type", 1)
    if scd_type not in (0, 1, 2):
        _fail(
            f"invalid scd_type {scd_type}",
            f"'scd_type' must be 0, 1, or 2. Got: {scd_type}. "
            "SCD Type 0: No changes tracked (static dimension). "
            "SCD Type 1: Overwrite changes (no history). "
            "SCD Type 2: Track full history with valid_from/valid_to dates.",
        )

    if scd_type == 2 and not self.params.get("target"):
        _fail(
            "'target' required for SCD2",
            "'target' parameter is required for scd_type=2. "
            "SCD Type 2 compares incoming data against existing records to detect changes, "
            "so a target DataFrame containing current dimension data must be provided. "
            "Pass the existing dimension table as the 'target' parameter.",
        )

    if scd_type in (1, 2) and not self.params.get("track_cols"):
        _fail(
            "'track_cols' required for SCD1/2",
            "'track_cols' parameter is required for scd_type 1 or 2. "
            "The track_cols specifies which columns to monitor for changes. "
            "When these columns change, SCD1 overwrites values or SCD2 creates new history records. "
            "Provide track_cols as a list of column names (e.g., ['address', 'phone', 'email']).",
        )

    ctx.debug(
        "DimensionPattern validation passed",
        pattern="DimensionPattern",
    )
odibi.patterns.date_dimension

DateDimensionPattern

Bases: Pattern

Date Dimension Pattern: Generates a complete date dimension table.

Creates a date dimension with pre-calculated attributes useful for BI/reporting including day of week, quarter, fiscal year, etc.

Configuration Options (via params dict):

  • start_date (str): Start date in YYYY-MM-DD format
  • end_date (str): End date in YYYY-MM-DD format
  • date_key_format (str): Format for date_sk (default: "yyyyMMdd" -> 20240115)
  • fiscal_year_start_month (int): Month when fiscal year starts (1-12, default: 1)
  • include_time (bool): If true, generate time dimension (not implemented yet)
  • unknown_member (bool): If true, add unknown date row with date_sk=0

Generated Columns
  • date_sk: Integer surrogate key (YYYYMMDD format)
  • full_date: The actual date
  • day_of_week: Day name (Monday, Tuesday, etc.)
  • day_of_week_num: Day number (1=Monday, 7=Sunday)
  • day_of_month: Day of month (1-31)
  • day_of_year: Day of year (1-366)
  • is_weekend: Boolean flag
  • week_of_year: ISO week number (1-53)
  • month: Month number (1-12)
  • month_name: Month name (January, February, etc.)
  • quarter: Calendar quarter (1-4)
  • quarter_name: Q1, Q2, Q3, Q4
  • year: Calendar year
  • fiscal_year: Fiscal year
  • fiscal_quarter: Fiscal quarter (1-4)
  • is_month_start: First day of month
  • is_month_end: Last day of month
  • is_year_start: First day of year
  • is_year_end: Last day of year
Source code in odibi/patterns/date_dimension.py
(line-number gutter 13–469 from the rendered source listing omitted)
class DateDimensionPattern(Pattern):
    """
    Date Dimension Pattern: Generates a complete date dimension table.

    Creates a date dimension with pre-calculated attributes useful for
    BI/reporting including day of week, quarter, fiscal year, etc.

    Configuration Options (via params dict):
        - **start_date** (str): Start date in YYYY-MM-DD format
        - **end_date** (str): End date in YYYY-MM-DD format
        - **date_key_format** (str): Format for date_sk (default: "yyyyMMdd" -> 20240115)
        - **fiscal_year_start_month** (int): Month when fiscal year starts (1-12, default: 1)
        - **include_time** (bool): If true, generate time dimension (not implemented yet)
        - **unknown_member** (bool): If true, add unknown date row with date_sk=0

    Generated Columns:
        - date_sk: Integer surrogate key (YYYYMMDD format)
        - full_date: The actual date
        - day_of_week: Day name (Monday, Tuesday, etc.)
        - day_of_week_num: Day number (1=Monday, 7=Sunday)
        - day_of_month: Day of month (1-31)
        - day_of_year: Day of year (1-366)
        - is_weekend: Boolean flag
        - week_of_year: ISO week number (1-53)
        - month: Month number (1-12)
        - month_name: Month name (January, February, etc.)
        - quarter: Calendar quarter (1-4)
        - quarter_name: Q1, Q2, Q3, Q4
        - year: Calendar year
        - fiscal_year: Fiscal year
        - fiscal_quarter: Fiscal quarter (1-4)
        - is_month_start: First day of month
        - is_month_end: Last day of month
        - is_year_start: First day of year
        - is_year_end: Last day of year
    """

    required_params: ClassVar[List[str]] = ["start_date", "end_date"]
    optional_params: ClassVar[List[str]] = [
        "date_key_format",
        "fiscal_year_start_month",
        "include_time",
        "unknown_member",
    ]

    def validate(self) -> None:
        """Validate date dimension pattern configuration parameters.

        Ensures that all required parameters are present and valid. Checks that:
        - start_date is provided in YYYY-MM-DD format
        - end_date is provided in YYYY-MM-DD format
        - start_date is before or equal to end_date
        - fiscal_year_start_month is an integer between 1-12 if provided

        Raises:
            ValueError: If required dates are missing, dates are invalid/unparseable,
                start_date is after end_date, or fiscal_year_start_month is invalid.
        """
        ctx = get_logging_context()
        ctx.debug(
            "DateDimensionPattern validation starting",
            pattern="DateDimensionPattern",
            params=self.params,
        )

        if not self.params.get("start_date"):
            ctx.error(
                "DateDimensionPattern validation failed: 'start_date' is required",
                pattern="DateDimensionPattern",
                node=self.config.name,
            )
            raise ValueError(
                f"DateDimensionPattern (node '{self.config.name}'): 'start_date' parameter is required. "
                "Expected format: 'YYYY-MM-DD' (e.g., '2024-01-01'). "
                "Provide a valid start_date in params."
            )

        if not self.params.get("end_date"):
            ctx.error(
                "DateDimensionPattern validation failed: 'end_date' is required",
                pattern="DateDimensionPattern",
                node=self.config.name,
            )
            raise ValueError(
                f"DateDimensionPattern (node '{self.config.name}'): 'end_date' parameter is required. "
                "Expected format: 'YYYY-MM-DD' (e.g., '2024-12-31'). "
                "Provide a valid end_date in params."
            )

        try:
            start = self._parse_date(self.params["start_date"])
            end = self._parse_date(self.params["end_date"])
            if start > end:
                raise ValueError(
                    f"DateDimensionPattern (node '{self.config.name}'): start_date must be before or equal to end_date. "
                    f"Provided: start_date='{self.params['start_date']}', "
                    f"end_date='{self.params['end_date']}'. "
                    f"Fix: Swap the values or adjust the date range."
                )
        except Exception as e:
            ctx.error(
                f"DateDimensionPattern validation failed: {e}",
                pattern="DateDimensionPattern",
                node=self.config.name,
            )
            raise ValueError(
                f"DateDimensionPattern (node '{self.config.name}'): Invalid date parameters. {e} "
                f"Provided: start_date='{self.params.get('start_date')}', "
                f"end_date='{self.params.get('end_date')}'. "
                f"Expected format: 'YYYY-MM-DD'."
            )

        fiscal_month = self.params.get("fiscal_year_start_month", 1)
        if not isinstance(fiscal_month, int) or fiscal_month < 1 or fiscal_month > 12:
            ctx.error(
                "DateDimensionPattern validation failed: invalid fiscal_year_start_month",
                pattern="DateDimensionPattern",
                node=self.config.name,
            )
            raise ValueError(
                f"DateDimensionPattern (node '{self.config.name}'): 'fiscal_year_start_month' must be an integer 1-12. "
                f"Provided: {fiscal_month!r} (type: {type(fiscal_month).__name__}). "
                f"Use an integer like 1 for January or 7 for July."
            )

        ctx.debug(
            "DateDimensionPattern validation passed",
            pattern="DateDimensionPattern",
        )

    def _parse_date(self, date_str: str) -> date:
        """Parse a date string in YYYY-MM-DD format.

        Also accepts ``date``/``datetime`` objects and passes them through
        (converting ``datetime`` to ``date``), so callers may pre-parse.
        """
        if isinstance(date_str, (date, datetime)):
            return date_str if isinstance(date_str, date) else date_str.date()
        return datetime.strptime(date_str, "%Y-%m-%d").date()

    def execute(self, context: EngineContext) -> Any:
        """Execute the date dimension pattern to generate a date dimension table.

        Generates a complete date dimension table with pre-calculated date attributes for
        the specified date range. The execution flow:
        1. Parse start_date and end_date parameters
        2. Generate one row per date in the range
        3. Calculate all date attributes (day_of_week, quarter, fiscal_year, etc.)
        4. Add unknown member row (date_sk=0) if configured

        Generated columns include date_sk (surrogate key), full_date, day/week/month/quarter/year
        attributes, fiscal year/quarter, and boolean flags for period boundaries.

        Args:
            context: Engine context for DataFrame creation and execution environment.

        Returns:
            DataFrame containing one row per date with all calculated date attributes.

        Raises:
            Exception: If date parsing fails or DataFrame generation encounters errors.
        """
        ctx = get_logging_context()
        start_time = time.time()

        start_date = self._parse_date(self.params["start_date"])
        end_date = self._parse_date(self.params["end_date"])
        fiscal_year_start_month = self.params.get("fiscal_year_start_month", 1)
        unknown_member = self.params.get("unknown_member", False)

        ctx.debug(
            "DateDimensionPattern starting",
            pattern="DateDimensionPattern",
            start_date=str(start_date),
            end_date=str(end_date),
            fiscal_year_start_month=fiscal_year_start_month,
        )

        try:
            if context.engine_type == EngineType.SPARK:
                result_df = self._generate_spark(
                    context, start_date, end_date, fiscal_year_start_month
                )
            else:
                result_df = self._generate_pandas(start_date, end_date, fiscal_year_start_month)

            if unknown_member:
                result_df = self._add_unknown_member(context, result_df)

            row_count = self._get_row_count(result_df, context.engine_type)
            elapsed_ms = (time.time() - start_time) * 1000

            ctx.info(
                "DateDimensionPattern completed",
                pattern="DateDimensionPattern",
                elapsed_ms=round(elapsed_ms, 2),
                rows_generated=row_count,
                start_date=str(start_date),
                end_date=str(end_date),
            )

            return result_df

        except Exception as e:
            elapsed_ms = (time.time() - start_time) * 1000
            ctx.error(
                f"DateDimensionPattern failed: {e}",
                pattern="DateDimensionPattern",
                node=self.config.name,
                error_type=type(e).__name__,
                elapsed_ms=round(elapsed_ms, 2),
            )
            raise

    def _get_row_count(self, df, engine_type) -> Optional[int]:
        """Best-effort row count for logging; returns None if counting fails."""
        try:
            if engine_type == EngineType.SPARK:
                return df.count()
            else:
                return len(df)
        except Exception:
            return None

    def _generate_pandas(
        self, start_date: date, end_date: date, fiscal_year_start_month: int
    ) -> pd.DataFrame:
        """Generate date dimension using Pandas."""
        dates = pd.date_range(start=start_date, end=end_date, freq="D")

        df = pd.DataFrame({"full_date": dates})

        df["date_sk"] = df["full_date"].dt.strftime("%Y%m%d").astype(int)

        df["day_of_week"] = df["full_date"].dt.day_name()
        # pandas dayofweek is 0=Monday; shift to the documented 1=Monday..7=Sunday.
        df["day_of_week_num"] = df["full_date"].dt.dayofweek + 1
        df["day_of_month"] = df["full_date"].dt.day
        df["day_of_year"] = df["full_date"].dt.dayofyear

        df["is_weekend"] = df["day_of_week_num"].isin([6, 7])

        df["week_of_year"] = df["full_date"].dt.isocalendar().week.astype(int)

        df["month"] = df["full_date"].dt.month
        df["month_name"] = df["full_date"].dt.month_name()

        df["quarter"] = df["full_date"].dt.quarter
        df["quarter_name"] = "Q" + df["quarter"].astype(str)

        df["year"] = df["full_date"].dt.year

        # Vectorized fiscal columns (mirrors the Spark branch); avoids a
        # Python-level row-wise apply over the whole range.
        if fiscal_year_start_month == 1:
            df["fiscal_year"] = df["year"]
            df["fiscal_quarter"] = df["quarter"]
        else:
            # Dates on/after the fiscal start month belong to the next fiscal year.
            df["fiscal_year"] = df["year"].where(
                df["month"] < fiscal_year_start_month, df["year"] + 1
            )
            df["fiscal_quarter"] = ((df["month"] - fiscal_year_start_month) % 12) // 3 + 1

        df["is_month_start"] = df["full_date"].dt.is_month_start
        df["is_month_end"] = df["full_date"].dt.is_month_end
        df["is_year_start"] = (df["month"] == 1) & (df["day_of_month"] == 1)
        df["is_year_end"] = (df["month"] == 12) & (df["day_of_month"] == 31)

        df["full_date"] = df["full_date"].dt.date

        column_order = [
            "date_sk",
            "full_date",
            "day_of_week",
            "day_of_week_num",
            "day_of_month",
            "day_of_year",
            "is_weekend",
            "week_of_year",
            "month",
            "month_name",
            "quarter",
            "quarter_name",
            "year",
            "fiscal_year",
            "fiscal_quarter",
            "is_month_start",
            "is_month_end",
            "is_year_start",
            "is_year_end",
        ]
        return df[column_order]

    def _calc_fiscal_year(self, dt, fiscal_start_month: int) -> int:
        """Calculate fiscal year based on fiscal start month.

        Works for both ``datetime.date`` and ``pd.Timestamp`` inputs since
        both expose ``.month`` and ``.year``. Kept for backward compatibility
        even though `_generate_pandas` now computes fiscal columns vectorized.
        """
        month = dt.month
        year = dt.year

        if fiscal_start_month == 1:
            return year
        # Dates on/after the fiscal start month roll into the next fiscal year.
        if month >= fiscal_start_month:
            return year + 1
        return year

    def _calc_fiscal_quarter(self, dt, fiscal_start_month: int) -> int:
        """Calculate fiscal quarter based on fiscal start month.

        Works for both ``datetime.date`` and ``pd.Timestamp`` inputs since
        both expose ``.month``.
        """
        adjusted_month = (dt.month - fiscal_start_month) % 12
        return (adjusted_month // 3) + 1

    def _generate_spark(
        self, context: EngineContext, start_date: date, end_date: date, fiscal_year_start_month: int
    ):
        """Generate date dimension using Spark."""
        from pyspark.sql import functions as F
        from pyspark.sql.types import IntegerType

        spark = context.spark

        num_days = (end_date - start_date).days + 1
        start_date_str = start_date.strftime("%Y-%m-%d")

        df = spark.range(num_days).select(
            F.date_add(F.lit(start_date_str), F.col("id").cast(IntegerType())).alias("full_date")
        )

        df = df.withColumn("date_sk", F.date_format("full_date", "yyyyMMdd").cast(IntegerType()))

        df = df.withColumn("day_of_week", F.date_format("full_date", "EEEE"))
        # Spark's dayofweek is 1=Sunday..7=Saturday; remap to 1=Monday..7=Sunday
        # so both engines agree on day_of_week_num.
        df = df.withColumn("day_of_week_num", F.dayofweek("full_date"))
        df = df.withColumn(
            "day_of_week_num",
            F.when(F.col("day_of_week_num") == 1, 7).otherwise(F.col("day_of_week_num") - 1),
        )
        df = df.withColumn("day_of_month", F.dayofmonth("full_date"))
        df = df.withColumn("day_of_year", F.dayofyear("full_date"))

        df = df.withColumn("is_weekend", F.col("day_of_week_num").isin([6, 7]))

        df = df.withColumn("week_of_year", F.weekofyear("full_date"))

        df = df.withColumn("month", F.month("full_date"))
        df = df.withColumn("month_name", F.date_format("full_date", "MMMM"))

        df = df.withColumn("quarter", F.quarter("full_date"))
        df = df.withColumn("quarter_name", F.concat(F.lit("Q"), F.col("quarter")))

        df = df.withColumn("year", F.year("full_date"))

        if fiscal_year_start_month == 1:
            df = df.withColumn("fiscal_year", F.col("year"))
            df = df.withColumn("fiscal_quarter", F.col("quarter"))
        else:
            df = df.withColumn(
                "fiscal_year",
                F.when(F.col("month") >= fiscal_year_start_month, F.col("year") + 1).otherwise(
                    F.col("year")
                ),
            )
            adjusted_month = (F.col("month") - fiscal_year_start_month + 12) % 12
            df = df.withColumn("fiscal_quarter", (adjusted_month / 3).cast(IntegerType()) + 1)

        df = df.withColumn(
            "is_month_start",
            F.col("day_of_month") == 1,
        )
        df = df.withColumn(
            "is_month_end",
            F.col("full_date") == F.last_day("full_date"),
        )
        df = df.withColumn(
            "is_year_start",
            (F.col("month") == 1) & (F.col("day_of_month") == 1),
        )
        df = df.withColumn(
            "is_year_end",
            (F.col("month") == 12) & (F.col("day_of_month") == 31),
        )

        column_order = [
            "date_sk",
            "full_date",
            "day_of_week",
            "day_of_week_num",
            "day_of_month",
            "day_of_year",
            "is_weekend",
            "week_of_year",
            "month",
            "month_name",
            "quarter",
            "quarter_name",
            "year",
            "fiscal_year",
            "fiscal_quarter",
            "is_month_start",
            "is_month_end",
            "is_year_start",
            "is_year_end",
        ]
        return df.select(column_order)

    def _add_unknown_member(self, context: EngineContext, df):
        """Add unknown member row with date_sk=0."""
        if context.engine_type == EngineType.SPARK:
            from pyspark.sql import Row

            unknown_data = {
                "date_sk": 0,
                "full_date": date(1900, 1, 1),
                "day_of_week": "Unknown",
                "day_of_week_num": 0,
                "day_of_month": 0,
                "day_of_year": 0,
                "is_weekend": False,
                "week_of_year": 0,
                "month": 0,
                "month_name": "Unknown",
                "quarter": 0,
                "quarter_name": "Unknown",
                "year": 0,
                "fiscal_year": 0,
                "fiscal_quarter": 0,
                "is_month_start": False,
                "is_month_end": False,
                "is_year_start": False,
                "is_year_end": False,
            }
            unknown_row = context.spark.createDataFrame([Row(**unknown_data)])
            return unknown_row.unionByName(df)
        else:
            unknown_row = pd.DataFrame(
                [
                    {
                        "date_sk": 0,
                        "full_date": date(1900, 1, 1),
                        "day_of_week": "Unknown",
                        "day_of_week_num": 0,
                        "day_of_month": 0,
                        "day_of_year": 0,
                        "is_weekend": False,
                        "week_of_year": 0,
                        "month": 0,
                        "month_name": "Unknown",
                        "quarter": 0,
                        "quarter_name": "Unknown",
                        "year": 0,
                        "fiscal_year": 0,
                        "fiscal_quarter": 0,
                        "is_month_start": False,
                        "is_month_end": False,
                        "is_year_start": False,
                        "is_year_end": False,
                    }
                ]
            )
            return pd.concat([unknown_row, df], ignore_index=True)

execute(context)

Execute the date dimension pattern to generate a date dimension table.

Generates a complete date dimension table with pre-calculated date attributes for the specified date range. The execution flow: 1. Parse start_date and end_date parameters 2. Generate one row per date in the range 3. Calculate all date attributes (day_of_week, quarter, fiscal_year, etc.) 4. Add unknown member row (date_sk=0) if configured

Generated columns include date_sk (surrogate key), full_date, day/week/month/quarter/year attributes, fiscal year/quarter, and boolean flags for period boundaries.

Parameters:

Name Type Description Default
context EngineContext

Engine context for DataFrame creation and execution environment.

required

Returns:

Type Description
Any

DataFrame containing one row per date with all calculated date attributes.

Raises:

Type Description
Exception

If date parsing fails or DataFrame generation encounters errors.

Source code in odibi/patterns/date_dimension.py
def execute(self, context: EngineContext) -> Any:
    """Execute the date dimension pattern to generate a date dimension table.

    Builds one row per calendar date between start_date and end_date, each
    carrying pre-computed attributes (day/week/month/quarter/year fields,
    fiscal year/quarter, and period-boundary flags). Steps:
    1. Parse start_date and end_date parameters
    2. Generate one row per date in the range (Spark or Pandas engine)
    3. Calculate all date attributes (day_of_week, quarter, fiscal_year, etc.)
    4. Add unknown member row (date_sk=0) if configured

    Args:
        context: Engine context for DataFrame creation and execution environment.

    Returns:
        DataFrame containing one row per date with all calculated date attributes.

    Raises:
        Exception: If date parsing fails or DataFrame generation encounters errors.
    """
    ctx = get_logging_context()
    t0 = time.time()

    range_start = self._parse_date(self.params["start_date"])
    range_end = self._parse_date(self.params["end_date"])
    fy_start_month = self.params.get("fiscal_year_start_month", 1)
    want_unknown = self.params.get("unknown_member", False)

    ctx.debug(
        "DateDimensionPattern starting",
        pattern="DateDimensionPattern",
        start_date=str(range_start),
        end_date=str(range_end),
        fiscal_year_start_month=fy_start_month,
    )

    try:
        # Dispatch to the engine-specific generator.
        if context.engine_type == EngineType.SPARK:
            frame = self._generate_spark(context, range_start, range_end, fy_start_month)
        else:
            frame = self._generate_pandas(range_start, range_end, fy_start_month)

        if want_unknown:
            frame = self._add_unknown_member(context, frame)

        total_rows = self._get_row_count(frame, context.engine_type)

        ctx.info(
            "DateDimensionPattern completed",
            pattern="DateDimensionPattern",
            elapsed_ms=round((time.time() - t0) * 1000, 2),
            rows_generated=total_rows,
            start_date=str(range_start),
            end_date=str(range_end),
        )

        return frame

    except Exception as exc:
        ctx.error(
            f"DateDimensionPattern failed: {exc}",
            pattern="DateDimensionPattern",
            node=self.config.name,
            error_type=type(exc).__name__,
            elapsed_ms=round((time.time() - t0) * 1000, 2),
        )
        raise
validate()

Validate date dimension pattern configuration parameters.

Ensures that all required parameters are present and valid. Checks that: - start_date is provided in YYYY-MM-DD format - end_date is provided in YYYY-MM-DD format - start_date is before or equal to end_date - fiscal_year_start_month is an integer between 1-12 if provided

Raises:

Type Description
ValueError

If required dates are missing, dates are invalid/unparseable, start_date is after end_date, or fiscal_year_start_month is invalid.

Source code in odibi/patterns/date_dimension.py
def validate(self) -> None:
    """Validate date dimension pattern configuration parameters.

    Ensures that all required parameters are present and valid. Checks that:
    - start_date is provided in YYYY-MM-DD format
    - end_date is provided in YYYY-MM-DD format
    - start_date is before or equal to end_date
    - fiscal_year_start_month is an integer between 1-12 if provided

    Raises:
        ValueError: If required dates are missing, dates are invalid/unparseable,
            start_date is after end_date, or fiscal_year_start_month is invalid.
    """
    ctx = get_logging_context()
    ctx.debug(
        "DateDimensionPattern validation starting",
        pattern="DateDimensionPattern",
        params=self.params,
    )

    if not self.params.get("start_date"):
        ctx.error(
            "DateDimensionPattern validation failed: 'start_date' is required",
            pattern="DateDimensionPattern",
            node=self.config.name,
        )
        raise ValueError(
            f"DateDimensionPattern (node '{self.config.name}'): 'start_date' parameter is required. "
            "Expected format: 'YYYY-MM-DD' (e.g., '2024-01-01'). "
            "Provide a valid start_date in params."
        )

    if not self.params.get("end_date"):
        ctx.error(
            "DateDimensionPattern validation failed: 'end_date' is required",
            pattern="DateDimensionPattern",
            node=self.config.name,
        )
        raise ValueError(
            f"DateDimensionPattern (node '{self.config.name}'): 'end_date' parameter is required. "
            "Expected format: 'YYYY-MM-DD' (e.g., '2024-12-31'). "
            "Provide a valid end_date in params."
        )

    # Only parsing lives inside the try: previously the deliberate
    # start > end ValueError was raised inside this block, caught by the
    # generic handler, logged twice, and re-wrapped as a misleading
    # "Invalid date parameters" error.
    try:
        start = self._parse_date(self.params["start_date"])
        end = self._parse_date(self.params["end_date"])
    except Exception as e:
        ctx.error(
            f"DateDimensionPattern validation failed: {e}",
            pattern="DateDimensionPattern",
            node=self.config.name,
        )
        raise ValueError(
            f"DateDimensionPattern (node '{self.config.name}'): Invalid date parameters. {e} "
            f"Provided: start_date='{self.params.get('start_date')}', "
            f"end_date='{self.params.get('end_date')}'. "
            f"Expected format: 'YYYY-MM-DD'."
        ) from e

    # Range check runs after parsing succeeded, so its message reaches the
    # caller intact instead of being wrapped by the parse-error handler.
    if start > end:
        msg = (
            f"DateDimensionPattern (node '{self.config.name}'): start_date must be before or equal to end_date. "
            f"Provided: start_date='{self.params['start_date']}', "
            f"end_date='{self.params['end_date']}'. "
            f"Fix: Swap the values or adjust the date range."
        )
        ctx.error(
            f"DateDimensionPattern validation failed: {msg}",
            pattern="DateDimensionPattern",
            node=self.config.name,
        )
        raise ValueError(msg)

    fiscal_month = self.params.get("fiscal_year_start_month", 1)
    if not isinstance(fiscal_month, int) or fiscal_month < 1 or fiscal_month > 12:
        ctx.error(
            "DateDimensionPattern validation failed: invalid fiscal_year_start_month",
            pattern="DateDimensionPattern",
            node=self.config.name,
        )
        raise ValueError(
            f"DateDimensionPattern (node '{self.config.name}'): 'fiscal_year_start_month' must be an integer 1-12. "
            f"Provided: {fiscal_month!r} (type: {type(fiscal_month).__name__}). "
            f"Use an integer like 1 for January or 7 for July."
        )

    ctx.debug(
        "DateDimensionPattern validation passed",
        pattern="DateDimensionPattern",
    )

odibi.patterns.fact

FactPattern

Bases: Pattern

Enhanced Fact Pattern: Builds fact tables with automatic SK lookups.

Features:

- Automatic surrogate key lookups from dimension tables
- Orphan handling (unknown member, reject, or quarantine)
- Grain validation (detect duplicates at PK level)
- Audit columns (load_timestamp, source_system)
- Deduplication support
- Measure calculations and renaming

Basic Params (backward compatible):

- deduplicate (bool): If true, removes duplicates before insert.
- keys (list): Keys for deduplication.

Enhanced Params

grain (list): Columns that define uniqueness (validates no duplicates) dimensions (list): Dimension lookup configurations - source_column: Column in source data - dimension_table: Name of dimension in context - dimension_key: Natural key column in dimension - surrogate_key: Surrogate key to retrieve - scd2 (bool): If true, filter is_current=true orphan_handling (str): "unknown" | "reject" | "quarantine" quarantine (dict): Quarantine configuration (required if orphan_handling=quarantine) - connection: Connection name for quarantine writes - path: Path for quarantine data (or use 'table') - table: Table name for quarantine (or use 'path') - add_columns (dict): Metadata columns to add - _rejection_reason (bool): Add rejection reason column - _rejected_at (bool): Add rejection timestamp column - _source_dimension (bool): Add source dimension name column measures (list): Measure definitions (passthrough, rename, or calculated) audit (dict): Audit column configuration - load_timestamp (bool) - source_system (str)

Example Config
Example with Quarantine
Source code in odibi/patterns/fact.py
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
class FactPattern(Pattern):
    """
    Enhanced Fact Pattern: Builds fact tables with automatic SK lookups.

    Features:
    - Automatic surrogate key lookups from dimension tables
    - Orphan handling (unknown member, reject, or quarantine)
    - Grain validation (detect duplicates at PK level)
    - Audit columns (load_timestamp, source_system)
    - Deduplication support
    - Measure calculations and renaming

    Basic Params (backward compatible):
        deduplicate (bool): If true, removes duplicates before insert.
        keys (list): Keys for deduplication.

    Enhanced Params:
        grain (list): Columns that define uniqueness (validates no duplicates)
        dimensions (list): Dimension lookup configurations
            - source_column: Column in source data
            - dimension_table: Name of dimension in context
            - dimension_key: Natural key column in dimension
            - surrogate_key: Surrogate key to retrieve
            - scd2 (bool): If true, filter is_current=true
        orphan_handling (str): "unknown" | "reject" | "quarantine"
        quarantine (dict): Quarantine configuration (required if orphan_handling=quarantine)
            - connection: Connection name for quarantine writes
            - path: Path for quarantine data (or use 'table')
            - table: Table name for quarantine (or use 'path')
            - add_columns (dict): Metadata columns to add
                - _rejection_reason (bool): Add rejection reason column
                - _rejected_at (bool): Add rejection timestamp column
                - _source_dimension (bool): Add source dimension name column
        measures (list): Measure definitions (passthrough, rename, or calculated)
        audit (dict): Audit column configuration
            - load_timestamp (bool)
            - source_system (str)

    Example Config:
        pattern:
          type: fact
          params:
            grain: [order_id]
            dimensions:
              - source_column: customer_id
                dimension_table: dim_customer
                dimension_key: customer_id
                surrogate_key: customer_sk
                scd2: true
            orphan_handling: unknown
            measures:
              - quantity
              - total_amount: "quantity * price"
            audit:
              load_timestamp: true
              source_system: "pos"

    Example with Quarantine:
        pattern:
          type: fact
          params:
            dimensions:
              - source_column: customer_id
                dimension_table: dim_customer
                dimension_key: customer_id
                surrogate_key: customer_sk
            orphan_handling: quarantine
            quarantine:
              connection: silver
              path: fact_orders_orphans
              add_columns:
                _rejection_reason: true
                _rejected_at: true
                _source_dimension: true
    """

    required_params: ClassVar[List[str]] = []
    optional_params: ClassVar[List[str]] = [
        "grain",
        "dimensions",
        "orphan_handling",
        "quarantine",
        "measures",
        "audit",
        "deduplicate",
        "keys",
    ]

    def validate(self) -> None:
        """Check the fact pattern configuration before execution.

        Verifies that:
        - 'keys' accompany 'deduplicate' when deduplication is requested
        - 'orphan_handling' is one of 'unknown', 'reject', or 'quarantine'
        - a complete 'quarantine' config exists when orphan_handling='quarantine'
        - every dimension lookup carries all of its required fields

        Raises:
            ValueError: On any of the violations listed above.
        """
        ctx = get_logging_context()
        cfg = self.params
        dedup_enabled = cfg.get("deduplicate")
        dedup_keys = cfg.get("keys")
        grain_cols = cfg.get("grain")
        dim_lookups = cfg.get("dimensions", [])
        orphan_mode = cfg.get("orphan_handling", "unknown")

        ctx.debug(
            "FactPattern validation starting",
            pattern="FactPattern",
            deduplicate=dedup_enabled,
            keys=dedup_keys,
            grain=grain_cols,
            dimensions_count=len(dim_lookups),
        )

        # Deduplication is meaningless without key columns to dedupe on.
        if dedup_enabled and not dedup_keys:
            ctx.error(
                "FactPattern validation failed: 'keys' required when 'deduplicate' is True",
                pattern="FactPattern",
                node=self.config.name,
            )
            raise ValueError(
                f"FactPattern (node '{self.config.name}'): 'keys' required when 'deduplicate' is True. "
                "Keys define which columns uniquely identify a fact row for deduplication. "
                "Provide keys=['col1', 'col2'] to specify the deduplication columns."
            )

        if orphan_mode not in ("unknown", "reject", "quarantine"):
            ctx.error(
                f"FactPattern validation failed: invalid orphan_handling '{orphan_mode}'",
                pattern="FactPattern",
                node=self.config.name,
            )
            raise ValueError(
                f"FactPattern (node '{self.config.name}'): 'orphan_handling' must be 'unknown', 'reject', or 'quarantine'. "
                f"Got: {orphan_mode}"
            )

        # Quarantine mode needs somewhere to write the rejected rows.
        if orphan_mode == "quarantine":
            q_cfg = cfg.get("quarantine")
            if not q_cfg:
                ctx.error(
                    "FactPattern validation failed: 'quarantine' config required "
                    "when orphan_handling='quarantine'",
                    pattern="FactPattern",
                    node=self.config.name,
                )
                raise ValueError(
                    f"FactPattern (node '{self.config.name}'): 'quarantine' configuration is required when "
                    "orphan_handling='quarantine'."
                )
            if not q_cfg.get("connection"):
                ctx.error(
                    "FactPattern validation failed: quarantine.connection is required",
                    pattern="FactPattern",
                    node=self.config.name,
                )
                raise ValueError(
                    f"FactPattern (node '{self.config.name}'): 'quarantine.connection' is required. "
                    "The connection specifies where to write quarantined orphan records "
                    "(e.g., a Spark session or database connection). "
                    "Add 'connection' to your quarantine config."
                )
            if not (q_cfg.get("path") or q_cfg.get("table")):
                ctx.error(
                    "FactPattern validation failed: quarantine requires 'path' or 'table'",
                    pattern="FactPattern",
                    node=self.config.name,
                )
                raise ValueError(
                    f"FactPattern (node '{self.config.name}'): 'quarantine' requires either 'path' or 'table'. "
                    f"Got config: {q_cfg}. "
                    "Add 'path' for file storage or 'table' for database storage."
                )

        # Every lookup must fully describe its source column and dimension.
        required_keys = ["source_column", "dimension_table", "dimension_key", "surrogate_key"]
        for i, lookup in enumerate(dim_lookups):
            missing = [k for k in required_keys if k not in lookup]
            if missing:
                key = missing[0]
                ctx.error(
                    f"FactPattern validation failed: dimension[{i}] missing '{key}'",
                    pattern="FactPattern",
                    node=self.config.name,
                )
                raise ValueError(
                    f"FactPattern (node '{self.config.name}'): dimension[{i}] missing required key '{key}'. "
                    f"Required keys: {required_keys}. "
                    f"Got: {lookup}. "
                    f"Ensure all required keys are provided in the dimension config."
                )

        ctx.debug(
            "FactPattern validation passed",
            pattern="FactPattern",
        )

    def execute(self, context: EngineContext) -> Any:
        """Execute the fact pattern to build a fact table with surrogate key lookups.

        Builds a fact table with automatic surrogate key lookups from dimension tables.
        The execution flow:
        1. Deduplicate source data if configured
        2. Perform dimension lookups to replace natural keys with surrogate keys
        3. Handle orphan records (unknown SK=0, reject with error, or quarantine)
        4. Apply measure calculations and transformations if configured
        5. Validate grain (check for duplicates) if grain is specified
        6. Add audit columns (load_timestamp, source_system) if configured

        Args:
            context: Engine context containing the source DataFrame, dimension tables,
                and execution environment.

        Returns:
            Fact DataFrame with surrogate keys from dimension lookups and all transformations applied.

        Raises:
            ValueError: If orphan records are found and orphan_handling='reject', if grain
                validation fails (duplicates found), or if dimension lookup fails.
            Exception: If quarantine write fails or measure calculations fail.
        """
        ctx = get_logging_context()
        start_time = time.time()

        # Pull all configuration up front; defaults here mirror validate().
        deduplicate = self.params.get("deduplicate")
        keys = self.params.get("keys")
        grain = self.params.get("grain")
        dimensions = self.params.get("dimensions", [])
        orphan_handling = self.params.get("orphan_handling", "unknown")
        quarantine_config = self.params.get("quarantine", {})
        measures = self.params.get("measures", [])
        audit_config = self.params.get("audit", {})

        ctx.debug(
            "FactPattern starting",
            pattern="FactPattern",
            deduplicate=deduplicate,
            keys=keys,
            grain=grain,
            dimensions_count=len(dimensions),
            orphan_handling=orphan_handling,
        )

        df = context.df
        source_count = self._get_row_count(df, context.engine_type)
        ctx.debug("Fact source loaded", pattern="FactPattern", source_rows=source_count)

        try:
            # Step 1: dedupe before any lookups so orphan counts and grain
            # checks are not inflated by duplicate source rows.
            if deduplicate and keys:
                df = self._deduplicate(context, df, keys)
                ctx.debug(
                    "Fact deduplication complete",
                    pattern="FactPattern",
                    rows_after=self._get_row_count(df, context.engine_type),
                )

            # Step 2/3: replace natural keys with surrogate keys; orphan rows
            # are resolved per orphan_handling inside the lookup helper.
            if dimensions:
                df, orphan_count, quarantined_df = self._lookup_dimensions(
                    context, df, dimensions, orphan_handling, quarantine_config
                )
                ctx.debug(
                    "Fact dimension lookups complete",
                    pattern="FactPattern",
                    orphan_count=orphan_count,
                )

                # Quarantined orphans are persisted immediately so they are
                # not lost if a later step (measures/grain) raises.
                if orphan_handling == "quarantine" and quarantined_df is not None:
                    self._write_quarantine(context, quarantined_df, quarantine_config)
                    ctx.info(
                        f"Quarantined {orphan_count} orphan records",
                        pattern="FactPattern",
                        quarantine_path=quarantine_config.get("path")
                        or quarantine_config.get("table"),
                    )

            # Step 4: measure passthrough / rename / calculated columns.
            if measures:
                df = self._apply_measures(context, df, measures)

            # Step 5: grain check runs after measures so renamed columns can
            # participate in the grain definition.
            if grain:
                self._validate_grain(context, df, grain)

            # Step 6: audit columns are always added (helper no-ops on empty config).
            df = self._add_audit_columns(context, df, audit_config)

            result_count = self._get_row_count(df, context.engine_type)
            elapsed_ms = (time.time() - start_time) * 1000

            ctx.info(
                "FactPattern completed",
                pattern="FactPattern",
                elapsed_ms=round(elapsed_ms, 2),
                source_rows=source_count,
                result_rows=result_count,
            )

            return df

        except Exception as e:
            # Log with timing context, then re-raise unchanged for the caller.
            elapsed_ms = (time.time() - start_time) * 1000
            ctx.error(
                f"FactPattern failed: {e}",
                pattern="FactPattern",
                node=self.config.name,
                error_type=type(e).__name__,
                elapsed_ms=round(elapsed_ms, 2),
            )
            raise

    def _deduplicate(self, context: EngineContext, df, keys: List[str]):
        """Drop duplicate rows keyed on *keys*, dispatching on the active engine."""
        if context.engine_type != EngineType.SPARK:
            return df.drop_duplicates(subset=keys)
        return df.dropDuplicates(keys)

    def _lookup_dimensions(
        self,
        context: EngineContext,
        df,
        dimensions: List[Dict],
        orphan_handling: str,
        quarantine_config: Dict,
    ):
        """
        Resolve the surrogate key for every configured dimension lookup.

        Returns:
            Tuple of (result_df, orphan_count, quarantined_df)
        """
        orphans_seen = 0
        quarantine_parts = []

        for spec in dimensions:
            dim_table = spec["dimension_table"]

            dim_df = self._get_dimension_df(context, dim_table, spec.get("scd2", False))
            if dim_df is None:
                raise ValueError(
                    f"FactPattern (node '{self.config.name}'): Dimension table '{dim_table}' not found in context. "
                    f"Expected dimension: '{dim_table}'. Available context: {list(context.dataframes.keys()) if hasattr(context, 'dataframes') else 'N/A'}."
                )

            df, n_orphans, rejected = self._join_dimension(
                context,
                df,
                dim_df,
                spec["source_column"],
                spec["dimension_key"],
                spec["surrogate_key"],
                orphan_handling,
                dim_table,
                quarantine_config,
            )
            orphans_seen += n_orphans
            if rejected is not None:
                quarantine_parts.append(rejected)

        # Collapse per-dimension quarantine frames into one (or None).
        combined = self._union_dataframes(context, quarantine_parts) if quarantine_parts else None

        return df, orphans_seen, combined

    def _union_dataframes(self, context: EngineContext, dfs: List):
        """Combine a list of DataFrames into one; returns None for an empty list."""
        if not dfs:
            return None

        if context.engine_type != EngineType.SPARK:
            import pandas as pd

            return pd.concat(dfs, ignore_index=True)

        # Spark: fold the frames together, aligning columns by name and
        # tolerating columns missing from some parts.
        merged, *rest = dfs
        for part in rest:
            merged = merged.unionByName(part, allowMissingColumns=True)
        return merged

    def _get_dimension_df(self, context: EngineContext, dim_table: str, is_scd2: bool):
        """Fetch a dimension from context; for SCD2 keep only the current rows."""
        try:
            dim_df = context.get(dim_table)
        except KeyError:
            # Caller translates a missing dimension into a descriptive error.
            return None

        # Non-SCD2 dimensions (or ones without the flag column) pass through as-is.
        if not is_scd2 or "is_current" not in dim_df.columns:
            return dim_df

        if context.engine_type == EngineType.SPARK:
            from pyspark.sql import functions as F

            return dim_df.filter(F.col("is_current") == True)  # noqa: E712
        return dim_df[dim_df["is_current"] == True].copy()  # noqa: E712

    def _join_dimension(
        self,
        context: EngineContext,
        fact_df,
        dim_df,
        source_col: str,
        dim_key: str,
        sk_col: str,
        orphan_handling: str,
        dim_table: str,
        quarantine_config: Dict,
    ):
        """
        Dispatch the fact-to-dimension join to the engine-specific implementation.

        Returns:
            Tuple of (result_df, orphan_count, quarantined_df)
        """
        shared_args = (
            fact_df,
            dim_df,
            source_col,
            dim_key,
            sk_col,
            orphan_handling,
            dim_table,
            quarantine_config,
        )
        if context.engine_type == EngineType.SPARK:
            return self._join_dimension_spark(context, *shared_args)
        return self._join_dimension_pandas(*shared_args)

    def _join_dimension_spark(
        self,
        context: EngineContext,
        fact_df,
        dim_df,
        source_col: str,
        dim_key: str,
        sk_col: str,
        orphan_handling: str,
        dim_table: str,
        quarantine_config: Dict,
    ):
        """Left-join the fact to one dimension and attach its surrogate key (Spark).

        Returns:
            Tuple of (result_df, orphan_count, quarantined_df)
        """
        from pyspark.sql import functions as F

        # Use a unique alias for the SK column to avoid ambiguity if sk_col
        # already exists in fact_df (e.g., from a previous dimension lookup)
        sk_alias = f"_dim_{sk_col}"
        dim_subset = dim_df.select(
            F.col(dim_key).alias(f"_dim_{dim_key}"),
            F.col(sk_col).alias(sk_alias),
        )

        joined = fact_df.join(
            dim_subset,
            fact_df[source_col] == dim_subset[f"_dim_{dim_key}"],
            "left",
        )

        # If sk_col already existed in fact_df, drop the old one and rename the new one
        if sk_col in fact_df.columns:
            joined = joined.drop(fact_df[sk_col])
        joined = joined.withColumnRenamed(sk_alias, sk_col)

        orphan_mask = F.col(sk_col).isNull()
        orphan_count = joined.filter(orphan_mask).count()
        quarantined_df = None

        if orphan_handling == "reject" and orphan_count > 0:
            # Message now matches the pandas implementation: includes the node
            # name and a remediation hint, consistent with every other error
            # raised by this pattern.
            raise ValueError(
                f"FactPattern (node '{self.config.name}'): {orphan_count} orphan records found for dimension "
                f"lookup on '{source_col}' -> '{dim_table}'. Orphan handling is set to 'reject'. "
                f"Fix: Either update orphan_handling to 'unknown' or 'quarantine', or ensure all '{source_col}' values exist in '{dim_table}.{dim_key}'."
            )

        if orphan_handling == "unknown":
            # Unknown-member convention: orphans map to surrogate key 0.
            joined = joined.withColumn(sk_col, F.coalesce(F.col(sk_col), F.lit(0)))

        if orphan_handling == "quarantine" and orphan_count > 0:
            orphan_rows = joined.filter(orphan_mask).drop(f"_dim_{dim_key}")
            orphan_rows = self._add_quarantine_metadata_spark(
                orphan_rows, dim_table, source_col, quarantine_config
            )
            quarantined_df = orphan_rows
            joined = joined.filter(~orphan_mask)

        result = joined.drop(f"_dim_{dim_key}")

        return result, orphan_count, quarantined_df

    def _join_dimension_pandas(
        self,
        fact_df,
        dim_df,
        source_col: str,
        dim_key: str,
        sk_col: str,
        orphan_handling: str,
        dim_table: str,
        quarantine_config: Dict,
    ):
        import pandas as pd

        dim_subset = dim_df[[dim_key, sk_col]].copy()
        dim_subset = dim_subset.rename(columns={dim_key: f"_dim_{dim_key}"})

        merged = pd.merge(
            fact_df,
            dim_subset,
            left_on=source_col,
            right_on=f"_dim_{dim_key}",
            how="left",
        )

        orphan_mask = merged[sk_col].isna()
        orphan_count = orphan_mask.sum()
        quarantined_df = None

        if orphan_handling == "reject" and orphan_count > 0:
            raise ValueError(
                f"FactPattern (node '{self.config.name}'): {orphan_count} orphan records found for dimension "
                f"lookup on '{source_col}' -> '{dim_table}'. Orphan handling is set to 'reject'. "
                f"Fix: Either update orphan_handling to 'unknown' or 'quarantine', or ensure all '{source_col}' values exist in '{dim_table}.{dim_key}'."
            )

        if orphan_handling == "unknown":
            merged[sk_col] = merged[sk_col].fillna(0).infer_objects(copy=False).astype(int)

        if orphan_handling == "quarantine" and orphan_count > 0:
            orphan_rows = merged[orphan_mask].drop(columns=[f"_dim_{dim_key}"]).copy()
            orphan_rows = self._add_quarantine_metadata_pandas(
                orphan_rows, dim_table, source_col, quarantine_config
            )
            quarantined_df = orphan_rows
            merged = merged[~orphan_mask].copy()

        result = merged.drop(columns=[f"_dim_{dim_key}"])

        return result, int(orphan_count), quarantined_df

    def _apply_measures(self, context: EngineContext, df, measures: List):
        """
        Apply measure transformations.

        Each measure is either:
        - a string: a passthrough column name, left untouched
        - a dict {new_name: value}: a rename when value is a plain column
          name, or a calculated column when value looks like an expression
        """
        for spec in measures:
            # Strings (and anything else non-dict) are passthrough columns.
            if not isinstance(spec, dict):
                continue
            for target, source in spec.items():
                if self._is_expression(source):
                    df = self._add_calculated_measure(context, df, target, source)
                else:
                    df = self._rename_column(context, df, source, target)

        return df

    def _is_expression(self, expr: str) -> bool:
        """Check if string is a calculation expression vs a column name.

        Operators like + - * / must be surrounded by spaces to count as
        expression operators. Parentheses always indicate an expression.
        This avoids treating column names with hyphens (e.g. 'total-cost')
        as expressions.
        """
        if "(" in expr or ")" in expr:
            return True
        spaced_operators = [" + ", " - ", " * ", " / "]
        return any(op in expr for op in spaced_operators)

    def _add_calculated_measure(self, context: EngineContext, df, name: str, expr: str):
        """Append a derived measure column computed from *expr*.

        Args:
            context: Engine context containing engine type and configuration.
            df: The DataFrame to add the calculated measure to.
            name: The name of the new calculated measure column.
            expr: The expression to evaluate for the calculated measure.

        Returns:
            DataFrame with the new calculated measure column added.
        """
        if context.engine_type != EngineType.SPARK:
            # Copy first so the caller's frame is never mutated in place.
            out = df.copy()
            out[name] = out.eval(expr)
            return out

        from pyspark.sql import functions as F

        return df.withColumn(name, F.expr(expr))

    def _rename_column(self, context: EngineContext, df, old_name: str, new_name: str):
        """Rename one column, dispatching on the active engine.

        Args:
            context: Engine context containing engine type and configuration.
            df: The DataFrame containing the column to rename.
            old_name: The current name of the column.
            new_name: The new name for the column.

        Returns:
            DataFrame with the column renamed.
        """
        if context.engine_type != EngineType.SPARK:
            return df.rename(columns={old_name: new_name})
        return df.withColumnRenamed(old_name, new_name)

    def _validate_grain(self, context: EngineContext, df, grain: List[str]):
        """
        Validate that no duplicate rows exist at the grain level.

        Raises ValueError if duplicates are found.
        """
        ctx = get_logging_context()

        # Compare total vs distinct counts over the grain columns; a gap
        # between the two means at least one grain combination repeats.
        if context.engine_type == EngineType.SPARK:
            total_count = df.count()
            distinct_count = df.select(grain).distinct().count()
        else:
            total_count = len(df)
            distinct_count = len(df.drop_duplicates(subset=grain))

        if total_count == distinct_count:
            ctx.debug(
                "FactPattern grain validation passed",
                pattern="FactPattern",
                grain=grain,
                total_rows=total_count,
            )
            return

        duplicate_count = total_count - distinct_count
        ctx.error(
            f"FactPattern grain validation failed: {duplicate_count} duplicate rows",
            pattern="FactPattern",
            node=self.config.name,
            grain=grain,
            total_rows=total_count,
            distinct_rows=distinct_count,
        )
        raise ValueError(
            f"FactPattern (node '{self.config.name}'): Grain validation failed. Found {duplicate_count} duplicate "
            f"rows at grain level {grain}. Total rows: {total_count}, "
            f"Distinct rows: {distinct_count}. "
            f"Fix: Check for duplicate data in source or add deduplication before grain validation."
        )

    def _add_quarantine_metadata_spark(
        self,
        df,
        dim_table: str,
        source_col: str,
        quarantine_config: Dict,
    ):
        """Attach rejection-metadata columns to a quarantined Spark DataFrame.

        Args:
            df: The Spark DataFrame containing quarantined records.
            dim_table: Name of the dimension table that caused the orphan rejection.
            source_col: Name of the source column that failed the dimension lookup.
            quarantine_config: Configuration dictionary with 'add_columns' specifying
                which metadata columns to add (_rejection_reason, _rejected_at, _source_dimension).

        Returns:
            Spark DataFrame with quarantine metadata columns added.
        """
        from pyspark.sql import functions as F

        flags = quarantine_config.get("add_columns", {})

        # Columns are appended in a fixed order: reason, timestamp, dimension.
        if flags.get("_rejection_reason", False):
            df = df.withColumn(
                "_rejection_reason",
                F.lit(
                    f"Orphan record: no match in dimension '{dim_table}' on column '{source_col}'"
                ),
            )
        if flags.get("_rejected_at", False):
            df = df.withColumn("_rejected_at", F.current_timestamp())
        if flags.get("_source_dimension", False):
            df = df.withColumn("_source_dimension", F.lit(dim_table))

        return df

    def _add_quarantine_metadata_pandas(
        self,
        df,
        dim_table: str,
        source_col: str,
        quarantine_config: Dict,
    ):
        """Add metadata columns to quarantined Pandas DataFrame.

        Args:
            df: The Pandas DataFrame containing quarantined records.
            dim_table: Name of the dimension table that caused the orphan rejection.
            source_col: Name of the source column that failed the dimension lookup.
            quarantine_config: Configuration dictionary with 'add_columns' specifying
                which metadata columns to add (_rejection_reason, _rejected_at, _source_dimension).

        Returns:
            Pandas DataFrame with quarantine metadata columns added.
        """
        add_columns = quarantine_config.get("add_columns", {})

        if add_columns.get("_rejection_reason", False):
            reason = f"Orphan record: no match in dimension '{dim_table}' on column '{source_col}'"
            df["_rejection_reason"] = reason

        if add_columns.get("_rejected_at", False):
            # Use timezone-aware timestamp for Delta Lake compatibility
            from datetime import timezone

            df["_rejected_at"] = datetime.now(timezone.utc)

        if add_columns.get("_source_dimension", False):
            df["_source_dimension"] = dim_table

        return df

    def _write_quarantine(
        self,
        context: EngineContext,
        quarantined_df,
        quarantine_config: Dict,
    ):
        """Persist quarantined orphan records to the configured destination.

        Args:
            context: Engine context containing engine type and configuration.
            quarantined_df: DataFrame containing the quarantined orphan records.
            quarantine_config: Configuration dictionary specifying destination with keys:
                'connection', 'path', and/or 'table'.
        """
        ctx = get_logging_context()

        connection = quarantine_config.get("connection")
        path = quarantine_config.get("path")
        table = quarantine_config.get("table")

        # Both engine-specific writers share the same argument list, so we
        # can pick the implementation once and call it uniformly.
        writer = (
            self._write_quarantine_spark
            if context.engine_type == EngineType.SPARK
            else self._write_quarantine_pandas
        )
        writer(context, quarantined_df, connection, path, table)

        ctx.debug(
            "Quarantine data written",
            pattern="FactPattern",
            connection=connection,
            destination=path or table,
        )

    def _write_quarantine_spark(
        self,
        context: EngineContext,
        df,
        connection: str,
        path: Optional[str],
        table: Optional[str],
    ):
        """Append quarantine records with Spark in Delta Lake format.

        Args:
            context: Engine context with engine instance for connection resolution.
            df: Spark DataFrame containing quarantined records.
            connection: Connection name for resolving paths.
            path: Optional file path for quarantine storage.
            table: Optional table name for quarantine storage.
        """
        writer = df.write.format("delta").mode("append")
        if table:
            # Table destinations are optionally qualified by the connection name.
            target = f"{connection}.{table}" if connection else table
            writer.saveAsTable(target)
        elif path:
            resolved = path
            engine = getattr(context, "engine", None)
            if engine and connection in getattr(engine, "connections", {}):
                # Best-effort path resolution; fall back to the raw path on failure.
                try:
                    resolved = engine.connections[connection].get_path(path)
                except Exception:
                    pass
            writer.save(resolved)

    def _write_quarantine_pandas(
        self,
        context: EngineContext,
        df,
        connection: str,
        path: Optional[str],
        table: Optional[str],
    ):
        """Write quarantine data using Pandas to Delta Lake or SQL Server.

        Args:
            context: Engine context with engine instance for connection resolution.
            df: Pandas DataFrame containing quarantined records.
            connection: Connection name for resolving paths or database connections.
            path: Optional file path for quarantine storage (Delta Lake).
            table: Optional table name for quarantine storage (SQL Server).
        """
        import os

        destination = path or table
        full_path = destination

        if hasattr(context, "engine") and context.engine:
            if connection in getattr(context.engine, "connections", {}):
                try:
                    full_path = context.engine.connections[connection].get_path(destination)
                except Exception:
                    pass

        path_lower = str(full_path).lower()

        # Create parent directories if needed
        dir_name = os.path.dirname(full_path)
        if dir_name:
            os.makedirs(dir_name, exist_ok=True)

        if path_lower.endswith(".csv"):
            if os.path.exists(full_path):
                df.to_csv(full_path, mode="a", header=False, index=False)
            else:
                df.to_csv(full_path, index=False)
        elif path_lower.endswith(".json"):
            if os.path.exists(full_path):
                import pandas as pd

                existing = pd.read_json(full_path)
                combined = pd.concat([existing, df], ignore_index=True)
                combined.to_json(full_path, orient="records")
            else:
                df.to_json(full_path, orient="records")
        else:
            if os.path.exists(full_path):
                import pandas as pd

                existing = pd.read_parquet(full_path)
                combined = pd.concat([existing, df], ignore_index=True)
                combined.to_parquet(full_path, index=False)
            else:
                df.to_parquet(full_path, index=False)

execute(context)

Execute the fact pattern to build a fact table with surrogate key lookups.

Builds a fact table with automatic surrogate key lookups from dimension tables. The execution flow:

1. Deduplicate source data if configured.
2. Perform dimension lookups to replace natural keys with surrogate keys.
3. Handle orphan records (unknown SK=0, reject with error, or quarantine).
4. Apply measure calculations and transformations if configured.
5. Validate grain (check for duplicates) if grain is specified.
6. Add audit columns (load_timestamp, source_system) if configured.

Parameters:

- `context` (EngineContext, required): Engine context containing the source DataFrame, dimension tables, and execution environment.

Returns:

- Any: Fact DataFrame with surrogate keys from dimension lookups and all transformations applied.

Raises:

- ValueError: If orphan records are found and orphan_handling='reject', if grain validation fails (duplicates found), or if dimension lookup fails.
- Exception: If quarantine write fails or measure calculations fail.

Source code in odibi/patterns/fact.py
def execute(self, context: EngineContext) -> Any:
    """Execute the fact pattern to build a fact table with surrogate key lookups.

    Builds a fact table with automatic surrogate key lookups from dimension tables.
    The execution flow:
    1. Deduplicate source data if configured
    2. Perform dimension lookups to replace natural keys with surrogate keys
    3. Handle orphan records (unknown SK=0, reject with error, or quarantine)
    4. Apply measure calculations and transformations if configured
    5. Validate grain (check for duplicates) if grain is specified
    6. Add audit columns (load_timestamp, source_system) if configured

    Args:
        context: Engine context containing the source DataFrame, dimension tables,
            and execution environment.

    Returns:
        Fact DataFrame with surrogate keys from dimension lookups and all transformations applied.

    Raises:
        ValueError: If orphan records are found and orphan_handling='reject', if grain
            validation fails (duplicates found), or if dimension lookup fails.
        Exception: If quarantine write fails or measure calculations fail.
    """
    ctx = get_logging_context()
    start_time = time.time()

    # Pull all pattern options up front; missing keys fall back to defaults
    # ('unknown' orphan handling, empty dimension/measure lists).
    deduplicate = self.params.get("deduplicate")
    keys = self.params.get("keys")
    grain = self.params.get("grain")
    dimensions = self.params.get("dimensions", [])
    orphan_handling = self.params.get("orphan_handling", "unknown")
    quarantine_config = self.params.get("quarantine", {})
    measures = self.params.get("measures", [])
    audit_config = self.params.get("audit", {})

    ctx.debug(
        "FactPattern starting",
        pattern="FactPattern",
        deduplicate=deduplicate,
        keys=keys,
        grain=grain,
        dimensions_count=len(dimensions),
        orphan_handling=orphan_handling,
    )

    # Source DataFrame arrives on the context; count it once for the summary log.
    df = context.df
    source_count = self._get_row_count(df, context.engine_type)
    ctx.debug("Fact source loaded", pattern="FactPattern", source_rows=source_count)

    try:
        # Step 1: optional deduplication on the configured business keys.
        if deduplicate and keys:
            df = self._deduplicate(context, df, keys)
            ctx.debug(
                "Fact deduplication complete",
                pattern="FactPattern",
                rows_after=self._get_row_count(df, context.engine_type),
            )

        # Steps 2-3: surrogate-key lookups plus orphan handling.
        if dimensions:
            df, orphan_count, quarantined_df = self._lookup_dimensions(
                context, df, dimensions, orphan_handling, quarantine_config
            )
            ctx.debug(
                "Fact dimension lookups complete",
                pattern="FactPattern",
                orphan_count=orphan_count,
            )

            # Quarantined orphans are persisted separately from the fact output.
            if orphan_handling == "quarantine" and quarantined_df is not None:
                self._write_quarantine(context, quarantined_df, quarantine_config)
                ctx.info(
                    f"Quarantined {orphan_count} orphan records",
                    pattern="FactPattern",
                    quarantine_path=quarantine_config.get("path")
                    or quarantine_config.get("table"),
                )

        # Step 4: measure renames and calculated columns.
        if measures:
            df = self._apply_measures(context, df, measures)

        # Step 5: fail fast if the output is not unique at the declared grain.
        if grain:
            self._validate_grain(context, df, grain)

        # Step 6: audit columns are always applied; audit_config controls which ones.
        df = self._add_audit_columns(context, df, audit_config)

        result_count = self._get_row_count(df, context.engine_type)
        elapsed_ms = (time.time() - start_time) * 1000

        ctx.info(
            "FactPattern completed",
            pattern="FactPattern",
            elapsed_ms=round(elapsed_ms, 2),
            source_rows=source_count,
            result_rows=result_count,
        )

        return df

    except Exception as e:
        # Log timing and error type before re-raising so failures are traceable.
        elapsed_ms = (time.time() - start_time) * 1000
        ctx.error(
            f"FactPattern failed: {e}",
            pattern="FactPattern",
            node=self.config.name,
            error_type=type(e).__name__,
            elapsed_ms=round(elapsed_ms, 2),
        )
        raise

validate()

Validate fact pattern configuration parameters.

Ensures that all required parameters are present and valid. Checks that:

- keys are provided when deduplicate is True
- orphan_handling is valid ('unknown', 'reject', or 'quarantine')
- quarantine config is complete when orphan_handling='quarantine'
- all dimension lookups have required fields (source_column, dimension_table, etc.)

Raises:

- ValueError: If keys are missing for deduplication, orphan_handling is invalid, quarantine config is incomplete, or dimension configs are missing required fields.

Source code in odibi/patterns/fact.py
def validate(self) -> None:
    """Validate fact pattern configuration parameters.

    Ensures that all required parameters are present and valid. Checks that:
    - keys are provided when deduplicate is True
    - orphan_handling is valid ('unknown', 'reject', or 'quarantine')
    - quarantine config is complete when orphan_handling='quarantine'
    - all dimension lookups have required fields (source_column, dimension_table, etc.)

    Raises:
        ValueError: If keys are missing for deduplication, orphan_handling is invalid,
            quarantine config is incomplete, or dimension configs are missing required fields.
    """
    ctx = get_logging_context()
    deduplicate = self.params.get("deduplicate")
    keys = self.params.get("keys")
    grain = self.params.get("grain")
    dimensions = self.params.get("dimensions", [])
    orphan_handling = self.params.get("orphan_handling", "unknown")

    ctx.debug(
        "FactPattern validation starting",
        pattern="FactPattern",
        deduplicate=deduplicate,
        keys=keys,
        grain=grain,
        dimensions_count=len(dimensions),
    )

    # Deduplication needs business keys to define row uniqueness.
    if deduplicate and not keys:
        ctx.error(
            "FactPattern validation failed: 'keys' required when 'deduplicate' is True",
            pattern="FactPattern",
            node=self.config.name,
        )
        raise ValueError(
            f"FactPattern (node '{self.config.name}'): 'keys' required when 'deduplicate' is True. "
            "Keys define which columns uniquely identify a fact row for deduplication. "
            "Provide keys=['col1', 'col2'] to specify the deduplication columns."
        )

    # Orphan handling must be one of the three supported strategies.
    if orphan_handling not in ("unknown", "reject", "quarantine"):
        ctx.error(
            f"FactPattern validation failed: invalid orphan_handling '{orphan_handling}'",
            pattern="FactPattern",
            node=self.config.name,
        )
        raise ValueError(
            f"FactPattern (node '{self.config.name}'): 'orphan_handling' must be 'unknown', 'reject', or 'quarantine'. "
            f"Got: {orphan_handling}"
        )

    # Quarantine mode needs a full destination config: a connection plus
    # either a file path or a table name.
    if orphan_handling == "quarantine":
        quarantine_config = self.params.get("quarantine")
        if not quarantine_config:
            ctx.error(
                "FactPattern validation failed: 'quarantine' config required "
                "when orphan_handling='quarantine'",
                pattern="FactPattern",
                node=self.config.name,
            )
            raise ValueError(
                f"FactPattern (node '{self.config.name}'): 'quarantine' configuration is required when "
                "orphan_handling='quarantine'."
            )
        if not quarantine_config.get("connection"):
            ctx.error(
                "FactPattern validation failed: quarantine.connection is required",
                pattern="FactPattern",
                node=self.config.name,
            )
            raise ValueError(
                f"FactPattern (node '{self.config.name}'): 'quarantine.connection' is required. "
                "The connection specifies where to write quarantined orphan records "
                "(e.g., a Spark session or database connection). "
                "Add 'connection' to your quarantine config."
            )
        if not quarantine_config.get("path") and not quarantine_config.get("table"):
            ctx.error(
                "FactPattern validation failed: quarantine requires 'path' or 'table'",
                pattern="FactPattern",
                node=self.config.name,
            )
            raise ValueError(
                f"FactPattern (node '{self.config.name}'): 'quarantine' requires either 'path' or 'table'. "
                f"Got config: {quarantine_config}. "
                "Add 'path' for file storage or 'table' for database storage."
            )

    # Every dimension lookup must fully describe its join and key columns.
    for i, dim in enumerate(dimensions):
        required_keys = ["source_column", "dimension_table", "dimension_key", "surrogate_key"]
        for key in required_keys:
            if key not in dim:
                ctx.error(
                    f"FactPattern validation failed: dimension[{i}] missing '{key}'",
                    pattern="FactPattern",
                    node=self.config.name,
                )
                raise ValueError(
                    f"FactPattern (node '{self.config.name}'): dimension[{i}] missing required key '{key}'. "
                    f"Required keys: {required_keys}. "
                    f"Got: {dim}. "
                    f"Ensure all required keys are provided in the dimension config."
                )

    ctx.debug(
        "FactPattern validation passed",
        pattern="FactPattern",
    )

odibi.patterns.aggregation

AggregationPattern

Bases: Pattern

Aggregation Pattern: Declarative aggregation with time-grain rollups.

Features:

- Declare grain (GROUP BY columns)
- Declare measures with aggregation functions
- Incremental aggregation (merge new data with existing)
- Time rollups (generate multiple grain levels)
- Audit columns

Configuration Options (via params dict):

- grain (list): Columns to GROUP BY (defines uniqueness)
- measures (list): Measure definitions with name and aggregation expr
    - name: Output column name
    - expr: SQL aggregation expression (e.g., "SUM(amount)")
- incremental (dict): Incremental merge configuration (optional)
    - timestamp_column: Column to identify new data
    - merge_strategy: "replace", "sum", "min", or "max"
- having (str): Optional HAVING clause for filtering aggregates
- audit (dict): Audit column configuration

Example Config
Source code in odibi/patterns/aggregation.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
class AggregationPattern(Pattern):
    """
    Aggregation Pattern: Declarative aggregation with time-grain rollups.

    Features:
    - Declare grain (GROUP BY columns)
    - Declare measures with aggregation functions
    - Incremental aggregation (merge new data with existing)
    - Time rollups (generate multiple grain levels)
    - Audit columns

    Configuration Options (via params dict):
        - **grain** (list): Columns to GROUP BY (defines uniqueness)
        - **measures** (list): Measure definitions with name and aggregation expr
            - name: Output column name
            - expr: SQL aggregation expression (e.g., "SUM(amount)")
        - **incremental** (dict): Incremental merge configuration (optional)
            - timestamp_column: Column to identify new data
            - merge_strategy: "replace", "sum", "min", or "max"
        - **having** (str): Optional HAVING clause for filtering aggregates
        - **audit** (dict): Audit column configuration

    Example Config:
        pattern:
          type: aggregation
          params:
            grain: [date_sk, product_sk]
            measures:
              - name: total_revenue
                expr: "SUM(total_amount)"
              - name: order_count
                expr: "COUNT(*)"
              - name: avg_order_value
                expr: "AVG(total_amount)"
            having: "COUNT(*) > 0"
            audit:
              load_timestamp: true
    """

    # Parameter contract consumed by the Pattern base class.
    required_params: ClassVar[List[str]] = ["grain", "measures"]
    optional_params: ClassVar[List[str]] = ["incremental", "having", "audit"]

    def validate(self) -> None:
        """Validate aggregation pattern configuration parameters.

        Ensures that all required parameters are present and valid. Checks that:
        - grain is specified (list of GROUP BY columns)
        - measures are provided with correct structure (list of dicts with 'name' and 'expr')
        - incremental config is valid if provided (requires 'timestamp_column' and valid merge_strategy)

        Raises:
            ValueError: If grain is missing, measures are missing/invalid, or incremental config is invalid.
        """
        ctx = get_logging_context()
        grain = self.params.get("grain")
        measures = self.params.get("measures", [])

        ctx.debug(
            "AggregationPattern validation starting",
            pattern="AggregationPattern",
            grain=grain,
            measures_count=len(measures),
        )

        if not grain:
            ctx.error(
                "AggregationPattern validation failed: 'grain' is required",
                pattern="AggregationPattern",
                node=self.config.name,
            )
            raise ValueError(
                f"AggregationPattern (node '{self.config.name}'): 'grain' parameter is required. "
                "Grain defines the grouping columns for aggregation (e.g., ['date', 'region']). "
                "Provide a list of column names to group by."
            )

        if not measures:
            ctx.error(
                "AggregationPattern validation failed: 'measures' is required",
                pattern="AggregationPattern",
                node=self.config.name,
            )
            raise ValueError(
                f"AggregationPattern (node '{self.config.name}'): 'measures' parameter is required. "
                "Measures define the aggregations to compute (e.g., [{'name': 'total_sales', 'expr': 'sum(amount)'}]). "
                "Provide a list of dicts, each with 'name' and 'expr' keys."
            )

        # Each measure must be a dict carrying both the output column name
        # and the SQL aggregation expression; fail fast with the index so the
        # user can locate the offending entry in their config.
        for i, measure in enumerate(measures):
            if not isinstance(measure, dict):
                ctx.error(
                    f"AggregationPattern validation failed: measure[{i}] must be a dict",
                    pattern="AggregationPattern",
                    node=self.config.name,
                )
                raise ValueError(
                    f"AggregationPattern (node '{self.config.name}'): measure[{i}] must be a dict with 'name' and 'expr'. "
                    f"Got {type(measure).__name__}: {measure!r}. "
                    "Example: {'name': 'total_sales', 'expr': 'sum(amount)'}"
                )
            if "name" not in measure:
                ctx.error(
                    f"AggregationPattern validation failed: measure[{i}] missing 'name'",
                    pattern="AggregationPattern",
                    node=self.config.name,
                )
                raise ValueError(
                    f"AggregationPattern (node '{self.config.name}'): measure[{i}] missing 'name'. "
                    f"Got: {measure!r}. Add a 'name' key for the output column name."
                )
            if "expr" not in measure:
                ctx.error(
                    f"AggregationPattern validation failed: measure[{i}] missing 'expr'",
                    pattern="AggregationPattern",
                    node=self.config.name,
                )
                raise ValueError(
                    f"AggregationPattern (node '{self.config.name}'): measure[{i}] missing 'expr'. "
                    f"Got: {measure!r}. Add an 'expr' key with the aggregation expression (e.g., 'sum(amount)')."
                )

        incremental = self.params.get("incremental")
        if incremental:
            # NOTE(review): 'timestamp_column' is required here but is not read
            # anywhere in execute()/_apply_incremental() below — confirm whether
            # the new-data filtering it implies happens upstream or is missing.
            if "timestamp_column" not in incremental:
                ctx.error(
                    "AggregationPattern validation failed: incremental missing 'timestamp_column'",
                    pattern="AggregationPattern",
                    node=self.config.name,
                )
                raise ValueError(
                    f"AggregationPattern (node '{self.config.name}'): incremental config requires 'timestamp_column'. "
                    f"Got: {incremental!r}. "
                    "Add 'timestamp_column' to specify which column tracks record timestamps."
                )
            merge_strategy = incremental.get("merge_strategy", "replace")
            if merge_strategy not in ("replace", "sum", "min", "max"):
                ctx.error(
                    f"AggregationPattern validation failed: invalid merge_strategy '{merge_strategy}'",
                    pattern="AggregationPattern",
                    node=self.config.name,
                )
                raise ValueError(
                    f"AggregationPattern (node '{self.config.name}'): 'merge_strategy' must be 'replace', 'sum', 'min', or 'max'. "
                    f"Got: {merge_strategy}"
                )

        ctx.debug(
            "AggregationPattern validation passed",
            pattern="AggregationPattern",
        )

    def execute(self, context: EngineContext) -> Any:
        """Execute the aggregation pattern on the input data.

        Performs aggregation operations on the source DataFrame, optionally applying incremental
        merge with existing target data. The execution flow:
        1. Aggregate source data by grain columns with specified measures
        2. Apply HAVING clause filtering if configured
        3. Merge with existing target data if incremental mode is enabled
        4. Add audit columns (load_timestamp, source_system) if configured

        Args:
            context: Engine context containing the source DataFrame and execution environment.

        Returns:
            Aggregated DataFrame with measures computed at the specified grain level.

        Raises:
            Exception: If aggregation fails, incremental merge fails, or target loading fails.
        """
        ctx = get_logging_context()
        start_time = time.time()

        grain = self.params.get("grain")
        measures = self.params.get("measures", [])
        having = self.params.get("having")
        incremental = self.params.get("incremental")
        audit_config = self.params.get("audit", {})
        target = self.params.get("target")

        ctx.debug(
            "AggregationPattern starting",
            pattern="AggregationPattern",
            grain=grain,
            measures_count=len(measures),
            incremental=incremental is not None,
        )

        df = context.df
        source_count = self._get_row_count(df, context.engine_type)
        ctx.debug(
            "Aggregation source loaded",
            pattern="AggregationPattern",
            source_rows=source_count,
        )

        try:
            result_df = self._aggregate(context, df, grain, measures, having)

            # Incremental merge only runs when BOTH the incremental config and
            # a target are present; with either missing, the fresh aggregate
            # is returned as-is (full-refresh behavior).
            if incremental and target:
                result_df = self._apply_incremental(
                    context, result_df, grain, measures, incremental, target
                )

            result_df = self._add_audit_columns(context, result_df, audit_config)

            result_count = self._get_row_count(result_df, context.engine_type)
            elapsed_ms = (time.time() - start_time) * 1000

            ctx.info(
                "AggregationPattern completed",
                pattern="AggregationPattern",
                elapsed_ms=round(elapsed_ms, 2),
                source_rows=source_count,
                result_rows=result_count,
                grain=grain,
            )

            return result_df

        except Exception as e:
            # Log with timing and error type for observability, then re-raise
            # unchanged so the caller sees the original exception.
            elapsed_ms = (time.time() - start_time) * 1000
            ctx.error(
                f"AggregationPattern failed: {e}",
                pattern="AggregationPattern",
                node=self.config.name,
                error_type=type(e).__name__,
                elapsed_ms=round(elapsed_ms, 2),
            )
            raise

    def _aggregate(
        self,
        context: EngineContext,
        df,
        grain: List[str],
        measures: List[Dict],
        having: Optional[str],
    ):
        """Perform the aggregation using SQL.

        Dispatches to the Spark or pandas/DuckDB implementation based on the
        context's engine type.

        Args:
            context: Engine context containing engine type and configuration.
            df: The source DataFrame to aggregate.
            grain: List of column names to group by.
            measures: List of measure definitions with aggregation functions.
            having: Optional SQL HAVING clause filter for post-aggregation filtering.

        Returns:
            Aggregated DataFrame with grain columns and calculated measures.
        """
        if context.engine_type == EngineType.SPARK:
            return self._aggregate_spark(context, df, grain, measures, having)
        else:
            return self._aggregate_pandas(context, df, grain, measures, having)

    def _aggregate_spark(
        self,
        context: EngineContext,
        df,
        grain: List[str],
        measures: List[Dict],
        having: Optional[str],
    ):
        """Aggregate using Spark SQL.

        Builds a groupBy over the grain columns and evaluates each measure's
        SQL expression via ``F.expr``, aliased to the measure name. The HAVING
        clause, if any, is applied as a post-aggregation filter expression.
        """
        from pyspark.sql import functions as F

        grain_cols = [F.col(c) for c in grain]

        agg_exprs = []
        for measure in measures:
            name = measure["name"]
            expr = measure["expr"]
            agg_exprs.append(F.expr(expr).alias(name))

        result = df.groupBy(*grain_cols).agg(*agg_exprs)

        if having:
            # filter() after agg() is Spark's equivalent of SQL HAVING.
            result = result.filter(F.expr(having))

        return result

    def _aggregate_pandas(
        self,
        context: EngineContext,
        df,
        grain: List[str],
        measures: List[Dict],
        having: Optional[str],
    ):
        """Aggregate using DuckDB SQL via context.sql().

        Builds a ``SELECT ... GROUP BY`` statement from the configured grain
        and measure expressions and executes it against the DataFrame
        registered as ``df``.

        NOTE(review): grain/measure/having strings are interpolated directly
        into the SQL text. This assumes pattern params come from trusted
        pipeline config, not end-user input — confirm that assumption holds.
        """
        grain_str = ", ".join(grain)

        measure_exprs = []
        for measure in measures:
            name = measure["name"]
            expr = measure["expr"]
            measure_exprs.append(f"{expr} AS {name}")
        measures_str = ", ".join(measure_exprs)

        sql = f"SELECT {grain_str}, {measures_str} FROM df GROUP BY {grain_str}"

        if having:
            sql += f" HAVING {having}"

        # Rebind the context to this df so the SQL's "df" table resolves to it.
        temp_context = context.with_df(df)
        result_context = temp_context.sql(sql)
        return result_context.df

    def _apply_incremental(
        self,
        context: EngineContext,
        new_agg_df,
        grain: List[str],
        measures: List[Dict],
        incremental: Dict,
        target: str,
    ):
        """Apply incremental merge with existing aggregations.

        Loads the existing target (if any) and combines it with the newly
        computed aggregates using the configured merge strategy. When the
        target does not exist yet, the new aggregates are returned unchanged.
        """
        merge_strategy = incremental.get("merge_strategy", "replace")

        existing_df = self._load_existing_target(context, target)
        if existing_df is None:
            # First run: nothing to merge with.
            return new_agg_df

        if merge_strategy == "replace":
            return self._merge_replace(context, existing_df, new_agg_df, grain)
        elif merge_strategy == "sum":
            return self._merge_sum(context, existing_df, new_agg_df, grain, measures)
        elif merge_strategy == "min":
            return self._merge_min(context, existing_df, new_agg_df, grain, measures)
        else:  # max — only remaining value after validate()
            return self._merge_max(context, existing_df, new_agg_df, grain, measures)

    def _merge_replace(self, context: EngineContext, existing_df, new_df, grain: List[str]):
        """
        Replace strategy: New aggregates overwrite existing for matching grain keys.

        Existing rows whose grain key does NOT appear in the new data are kept;
        all rows from the new data are appended.
        """
        if context.engine_type == EngineType.SPARK:
            new_keys = new_df.select(grain).distinct()

            # Anti-join: keep only existing rows with no matching new key.
            unchanged = existing_df.join(new_keys, on=grain, how="left_anti")

            return unchanged.unionByName(new_df, allowMissingColumns=True)
        else:
            import pandas as pd

            new_keys = new_df[grain].drop_duplicates()

            # indicator=True tags each row's origin; "left_only" rows are the
            # pandas equivalent of Spark's left_anti join above.
            merged = pd.merge(existing_df, new_keys, on=grain, how="left", indicator=True)
            unchanged = merged[merged["_merge"] == "left_only"].drop(columns=["_merge"])

            return pd.concat([unchanged, new_df], ignore_index=True)

    def _merge_sum(
        self,
        context: EngineContext,
        existing_df,
        new_df,
        grain: List[str],
        measures: List[Dict],
    ):
        """
        Sum strategy: Add new measure values to existing for matching grain keys.

        Full outer join on the grain; for each measure, missing sides are
        treated as 0 so keys present on only one side keep their value.
        Non-measure, non-grain columns are carried through preferring the
        existing side.
        """
        measure_names = [m["name"] for m in measures]

        if context.engine_type == EngineType.SPARK:
            from pyspark.sql import functions as F

            joined = existing_df.alias("e").join(new_df.alias("n"), on=grain, how="full_outer")

            select_cols = []
            for col in grain:
                # Grain key can be null on either side after full_outer.
                select_cols.append(F.coalesce(F.col(f"e.{col}"), F.col(f"n.{col}")).alias(col))

            for name in measure_names:
                select_cols.append(
                    (
                        F.coalesce(F.col(f"e.{name}"), F.lit(0))
                        + F.coalesce(F.col(f"n.{name}"), F.lit(0))
                    ).alias(name)
                )

            # NOTE(review): other_cols is derived from existing_df only; if a
            # pass-through column is absent from new_df, F.col(f"n.{col}")
            # would fail to resolve — confirm both sides share these columns.
            other_cols = [
                c for c in existing_df.columns if c not in grain and c not in measure_names
            ]
            for col in other_cols:
                select_cols.append(F.coalesce(F.col(f"e.{col}"), F.col(f"n.{col}")).alias(col))

            return joined.select(select_cols)
        else:
            import pandas as pd

            merged = pd.merge(existing_df, new_df, on=grain, how="outer", suffixes=("_e", "_n"))

            result = merged[grain].copy()

            for name in measure_names:
                # pandas only suffixes columns that collide; fall back to the
                # bare name when the column existed on just one side.
                e_col = f"{name}_e" if f"{name}_e" in merged.columns else name
                n_col = f"{name}_n" if f"{name}_n" in merged.columns else name

                if e_col in merged.columns and n_col in merged.columns:
                    # infer_objects(copy=False) avoids the pandas FutureWarning
                    # about silent downcasting after fillna.
                    result[name] = merged[e_col].fillna(0).infer_objects(copy=False) + merged[
                        n_col
                    ].fillna(0).infer_objects(copy=False)
                elif e_col in merged.columns:
                    result[name] = merged[e_col].fillna(0).infer_objects(copy=False)
                elif n_col in merged.columns:
                    result[name] = merged[n_col].fillna(0).infer_objects(copy=False)
                else:
                    result[name] = 0

            other_cols = [
                c for c in existing_df.columns if c not in grain and c not in measure_names
            ]
            for col in other_cols:
                e_col = f"{col}_e" if f"{col}_e" in merged.columns else col
                n_col = f"{col}_n" if f"{col}_n" in merged.columns else col
                if e_col in merged.columns:
                    result[col] = merged[e_col]
                elif n_col in merged.columns:
                    result[col] = merged[n_col]

            return result

    def _merge_min(
        self,
        context: EngineContext,
        existing_df,
        new_df,
        grain: List[str],
        measures: List[Dict],
    ):
        """
        Min strategy: Keep the minimum value for each measure across existing and new.

        Full outer join on the grain; when a measure is null on one side, the
        other side's value wins (the coalesce pair makes least() null-safe).
        """
        measure_names = [m["name"] for m in measures]

        if context.engine_type == EngineType.SPARK:
            from pyspark.sql import functions as F

            joined = existing_df.alias("e").join(new_df.alias("n"), on=grain, how="full_outer")

            select_cols = []
            for col in grain:
                select_cols.append(F.coalesce(F.col(f"e.{col}"), F.col(f"n.{col}")).alias(col))

            for name in measure_names:
                # least(coalesce(e,n), coalesce(n,e)): both args equal the
                # non-null side when one side is null, so nulls never win.
                select_cols.append(
                    F.least(
                        F.coalesce(F.col(f"e.{name}"), F.col(f"n.{name}")),
                        F.coalesce(F.col(f"n.{name}"), F.col(f"e.{name}")),
                    ).alias(name)
                )

            other_cols = [
                c for c in existing_df.columns if c not in grain and c not in measure_names
            ]
            for col in other_cols:
                select_cols.append(F.coalesce(F.col(f"e.{col}"), F.col(f"n.{col}")).alias(col))

            return joined.select(select_cols)
        else:
            import pandas as pd

            merged = pd.merge(existing_df, new_df, on=grain, how="outer", suffixes=("_e", "_n"))

            result = merged[grain].copy()

            for name in measure_names:
                e_col = f"{name}_e" if f"{name}_e" in merged.columns else name
                n_col = f"{name}_n" if f"{name}_n" in merged.columns else name

                if e_col in merged.columns and n_col in merged.columns:
                    # DataFrame.min(axis=1) skips NaN by default, matching the
                    # null-safe least() used on the Spark path.
                    result[name] = merged[[e_col, n_col]].min(axis=1)
                elif e_col in merged.columns:
                    result[name] = merged[e_col]
                elif n_col in merged.columns:
                    result[name] = merged[n_col]

            other_cols = [
                c for c in existing_df.columns if c not in grain and c not in measure_names
            ]
            for col in other_cols:
                e_col = f"{col}_e" if f"{col}_e" in merged.columns else col
                n_col = f"{col}_n" if f"{col}_n" in merged.columns else col
                if e_col in merged.columns:
                    result[col] = merged[e_col]
                elif n_col in merged.columns:
                    result[col] = merged[n_col]

            return result

    def _merge_max(
        self,
        context: EngineContext,
        existing_df,
        new_df,
        grain: List[str],
        measures: List[Dict],
    ):
        """
        Max strategy: Keep the maximum value for each measure across existing and new.

        Mirror image of _merge_min: greatest()/max(axis=1) instead of
        least()/min(axis=1), with the same null-safe coalesce pairing.
        """
        measure_names = [m["name"] for m in measures]

        if context.engine_type == EngineType.SPARK:
            from pyspark.sql import functions as F

            joined = existing_df.alias("e").join(new_df.alias("n"), on=grain, how="full_outer")

            select_cols = []
            for col in grain:
                select_cols.append(F.coalesce(F.col(f"e.{col}"), F.col(f"n.{col}")).alias(col))

            for name in measure_names:
                select_cols.append(
                    F.greatest(
                        F.coalesce(F.col(f"e.{name}"), F.col(f"n.{name}")),
                        F.coalesce(F.col(f"n.{name}"), F.col(f"e.{name}")),
                    ).alias(name)
                )

            other_cols = [
                c for c in existing_df.columns if c not in grain and c not in measure_names
            ]
            for col in other_cols:
                select_cols.append(F.coalesce(F.col(f"e.{col}"), F.col(f"n.{col}")).alias(col))

            return joined.select(select_cols)
        else:
            import pandas as pd

            merged = pd.merge(existing_df, new_df, on=grain, how="outer", suffixes=("_e", "_n"))

            result = merged[grain].copy()

            for name in measure_names:
                e_col = f"{name}_e" if f"{name}_e" in merged.columns else name
                n_col = f"{name}_n" if f"{name}_n" in merged.columns else name

                if e_col in merged.columns and n_col in merged.columns:
                    result[name] = merged[[e_col, n_col]].max(axis=1)
                elif e_col in merged.columns:
                    result[name] = merged[e_col]
                elif n_col in merged.columns:
                    result[name] = merged[n_col]

            other_cols = [
                c for c in existing_df.columns if c not in grain and c not in measure_names
            ]
            for col in other_cols:
                e_col = f"{col}_e" if f"{col}_e" in merged.columns else col
                n_col = f"{col}_n" if f"{col}_n" in merged.columns else col
                if e_col in merged.columns:
                    result[col] = merged[e_col]
                elif n_col in merged.columns:
                    result[col] = merged[n_col]

            return result

execute(context)

Execute the aggregation pattern on the input data.

Performs aggregation operations on the source DataFrame, optionally applying incremental merge with existing target data. The execution flow: 1. Aggregate source data by grain columns with specified measures 2. Apply HAVING clause filtering if configured 3. Merge with existing target data if incremental mode is enabled 4. Add audit columns (load_timestamp, source_system) if configured

Parameters:

Name Type Description Default
context EngineContext

Engine context containing the source DataFrame and execution environment.

required

Returns:

Type Description
Any

Aggregated DataFrame with measures computed at the specified grain level.

Raises:

Type Description
Exception

If aggregation fails, incremental merge fails, or target loading fails.

Source code in odibi/patterns/aggregation.py
def execute(self, context: EngineContext) -> Any:
    """Execute the aggregation pattern on the input data.

    Performs aggregation operations on the source DataFrame, optionally applying incremental
    merge with existing target data. The execution flow:
    1. Aggregate source data by grain columns with specified measures
    2. Apply HAVING clause filtering if configured
    3. Merge with existing target data if incremental mode is enabled
    4. Add audit columns (load_timestamp, source_system) if configured

    Args:
        context: Engine context containing the source DataFrame and execution environment.

    Returns:
        Aggregated DataFrame with measures computed at the specified grain level.

    Raises:
        Exception: If aggregation fails, incremental merge fails, or target loading fails.
    """
    ctx = get_logging_context()
    start_time = time.time()

    grain = self.params.get("grain")
    measures = self.params.get("measures", [])
    having = self.params.get("having")
    incremental = self.params.get("incremental")
    audit_config = self.params.get("audit", {})
    target = self.params.get("target")

    ctx.debug(
        "AggregationPattern starting",
        pattern="AggregationPattern",
        grain=grain,
        measures_count=len(measures),
        incremental=incremental is not None,
    )

    df = context.df
    source_count = self._get_row_count(df, context.engine_type)
    ctx.debug(
        "Aggregation source loaded",
        pattern="AggregationPattern",
        source_rows=source_count,
    )

    try:
        result_df = self._aggregate(context, df, grain, measures, having)

        if incremental and target:
            result_df = self._apply_incremental(
                context, result_df, grain, measures, incremental, target
            )

        result_df = self._add_audit_columns(context, result_df, audit_config)

        result_count = self._get_row_count(result_df, context.engine_type)
        elapsed_ms = (time.time() - start_time) * 1000

        ctx.info(
            "AggregationPattern completed",
            pattern="AggregationPattern",
            elapsed_ms=round(elapsed_ms, 2),
            source_rows=source_count,
            result_rows=result_count,
            grain=grain,
        )

        return result_df

    except Exception as e:
        elapsed_ms = (time.time() - start_time) * 1000
        ctx.error(
            f"AggregationPattern failed: {e}",
            pattern="AggregationPattern",
            node=self.config.name,
            error_type=type(e).__name__,
            elapsed_ms=round(elapsed_ms, 2),
        )
        raise

validate()

Validate aggregation pattern configuration parameters.

Ensures that all required parameters are present and valid. Checks that: - grain is specified (list of GROUP BY columns) - measures are provided with correct structure (list of dicts with 'name' and 'expr') - incremental config is valid if provided (requires 'timestamp_column' and valid merge_strategy)

Raises:

Type Description
ValueError

If grain is missing, measures are missing/invalid, or incremental config is invalid.

Source code in odibi/patterns/aggregation.py
def validate(self) -> None:
    """Validate aggregation pattern configuration parameters.

    Ensures that all required parameters are present and valid. Checks that:
    - grain is specified (list of GROUP BY columns)
    - measures are provided with correct structure (list of dicts with 'name' and 'expr')
    - incremental config is valid if provided (requires 'timestamp_column' and valid merge_strategy)

    Raises:
        ValueError: If grain is missing, measures are missing/invalid, or incremental config is invalid.
    """
    ctx = get_logging_context()
    grain = self.params.get("grain")
    measures = self.params.get("measures", [])

    ctx.debug(
        "AggregationPattern validation starting",
        pattern="AggregationPattern",
        grain=grain,
        measures_count=len(measures),
    )

    if not grain:
        ctx.error(
            "AggregationPattern validation failed: 'grain' is required",
            pattern="AggregationPattern",
            node=self.config.name,
        )
        raise ValueError(
            f"AggregationPattern (node '{self.config.name}'): 'grain' parameter is required. "
            "Grain defines the grouping columns for aggregation (e.g., ['date', 'region']). "
            "Provide a list of column names to group by."
        )

    if not measures:
        ctx.error(
            "AggregationPattern validation failed: 'measures' is required",
            pattern="AggregationPattern",
            node=self.config.name,
        )
        raise ValueError(
            f"AggregationPattern (node '{self.config.name}'): 'measures' parameter is required. "
            "Measures define the aggregations to compute (e.g., [{'name': 'total_sales', 'expr': 'sum(amount)'}]). "
            "Provide a list of dicts, each with 'name' and 'expr' keys."
        )

    for i, measure in enumerate(measures):
        if not isinstance(measure, dict):
            ctx.error(
                f"AggregationPattern validation failed: measure[{i}] must be a dict",
                pattern="AggregationPattern",
                node=self.config.name,
            )
            raise ValueError(
                f"AggregationPattern (node '{self.config.name}'): measure[{i}] must be a dict with 'name' and 'expr'. "
                f"Got {type(measure).__name__}: {measure!r}. "
                "Example: {'name': 'total_sales', 'expr': 'sum(amount)'}"
            )
        if "name" not in measure:
            ctx.error(
                f"AggregationPattern validation failed: measure[{i}] missing 'name'",
                pattern="AggregationPattern",
                node=self.config.name,
            )
            raise ValueError(
                f"AggregationPattern (node '{self.config.name}'): measure[{i}] missing 'name'. "
                f"Got: {measure!r}. Add a 'name' key for the output column name."
            )
        if "expr" not in measure:
            ctx.error(
                f"AggregationPattern validation failed: measure[{i}] missing 'expr'",
                pattern="AggregationPattern",
                node=self.config.name,
            )
            raise ValueError(
                f"AggregationPattern (node '{self.config.name}'): measure[{i}] missing 'expr'. "
                f"Got: {measure!r}. Add an 'expr' key with the aggregation expression (e.g., 'sum(amount)')."
            )

    incremental = self.params.get("incremental")
    if incremental:
        if "timestamp_column" not in incremental:
            ctx.error(
                "AggregationPattern validation failed: incremental missing 'timestamp_column'",
                pattern="AggregationPattern",
                node=self.config.name,
            )
            raise ValueError(
                f"AggregationPattern (node '{self.config.name}'): incremental config requires 'timestamp_column'. "
                f"Got: {incremental!r}. "
                "Add 'timestamp_column' to specify which column tracks record timestamps."
            )
        merge_strategy = incremental.get("merge_strategy", "replace")
        if merge_strategy not in ("replace", "sum", "min", "max"):
            ctx.error(
                f"AggregationPattern validation failed: invalid merge_strategy '{merge_strategy}'",
                pattern="AggregationPattern",
                node=self.config.name,
            )
            raise ValueError(
                f"AggregationPattern (node '{self.config.name}'): 'merge_strategy' must be 'replace', 'sum', 'min', or 'max'. "
                f"Got: {merge_strategy}"
            )

    ctx.debug(
        "AggregationPattern validation passed",
        pattern="AggregationPattern",
    )