Skip to content

Connections API

odibi.connections.base

Base connection interface.

BaseConnection

Bases: ABC

Abstract base class for connections.

Source code in odibi/connections/base.py
class BaseConnection(ABC):
    """Abstract base class for connections."""

    @abstractmethod
    def get_path(self, relative_path: str) -> str:
        """Get full path for a relative path.

        Args:
            relative_path: Relative path or table name

        Returns:
            Full path to resource
        """
        pass

    @abstractmethod
    def validate(self) -> None:
        """Validate connection configuration.

        Raises:
            ConnectionError: If validation fails
        """
        pass

    # ============ DISCOVERY API (Optional - Override in Subclasses) ============

    def discover_catalog(
        self,
        include_schema: bool = False,
        include_stats: bool = False,
        limit: int = 200,
        recursive: bool = True,
        path: str = "",
        pattern: str = "",
    ) -> Dict[str, Any]:
        """Discover available datasets (tables/files) in this connection.

        Args:
            include_schema: Include column schemas
            include_stats: Include row counts and stats
            limit: Max datasets to return per namespace
            recursive: Recursively scan all subfolders/schemas (default: True)
            path: Scope search to specific subfolder/schema
            pattern: Filter by pattern (e.g. "*.csv", "fact_*")

        Returns:
            CatalogSummary dict with datasets

        Raises:
            NotImplementedError: If connection type doesn't support discovery
        """
        raise NotImplementedError(
            f"{self.__class__.__name__} does not support discovery. "
            "Override discover_catalog() to add support."
        )

    def get_schema(self, dataset: str) -> Dict[str, Any]:
        """Get schema (columns + types) for a dataset.

        Args:
            dataset: Table name or file path

        Returns:
            Schema dict with columns
        """
        raise NotImplementedError(f"{self.__class__.__name__} does not support get_schema")

    def profile(
        self, dataset: str, sample_rows: int = 1000, columns: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """Profile a dataset with statistics.

        Args:
            dataset: Table/file to profile
            sample_rows: Number of rows to sample
            columns: Specific columns to profile (None = all)

        Returns:
            TableProfile dict with stats
        """
        raise NotImplementedError(f"{self.__class__.__name__} does not support profiling")

    def preview(
        self, dataset: str, rows: int = 5, columns: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """Preview sample rows from a dataset.

        Args:
            dataset: Table name or file path
            rows: Number of rows to return (default: 5, max: 100)
            columns: Specific columns to include (None = all)

        Returns:
            PreviewResult dict with sample rows
        """
        raise NotImplementedError(f"{self.__class__.__name__} does not support preview")

    def get_freshness(self, dataset: str, timestamp_column: Optional[str] = None) -> Dict[str, Any]:
        """Get data freshness information.

        Args:
            dataset: Table/file to check
            timestamp_column: Column to check (SQL only)

        Returns:
            FreshnessResult dict
        """
        raise NotImplementedError(f"{self.__class__.__name__} does not support freshness checks")

    # ============ SQL DIALECT HELPERS (Override in SQL Subclasses) ============

    sql_dialect: str = ""
    default_schema: str = ""

    def quote_identifier(self, name: str) -> str:
        """Quote an identifier (table/column name) for this SQL dialect.

        Default implementation returns the name unquoted.
        SQL subclasses should override with dialect-specific quoting.
        """
        return name

    def qualify_table(self, table_name: str, schema: str = "") -> str:
        """Build a fully qualified table reference.

        Args:
            table_name: Table name
            schema: Schema name (uses default_schema if empty)

        Returns:
            Qualified table reference for this dialect
        """
        schema = schema or self.default_schema
        if schema:
            return f"{self.quote_identifier(schema)}.{self.quote_identifier(table_name)}"
        return self.quote_identifier(table_name)

    def build_select_query(
        self,
        table_name: str,
        schema: str = "",
        where: str = "",
        limit: int = -1,
        columns: str = "*",
    ) -> str:
        """Build a SELECT query in this connection's SQL dialect.

        Args:
            table_name: Table name
            schema: Schema name
            where: Optional WHERE clause (without 'WHERE' keyword)
            limit: Row limit (-1 for no limit, 0 for schema-only)
            columns: Column list (default "*")

        Returns:
            SQL SELECT statement string
        """
        qualified = self.qualify_table(table_name, schema)
        query = f"SELECT {columns} FROM {qualified}"
        if where:
            query += f" WHERE {where}"
        if limit >= 0:
            query += f" LIMIT {limit}"
        return query

build_select_query(table_name, schema='', where='', limit=-1, columns='*')

Build a SELECT query in this connection's SQL dialect.

Parameters:

Name Type Description Default
table_name str

Table name

required
schema str

Schema name

''
where str

Optional WHERE clause (without 'WHERE' keyword)

''
limit int

Row limit (-1 for no limit, 0 for schema-only)

-1
columns str

Column list (default "*")

'*'

Returns:

Type Description
str

SQL SELECT statement string

Source code in odibi/connections/base.py
def build_select_query(
    self,
    table_name: str,
    schema: str = "",
    where: str = "",
    limit: int = -1,
    columns: str = "*",
) -> str:
    """Compose a SELECT statement in this connection's SQL dialect.

    Args:
        table_name: Table name
        schema: Schema name
        where: Optional WHERE clause (without 'WHERE' keyword)
        limit: Row limit (-1 for no limit, 0 for schema-only)
        columns: Column list (default "*")

    Returns:
        SQL SELECT statement string
    """
    parts = [f"SELECT {columns} FROM {self.qualify_table(table_name, schema)}"]
    if where:
        parts.append(f"WHERE {where}")
    # limit == 0 is meaningful (schema-only), so only negative values skip LIMIT.
    if limit >= 0:
        parts.append(f"LIMIT {limit}")
    return " ".join(parts)

discover_catalog(include_schema=False, include_stats=False, limit=200, recursive=True, path='', pattern='')

Discover available datasets (tables/files) in this connection.

Parameters:

Name Type Description Default
include_schema bool

Include column schemas

False
include_stats bool

Include row counts and stats

False
limit int

Max datasets to return per namespace

200
recursive bool

Recursively scan all subfolders/schemas (default: True)

True
path str

Scope search to specific subfolder/schema

''
pattern str

Filter by pattern (e.g. "*.csv", "fact_*")

''

Returns:

Type Description
Dict[str, Any]

CatalogSummary dict with datasets

Raises:

Type Description
NotImplementedError

If connection type doesn't support discovery

Source code in odibi/connections/base.py
def discover_catalog(
    self,
    include_schema: bool = False,
    include_stats: bool = False,
    limit: int = 200,
    recursive: bool = True,
    path: str = "",
    pattern: str = "",
) -> Dict[str, Any]:
    """Enumerate the datasets (tables/files) reachable through this connection.

    Args:
        include_schema: Include column schemas
        include_stats: Include row counts and stats
        limit: Max datasets to return per namespace
        recursive: Recursively scan all subfolders/schemas (default: True)
        path: Scope search to specific subfolder/schema
        pattern: Filter by pattern (e.g. "*.csv", "fact_*")

    Returns:
        CatalogSummary dict with datasets

    Raises:
        NotImplementedError: If connection type doesn't support discovery
    """
    message = (
        f"{type(self).__name__} does not support discovery. "
        "Override discover_catalog() to add support."
    )
    raise NotImplementedError(message)

get_freshness(dataset, timestamp_column=None)

Get data freshness information.

Parameters:

Name Type Description Default
dataset str

Table/file to check

required
timestamp_column Optional[str]

Column to check (SQL only)

None

Returns:

Type Description
Dict[str, Any]

FreshnessResult dict

Source code in odibi/connections/base.py
def get_freshness(self, dataset: str, timestamp_column: Optional[str] = None) -> Dict[str, Any]:
    """Report how up-to-date the data in a dataset is.

    Args:
        dataset: Table/file to check
        timestamp_column: Column to check (SQL only)

    Returns:
        FreshnessResult dict
    """
    message = f"{type(self).__name__} does not support freshness checks"
    raise NotImplementedError(message)

get_path(relative_path) abstractmethod

Get full path for a relative path.

Parameters:

Name Type Description Default
relative_path str

Relative path or table name

required

Returns:

Type Description
str

Full path to resource

Source code in odibi/connections/base.py
@abstractmethod
def get_path(self, relative_path: str) -> str:
    """Resolve a relative path or table name to a full resource path.

    Args:
        relative_path: Relative path or table name

    Returns:
        Full path to resource
    """
    pass

get_schema(dataset)

Get schema (columns + types) for a dataset.

Parameters:

Name Type Description Default
dataset str

Table name or file path

required

Returns:

Type Description
Dict[str, Any]

Schema dict with columns

Source code in odibi/connections/base.py
def get_schema(self, dataset: str) -> Dict[str, Any]:
    """Return the schema (columns + types) of a dataset.

    Args:
        dataset: Table name or file path

    Returns:
        Schema dict with columns
    """
    message = f"{type(self).__name__} does not support get_schema"
    raise NotImplementedError(message)

preview(dataset, rows=5, columns=None)

Preview sample rows from a dataset.

Parameters:

Name Type Description Default
dataset str

Table name or file path

required
rows int

Number of rows to return (default: 5, max: 100)

5
columns Optional[List[str]]

Specific columns to include (None = all)

None

Returns:

Type Description
Dict[str, Any]

PreviewResult dict with sample rows

Source code in odibi/connections/base.py
def preview(
    self, dataset: str, rows: int = 5, columns: Optional[List[str]] = None
) -> Dict[str, Any]:
    """Return a small sample of rows from a dataset.

    Args:
        dataset: Table name or file path
        rows: Number of rows to return (default: 5, max: 100)
        columns: Specific columns to include (None = all)

    Returns:
        PreviewResult dict with sample rows
    """
    message = f"{type(self).__name__} does not support preview"
    raise NotImplementedError(message)

profile(dataset, sample_rows=1000, columns=None)

Profile a dataset with statistics.

Parameters:

Name Type Description Default
dataset str

Table/file to profile

required
sample_rows int

Number of rows to sample

1000
columns Optional[List[str]]

Specific columns to profile (None = all)

None

Returns:

Type Description
Dict[str, Any]

TableProfile dict with stats

Source code in odibi/connections/base.py
def profile(
    self, dataset: str, sample_rows: int = 1000, columns: Optional[List[str]] = None
) -> Dict[str, Any]:
    """Compute summary statistics for a dataset.

    Args:
        dataset: Table/file to profile
        sample_rows: Number of rows to sample
        columns: Specific columns to profile (None = all)

    Returns:
        TableProfile dict with stats
    """
    message = f"{type(self).__name__} does not support profiling"
    raise NotImplementedError(message)

qualify_table(table_name, schema='')

Build a fully qualified table reference.

Parameters:

Name Type Description Default
table_name str

Table name

required
schema str

Schema name (uses default_schema if empty)

''

Returns:

Type Description
str

Qualified table reference for this dialect

Source code in odibi/connections/base.py
def qualify_table(self, table_name: str, schema: str = "") -> str:
    """Build a fully qualified table reference.

    Args:
        table_name: Table name
        schema: Schema name (uses default_schema if empty)

    Returns:
        Qualified table reference for this dialect
    """
    effective_schema = schema or self.default_schema
    if not effective_schema:
        return self.quote_identifier(table_name)
    return f"{self.quote_identifier(effective_schema)}.{self.quote_identifier(table_name)}"

quote_identifier(name)

Quote an identifier (table/column name) for this SQL dialect.

Default implementation returns the name unquoted. SQL subclasses should override with dialect-specific quoting.

Source code in odibi/connections/base.py
def quote_identifier(self, name: str) -> str:
    """Quote an identifier (table/column name) for this SQL dialect.

    This base implementation performs no quoting at all; SQL-backed
    subclasses override it with dialect-specific rules.
    """
    return name

validate() abstractmethod

Validate connection configuration.

Raises:

Type Description
ConnectionError

If validation fails

Source code in odibi/connections/base.py
@abstractmethod
def validate(self) -> None:
    """Check that the connection configuration is usable.

    Raises:
        ConnectionError: If validation fails
    """
    pass

odibi.connections.local

Local filesystem connection.

LocalConnection

Bases: BaseConnection

Connection to local filesystem or URI-based paths (e.g. dbfs:/, file://).

Source code in odibi/connections/local.py
class LocalConnection(BaseConnection):
    """Connection to local filesystem or URI-based paths (e.g. dbfs:/, file://)."""

    def __init__(self, base_path: str = "./data"):
        """Initialize local connection.

        Args:
            base_path: Base directory for all paths (can be local path or URI)
        """
        ctx = get_logging_context()
        ctx.log_connection(
            connection_type="local",
            connection_name="LocalConnection",
            action="init",
            base_path=base_path,
        )

        self.base_path_str = base_path
        # URI detection: "://" covers standard schemes, and "dbfs:/"-style
        # URIs have a multi-character prefix before ":/". Windows drive
        # letters ("C:/...", "D:\\...") have a single-character prefix and
        # must be treated as plain filesystem paths, not URIs.
        scheme_prefix = base_path.split(":/")[0]
        self.is_uri = "://" in base_path or (":/" in base_path and len(scheme_prefix) > 1)

        if self.is_uri:
            self.base_path = None  # Path objects are not used for URI bases
            ctx.debug(
                "LocalConnection initialized with URI path",
                base_path=base_path,
                is_uri=True,
            )
        else:
            self.base_path = Path(base_path)
            ctx.debug(
                "LocalConnection initialized with filesystem path",
                base_path=base_path,
                is_uri=False,
            )

    def get_path(self, relative_path: str) -> str:
        """Get full path for a relative path.

        Args:
            relative_path: Relative path from base

        Returns:
            Full absolute path or URI
        """
        ctx = get_logging_context()

        if not self.is_uri:
            # Plain filesystem base: let pathlib handle joining/resolution.
            resolved = str((self.base_path / relative_path).absolute())
            ctx.debug(
                "Resolved local path",
                relative_path=relative_path,
                full_path=resolved,
            )
            return resolved

        # URI base: join with plain string handling. Leading separators are
        # stripped from the relative part so it cannot replace the root.
        clean_rel = relative_path.lstrip("/").lstrip("\\")
        if self.base_path_str.endswith(("/", "\\")):
            full_path = self.base_path_str + clean_rel
        else:
            # Use forward slash as the URI separator.
            full_path = self.base_path_str + "/" + clean_rel

        ctx.debug(
            "Resolved URI path",
            relative_path=relative_path,
            full_path=full_path,
        )
        return full_path

    def validate(self) -> None:
        """Validate that base path exists or can be created.

        Raises:
            ConnectionError: If validation fails
        """
        ctx = get_logging_context()
        ctx.debug(
            "Validating LocalConnection",
            base_path=self.base_path_str,
            is_uri=self.is_uri,
        )

        if self.is_uri:
            # Cannot validate/create URIs with local os module
            # Assume valid or handled by engine
            ctx.debug(
                "Skipping URI validation (handled by engine)",
                base_path=self.base_path_str,
            )
        else:
            # Create base directory if it doesn't exist
            try:
                # BUG FIX: capture existence BEFORE mkdir. The old code
                # evaluated `not self.base_path.exists()` after a successful
                # mkdir(exist_ok=True), so the path always existed and
                # `created` was always logged as False.
                existed_before = self.base_path.exists()
                self.base_path.mkdir(parents=True, exist_ok=True)
                ctx.info(
                    "LocalConnection validated successfully",
                    base_path=str(self.base_path.absolute()),
                    created=not existed_before,
                )
            except Exception as e:
                ctx.error(
                    "LocalConnection validation failed",
                    base_path=self.base_path_str,
                    error=str(e),
                )
                raise

    def list_files(self, path: str = "", pattern: str = "*", limit: int = 1000) -> List[Dict]:
        """List files in local path.

        Args:
            path: Relative path within base_path
            pattern: Glob pattern for filtering (default: "*")
            limit: Maximum number of files to return

        Returns:
            List of dicts with keys: name, path, size, modified, format
        """
        ctx = get_logging_context()
        ctx.debug("Listing local files", path=path, pattern=pattern, limit=limit)

        if self.is_uri:
            # URI bases cannot be globbed with pathlib.
            ctx.warning("list_files not supported for URI paths")
            return []

        try:
            root = self.base_path / path if path else self.base_path
            results: List[Dict] = []

            for candidate in root.glob(pattern):
                if not candidate.is_file():
                    continue
                info = candidate.stat()
                results.append(
                    {
                        "name": candidate.name,
                        "path": str(candidate.absolute()),
                        "size": info.st_size,
                        "modified": datetime.fromtimestamp(info.st_mtime),
                        "format": infer_format_from_path(candidate.name),
                    }
                )
                if len(results) >= limit:
                    break

            ctx.info("Listed local files", count=len(results))
            return results

        except Exception as e:
            # Best-effort listing: log and return an empty result.
            ctx.warning("Failed to list local files", error=str(e), path=path)
            return []

    def list_folders(self, path: str = "", limit: int = 100) -> List[str]:
        """List folders in local path.

        Args:
            path: Relative path within base_path
            limit: Maximum number of folders to return

        Returns:
            List of folder paths
        """
        ctx = get_logging_context()
        ctx.debug("Listing local folders", path=path, limit=limit)

        if self.is_uri:
            # URI bases cannot be iterated with pathlib.
            ctx.warning("list_folders not supported for URI paths")
            return []

        try:
            root = self.base_path / path if path else self.base_path
            found: List[str] = []

            for entry in root.iterdir():
                if not entry.is_dir():
                    continue
                found.append(str(entry.absolute()))
                if len(found) >= limit:
                    break

            ctx.info("Listed local folders", count=len(found))
            return found

        except Exception as e:
            # Best-effort listing: log and return an empty result.
            ctx.warning("Failed to list local folders", error=str(e), path=path)
            return []

    def discover_catalog(
        self,
        include_schema: bool = False,
        include_stats: bool = False,
        limit: int = 200,
        recursive: bool = True,
        path: str = "",
        pattern: str = "",
    ) -> Dict[str, Any]:
        """Discover datasets in local filesystem.

        Args:
            include_schema: Sample files and infer schema
            include_stats: Include row counts and stats
            limit: Maximum datasets to return
            recursive: Recursively scan all subfolders (default: True)
            path: Scope search to specific subfolder
            pattern: Filter by glob pattern (e.g. "*.csv", "sales_*")

        Returns:
            CatalogSummary dict
        """
        # NOTE(review): include_schema and include_stats are accepted (and
        # logged below) but not otherwise used in this body — confirm whether
        # schema/stat enrichment is intended here or handled by the caller.
        ctx = get_logging_context()
        ctx.info(
            "Discovering local catalog",
            base_path=self.base_path_str,
            include_schema=include_schema,
            include_stats=include_stats,
        )

        # URI bases (e.g. dbfs:/) cannot be walked with pathlib; return an
        # empty catalog rather than raising.
        if self.is_uri:
            ctx.warning("discover_catalog not supported for URI paths")
            return CatalogSummary(
                connection_name=self.base_path_str,
                connection_type="local",
                total_datasets=0,
                next_step="URI paths not supported for catalog discovery",
            ).model_dump()

        try:
            folders = []
            files = []
            # Running count of datasets per detected format name.
            formats = {}

            # Determine search root - use path parameter to scope search
            search_root = self.base_path / path if path else self.base_path
            if not search_root.exists():
                ctx.warning(f"Path not found: {search_root}")
                return CatalogSummary(
                    connection_name=self.base_path_str,
                    connection_type="local",
                    total_datasets=0,
                    next_step=f"Path '{path}' not found in {self.base_path_str}",
                ).model_dump()

            # Use rglob for recursive or iterdir for shallow
            search_pattern = pattern if pattern else "*"
            if recursive:
                items_to_scan = list(search_root.rglob(search_pattern))
            else:
                items_to_scan = list(search_root.glob(search_pattern))

            for item in items_to_scan:
                # `limit` caps the combined folder + file count.
                if len(folders) + len(files) >= limit:
                    break

                if item.is_dir():
                    # Directories may themselves be datasets (e.g. a Delta
                    # table directory); detect_file_format decides.
                    file_format = detect_file_format(str(item))

                    folders.append(
                        DatasetRef(
                            name=item.name,
                            kind="folder",
                            path=str(item.absolute()),
                            format=file_format,
                        )
                    )

                    if file_format:
                        formats[file_format] = formats.get(file_format, 0) + 1

                elif item.is_file():
                    stat = item.stat()
                    # For plain files the extension determines the format.
                    file_format = infer_format_from_path(item.name)

                    files.append(
                        DatasetRef(
                            name=item.name,
                            kind="file",
                            path=str(item.absolute()),
                            format=file_format,
                            size_bytes=stat.st_size,
                            modified_at=datetime.fromtimestamp(stat.st_mtime),
                        )
                    )

                    if file_format:
                        formats[file_format] = formats.get(file_format, 0) + 1

            summary = CatalogSummary(
                connection_name=self.base_path_str,
                connection_type="local",
                folders=[f.model_dump() for f in folders],
                files=[f.model_dump() for f in files],
                total_datasets=len(folders) + len(files),
                formats=formats,
                next_step="Use get_schema() to inspect individual datasets",
            )

            ctx.info(
                "Local catalog discovery complete",
                total_datasets=summary.total_datasets,
                folders=len(folders),
                files=len(files),
            )

            return summary.model_dump()

        except Exception as e:
            # Best-effort discovery: surface the error in the summary instead
            # of raising, so callers always receive a CatalogSummary dict.
            ctx.error("Failed to discover local catalog", error=str(e))
            return CatalogSummary(
                connection_name=self.base_path_str,
                connection_type="local",
                total_datasets=0,
                next_step=f"Error: {str(e)}",
            ).model_dump()

    def get_schema(self, dataset: str) -> Dict[str, Any]:
        """Get schema for a dataset.

        Args:
            dataset: Relative path to file or folder

        Returns:
            Schema dict with columns
        """
        ctx = get_logging_context()
        ctx.info("Getting local schema", dataset=dataset)

        try:
            import pandas as pd

            full_path = self.get_path(dataset)
            file_format = detect_file_format(full_path)

            # Read a zero-row sample just to obtain dtypes.
            if file_format in ("parquet", "delta"):
                sample = pd.read_parquet(full_path).head(0)
            elif file_format == "csv":
                sample = pd.read_csv(full_path, nrows=1000).head(0)
            elif file_format == "json":
                sample = pd.read_json(full_path, lines=True, nrows=1000).head(0)
            else:
                ctx.warning("Unsupported format for schema inference", format=file_format)
                return Schema(
                    dataset=DatasetRef(name=dataset, kind="file", format=file_format),
                    columns=[],
                ).model_dump()

            columns = [
                Column(name=col_name, dtype=str(col_dtype))
                for col_name, col_dtype in sample.dtypes.items()
            ]

            result = Schema(
                dataset=DatasetRef(name=dataset, kind="file", format=file_format),
                columns=columns,
            )

            ctx.info("Schema retrieved", column_count=len(columns))
            return result.model_dump()

        except Exception as e:
            # Best-effort: log the failure and return an empty schema.
            ctx.error("Failed to get schema", dataset=dataset, error=str(e))
            return Schema(
                dataset=DatasetRef(name=dataset, kind="file"),
                columns=[],
            ).model_dump()

    def profile(
        self, dataset: str, sample_rows: int = 1000, columns: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """Profile a dataset with statistics.

        Args:
            dataset: Relative path to file or folder
            sample_rows: Number of rows to sample (max 10000)
            columns: Specific columns to profile (None = all)

        Returns:
            TableProfile dict with stats
        """
        ctx = get_logging_context()
        ctx.info("Profiling local dataset", dataset=dataset, sample_rows=sample_rows)

        sample_rows = min(sample_rows, 10000)  # Cap at 10k

        try:
            import pandas as pd

            full_path = self.get_path(dataset)
            file_format = detect_file_format(full_path)

            # Read sample
            if file_format == "parquet" or file_format == "delta":
                df = pd.read_parquet(full_path).head(sample_rows)
            elif file_format == "csv":
                df = pd.read_csv(full_path, nrows=sample_rows)
            elif file_format == "json":
                df = pd.read_json(full_path, lines=True, nrows=sample_rows)
            else:
                ctx.warning("Unsupported format for profiling", format=file_format)
                return TableProfile(
                    dataset=DatasetRef(name=dataset, kind="file", format=file_format),
                    rows_sampled=0,
                    columns=[],
                ).model_dump()

            # Profile columns
            profile_cols = columns or df.columns.tolist()
            profiled = []
            row_count = len(df)

            for col in profile_cols:
                if col not in df.columns:
                    continue

                null_count = int(df[col].isnull().sum())
                null_pct = null_count / row_count if row_count > 0 else 0
                distinct_count = int(df[col].nunique())

                # Cardinality heuristic. BUG FIX: an empty sample
                # (0 == row_count) previously classified every column as
                # "unique" and hence a candidate key; classify as "low".
                if row_count == 0:
                    cardinality = "low"
                elif distinct_count == row_count:
                    cardinality = "unique"
                elif distinct_count > row_count * 0.9:
                    cardinality = "high"
                elif distinct_count > row_count * 0.1:
                    cardinality = "medium"
                else:
                    cardinality = "low"

                sample_values = df[col].dropna().head(5).tolist()

                profiled.append(
                    Column(
                        name=col,
                        dtype=str(df[col].dtype),
                        null_count=null_count,
                        null_pct=round(null_pct, 3),
                        cardinality=cardinality,
                        distinct_count=distinct_count,
                        sample_values=sample_values,
                    )
                )

            # Detect candidate keys (unique non-null columns)
            candidate_keys = [
                c.name for c in profiled if c.cardinality == "unique" and c.null_count == 0
            ]

            # Detect candidate watermarks (datetime columns)
            candidate_watermarks = [
                c.name
                for c in profiled
                if "datetime" in c.dtype.lower() or "date" in c.dtype.lower()
            ]

            # BUG FIX: guard the completeness denominator. For an empty
            # sample (0 rows or 0 columns) the old division raised
            # ZeroDivisionError, so a valid-but-empty dataset fell through
            # to the except handler and returned an error profile.
            total_cells = row_count * len(df.columns)
            if total_cells > 0:
                completeness = 1.0 - (df.isnull().sum().sum() / total_cells)
            else:
                completeness = 1.0

            profile = TableProfile(
                dataset=DatasetRef(name=dataset, kind="file", format=file_format),
                rows_sampled=row_count,
                columns=profiled,
                candidate_keys=candidate_keys,
                candidate_watermarks=candidate_watermarks,
                completeness=round(completeness, 3),
            )

            ctx.info("Profiling complete", rows_sampled=row_count, columns=len(profiled))
            return profile.model_dump()

        except Exception as e:
            # Best-effort: log the failure and return an empty profile.
            ctx.error("Failed to profile dataset", dataset=dataset, error=str(e))
            return TableProfile(
                dataset=DatasetRef(name=dataset, kind="file"),
                rows_sampled=0,
                columns=[],
            ).model_dump()

    def preview(
        self, dataset: str, rows: int = 5, columns: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """Preview sample rows from a local dataset.

        Args:
            dataset: Relative path to the file to preview
            rows: Number of rows to return (capped at 100)
            columns: Optional subset of columns to include (unknown names
                are silently dropped)

        Returns:
            PreviewResult dict; an empty result on unsupported format or error
        """
        ctx = get_logging_context()
        ctx.info("Previewing local dataset", dataset=dataset, rows=rows)

        max_rows = min(rows, 100)  # Cap at 100

        try:
            # Imported lazily so pandas is only required when previewing.
            import pandas as pd

            full_path = self.get_path(dataset)
            file_format = detect_file_format(full_path)

            # Parquet/delta are fully read then trimmed with head(); CSV/JSON
            # can limit rows at parse time.
            if file_format == "parquet" or file_format == "delta":
                df = pd.read_parquet(full_path).head(max_rows)
            elif file_format == "csv":
                df = pd.read_csv(full_path, nrows=max_rows)
            elif file_format == "json":
                # assumes newline-delimited JSON (lines=True) — TODO confirm
                df = pd.read_json(full_path, lines=True, nrows=max_rows)
            else:
                ctx.warning("Unsupported format for preview", format=file_format)
                return PreviewResult(
                    dataset=DatasetRef(name=dataset, kind="file", format=file_format),
                ).model_dump()

            if columns:
                # Keep only requested columns that actually exist.
                df = df[[c for c in columns if c in df.columns]]

            result = PreviewResult(
                dataset=DatasetRef(name=dataset, kind="file", format=file_format),
                columns=df.columns.tolist(),
                rows=df.head(max_rows).to_dict(orient="records"),
                # NOTE(review): a file with exactly max_rows rows also reports
                # truncated=True — this flag is a heuristic, not exact.
                truncated=len(df) >= max_rows,
                format=file_format,
            )

            ctx.info("Preview complete", rows_returned=len(result.rows))
            return result.model_dump()

        except Exception as e:
            # Best-effort API: log and return an empty preview rather than raise.
            ctx.error("Failed to preview dataset", dataset=dataset, error=str(e))
            return PreviewResult(
                dataset=DatasetRef(name=dataset, kind="file"),
            ).model_dump()

    def detect_partitions(self, path: str = "") -> Dict[str, Any]:
        """Detect partition structure in local path.

        Args:
            path: Relative path to scan (default: base_path)

        Returns:
            PartitionInfo dict
        """
        ctx = get_logging_context()
        ctx.info("Detecting local partitions", path=path)

        if self.is_uri:
            # Remote URIs cannot be walked with os.walk.
            ctx.warning("detect_partitions not supported for URI paths")
            return PartitionInfo(root=path, keys=[], example_values={}).model_dump()

        try:
            full_path = self.base_path / path if path else self.base_path

            # Walk directory and collect paths
            # Sampling is bounded: at most 100 files per directory and
            # roughly 100 files in total.
            all_paths = []
            for root, dirs, files in os.walk(full_path):
                for file in files[:100]:  # Sample first 100
                    all_paths.append(os.path.join(root, file))
                if len(all_paths) >= 100:
                    break

            # NOTE(review): this bare name resolves to a module-level
            # detect_partitions() helper, not to this method — confirm that
            # import exists in the module.
            partition_info = detect_partitions(all_paths)

            result = PartitionInfo(
                root=str(full_path),
                keys=partition_info.get("keys", []),
                example_values=partition_info.get("example_values", {}),
                format=partition_info.get("format", "none"),
                # Count of *sampled files*, not distinct partitions; capped
                # at ~100 by the walk above.
                partition_count=len(all_paths),
            )

            ctx.info("Partition detection complete", keys=result.keys)
            return result.model_dump()

        except Exception as e:
            # Best-effort: return an empty PartitionInfo instead of raising.
            ctx.error("Failed to detect partitions", path=path, error=str(e))
            return PartitionInfo(root=path, keys=[], example_values={}).model_dump()

    def get_freshness(self, dataset: str) -> Dict[str, Any]:
        """Get data freshness for a dataset.

        Args:
            dataset: Relative path to file or folder

        Returns:
            FreshnessResult dict (last_updated/age_hours omitted when the
            path is missing, the connection is URI-based, or stat fails)
        """
        ctx = get_logging_context()
        ctx.info("Checking local freshness", dataset=dataset)

        if self.is_uri:
            # Remote URIs cannot be stat'ed with the local filesystem API.
            ctx.warning("get_freshness not supported for URI paths")
            return FreshnessResult(
                dataset=DatasetRef(name=dataset, kind="file"), source="metadata"
            ).model_dump()

        try:
            full_path = Path(self.get_path(dataset))

            if full_path.exists():
                stat = full_path.stat()
                # fromtimestamp() yields *local* naive time, so the reference
                # "now" must also be local naive. The previous code subtracted
                # utcnow() (UTC naive), skewing age_hours by the local UTC
                # offset; utcnow() is also deprecated since Python 3.12.
                last_modified = datetime.fromtimestamp(stat.st_mtime)
                age_hours = (datetime.now() - last_modified).total_seconds() / 3600

                result = FreshnessResult(
                    dataset=DatasetRef(name=dataset, kind="file"),
                    last_updated=last_modified,
                    source="metadata",
                    age_hours=round(age_hours, 2),
                )

                ctx.info("Freshness check complete", age_hours=age_hours)
                return result.model_dump()

        except Exception as e:
            ctx.error("Failed to check freshness", dataset=dataset, error=str(e))

        # Fallback: path missing or stat failed — return a bare result.
        return FreshnessResult(
            dataset=DatasetRef(name=dataset, kind="file"), source="metadata"
        ).model_dump()

__init__(base_path='./data')

Initialize local connection.

Parameters:

Name Type Description Default
base_path str

Base directory for all paths (can be local path or URI)

'./data'
Source code in odibi/connections/local.py
def __init__(self, base_path: str = "./data"):
    """Initialize local connection.

    Args:
        base_path: Base directory for all paths (can be local path or URI)
    """
    ctx = get_logging_context()
    ctx.log_connection(
        connection_type="local",
        connection_name="LocalConnection",
        action="init",
        base_path=base_path,
    )

    # Keep the raw string: URI joining in get_path() works on it directly.
    self.base_path_str = base_path
    # Detect URIs: "://" (standard URIs) or "dbfs:/" (Databricks)
    # Windows paths like "C:/path" or "D:\path" should NOT be treated as URIs
    # Windows drive letters are single character followed by ":/" which differs from "dbfs:/"
    self.is_uri = "://" in base_path or (
        ":/" in base_path and len(base_path.split(":/")[0]) > 1
    )

    if not self.is_uri:
        # base_path is Optional[Path]: a Path for filesystem mode, None for URIs.
        self.base_path = Path(base_path)
        ctx.debug(
            "LocalConnection initialized with filesystem path",
            base_path=base_path,
            is_uri=False,
        )
    else:
        self.base_path = None  # Not used for URIs
        ctx.debug(
            "LocalConnection initialized with URI path",
            base_path=base_path,
            is_uri=True,
        )

detect_partitions(path='')

Detect partition structure in local path.

Parameters:

Name Type Description Default
path str

Relative path to scan (default: base_path)

''

Returns:

Type Description
Dict[str, Any]

PartitionInfo dict

Source code in odibi/connections/local.py
def detect_partitions(self, path: str = "") -> Dict[str, Any]:
    """Detect partition structure in local path.

    Args:
        path: Relative path to scan (default: base_path)

    Returns:
        PartitionInfo dict
    """
    ctx = get_logging_context()
    ctx.info("Detecting local partitions", path=path)

    if self.is_uri:
        ctx.warning("detect_partitions not supported for URI paths")
        return PartitionInfo(root=path, keys=[], example_values={}).model_dump()

    try:
        full_path = self.base_path / path if path else self.base_path

        # Walk directory and collect paths
        all_paths = []
        for root, dirs, files in os.walk(full_path):
            for file in files[:100]:  # Sample first 100
                all_paths.append(os.path.join(root, file))
            if len(all_paths) >= 100:
                break

        partition_info = detect_partitions(all_paths)

        result = PartitionInfo(
            root=str(full_path),
            keys=partition_info.get("keys", []),
            example_values=partition_info.get("example_values", {}),
            format=partition_info.get("format", "none"),
            partition_count=len(all_paths),
        )

        ctx.info("Partition detection complete", keys=result.keys)
        return result.model_dump()

    except Exception as e:
        ctx.error("Failed to detect partitions", path=path, error=str(e))
        return PartitionInfo(root=path, keys=[], example_values={}).model_dump()

discover_catalog(include_schema=False, include_stats=False, limit=200, recursive=True, path='', pattern='')

Discover datasets in local filesystem.

Parameters:

Name Type Description Default
include_schema bool

Sample files and infer schema

False
include_stats bool

Include row counts and stats

False
limit int

Maximum datasets to return

200
recursive bool

Recursively scan all subfolders (default: True)

True
path str

Scope search to specific subfolder

''
pattern str

Filter by glob pattern (e.g. "*.csv", "sales_*")

''

Returns:

Type Description
Dict[str, Any]

CatalogSummary dict

Source code in odibi/connections/local.py
def discover_catalog(
    self,
    include_schema: bool = False,
    include_stats: bool = False,
    limit: int = 200,
    recursive: bool = True,
    path: str = "",
    pattern: str = "",
) -> Dict[str, Any]:
    """Discover datasets in local filesystem.

    Args:
        include_schema: Sample files and infer schema
        include_stats: Include row counts and stats
        limit: Maximum datasets to return
        recursive: Recursively scan all subfolders (default: True)
        path: Scope search to specific subfolder
        pattern: Filter by glob pattern (e.g. "*.csv", "sales_*")

    Returns:
        CatalogSummary dict
    """
    # NOTE(review): include_schema and include_stats are logged below but
    # never applied in this implementation — confirm intended behavior.
    ctx = get_logging_context()
    ctx.info(
        "Discovering local catalog",
        base_path=self.base_path_str,
        include_schema=include_schema,
        include_stats=include_stats,
    )

    if self.is_uri:
        # URI-backed connections cannot be walked with pathlib.
        ctx.warning("discover_catalog not supported for URI paths")
        return CatalogSummary(
            connection_name=self.base_path_str,
            connection_type="local",
            total_datasets=0,
            next_step="URI paths not supported for catalog discovery",
        ).model_dump()

    try:
        folders = []
        files = []
        formats = {}  # format name -> occurrence count

        # Determine search root - use path parameter to scope search
        search_root = self.base_path / path if path else self.base_path
        if not search_root.exists():
            ctx.warning(f"Path not found: {search_root}")
            return CatalogSummary(
                connection_name=self.base_path_str,
                connection_type="local",
                total_datasets=0,
                next_step=f"Path '{path}' not found in {self.base_path_str}",
            ).model_dump()

        # Use rglob for recursive or iterdir for shallow
        # NOTE(review): list(rglob(...)) materializes the full tree before
        # the limit is applied — can be slow on very large trees.
        search_pattern = pattern if pattern else "*"
        if recursive:
            items_to_scan = list(search_root.rglob(search_pattern))
        else:
            items_to_scan = list(search_root.glob(search_pattern))

        for item in items_to_scan:
            # Enforce the dataset cap across folders and files combined.
            if len(folders) + len(files) >= limit:
                break

            if item.is_dir():
                # detect_file_format may classify a directory itself as a
                # dataset (e.g. a delta/parquet folder).
                file_format = detect_file_format(str(item))

                folders.append(
                    DatasetRef(
                        name=item.name,
                        kind="folder",
                        path=str(item.absolute()),
                        format=file_format,
                    )
                )

                if file_format:
                    formats[file_format] = formats.get(file_format, 0) + 1

            elif item.is_file():
                stat = item.stat()
                file_format = infer_format_from_path(item.name)

                files.append(
                    DatasetRef(
                        name=item.name,
                        kind="file",
                        path=str(item.absolute()),
                        format=file_format,
                        size_bytes=stat.st_size,
                        modified_at=datetime.fromtimestamp(stat.st_mtime),
                    )
                )

                if file_format:
                    formats[file_format] = formats.get(file_format, 0) + 1

        summary = CatalogSummary(
            connection_name=self.base_path_str,
            connection_type="local",
            folders=[f.model_dump() for f in folders],
            files=[f.model_dump() for f in files],
            total_datasets=len(folders) + len(files),
            formats=formats,
            next_step="Use get_schema() to inspect individual datasets",
        )

        ctx.info(
            "Local catalog discovery complete",
            total_datasets=summary.total_datasets,
            folders=len(folders),
            files=len(files),
        )

        return summary.model_dump()

    except Exception as e:
        # Best-effort: surface the error in next_step instead of raising.
        ctx.error("Failed to discover local catalog", error=str(e))
        return CatalogSummary(
            connection_name=self.base_path_str,
            connection_type="local",
            total_datasets=0,
            next_step=f"Error: {str(e)}",
        ).model_dump()

get_freshness(dataset)

Get data freshness for a dataset.

Parameters:

Name Type Description Default
dataset str

Relative path to file or folder

required

Returns:

Type Description
Dict[str, Any]

FreshnessResult dict

Source code in odibi/connections/local.py
def get_freshness(self, dataset: str) -> Dict[str, Any]:
    """Get data freshness for a dataset.

    Args:
        dataset: Relative path to file or folder

    Returns:
        FreshnessResult dict
    """
    ctx = get_logging_context()
    ctx.info("Checking local freshness", dataset=dataset)

    if self.is_uri:
        ctx.warning("get_freshness not supported for URI paths")
        return FreshnessResult(
            dataset=DatasetRef(name=dataset, kind="file"), source="metadata"
        ).model_dump()

    try:
        full_path = Path(self.get_path(dataset))

        if full_path.exists():
            stat = full_path.stat()
            last_modified = datetime.fromtimestamp(stat.st_mtime)
            age_hours = (datetime.utcnow() - last_modified).total_seconds() / 3600

            result = FreshnessResult(
                dataset=DatasetRef(name=dataset, kind="file"),
                last_updated=last_modified,
                source="metadata",
                age_hours=round(age_hours, 2),
            )

            ctx.info("Freshness check complete", age_hours=age_hours)
            return result.model_dump()

    except Exception as e:
        ctx.error("Failed to check freshness", dataset=dataset, error=str(e))

    return FreshnessResult(
        dataset=DatasetRef(name=dataset, kind="file"), source="metadata"
    ).model_dump()

get_path(relative_path)

Get full path for a relative path.

Parameters:

Name Type Description Default
relative_path str

Relative path from base

required

Returns:

Type Description
str

Full absolute path or URI

Source code in odibi/connections/local.py
def get_path(self, relative_path: str) -> str:
    """Get full path for a relative path.

    Args:
        relative_path: Relative path from base

    Returns:
        Full absolute path or URI
    """
    ctx = get_logging_context()

    if self.is_uri:
        # Use os.path for simple string joining, handling slashes manually for consistency
        # Strip leading slash from relative to avoid root replacement
        clean_rel = relative_path.lstrip("/").lstrip("\\")
        # Handle cases where base_path might not have trailing slash
        if self.base_path_str.endswith("/") or self.base_path_str.endswith("\\"):
            full_path = f"{self.base_path_str}{clean_rel}"
        else:
            # Use forward slash for URIs
            full_path = f"{self.base_path_str}/{clean_rel}"

        ctx.debug(
            "Resolved URI path",
            relative_path=relative_path,
            full_path=full_path,
        )
        return full_path
    else:
        # Standard local path logic
        full_path = self.base_path / relative_path
        resolved = str(full_path.absolute())

        ctx.debug(
            "Resolved local path",
            relative_path=relative_path,
            full_path=resolved,
        )
        return resolved

get_schema(dataset)

Get schema for a dataset.

Parameters:

Name Type Description Default
dataset str

Relative path to file or folder

required

Returns:

Type Description
Dict[str, Any]

Schema dict with columns

Source code in odibi/connections/local.py
def get_schema(self, dataset: str) -> Dict[str, Any]:
    """Get schema for a dataset.

    Args:
        dataset: Relative path to file or folder

    Returns:
        Schema dict with columns
    """
    ctx = get_logging_context()
    ctx.info("Getting local schema", dataset=dataset)

    try:
        import pandas as pd

        full_path = self.get_path(dataset)
        file_format = detect_file_format(full_path)

        if file_format == "parquet" or file_format == "delta":
            df = pd.read_parquet(full_path).head(0)
        elif file_format == "csv":
            df = pd.read_csv(full_path, nrows=1000).head(0)
        elif file_format == "json":
            df = pd.read_json(full_path, lines=True, nrows=1000).head(0)
        else:
            ctx.warning("Unsupported format for schema inference", format=file_format)
            return Schema(
                dataset=DatasetRef(name=dataset, kind="file", format=file_format),
                columns=[],
            ).model_dump()

        columns = [Column(name=col, dtype=str(dtype)) for col, dtype in df.dtypes.items()]

        schema = Schema(
            dataset=DatasetRef(name=dataset, kind="file", format=file_format),
            columns=columns,
        )

        ctx.info("Schema retrieved", column_count=len(columns))
        return schema.model_dump()

    except Exception as e:
        ctx.error("Failed to get schema", dataset=dataset, error=str(e))
        return Schema(
            dataset=DatasetRef(name=dataset, kind="file"),
            columns=[],
        ).model_dump()

list_files(path='', pattern='*', limit=1000)

List files in local path.

Parameters:

Name Type Description Default
path str

Relative path within base_path

''
pattern str

Glob pattern for filtering (default: "*")

'*'
limit int

Maximum number of files to return

1000

Returns:

Type Description
List[Dict]

List of dicts with keys: name, path, size, modified, format

Source code in odibi/connections/local.py
def list_files(self, path: str = "", pattern: str = "*", limit: int = 1000) -> List[Dict]:
    """List files in local path.

    Args:
        path: Relative path within base_path
        pattern: Glob pattern for filtering (default: "*")
        limit: Maximum number of files to return

    Returns:
        List of dicts with keys: name, path, size, modified, format
    """
    ctx = get_logging_context()
    ctx.debug("Listing local files", path=path, pattern=pattern, limit=limit)

    if self.is_uri:
        # Remote URIs cannot be globbed with pathlib.
        ctx.warning("list_files not supported for URI paths")
        return []

    try:
        full_path = self.base_path / path if path else self.base_path
        all_files = []

        # Non-recursive by default: glob() only matches direct children
        # unless the caller supplies a pattern like "**/*".
        for file_path in full_path.glob(pattern):
            if file_path.is_file():
                stat = file_path.stat()
                all_files.append(
                    {
                        "name": file_path.name,
                        "path": str(file_path.absolute()),
                        "size": stat.st_size,
                        "modified": datetime.fromtimestamp(stat.st_mtime),
                        "format": infer_format_from_path(file_path.name),
                    }
                )
                if len(all_files) >= limit:
                    break

        ctx.info("Listed local files", count=len(all_files))
        return all_files

    except Exception as e:
        # Best-effort: an unreadable directory yields an empty listing.
        ctx.warning("Failed to list local files", error=str(e), path=path)
        return []

list_folders(path='', limit=100)

List folders in local path.

Parameters:

Name Type Description Default
path str

Relative path within base_path

''
limit int

Maximum number of folders to return

100

Returns:

Type Description
List[str]

List of folder paths

Source code in odibi/connections/local.py
def list_folders(self, path: str = "", limit: int = 100) -> List[str]:
    """List folders in local path.

    Args:
        path: Relative path within base_path
        limit: Maximum number of folders to return

    Returns:
        List of folder paths
    """
    ctx = get_logging_context()
    ctx.debug("Listing local folders", path=path, limit=limit)

    if self.is_uri:
        # Remote URIs cannot be iterated with pathlib.
        ctx.warning("list_folders not supported for URI paths")
        return []

    try:
        root = self.base_path / path if path else self.base_path

        # Collect absolute paths of immediate subdirectories, stopping
        # once the caller's limit is reached.
        found = []
        for entry in root.iterdir():
            if not entry.is_dir():
                continue
            found.append(str(entry.absolute()))
            if len(found) >= limit:
                break

        ctx.info("Listed local folders", count=len(found))
        return found

    except Exception as e:
        # Best-effort: an unreadable directory yields an empty listing.
        ctx.warning("Failed to list local folders", error=str(e), path=path)
        return []

preview(dataset, rows=5, columns=None)

Preview sample rows from a local dataset.

Source code in odibi/connections/local.py
def preview(
    self, dataset: str, rows: int = 5, columns: Optional[List[str]] = None
) -> Dict[str, Any]:
    """Preview sample rows from a local dataset.

    Args:
        dataset: Relative path to the file to preview
        rows: Number of rows to return (capped at 100)
        columns: Optional subset of columns to include (unknown names
            are silently dropped)

    Returns:
        PreviewResult dict; an empty result on unsupported format or error
    """
    ctx = get_logging_context()
    ctx.info("Previewing local dataset", dataset=dataset, rows=rows)

    max_rows = min(rows, 100)  # Cap at 100

    try:
        # Imported lazily so pandas is only required when previewing.
        import pandas as pd

        full_path = self.get_path(dataset)
        file_format = detect_file_format(full_path)

        # Parquet/delta are fully read then trimmed with head(); CSV/JSON
        # can limit rows at parse time.
        if file_format == "parquet" or file_format == "delta":
            df = pd.read_parquet(full_path).head(max_rows)
        elif file_format == "csv":
            df = pd.read_csv(full_path, nrows=max_rows)
        elif file_format == "json":
            # assumes newline-delimited JSON (lines=True) — TODO confirm
            df = pd.read_json(full_path, lines=True, nrows=max_rows)
        else:
            ctx.warning("Unsupported format for preview", format=file_format)
            return PreviewResult(
                dataset=DatasetRef(name=dataset, kind="file", format=file_format),
            ).model_dump()

        if columns:
            # Keep only requested columns that actually exist.
            df = df[[c for c in columns if c in df.columns]]

        result = PreviewResult(
            dataset=DatasetRef(name=dataset, kind="file", format=file_format),
            columns=df.columns.tolist(),
            rows=df.head(max_rows).to_dict(orient="records"),
            # NOTE(review): a file with exactly max_rows rows also reports
            # truncated=True — this flag is a heuristic, not exact.
            truncated=len(df) >= max_rows,
            format=file_format,
        )

        ctx.info("Preview complete", rows_returned=len(result.rows))
        return result.model_dump()

    except Exception as e:
        # Best-effort API: log and return an empty preview rather than raise.
        ctx.error("Failed to preview dataset", dataset=dataset, error=str(e))
        return PreviewResult(
            dataset=DatasetRef(name=dataset, kind="file"),
        ).model_dump()

profile(dataset, sample_rows=1000, columns=None)

Profile a dataset with statistics.

Parameters:

Name Type Description Default
dataset str

Relative path to file or folder

required
sample_rows int

Number of rows to sample (max 10000)

1000
columns Optional[List[str]]

Specific columns to profile (None = all)

None

Returns:

Type Description
Dict[str, Any]

TableProfile dict with stats

Source code in odibi/connections/local.py
def profile(
    self, dataset: str, sample_rows: int = 1000, columns: Optional[List[str]] = None
) -> Dict[str, Any]:
    """Profile a dataset with statistics.

    Args:
        dataset: Relative path to file or folder
        sample_rows: Number of rows to sample (max 10000)
        columns: Specific columns to profile (None = all; unknown names
            are silently skipped)

    Returns:
        TableProfile dict with stats
    """
    ctx = get_logging_context()
    ctx.info("Profiling local dataset", dataset=dataset, sample_rows=sample_rows)

    sample_rows = min(sample_rows, 10000)  # Cap at 10k

    try:
        # Imported lazily so pandas is only required when profiling.
        import pandas as pd

        full_path = self.get_path(dataset)
        file_format = detect_file_format(full_path)

        # Read sample
        if file_format == "parquet" or file_format == "delta":
            df = pd.read_parquet(full_path).head(sample_rows)
        elif file_format == "csv":
            df = pd.read_csv(full_path, nrows=sample_rows)
        elif file_format == "json":
            # assumes newline-delimited JSON (lines=True) — TODO confirm
            df = pd.read_json(full_path, lines=True, nrows=sample_rows)
        else:
            ctx.warning("Unsupported format for profiling", format=file_format)
            return TableProfile(
                dataset=DatasetRef(name=dataset, kind="file", format=file_format),
                rows_sampled=0,
                columns=[],
            ).model_dump()

        row_count = len(df)

        # Profile columns
        profile_cols = columns or df.columns.tolist()
        profiled = []

        for col in profile_cols:
            if col not in df.columns:
                continue

            null_count = int(df[col].isnull().sum())
            null_pct = null_count / row_count if row_count > 0 else 0
            distinct_count = int(df[col].nunique())

            # Cardinality heuristic
            if row_count == 0:
                # Empty sample: nothing to infer. Previously 0 == len(df)
                # classified every column "unique", polluting candidate_keys.
                cardinality = "low"
            elif distinct_count == row_count:
                cardinality = "unique"
            elif distinct_count > row_count * 0.9:
                cardinality = "high"
            elif distinct_count > row_count * 0.1:
                cardinality = "medium"
            else:
                cardinality = "low"

            sample_values = df[col].dropna().head(5).tolist()

            profiled.append(
                Column(
                    name=col,
                    dtype=str(df[col].dtype),
                    null_count=null_count,
                    null_pct=round(null_pct, 3),
                    cardinality=cardinality,
                    distinct_count=distinct_count,
                    sample_values=sample_values,
                )
            )

        # Detect candidate keys (unique non-null columns)
        candidate_keys = [
            c.name for c in profiled if c.cardinality == "unique" and c.null_count == 0
        ]

        # Detect candidate watermarks (datetime columns)
        candidate_watermarks = [
            c.name
            for c in profiled
            if "datetime" in c.dtype.lower() or "date" in c.dtype.lower()
        ]

        # Completeness = share of non-null cells. Guard the divisor: the
        # original divided by len(df) * len(df.columns) unconditionally,
        # raising ZeroDivisionError for empty files and turning them into
        # error results.
        total_cells = row_count * len(df.columns)
        if total_cells > 0:
            completeness = 1.0 - (df.isnull().sum().sum() / total_cells)
        else:
            completeness = 1.0

        profile = TableProfile(
            dataset=DatasetRef(name=dataset, kind="file", format=file_format),
            rows_sampled=row_count,
            columns=profiled,
            candidate_keys=candidate_keys,
            candidate_watermarks=candidate_watermarks,
            completeness=round(completeness, 3),
        )

        ctx.info("Profiling complete", rows_sampled=row_count, columns=len(profiled))
        return profile.model_dump()

    except Exception as e:
        # Best-effort: return an empty profile instead of raising.
        ctx.error("Failed to profile dataset", dataset=dataset, error=str(e))
        return TableProfile(
            dataset=DatasetRef(name=dataset, kind="file"),
            rows_sampled=0,
            columns=[],
        ).model_dump()

validate()

Validate that base path exists or can be created.

Raises:

Type Description
ConnectionError

If validation fails

Source code in odibi/connections/local.py
def validate(self) -> None:
    """Validate that base path exists or can be created.

    Raises:
        ConnectionError: If validation fails
    """
    ctx = get_logging_context()
    ctx.debug(
        "Validating LocalConnection",
        base_path=self.base_path_str,
        is_uri=self.is_uri,
    )

    if self.is_uri:
        # Cannot validate/create URIs with local os module
        # Assume valid or handled by engine
        ctx.debug(
            "Skipping URI validation (handled by engine)",
            base_path=self.base_path_str,
        )
    else:
        # Create base directory if it doesn't exist
        try:
            # Capture pre-existence *before* mkdir: the original evaluated
            # `not self.base_path.exists()` after creation, so the logged
            # `created` flag was always False.
            already_existed = self.base_path.exists()
            self.base_path.mkdir(parents=True, exist_ok=True)
            ctx.info(
                "LocalConnection validated successfully",
                base_path=str(self.base_path.absolute()),
                created=not already_existed,
            )
        except Exception as e:
            ctx.error(
                "LocalConnection validation failed",
                base_path=self.base_path_str,
                error=str(e),
            )
            raise

odibi.connections.azure_adls

Azure Data Lake Storage Gen2 connection (Phase 2A: Multi-mode authentication).

AzureADLS

Bases: BaseConnection

Azure Data Lake Storage Gen2 connection.

Phase 2A: Multi-mode authentication + multi-account support Supports key_vault (recommended), direct_key, service_principal, and managed_identity.

Source code in odibi/connections/azure_adls.py
(Rendered source listing for odibi/connections/azure_adls.py, lines 27–748. Only the line-number gutter survived text extraction; the code itself was not captured — see the original module for the implementation.)
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
class AzureADLS(BaseConnection):
    """Azure Data Lake Storage Gen2 connection.

    Phase 2A: Multi-mode authentication + multi-account support
    Supports key_vault (recommended), direct_key, service_principal, and managed_identity.
    """

    def __init__(
        self,
        account: str,
        container: str,
        path_prefix: str = "",
        auth_mode: str = "key_vault",
        key_vault_name: Optional[str] = None,
        secret_name: Optional[str] = None,
        account_key: Optional[str] = None,
        sas_token: Optional[str] = None,
        tenant_id: Optional[str] = None,
        client_id: Optional[str] = None,
        client_secret: Optional[str] = None,
        validate: bool = True,
        **kwargs,
    ):
        """Initialize ADLS connection.

        Args:
            account: Storage account name (e.g., 'mystorageaccount')
            container: Container/filesystem name
            path_prefix: Optional prefix applied to every resolved path
            auth_mode: Authentication mode
                ('key_vault', 'direct_key', 'sas_token', 'service_principal', 'managed_identity')
            key_vault_name: Azure Key Vault name (required for key_vault mode)
            secret_name: Secret name in Key Vault (required for key_vault mode)
            account_key: Storage account key (required for direct_key mode)
            sas_token: Shared Access Signature token (required for sas_token mode)
            tenant_id: Azure Tenant ID (required for service_principal)
            client_id: Service Principal Client ID (required for service_principal)
            client_secret: Service Principal Client Secret (required for service_principal)
            validate: Run validate() immediately after construction
        """
        log_ctx = get_logging_context()
        log_ctx.log_connection(
            connection_type="azure_adls",
            connection_name=f"{account}/{container}",
            action="init",
            account=account,
            container=container,
            auth_mode=auth_mode,
            path_prefix=path_prefix or "(none)",
        )

        # Location settings; path_prefix is normalized so it never carries
        # leading/trailing slashes.
        self.account = account
        self.container = container
        self.path_prefix = (path_prefix or "").strip("/")
        self.auth_mode = auth_mode

        # Credential material for the various auth modes (each mode reads
        # only the fields it needs; validate() enforces which are required).
        self.key_vault_name = key_vault_name
        self.secret_name = secret_name
        self.account_key = account_key
        self.sas_token = sas_token
        self.tenant_id = tenant_id
        self.client_id = client_id
        self.client_secret = client_secret

        # Lazily-populated secret caches, guarded by a lock so concurrent
        # callers do not fetch from Key Vault more than once.
        self._cached_storage_key: Optional[str] = None
        self._cached_client_secret: Optional[str] = None
        self._cache_lock = threading.Lock()

        if validate:
            self.validate()

    def validate(self) -> None:
        """Validate ADLS connection configuration.

        Checks that 'account' and 'container' are present, then enforces
        the fields each auth_mode requires:

        - key_vault: key_vault_name and secret_name
        - direct_key: account_key (warns if ODIBI_ENV == 'production')
        - sas_token: sas_token, or key_vault_name/secret_name as fallback
        - service_principal: tenant_id and client_id, plus client_secret
          (or key_vault_name/secret_name as fallback)
        - managed_identity: no additional fields

        Raises:
            ValueError: If required fields are missing for the selected auth_mode
        """
        ctx = get_logging_context()
        ctx.debug(
            "Validating AzureADLS connection",
            account=self.account,
            container=self.container,
            auth_mode=self.auth_mode,
        )

        if not self.account:
            ctx.error("ADLS connection validation failed: missing 'account'")
            raise ValueError(
                "ADLS connection requires 'account'. "
                "Provide the storage account name (e.g., account: 'mystorageaccount')."
            )
        if not self.container:
            ctx.error(
                "ADLS connection validation failed: missing 'container'",
                account=self.account,
            )
            raise ValueError(
                f"ADLS connection requires 'container' for account '{self.account}'. "
                "Provide the container/filesystem name."
            )

        if self.auth_mode == "key_vault":
            if not self.key_vault_name or not self.secret_name:
                ctx.error(
                    "ADLS key_vault mode validation failed",
                    account=self.account,
                    container=self.container,
                    key_vault_name=self.key_vault_name or "(missing)",
                    secret_name=self.secret_name or "(missing)",
                )
                raise ValueError(
                    f"key_vault mode requires 'key_vault_name' and 'secret_name' "
                    f"for connection to {self.account}/{self.container}"
                )
        elif self.auth_mode == "direct_key":
            if not self.account_key:
                ctx.error(
                    "ADLS direct_key mode validation failed: missing account_key",
                    account=self.account,
                    container=self.container,
                )
                raise ValueError(
                    f"direct_key mode requires 'account_key' "
                    f"for connection to {self.account}/{self.container}"
                )

            # Warn in production: literal keys in config are a secret-leak risk.
            if os.getenv("ODIBI_ENV") == "production":
                ctx.warning(
                    "Using direct_key in production is not recommended",
                    account=self.account,
                    container=self.container,
                )
                warnings.warn(
                    f"⚠️  Using direct_key in production is not recommended. "
                    f"Use auth_mode: key_vault. Connection: {self.account}/{self.container}",
                    UserWarning,
                )
        elif self.auth_mode == "sas_token":
            # A SAS token may come from config directly or be fetched from
            # Key Vault later (see get_storage_key), so either form passes.
            if not self.sas_token and not (self.key_vault_name and self.secret_name):
                ctx.error(
                    "ADLS sas_token mode validation failed",
                    account=self.account,
                    container=self.container,
                )
                raise ValueError(
                    f"sas_token mode requires 'sas_token' (or key_vault_name/secret_name) "
                    f"for connection to {self.account}/{self.container}"
                )
        elif self.auth_mode == "service_principal":
            if not self.tenant_id or not self.client_id:
                ctx.error(
                    "ADLS service_principal mode validation failed",
                    account=self.account,
                    container=self.container,
                    missing="tenant_id and/or client_id",
                )
                raise ValueError(
                    f"service_principal mode requires 'tenant_id' and 'client_id' "
                    f"for connection to {self.account}/{self.container}. "
                    f"Got tenant_id={self.tenant_id or '(missing)'}, "
                    f"client_id={self.client_id or '(missing)'}."
                )

            # The client secret may likewise be a literal or come from Key Vault.
            if not self.client_secret and not (self.key_vault_name and self.secret_name):
                ctx.error(
                    "ADLS service_principal mode validation failed: missing client_secret",
                    account=self.account,
                    container=self.container,
                )
                raise ValueError(
                    f"service_principal mode requires 'client_secret' "
                    f"(or key_vault_name/secret_name) for {self.account}/{self.container}"
                )
        elif self.auth_mode == "managed_identity":
            # No specific config required, but we might check if environment supports it
            ctx.debug(
                "Using managed_identity auth mode",
                account=self.account,
                container=self.container,
            )
        else:
            ctx.error(
                "ADLS validation failed: unsupported auth_mode",
                account=self.account,
                container=self.container,
                auth_mode=self.auth_mode,
            )
            # Bug fix: the list of supported modes previously omitted
            # 'sas_token' even though it is handled above.
            raise ValueError(
                f"Unsupported auth_mode: '{self.auth_mode}'. "
                f"Use 'key_vault', 'direct_key', 'sas_token', "
                f"'service_principal', or 'managed_identity'."
            )

        ctx.info(
            "AzureADLS connection validated successfully",
            account=self.account,
            container=self.container,
            auth_mode=self.auth_mode,
        )

    def get_storage_key(self, timeout: float = 30.0) -> Optional[str]:
        """Get storage account key (cached).

        Only relevant for 'key_vault' and 'direct_key' modes; in 'sas_token'
        mode it returns the cached value (if one was fetched from Key Vault)
        or the literal sas_token. Other modes return None.

        Args:
            timeout: Timeout for Key Vault operations in seconds (default: 30.0)

        Returns:
            Storage account key or None if not applicable for auth_mode

        Raises:
            ImportError: If azure libraries not installed (key_vault mode)
            TimeoutError: If Key Vault fetch exceeds timeout
            Exception: If Key Vault access fails
        """
        ctx = get_logging_context()

        # The lock is held for the entire method so concurrent callers don't
        # each trigger their own Key Vault round-trip; the cache read below
        # is the "double-checked" part.
        with self._cache_lock:
            # Return cached key if available (double-check inside lock)
            if self._cached_storage_key:
                ctx.debug(
                    "Using cached storage key",
                    account=self.account,
                    container=self.container,
                )
                return self._cached_storage_key

            if self.auth_mode == "key_vault":
                ctx.debug(
                    "Fetching storage key from Key Vault",
                    account=self.account,
                    key_vault_name=self.key_vault_name,
                    secret_name=self.secret_name,
                    timeout=timeout,
                )

                # Azure SDKs are optional dependencies; import lazily so other
                # auth modes work without them installed.
                try:
                    import concurrent.futures

                    from azure.identity import DefaultAzureCredential
                    from azure.keyvault.secrets import SecretClient
                except ImportError as e:
                    ctx.error(
                        "Key Vault authentication failed: missing azure libraries",
                        account=self.account,
                        error=str(e),
                    )
                    raise ImportError(
                        "Key Vault authentication requires 'azure-identity' and "
                        "'azure-keyvault-secrets'. Install with: pip install odibi[azure]"
                    ) from e

                # Create Key Vault client
                credential = DefaultAzureCredential()
                kv_uri = f"https://{self.key_vault_name}.vault.azure.net"
                client = SecretClient(vault_url=kv_uri, credential=credential)

                ctx.debug(
                    "Connecting to Key Vault",
                    key_vault_uri=kv_uri,
                    secret_name=self.secret_name,
                )

                # Fetch secret with timeout protection
                def _fetch():
                    secret = client.get_secret(self.secret_name)
                    return secret.value

                # NOTE(review): after a timeout, exiting the `with` block still
                # waits for the in-flight fetch thread to finish (executor
                # shutdown joins its workers) — confirm this is acceptable for
                # long-hung requests.
                with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
                    future = executor.submit(_fetch)
                    try:
                        self._cached_storage_key = future.result(timeout=timeout)
                        # presumably masks the key in subsequent log output;
                        # register_secret is defined elsewhere in the package.
                        logger.register_secret(self._cached_storage_key)
                        ctx.info(
                            "Successfully fetched storage key from Key Vault",
                            account=self.account,
                            key_vault_name=self.key_vault_name,
                        )
                        return self._cached_storage_key
                    except concurrent.futures.TimeoutError:
                        ctx.error(
                            "Key Vault fetch timed out",
                            account=self.account,
                            key_vault_name=self.key_vault_name,
                            secret_name=self.secret_name,
                            timeout=timeout,
                        )
                        raise TimeoutError(
                            f"Key Vault fetch timed out after {timeout}s for "
                            f"vault '{self.key_vault_name}', secret '{self.secret_name}'"
                        )

            elif self.auth_mode == "direct_key":
                # Literal key from config; not cached since it's already local.
                ctx.debug(
                    "Using direct account key",
                    account=self.account,
                )
                return self.account_key

            elif self.auth_mode == "sas_token":
                # Return cached key (fetched from KV) if available, else sas_token arg
                ctx.debug(
                    "Using SAS token",
                    account=self.account,
                    from_cache=bool(self._cached_storage_key),
                )
                return self._cached_storage_key or self.sas_token

            # For other modes (SP, MI), we don't use an account key
            ctx.debug(
                "No storage key required for auth_mode",
                account=self.account,
                auth_mode=self.auth_mode,
            )
            return None

    def get_client_secret(self) -> Optional[str]:
        """Get Service Principal client secret (cached or literal).

        Prefers a cached secret (e.g. previously resolved from Azure Key
        Vault); falls back to the literal client_secret supplied in the
        configuration.

        Returns:
            Client secret string, or None if not using Service Principal authentication
        """
        if self._cached_client_secret:
            return self._cached_client_secret
        return self.client_secret

    def pandas_storage_options(self) -> Dict[str, Any]:
        """Get storage options for pandas/fsspec.

        Builds the keyword arguments that adlfs/fsspec expect for this
        connection's auth_mode.

        Returns:
            Dictionary with appropriate authentication parameters for fsspec
        """
        ctx = get_logging_context()
        ctx.debug(
            "Building pandas storage options",
            account=self.account,
            container=self.container,
            auth_mode=self.auth_mode,
        )

        options: Dict[str, Any] = {"account_name": self.account}
        mode = self.auth_mode

        if mode == "key_vault" or mode == "direct_key":
            options["account_key"] = self.get_storage_key()
        elif mode == "sas_token":
            # get_storage_key() handles the Key Vault fallback for SAS tokens;
            # fsspec wants the token without a leading '?'.
            token = self.get_storage_key()
            if token and token.startswith("?"):
                token = token[1:]
            options["sas_token"] = token
        elif mode == "service_principal":
            options["tenant_id"] = self.tenant_id
            options["client_id"] = self.client_id
            options["client_secret"] = self.get_client_secret()
        elif mode == "managed_identity":
            # adlfs falls back to DefaultAzureCredential when anon=False and
            # no explicit credentials are provided (azure.identity required).
            options["anon"] = False

        return options

    def configure_spark(self, spark: "Any") -> None:
        """Configure Spark session with storage credentials.

        Sets the Hadoop ABFS (abfss://) configuration keys on the session
        that match this connection's auth_mode. All keys are scoped to this
        connection's storage account.

        Args:
            spark: SparkSession instance
        """
        ctx = get_logging_context()
        ctx.info(
            "Configuring Spark for AzureADLS",
            account=self.account,
            container=self.container,
            auth_mode=self.auth_mode,
        )

        if self.auth_mode in ["key_vault", "direct_key"]:
            # Shared-key auth: one account-scoped key entry is sufficient.
            config_key = f"fs.azure.account.key.{self.account}.dfs.core.windows.net"
            spark.conf.set(config_key, self.get_storage_key())
            ctx.debug(
                "Set Spark config for account key",
                config_key=config_key,
            )

        elif self.auth_mode == "sas_token":
            # SAS Token Configuration
            # fs.azure.sas.token.provider.type -> FixedSASTokenProvider
            # fs.azure.sas.fixed.token -> <token>
            provider_key = f"fs.azure.account.auth.type.{self.account}.dfs.core.windows.net"
            spark.conf.set(provider_key, "SAS")

            sas_provider_key = (
                f"fs.azure.sas.token.provider.type.{self.account}.dfs.core.windows.net"
            )
            spark.conf.set(
                sas_provider_key, "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider"
            )

            sas_token = self.get_storage_key()

            # Strip leading '?' if present - FixedSASTokenProvider expects token without it
            if sas_token and sas_token.startswith("?"):
                sas_token = sas_token[1:]
                ctx.debug("Stripped leading '?' from SAS token for Spark configuration")

            sas_token_key = f"fs.azure.sas.fixed.token.{self.account}.dfs.core.windows.net"
            spark.conf.set(sas_token_key, sas_token)

            # Disable ACL/namespace checks that SAS tokens don't support
            # The getAccessControl operation fails with SAS tokens on ADLS Gen2
            # These settings tell the driver to skip those checks
            spark.conf.set(
                f"fs.azure.account.hns.enabled.{self.account}.dfs.core.windows.net", "false"
            )
            spark.conf.set("fs.azure.skip.user.group.metadata.during.initialization", "true")

            ctx.debug(
                "Set Spark config for SAS token",
                auth_type_key=provider_key,
                provider_key=sas_provider_key,
            )

        elif self.auth_mode == "service_principal":
            # Configure OAuth for ADLS Gen2
            # Ref: https://hadoop.apache.org/docs/stable/hadoop-azure/abfs.html
            prefix = f"fs.azure.account.auth.type.{self.account}.dfs.core.windows.net"
            spark.conf.set(prefix, "OAuth")

            prefix = f"fs.azure.account.oauth.provider.type.{self.account}.dfs.core.windows.net"
            spark.conf.set(prefix, "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")

            prefix = f"fs.azure.account.oauth2.client.id.{self.account}.dfs.core.windows.net"
            spark.conf.set(prefix, self.client_id)

            # Secret may be resolved from Key Vault via get_client_secret().
            prefix = f"fs.azure.account.oauth2.client.secret.{self.account}.dfs.core.windows.net"
            spark.conf.set(prefix, self.get_client_secret())

            # v1 (non-/v2.0) AAD token endpoint, as used by ClientCredsTokenProvider.
            prefix = f"fs.azure.account.oauth2.client.endpoint.{self.account}.dfs.core.windows.net"
            endpoint = f"https://login.microsoftonline.com/{self.tenant_id}/oauth2/token"
            spark.conf.set(prefix, endpoint)

            ctx.debug(
                "Set Spark config for service principal OAuth",
                tenant_id=self.tenant_id,
                client_id=self.client_id,
            )

        elif self.auth_mode == "managed_identity":
            # MSI token provider: credentials come from the VM/cluster identity.
            prefix = f"fs.azure.account.auth.type.{self.account}.dfs.core.windows.net"
            spark.conf.set(prefix, "OAuth")

            prefix = f"fs.azure.account.oauth.provider.type.{self.account}.dfs.core.windows.net"
            spark.conf.set(prefix, "org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider")

            ctx.debug(
                "Set Spark config for managed identity",
                account=self.account,
            )

        ctx.info(
            "Spark configuration complete",
            account=self.account,
            auth_mode=self.auth_mode,
        )

    def uri(self, path: str) -> str:
        """Build abfss:// URI for given path.

        Args:
            path: Relative path within container

        Returns:
            Full abfss:// URI

        Example:
            >>> conn = AzureADLS(
            ...     account="myaccount", container="data",
            ...     auth_mode="direct_key", account_key="key123"
            ... )
            >>> conn.uri("folder/file.csv")
            'abfss://data@myaccount.dfs.core.windows.net/folder/file.csv'
        """
        relative = path.lstrip("/")
        if self.path_prefix:
            relative = posixpath.join(self.path_prefix, relative)
        return f"abfss://{self.container}@{self.account}.dfs.core.windows.net/{relative}"

    def get_path(self, relative_path: str) -> str:
        """Get full abfss:// URI for relative path."""
        resolved = self.uri(relative_path)

        # Log the resolution for traceability before handing back the URI.
        get_logging_context().debug(
            "Resolved ADLS path",
            account=self.account,
            container=self.container,
            relative_path=relative_path,
            full_uri=resolved,
        )
        return resolved

    def _get_fs(self):
        """Build an fsspec filesystem for this connection (requires adlfs)."""
        # adlfs is an optional dependency; import lazily with a helpful hint.
        try:
            import adlfs
        except ImportError as e:
            raise ImportError(
                "Azure ADLS discovery requires 'adlfs'. Install with: pip install odibi[azure]"
            ) from e

        return adlfs.AzureBlobFileSystem(**self.pandas_storage_options())

    def list_files(self, path: str = "", pattern: str = "*", limit: int = 1000) -> List[Dict]:
        """List files in ADLS path.

        Args:
            path: Relative path within container (default: path_prefix)
            pattern: Glob pattern for filtering (default: "*")
            limit: Maximum number of files to return

        Returns:
            List of dicts with keys: name, path, size, modified, format
        """
        ctx = get_logging_context()
        ctx.debug(
            "Listing ADLS files",
            account=self.account,
            container=self.container,
            path=path,
            pattern=pattern,
            limit=limit,
        )

        try:
            fs = self._get_fs()
            # Bug fix: join only non-empty segments. The previous f-string
            # produced a double slash (e.g. 'container//sub') whenever
            # path_prefix was empty but path was not.
            segments = (self.container, self.path_prefix, path.strip("/"))
            full_path = "/".join(s for s in segments if s)

            # Use fnmatch for glob-style pattern matching on leaf names
            import fnmatch

            all_files = []
            for entry in fs.ls(full_path, detail=True):
                if entry["type"] == "file":
                    file_name = entry["name"].split("/")[-1]
                    if fnmatch.fnmatch(file_name, pattern):
                        all_files.append(
                            {
                                "name": file_name,
                                "path": entry["name"],
                                "size": entry.get("size", 0),
                                "modified": entry.get("last_modified"),
                                "format": infer_format_from_path(file_name),
                            }
                        )
                        if len(all_files) >= limit:
                            break

            ctx.info("Listed ADLS files", count=len(all_files))
            return all_files

        except Exception as e:
            # Best-effort listing: discovery callers expect [] on failure.
            ctx.warning("Failed to list ADLS files", error=str(e), path=path)
            return []

    def list_folders(self, path: str = "", limit: int = 100) -> List[str]:
        """List folders in ADLS path.

        Args:
            path: Relative path within container
            limit: Maximum number of folders to return

        Returns:
            List of folder paths
        """
        ctx = get_logging_context()
        ctx.debug(
            "Listing ADLS folders",
            account=self.account,
            container=self.container,
            path=path,
            limit=limit,
        )

        try:
            fs = self._get_fs()
            # Bug fix: join only non-empty segments. The previous f-string
            # produced a double slash (e.g. 'container//sub') whenever
            # path_prefix was empty but path was not.
            segments = (self.container, self.path_prefix, path.strip("/"))
            full_path = "/".join(s for s in segments if s)

            folders = []
            for entry in fs.ls(full_path, detail=True):
                if entry["type"] == "directory":
                    folders.append(entry["name"])
                    if len(folders) >= limit:
                        break

            ctx.info("Listed ADLS folders", count=len(folders))
            return folders

        except Exception as e:
            # Best-effort listing: discovery callers expect [] on failure.
            ctx.warning("Failed to list ADLS folders", error=str(e), path=path)
            return []

    def discover_catalog(
        self,
        include_schema: bool = False,
        include_stats: bool = False,
        limit: int = 200,
        recursive: bool = True,
        path: str = "",
        pattern: str = "",
    ) -> Dict[str, Any]:
        """Discover datasets in ADLS container.

        Walks (or shallow-lists) the container under path_prefix/path and
        classifies entries into folders and files, tallying detected formats.
        On any failure an empty CatalogSummary carrying the error text in
        next_step is returned instead of raising.

        Args:
            include_schema: Sample files and infer schema
            include_stats: Include row counts and stats
            limit: Maximum datasets to return (folders + files combined)
            recursive: Recursively scan all subfolders (default: True)
            path: Scope search to specific subfolder in container
            pattern: Filter by pattern (e.g. "*.csv", "sales_*")

        Returns:
            CatalogSummary dict
        """
        ctx = get_logging_context()
        ctx.info(
            "Discovering ADLS catalog",
            account=self.account,
            container=self.container,
            include_schema=include_schema,
            include_stats=include_stats,
        )

        try:
            fs = self._get_fs()
            base_path = f"{self.container}/{self.path_prefix}".strip("/")

            # Use path parameter to scope search
            if path:
                base_path = f"{base_path}/{path}".strip("/")

            folders = []
            files = []
            formats = {}  # format name -> occurrence count across folders+files

            # Compile pattern for filtering if provided
            import fnmatch

            has_pattern = bool(pattern)

            # Use walk for recursive or ls for shallow
            if recursive:
                # fs.walk(..., detail=True) yields (root, {dir_name: info},
                # {file_name: info}); flatten into ls-style entry dicts so the
                # loop below can treat both scan modes uniformly.
                entries_to_scan = []
                for root, dirs, file_names in fs.walk(base_path, maxdepth=None, detail=True):
                    # Add directories
                    for dir_name, dir_info in dirs.items():
                        entries_to_scan.append(
                            {"name": dir_info["name"], "type": "directory", **dir_info}
                        )
                    # Add files
                    for file_name, file_info in file_names.items():
                        entries_to_scan.append(
                            {"name": file_info["name"], "type": "file", **file_info}
                        )
            else:
                entries_to_scan = fs.ls(base_path, detail=True)

            for entry in entries_to_scan:
                # limit caps the combined number of folders and files returned
                if len(folders) + len(files) >= limit:
                    break

                entry_name = entry["name"].split("/")[-1]

                # Apply pattern filter if specified (matched against the leaf
                # name only, not the full path)
                if has_pattern and not fnmatch.fnmatch(entry_name, pattern):
                    continue

                if entry["type"] == "directory":
                    folder_name = entry_name
                    # detect_file_format may peek into the folder (e.g. for
                    # Delta/partitioned datasets) — defined elsewhere.
                    file_format = detect_file_format(entry["name"], fs)

                    folders.append(
                        DatasetRef(
                            name=folder_name,
                            kind="folder",
                            path=entry["name"],
                            format=file_format,
                            size_bytes=entry.get("size", 0),
                        )
                    )

                    if file_format:
                        formats[file_format] = formats.get(file_format, 0) + 1

                elif entry["type"] == "file":
                    file_name = entry_name
                    file_format = infer_format_from_path(file_name)

                    files.append(
                        DatasetRef(
                            name=file_name,
                            kind="file",
                            path=entry["name"],
                            format=file_format,
                            size_bytes=entry.get("size", 0),
                            modified_at=entry.get("last_modified"),
                        )
                    )

                    if file_format:
                        formats[file_format] = formats.get(file_format, 0) + 1

            summary = CatalogSummary(
                connection_name=f"{self.account}/{self.container}",
                connection_type="azure_adls",
                folders=[f.model_dump() for f in folders],
                files=[f.model_dump() for f in files],
                total_datasets=len(folders) + len(files),
                formats=formats,
                next_step="Use get_schema() to inspect individual datasets",
            )

            ctx.info(
                "ADLS catalog discovery complete",
                total_datasets=summary.total_datasets,
                folders=len(folders),
                files=len(files),
            )

            return summary.model_dump()

        except Exception as e:
            # Degrade gracefully: return an empty catalog with the error message
            # rather than propagating (discovery is best-effort).
            ctx.error("Failed to discover ADLS catalog", error=str(e))
            return CatalogSummary(
                connection_name=f"{self.account}/{self.container}",
                connection_type="azure_adls",
                total_datasets=0,
                next_step=f"Error: {str(e)}",
            ).model_dump()
    def get_schema(self, dataset: str) -> Dict[str, Any]:
        """Infer the column schema of an ADLS dataset.

        Reads the dataset through pandas, keeps zero rows, and inspects the
        resulting dtypes only. CSV/JSON sample up to 1000 rows for inference.

        Args:
            dataset: Relative path to file or folder

        Returns:
            Schema dict with columns (empty column list on error or when the
            format is not supported for inference)
        """
        ctx = get_logging_context()
        ctx.info("Getting ADLS schema", dataset=dataset)

        try:
            import pandas as pd

            uri = self.uri(dataset)
            opts = self.pandas_storage_options()
            fmt = detect_file_format(dataset, self._get_fs())

            if fmt in ("parquet", "delta"):
                sample = pd.read_parquet(uri, storage_options=opts).head(0)
            elif fmt == "csv":
                sample = pd.read_csv(uri, storage_options=opts, nrows=1000).head(0)
            elif fmt == "json":
                sample = pd.read_json(uri, storage_options=opts, lines=True, nrows=1000).head(0)
            else:
                ctx.warning("Unsupported format for schema inference", format=fmt)
                return Schema(
                    dataset=DatasetRef(name=dataset, kind="file", format=fmt),
                    columns=[],
                ).model_dump()

            cols = [Column(name=name, dtype=str(dtype)) for name, dtype in sample.dtypes.items()]

            ctx.info("Schema retrieved", column_count=len(cols))
            return Schema(
                dataset=DatasetRef(name=dataset, kind="file", format=fmt),
                columns=cols,
            ).model_dump()

        except Exception as e:
            ctx.error("Failed to get schema", dataset=dataset, error=str(e))
            return Schema(
                dataset=DatasetRef(name=dataset, kind="file"),
                columns=[],
            ).model_dump()

    def profile(
        self, dataset: str, sample_rows: int = 1000, columns: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """Profile a dataset with statistics.

        Args:
            dataset: Relative path to file or folder
            sample_rows: Number of rows to sample (max 10000)
            columns: Specific columns to profile (None = all)

        Returns:
            TableProfile dict with stats (null counts, cardinality,
            candidate keys/watermarks, completeness). Empty profile on
            error or unsupported format.
        """
        ctx = get_logging_context()
        ctx.info("Profiling ADLS dataset", dataset=dataset, sample_rows=sample_rows)

        sample_rows = min(sample_rows, 10000)  # Cap at 10k

        try:
            import pandas as pd

            full_path = self.uri(dataset)
            storage_opts = self.pandas_storage_options()
            file_format = detect_file_format(dataset, self._get_fs())

            # Read sample
            if file_format in ("parquet", "delta"):
                df = pd.read_parquet(full_path, storage_options=storage_opts).head(sample_rows)
            elif file_format == "csv":
                df = pd.read_csv(full_path, storage_options=storage_opts, nrows=sample_rows)
            elif file_format == "json":
                df = pd.read_json(
                    full_path, storage_options=storage_opts, lines=True, nrows=sample_rows
                )
            else:
                ctx.warning("Unsupported format for profiling", format=file_format)
                return TableProfile(
                    dataset=DatasetRef(name=dataset, kind="file", format=file_format),
                    rows_sampled=0,
                    columns=[],
                ).model_dump()

            row_count = len(df)

            # Profile columns
            profile_cols = columns or df.columns.tolist()
            profiled = []

            for col in profile_cols:
                if col not in df.columns:
                    continue

                null_count = int(df[col].isnull().sum())
                null_pct = null_count / row_count if row_count > 0 else 0
                distinct_count = int(df[col].nunique())

                # Cardinality heuristic. An empty sample is classed "low"
                # explicitly; otherwise 0 == 0 would read as "unique" and
                # every column of an empty file would become a candidate key.
                if row_count == 0:
                    cardinality = "low"
                elif distinct_count == row_count:
                    cardinality = "unique"
                elif distinct_count > row_count * 0.9:
                    cardinality = "high"
                elif distinct_count > row_count * 0.1:
                    cardinality = "medium"
                else:
                    cardinality = "low"

                sample_values = df[col].dropna().head(5).tolist()

                profiled.append(
                    Column(
                        name=col,
                        dtype=str(df[col].dtype),
                        null_count=null_count,
                        null_pct=round(null_pct, 3),
                        cardinality=cardinality,
                        distinct_count=distinct_count,
                        sample_values=sample_values,
                    )
                )

            # Detect candidate keys (unique non-null columns)
            candidate_keys = [
                c.name for c in profiled if c.cardinality == "unique" and c.null_count == 0
            ]

            # Detect candidate watermarks (datetime columns)
            candidate_watermarks = [
                c.name
                for c in profiled
                if "datetime" in c.dtype.lower() or "date" in c.dtype.lower()
            ]

            # Guard the completeness ratio: rows * columns is 0 for an empty
            # sample, and the unguarded division raised ZeroDivisionError,
            # discarding the whole otherwise-valid profile via the except
            # below. An empty sample is vacuously complete.
            total_cells = row_count * len(df.columns)
            if total_cells > 0:
                completeness = 1.0 - (df.isnull().sum().sum() / total_cells)
            else:
                completeness = 1.0

            profile = TableProfile(
                dataset=DatasetRef(name=dataset, kind="file", format=file_format),
                rows_sampled=row_count,
                columns=profiled,
                candidate_keys=candidate_keys,
                candidate_watermarks=candidate_watermarks,
                completeness=round(completeness, 3),
            )

            ctx.info("Profiling complete", rows_sampled=row_count, columns=len(profiled))
            return profile.model_dump()

        except Exception as e:
            ctx.error("Failed to profile dataset", dataset=dataset, error=str(e))
            return TableProfile(
                dataset=DatasetRef(name=dataset, kind="file"),
                rows_sampled=0,
                columns=[],
            ).model_dump()

    def preview(
        self, dataset: str, rows: int = 5, columns: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """Preview sample rows from an ADLS dataset."""
        ctx = get_logging_context()
        ctx.info("Previewing ADLS dataset", dataset=dataset, rows=rows)

        limit = min(rows, 100)  # Never return more than 100 rows

        try:
            import pandas as pd

            uri = self.uri(dataset)
            opts = self.pandas_storage_options()
            fmt = detect_file_format(dataset, self._get_fs())

            if fmt in ("parquet", "delta"):
                df = pd.read_parquet(uri, storage_options=opts).head(limit)
            elif fmt == "csv":
                df = pd.read_csv(uri, storage_options=opts, nrows=limit)
            elif fmt == "json":
                df = pd.read_json(uri, storage_options=opts, lines=True, nrows=limit)
            else:
                ctx.warning("Unsupported format for preview", format=fmt)
                return PreviewResult(
                    dataset=DatasetRef(name=dataset, kind="file", format=fmt),
                ).model_dump()

            # Restrict to requested columns, silently dropping unknown names
            if columns:
                keep = [c for c in columns if c in df.columns]
                df = df[keep]

            result = PreviewResult(
                dataset=DatasetRef(name=dataset, kind="file", format=fmt),
                columns=df.columns.tolist(),
                rows=df.head(limit).to_dict(orient="records"),
                truncated=len(df) >= limit,
                format=fmt,
            )

            ctx.info("Preview complete", rows_returned=len(result.rows))
            return result.model_dump()

        except Exception as e:
            ctx.error("Failed to preview dataset", dataset=dataset, error=str(e))
            return PreviewResult(
                dataset=DatasetRef(name=dataset, kind="file"),
            ).model_dump()

    def detect_partitions(self, path: str = "") -> Dict[str, Any]:
        """Detect partition structure in ADLS path.

        Args:
            path: Relative path to scan (default: path_prefix)

        Returns:
            PartitionInfo dict
        """
        ctx = get_logging_context()
        ctx.info("Detecting ADLS partitions", path=path)

        try:
            fs = self._get_fs()
            root = f"{self.container}/{self.path_prefix}/{path}".strip("/")

            # Best-effort recursive listing, sampling at most 100 file paths.
            sample_paths = []
            try:
                listing = fs.ls(root, detail=True, recursive=True)
                sample_paths = [e["name"] for e in listing if e["type"] == "file"][:100]
            except Exception:
                pass  # a failed listing just means an empty sample

            # Bare name resolves to the module-level detect_partitions helper
            # (class scope is not searched from inside a method body).
            info = detect_partitions(sample_paths)

            result = PartitionInfo(
                root=root,
                keys=info.get("keys", []),
                example_values=info.get("example_values", {}),
                format=info.get("format", "none"),
                partition_count=len(sample_paths),
            )

            ctx.info("Partition detection complete", keys=result.keys)
            return result.model_dump()

        except Exception as e:
            ctx.error("Failed to detect partitions", path=path, error=str(e))
            return PartitionInfo(root=path, keys=[], example_values={}).model_dump()

    def get_freshness(self, dataset: str) -> Dict[str, Any]:
        """Get data freshness for a dataset.

        Args:
            dataset: Relative path to file or folder

        Returns:
            FreshnessResult dict; last_updated/age_hours are omitted when the
            filesystem exposes no modification time or the lookup fails
        """
        ctx = get_logging_context()
        ctx.info("Checking ADLS freshness", dataset=dataset)

        try:
            from datetime import timezone

            fs = self._get_fs()
            full_path = f"{self.container}/{self.path_prefix}/{dataset}".strip("/")

            # Get file/folder metadata
            info = fs.info(full_path)
            last_modified = info.get("last_modified")

            if last_modified:
                if isinstance(last_modified, str):
                    last_modified = datetime.fromisoformat(last_modified.replace("Z", "+00:00"))

                # Normalize to timezone-aware UTC before subtracting. The
                # previous naive datetime.utcnow() raised TypeError whenever
                # last_modified was tz-aware (always true after the
                # fromisoformat branch above, which appends "+00:00"), so the
                # broad except silently degraded the result.
                if last_modified.tzinfo is None:
                    last_modified = last_modified.replace(tzinfo=timezone.utc)

                age_hours = (
                    datetime.now(timezone.utc) - last_modified
                ).total_seconds() / 3600

                result = FreshnessResult(
                    dataset=DatasetRef(name=dataset, kind="file"),
                    last_updated=last_modified,
                    source="metadata",
                    age_hours=round(age_hours, 2),
                )

                ctx.info("Freshness check complete", age_hours=age_hours)
                return result.model_dump()

        except Exception as e:
            ctx.error("Failed to check freshness", dataset=dataset, error=str(e))

        return FreshnessResult(
            dataset=DatasetRef(name=dataset, kind="file"), source="metadata"
        ).model_dump()

__init__(account, container, path_prefix='', auth_mode='key_vault', key_vault_name=None, secret_name=None, account_key=None, sas_token=None, tenant_id=None, client_id=None, client_secret=None, validate=True, **kwargs)

Initialize ADLS connection.

Parameters:

Name Type Description Default
account str

Storage account name (e.g., 'mystorageaccount')

required
container str

Container/filesystem name

required
path_prefix str

Optional prefix for all paths

''
auth_mode str

Authentication mode ('key_vault', 'direct_key', 'sas_token', 'service_principal', 'managed_identity')

'key_vault'
key_vault_name Optional[str]

Azure Key Vault name (required for key_vault mode)

None
secret_name Optional[str]

Secret name in Key Vault (required for key_vault mode)

None
account_key Optional[str]

Storage account key (required for direct_key mode)

None
sas_token Optional[str]

Shared Access Signature token (required for sas_token mode)

None
tenant_id Optional[str]

Azure Tenant ID (required for service_principal)

None
client_id Optional[str]

Service Principal Client ID (required for service_principal)

None
client_secret Optional[str]

Service Principal Client Secret (required for service_principal)

None
validate bool

Validate configuration on init

True
Source code in odibi/connections/azure_adls.py
def __init__(
    self,
    account: str,
    container: str,
    path_prefix: str = "",
    auth_mode: str = "key_vault",
    key_vault_name: Optional[str] = None,
    secret_name: Optional[str] = None,
    account_key: Optional[str] = None,
    sas_token: Optional[str] = None,
    tenant_id: Optional[str] = None,
    client_id: Optional[str] = None,
    client_secret: Optional[str] = None,
    validate: bool = True,
    **kwargs,
):
    """Initialize ADLS connection.

    Args:
        account: Storage account name (e.g., 'mystorageaccount')
        container: Container/filesystem name
        path_prefix: Optional prefix for all paths
        auth_mode: Authentication mode
            ('key_vault', 'direct_key', 'sas_token', 'service_principal', 'managed_identity')
        key_vault_name: Azure Key Vault name (required for key_vault mode)
        secret_name: Secret name in Key Vault (required for key_vault mode)
        account_key: Storage account key (required for direct_key mode)
        sas_token: Shared Access Signature token (required for sas_token mode)
        tenant_id: Azure Tenant ID (required for service_principal)
        client_id: Service Principal Client ID (required for service_principal)
        client_secret: Service Principal Client Secret (required for service_principal)
        validate: Validate configuration on init
    """
    ctx = get_logging_context()
    ctx.log_connection(
        connection_type="azure_adls",
        connection_name=f"{account}/{container}",
        action="init",
        account=account,
        container=container,
        auth_mode=auth_mode,
        path_prefix=path_prefix or "(none)",
    )

    self.account = account
    self.container = container
    self.path_prefix = path_prefix.strip("/") if path_prefix else ""
    self.auth_mode = auth_mode
    self.key_vault_name = key_vault_name
    self.secret_name = secret_name
    self.account_key = account_key
    self.sas_token = sas_token
    self.tenant_id = tenant_id
    self.client_id = client_id
    self.client_secret = client_secret

    self._cached_storage_key: Optional[str] = None
    self._cached_client_secret: Optional[str] = None
    self._cache_lock = threading.Lock()

    if validate:
        self.validate()

configure_spark(spark)

Configure Spark session with storage credentials.

Parameters:

Name Type Description Default
spark Any

SparkSession instance

required
Source code in odibi/connections/azure_adls.py
def configure_spark(self, spark: "Any") -> None:
    """Configure Spark session with storage credentials.

    Args:
        spark: SparkSession instance
    """
    ctx = get_logging_context()
    ctx.info(
        "Configuring Spark for AzureADLS",
        account=self.account,
        container=self.container,
        auth_mode=self.auth_mode,
    )

    if self.auth_mode in ["key_vault", "direct_key"]:
        config_key = f"fs.azure.account.key.{self.account}.dfs.core.windows.net"
        spark.conf.set(config_key, self.get_storage_key())
        ctx.debug(
            "Set Spark config for account key",
            config_key=config_key,
        )

    elif self.auth_mode == "sas_token":
        # SAS Token Configuration
        # fs.azure.sas.token.provider.type -> FixedSASTokenProvider
        # fs.azure.sas.fixed.token -> <token>
        provider_key = f"fs.azure.account.auth.type.{self.account}.dfs.core.windows.net"
        spark.conf.set(provider_key, "SAS")

        sas_provider_key = (
            f"fs.azure.sas.token.provider.type.{self.account}.dfs.core.windows.net"
        )
        spark.conf.set(
            sas_provider_key, "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider"
        )

        sas_token = self.get_storage_key()

        # Strip leading '?' if present - FixedSASTokenProvider expects token without it
        if sas_token and sas_token.startswith("?"):
            sas_token = sas_token[1:]
            ctx.debug("Stripped leading '?' from SAS token for Spark configuration")

        sas_token_key = f"fs.azure.sas.fixed.token.{self.account}.dfs.core.windows.net"
        spark.conf.set(sas_token_key, sas_token)

        # Disable ACL/namespace checks that SAS tokens don't support
        # The getAccessControl operation fails with SAS tokens on ADLS Gen2
        # These settings tell the driver to skip those checks
        spark.conf.set(
            f"fs.azure.account.hns.enabled.{self.account}.dfs.core.windows.net", "false"
        )
        spark.conf.set("fs.azure.skip.user.group.metadata.during.initialization", "true")

        ctx.debug(
            "Set Spark config for SAS token",
            auth_type_key=provider_key,
            provider_key=sas_provider_key,
        )

    elif self.auth_mode == "service_principal":
        # Configure OAuth for ADLS Gen2
        # Ref: https://hadoop.apache.org/docs/stable/hadoop-azure/abfs.html
        prefix = f"fs.azure.account.auth.type.{self.account}.dfs.core.windows.net"
        spark.conf.set(prefix, "OAuth")

        prefix = f"fs.azure.account.oauth.provider.type.{self.account}.dfs.core.windows.net"
        spark.conf.set(prefix, "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")

        prefix = f"fs.azure.account.oauth2.client.id.{self.account}.dfs.core.windows.net"
        spark.conf.set(prefix, self.client_id)

        prefix = f"fs.azure.account.oauth2.client.secret.{self.account}.dfs.core.windows.net"
        spark.conf.set(prefix, self.get_client_secret())

        prefix = f"fs.azure.account.oauth2.client.endpoint.{self.account}.dfs.core.windows.net"
        endpoint = f"https://login.microsoftonline.com/{self.tenant_id}/oauth2/token"
        spark.conf.set(prefix, endpoint)

        ctx.debug(
            "Set Spark config for service principal OAuth",
            tenant_id=self.tenant_id,
            client_id=self.client_id,
        )

    elif self.auth_mode == "managed_identity":
        prefix = f"fs.azure.account.auth.type.{self.account}.dfs.core.windows.net"
        spark.conf.set(prefix, "OAuth")

        prefix = f"fs.azure.account.oauth.provider.type.{self.account}.dfs.core.windows.net"
        spark.conf.set(prefix, "org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider")

        ctx.debug(
            "Set Spark config for managed identity",
            account=self.account,
        )

    ctx.info(
        "Spark configuration complete",
        account=self.account,
        auth_mode=self.auth_mode,
    )

detect_partitions(path='')

Detect partition structure in ADLS path.

Parameters:

Name Type Description Default
path str

Relative path to scan (default: path_prefix)

''

Returns:

Type Description
Dict[str, Any]

PartitionInfo dict

Source code in odibi/connections/azure_adls.py
def detect_partitions(self, path: str = "") -> Dict[str, Any]:
    """Detect partition structure in ADLS path.

    Args:
        path: Relative path to scan (default: path_prefix)

    Returns:
        PartitionInfo dict
    """
    ctx = get_logging_context()
    ctx.info("Detecting ADLS partitions", path=path)

    try:
        fs = self._get_fs()
        full_path = f"{self.container}/{self.path_prefix}/{path}".strip("/")

        # List all files recursively
        all_paths = []
        try:
            all_paths = [
                entry["name"]
                for entry in fs.ls(full_path, detail=True, recursive=True)
                if entry["type"] == "file"
            ][:100]  # Sample first 100
        except Exception:
            pass

        partition_info = detect_partitions(all_paths)

        result = PartitionInfo(
            root=full_path,
            keys=partition_info.get("keys", []),
            example_values=partition_info.get("example_values", {}),
            format=partition_info.get("format", "none"),
            partition_count=len(all_paths),
        )

        ctx.info("Partition detection complete", keys=result.keys)
        return result.model_dump()

    except Exception as e:
        ctx.error("Failed to detect partitions", path=path, error=str(e))
        return PartitionInfo(root=path, keys=[], example_values={}).model_dump()

discover_catalog(include_schema=False, include_stats=False, limit=200, recursive=True, path='', pattern='')

Discover datasets in ADLS container.

Parameters:

Name Type Description Default
include_schema bool

Sample files and infer schema

False
include_stats bool

Include row counts and stats

False
limit int

Maximum datasets to return

200
recursive bool

Recursively scan all subfolders (default: True)

True
path str

Scope search to specific subfolder in container

''
pattern str

Filter by pattern (e.g. "*.csv", "sales_*")

''

Returns:

Type Description
Dict[str, Any]

CatalogSummary dict

Source code in odibi/connections/azure_adls.py
def discover_catalog(
    self,
    include_schema: bool = False,
    include_stats: bool = False,
    limit: int = 200,
    recursive: bool = True,
    path: str = "",
    pattern: str = "",
) -> Dict[str, Any]:
    """Discover datasets in ADLS container.

    Args:
        include_schema: Sample files and infer schema
        include_stats: Include row counts and stats
        limit: Maximum datasets to return
        recursive: Recursively scan all subfolders (default: True)
        path: Scope search to specific subfolder in container
        pattern: Filter by pattern (e.g. "*.csv", "sales_*")

    Returns:
        CatalogSummary dict
    """
    ctx = get_logging_context()
    ctx.info(
        "Discovering ADLS catalog",
        account=self.account,
        container=self.container,
        include_schema=include_schema,
        include_stats=include_stats,
    )

    try:
        fs = self._get_fs()
        base_path = f"{self.container}/{self.path_prefix}".strip("/")

        # Use path parameter to scope search
        if path:
            base_path = f"{base_path}/{path}".strip("/")

        folders = []
        files = []
        formats = {}

        # Compile pattern for filtering if provided
        import fnmatch

        has_pattern = bool(pattern)

        # Use walk for recursive or ls for shallow
        if recursive:
            entries_to_scan = []
            for root, dirs, file_names in fs.walk(base_path, maxdepth=None, detail=True):
                # Add directories
                for dir_name, dir_info in dirs.items():
                    entries_to_scan.append(
                        {"name": dir_info["name"], "type": "directory", **dir_info}
                    )
                # Add files
                for file_name, file_info in file_names.items():
                    entries_to_scan.append(
                        {"name": file_info["name"], "type": "file", **file_info}
                    )
        else:
            entries_to_scan = fs.ls(base_path, detail=True)

        for entry in entries_to_scan:
            if len(folders) + len(files) >= limit:
                break

            entry_name = entry["name"].split("/")[-1]

            # Apply pattern filter if specified
            if has_pattern and not fnmatch.fnmatch(entry_name, pattern):
                continue

            if entry["type"] == "directory":
                folder_name = entry_name
                file_format = detect_file_format(entry["name"], fs)

                folders.append(
                    DatasetRef(
                        name=folder_name,
                        kind="folder",
                        path=entry["name"],
                        format=file_format,
                        size_bytes=entry.get("size", 0),
                    )
                )

                if file_format:
                    formats[file_format] = formats.get(file_format, 0) + 1

            elif entry["type"] == "file":
                file_name = entry_name
                file_format = infer_format_from_path(file_name)

                files.append(
                    DatasetRef(
                        name=file_name,
                        kind="file",
                        path=entry["name"],
                        format=file_format,
                        size_bytes=entry.get("size", 0),
                        modified_at=entry.get("last_modified"),
                    )
                )

                if file_format:
                    formats[file_format] = formats.get(file_format, 0) + 1

        summary = CatalogSummary(
            connection_name=f"{self.account}/{self.container}",
            connection_type="azure_adls",
            folders=[f.model_dump() for f in folders],
            files=[f.model_dump() for f in files],
            total_datasets=len(folders) + len(files),
            formats=formats,
            next_step="Use get_schema() to inspect individual datasets",
        )

        ctx.info(
            "ADLS catalog discovery complete",
            total_datasets=summary.total_datasets,
            folders=len(folders),
            files=len(files),
        )

        return summary.model_dump()

    except Exception as e:
        ctx.error("Failed to discover ADLS catalog", error=str(e))
        return CatalogSummary(
            connection_name=f"{self.account}/{self.container}",
            connection_type="azure_adls",
            total_datasets=0,
            next_step=f"Error: {str(e)}",
        ).model_dump()

get_client_secret()

Get Service Principal client secret (cached or literal).

Returns the cached secret if available (loaded from Azure Key Vault or environment variable during initialization), otherwise returns the literal client_secret value from the configuration.

Returns:

Type Description
Optional[str]

Client secret string, or None if not using Service Principal authentication

Source code in odibi/connections/azure_adls.py
def get_client_secret(self) -> Optional[str]:
    """Get Service Principal client secret (cached or literal).

    Returns the cached secret if available (loaded from Azure Key Vault or
    environment variable during initialization), otherwise returns the literal
    client_secret value from the configuration.

    Returns:
        Client secret string, or None if not using Service Principal authentication
    """
    return self._cached_client_secret or self.client_secret

get_freshness(dataset)

Get data freshness for a dataset.

Parameters:

Name Type Description Default
dataset str

Relative path to file or folder

required

Returns:

Type Description
Dict[str, Any]

FreshnessResult dict

Source code in odibi/connections/azure_adls.py
def get_freshness(self, dataset: str) -> Dict[str, Any]:
    """Get data freshness for a dataset.

    Args:
        dataset: Relative path to file or folder

    Returns:
        FreshnessResult dict
    """
    ctx = get_logging_context()
    ctx.info("Checking ADLS freshness", dataset=dataset)

    try:
        fs = self._get_fs()
        full_path = f"{self.container}/{self.path_prefix}/{dataset}".strip("/")

        # Get file/folder metadata
        info = fs.info(full_path)
        last_modified = info.get("last_modified")

        if last_modified:
            if isinstance(last_modified, str):
                last_modified = datetime.fromisoformat(last_modified.replace("Z", "+00:00"))

            age_hours = (datetime.utcnow() - last_modified).total_seconds() / 3600

            result = FreshnessResult(
                dataset=DatasetRef(name=dataset, kind="file"),
                last_updated=last_modified,
                source="metadata",
                age_hours=round(age_hours, 2),
            )

            ctx.info("Freshness check complete", age_hours=age_hours)
            return result.model_dump()

    except Exception as e:
        ctx.error("Failed to check freshness", dataset=dataset, error=str(e))

    return FreshnessResult(
        dataset=DatasetRef(name=dataset, kind="file"), source="metadata"
    ).model_dump()

get_path(relative_path)

Get full abfss:// URI for relative path.

Source code in odibi/connections/azure_adls.py
def get_path(self, relative_path: str) -> str:
    """Get full abfss:// URI for relative path."""
    ctx = get_logging_context()
    full_uri = self.uri(relative_path)

    ctx.debug(
        "Resolved ADLS path",
        account=self.account,
        container=self.container,
        relative_path=relative_path,
        full_uri=full_uri,
    )

    return full_uri

get_schema(dataset)

Get schema for a dataset.

Parameters:

Name Type Description Default
dataset str

Relative path to file or folder

required

Returns:

Type Description
Dict[str, Any]

Schema dict with columns

Source code in odibi/connections/azure_adls.py
def get_schema(self, dataset: str) -> Dict[str, Any]:
    """Get schema for a dataset.

    Args:
        dataset: Relative path to file or folder

    Returns:
        Schema dict with columns
    """
    ctx = get_logging_context()
    ctx.info("Getting ADLS schema", dataset=dataset)

    try:
        import pandas as pd

        full_path = self.uri(dataset)
        storage_opts = self.pandas_storage_options()

        # Detect format
        file_format = detect_file_format(dataset, self._get_fs())

        if file_format == "parquet" or file_format == "delta":
            df = pd.read_parquet(full_path, storage_options=storage_opts).head(0)
        elif file_format == "csv":
            df = pd.read_csv(full_path, storage_options=storage_opts, nrows=1000).head(0)
        elif file_format == "json":
            df = pd.read_json(
                full_path, storage_options=storage_opts, lines=True, nrows=1000
            ).head(0)
        else:
            ctx.warning("Unsupported format for schema inference", format=file_format)
            return Schema(
                dataset=DatasetRef(name=dataset, kind="file", format=file_format),
                columns=[],
            ).model_dump()

        columns = [Column(name=col, dtype=str(dtype)) for col, dtype in df.dtypes.items()]

        schema = Schema(
            dataset=DatasetRef(name=dataset, kind="file", format=file_format),
            columns=columns,
        )

        ctx.info("Schema retrieved", column_count=len(columns))
        return schema.model_dump()

    except Exception as e:
        ctx.error("Failed to get schema", dataset=dataset, error=str(e))
        return Schema(
            dataset=DatasetRef(name=dataset, kind="file"),
            columns=[],
        ).model_dump()

get_storage_key(timeout=30.0)

Get storage account key (cached).

Only relevant for 'key_vault' and 'direct_key' modes.

Parameters:

Name Type Description Default
timeout float

Timeout for Key Vault operations in seconds (default: 30.0)

30.0

Returns:

Type Description
Optional[str]

Storage account key or None if not applicable for auth_mode

Raises:

Type Description
ImportError

If azure libraries not installed (key_vault mode)

TimeoutError

If Key Vault fetch exceeds timeout

Exception

If Key Vault access fails

Source code in odibi/connections/azure_adls.py
def get_storage_key(self, timeout: float = 30.0) -> Optional[str]:
    """Get storage account key (cached).

    Only relevant for 'key_vault' and 'direct_key' modes.

    Args:
        timeout: Timeout for Key Vault operations in seconds (default: 30.0)

    Returns:
        Storage account key or None if not applicable for auth_mode

    Raises:
        ImportError: If azure libraries not installed (key_vault mode)
        TimeoutError: If Key Vault fetch exceeds timeout
        Exception: If Key Vault access fails
    """
    ctx = get_logging_context()

    with self._cache_lock:
        # Return cached key if available (double-check inside lock)
        if self._cached_storage_key:
            ctx.debug(
                "Using cached storage key",
                account=self.account,
                container=self.container,
            )
            return self._cached_storage_key

        if self.auth_mode == "key_vault":
            ctx.debug(
                "Fetching storage key from Key Vault",
                account=self.account,
                key_vault_name=self.key_vault_name,
                secret_name=self.secret_name,
                timeout=timeout,
            )

            try:
                import concurrent.futures

                from azure.identity import DefaultAzureCredential
                from azure.keyvault.secrets import SecretClient
            except ImportError as e:
                ctx.error(
                    "Key Vault authentication failed: missing azure libraries",
                    account=self.account,
                    error=str(e),
                )
                raise ImportError(
                    "Key Vault authentication requires 'azure-identity' and "
                    "'azure-keyvault-secrets'. Install with: pip install odibi[azure]"
                ) from e

            # Create Key Vault client
            credential = DefaultAzureCredential()
            kv_uri = f"https://{self.key_vault_name}.vault.azure.net"
            client = SecretClient(vault_url=kv_uri, credential=credential)

            ctx.debug(
                "Connecting to Key Vault",
                key_vault_uri=kv_uri,
                secret_name=self.secret_name,
            )

            # Fetch secret with timeout protection
            def _fetch():
                secret = client.get_secret(self.secret_name)
                return secret.value

            # NOTE: deliberately NOT using the executor as a context manager.
            # ThreadPoolExecutor.__exit__ calls shutdown(wait=True), which
            # would block on a hung Key Vault call and defeat the timeout:
            # the TimeoutError raised below could not propagate until the
            # worker thread returned.
            executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
            try:
                future = executor.submit(_fetch)
                try:
                    self._cached_storage_key = future.result(timeout=timeout)
                    logger.register_secret(self._cached_storage_key)
                    ctx.info(
                        "Successfully fetched storage key from Key Vault",
                        account=self.account,
                        key_vault_name=self.key_vault_name,
                    )
                    return self._cached_storage_key
                except concurrent.futures.TimeoutError:
                    ctx.error(
                        "Key Vault fetch timed out",
                        account=self.account,
                        key_vault_name=self.key_vault_name,
                        secret_name=self.secret_name,
                        timeout=timeout,
                    )
                    raise TimeoutError(
                        f"Key Vault fetch timed out after {timeout}s for "
                        f"vault '{self.key_vault_name}', secret '{self.secret_name}'"
                    )
            finally:
                # Release the pool without waiting; a hung fetch thread is
                # abandoned rather than allowed to block the caller.
                executor.shutdown(wait=False)

        elif self.auth_mode == "direct_key":
            ctx.debug(
                "Using direct account key",
                account=self.account,
            )
            return self.account_key

        elif self.auth_mode == "sas_token":
            # Return cached key (fetched from KV) if available, else sas_token arg
            ctx.debug(
                "Using SAS token",
                account=self.account,
                from_cache=bool(self._cached_storage_key),
            )
            return self._cached_storage_key or self.sas_token

        # For other modes (SP, MI), we don't use an account key
        ctx.debug(
            "No storage key required for auth_mode",
            account=self.account,
            auth_mode=self.auth_mode,
        )
        return None

list_files(path='', pattern='*', limit=1000)

List files in ADLS path.

Parameters:

Name Type Description Default
path str

Relative path within container (default: path_prefix)

''
pattern str

Glob pattern for filtering (default: "*")

'*'
limit int

Maximum number of files to return

1000

Returns:

Type Description
List[Dict]

List of dicts with keys: name, path, size, modified, format

Source code in odibi/connections/azure_adls.py
def list_files(self, path: str = "", pattern: str = "*", limit: int = 1000) -> List[Dict]:
    """List files in ADLS path.

    Args:
        path: Relative path within container (default: path_prefix)
        pattern: Glob pattern for filtering (default: "*")
        limit: Maximum number of files to return

    Returns:
        List of dicts with keys: name, path, size, modified, format
    """
    ctx = get_logging_context()
    ctx.debug(
        "Listing ADLS files",
        account=self.account,
        container=self.container,
        path=path,
        pattern=pattern,
        limit=limit,
    )

    try:
        fs = self._get_fs()
        # Join only non-empty components so an empty path_prefix (or a
        # leading slash in `path`) does not produce "container//path".
        full_path = "/".join(
            part.strip("/")
            for part in (self.container, self.path_prefix, path)
            if part and part.strip("/")
        )

        # Use fnmatch for glob-style pattern matching on the file name
        import fnmatch

        all_files = []
        for entry in fs.ls(full_path, detail=True):
            if entry["type"] == "file":
                file_name = entry["name"].split("/")[-1]
                if fnmatch.fnmatch(file_name, pattern):
                    all_files.append(
                        {
                            "name": file_name,
                            "path": entry["name"],
                            "size": entry.get("size", 0),
                            "modified": entry.get("last_modified"),
                            "format": infer_format_from_path(file_name),
                        }
                    )
                    if len(all_files) >= limit:
                        break

        ctx.info("Listed ADLS files", count=len(all_files))
        return all_files

    except Exception as e:
        # Best-effort listing: log and return an empty list rather than raise
        ctx.warning("Failed to list ADLS files", error=str(e), path=path)
        return []

list_folders(path='', limit=100)

List folders in ADLS path.

Parameters:

Name Type Description Default
path str

Relative path within container

''
limit int

Maximum number of folders to return

100

Returns:

Type Description
List[str]

List of folder paths

Source code in odibi/connections/azure_adls.py
def list_folders(self, path: str = "", limit: int = 100) -> List[str]:
    """List folders in ADLS path.

    Args:
        path: Relative path within container
        limit: Maximum number of folders to return

    Returns:
        List of folder paths
    """
    ctx = get_logging_context()
    ctx.debug(
        "Listing ADLS folders",
        account=self.account,
        container=self.container,
        path=path,
        limit=limit,
    )

    try:
        fs = self._get_fs()
        # Join only non-empty components so an empty path_prefix (or a
        # leading slash in `path`) does not produce "container//path".
        full_path = "/".join(
            part.strip("/")
            for part in (self.container, self.path_prefix, path)
            if part and part.strip("/")
        )

        folders = []
        for entry in fs.ls(full_path, detail=True):
            if entry["type"] == "directory":
                folders.append(entry["name"])
                if len(folders) >= limit:
                    break

        ctx.info("Listed ADLS folders", count=len(folders))
        return folders

    except Exception as e:
        # Best-effort listing: log and return an empty list rather than raise
        ctx.warning("Failed to list ADLS folders", error=str(e), path=path)
        return []

pandas_storage_options()

Get storage options for pandas/fsspec.

Returns:

Type Description
Dict[str, Any]

Dictionary with appropriate authentication parameters for fsspec

Source code in odibi/connections/azure_adls.py
def pandas_storage_options(self) -> Dict[str, Any]:
    """Get storage options for pandas/fsspec.

    Returns:
        Dictionary with appropriate authentication parameters for fsspec
    """
    ctx = get_logging_context()
    ctx.debug(
        "Building pandas storage options",
        account=self.account,
        container=self.container,
        auth_mode=self.auth_mode,
    )

    options: Dict[str, Any] = {"account_name": self.account}
    mode = self.auth_mode

    if mode in ("key_vault", "direct_key"):
        options["account_key"] = self.get_storage_key()
    elif mode == "sas_token":
        # get_storage_key() handles the Key Vault fallback for SAS tokens.
        # A leading '?' is stripped for fsspec compatibility.
        token = self.get_storage_key()
        if token and token.startswith("?"):
            token = token[1:]
        options["sas_token"] = token
    elif mode == "service_principal":
        options["tenant_id"] = self.tenant_id
        options["client_id"] = self.client_id
        options["client_secret"] = self.get_client_secret()
    elif mode == "managed_identity":
        # adlfs falls back to DefaultAzureCredential when anon=False and no
        # explicit credentials are given (requires azure.identity installed).
        options["anon"] = False

    # Unrecognized modes fall through with only the account name set.
    return options

preview(dataset, rows=5, columns=None)

Preview sample rows from an ADLS dataset.

Source code in odibi/connections/azure_adls.py
def preview(
    self, dataset: str, rows: int = 5, columns: Optional[List[str]] = None
) -> Dict[str, Any]:
    """Preview sample rows from an ADLS dataset."""
    ctx = get_logging_context()
    ctx.info("Previewing ADLS dataset", dataset=dataset, rows=rows)

    # Never return more than 100 rows regardless of what was requested.
    max_rows = min(rows, 100)

    try:
        import pandas as pd

        target_uri = self.uri(dataset)
        options = self.pandas_storage_options()
        file_format = detect_file_format(dataset, self._get_fs())

        if file_format in ("parquet", "delta"):
            sample = pd.read_parquet(target_uri, storage_options=options).head(max_rows)
        elif file_format == "csv":
            sample = pd.read_csv(target_uri, storage_options=options, nrows=max_rows)
        elif file_format == "json":
            sample = pd.read_json(
                target_uri, storage_options=options, lines=True, nrows=max_rows
            )
        else:
            ctx.warning("Unsupported format for preview", format=file_format)
            return PreviewResult(
                dataset=DatasetRef(name=dataset, kind="file", format=file_format),
            ).model_dump()

        # Restrict to requested columns, silently skipping unknown names.
        if columns:
            keep = [c for c in columns if c in sample.columns]
            sample = sample[keep]

        result = PreviewResult(
            dataset=DatasetRef(name=dataset, kind="file", format=file_format),
            columns=sample.columns.tolist(),
            rows=sample.head(max_rows).to_dict(orient="records"),
            truncated=len(sample) >= max_rows,
            format=file_format,
        )

        ctx.info("Preview complete", rows_returned=len(result.rows))
        return result.model_dump()

    except Exception as e:
        # Fail soft: return an empty preview payload instead of raising.
        ctx.error("Failed to preview dataset", dataset=dataset, error=str(e))
        return PreviewResult(
            dataset=DatasetRef(name=dataset, kind="file"),
        ).model_dump()

profile(dataset, sample_rows=1000, columns=None)

Profile a dataset with statistics.

Parameters:

Name Type Description Default
dataset str

Relative path to file or folder

required
sample_rows int

Number of rows to sample (max 10000)

1000
columns Optional[List[str]]

Specific columns to profile (None = all)

None

Returns:

Type Description
Dict[str, Any]

TableProfile dict with stats

Source code in odibi/connections/azure_adls.py
def profile(
    self, dataset: str, sample_rows: int = 1000, columns: Optional[List[str]] = None
) -> Dict[str, Any]:
    """Profile a dataset with statistics.

    Args:
        dataset: Relative path to file or folder
        sample_rows: Number of rows to sample (max 10000)
        columns: Specific columns to profile (None = all)

    Returns:
        TableProfile dict with stats
    """
    ctx = get_logging_context()
    ctx.info("Profiling ADLS dataset", dataset=dataset, sample_rows=sample_rows)

    sample_rows = min(sample_rows, 10000)  # Cap at 10k

    try:
        import pandas as pd

        full_path = self.uri(dataset)
        storage_opts = self.pandas_storage_options()
        file_format = detect_file_format(dataset, self._get_fs())

        # Read sample
        if file_format == "parquet" or file_format == "delta":
            df = pd.read_parquet(full_path, storage_options=storage_opts).head(sample_rows)
        elif file_format == "csv":
            df = pd.read_csv(full_path, storage_options=storage_opts, nrows=sample_rows)
        elif file_format == "json":
            df = pd.read_json(
                full_path, storage_options=storage_opts, lines=True, nrows=sample_rows
            )
        else:
            ctx.warning("Unsupported format for profiling", format=file_format)
            return TableProfile(
                dataset=DatasetRef(name=dataset, kind="file", format=file_format),
                rows_sampled=0,
                columns=[],
            ).model_dump()

        # Profile columns
        profile_cols = columns or df.columns.tolist()
        profiled = []

        for col in profile_cols:
            if col not in df.columns:
                continue

            null_count = int(df[col].isnull().sum())
            null_pct = null_count / len(df) if len(df) > 0 else 0
            distinct_count = int(df[col].nunique())

            # Cardinality heuristic; the len(df) > 0 guard prevents an empty
            # sample (0 == 0) from reporting every column as "unique".
            if len(df) > 0 and distinct_count == len(df):
                cardinality = "unique"
            elif distinct_count > len(df) * 0.9:
                cardinality = "high"
            elif distinct_count > len(df) * 0.1:
                cardinality = "medium"
            else:
                cardinality = "low"

            sample_values = df[col].dropna().head(5).tolist()

            profiled.append(
                Column(
                    name=col,
                    dtype=str(df[col].dtype),
                    null_count=null_count,
                    null_pct=round(null_pct, 3),
                    cardinality=cardinality,
                    distinct_count=distinct_count,
                    sample_values=sample_values,
                )
            )

        # Detect candidate keys (unique non-null columns)
        candidate_keys = [
            c.name for c in profiled if c.cardinality == "unique" and c.null_count == 0
        ]

        # Detect candidate watermarks (datetime columns)
        candidate_watermarks = [
            c.name
            for c in profiled
            if "datetime" in c.dtype.lower() or "date" in c.dtype.lower()
        ]

        # Guard against ZeroDivisionError on an empty sample or a frame with
        # no columns; previously this crashed into the except handler and
        # reported the profile as a failure.
        total_cells = len(df) * len(df.columns)
        if total_cells:
            completeness = 1.0 - (df.isnull().sum().sum() / total_cells)
        else:
            completeness = 1.0  # no cells means nothing is missing

        profile = TableProfile(
            dataset=DatasetRef(name=dataset, kind="file", format=file_format),
            rows_sampled=len(df),
            columns=profiled,
            candidate_keys=candidate_keys,
            candidate_watermarks=candidate_watermarks,
            completeness=round(completeness, 3),
        )

        ctx.info("Profiling complete", rows_sampled=len(df), columns=len(profiled))
        return profile.model_dump()

    except Exception as e:
        ctx.error("Failed to profile dataset", dataset=dataset, error=str(e))
        return TableProfile(
            dataset=DatasetRef(name=dataset, kind="file"),
            rows_sampled=0,
            columns=[],
        ).model_dump()

uri(path)

Build abfss:// URI for given path.

Parameters:

Name Type Description Default
path str

Relative path within container

required

Returns:

Type Description
str

Full abfss:// URI

Example

>>> conn = AzureADLS(account="myaccount", container="data", auth_mode="direct_key", account_key="key123")
>>> conn.uri("folder/file.csv")
'abfss://data@myaccount.dfs.core.windows.net/folder/file.csv'

Source code in odibi/connections/azure_adls.py
def uri(self, path: str) -> str:
    """Build abfss:// URI for given path.

    Args:
        path: Relative path within container

    Returns:
        Full abfss:// URI

    Example:
        >>> conn = AzureADLS(
        ...     account="myaccount", container="data",
        ...     auth_mode="direct_key", account_key="key123"
        ... )
        >>> conn.uri("folder/file.csv")
        'abfss://data@myaccount.dfs.core.windows.net/folder/file.csv'
    """
    if self.path_prefix:
        full_path = posixpath.join(self.path_prefix, path.lstrip("/"))
    else:
        full_path = path.lstrip("/")

    return f"abfss://{self.container}@{self.account}.dfs.core.windows.net/{full_path}"

validate()

Validate ADLS connection configuration.

Raises:

Type Description
ValueError

If required fields are missing for the selected auth_mode

Source code in odibi/connections/azure_adls.py
def validate(self) -> None:
    """Validate ADLS connection configuration.

    Raises:
        ValueError: If required fields are missing for the selected auth_mode
    """
    ctx = get_logging_context()
    ctx.debug(
        "Validating AzureADLS connection",
        account=self.account,
        container=self.container,
        auth_mode=self.auth_mode,
    )

    if not self.account:
        ctx.error("ADLS connection validation failed: missing 'account'")
        raise ValueError(
            "ADLS connection requires 'account'. "
            "Provide the storage account name (e.g., account: 'mystorageaccount')."
        )
    if not self.container:
        ctx.error(
            "ADLS connection validation failed: missing 'container'",
            account=self.account,
        )
        raise ValueError(
            f"ADLS connection requires 'container' for account '{self.account}'. "
            "Provide the container/filesystem name."
        )

    if self.auth_mode == "key_vault":
        if not self.key_vault_name or not self.secret_name:
            ctx.error(
                "ADLS key_vault mode validation failed",
                account=self.account,
                container=self.container,
                key_vault_name=self.key_vault_name or "(missing)",
                secret_name=self.secret_name or "(missing)",
            )
            raise ValueError(
                f"key_vault mode requires 'key_vault_name' and 'secret_name' "
                f"for connection to {self.account}/{self.container}"
            )
    elif self.auth_mode == "direct_key":
        if not self.account_key:
            ctx.error(
                "ADLS direct_key mode validation failed: missing account_key",
                account=self.account,
                container=self.container,
            )
            raise ValueError(
                f"direct_key mode requires 'account_key' "
                f"for connection to {self.account}/{self.container}"
            )

        # Warn in production
        if os.getenv("ODIBI_ENV") == "production":
            ctx.warning(
                "Using direct_key in production is not recommended",
                account=self.account,
                container=self.container,
            )
            warnings.warn(
                f"⚠️  Using direct_key in production is not recommended. "
                f"Use auth_mode: key_vault. Connection: {self.account}/{self.container}",
                UserWarning,
            )
    elif self.auth_mode == "sas_token":
        # SAS token may be supplied directly or fetched from Key Vault
        if not self.sas_token and not (self.key_vault_name and self.secret_name):
            ctx.error(
                "ADLS sas_token mode validation failed",
                account=self.account,
                container=self.container,
            )
            raise ValueError(
                f"sas_token mode requires 'sas_token' (or key_vault_name/secret_name) "
                f"for connection to {self.account}/{self.container}"
            )
    elif self.auth_mode == "service_principal":
        if not self.tenant_id or not self.client_id:
            ctx.error(
                "ADLS service_principal mode validation failed",
                account=self.account,
                container=self.container,
                missing="tenant_id and/or client_id",
            )
            raise ValueError(
                f"service_principal mode requires 'tenant_id' and 'client_id' "
                f"for connection to {self.account}/{self.container}. "
                f"Got tenant_id={self.tenant_id or '(missing)'}, "
                f"client_id={self.client_id or '(missing)'}."
            )

        if not self.client_secret and not (self.key_vault_name and self.secret_name):
            ctx.error(
                "ADLS service_principal mode validation failed: missing client_secret",
                account=self.account,
                container=self.container,
            )
            raise ValueError(
                f"service_principal mode requires 'client_secret' "
                f"(or key_vault_name/secret_name) for {self.account}/{self.container}"
            )
    elif self.auth_mode == "managed_identity":
        # No specific config required, but we might check if environment supports it
        ctx.debug(
            "Using managed_identity auth mode",
            account=self.account,
            container=self.container,
        )
    else:
        ctx.error(
            "ADLS validation failed: unsupported auth_mode",
            account=self.account,
            container=self.container,
            auth_mode=self.auth_mode,
        )
        # Fixed: the supported-modes list previously omitted 'sas_token'
        # even though it is validated above.
        raise ValueError(
            f"Unsupported auth_mode: '{self.auth_mode}'. "
            f"Use 'key_vault', 'direct_key', 'sas_token', 'service_principal', "
            f"or 'managed_identity'."
        )

    ctx.info(
        "AzureADLS connection validated successfully",
        account=self.account,
        container=self.container,
        auth_mode=self.auth_mode,
    )

odibi.connections.azure_sql

Azure SQL Database Connection

Provides connectivity to Azure SQL databases with authentication support.

AzureSQL

Bases: BaseConnection

Azure SQL Database connection.

Supports:

- SQL authentication (username/password)
- Azure Active Directory Managed Identity
- Connection pooling
- Read/write operations via SQLAlchemy

Source code in odibi/connections/azure_sql.py
  30
  31
  32
  33
  34
  35
  36
  37
  38
  39
  40
  41
  42
  43
  44
  45
  46
  47
  48
  49
  50
  51
  52
  53
  54
  55
  56
  57
  58
  59
  60
  61
  62
  63
  64
  65
  66
  67
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
class AzureSQL(BaseConnection):
    """
    Azure SQL Database connection.

    Supports:
    - SQL authentication (username/password)
    - Azure Active Directory Managed Identity
    - Connection pooling
    - Read/write operations via SQLAlchemy
    """

    sql_dialect = "mssql"
    default_schema = "dbo"

    def __init__(
        self,
        server: str,
        database: str,
        driver: str = "ODBC Driver 18 for SQL Server",
        username: Optional[str] = None,
        password: Optional[str] = None,
        auth_mode: str = "aad_msi",  # "aad_msi", "sql", "key_vault"
        key_vault_name: Optional[str] = None,
        secret_name: Optional[str] = None,
        port: int = 1433,
        timeout: int = 30,
        trust_server_certificate: bool = True,
        **kwargs,
    ) -> None:
        """
        Initialize Azure SQL connection.

        Stores configuration only; no network connection is made here
        (see get_engine() for lazy engine creation).

        Args:
            server: SQL server hostname (e.g., 'myserver.database.windows.net')
            database: Database name
            driver: ODBC driver name (default: ODBC Driver 18 for SQL Server)
            username: SQL auth username (required if auth_mode='sql')
            password: SQL auth password (required if auth_mode='sql')
            auth_mode: Authentication mode ('aad_msi', 'sql', 'key_vault')
            key_vault_name: Key Vault name (required if auth_mode='key_vault')
            secret_name: Secret name containing password (required if auth_mode='key_vault')
            port: SQL Server port (default: 1433)
            timeout: Connection timeout in seconds (default: 30)
            trust_server_certificate: Whether to trust the server certificate
                (default: True for backward compatibility).
            **kwargs: Extra keyword arguments are accepted and silently
                ignored, so shared connection configs with additional keys
                do not fail.
        """
        # Log the init before storing attributes so the attempt is traceable
        # even if later setup fails.
        ctx = get_logging_context()
        ctx.log_connection(
            connection_type="azure_sql",
            connection_name=f"{server}/{database}",
            action="init",
            server=server,
            database=database,
            auth_mode=auth_mode,
            port=port,
        )

        self.server = server
        self.database = database
        self.driver = driver
        self.username = username
        self.password = password
        self.auth_mode = auth_mode
        self.key_vault_name = key_vault_name
        self.secret_name = secret_name
        self.port = port
        self.timeout = timeout
        self.trust_server_certificate = trust_server_certificate
        self._engine = None  # Lazily created/cached by get_engine()
        self._cached_key = None  # For consistency with ADLS / parallel fetch

        ctx.debug(
            "AzureSQL connection initialized",
            server=server,
            database=database,
            auth_mode=auth_mode,
            driver=driver,
        )

    def get_password(self) -> Optional[str]:
        """Resolve the connection password, caching Key Vault lookups.

        Resolution order:
            1. Explicit ``password`` provided at construction.
            2. Previously cached Key Vault secret (``_cached_key``).
            3. Fresh Key Vault fetch (when ``auth_mode == 'key_vault'``).
            4. ``None`` — other auth modes (e.g. managed identity) need no
               password.

        Returns:
            The password string, or None when no password is required.

        Raises:
            ValueError: If auth_mode='key_vault' but key_vault_name or
                secret_name is missing.
            ImportError: If the Azure SDK packages required for Key Vault
                access are not installed.
        """
        ctx = get_logging_context()

        # 1. An explicit password always wins.
        if self.password:
            ctx.debug(
                "Using provided password",
                server=self.server,
                database=self.database,
            )
            return self.password

        # 2. Reuse a previously fetched Key Vault secret.
        if self._cached_key:
            ctx.debug(
                "Using cached password",
                server=self.server,
                database=self.database,
            )
            return self._cached_key

        # 3. Fetch the secret from Key Vault.
        if self.auth_mode == "key_vault":
            if not self.key_vault_name or not self.secret_name:
                ctx.error(
                    "Key Vault mode requires key_vault_name and secret_name",
                    server=self.server,
                    database=self.database,
                )
                raise ValueError(
                    f"key_vault mode requires 'key_vault_name' and 'secret_name' "
                    f"for connection to {self.server}/{self.database}. "
                    f"Got key_vault_name={self.key_vault_name or '(missing)'}, "
                    f"secret_name={self.secret_name or '(missing)'}."
                )

            ctx.debug(
                "Fetching password from Key Vault",
                server=self.server,
                key_vault_name=self.key_vault_name,
                secret_name=self.secret_name,
            )

            try:
                from azure.identity import DefaultAzureCredential
                from azure.keyvault.secrets import SecretClient

                credential = DefaultAzureCredential()
                kv_uri = f"https://{self.key_vault_name}.vault.azure.net"
                client = SecretClient(vault_url=kv_uri, credential=credential)
                secret = client.get_secret(self.secret_name)
                self._cached_key = secret.value
                # Mask the fetched secret in any subsequent log output.
                logger.register_secret(self._cached_key)

                ctx.info(
                    "Successfully fetched password from Key Vault",
                    server=self.server,
                    key_vault_name=self.key_vault_name,
                )
                return self._cached_key
            except ImportError as e:
                ctx.error(
                    "Key Vault support requires azure libraries",
                    server=self.server,
                    error=str(e),
                )
                # Chain the original ImportError so the missing module name
                # stays visible in the traceback (was previously dropped).
                raise ImportError(
                    "Key Vault support requires 'azure-identity' and 'azure-keyvault-secrets'. "
                    "Install with: pip install odibi[azure]"
                ) from e

        # 4. No password needed for this auth mode.
        ctx.debug(
            "No password required for auth_mode",
            server=self.server,
            auth_mode=self.auth_mode,
        )
        return None

    def odbc_dsn(self) -> str:
        """Assemble the raw ODBC connection string for this connection.

        Returns:
            ODBC DSN string

        Example:
            >>> conn = AzureSQL(server="myserver.database.windows.net", database="mydb")
            >>> conn.odbc_dsn()
            'Driver={ODBC Driver 18 for SQL Server};Server=tcp:myserver...'
        """
        ctx = get_logging_context()
        ctx.debug(
            "Building ODBC connection string",
            server=self.server,
            database=self.database,
            auth_mode=self.auth_mode,
        )

        # Common segments shared by every auth mode.
        segments = [
            f"Driver={{{self.driver}}};",
            f"Server=tcp:{self.server},{self.port};",
            f"Database={self.database};",
            "Encrypt=yes;",
            f"TrustServerCertificate={'yes' if self.trust_server_certificate else 'no'};",
            f"Connection Timeout={self.timeout};",
        ]

        # Auth-specific segment: SQL credentials take precedence when present.
        pwd = self.get_password()
        if self.username and pwd:
            segments.append(f"UID={self.username};PWD={pwd};")
            ctx.debug(
                "Using SQL authentication",
                server=self.server,
                username=self.username,
            )
        elif self.auth_mode == "aad_msi":
            segments.append("Authentication=ActiveDirectoryMsi;")
            ctx.debug(
                "Using AAD Managed Identity authentication",
                server=self.server,
            )
        elif self.auth_mode == "aad_service_principal":
            # Not fully supported via ODBC string simply without token usually;
            # nothing is appended for this mode.
            ctx.debug(
                "Using AAD Service Principal authentication",
                server=self.server,
            )

        return "".join(segments)

    def get_path(self, relative_path: str) -> str:
        """Get table reference for relative path.

        In Azure SQL, the relative path is the table reference itself
        (e.g., "schema.table" or "table"), so this method returns it as-is;
        there is no filesystem-style path to resolve.

        Args:
            relative_path: Table reference (e.g., "dbo.users", "customers")

        Returns:
            Same table reference unchanged
        """
        return relative_path

    def validate(self) -> None:
        """Validate Azure SQL connection configuration.

        Checks that the fields required by the selected ``auth_mode`` are
        present. This is a pure config check — no network connection is
        attempted here.

        Raises:
            ValueError: If 'server' or 'database' is missing, or if the
                selected auth_mode lacks its required credential fields.
        """
        ctx = get_logging_context()
        ctx.debug(
            "Validating AzureSQL connection",
            server=self.server,
            database=self.database,
            auth_mode=self.auth_mode,
        )

        # Required for every auth mode.
        if not self.server:
            ctx.error("AzureSQL validation failed: missing 'server'")
            raise ValueError(
                "Azure SQL connection requires 'server'. "
                "Provide the SQL server hostname (e.g., server: 'myserver.database.windows.net')."
            )
        if not self.database:
            ctx.error(
                "AzureSQL validation failed: missing 'database'",
                server=self.server,
            )
            raise ValueError(
                f"Azure SQL connection requires 'database' for server '{self.server}'."
            )

        # SQL auth needs a username plus a password source — either inline
        # or resolvable from Key Vault.
        if self.auth_mode == "sql":
            if not self.username:
                ctx.error(
                    "AzureSQL validation failed: SQL auth requires username",
                    server=self.server,
                    database=self.database,
                )
                raise ValueError(
                    f"Azure SQL with auth_mode='sql' requires 'username' "
                    f"for connection to {self.server}/{self.database}."
                )
            if not self.password and not (self.key_vault_name and self.secret_name):
                ctx.error(
                    "AzureSQL validation failed: SQL auth requires password",
                    server=self.server,
                    database=self.database,
                )
                raise ValueError(
                    "Azure SQL with auth_mode='sql' requires password "
                    "(or key_vault_name/secret_name)"
                )

        # Key Vault auth needs the vault coordinates and a username.
        if self.auth_mode == "key_vault":
            if not self.key_vault_name or not self.secret_name:
                ctx.error(
                    "AzureSQL validation failed: key_vault mode missing config",
                    server=self.server,
                    database=self.database,
                )
                raise ValueError(
                    "Azure SQL with auth_mode='key_vault' requires key_vault_name and secret_name"
                )
            if not self.username:
                ctx.error(
                    "AzureSQL validation failed: key_vault mode requires username",
                    server=self.server,
                    database=self.database,
                )
                raise ValueError("Azure SQL with auth_mode='key_vault' requires username")

        ctx.info(
            "AzureSQL connection validated successfully",
            server=self.server,
            database=self.database,
            auth_mode=self.auth_mode,
        )

    def get_engine(self) -> Any:
        """
        Get or create the SQLAlchemy engine (cached after first success).

        The engine is connection-tested before being cached, so a failed
        attempt never leaves a broken engine behind: previously the engine
        was assigned to ``self._engine`` *before* the test connection, and a
        failing test left the broken engine cached and returned by every
        subsequent call.

        Returns:
            SQLAlchemy engine instance

        Raises:
            ConnectionError: If connection fails or drivers missing
        """
        ctx = get_logging_context()

        if self._engine is not None:
            ctx.debug(
                "Using cached SQLAlchemy engine",
                server=self.server,
                database=self.database,
            )
            return self._engine

        ctx.debug(
            "Creating SQLAlchemy engine",
            server=self.server,
            database=self.database,
        )

        try:
            from urllib.parse import quote_plus

            from sqlalchemy import create_engine
        except ImportError as e:
            ctx.error(
                "SQLAlchemy import failed",
                server=self.server,
                database=self.database,
                error=str(e),
            )
            raise ConnectionError(
                connection_name=f"AzureSQL({self.server})",
                reason="Required packages 'sqlalchemy' or 'pyodbc' not found.",
                suggestions=[
                    "Install required packages: pip install sqlalchemy pyodbc",
                    "Or install odibi with azure extras: pip install 'odibi[azure]'",
                ],
            )

        engine = None
        try:
            # Build connection string
            conn_str = self.odbc_dsn()
            connection_url = f"mssql+pyodbc:///?odbc_connect={quote_plus(conn_str)}"

            ctx.debug(
                "Creating SQLAlchemy engine with connection pooling",
                server=self.server,
                database=self.database,
            )

            # Create engine with connection pooling
            engine = create_engine(
                connection_url,
                pool_pre_ping=True,  # Verify connections before use
                pool_recycle=3600,  # Recycle connections after 1 hour
                echo=False,
            )

            # Test the connection BEFORE caching, so failures never pollute
            # the cache.
            with engine.connect():
                pass

            self._engine = engine

            ctx.info(
                "SQLAlchemy engine created successfully",
                server=self.server,
                database=self.database,
            )

            return self._engine

        except Exception as e:
            # Release the failed engine's pool resources and leave the cache
            # empty so the next get_engine() call retries from scratch.
            if engine is not None:
                engine.dispose()
            suggestions = self._get_error_suggestions(str(e))
            ctx.error(
                "Failed to create SQLAlchemy engine",
                server=self.server,
                database=self.database,
                error=str(e),
                suggestions=suggestions,
            )
            raise ConnectionError(
                connection_name=f"AzureSQL({self.server})",
                reason=f"Failed to create engine: {self._sanitize_error(str(e))}",
                suggestions=suggestions,
            )

    def read_sql(self, query: str, params: Optional[Dict[str, Any]] = None) -> pd.DataFrame:
        """
        Run a SQL query and return the result set as a DataFrame.

        Args:
            query: SQL query string
            params: Optional query parameters for parameterized queries

        Returns:
            Query results as pandas DataFrame

        Raises:
            ConnectionError: If execution fails
        """
        ctx = get_logging_context()
        ctx.debug(
            "Executing SQL query",
            server=self.server,
            database=self.database,
            query_length=len(query),
        )

        try:
            # pandas prefers an open SQLAlchemy connection over a raw engine.
            with self.get_engine().connect() as conn:
                frame = pd.read_sql(query, conn, params=params)

            ctx.info(
                "SQL query executed successfully",
                server=self.server,
                database=self.database,
                rows_returned=len(frame),
            )
            return frame
        except Exception as e:
            # Re-raise our own ConnectionError untouched; wrap everything else.
            if isinstance(e, ConnectionError):
                raise
            ctx.error(
                "SQL query execution failed",
                server=self.server,
                database=self.database,
                error=str(e),
            )
            raise ConnectionError(
                connection_name=f"AzureSQL({self.server})",
                reason=f"Query execution failed: {self._sanitize_error(str(e))}",
                suggestions=self._get_error_suggestions(str(e)),
            )

    def read_table(self, table_name: str, schema: Optional[str] = "dbo") -> pd.DataFrame:
        """
        Read entire table into DataFrame.

        Identifiers are bracket-quoted with ``]`` doubled, so a table or
        schema name containing ``]`` cannot break out of the quoting
        (T-SQL escapes a closing bracket inside [..] by doubling it).

        Args:
            table_name: Name of the table
            schema: Schema name (default: dbo)

        Returns:
            Table contents as pandas DataFrame
        """
        ctx = get_logging_context()
        ctx.info(
            "Reading table",
            server=self.server,
            database=self.database,
            table_name=table_name,
            schema=schema,
        )

        # Escape "]" before interpolating into [identifier] quoting.
        safe_table = table_name.replace("]", "]]")
        if schema:
            safe_schema = schema.replace("]", "]]")
            query = f"SELECT * FROM [{safe_schema}].[{safe_table}]"
        else:
            query = f"SELECT * FROM [{safe_table}]"

        return self.read_sql(query)

    def read_sql_query(self, query: str, params: Optional[Dict[str, Any]] = None) -> pd.DataFrame:
        """
        Run a custom SELECT and return the results as a DataFrame.

        Thin wrapper over read_sql(); useful for hand-written queries
        (e.g., to exclude unsupported columns).

        Args:
            query: SQL SELECT query
            params: Optional parameters for parameterized query

        Returns:
            Query results as pandas DataFrame
        """
        return self.read_sql(query, params=params)

    def write_table(
        self,
        df: pd.DataFrame,
        table_name: str,
        schema: Optional[str] = "dbo",
        if_exists: str = "replace",
        index: bool = False,
        chunksize: Optional[int] = 1000,
    ) -> int:
        """
        Write DataFrame to SQL table.

        Args:
            df: DataFrame to write
            table_name: Name of the table
            schema: Schema name (default: dbo)
            if_exists: How to behave if table exists ('fail', 'replace', 'append')
            index: Whether to write DataFrame index as column
            chunksize: Number of rows to write in each batch (default: 1000)

        Returns:
            Number of rows written

        Raises:
            ConnectionError: If write fails
        """
        ctx = get_logging_context()
        ctx.info(
            "Writing DataFrame to table",
            server=self.server,
            database=self.database,
            table_name=table_name,
            schema=schema,
            rows=len(df),
            if_exists=if_exists,
            chunksize=chunksize,
        )

        try:
            engine = self.get_engine()

            # NOTE(review): method="multi" batches many rows into a single
            # INSERT; with pyodbc/mssql, chunksize * column-count can exceed
            # SQL Server's parameter limit for wide tables — confirm that
            # chunksize=1000 stays safe for the tables written here.
            rows_written = df.to_sql(
                name=table_name,
                con=engine,
                schema=schema,
                if_exists=if_exists,
                index=index,
                chunksize=chunksize,
                method="multi",  # Use multi-row INSERT for better performance
            )

            # to_sql may return None (presumably older pandas); fall back to
            # the input row count in that case.
            result_rows = rows_written if rows_written is not None else len(df)
            ctx.info(
                "Table write completed successfully",
                server=self.server,
                database=self.database,
                table_name=table_name,
                rows_written=result_rows,
            )
            return result_rows
        except Exception as e:
            # Re-raise our own ConnectionError untouched; wrap everything else.
            if isinstance(e, ConnectionError):
                raise
            ctx.error(
                "Table write failed",
                server=self.server,
                database=self.database,
                table_name=table_name,
                error=str(e),
            )
            raise ConnectionError(
                connection_name=f"AzureSQL({self.server})",
                reason=f"Write operation failed: {self._sanitize_error(str(e))}",
                suggestions=self._get_error_suggestions(str(e)),
            )

    def execute_sql(self, sql: str, params: Optional[Dict[str, Any]] = None) -> Any:
        """
        Run a SQL statement (INSERT, UPDATE, DELETE, etc.).

        Thin alias over execute(), kept for SqlServerMergeWriter
        compatibility.

        Args:
            sql: SQL statement
            params: Optional parameters for parameterized query

        Returns:
            Result from execution

        Raises:
            ConnectionError: If execution fails
        """
        return self.execute(sql, params=params)

    def execute(self, sql: str, params: Optional[Dict[str, Any]] = None) -> Any:
        """
        Run a single SQL statement inside its own transaction.

        Args:
            sql: SQL statement (INSERT, UPDATE, DELETE, DDL, ...)
            params: Optional parameters for parameterized query

        Returns:
            Result from execution

        Raises:
            ConnectionError: If execution fails
        """
        ctx = get_logging_context()
        ctx.debug(
            "Executing SQL statement",
            server=self.server,
            database=self.database,
            statement_length=len(sql),
        )

        try:
            engine = self.get_engine()
            from sqlalchemy import text

            # engine.begin() opens a transaction that commits on clean exit
            # and rolls back on error (SQLAlchemy 1.4+ style).
            with engine.begin() as conn:
                outcome = conn.execute(text(sql), params or {})
                # Materialize any rows while the transaction is still open.
                fetched = outcome.fetchall() if outcome.returns_rows else None

                ctx.info(
                    "SQL statement executed successfully",
                    server=self.server,
                    database=self.database,
                )
                return fetched
        except Exception as e:
            # Re-raise our own ConnectionError untouched; wrap everything else.
            if isinstance(e, ConnectionError):
                raise
            ctx.error(
                "SQL statement execution failed",
                server=self.server,
                database=self.database,
                error=str(e),
            )
            raise ConnectionError(
                connection_name=f"AzureSQL({self.server})",
                reason=f"Statement execution failed: {self._sanitize_error(str(e))}",
                suggestions=self._get_error_suggestions(str(e)),
            )

    # -------------------------------------------------------------------------
    # Discovery Methods
    # -------------------------------------------------------------------------

    def list_schemas(self) -> List[str]:
        """Return the names of all user schemas in the database.

        Built-in role schemas (db_*), sys, INFORMATION_SCHEMA and guest are
        filtered out by the query. On error, logs and returns an empty list.

        Returns:
            List of schema names

        Example:
            >>> conn = AzureSQL(server="...", database="mydb")
            >>> schemas = conn.list_schemas()
            >>> print(schemas)
            ['dbo', 'staging', 'warehouse']
        """
        ctx = get_logging_context()
        ctx.debug("Listing schemas", server=self.server, database=self.database)

        query = """
            SELECT SCHEMA_NAME
            FROM INFORMATION_SCHEMA.SCHEMATA
            WHERE SCHEMA_NAME NOT IN ('db_owner', 'db_accessadmin', 'db_securityadmin',
                                      'db_ddladmin', 'db_backupoperator', 'db_datareader',
                                      'db_datawriter', 'db_denydatareader', 'db_denydatawriter',
                                      'sys', 'INFORMATION_SCHEMA', 'guest')
            ORDER BY SCHEMA_NAME
        """

        try:
            names = list(self.read_sql(query)["SCHEMA_NAME"])
            ctx.info("Schemas listed successfully", count=len(names))
            return names
        except Exception as e:
            ctx.error("Failed to list schemas", error=str(e))
            return []

    def list_tables(self, schema: str = "dbo") -> List[Dict]:
        """List tables and views in a schema.

        On error, logs and returns an empty list.

        Args:
            schema: Schema name (default: "dbo")

        Returns:
            List of dicts with keys: name, type (table/view), schema

        Example:
            >>> conn = AzureSQL(server="...", database="mydb")
            >>> tables = conn.list_tables("dbo")
            >>> print(tables)
            [{'name': 'customers', 'type': 'table', 'schema': 'dbo'},
             {'name': 'orders', 'type': 'table', 'schema': 'dbo'}]
        """
        ctx = get_logging_context()
        ctx.debug("Listing tables", schema=schema)

        query = """
            SELECT TABLE_NAME, TABLE_TYPE, TABLE_SCHEMA
            FROM INFORMATION_SCHEMA.TABLES
            WHERE TABLE_SCHEMA = :schema
            ORDER BY TABLE_NAME
        """

        try:
            result = self.read_sql(query, params={"schema": schema})
            # INFORMATION_SCHEMA reports 'BASE TABLE' for tables; everything
            # else (e.g. 'VIEW') is mapped to "view".
            entries = [
                {
                    "name": row["TABLE_NAME"],
                    "type": "table" if row["TABLE_TYPE"] == "BASE TABLE" else "view",
                    "schema": row["TABLE_SCHEMA"],
                }
                for _, row in result.iterrows()
            ]
            ctx.info("Tables listed successfully", schema=schema, count=len(entries))
            return entries
        except Exception as e:
            ctx.error("Failed to list tables", schema=schema, error=str(e))
            return []

    def get_table_info(self, table: str) -> Dict:
        """Get detailed schema information for a table.

        Args:
            table: Table name (can be "schema.table" or just "table"; only
                the first dot separates schema from table, so table names
                containing dots are preserved)

        Returns:
            Schema-like dict with dataset and columns info (empty dict on
            error)

        Example:
            >>> conn = AzureSQL(server="...", database="mydb")
            >>> info = conn.get_table_info("dbo.customers")
            >>> print(info['columns'])
            [{'name': 'customer_id', 'dtype': 'int', ...}, ...]
        """
        ctx = get_logging_context()

        # Split only on the FIRST dot: previously "a.b.c" silently dropped
        # ".c"; now it parses as schema "a", table "b.c".
        if "." in table:
            schema, table_name = table.split(".", 1)
        else:
            schema = "dbo"
            table_name = table

        ctx.debug("Getting table info", schema=schema, table=table_name)

        query = """
            SELECT COLUMN_NAME, DATA_TYPE, IS_NULLABLE, ORDINAL_POSITION
            FROM INFORMATION_SCHEMA.COLUMNS
            WHERE TABLE_SCHEMA = :schema AND TABLE_NAME = :table
            ORDER BY ORDINAL_POSITION
        """

        try:
            df = self.read_sql(query, params={"schema": schema, "table": table_name})

            columns = []
            for _, row in df.iterrows():
                columns.append(
                    {
                        "name": row["COLUMN_NAME"],
                        "dtype": row["DATA_TYPE"],
                        "nullable": row["IS_NULLABLE"] == "YES",
                    }
                )

            # Try to get an approximate row count from partition metadata
            # (avoids a full COUNT(*) scan); the count is optional.
            row_count = None
            try:
                count_query = """
                    SELECT SUM(p.rows) AS row_count
                    FROM sys.tables t
                    INNER JOIN sys.partitions p ON t.object_id = p.object_id
                    INNER JOIN sys.schemas s ON t.schema_id = s.schema_id
                    WHERE s.name = :schema AND t.name = :table
                    AND p.index_id IN (0,1)
                """
                count_df = self.read_sql(
                    count_query, params={"schema": schema, "table": table_name}
                )
                if not count_df.empty and count_df["row_count"].iloc[0] is not None:
                    row_count = int(count_df["row_count"].iloc[0])
            except Exception:
                pass  # Row count is optional

            dataset = DatasetRef(
                name=table_name,
                namespace=schema,
                kind="table",
                path=f"{schema}.{table_name}",
                row_count=row_count,
            )

            schema_obj = Schema(dataset=dataset, columns=[Column(**c) for c in columns])

            ctx.info("Table info retrieved", schema=schema, table=table_name, columns=len(columns))
            return schema_obj.model_dump()

        except Exception as e:
            ctx.error("Failed to get table info", schema=schema, table=table_name, error=str(e))
            return {}

    def discover_catalog(
        self,
        include_schema: bool = False,
        include_stats: bool = False,
        limit: Optional[int] = None,
        recursive: bool = True,
        path: str = "",
        pattern: str = "",
    ) -> Dict:
        """Discover all datasets in the database.

        Args:
            include_schema: If True, include column information for each table
            include_stats: If True, include row counts and stats
            limit: Maximum TOTAL number of datasets across all schemas
                (None = unlimited). Note: this is a total cap, not per-schema.
            recursive: Ignored for SQL (schemas are flat)
            path: Scope to specific schema (e.g. "dbo", "sales")
            pattern: Filter table names by pattern (e.g. "fact_*", "*_2024")

        Returns:
            CatalogSummary dict with schemas and tables

        Example:
            >>> conn = AzureSQL(server="...", database="mydb")
            >>> catalog = conn.discover_catalog(include_schema=True, limit=10)
            >>> print(catalog['total_datasets'])
            25
        """
        ctx = get_logging_context()
        ctx.info(
            "Discovering catalog",
            include_schema=include_schema,
            include_stats=include_stats,
            limit=limit,
        )

        # Filter schemas if path is specified (path = schema name for SQL)
        all_schemas = self.list_schemas()
        if path:
            schemas = [s for s in all_schemas if s == path]
            if not schemas:
                ctx.warning(f"Schema not found: {path}")
                return CatalogSummary(
                    connection_name=f"{self.server}/{self.database}",
                    connection_type="azure_sql",
                    schemas=[],
                    tables=[],
                    total_datasets=0,
                    next_step=f"Schema '{path}' not found. Available: {', '.join(all_schemas)}",
                ).model_dump()
        else:
            schemas = all_schemas

        all_tables = []
        import fnmatch

        has_pattern = bool(pattern)
        # Previously only the inner loop was broken on limit, so list_tables()
        # (a DB query) still ran for every remaining schema; this flag stops
        # the whole scan as soon as the cap is hit. The result is unchanged.
        limit_reached = False

        for schema in schemas:
            if limit_reached:
                break
            tables = self.list_tables(schema)

            for table_info in tables:
                # Apply pattern filter to table name
                if has_pattern and not fnmatch.fnmatch(table_info["name"], pattern):
                    continue

                if limit and len(all_tables) >= limit:
                    limit_reached = True
                    break
                dataset = DatasetRef(
                    name=table_info["name"],
                    namespace=table_info["schema"],
                    kind="table" if table_info["type"] == "table" else "view",
                    path=f"{table_info['schema']}.{table_info['name']}",
                )

                # Optionally replace the bare ref with the fully detailed one
                # (adds row counts / column info via get_table_info()).
                if include_schema or include_stats:
                    try:
                        full_info = self.get_table_info(
                            f"{table_info['schema']}.{table_info['name']}"
                        )
                        if full_info and "dataset" in full_info:
                            dataset = DatasetRef(**full_info["dataset"])
                    except Exception as e:
                        ctx.debug(
                            "Could not get extended info for table",
                            table=table_info["name"],
                            error=str(e),
                        )

                all_tables.append(dataset)

        catalog = CatalogSummary(
            connection_name=f"{self.server}/{self.database}",
            connection_type="azure_sql",
            schemas=schemas,
            tables=all_tables,
            total_datasets=len(all_tables),
            next_step="Use profile() to analyze specific tables or get_table_info() for schema details",
            suggestions=[
                f"Found {len(all_tables)} tables across {len(schemas)} schemas",
                "Use include_schema=True to get column details",
                "Use include_stats=True for row counts",
            ],
        )

        ctx.info("Catalog discovery complete", total_datasets=len(all_tables), schemas=len(schemas))
        return catalog.model_dump()

    def profile(
        self,
        dataset: str,
        sample_rows: int = 1000,
        columns: Optional[List[str]] = None,
    ) -> Dict:
        """Profile a table with statistical analysis.

        Samples up to ``sample_rows`` rows with TOP (N) and computes, per
        column: null count/percentage, distinct count, a cardinality class,
        and sample values. Columns unique within the sample become candidate
        keys; datetime columns become candidate watermarks for incremental
        loading. The true row count is read from sys.partitions when
        possible, falling back to the sample size.

        Args:
            dataset: Table name (can be "schema.table" or just "table";
                an unqualified name is assumed to live in "dbo")
            sample_rows: Number of rows to sample (default: 1000)
            columns: Specific columns to profile (None = all columns)

        Returns:
            TableProfile dict with profiling statistics, or {} if the
            table could not be read.

        Example:
            >>> conn = AzureSQL(server="...", database="mydb")
            >>> profile = conn.profile("dbo.customers", sample_rows=5000)
            >>> print(profile['candidate_keys'])
            ['customer_id']
        """
        ctx = get_logging_context()

        # Parse schema and table name (NOTE: a name with more than one dot
        # keeps only the first two parts)
        if "." in dataset:
            parts = dataset.split(".")
            schema = parts[0]
            table_name = parts[1]
        else:
            schema = "dbo"
            table_name = dataset

        ctx.info("Profiling table", schema=schema, table=table_name, sample_rows=sample_rows)

        # Read sample data
        col_filter = ", ".join(f"[{c}]" for c in columns) if columns else "*"
        query = f"SELECT TOP ({sample_rows}) {col_filter} FROM [{schema}].[{table_name}]"

        try:
            df = self.read_sql(query)

            # Get total row count from catalog metadata (cheap, approximate)
            total_rows = None
            try:
                count_query = """
                    SELECT SUM(p.rows) AS row_count
                    FROM sys.tables t
                    INNER JOIN sys.partitions p ON t.object_id = p.object_id
                    INNER JOIN sys.schemas s ON t.schema_id = s.schema_id
                    WHERE s.name = :schema AND t.name = :table
                    AND p.index_id IN (0,1)
                """
                count_df = self.read_sql(
                    count_query, params={"schema": schema, "table": table_name}
                )
                if not count_df.empty and count_df["row_count"].iloc[0] is not None:
                    total_rows = int(count_df["row_count"].iloc[0])
            except Exception:
                total_rows = len(df)

            # Profile columns
            profiled_columns = []
            candidate_keys = []
            candidate_watermarks = []

            for col in df.columns:
                null_count = int(df[col].isnull().sum())
                null_pct = null_count / len(df) if len(df) > 0 else 0
                distinct_count = int(df[col].nunique())

                # Determine cardinality. Guard against an empty sample:
                # without len(df) > 0, the 0 == 0 comparison would mark every
                # column of an empty table as a unique candidate key.
                if len(df) > 0 and distinct_count == len(df):
                    cardinality = "unique"
                    candidate_keys.append(col)
                elif distinct_count > len(df) * 0.9:
                    cardinality = "high"
                elif distinct_count < 10:
                    cardinality = "low"
                else:
                    cardinality = "medium"

                # Check if datetime (candidate watermark)
                if pd.api.types.is_datetime64_any_dtype(df[col]):
                    candidate_watermarks.append(col)

                # Get sample values (non-null)
                sample_values = df[col].dropna().head(5).tolist()

                profiled_columns.append(
                    Column(
                        name=col,
                        dtype=str(df[col].dtype),
                        nullable=null_count > 0,
                        null_count=null_count,
                        null_pct=round(null_pct, 4),
                        cardinality=cardinality,
                        distinct_count=distinct_count,
                        sample_values=sample_values,
                    )
                )

            # Calculate overall completeness (fraction of non-null cells)
            total_cells = len(df) * len(df.columns)
            null_cells = df.isnull().sum().sum()
            completeness = 1 - (null_cells / total_cells) if total_cells > 0 else 0

            dataset_ref = DatasetRef(
                name=table_name,
                namespace=schema,
                kind="table",
                path=f"{schema}.{table_name}",
                row_count=total_rows,
            )

            profile = TableProfile(
                dataset=dataset_ref,
                rows_sampled=len(df),
                total_rows=total_rows,
                columns=profiled_columns,
                candidate_keys=candidate_keys,
                candidate_watermarks=candidate_watermarks,
                completeness=round(completeness, 4),
                suggestions=[
                    (
                        f"Sampled {len(df)} of {total_rows} rows"
                        if total_rows
                        else f"Sampled {len(df)} rows"
                    ),
                    (
                        f"Found {len(candidate_keys)} candidate key columns: {candidate_keys}"
                        if candidate_keys
                        else "No unique key columns found"
                    ),
                    (
                        f"Found {len(candidate_watermarks)} timestamp columns: {candidate_watermarks}"
                        if candidate_watermarks
                        else "No timestamp columns for incremental loading"
                    ),
                ],
            )

            ctx.info(
                "Table profiling complete",
                schema=schema,
                table=table_name,
                rows_sampled=len(df),
                columns=len(profiled_columns),
                candidate_keys=len(candidate_keys),
            )

            return profile.model_dump()

        except Exception as e:
            ctx.error("Failed to profile table", schema=schema, table=table_name, error=str(e))
            return {}

    def preview(
        self, dataset: str, rows: int = 5, columns: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """Fetch a small sample of rows from a table.

        Args:
            dataset: Table name, either "schema.table" or a bare name
                (bare names are looked up in "dbo")
            rows: How many rows to return (capped at 100)
            columns: Optional subset of columns to select (None = all)

        Returns:
            PreviewResult dict with column names, sample rows, and (when
            available) the table's total row count.

        Example:
            >>> conn = AzureSQL(server="...", database="mydb")
            >>> preview = conn.preview("dbo.customers", rows=10)
            >>> for row in preview['rows']:
            ...     print(row)
        """
        ctx = get_logging_context()

        # Hard cap keeps previews cheap even if a caller asks for more.
        capped = min(rows, 100)

        # Split an optional "schema." prefix off the dataset name.
        if "." in dataset:
            pieces = dataset.split(".")
            schema, table_name = pieces[0], pieces[1]
        else:
            schema, table_name = "dbo", dataset

        ctx.info("Previewing table", schema=schema, table=table_name, rows=capped)

        try:
            projection = ", ".join(f"[{c}]" for c in columns) if columns else "*"

            query = f"SELECT TOP ({capped}) {projection} FROM [{schema}].[{table_name}]"
            df = self.read_sql(query)

            # Best-effort total row count from catalog metadata.
            total_rows = None
            try:
                count_query = """
                    SELECT SUM(p.rows) AS row_count
                    FROM sys.tables t
                    INNER JOIN sys.partitions p ON t.object_id = p.object_id
                    INNER JOIN sys.schemas s ON t.schema_id = s.schema_id
                    WHERE s.name = :schema AND t.name = :table
                    AND p.index_id IN (0,1)
                """
                count_df = self.read_sql(
                    count_query, params={"schema": schema, "table": table_name}
                )
                first = count_df["row_count"].iloc[0] if not count_df.empty else None
                if first is not None:
                    total_rows = int(first)
            except Exception:
                pass

            result = PreviewResult(
                dataset=DatasetRef(
                    name=table_name,
                    namespace=schema,
                    kind="table",
                    path=f"{schema}.{table_name}",
                    row_count=total_rows,
                ),
                columns=df.columns.tolist(),
                rows=df.to_dict(orient="records"),
                total_rows=total_rows,
                truncated=(total_rows or 0) > capped,
            )

            ctx.info("Preview complete", schema=schema, table=table_name, rows_returned=len(df))
            return result.model_dump()

        except Exception as e:
            ctx.error("Failed to preview table", schema=schema, table=table_name, error=str(e))
            return PreviewResult(
                dataset=DatasetRef(name=table_name, namespace=schema, kind="table"),
            ).model_dump()

    def relationships(self, schema: Optional[str] = None) -> List[Dict[str, Any]]:
        """List declared foreign-key relationships from the system catalog.

        Args:
            schema: Restrict results to one schema (parent or child side);
                None scans the whole database.

        Returns:
            List of Relationship dicts, one per FK constraint, each carrying
            parent/child DatasetRefs and the ordered key-column pairs.

        Example:
            >>> conn = AzureSQL(server="...", database="mydb")
            >>> rels = conn.relationships("dbo")
            >>> for rel in rels:
            ...     print(f"{rel['child']['name']} -> {rel['parent']['name']}")
        """
        ctx = get_logging_context()
        ctx.info("Discovering relationships", schema=schema or "all")

        # Assemble the catalog query from fixed fragments; the optional WHERE
        # clause is parameterized, never interpolated.
        fragments = [
            """
            SELECT
                fk.name AS fk_name,
                ps.name AS parent_schema,
                pt.name AS parent_table,
                pc.name AS parent_column,
                cs.name AS child_schema,
                ct.name AS child_table,
                cc.name AS child_column
            FROM sys.foreign_keys fk
            INNER JOIN sys.foreign_key_columns fkc ON fk.object_id = fkc.constraint_object_id
            INNER JOIN sys.tables pt ON fk.referenced_object_id = pt.object_id
            INNER JOIN sys.schemas ps ON pt.schema_id = ps.schema_id
            INNER JOIN sys.columns pc ON fkc.referenced_object_id = pc.object_id
                AND fkc.referenced_column_id = pc.column_id
            INNER JOIN sys.tables ct ON fk.parent_object_id = ct.object_id
            INNER JOIN sys.schemas cs ON ct.schema_id = cs.schema_id
            INNER JOIN sys.columns cc ON fkc.parent_object_id = cc.object_id
                AND fkc.parent_column_id = cc.column_id
        """
        ]
        if schema:
            fragments.append("\n            WHERE cs.name = :schema OR ps.name = :schema")
        fragments.append("\n            ORDER BY fk.name, fkc.constraint_column_id")
        query = "".join(fragments)

        try:
            bind = {"schema": schema} if schema else None
            df = self.read_sql(query, params=bind)

            if df.empty:
                ctx.info("No foreign key relationships found", schema=schema or "all")
                return []

            # One Relationship per constraint; multi-column FKs contribute
            # several (parent, child) key pairs in constraint-column order.
            results = []
            for constraint, rows in df.groupby("fk_name"):
                head = rows.iloc[0]
                key_pairs = list(zip(rows["parent_column"], rows["child_column"]))

                results.append(
                    Relationship(
                        parent=DatasetRef(
                            name=head["parent_table"],
                            namespace=head["parent_schema"],
                            kind="table",
                            path=f"{head['parent_schema']}.{head['parent_table']}",
                        ),
                        child=DatasetRef(
                            name=head["child_table"],
                            namespace=head["child_schema"],
                            kind="table",
                            path=f"{head['child_schema']}.{head['child_table']}",
                        ),
                        keys=key_pairs,
                        source="declared",
                        confidence=1.0,
                        details={"constraint_name": constraint},
                    ).model_dump()
                )

            ctx.info("Relationships discovered", count=len(results))
            return results

        except Exception as e:
            ctx.error("Failed to discover relationships", error=str(e))
            return []

    def get_freshness(
        self,
        dataset: str,
        timestamp_column: Optional[str] = None,
    ) -> Dict:
        """Get data freshness information.

        Tries MAX(timestamp_column) on the data first (source="data"); if
        that is unavailable or fails, falls back to the table's modify_date
        from sys.tables (source="metadata").

        Args:
            dataset: Table name (can be "schema.table" or just "table";
                an unqualified name is assumed to live in "dbo")
            timestamp_column: Column to check for max timestamp (optional)

        Returns:
            FreshnessResult dict with last_updated timestamp, or {} if
            freshness could not be determined.

        Example:
            >>> conn = AzureSQL(server="...", database="mydb")
            >>> freshness = conn.get_freshness("dbo.orders", timestamp_column="order_date")
            >>> print(freshness['last_updated'])
            2024-03-15 10:30:00
        """
        ctx = get_logging_context()

        def _age_hours(ts) -> float:
            # Compare like with like: a tz-aware timestamp (e.g. from a
            # datetimeoffset column) raises TypeError when subtracted from
            # naive utcnow(), which previously forced the metadata fallback
            # even though the data query had succeeded.
            now = datetime.now(ts.tzinfo) if ts.tzinfo is not None else datetime.utcnow()
            return (now - ts).total_seconds() / 3600

        # Parse schema and table name
        if "." in dataset:
            parts = dataset.split(".")
            schema = parts[0]
            table_name = parts[1]
        else:
            schema = "dbo"
            table_name = dataset

        ctx.debug("Getting freshness", schema=schema, table=table_name, column=timestamp_column)

        dataset_ref = DatasetRef(
            name=table_name,
            namespace=schema,
            kind="table",
            path=f"{schema}.{table_name}",
        )

        # If timestamp column specified, query data
        if timestamp_column:
            try:
                query = f"SELECT MAX([{timestamp_column}]) AS max_ts FROM [{schema}].[{table_name}]"
                df = self.read_sql(query)

                if not df.empty and df["max_ts"].iloc[0] is not None:
                    last_updated = pd.to_datetime(df["max_ts"].iloc[0])
                    age_hours = _age_hours(last_updated)

                    result = FreshnessResult(
                        dataset=dataset_ref,
                        last_updated=last_updated,
                        source="data",
                        age_hours=round(age_hours, 2),
                        details={"timestamp_column": timestamp_column},
                    )

                    ctx.info(
                        "Freshness retrieved from data",
                        schema=schema,
                        table=table_name,
                        age_hours=age_hours,
                    )
                    return result.model_dump()
            except Exception as e:
                ctx.debug("Could not get freshness from data column", error=str(e))

        # Fallback to table metadata
        try:
            query = """
                SELECT t.modify_date
                FROM sys.tables t
                INNER JOIN sys.schemas s ON t.schema_id = s.schema_id
                WHERE s.name = :schema AND t.name = :table
            """
            df = self.read_sql(query, params={"schema": schema, "table": table_name})

            if not df.empty and df["modify_date"].iloc[0] is not None:
                last_updated = pd.to_datetime(df["modify_date"].iloc[0])
                age_hours = _age_hours(last_updated)

                result = FreshnessResult(
                    dataset=dataset_ref,
                    last_updated=last_updated,
                    source="metadata",
                    age_hours=round(age_hours, 2),
                    details={"note": "Table modification time from sys.tables"},
                )

                ctx.info(
                    "Freshness retrieved from metadata",
                    schema=schema,
                    table=table_name,
                    age_hours=age_hours,
                )
                return result.model_dump()
        except Exception as e:
            ctx.error("Failed to get freshness", schema=schema, table=table_name, error=str(e))

        return {}

    def close(self):
        """Release the underlying SQLAlchemy engine.

        Disposes the connection pool if an engine exists, then clears the
        cached reference so a later call would lazily recreate it. Repeated
        calls are harmless: once the engine is gone, this is a no-op.
        """
        ctx = get_logging_context()
        ctx.debug(
            "Closing AzureSQL connection",
            server=self.server,
            database=self.database,
        )

        engine = self._engine
        if not engine:
            return

        engine.dispose()
        self._engine = None
        ctx.info(
            "AzureSQL connection closed",
            server=self.server,
            database=self.database,
        )

    def quote_identifier(self, name: str) -> str:
        """Wrap *name* in SQL Server square brackets."""
        return "[" + name + "]"

    def qualify_table(self, table_name: str, schema: str = "") -> str:
        """Return a bracket-quoted table reference, schema-qualified when a
        schema is given or the connection has a default schema."""
        effective = schema if schema else self.default_schema
        return f"[{effective}].[{table_name}]" if effective else f"[{table_name}]"

    def build_select_query(
        self,
        table_name: str,
        schema: str = "",
        where: str = "",
        limit: int = -1,
        columns: str = "*",
    ) -> str:
        """Compose a T-SQL SELECT statement.

        A non-negative ``limit`` becomes a TOP (N) clause; ``where`` is
        appended verbatim when provided.
        """
        target = self.qualify_table(table_name, schema)
        top = f"TOP ({limit}) " if limit >= 0 else ""
        sql = f"SELECT {top}{columns} FROM {target}"
        return f"{sql} WHERE {where}" if where else sql

    def _sanitize_error(self, error_msg: str) -> str:
        """Remove credentials from error messages to prevent leaks.

        Args:
            error_msg: Raw error message that may contain credentials.

        Returns:
            Sanitized error message with credentials redacted.
        """
        import re

        sanitized = re.sub(r"PWD=[^;]*", "PWD=***", error_msg)
        sanitized = re.sub(r"password=[^;]*", "password=***", sanitized, flags=re.IGNORECASE)
        sanitized = re.sub(r"UID=[^;]*", "UID=***", sanitized)
        sanitized = re.sub(r"user=[^;]*", "user=***", sanitized, flags=re.IGNORECASE)
        return sanitized

    def _get_error_suggestions(self, error_msg: str) -> List[str]:
        """Generate suggestions using centralized error suggestion engine."""
        try:
            error = Exception(error_msg)
            return get_suggestions_for_connection(
                error=error,
                connection_name=self.name if hasattr(self, "name") else "azure_sql",
                connection_type="azure_sql",
                auth_mode=self.auth_mode,
            )
        except Exception:
            return []

    def get_spark_options(self) -> Dict[str, str]:
        """Assemble JDBC options for Spark reads/writes.

        Returns:
            Dictionary of Spark JDBC options (url, user, password, etc.)
        """
        ctx = get_logging_context()
        ctx.info(
            "Building Spark JDBC options",
            server=self.server,
            database=self.database,
            auth_mode=self.auth_mode,
        )

        # Base URL segments; auth-specific segments are appended below.
        tsc = "true" if self.trust_server_certificate else "false"
        url_segments = [
            f"jdbc:sqlserver://{self.server}:{self.port};",
            f"databaseName={self.database};encrypt=true;",
            f"trustServerCertificate={tsc};",
        ]

        if self.auth_mode == "aad_msi":
            url_segments.append(
                "hostNameInCertificate=*.database.windows.net;"
                "loginTimeout=30;authentication=ActiveDirectoryMsi;"
            )
            ctx.debug(
                "Configured JDBC URL for AAD MSI",
                server=self.server,
            )
        elif self.auth_mode == "aad_service_principal":
            # Not fully implemented in init yet, but placeholder
            ctx.debug(
                "Configured JDBC URL for AAD Service Principal",
                server=self.server,
            )

        options = {
            "url": "".join(url_segments),
            "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
        }

        # Username/password only apply to SQL-auth style modes.
        if self.auth_mode in ("sql", "key_vault"):
            if self.username:
                options["user"] = self.username

            secret = self.get_password()
            if secret:
                options["password"] = secret

            ctx.debug(
                "Added SQL authentication to Spark options",
                server=self.server,
                username=self.username,
            )

        ctx.info(
            "Spark JDBC options built successfully",
            server=self.server,
            database=self.database,
        )

        return options

__init__(server, database, driver='ODBC Driver 18 for SQL Server', username=None, password=None, auth_mode='aad_msi', key_vault_name=None, secret_name=None, port=1433, timeout=30, trust_server_certificate=True, **kwargs)

Initialize Azure SQL connection.

Parameters:

Name Type Description Default
server str

SQL server hostname (e.g., 'myserver.database.windows.net')

required
database str

Database name

required
driver str

ODBC driver name (default: ODBC Driver 18 for SQL Server)

'ODBC Driver 18 for SQL Server'
username Optional[str]

SQL auth username (required if auth_mode='sql')

None
password Optional[str]

SQL auth password (required if auth_mode='sql')

None
auth_mode str

Authentication mode ('aad_msi', 'sql', 'key_vault')

'aad_msi'
key_vault_name Optional[str]

Key Vault name (required if auth_mode='key_vault')

None
secret_name Optional[str]

Secret name containing password (required if auth_mode='key_vault')

None
port int

SQL Server port (default: 1433)

1433
timeout int

Connection timeout in seconds (default: 30)

30
trust_server_certificate bool

Whether to trust the server certificate (default: True for backward compatibility).

True
Source code in odibi/connections/azure_sql.py
def __init__(
    self,
    server: str,
    database: str,
    driver: str = "ODBC Driver 18 for SQL Server",
    username: Optional[str] = None,
    password: Optional[str] = None,
    auth_mode: str = "aad_msi",  # "aad_msi", "sql", "key_vault"
    key_vault_name: Optional[str] = None,
    secret_name: Optional[str] = None,
    port: int = 1433,
    timeout: int = 30,
    trust_server_certificate: bool = True,
    **kwargs,
):
    """
    Initialize Azure SQL connection.

    Args:
        server: SQL server hostname (e.g., 'myserver.database.windows.net')
        database: Database name
        driver: ODBC driver name (default: ODBC Driver 18 for SQL Server)
        username: SQL auth username (required if auth_mode='sql')
        password: SQL auth password (required if auth_mode='sql')
        auth_mode: Authentication mode ('aad_msi', 'sql', 'key_vault')
        key_vault_name: Key Vault name (required if auth_mode='key_vault')
        secret_name: Secret name containing password (required if auth_mode='key_vault')
        port: SQL Server port (default: 1433)
        timeout: Connection timeout in seconds (default: 30)
        trust_server_certificate: Whether to trust the server certificate
            (default: True for backward compatibility).
    """
    ctx = get_logging_context()
    ctx.log_connection(
        connection_type="azure_sql",
        connection_name=f"{server}/{database}",
        action="init",
        server=server,
        database=database,
        auth_mode=auth_mode,
        port=port,
    )

    self.server = server
    self.database = database
    self.driver = driver
    self.username = username
    self.password = password
    self.auth_mode = auth_mode
    self.key_vault_name = key_vault_name
    self.secret_name = secret_name
    self.port = port
    self.timeout = timeout
    self.trust_server_certificate = trust_server_certificate
    self._engine = None
    self._cached_key = None  # For consistency with ADLS / parallel fetch

    ctx.debug(
        "AzureSQL connection initialized",
        server=server,
        database=database,
        auth_mode=auth_mode,
        driver=driver,
    )

build_select_query(table_name, schema='', where='', limit=-1, columns='*')

Build a SELECT query using T-SQL syntax.

Source code in odibi/connections/azure_sql.py
def build_select_query(
    self,
    table_name: str,
    schema: str = "",
    where: str = "",
    limit: int = -1,
    columns: str = "*",
) -> str:
    """Build a SELECT query using T-SQL syntax."""
    qualified = self.qualify_table(table_name, schema)
    if limit >= 0:
        query = f"SELECT TOP ({limit}) {columns} FROM {qualified}"
    else:
        query = f"SELECT {columns} FROM {qualified}"
    if where:
        query += f" WHERE {where}"
    return query

close()

Close database connection and dispose of engine.

Cleanly closes the SQLAlchemy connection pool and disposes of the engine. Safe to call multiple times (subsequent calls are no-ops). Should be called when done with the connection to free database resources.

Source code in odibi/connections/azure_sql.py
def close(self):
    """Close database connection and dispose of engine.

    Cleanly closes the SQLAlchemy connection pool and disposes of the engine.
    Safe to call multiple times (subsequent calls are no-ops).
    Should be called when done with the connection to free database resources.
    """
    ctx = get_logging_context()
    ctx.debug(
        "Closing AzureSQL connection",
        server=self.server,
        database=self.database,
    )

    if self._engine:
        self._engine.dispose()
        self._engine = None
        ctx.info(
            "AzureSQL connection closed",
            server=self.server,
            database=self.database,
        )

discover_catalog(include_schema=False, include_stats=False, limit=None, recursive=True, path='', pattern='')

Discover all datasets in the database.

Parameters:

Name Type Description Default
include_schema bool

If True, include column information for each table

False
include_stats bool

If True, include row counts and stats

False
limit Optional[int]

Maximum number of datasets per schema

None
recursive bool

Ignored for SQL (schemas are flat)

True
path str

Scope to specific schema (e.g. "dbo", "sales")

''
pattern str

Filter table names by pattern (e.g. "fact_*", "*_2024")

''

Returns:

Type Description
Dict

CatalogSummary dict with schemas and tables

Example

>>> conn = AzureSQL(server="...", database="mydb")
>>> catalog = conn.discover_catalog(include_schema=True, limit=10)
>>> print(catalog['total_datasets'])
25

Source code in odibi/connections/azure_sql.py
def discover_catalog(
    self,
    include_schema: bool = False,
    include_stats: bool = False,
    limit: Optional[int] = None,
    recursive: bool = True,
    path: str = "",
    pattern: str = "",
) -> Dict:
    """Discover all datasets in the database.

    Args:
        include_schema: If True, include column information for each table
        include_stats: If True, include row counts and stats
        limit: Maximum number of datasets per schema
        recursive: Ignored for SQL (schemas are flat)
        path: Scope to specific schema (e.g. "dbo", "sales")
        pattern: Filter table names by pattern (e.g. "fact_*", "*_2024")

    Returns:
        CatalogSummary dict with schemas and tables

    Example:
        >>> conn = AzureSQL(server="...", database="mydb")
        >>> catalog = conn.discover_catalog(include_schema=True, limit=10)
        >>> print(catalog['total_datasets'])
        25
    """
    ctx = get_logging_context()
    ctx.info(
        "Discovering catalog",
        include_schema=include_schema,
        include_stats=include_stats,
        limit=limit,
    )

    # Filter schemas if path is specified (path = schema name for SQL)
    all_schemas = self.list_schemas()
    if path:
        schemas = [s for s in all_schemas if s == path]
        if not schemas:
            ctx.warning(f"Schema not found: {path}")
            return CatalogSummary(
                connection_name=f"{self.server}/{self.database}",
                connection_type="azure_sql",
                schemas=[],
                tables=[],
                total_datasets=0,
                next_step=f"Schema '{path}' not found. Available: {', '.join(all_schemas)}",
            ).model_dump()
    else:
        schemas = all_schemas

    all_tables = []
    import fnmatch

    has_pattern = bool(pattern)

    for schema in schemas:
        tables = self.list_tables(schema)

        for table_info in tables:
            # Apply pattern filter to table name
            if has_pattern and not fnmatch.fnmatch(table_info["name"], pattern):
                continue

            if limit and len(all_tables) >= limit:
                break
            dataset = DatasetRef(
                name=table_info["name"],
                namespace=table_info["schema"],
                kind="table" if table_info["type"] == "table" else "view",
                path=f"{table_info['schema']}.{table_info['name']}",
            )

            # Optionally get schema and stats
            if include_schema or include_stats:
                try:
                    full_info = self.get_table_info(
                        f"{table_info['schema']}.{table_info['name']}"
                    )
                    if full_info and "dataset" in full_info:
                        dataset = DatasetRef(**full_info["dataset"])
                except Exception as e:
                    ctx.debug(
                        "Could not get extended info for table",
                        table=table_info["name"],
                        error=str(e),
                    )

            all_tables.append(dataset)

    catalog = CatalogSummary(
        connection_name=f"{self.server}/{self.database}",
        connection_type="azure_sql",
        schemas=schemas,
        tables=all_tables,
        total_datasets=len(all_tables),
        next_step="Use profile() to analyze specific tables or get_table_info() for schema details",
        suggestions=[
            f"Found {len(all_tables)} tables across {len(schemas)} schemas",
            "Use include_schema=True to get column details",
            "Use include_stats=True for row counts",
        ],
    )

    ctx.info("Catalog discovery complete", total_datasets=len(all_tables), schemas=len(schemas))
    return catalog.model_dump()

execute(sql, params=None)

Execute SQL statement (INSERT, UPDATE, DELETE, etc.).

Parameters:

Name Type Description Default
sql str

SQL statement

required
params Optional[Dict[str, Any]]

Optional parameters for parameterized query

None

Returns:

Type Description
Any

Result from execution

Raises:

Type Description
ConnectionError

If execution fails

Source code in odibi/connections/azure_sql.py
def execute(self, sql: str, params: Optional[Dict[str, Any]] = None) -> Any:
    """
    Execute SQL statement (INSERT, UPDATE, DELETE, etc.).

    Args:
        sql: SQL statement
        params: Optional parameters for parameterized query

    Returns:
        Result from execution

    Raises:
        ConnectionError: If execution fails
    """
    ctx = get_logging_context()
    ctx.debug(
        "Executing SQL statement",
        server=self.server,
        database=self.database,
        statement_length=len(sql),
    )

    try:
        engine = self.get_engine()
        from sqlalchemy import text

        # Use begin() for proper transaction handling in SQLAlchemy 1.4+
        with engine.begin() as conn:
            result = conn.execute(text(sql), params or {})
            # Fetch all results before transaction ends
            if result.returns_rows:
                rows = result.fetchall()
            else:
                rows = None
            # Transaction auto-commits on exit from begin() context

            ctx.info(
                "SQL statement executed successfully",
                server=self.server,
                database=self.database,
            )
            return rows
    except Exception as e:
        if isinstance(e, ConnectionError):
            raise
        ctx.error(
            "SQL statement execution failed",
            server=self.server,
            database=self.database,
            error=str(e),
        )
        raise ConnectionError(
            connection_name=f"AzureSQL({self.server})",
            reason=f"Statement execution failed: {self._sanitize_error(str(e))}",
            suggestions=self._get_error_suggestions(str(e)),
        )

execute_sql(sql, params=None)

Execute SQL statement (INSERT, UPDATE, DELETE, etc.).

Alias for execute() - used by SqlServerMergeWriter.

Parameters:

Name Type Description Default
sql str

SQL statement

required
params Optional[Dict[str, Any]]

Optional parameters for parameterized query

None

Returns:

Type Description
Any

Result from execution

Raises:

Type Description
ConnectionError

If execution fails

Source code in odibi/connections/azure_sql.py
def execute_sql(self, sql: str, params: Optional[Dict[str, Any]] = None) -> Any:
    """
    Execute SQL statement (INSERT, UPDATE, DELETE, etc.).

    Thin alias that delegates to execute(); kept for compatibility with
    SqlServerMergeWriter.

    Args:
        sql: SQL statement
        params: Optional parameters for parameterized query

    Returns:
        Result from execution

    Raises:
        ConnectionError: If execution fails
    """
    # Delegate unchanged — execute() owns logging, transactions and errors.
    return self.execute(sql, params)

get_engine()

Get or create SQLAlchemy engine.

Returns:

Type Description
Any

SQLAlchemy engine instance

Raises:

Type Description
ConnectionError

If connection fails or drivers missing

Source code in odibi/connections/azure_sql.py
def get_engine(self) -> Any:
    """
    Get or create SQLAlchemy engine.

    The engine is created lazily, verified with a test connection, and
    cached on the instance only after that test succeeds. (Previously a
    failed connectivity test left the untested engine cached in
    ``self._engine``, so every subsequent call returned a broken engine
    via the early-return path.)

    Returns:
        SQLAlchemy engine instance

    Raises:
        ConnectionError: If connection fails or drivers missing
    """
    ctx = get_logging_context()

    if self._engine is not None:
        ctx.debug(
            "Using cached SQLAlchemy engine",
            server=self.server,
            database=self.database,
        )
        return self._engine

    ctx.debug(
        "Creating SQLAlchemy engine",
        server=self.server,
        database=self.database,
    )

    try:
        from urllib.parse import quote_plus

        from sqlalchemy import create_engine
    except ImportError as e:
        ctx.error(
            "SQLAlchemy import failed",
            server=self.server,
            database=self.database,
            error=str(e),
        )
        raise ConnectionError(
            connection_name=f"AzureSQL({self.server})",
            reason="Required packages 'sqlalchemy' or 'pyodbc' not found.",
            suggestions=[
                "Install required packages: pip install sqlalchemy pyodbc",
                "Or install odibi with azure extras: pip install 'odibi[azure]'",
            ],
        ) from e

    engine = None
    try:
        # Build connection string
        conn_str = self.odbc_dsn()
        connection_url = f"mssql+pyodbc:///?odbc_connect={quote_plus(conn_str)}"

        ctx.debug(
            "Creating SQLAlchemy engine with connection pooling",
            server=self.server,
            database=self.database,
        )

        # Create engine with connection pooling
        engine = create_engine(
            connection_url,
            pool_pre_ping=True,  # Verify connections before use
            pool_recycle=3600,  # Recycle connections after 1 hour
            echo=False,
        )

        # Test connection BEFORE caching so a broken engine is never reused.
        with engine.connect():
            pass

        self._engine = engine

        ctx.info(
            "SQLAlchemy engine created successfully",
            server=self.server,
            database=self.database,
        )

        return self._engine

    except Exception as e:
        # Release the connection pool of an engine that failed its test;
        # leave self._engine unset so the next call retries from scratch.
        if engine is not None:
            try:
                engine.dispose()
            except Exception:
                pass
        suggestions = self._get_error_suggestions(str(e))
        ctx.error(
            "Failed to create SQLAlchemy engine",
            server=self.server,
            database=self.database,
            error=str(e),
            suggestions=suggestions,
        )
        raise ConnectionError(
            connection_name=f"AzureSQL({self.server})",
            reason=f"Failed to create engine: {self._sanitize_error(str(e))}",
            suggestions=suggestions,
        ) from e

get_freshness(dataset, timestamp_column=None)

Get data freshness information.

Parameters:

Name Type Description Default
dataset str

Table name (can be "schema.table" or just "table")

required
timestamp_column Optional[str]

Column to check for max timestamp (optional)

None

Returns:

Type Description
Dict

FreshnessResult dict with last_updated timestamp

Example

>>> conn = AzureSQL(server="...", database="mydb")
>>> freshness = conn.get_freshness("dbo.orders", timestamp_column="order_date")
>>> print(freshness['last_updated'])
2024-03-15 10:30:00

Source code in odibi/connections/azure_sql.py
def get_freshness(
    self,
    dataset: str,
    timestamp_column: Optional[str] = None,
) -> Dict:
    """Get data freshness information.

    Tries two strategies in order:

    1. If ``timestamp_column`` is given: ``MAX(column)`` queried from the
       table data itself (``source="data"``).
    2. Fallback: the table's ``modify_date`` from ``sys.tables``
       (``source="metadata"``).

    Args:
        dataset: Table name (can be "schema.table" or just "table")
        timestamp_column: Column to check for max timestamp (optional)

    Returns:
        FreshnessResult dict with last_updated timestamp, or an empty
        dict if both strategies fail.

    Example:
        >>> conn = AzureSQL(server="...", database="mydb")
        >>> freshness = conn.get_freshness("dbo.orders", timestamp_column="order_date")
        >>> print(freshness['last_updated'])
        2024-03-15 10:30:00
    """
    ctx = get_logging_context()

    # Parse schema and table name ("schema.table"; bare names default to dbo)
    if "." in dataset:
        parts = dataset.split(".")
        schema = parts[0]
        table_name = parts[1]
    else:
        schema = "dbo"
        table_name = dataset

    ctx.debug("Getting freshness", schema=schema, table=table_name, column=timestamp_column)

    dataset_ref = DatasetRef(
        name=table_name,
        namespace=schema,
        kind="table",
        path=f"{schema}.{table_name}",
    )

    # Strategy 1: if a timestamp column is specified, query the data itself
    if timestamp_column:
        try:
            query = f"SELECT MAX([{timestamp_column}]) AS max_ts FROM [{schema}].[{table_name}]"
            df = self.read_sql(query)

            if not df.empty and df["max_ts"].iloc[0] is not None:
                last_updated = pd.to_datetime(df["max_ts"].iloc[0])
                # NOTE(review): datetime.utcnow() is naive — assumes the column
                # holds naive UTC timestamps; a tz-aware value would make this
                # subtraction raise. TODO confirm.
                age_hours = (datetime.utcnow() - last_updated).total_seconds() / 3600

                result = FreshnessResult(
                    dataset=dataset_ref,
                    last_updated=last_updated,
                    source="data",
                    age_hours=round(age_hours, 2),
                    details={"timestamp_column": timestamp_column},
                )

                ctx.info(
                    "Freshness retrieved from data",
                    schema=schema,
                    table=table_name,
                    age_hours=age_hours,
                )
                return result.model_dump()
        except Exception as e:
            # Swallow and fall through to the metadata-based strategy.
            ctx.debug("Could not get freshness from data column", error=str(e))

    # Strategy 2: fall back to table metadata (last modification time)
    try:
        query = """
            SELECT t.modify_date
            FROM sys.tables t
            INNER JOIN sys.schemas s ON t.schema_id = s.schema_id
            WHERE s.name = :schema AND t.name = :table
        """
        df = self.read_sql(query, params={"schema": schema, "table": table_name})

        if not df.empty and df["modify_date"].iloc[0] is not None:
            last_updated = pd.to_datetime(df["modify_date"].iloc[0])
            age_hours = (datetime.utcnow() - last_updated).total_seconds() / 3600

            result = FreshnessResult(
                dataset=dataset_ref,
                last_updated=last_updated,
                source="metadata",
                age_hours=round(age_hours, 2),
                details={"note": "Table modification time from sys.tables"},
            )

            ctx.info(
                "Freshness retrieved from metadata",
                schema=schema,
                table=table_name,
                age_hours=age_hours,
            )
            return result.model_dump()
    except Exception as e:
        ctx.error("Failed to get freshness", schema=schema, table=table_name, error=str(e))

    # Neither strategy produced a timestamp: signal "unknown" with an empty dict
    return {}

get_password()

Get password (cached).

Source code in odibi/connections/azure_sql.py
def get_password(self) -> Optional[str]:
    """Get password (cached).

    Resolution order:

    1. Explicitly configured ``self.password``.
    2. A previously fetched secret cached in ``self._cached_key``.
    3. ``auth_mode == "key_vault"``: fetch the secret from Azure Key
       Vault, cache it, and return it.
    4. Otherwise ``None`` (auth modes that do not use a password).

    Returns:
        The password string, or None when the auth mode needs none.

    Raises:
        ValueError: key_vault mode without key_vault_name/secret_name.
        ImportError: key_vault mode without the azure libraries installed.
    """
    ctx = get_logging_context()

    if self.password:
        ctx.debug(
            "Using provided password",
            server=self.server,
            database=self.database,
        )
        return self.password

    if self._cached_key:
        ctx.debug(
            "Using cached password",
            server=self.server,
            database=self.database,
        )
        return self._cached_key

    if self.auth_mode == "key_vault":
        # Both values are required to locate the secret; fail loudly with a
        # message that names whichever one is missing.
        if not self.key_vault_name or not self.secret_name:
            ctx.error(
                "Key Vault mode requires key_vault_name and secret_name",
                server=self.server,
                database=self.database,
            )
            raise ValueError(
                f"key_vault mode requires 'key_vault_name' and 'secret_name' "
                f"for connection to {self.server}/{self.database}. "
                f"Got key_vault_name={self.key_vault_name or '(missing)'}, "
                f"secret_name={self.secret_name or '(missing)'}."
            )

        ctx.debug(
            "Fetching password from Key Vault",
            server=self.server,
            key_vault_name=self.key_vault_name,
            secret_name=self.secret_name,
        )

        try:
            # Imported lazily so the azure extras are only required when
            # key_vault auth is actually used.
            from azure.identity import DefaultAzureCredential
            from azure.keyvault.secrets import SecretClient

            credential = DefaultAzureCredential()
            kv_uri = f"https://{self.key_vault_name}.vault.azure.net"
            client = SecretClient(vault_url=kv_uri, credential=credential)
            secret = client.get_secret(self.secret_name)
            self._cached_key = secret.value
            # Register the secret with the logger — presumably so it gets
            # redacted from log output; confirm in the logger implementation.
            logger.register_secret(self._cached_key)

            ctx.info(
                "Successfully fetched password from Key Vault",
                server=self.server,
                key_vault_name=self.key_vault_name,
            )
            return self._cached_key
        except ImportError as e:
            ctx.error(
                "Key Vault support requires azure libraries",
                server=self.server,
                error=str(e),
            )
            raise ImportError(
                "Key Vault support requires 'azure-identity' and 'azure-keyvault-secrets'. "
                "Install with: pip install odibi[azure]"
            )

    ctx.debug(
        "No password required for auth_mode",
        server=self.server,
        auth_mode=self.auth_mode,
    )
    return None

get_path(relative_path)

Get table reference for relative path.

In Azure SQL, the relative path is the table reference itself (e.g., "schema.table" or "table"), so this method returns it as-is.

Parameters:

Name Type Description Default
relative_path str

Table reference (e.g., "dbo.users", "customers")

required

Returns:

Type Description
str

Same table reference unchanged

Source code in odibi/connections/azure_sql.py
def get_path(self, relative_path: str) -> str:
    """Return the table reference unchanged.

    For Azure SQL the "path" is already a table reference such as
    "dbo.users" or "customers", so no resolution is needed.

    Args:
        relative_path: Table reference (e.g., "dbo.users", "customers")

    Returns:
        The same table reference, untouched.
    """
    # Identity mapping: SQL tables are addressed by name, not by filesystem path.
    return relative_path

get_spark_options()

Get Spark JDBC options.

Returns:

Type Description
Dict[str, str]

Dictionary of Spark JDBC options (url, user, password, etc.)

Source code in odibi/connections/azure_sql.py
def get_spark_options(self) -> Dict[str, str]:
    """Assemble the option dict for reading/writing this database via Spark JDBC.

    Returns:
        Dictionary of Spark JDBC options (url, user, password, etc.)
    """
    ctx = get_logging_context()
    ctx.info(
        "Building Spark JDBC options",
        server=self.server,
        database=self.database,
        auth_mode=self.auth_mode,
    )

    # Base URL: always encrypted; certificate trust follows configuration.
    tsc_flag = "true" if self.trust_server_certificate else "false"
    jdbc_url = (
        f"jdbc:sqlserver://{self.server}:{self.port};"
        f"databaseName={self.database};encrypt=true;"
        f"trustServerCertificate={tsc_flag};"
    )

    if self.auth_mode == "aad_msi":
        jdbc_url += (
            "hostNameInCertificate=*.database.windows.net;"
            "loginTimeout=30;authentication=ActiveDirectoryMsi;"
        )
        ctx.debug(
            "Configured JDBC URL for AAD MSI",
            server=self.server,
        )
    elif self.auth_mode == "aad_service_principal":
        # Not fully implemented in init yet, but placeholder
        ctx.debug(
            "Configured JDBC URL for AAD Service Principal",
            server=self.server,
        )

    options: Dict[str, str] = {
        "url": jdbc_url,
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
    }

    # Username/password auth applies to both plain SQL logins and
    # logins whose password lives in Key Vault.
    if self.auth_mode in ("sql", "key_vault"):
        if self.username:
            options["user"] = self.username

        pwd = self.get_password()
        if pwd:
            options["password"] = pwd

        ctx.debug(
            "Added SQL authentication to Spark options",
            server=self.server,
            username=self.username,
        )

    ctx.info(
        "Spark JDBC options built successfully",
        server=self.server,
        database=self.database,
    )

    return options

get_table_info(table)

Get detailed schema information for a table.

Parameters:

Name Type Description Default
table str

Table name (can be "schema.table" or just "table")

required

Returns:

Type Description
Dict

Schema-like dict with dataset and columns info

Example

>>> conn = AzureSQL(server="...", database="mydb")
>>> info = conn.get_table_info("dbo.customers")
>>> print(info['columns'])
[{'name': 'customer_id', 'dtype': 'int', ...}, ...]

Source code in odibi/connections/azure_sql.py
def get_table_info(self, table: str) -> Dict:
    """Get detailed schema information for a table.

    Reads column metadata from INFORMATION_SCHEMA.COLUMNS and, when
    possible, an approximate row count from partition metadata.

    Args:
        table: Table name (can be "schema.table" or just "table")

    Returns:
        Schema-like dict with dataset and columns info; empty dict on failure.

    Example:
        >>> conn = AzureSQL(server="...", database="mydb")
        >>> info = conn.get_table_info("dbo.customers")
        >>> print(info['columns'])
        [{'name': 'customer_id', 'dtype': 'int', ...}, ...]
    """
    ctx = get_logging_context()

    # "schema.table" -> first two dot-separated segments; bare names -> dbo.
    if "." in table:
        schema, table_name = table.split(".")[:2]
    else:
        schema, table_name = "dbo", table

    ctx.debug("Getting table info", schema=schema, table=table_name)

    query = """
        SELECT COLUMN_NAME, DATA_TYPE, IS_NULLABLE, ORDINAL_POSITION
        FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_SCHEMA = :schema AND TABLE_NAME = :table
        ORDER BY ORDINAL_POSITION
    """

    try:
        df = self.read_sql(query, params={"schema": schema, "table": table_name})

        columns = [
            {
                "name": row["COLUMN_NAME"],
                "dtype": row["DATA_TYPE"],
                "nullable": row["IS_NULLABLE"] == "YES",
            }
            for _, row in df.iterrows()
        ]

        # Approximate row count from partition metadata; best-effort only.
        row_count = None
        try:
            count_query = """
                SELECT SUM(p.rows) AS row_count
                FROM sys.tables t
                INNER JOIN sys.partitions p ON t.object_id = p.object_id
                INNER JOIN sys.schemas s ON t.schema_id = s.schema_id
                WHERE s.name = :schema AND t.name = :table
                AND p.index_id IN (0,1)
            """
            count_df = self.read_sql(
                count_query, params={"schema": schema, "table": table_name}
            )
            if not count_df.empty and count_df["row_count"].iloc[0] is not None:
                row_count = int(count_df["row_count"].iloc[0])
        except Exception:
            pass  # Row count is optional

        dataset = DatasetRef(
            name=table_name,
            namespace=schema,
            kind="table",
            path=f"{schema}.{table_name}",
            row_count=row_count,
        )

        schema_obj = Schema(dataset=dataset, columns=[Column(**c) for c in columns])

        ctx.info("Table info retrieved", schema=schema, table=table_name, columns=len(columns))
        return schema_obj.model_dump()

    except Exception as e:
        ctx.error("Failed to get table info", schema=schema, table=table_name, error=str(e))
        return {}

list_schemas()

List all schemas in the database.

Returns:

Type Description
List[str]

List of schema names

Example

conn = AzureSQL(server="...", database="mydb") schemas = conn.list_schemas() print(schemas) ['dbo', 'staging', 'warehouse']

Source code in odibi/connections/azure_sql.py
def list_schemas(self) -> List[str]:
    """Return the user-facing schema names in this database.

    Built-in role schemas (db_owner, db_datareader, ...) and system
    schemas are filtered out.

    Returns:
        List of schema names (empty list on failure)

    Example:
        >>> conn = AzureSQL(server="...", database="mydb")
        >>> conn.list_schemas()
        ['dbo', 'staging', 'warehouse']
    """
    ctx = get_logging_context()
    ctx.debug("Listing schemas", server=self.server, database=self.database)

    query = """
        SELECT SCHEMA_NAME
        FROM INFORMATION_SCHEMA.SCHEMATA
        WHERE SCHEMA_NAME NOT IN ('db_owner', 'db_accessadmin', 'db_securityadmin',
                                  'db_ddladmin', 'db_backupoperator', 'db_datareader',
                                  'db_datawriter', 'db_denydatareader', 'db_denydatawriter',
                                  'sys', 'INFORMATION_SCHEMA', 'guest')
        ORDER BY SCHEMA_NAME
    """

    try:
        names = list(self.read_sql(query)["SCHEMA_NAME"])
        ctx.info("Schemas listed successfully", count=len(names))
        return names
    except Exception as e:
        ctx.error("Failed to list schemas", error=str(e))
        return []

list_tables(schema='dbo')

List tables and views in a schema.

Parameters:

Name Type Description Default
schema str

Schema name (default: "dbo")

'dbo'

Returns:

Type Description
List[Dict]

List of dicts with keys: name, type (table/view), schema

Example

conn = AzureSQL(server="...", database="mydb") tables = conn.list_tables("dbo") print(tables) [{'name': 'customers', 'type': 'table', 'schema': 'dbo'}, {'name': 'orders', 'type': 'table', 'schema': 'dbo'}]

Source code in odibi/connections/azure_sql.py
def list_tables(self, schema: str = "dbo") -> List[Dict]:
    """Enumerate tables and views within one schema.

    Args:
        schema: Schema name (default: "dbo")

    Returns:
        List of dicts with keys: name, type (table/view), schema
        (empty list on failure)

    Example:
        >>> conn = AzureSQL(server="...", database="mydb")
        >>> tables = conn.list_tables("dbo")
        >>> print(tables)
        [{'name': 'customers', 'type': 'table', 'schema': 'dbo'},
         {'name': 'orders', 'type': 'table', 'schema': 'dbo'}]
    """
    ctx = get_logging_context()
    ctx.debug("Listing tables", schema=schema)

    query = """
        SELECT TABLE_NAME, TABLE_TYPE, TABLE_SCHEMA
        FROM INFORMATION_SCHEMA.TABLES
        WHERE TABLE_SCHEMA = :schema
        ORDER BY TABLE_NAME
    """

    try:
        df = self.read_sql(query, params={"schema": schema})
        # INFORMATION_SCHEMA reports 'BASE TABLE' for tables; anything else
        # (i.e. 'VIEW') is mapped to "view".
        tables = [
            {
                "name": row["TABLE_NAME"],
                "type": "table" if row["TABLE_TYPE"] == "BASE TABLE" else "view",
                "schema": row["TABLE_SCHEMA"],
            }
            for _, row in df.iterrows()
        ]
        ctx.info("Tables listed successfully", schema=schema, count=len(tables))
        return tables
    except Exception as e:
        ctx.error("Failed to list tables", schema=schema, error=str(e))
        return []

odbc_dsn()

Build ODBC connection string.

Returns:

Type Description
str

ODBC DSN string

Example

>>> conn = AzureSQL(server="myserver.database.windows.net", database="mydb")
>>> conn.odbc_dsn()
'Driver={ODBC Driver 18 for SQL Server};Server=tcp:myserver...'

Source code in odibi/connections/azure_sql.py
def odbc_dsn(self) -> str:
    """Build ODBC connection string.

    The base segments always enable encryption; the authentication
    segment appended afterwards depends on the configured auth mode.

    Returns:
        ODBC DSN string

    Example:
        >>> conn = AzureSQL(server="myserver.database.windows.net", database="mydb")
        >>> conn.odbc_dsn()
        'Driver={ODBC Driver 18 for SQL Server};Server=tcp:myserver...'
    """
    ctx = get_logging_context()
    ctx.debug(
        "Building ODBC connection string",
        server=self.server,
        database=self.database,
        auth_mode=self.auth_mode,
    )

    trust = "yes" if self.trust_server_certificate else "no"
    segments = [
        f"Driver={{{self.driver}}};",
        f"Server=tcp:{self.server},{self.port};",
        f"Database={self.database};",
        "Encrypt=yes;",
        f"TrustServerCertificate={trust};",
        f"Connection Timeout={self.timeout};",
    ]

    # get_password() may lazily fetch from Key Vault; call it before branching.
    pwd = self.get_password()
    if self.username and pwd:
        segments.append(f"UID={self.username};PWD={pwd};")
        ctx.debug(
            "Using SQL authentication",
            server=self.server,
            username=self.username,
        )
    elif self.auth_mode == "aad_msi":
        segments.append("Authentication=ActiveDirectoryMsi;")
        ctx.debug(
            "Using AAD Managed Identity authentication",
            server=self.server,
        )
    elif self.auth_mode == "aad_service_principal":
        # Not fully supported via ODBC string simply without token usually
        ctx.debug(
            "Using AAD Service Principal authentication",
            server=self.server,
        )

    return "".join(segments)

preview(dataset, rows=5, columns=None)

Preview sample rows from a SQL table.

Parameters:

Name Type Description Default
dataset str

Table name (can be "schema.table" or just "table")

required
rows int

Number of rows to return (default: 5, max: 100)

5
columns Optional[List[str]]

Specific columns to include (None = all)

None

Returns:

Type Description
Dict[str, Any]

PreviewResult dict with sample rows

Example

conn = AzureSQL(server="...", database="mydb") preview = conn.preview("dbo.customers", rows=10) for row in preview['rows']: ... print(row)

Source code in odibi/connections/azure_sql.py
def preview(
    self, dataset: str, rows: int = 5, columns: Optional[List[str]] = None
) -> Dict[str, Any]:
    """Preview sample rows from a SQL table.

    Args:
        dataset: Table name (can be "schema.table" or just "table")
        rows: Number of rows to return (default: 5, max: 100)
        columns: Specific columns to include (None = all)

    Returns:
        PreviewResult dict with sample rows; on failure, a PreviewResult
        dict with no rows (the return shape stays stable).

    Example:
        >>> conn = AzureSQL(server="...", database="mydb")
        >>> preview = conn.preview("dbo.customers", rows=10)
        >>> for row in preview['rows']:
        ...     print(row)
    """
    ctx = get_logging_context()

    # Hard cap to keep previews cheap regardless of caller input.
    max_rows = min(rows, 100)

    # Parse schema and table name ("schema.table"; bare names -> dbo)
    if "." in dataset:
        parts = dataset.split(".")
        schema = parts[0]
        table_name = parts[1]
    else:
        schema = "dbo"
        table_name = dataset

    ctx.info("Previewing table", schema=schema, table=table_name, rows=max_rows)

    try:
        # NOTE(review): schema/table/column names are interpolated into the
        # SQL below — assumes they come from trusted configuration, not
        # untrusted user input. TODO confirm at call sites.
        col_filter = "*"
        if columns:
            col_filter = ", ".join(f"[{c}]" for c in columns)

        query = f"SELECT TOP ({max_rows}) {col_filter} FROM [{schema}].[{table_name}]"
        df = self.read_sql(query)

        # Get total row count from partition metadata; best-effort only.
        total_rows = None
        try:
            count_query = """
                SELECT SUM(p.rows) AS row_count
                FROM sys.tables t
                INNER JOIN sys.partitions p ON t.object_id = p.object_id
                INNER JOIN sys.schemas s ON t.schema_id = s.schema_id
                WHERE s.name = :schema AND t.name = :table
                AND p.index_id IN (0,1)
            """
            count_df = self.read_sql(
                count_query, params={"schema": schema, "table": table_name}
            )
            if not count_df.empty and count_df["row_count"].iloc[0] is not None:
                total_rows = int(count_df["row_count"].iloc[0])
        except Exception:
            pass

        result = PreviewResult(
            dataset=DatasetRef(
                name=table_name,
                namespace=schema,
                kind="table",
                path=f"{schema}.{table_name}",
                row_count=total_rows,
            ),
            columns=df.columns.tolist(),
            rows=df.to_dict(orient="records"),
            total_rows=total_rows,
            # Truncation is inferred from the metadata count; if the count
            # query failed (total_rows is None) this reports False.
            truncated=(total_rows or 0) > max_rows,
        )

        ctx.info("Preview complete", schema=schema, table=table_name, rows_returned=len(df))
        return result.model_dump()

    except Exception as e:
        ctx.error("Failed to preview table", schema=schema, table=table_name, error=str(e))
        # Degrade gracefully: an empty PreviewResult keeps the return shape stable.
        return PreviewResult(
            dataset=DatasetRef(name=table_name, namespace=schema, kind="table"),
        ).model_dump()

profile(dataset, sample_rows=1000, columns=None)

Profile a table with statistical analysis.

Parameters:

Name Type Description Default
dataset str

Table name (can be "schema.table" or just "table")

required
sample_rows int

Number of rows to sample (default: 1000)

1000
columns Optional[List[str]]

Specific columns to profile (None = all columns)

None

Returns:

Type Description
Dict

TableProfile dict with profiling statistics

Example

conn = AzureSQL(server="...", database="mydb") profile = conn.profile("dbo.customers", sample_rows=5000) print(profile['candidate_keys']) ['customer_id']

Source code in odibi/connections/azure_sql.py
def profile(
    self,
    dataset: str,
    sample_rows: int = 1000,
    columns: Optional[List[str]] = None,
) -> Dict:
    """Profile a table with statistical analysis.

    Reads up to ``sample_rows`` rows, then computes per-column null
    counts, distinct counts, a coarse cardinality label, candidate key
    columns (all-distinct within the sample), candidate watermark
    columns (datetime dtype), and an overall completeness ratio.

    Args:
        dataset: Table name (can be "schema.table" or just "table")
        sample_rows: Number of rows to sample (default: 1000)
        columns: Specific columns to profile (None = all columns)

    Returns:
        TableProfile dict with profiling statistics; empty dict on failure.

    Example:
        >>> conn = AzureSQL(server="...", database="mydb")
        >>> profile = conn.profile("dbo.customers", sample_rows=5000)
        >>> print(profile['candidate_keys'])
        ['customer_id']
    """
    ctx = get_logging_context()

    # Parse schema and table name ("schema.table"; bare names -> dbo)
    if "." in dataset:
        parts = dataset.split(".")
        schema = parts[0]
        table_name = parts[1]
    else:
        schema = "dbo"
        table_name = dataset

    ctx.info("Profiling table", schema=schema, table=table_name, sample_rows=sample_rows)

    # Read sample data
    # NOTE(review): identifiers and sample_rows are interpolated into the
    # SQL — assumes trusted configuration values. TODO confirm call sites.
    col_filter = ", ".join(f"[{c}]" for c in columns) if columns else "*"
    query = f"SELECT TOP ({sample_rows}) {col_filter} FROM [{schema}].[{table_name}]"

    try:
        df = self.read_sql(query)

        # Get total row count (best-effort; falls back to the sample size)
        total_rows = None
        try:
            count_query = """
                SELECT SUM(p.rows) AS row_count
                FROM sys.tables t
                INNER JOIN sys.partitions p ON t.object_id = p.object_id
                INNER JOIN sys.schemas s ON t.schema_id = s.schema_id
                WHERE s.name = :schema AND t.name = :table
                AND p.index_id IN (0,1)
            """
            count_df = self.read_sql(
                count_query, params={"schema": schema, "table": table_name}
            )
            if not count_df.empty and count_df["row_count"].iloc[0] is not None:
                total_rows = int(count_df["row_count"].iloc[0])
        except Exception:
            total_rows = len(df)

        # Profile columns
        profiled_columns = []
        candidate_keys = []
        candidate_watermarks = []

        for col in df.columns:
            null_count = int(df[col].isnull().sum())
            null_pct = null_count / len(df) if len(df) > 0 else 0
            distinct_count = int(df[col].nunique())

            # Determine cardinality from the sample.
            # NOTE(review): with an empty sample (len(df) == 0) every column
            # satisfies distinct_count == len(df) and is flagged "unique".
            if distinct_count == len(df):
                cardinality = "unique"
                candidate_keys.append(col)
            elif distinct_count > len(df) * 0.9:
                cardinality = "high"
            elif distinct_count < 10:
                cardinality = "low"
            else:
                cardinality = "medium"

            # Check if datetime (candidate watermark)
            if pd.api.types.is_datetime64_any_dtype(df[col]):
                candidate_watermarks.append(col)

            # Get sample values (non-null)
            sample_values = df[col].dropna().head(5).tolist()

            profiled_columns.append(
                Column(
                    name=col,
                    dtype=str(df[col].dtype),
                    nullable=null_count > 0,
                    null_count=null_count,
                    null_pct=round(null_pct, 4),
                    cardinality=cardinality,
                    distinct_count=distinct_count,
                    sample_values=sample_values,
                )
            )

        # Calculate overall completeness (fraction of non-null cells)
        total_cells = len(df) * len(df.columns)
        null_cells = df.isnull().sum().sum()
        completeness = 1 - (null_cells / total_cells) if total_cells > 0 else 0

        dataset_ref = DatasetRef(
            name=table_name,
            namespace=schema,
            kind="table",
            path=f"{schema}.{table_name}",
            row_count=total_rows,
        )

        profile = TableProfile(
            dataset=dataset_ref,
            rows_sampled=len(df),
            total_rows=total_rows,
            columns=profiled_columns,
            candidate_keys=candidate_keys,
            candidate_watermarks=candidate_watermarks,
            completeness=round(completeness, 4),
            suggestions=[
                (
                    f"Sampled {len(df)} of {total_rows} rows"
                    if total_rows
                    else f"Sampled {len(df)} rows"
                ),
                (
                    f"Found {len(candidate_keys)} candidate key columns: {candidate_keys}"
                    if candidate_keys
                    else "No unique key columns found"
                ),
                (
                    f"Found {len(candidate_watermarks)} timestamp columns: {candidate_watermarks}"
                    if candidate_watermarks
                    else "No timestamp columns for incremental loading"
                ),
            ],
        )

        ctx.info(
            "Table profiling complete",
            schema=schema,
            table=table_name,
            rows_sampled=len(df),
            columns=len(profiled_columns),
            candidate_keys=len(candidate_keys),
        )

        return profile.model_dump()

    except Exception as e:
        ctx.error("Failed to profile table", schema=schema, table=table_name, error=str(e))
        return {}

qualify_table(table_name, schema='')

Build a SQL Server qualified table reference.

Source code in odibi/connections/azure_sql.py
def qualify_table(self, table_name: str, schema: str = "") -> str:
    """Return a bracket-quoted SQL Server table reference.

    Falls back to the connection's ``default_schema`` when *schema* is
    empty; when neither is set, only the table name is quoted.
    """
    effective_schema = schema or self.default_schema
    if not effective_schema:
        return f"[{table_name}]"
    return f"[{effective_schema}].[{table_name}]"

quote_identifier(name)

Quote an identifier using SQL Server bracket notation.

Source code in odibi/connections/azure_sql.py
def quote_identifier(self, name: str) -> str:
    """Wrap *name* in SQL Server square brackets (e.g. ``col`` -> ``[col]``)."""
    return "[" + name + "]"

read_sql(query, params=None)

Execute SQL query and return results as DataFrame.

Parameters:

Name Type Description Default
query str

SQL query string

required
params Optional[Dict[str, Any]]

Optional query parameters for parameterized queries

None

Returns:

Type Description
DataFrame

Query results as pandas DataFrame

Raises:

Type Description
ConnectionError

If execution fails

Source code in odibi/connections/azure_sql.py
def read_sql(self, query: str, params: Optional[Dict[str, Any]] = None) -> pd.DataFrame:
    """
    Execute SQL query and return results as DataFrame.

    Args:
        query: SQL query string
        params: Optional query parameters for parameterized queries

    Returns:
        Query results as pandas DataFrame

    Raises:
        ConnectionError: If execution fails
    """
    ctx = get_logging_context()
    # Log only the query length, not the query text, which may embed data values.
    ctx.debug(
        "Executing SQL query",
        server=self.server,
        database=self.database,
        query_length=len(query),
    )

    try:
        engine = self.get_engine()
        # Use SQLAlchemy connection directly (preferred by pandas)
        with engine.connect() as conn:
            result = pd.read_sql(query, conn, params=params)

        ctx.info(
            "SQL query executed successfully",
            server=self.server,
            database=self.database,
            rows_returned=len(result),
        )
        return result
    except Exception as e:
        # get_engine() raises a fully-described ConnectionError; pass it
        # through instead of double-wrapping it below.
        if isinstance(e, ConnectionError):
            raise
        ctx.error(
            "SQL query execution failed",
            server=self.server,
            database=self.database,
            error=str(e),
        )
        # _sanitize_error presumably scrubs sensitive detail from the driver
        # message before it reaches the caller -- confirm in its definition.
        raise ConnectionError(
            connection_name=f"AzureSQL({self.server})",
            reason=f"Query execution failed: {self._sanitize_error(str(e))}",
            suggestions=self._get_error_suggestions(str(e)),
        )

read_sql_query(query, params=None)

Execute a SQL query and return results as DataFrame.

Use this for custom SELECT queries (e.g., to exclude unsupported columns).

Parameters:

Name Type Description Default
query str

SQL SELECT query

required
params Optional[Dict[str, Any]]

Optional parameters for parameterized query

None

Returns:

Type Description
DataFrame

Query results as pandas DataFrame

Source code in odibi/connections/azure_sql.py
def read_sql_query(self, query: str, params: Optional[Dict[str, Any]] = None) -> pd.DataFrame:
    """
    Execute a SQL query and return results as DataFrame.

    Thin alias for :meth:`read_sql`. Use this for custom SELECT queries
    (e.g., to exclude unsupported columns).

    Args:
        query: SQL SELECT query
        params: Optional parameters for parameterized query

    Returns:
        Query results as pandas DataFrame
    """
    return self.read_sql(query=query, params=params)

read_table(table_name, schema='dbo')

Read entire table into DataFrame.

Parameters:

Name Type Description Default
table_name str

Name of the table

required
schema Optional[str]

Schema name (default: dbo)

'dbo'

Returns:

Type Description
DataFrame

Table contents as pandas DataFrame

Source code in odibi/connections/azure_sql.py
def read_table(self, table_name: str, schema: Optional[str] = "dbo") -> pd.DataFrame:
    """
    Read entire table into DataFrame.

    Args:
        table_name: Name of the table
        schema: Schema name (default: dbo)

    Returns:
        Table contents as pandas DataFrame
    """
    ctx = get_logging_context()
    ctx.info(
        "Reading table",
        server=self.server,
        database=self.database,
        table_name=table_name,
        schema=schema,
    )

    # Bracket-quote the target; include the schema part only when one is set.
    target = f"[{schema}].[{table_name}]" if schema else f"[{table_name}]"
    return self.read_sql(f"SELECT * FROM {target}")

relationships(schema=None)

Discover foreign key relationships in the database.

Parameters:

Name Type Description Default
schema Optional[str]

Limit to specific schema (default: all schemas)

None

Returns:

Type Description
List[Dict[str, Any]]

List of Relationship dicts with parent/child table info and key columns

Example

>>> conn = AzureSQL(server="...", database="mydb")
>>> rels = conn.relationships("dbo")
>>> for rel in rels:
...     print(f"{rel['child']['name']} -> {rel['parent']['name']}")

Source code in odibi/connections/azure_sql.py
def relationships(self, schema: Optional[str] = None) -> List[Dict[str, Any]]:
    """Discover foreign key relationships in the database.

    Queries the SQL Server catalog views (``sys.foreign_keys`` et al.) and
    groups rows by constraint name, so a composite foreign key becomes one
    relationship carrying several key-column pairs.

    Args:
        schema: Limit to specific schema (default: all schemas)

    Returns:
        List of Relationship dicts with parent/child table info and key columns

    Example:
        >>> conn = AzureSQL(server="...", database="mydb")
        >>> rels = conn.relationships("dbo")
        >>> for rel in rels:
        ...     print(f"{rel['child']['name']} -> {rel['parent']['name']}")
    """
    ctx = get_logging_context()
    ctx.info("Discovering relationships", schema=schema or "all")

    # Parent = referenced table; child = the table carrying the FK columns.
    query = """
        SELECT
            fk.name AS fk_name,
            ps.name AS parent_schema,
            pt.name AS parent_table,
            pc.name AS parent_column,
            cs.name AS child_schema,
            ct.name AS child_table,
            cc.name AS child_column
        FROM sys.foreign_keys fk
        INNER JOIN sys.foreign_key_columns fkc ON fk.object_id = fkc.constraint_object_id
        INNER JOIN sys.tables pt ON fk.referenced_object_id = pt.object_id
        INNER JOIN sys.schemas ps ON pt.schema_id = ps.schema_id
        INNER JOIN sys.columns pc ON fkc.referenced_object_id = pc.object_id
            AND fkc.referenced_column_id = pc.column_id
        INNER JOIN sys.tables ct ON fk.parent_object_id = ct.object_id
        INNER JOIN sys.schemas cs ON ct.schema_id = cs.schema_id
        INNER JOIN sys.columns cc ON fkc.parent_object_id = cc.object_id
            AND fkc.parent_column_id = cc.column_id
    """

    if schema:
        # Match FKs whose child OR parent side lives in the requested schema.
        query += "\n            WHERE cs.name = :schema OR ps.name = :schema"

    # Stable ordering keeps composite-key columns in constraint order.
    query += "\n            ORDER BY fk.name, fkc.constraint_column_id"

    try:
        params = {"schema": schema} if schema else None
        df = self.read_sql(query, params=params)

        if df.empty:
            ctx.info("No foreign key relationships found", schema=schema or "all")
            return []

        # Group by FK name to build relationships
        relationships = []
        for fk_name, group in df.groupby("fk_name"):
            # Table-level fields are identical for every row of the group,
            # so take them from the first row.
            first = group.iloc[0]
            # One (parent_column, child_column) pair per key column.
            keys = [(row["parent_column"], row["child_column"]) for _, row in group.iterrows()]

            # source="declared" / confidence=1.0: this FK comes from the
            # database catalog, not from statistical inference.
            rel = Relationship(
                parent=DatasetRef(
                    name=first["parent_table"],
                    namespace=first["parent_schema"],
                    kind="table",
                    path=f"{first['parent_schema']}.{first['parent_table']}",
                ),
                child=DatasetRef(
                    name=first["child_table"],
                    namespace=first["child_schema"],
                    kind="table",
                    path=f"{first['child_schema']}.{first['child_table']}",
                ),
                keys=keys,
                source="declared",
                confidence=1.0,
                details={"constraint_name": fk_name},
            )
            relationships.append(rel.model_dump())

        ctx.info("Relationships discovered", count=len(relationships))
        return relationships

    except Exception as e:
        # Best-effort discovery: failures degrade to an empty list, not an error.
        ctx.error("Failed to discover relationships", error=str(e))
        return []

validate()

Validate Azure SQL connection configuration.

Source code in odibi/connections/azure_sql.py
def validate(self) -> None:
    """Validate Azure SQL connection configuration.

    Raises:
        ValueError: If required settings for the configured auth mode are missing.
    """
    ctx = get_logging_context()
    ctx.debug(
        "Validating AzureSQL connection",
        server=self.server,
        database=self.database,
        auth_mode=self.auth_mode,
    )

    # Server and database are required regardless of auth mode.
    if not self.server:
        ctx.error("AzureSQL validation failed: missing 'server'")
        raise ValueError(
            "Azure SQL connection requires 'server'. "
            "Provide the SQL server hostname (e.g., server: 'myserver.database.windows.net')."
        )
    if not self.database:
        ctx.error(
            "AzureSQL validation failed: missing 'database'",
            server=self.server,
        )
        raise ValueError(
            f"Azure SQL connection requires 'database' for server '{self.server}'."
        )

    # SQL auth: needs a username plus either a literal password or a
    # Key Vault reference (key_vault_name + secret_name).
    if self.auth_mode == "sql":
        if not self.username:
            ctx.error(
                "AzureSQL validation failed: SQL auth requires username",
                server=self.server,
                database=self.database,
            )
            raise ValueError(
                f"Azure SQL with auth_mode='sql' requires 'username' "
                f"for connection to {self.server}/{self.database}."
            )
        if not self.password and not (self.key_vault_name and self.secret_name):
            ctx.error(
                "AzureSQL validation failed: SQL auth requires password",
                server=self.server,
                database=self.database,
            )
            raise ValueError(
                "Azure SQL with auth_mode='sql' requires password "
                "(or key_vault_name/secret_name)"
            )

    # key_vault auth: must name both the vault and the secret, and still
    # requires a username.
    if self.auth_mode == "key_vault":
        if not self.key_vault_name or not self.secret_name:
            ctx.error(
                "AzureSQL validation failed: key_vault mode missing config",
                server=self.server,
                database=self.database,
            )
            raise ValueError(
                "Azure SQL with auth_mode='key_vault' requires key_vault_name and secret_name"
            )
        if not self.username:
            ctx.error(
                "AzureSQL validation failed: key_vault mode requires username",
                server=self.server,
                database=self.database,
            )
            raise ValueError("Azure SQL with auth_mode='key_vault' requires username")

    ctx.info(
        "AzureSQL connection validated successfully",
        server=self.server,
        database=self.database,
        auth_mode=self.auth_mode,
    )

write_table(df, table_name, schema='dbo', if_exists='replace', index=False, chunksize=1000)

Write DataFrame to SQL table.

Parameters:

Name Type Description Default
df DataFrame

DataFrame to write

required
table_name str

Name of the table

required
schema Optional[str]

Schema name (default: dbo)

'dbo'
if_exists str

How to behave if table exists ('fail', 'replace', 'append')

'replace'
index bool

Whether to write DataFrame index as column

False
chunksize Optional[int]

Number of rows to write in each batch (default: 1000)

1000

Returns:

Type Description
int

Number of rows written

Raises:

Type Description
ConnectionError

If write fails

Source code in odibi/connections/azure_sql.py
def write_table(
    self,
    df: pd.DataFrame,
    table_name: str,
    schema: Optional[str] = "dbo",
    if_exists: str = "replace",
    index: bool = False,
    chunksize: Optional[int] = 1000,
) -> int:
    """
    Write DataFrame to SQL table.

    Args:
        df: DataFrame to write
        table_name: Name of the table
        schema: Schema name (default: dbo)
        if_exists: How to behave if table exists ('fail', 'replace', 'append')
        index: Whether to write DataFrame index as column
        chunksize: Number of rows to write in each batch (default: 1000).
            Automatically capped so one multi-row INSERT stays under SQL
            Server's 2100 bound-parameter limit.

    Returns:
        Number of rows written

    Raises:
        ConnectionError: If write fails
    """
    ctx = get_logging_context()

    # With method="multi" each row contributes one bound parameter per
    # column. SQL Server rejects statements with more than 2100 parameters,
    # so cap the chunk size to keep wide DataFrames from failing mid-write.
    params_per_row = len(df.columns) + (1 if index else 0)
    if chunksize and params_per_row:
        chunksize = min(chunksize, max(1, 2100 // params_per_row))

    ctx.info(
        "Writing DataFrame to table",
        server=self.server,
        database=self.database,
        table_name=table_name,
        schema=schema,
        rows=len(df),
        if_exists=if_exists,
        chunksize=chunksize,
    )

    try:
        engine = self.get_engine()

        rows_written = df.to_sql(
            name=table_name,
            con=engine,
            schema=schema,
            if_exists=if_exists,
            index=index,
            chunksize=chunksize,
            method="multi",  # Use multi-row INSERT for better performance
        )

        # Some pandas versions return None from to_sql; fall back to len(df).
        result_rows = rows_written if rows_written is not None else len(df)
        ctx.info(
            "Table write completed successfully",
            server=self.server,
            database=self.database,
            table_name=table_name,
            rows_written=result_rows,
        )
        return result_rows
    except Exception as e:
        # Preserve ConnectionErrors raised by get_engine() as-is.
        if isinstance(e, ConnectionError):
            raise
        ctx.error(
            "Table write failed",
            server=self.server,
            database=self.database,
            table_name=table_name,
            error=str(e),
        )
        raise ConnectionError(
            connection_name=f"AzureSQL({self.server})",
            reason=f"Write operation failed: {self._sanitize_error(str(e))}",
            suggestions=self._get_error_suggestions(str(e)),
        )

odibi.connections.postgres

PostgreSQL Database Connection

Provides connectivity to PostgreSQL databases with standard authentication.

PostgreSQLConnection

Bases: BaseConnection

PostgreSQL database connection.

Supports:

- Standard authentication (username/password)
- SSL connections
- Connection pooling via SQLAlchemy
- Read/write operations
- Spark JDBC integration

Source code in odibi/connections/postgres.py
  28
  29
  30
  31
  32
  33
  34
  35
  36
  37
  38
  39
  40
  41
  42
  43
  44
  45
  46
  47
  48
  49
  50
  51
  52
  53
  54
  55
  56
  57
  58
  59
  60
  61
  62
  63
  64
  65
  66
  67
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
class PostgreSQLConnection(BaseConnection):
    """
    PostgreSQL database connection.

    Supports:
    - Standard authentication (username/password)
    - SSL connections
    - Connection pooling via SQLAlchemy
    - Read/write operations
    - Spark JDBC integration
    """

    sql_dialect = "postgres"
    default_schema = "public"

    def __init__(
        self,
        host: str,
        database: str,
        username: Optional[str] = None,
        password: Optional[str] = None,
        port: int = 5432,
        timeout: int = 30,
        sslmode: str = "prefer",
        **kwargs,
    ):
        """
        Initialize PostgreSQL connection.

        Args:
            host: PostgreSQL server hostname
            database: Database name
            username: Database username
            password: Database password
            port: PostgreSQL port (default: 5432)
            timeout: Connection timeout in seconds (default: 30)
            sslmode: SSL mode ('disable', 'allow', 'prefer', 'require',
                     'verify-ca', 'verify-full'). Default: 'prefer'
        """
        ctx = get_logging_context()
        ctx.log_connection(
            connection_type="postgres",
            connection_name=f"{host}/{database}",
            action="init",
            server=host,
            database=database,
            port=port,
        )

        # Connection coordinates.
        self.host = host
        self.port = port
        self.database = database
        # Optional credentials.
        self.username = username
        self.password = password
        # Behavior settings.
        self.timeout = timeout
        self.sslmode = sslmode
        # SQLAlchemy engine is created lazily by get_engine().
        self._engine = None

        ctx.debug(
            "PostgreSQL connection initialized",
            server=host,
            database=database,
            port=port,
            sslmode=sslmode,
        )

    def get_path(self, relative_path: str) -> str:
        """Return *relative_path* unchanged.

        In PostgreSQL the relative path already is the table reference
        (e.g. "schema.table" or just "table"), so no resolution is needed.
        """
        return relative_path

    def validate(self) -> None:
        """Validate PostgreSQL connection configuration.

        Raises:
            ValueError: If 'host' or 'database' is missing.
        """
        ctx = get_logging_context()
        ctx.debug(
            "Validating PostgreSQL connection",
            server=self.host,
            database=self.database,
        )

        # Host and database are the only hard requirements; credentials are
        # optional (username/password default to None in __init__).
        if not self.host:
            ctx.error("PostgreSQL validation failed: missing 'host'")
            raise ValueError(
                "PostgreSQL connection requires 'host'. "
                "Provide the server hostname (e.g., host: 'localhost')."
            )
        if not self.database:
            ctx.error(
                "PostgreSQL validation failed: missing 'database'",
                server=self.host,
            )
            raise ValueError(f"PostgreSQL connection requires 'database' for host '{self.host}'.")

        ctx.info(
            "PostgreSQL connection validated successfully",
            server=self.host,
            database=self.database,
        )

    def get_engine(self) -> Any:
        """
        Get or create SQLAlchemy engine.

        The engine is created lazily and cached. The cache is populated only
        after the test connection succeeds, so a failed attempt never leaves
        a broken engine behind for later calls.

        Returns:
            SQLAlchemy engine instance

        Raises:
            ConnectionError: If connection fails or drivers missing
        """
        ctx = get_logging_context()

        if self._engine is not None:
            ctx.debug(
                "Using cached SQLAlchemy engine",
                server=self.host,
                database=self.database,
            )
            return self._engine

        ctx.debug(
            "Creating SQLAlchemy engine",
            server=self.host,
            database=self.database,
        )

        try:
            from sqlalchemy import create_engine
        except ImportError as e:
            ctx.error(
                "SQLAlchemy import failed",
                server=self.host,
                database=self.database,
                error=str(e),
            )
            raise ConnectionError(
                connection_name=f"PostgreSQL({self.host})",
                reason="Required package 'sqlalchemy' not found.",
                suggestions=[
                    "Install required packages: pip install sqlalchemy psycopg2-binary",
                    "Or install odibi with postgres extras: pip install 'odibi[postgres]'",
                ],
            )

        try:
            from urllib.parse import quote_plus

            # Build PostgreSQL connection URL; credentials are URL-encoded so
            # special characters (e.g. '@', ':') don't corrupt the URL.
            if self.username and self.password:
                user_part = f"{quote_plus(self.username)}:{quote_plus(self.password)}@"
            elif self.username:
                user_part = f"{quote_plus(self.username)}@"
            else:
                user_part = ""

            connection_url = (
                f"postgresql+psycopg2://{user_part}{self.host}:{self.port}/{self.database}"
            )

            connect_args = {"connect_timeout": self.timeout}
            # 'prefer' is libpq's default, so only pass sslmode when it differs.
            if self.sslmode != "prefer":
                connect_args["sslmode"] = self.sslmode

            ctx.debug(
                "Creating SQLAlchemy engine with connection pooling",
                server=self.host,
                database=self.database,
            )

            engine = create_engine(
                connection_url,
                pool_pre_ping=True,
                pool_recycle=3600,
                echo=False,
                connect_args=connect_args,
            )

            # Verify connectivity BEFORE caching. Previously the engine was
            # assigned to self._engine first, so a failed test connection left
            # a broken engine that subsequent calls returned from the cache.
            with engine.connect():
                pass

            self._engine = engine

            ctx.info(
                "SQLAlchemy engine created successfully",
                server=self.host,
                database=self.database,
            )

            return self._engine

        except Exception as e:
            ctx.error(
                "Failed to create SQLAlchemy engine",
                server=self.host,
                database=self.database,
                error=str(e),
            )
            raise ConnectionError(
                connection_name=f"PostgreSQL({self.host})",
                reason=f"Failed to create engine: {self._sanitize_error(str(e))}",
                suggestions=self._get_error_suggestions(str(e)),
            )

    def read_sql(self, query: str, params: Optional[Dict[str, Any]] = None) -> pd.DataFrame:
        """
        Execute SQL query and return results as DataFrame.

        Args:
            query: SQL query string
            params: Optional query parameters for parameterized queries

        Returns:
            Query results as pandas DataFrame

        Raises:
            ConnectionError: If engine creation or query execution fails
        """
        ctx = get_logging_context()
        # Log only the query length, not the query text, which may embed data values.
        ctx.debug(
            "Executing SQL query",
            server=self.host,
            database=self.database,
            query_length=len(query),
        )

        try:
            engine = self.get_engine()
            # Hand pandas an open SQLAlchemy connection rather than the raw engine.
            with engine.connect() as conn:
                result = pd.read_sql(query, conn, params=params)

            ctx.info(
                "SQL query executed successfully",
                server=self.host,
                database=self.database,
                rows_returned=len(result),
            )
            return result
        except Exception as e:
            # get_engine() raises a fully-described ConnectionError; pass it
            # through instead of double-wrapping it below.
            if isinstance(e, ConnectionError):
                raise
            ctx.error(
                "SQL query execution failed",
                server=self.host,
                database=self.database,
                error=str(e),
            )
            raise ConnectionError(
                connection_name=f"PostgreSQL({self.host})",
                reason=f"Query execution failed: {self._sanitize_error(str(e))}",
                suggestions=self._get_error_suggestions(str(e)),
            )

    def read_table(self, table_name: str, schema: Optional[str] = None) -> pd.DataFrame:
        """
        Read entire table into DataFrame.

        Args:
            table_name: Name of the table
            schema: Schema name (default: public)

        Returns:
            Table contents as pandas DataFrame
        """
        # Fall back to the class-level default schema ("public").
        effective_schema = schema or self.default_schema

        ctx = get_logging_context()
        ctx.info(
            "Reading table",
            server=self.host,
            database=self.database,
            table_name=table_name,
            schema=effective_schema,
        )

        return self.read_sql(self.build_select_query(table_name, effective_schema))

    def read_sql_query(self, query: str, params: Optional[Dict[str, Any]] = None) -> pd.DataFrame:
        """Execute a SQL query and return results as DataFrame.

        Convenience alias for :meth:`read_sql`.
        """
        return self.read_sql(query=query, params=params)

    def write_table(
        self,
        df: pd.DataFrame,
        table_name: str,
        schema: Optional[str] = None,
        if_exists: str = "replace",
        index: bool = False,
        chunksize: Optional[int] = 1000,
    ) -> int:
        """
        Write DataFrame to SQL table.

        Args:
            df: DataFrame to write
            table_name: Name of the table
            schema: Schema name (default: public)
            if_exists: How to behave if table exists ('fail', 'replace', 'append')
            index: Whether to write DataFrame index as column
            chunksize: Number of rows to write in each batch

        Returns:
            Number of rows written

        Raises:
            ConnectionError: If the write fails
        """
        # Fall back to the class-level default schema ("public").
        schema = schema or self.default_schema
        ctx = get_logging_context()
        ctx.info(
            "Writing DataFrame to table",
            server=self.host,
            database=self.database,
            table_name=table_name,
            schema=schema,
            rows=len(df),
            if_exists=if_exists,
        )

        try:
            engine = self.get_engine()

            # method="multi" batches rows into multi-row INSERT statements.
            rows_written = df.to_sql(
                name=table_name,
                con=engine,
                schema=schema,
                if_exists=if_exists,
                index=index,
                chunksize=chunksize,
                method="multi",
            )

            # Some pandas versions return None from to_sql; fall back to len(df).
            result_rows = rows_written if rows_written is not None else len(df)
            ctx.info(
                "Table write completed successfully",
                server=self.host,
                database=self.database,
                table_name=table_name,
                rows_written=result_rows,
            )
            return result_rows
        except Exception as e:
            # Preserve ConnectionErrors raised by get_engine() as-is.
            if isinstance(e, ConnectionError):
                raise
            ctx.error(
                "Table write failed",
                server=self.host,
                database=self.database,
                table_name=table_name,
                error=str(e),
            )
            raise ConnectionError(
                connection_name=f"PostgreSQL({self.host})",
                reason=f"Write operation failed: {self._sanitize_error(str(e))}",
                suggestions=self._get_error_suggestions(str(e)),
            )

    def execute_sql(self, sql: str, params: Optional[Dict[str, Any]] = None) -> Any:
        """Execute SQL statement. Alias for execute()."""
        return self.execute(sql=sql, params=params)

    def execute(self, sql: str, params: Optional[Dict[str, Any]] = None) -> Any:
        """
        Execute SQL statement (INSERT, UPDATE, DELETE, etc.).

        Args:
            sql: SQL statement
            params: Optional parameters for parameterized query

        Returns:
            Fetched rows when the statement returns rows, otherwise ``None``

        Raises:
            ConnectionError: If statement execution fails
        """
        ctx = get_logging_context()
        ctx.debug(
            "Executing SQL statement",
            server=self.host,
            database=self.database,
            statement_length=len(sql),
        )

        try:
            engine = self.get_engine()
            from sqlalchemy import text

            # engine.begin() runs the statement in a transaction that commits
            # on success and rolls back if an exception escapes the block.
            with engine.begin() as conn:
                result = conn.execute(text(sql), params or {})
                if result.returns_rows:
                    rows = result.fetchall()
                else:
                    rows = None

                ctx.info(
                    "SQL statement executed successfully",
                    server=self.host,
                    database=self.database,
                )
                return rows
        except Exception as e:
            # Preserve ConnectionErrors raised by get_engine() as-is.
            if isinstance(e, ConnectionError):
                raise
            ctx.error(
                "SQL statement execution failed",
                server=self.host,
                database=self.database,
                error=str(e),
            )
            raise ConnectionError(
                connection_name=f"PostgreSQL({self.host})",
                reason=f"Statement execution failed: {self._sanitize_error(str(e))}",
                suggestions=self._get_error_suggestions(str(e)),
            )

    # -------------------------------------------------------------------------
    # SQL Dialect Helpers
    # -------------------------------------------------------------------------

    def quote_identifier(self, name: str) -> str:
        """Wrap *name* in PostgreSQL double quotes (``col`` -> ``"col"``)."""
        return '"' + name + '"'

    def qualify_table(self, table_name: str, schema: str = "") -> str:
        """Build a PostgreSQL qualified table reference.

        Uses ``self.default_schema`` when *schema* is empty; omits the
        schema part entirely when both are empty.
        """
        effective_schema = schema or self.default_schema
        if not effective_schema:
            return f'"{table_name}"'
        return f'"{effective_schema}"."{table_name}"'

    def build_select_query(
        self,
        table_name: str,
        schema: str = "",
        where: str = "",
        limit: int = -1,
        columns: str = "*",
    ) -> str:
        """Build a SELECT query using PostgreSQL syntax.

        Args:
            table_name: Table to select from.
            schema: Optional schema; resolution is handled by
                ``qualify_table``.
            where: Optional WHERE clause body (without the keyword).
            limit: Row cap; negative values mean no LIMIT clause.
            columns: Column list text (defaults to ``*``).

        Returns:
            The assembled SELECT statement.
        """
        clauses = [f"SELECT {columns} FROM {self.qualify_table(table_name, schema)}"]
        if where:
            clauses.append(f"WHERE {where}")
        if limit >= 0:
            clauses.append(f"LIMIT {limit}")
        return " ".join(clauses)

    # -------------------------------------------------------------------------
    # Discovery Methods
    # -------------------------------------------------------------------------

    def list_schemas(self) -> List[str]:
        """Return the names of all user-visible schemas, sorted alphabetically.

        System schemas (pg_catalog, information_schema, TOAST and
        temporary schemas) are excluded by the query itself.

        Returns:
            Schema names; an empty list if the query fails.
        """
        ctx = get_logging_context()
        ctx.debug("Listing schemas", server=self.host, database=self.database)

        query = """
            SELECT schema_name
            FROM information_schema.schemata
            WHERE schema_name NOT IN (
                'pg_catalog', 'information_schema', 'pg_toast', 'pg_temp_1', 'pg_toast_temp_1'
            )
            AND schema_name NOT LIKE 'pg_temp_%'
            AND schema_name NOT LIKE 'pg_toast_temp_%'
            ORDER BY schema_name
        """

        try:
            frame = self.read_sql(query)
            names = frame["schema_name"].tolist()
            ctx.info("Schemas listed successfully", count=len(names))
            return names
        except Exception as e:
            # Best-effort: discovery failures degrade to an empty listing.
            ctx.error("Failed to list schemas", error=str(e))
            return []

    def list_tables(self, schema: str = "public") -> List[Dict]:
        """Return tables and views defined in ``schema``.

        Each entry is a dict with ``name``, ``type`` ("table" or "view"),
        and ``schema`` keys. Returns an empty list if the query fails.
        """
        ctx = get_logging_context()
        ctx.debug("Listing tables", schema=schema)

        query = """
            SELECT table_name, table_type, table_schema
            FROM information_schema.tables
            WHERE table_schema = :schema
            ORDER BY table_name
        """

        try:
            frame = self.read_sql(query, params={"schema": schema})
            found = [
                {
                    "name": row["table_name"],
                    # information_schema reports 'BASE TABLE' for plain tables;
                    # everything else is surfaced as a view.
                    "type": "table" if row["table_type"] == "BASE TABLE" else "view",
                    "schema": row["table_schema"],
                }
                for _, row in frame.iterrows()
            ]
            ctx.info("Tables listed successfully", schema=schema, count=len(found))
            return found
        except Exception as e:
            ctx.error("Failed to list tables", schema=schema, error=str(e))
            return []

    def get_table_info(self, table: str) -> Dict:
        """Get detailed schema information for a table.

        Args:
            table: Table name, optionally schema-qualified as
                "schema.table"; unqualified names default to "public".

        Returns:
            A ``Schema`` model dump with the dataset reference, column
            metadata (name, dtype, nullability), and an approximate row
            count when available. Returns an empty dict on failure.
        """
        ctx = get_logging_context()

        # Split an optional "schema.table" qualifier; default to "public".
        # NOTE(review): a name with more than one dot keeps only the first
        # two components — confirm that is intended.
        if "." in table:
            parts = table.split(".")
            schema = parts[0]
            table_name = parts[1]
        else:
            schema = "public"
            table_name = table

        ctx.debug("Getting table info", schema=schema, table=table_name)

        query = """
            SELECT column_name, data_type, is_nullable, ordinal_position
            FROM information_schema.columns
            WHERE table_schema = :schema AND table_name = :table
            ORDER BY ordinal_position
        """

        try:
            df = self.read_sql(query, params={"schema": schema, "table": table_name})

            columns = []
            for _, row in df.iterrows():
                columns.append(
                    {
                        "name": row["column_name"],
                        "dtype": row["data_type"],
                        # information_schema reports nullability as 'YES'/'NO'.
                        "nullable": row["is_nullable"] == "YES",
                    }
                )

            # Try to get approximate row count
            # n_live_tup is a planner statistics estimate, not an exact COUNT(*).
            row_count = None
            try:
                count_query = """
                    SELECT n_live_tup AS row_count
                    FROM pg_stat_user_tables
                    WHERE schemaname = :schema AND relname = :table
                """
                count_df = self.read_sql(
                    count_query, params={"schema": schema, "table": table_name}
                )
                if not count_df.empty and count_df["row_count"].iloc[0] is not None:
                    row_count = int(count_df["row_count"].iloc[0])
            except Exception:
                # Stats may be missing (e.g. table never analyzed); leave None.
                pass

            dataset = DatasetRef(
                name=table_name,
                namespace=schema,
                kind="table",
                path=f"{schema}.{table_name}",
                row_count=row_count,
            )

            schema_obj = Schema(dataset=dataset, columns=[Column(**c) for c in columns])

            ctx.info("Table info retrieved", schema=schema, table=table_name, columns=len(columns))
            return schema_obj.model_dump()

        except Exception as e:
            ctx.error("Failed to get table info", schema=schema, table=table_name, error=str(e))
            return {}

    def discover_catalog(
        self,
        include_schema: bool = False,
        include_stats: bool = False,
        limit: Optional[int] = None,
        recursive: bool = True,
        path: str = "",
        pattern: str = "",
    ) -> Dict:
        """Discover all datasets (tables and views) in the database.

        Args:
            include_schema: Fetch per-table column metadata via
                ``get_table_info``.
            include_stats: Fetch row-count estimates (also via
                ``get_table_info``).
            limit: Maximum number of datasets to collect (None = unbounded).
            recursive: Accepted for interface compatibility; unused here
                (PostgreSQL schemas are flat).
            path: Restrict discovery to a single schema name.
            pattern: fnmatch-style filter on table names (e.g. "fact_*").

        Returns:
            A ``CatalogSummary`` model dump.
        """
        ctx = get_logging_context()
        ctx.info(
            "Discovering catalog",
            include_schema=include_schema,
            include_stats=include_stats,
            limit=limit,
        )

        all_schemas = self.list_schemas()
        if path:
            schemas = [s for s in all_schemas if s == path]
            if not schemas:
                ctx.warning(f"Schema not found: {path}")
                return CatalogSummary(
                    connection_name=f"{self.host}/{self.database}",
                    connection_type="postgres",
                    schemas=[],
                    tables=[],
                    total_datasets=0,
                    next_step=f"Schema '{path}' not found. Available: {', '.join(all_schemas)}",
                ).model_dump()
        else:
            schemas = all_schemas

        all_tables = []
        import fnmatch

        has_pattern = bool(pattern)

        for schema in schemas:
            # Stop before querying further schemas once the limit is hit;
            # checking only per-table would still issue one list_tables()
            # round-trip for every remaining schema.
            if limit and len(all_tables) >= limit:
                break
            tables = self.list_tables(schema)

            for table_info in tables:
                if has_pattern and not fnmatch.fnmatch(table_info["name"], pattern):
                    continue

                if limit and len(all_tables) >= limit:
                    break
                dataset = DatasetRef(
                    name=table_info["name"],
                    namespace=table_info["schema"],
                    kind="table" if table_info["type"] == "table" else "view",
                    path=f"{table_info['schema']}.{table_info['name']}",
                )

                if include_schema or include_stats:
                    # Extended info is best-effort; fall back to the bare ref.
                    try:
                        full_info = self.get_table_info(
                            f"{table_info['schema']}.{table_info['name']}"
                        )
                        if full_info and "dataset" in full_info:
                            dataset = DatasetRef(**full_info["dataset"])
                    except Exception as e:
                        ctx.debug(
                            "Could not get extended info for table",
                            table=table_info["name"],
                            error=str(e),
                        )

                all_tables.append(dataset)

        catalog = CatalogSummary(
            connection_name=f"{self.host}/{self.database}",
            connection_type="postgres",
            schemas=schemas,
            tables=all_tables,
            total_datasets=len(all_tables),
            next_step="Use profile() to analyze specific tables or get_table_info() for schema details",
            suggestions=[
                f"Found {len(all_tables)} tables across {len(schemas)} schemas",
                "Use include_schema=True to get column details",
                "Use include_stats=True for row counts",
            ],
        )

        ctx.info("Catalog discovery complete", total_datasets=len(all_tables), schemas=len(schemas))
        return catalog.model_dump()

    def profile(
        self,
        dataset: str,
        sample_rows: int = 1000,
        columns: Optional[List[str]] = None,
    ) -> Dict:
        """Profile a table with statistical analysis.

        Samples up to ``sample_rows`` rows and computes per-column null
        counts, cardinality buckets, and sample values, plus overall
        completeness and candidate key/watermark columns.

        Args:
            dataset: Table name, optionally "schema.table" (defaults to
                the "public" schema).
            sample_rows: Maximum rows fetched for profiling.
            columns: Optional subset of columns; all columns when None.

        Returns:
            A ``TableProfile`` model dump, or an empty dict on failure.
        """
        ctx = get_logging_context()

        # Split an optional "schema.table" qualifier; default to "public".
        if "." in dataset:
            parts = dataset.split(".")
            schema = parts[0]
            table_name = parts[1]
        else:
            schema = "public"
            table_name = dataset

        ctx.info("Profiling table", schema=schema, table=table_name, sample_rows=sample_rows)

        col_filter = ", ".join(f'"{c}"' for c in columns) if columns else "*"
        query = self.build_select_query(table_name, schema, columns=col_filter, limit=sample_rows)

        try:
            df = self.read_sql(query)

            # Get approximate total row count
            # n_live_tup is a planner statistics estimate, not an exact count.
            total_rows = None
            try:
                count_query = """
                    SELECT n_live_tup AS row_count
                    FROM pg_stat_user_tables
                    WHERE schemaname = :schema AND relname = :table
                """
                count_df = self.read_sql(
                    count_query, params={"schema": schema, "table": table_name}
                )
                if not count_df.empty and count_df["row_count"].iloc[0] is not None:
                    total_rows = int(count_df["row_count"].iloc[0])
            except Exception:
                # Fall back to the sampled size when stats are unavailable.
                total_rows = len(df)

            profiled_columns = []
            candidate_keys = []
            candidate_watermarks = []

            for col in df.columns:
                null_count = int(df[col].isnull().sum())
                null_pct = null_count / len(df) if len(df) > 0 else 0
                distinct_count = int(df[col].nunique())

                # Bucket cardinality; fully-distinct columns become key candidates.
                # NOTE(review): on an empty sample distinct_count == len(df) == 0,
                # so every column is flagged "unique" — confirm this is intended.
                if distinct_count == len(df):
                    cardinality = "unique"
                    candidate_keys.append(col)
                elif distinct_count > len(df) * 0.9:
                    cardinality = "high"
                elif distinct_count < 10:
                    cardinality = "low"
                else:
                    cardinality = "medium"

                # Datetime columns are candidates for incremental-load watermarks.
                if pd.api.types.is_datetime64_any_dtype(df[col]):
                    candidate_watermarks.append(col)

                sample_values = df[col].dropna().head(5).tolist()

                profiled_columns.append(
                    Column(
                        name=col,
                        dtype=str(df[col].dtype),
                        nullable=null_count > 0,
                        null_count=null_count,
                        null_pct=round(null_pct, 4),
                        cardinality=cardinality,
                        distinct_count=distinct_count,
                        sample_values=sample_values,
                    )
                )

            # Completeness = share of non-null cells across the whole sample.
            total_cells = len(df) * len(df.columns)
            null_cells = df.isnull().sum().sum()
            completeness = 1 - (null_cells / total_cells) if total_cells > 0 else 0

            dataset_ref = DatasetRef(
                name=table_name,
                namespace=schema,
                kind="table",
                path=f"{schema}.{table_name}",
                row_count=total_rows,
            )

            profile = TableProfile(
                dataset=dataset_ref,
                rows_sampled=len(df),
                total_rows=total_rows,
                columns=profiled_columns,
                candidate_keys=candidate_keys,
                candidate_watermarks=candidate_watermarks,
                completeness=round(completeness, 4),
                suggestions=[
                    (
                        f"Sampled {len(df)} of {total_rows} rows"
                        if total_rows
                        else f"Sampled {len(df)} rows"
                    ),
                    (
                        f"Found {len(candidate_keys)} candidate key columns: {candidate_keys}"
                        if candidate_keys
                        else "No unique key columns found"
                    ),
                    (
                        f"Found {len(candidate_watermarks)} timestamp columns: {candidate_watermarks}"
                        if candidate_watermarks
                        else "No timestamp columns for incremental loading"
                    ),
                ],
            )

            ctx.info(
                "Table profiling complete",
                schema=schema,
                table=table_name,
                rows_sampled=len(df),
                columns=len(profiled_columns),
                candidate_keys=len(candidate_keys),
            )

            return profile.model_dump()

        except Exception as e:
            ctx.error("Failed to profile table", schema=schema, table=table_name, error=str(e))
            return {}

    def preview(
        self, dataset: str, rows: int = 5, columns: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """Preview sample rows from a PostgreSQL table.

        Args:
            dataset: Table name, optionally "schema.table" (defaults to
                the "public" schema).
            rows: Number of rows to return (capped at 100).
            columns: Optional subset of columns; all columns when None.

        Returns:
            A ``PreviewResult`` model dump; on failure, a minimal dump
            with only the dataset reference populated.
        """
        ctx = get_logging_context()

        # Hard cap to keep previews cheap regardless of caller input.
        max_rows = min(rows, 100)

        # Split an optional "schema.table" qualifier; default to "public".
        if "." in dataset:
            parts = dataset.split(".")
            schema = parts[0]
            table_name = parts[1]
        else:
            schema = "public"
            table_name = dataset

        ctx.info("Previewing table", schema=schema, table=table_name, rows=max_rows)

        try:
            col_filter = "*"
            if columns:
                col_filter = ", ".join(f'"{c}"' for c in columns)

            query = self.build_select_query(table_name, schema, columns=col_filter, limit=max_rows)
            df = self.read_sql(query)

            # Get approximate row count
            # n_live_tup is a statistics estimate, not an exact COUNT(*).
            total_rows = None
            try:
                count_query = """
                    SELECT n_live_tup AS row_count
                    FROM pg_stat_user_tables
                    WHERE schemaname = :schema AND relname = :table
                """
                count_df = self.read_sql(
                    count_query, params={"schema": schema, "table": table_name}
                )
                if not count_df.empty and count_df["row_count"].iloc[0] is not None:
                    total_rows = int(count_df["row_count"].iloc[0])
            except Exception:
                # Stats may be unavailable; total_rows stays None.
                pass

            result = PreviewResult(
                dataset=DatasetRef(
                    name=table_name,
                    namespace=schema,
                    kind="table",
                    path=f"{schema}.{table_name}",
                    row_count=total_rows,
                ),
                columns=df.columns.tolist(),
                rows=df.to_dict(orient="records"),
                total_rows=total_rows,
                # NOTE(review): truncation is judged against the estimated
                # total, which may be stale — confirm that is acceptable.
                truncated=(total_rows or 0) > max_rows,
            )

            ctx.info("Preview complete", schema=schema, table=table_name, rows_returned=len(df))
            return result.model_dump()

        except Exception as e:
            ctx.error("Failed to preview table", schema=schema, table=table_name, error=str(e))
            return PreviewResult(
                dataset=DatasetRef(name=table_name, namespace=schema, kind="table"),
            ).model_dump()

    def relationships(self, schema: Optional[str] = None) -> List[Dict[str, Any]]:
        """Discover foreign key relationships in the database.

        Args:
            schema: Optional schema name; when given, only foreign keys
                touching that schema (as parent or child) are returned.

        Returns:
            A list of ``Relationship`` model dumps — one per constraint,
            with composite keys grouped — or an empty list on failure.
        """
        ctx = get_logging_context()
        ctx.info("Discovering relationships", schema=schema or "all")

        # ccu = the referenced (parent) side, kcu = the referencing (child)
        # side of each FOREIGN KEY constraint.
        query = """
            SELECT
                tc.constraint_name AS fk_name,
                ccu.table_schema AS parent_schema,
                ccu.table_name AS parent_table,
                ccu.column_name AS parent_column,
                kcu.table_schema AS child_schema,
                kcu.table_name AS child_table,
                kcu.column_name AS child_column
            FROM information_schema.table_constraints tc
            JOIN information_schema.key_column_usage kcu
                ON tc.constraint_name = kcu.constraint_name
                AND tc.table_schema = kcu.table_schema
            JOIN information_schema.constraint_column_usage ccu
                ON tc.constraint_name = ccu.constraint_name
                AND tc.table_schema = ccu.table_schema
            WHERE tc.constraint_type = 'FOREIGN KEY'
        """

        if schema:
            query += "\n            AND (kcu.table_schema = :schema OR ccu.table_schema = :schema)"

        query += "\n            ORDER BY tc.constraint_name, kcu.ordinal_position"

        try:
            params = {"schema": schema} if schema else None
            df = self.read_sql(query, params=params)

            if df.empty:
                ctx.info("No foreign key relationships found", schema=schema or "all")
                return []

            relationships = []
            # Group rows by constraint so a composite FK becomes a single
            # relationship with multiple (parent_column, child_column) pairs.
            for fk_name, group in df.groupby("fk_name"):
                first = group.iloc[0]
                keys = [(row["parent_column"], row["child_column"]) for _, row in group.iterrows()]

                rel = Relationship(
                    parent=DatasetRef(
                        name=first["parent_table"],
                        namespace=first["parent_schema"],
                        kind="table",
                        path=f"{first['parent_schema']}.{first['parent_table']}",
                    ),
                    child=DatasetRef(
                        name=first["child_table"],
                        namespace=first["child_schema"],
                        kind="table",
                        path=f"{first['child_schema']}.{first['child_table']}",
                    ),
                    keys=keys,
                    # Declared in the database schema, so confidence is 1.0.
                    source="declared",
                    confidence=1.0,
                    details={"constraint_name": fk_name},
                )
                relationships.append(rel.model_dump())

            ctx.info("Relationships discovered", count=len(relationships))
            return relationships

        except Exception as e:
            ctx.error("Failed to discover relationships", error=str(e))
            return []

    def get_freshness(
        self,
        dataset: str,
        timestamp_column: Optional[str] = None,
    ) -> Dict:
        """Get data freshness information.

        Tries the data itself first (MAX of ``timestamp_column`` when
        given), then falls back to table statistics metadata (last
        analyze/autoanalyze time).

        Args:
            dataset: Table name, optionally "schema.table" (defaults to
                the "public" schema).
            timestamp_column: Optional column holding row timestamps.

        Returns:
            A ``FreshnessResult`` model dump, or an empty dict when no
            freshness signal could be obtained.
        """
        ctx = get_logging_context()

        # Split an optional "schema.table" qualifier; default to "public".
        if "." in dataset:
            parts = dataset.split(".")
            schema = parts[0]
            table_name = parts[1]
        else:
            schema = "public"
            table_name = dataset

        ctx.debug("Getting freshness", schema=schema, table=table_name, column=timestamp_column)

        dataset_ref = DatasetRef(
            name=table_name,
            namespace=schema,
            kind="table",
            path=f"{schema}.{table_name}",
        )

        if timestamp_column:
            try:
                query = f'SELECT MAX("{timestamp_column}") AS max_ts FROM "{schema}"."{table_name}"'
                df = self.read_sql(query)

                if not df.empty and df["max_ts"].iloc[0] is not None:
                    last_updated = pd.to_datetime(df["max_ts"].iloc[0])
                    # NOTE(review): utcnow() is naive (and deprecated in 3.12+);
                    # this assumes the timestamp column is UTC — confirm.
                    age_hours = (datetime.utcnow() - last_updated).total_seconds() / 3600

                    result = FreshnessResult(
                        dataset=dataset_ref,
                        last_updated=last_updated,
                        source="data",
                        age_hours=round(age_hours, 2),
                        details={"timestamp_column": timestamp_column},
                    )

                    ctx.info(
                        "Freshness retrieved from data",
                        schema=schema,
                        table=table_name,
                        age_hours=age_hours,
                    )
                    return result.model_dump()
            except Exception as e:
                # Non-fatal: fall through to the metadata-based estimate.
                ctx.debug("Could not get freshness from data column", error=str(e))

        # Fallback to table metadata (last analyze/autoanalyze time)
        try:
            # COALESCE to the epoch so GREATEST works when one value is NULL;
            # the year check below rejects the epoch sentinel itself.
            query = """
                SELECT GREATEST(
                    COALESCE(last_analyze, '1970-01-01'),
                    COALESCE(last_autoanalyze, '1970-01-01')
                ) AS modify_date
                FROM pg_stat_user_tables
                WHERE schemaname = :schema AND relname = :table
            """
            df = self.read_sql(query, params={"schema": schema, "table": table_name})

            if not df.empty and df["modify_date"].iloc[0] is not None:
                last_updated = pd.to_datetime(df["modify_date"].iloc[0])
                if last_updated.year > 1970:
                    age_hours = (datetime.utcnow() - last_updated).total_seconds() / 3600

                    result = FreshnessResult(
                        dataset=dataset_ref,
                        last_updated=last_updated,
                        source="metadata",
                        age_hours=round(age_hours, 2),
                        details={"note": "Last analyze time from pg_stat_user_tables"},
                    )

                    ctx.info(
                        "Freshness retrieved from metadata",
                        schema=schema,
                        table=table_name,
                        age_hours=age_hours,
                    )
                    return result.model_dump()
        except Exception as e:
            ctx.error("Failed to get freshness", schema=schema, table=table_name, error=str(e))

        return {}

    def get_spark_options(self) -> Dict[str, str]:
        """Get Spark JDBC options for PostgreSQL.

        Returns:
            Dictionary of Spark JDBC options (url, driver, and — when
            configured — user and password).

        Note:
            Spark requires the PostgreSQL JDBC driver jar on the classpath.
            Add it via: --packages org.postgresql:postgresql:42.7.3
        """
        ctx = get_logging_context()
        ctx.info(
            "Building Spark JDBC options",
            server=self.host,
            database=self.database,
        )

        url = f"jdbc:postgresql://{self.host}:{self.port}/{self.database}"
        # Only non-default SSL modes need to be spelled out on the URL.
        if self.sslmode not in ("prefer", "disable"):
            url = f"{url}?sslmode={self.sslmode}"

        options = {"url": url, "driver": "org.postgresql.Driver"}
        if self.username:
            options["user"] = self.username
        if self.password:
            options["password"] = self.password

        ctx.info(
            "Spark JDBC options built successfully",
            server=self.host,
            database=self.database,
        )

        return options

    def close(self):
        """Dispose of the SQLAlchemy engine, if one was ever created."""
        ctx = get_logging_context()
        ctx.debug(
            "Closing PostgreSQL connection",
            server=self.host,
            database=self.database,
        )

        # Nothing to do if no engine was created (or it was already closed).
        if not self._engine:
            return

        self._engine.dispose()
        self._engine = None
        ctx.info(
            "PostgreSQL connection closed",
            server=self.host,
            database=self.database,
        )

    def _sanitize_error(self, error_msg: str) -> str:
        """Remove credentials from error messages."""
        import re

        sanitized = re.sub(r"password=[^&\s]*", "password=***", error_msg, flags=re.IGNORECASE)
        sanitized = re.sub(r"user=[^&\s]*", "user=***", sanitized, flags=re.IGNORECASE)
        sanitized = re.sub(r"://[^@]+@", "://***:***@", sanitized)
        return sanitized

    def _get_error_suggestions(self, error_msg: str) -> List[str]:
        """Generate suggestions for common PostgreSQL errors."""
        suggestions = []
        lower = error_msg.lower()

        if "could not connect" in lower or "connection refused" in lower:
            suggestions.extend(
                [
                    f"Check that PostgreSQL is running on {self.host}:{self.port}",
                    "Verify pg_hba.conf allows connections from this host",
                    "Check firewall rules",
                ]
            )
        elif "authentication failed" in lower or "password" in lower:
            suggestions.extend(
                [
                    "Verify username and password are correct",
                    "Check pg_hba.conf authentication method",
                ]
            )
        elif "does not exist" in lower:
            suggestions.extend(
                [
                    f"Verify database '{self.database}' exists",
                    "Run: SELECT datname FROM pg_database;",
                ]
            )
        elif "psycopg2" in lower:
            suggestions.extend(
                [
                    "Install the PostgreSQL driver: pip install psycopg2-binary",
                    "Or install odibi with postgres extras: pip install 'odibi[postgres]'",
                ]
            )

        return suggestions

__init__(host, database, username=None, password=None, port=5432, timeout=30, sslmode='prefer', **kwargs)

Initialize PostgreSQL connection.

Parameters:

Name Type Description Default
host str

PostgreSQL server hostname

required
database str

Database name

required
username Optional[str]

Database username

None
password Optional[str]

Database password

None
port int

PostgreSQL port (default: 5432)

5432
timeout int

Connection timeout in seconds (default: 30)

30
sslmode str

SSL mode ('disable', 'allow', 'prefer', 'require', 'verify-ca', 'verify-full'). Default: 'prefer'

'prefer'
Source code in odibi/connections/postgres.py
def __init__(
    self,
    host: str,
    database: str,
    username: Optional[str] = None,
    password: Optional[str] = None,
    port: int = 5432,
    timeout: int = 30,
    sslmode: str = "prefer",
    **kwargs,
):
    """
    Initialize PostgreSQL connection.

    Args:
        host: PostgreSQL server hostname
        database: Database name
        username: Database username
        password: Database password
        port: PostgreSQL port (default: 5432)
        timeout: Connection timeout in seconds (default: 30)
        sslmode: SSL mode ('disable', 'allow', 'prefer', 'require',
                 'verify-ca', 'verify-full'). Default: 'prefer'
    """
    ctx = get_logging_context()
    ctx.log_connection(
        connection_type="postgres",
        connection_name=f"{host}/{database}",
        action="init",
        server=host,
        database=database,
        port=port,
    )

    self.host = host
    self.database = database
    self.username = username
    self.password = password
    self.port = port
    self.timeout = timeout
    self.sslmode = sslmode
    self._engine = None

    ctx.debug(
        "PostgreSQL connection initialized",
        server=host,
        database=database,
        port=port,
        sslmode=sslmode,
    )

build_select_query(table_name, schema='', where='', limit=-1, columns='*')

Build a SELECT query using PostgreSQL syntax.

Source code in odibi/connections/postgres.py
def build_select_query(
    self,
    table_name: str,
    schema: str = "",
    where: str = "",
    limit: int = -1,
    columns: str = "*",
) -> str:
    """Build a SELECT query using PostgreSQL syntax."""
    qualified = self.qualify_table(table_name, schema)
    query = f"SELECT {columns} FROM {qualified}"
    if where:
        query += f" WHERE {where}"
    if limit >= 0:
        query += f" LIMIT {limit}"
    return query

close()

Close database connection and dispose of engine.

Source code in odibi/connections/postgres.py
def close(self):
    """Close database connection and dispose of engine."""
    ctx = get_logging_context()
    ctx.debug(
        "Closing PostgreSQL connection",
        server=self.host,
        database=self.database,
    )

    if self._engine:
        self._engine.dispose()
        self._engine = None
        ctx.info(
            "PostgreSQL connection closed",
            server=self.host,
            database=self.database,
        )

discover_catalog(include_schema=False, include_stats=False, limit=None, recursive=True, path='', pattern='')

Discover all datasets in the database.

Source code in odibi/connections/postgres.py
def discover_catalog(
    self,
    include_schema: bool = False,
    include_stats: bool = False,
    limit: Optional[int] = None,
    recursive: bool = True,
    path: str = "",
    pattern: str = "",
) -> Dict:
    """Discover all datasets in the database."""
    ctx = get_logging_context()
    ctx.info(
        "Discovering catalog",
        include_schema=include_schema,
        include_stats=include_stats,
        limit=limit,
    )

    all_schemas = self.list_schemas()
    if path:
        schemas = [s for s in all_schemas if s == path]
        if not schemas:
            ctx.warning(f"Schema not found: {path}")
            return CatalogSummary(
                connection_name=f"{self.host}/{self.database}",
                connection_type="postgres",
                schemas=[],
                tables=[],
                total_datasets=0,
                next_step=f"Schema '{path}' not found. Available: {', '.join(all_schemas)}",
            ).model_dump()
    else:
        schemas = all_schemas

    all_tables = []
    import fnmatch

    has_pattern = bool(pattern)

    for schema in schemas:
        tables = self.list_tables(schema)

        for table_info in tables:
            if has_pattern and not fnmatch.fnmatch(table_info["name"], pattern):
                continue

            if limit and len(all_tables) >= limit:
                break
            dataset = DatasetRef(
                name=table_info["name"],
                namespace=table_info["schema"],
                kind="table" if table_info["type"] == "table" else "view",
                path=f"{table_info['schema']}.{table_info['name']}",
            )

            if include_schema or include_stats:
                try:
                    full_info = self.get_table_info(
                        f"{table_info['schema']}.{table_info['name']}"
                    )
                    if full_info and "dataset" in full_info:
                        dataset = DatasetRef(**full_info["dataset"])
                except Exception as e:
                    ctx.debug(
                        "Could not get extended info for table",
                        table=table_info["name"],
                        error=str(e),
                    )

            all_tables.append(dataset)

    catalog = CatalogSummary(
        connection_name=f"{self.host}/{self.database}",
        connection_type="postgres",
        schemas=schemas,
        tables=all_tables,
        total_datasets=len(all_tables),
        next_step="Use profile() to analyze specific tables or get_table_info() for schema details",
        suggestions=[
            f"Found {len(all_tables)} tables across {len(schemas)} schemas",
            "Use include_schema=True to get column details",
            "Use include_stats=True for row counts",
        ],
    )

    ctx.info("Catalog discovery complete", total_datasets=len(all_tables), schemas=len(schemas))
    return catalog.model_dump()

execute(sql, params=None)

Execute SQL statement (INSERT, UPDATE, DELETE, etc.).

Parameters:

Name Type Description Default
sql str

SQL statement

required
params Optional[Dict[str, Any]]

Optional parameters for parameterized query

None

Returns:

Type Description
Any

Result from execution

Source code in odibi/connections/postgres.py
def execute(self, sql: str, params: Optional[Dict[str, Any]] = None) -> Any:
    """
    Execute SQL statement (INSERT, UPDATE, DELETE, etc.).

    Runs inside a transaction that commits on success and rolls back
    on error.

    Args:
        sql: SQL statement
        params: Optional parameters for parameterized query (bound as
            SQLAlchemy named parameters, e.g. ``:name``)

    Returns:
        List of result rows if the statement returns rows (e.g. a
        RETURNING clause), otherwise None

    Raises:
        ConnectionError: If the statement fails to execute
    """
    ctx = get_logging_context()
    ctx.debug(
        "Executing SQL statement",
        server=self.host,
        database=self.database,
        statement_length=len(sql),
    )

    try:
        engine = self.get_engine()
        from sqlalchemy import text

        # engine.begin() opens a transaction, commits on clean exit,
        # rolls back if the body raises.
        with engine.begin() as conn:
            result = conn.execute(text(sql), params or {})
            rows = result.fetchall() if result.returns_rows else None

            ctx.info(
                "SQL statement executed successfully",
                server=self.host,
                database=self.database,
            )
            return rows
    except Exception as e:
        # Propagate our own ConnectionError unchanged (e.g. from get_engine()).
        if isinstance(e, ConnectionError):
            raise
        ctx.error(
            "SQL statement execution failed",
            server=self.host,
            database=self.database,
            error=str(e),
        )
        # Chain the original exception so the root cause stays visible.
        raise ConnectionError(
            connection_name=f"PostgreSQL({self.host})",
            reason=f"Statement execution failed: {self._sanitize_error(str(e))}",
            suggestions=self._get_error_suggestions(str(e)),
        ) from e

execute_sql(sql, params=None)

Execute SQL statement. Alias for execute().

Source code in odibi/connections/postgres.py
def execute_sql(self, sql: str, params: Optional[Dict[str, Any]] = None) -> Any:
    """Execute SQL statement. Alias for execute().

    Compatibility shim for callers expecting an ``execute_sql`` method;
    delegates directly to :meth:`execute` with identical semantics.
    """
    return self.execute(sql, params)

get_engine()

Get or create SQLAlchemy engine.

Returns:

Type Description
Any

SQLAlchemy engine instance

Raises:

Type Description
ConnectionError

If connection fails or drivers missing

Source code in odibi/connections/postgres.py
def get_engine(self) -> Any:
    """
    Get or create SQLAlchemy engine.

    The engine is created lazily on first use, verified with a test
    connection, and only then cached on the instance for reuse.

    Returns:
        SQLAlchemy engine instance

    Raises:
        ConnectionError: If connection fails or drivers missing
    """
    ctx = get_logging_context()

    if self._engine is not None:
        ctx.debug(
            "Using cached SQLAlchemy engine",
            server=self.host,
            database=self.database,
        )
        return self._engine

    ctx.debug(
        "Creating SQLAlchemy engine",
        server=self.host,
        database=self.database,
    )

    try:
        from sqlalchemy import create_engine
    except ImportError as e:
        ctx.error(
            "SQLAlchemy import failed",
            server=self.host,
            database=self.database,
            error=str(e),
        )
        # Chain the ImportError so the missing-package cause is preserved.
        raise ConnectionError(
            connection_name=f"PostgreSQL({self.host})",
            reason="Required package 'sqlalchemy' not found.",
            suggestions=[
                "Install required packages: pip install sqlalchemy psycopg2-binary",
                "Or install odibi with postgres extras: pip install 'odibi[postgres]'",
            ],
        ) from e

    try:
        from urllib.parse import quote_plus

        # Build PostgreSQL connection URL; credentials are URL-encoded so
        # special characters (e.g. '@', ':') don't break the URL.
        if self.username and self.password:
            user_part = f"{quote_plus(self.username)}:{quote_plus(self.password)}@"
        elif self.username:
            user_part = f"{quote_plus(self.username)}@"
        else:
            user_part = ""

        connection_url = (
            f"postgresql+psycopg2://{user_part}{self.host}:{self.port}/{self.database}"
        )

        connect_args = {}
        # Only pass sslmode explicitly when it differs from the default.
        if self.sslmode != "prefer":
            connect_args["sslmode"] = self.sslmode
        connect_args["connect_timeout"] = self.timeout

        ctx.debug(
            "Creating SQLAlchemy engine with connection pooling",
            server=self.host,
            database=self.database,
        )

        engine = create_engine(
            connection_url,
            pool_pre_ping=True,  # validate pooled connections before use
            pool_recycle=3600,  # recycle connections older than 1 hour
            echo=False,
            connect_args=connect_args,
        )

        # Test the connection BEFORE caching. The previous version
        # assigned self._engine first, so a failed test connection left
        # a broken engine cached and later calls returned it without
        # revalidating.
        with engine.connect():
            pass
        self._engine = engine

        ctx.info(
            "SQLAlchemy engine created successfully",
            server=self.host,
            database=self.database,
        )

        return self._engine

    except Exception as e:
        ctx.error(
            "Failed to create SQLAlchemy engine",
            server=self.host,
            database=self.database,
            error=str(e),
        )
        # Chain the original exception so the root cause stays visible.
        raise ConnectionError(
            connection_name=f"PostgreSQL({self.host})",
            reason=f"Failed to create engine: {self._sanitize_error(str(e))}",
            suggestions=self._get_error_suggestions(str(e)),
        ) from e

get_freshness(dataset, timestamp_column=None)

Get data freshness information.

Source code in odibi/connections/postgres.py
def get_freshness(
    self,
    dataset: str,
    timestamp_column: Optional[str] = None,
) -> Dict:
    """Get data freshness information.

    Tries two sources in order: MAX(timestamp_column) from the data
    itself (when a column is provided), then the last analyze/autoanalyze
    time from pg_stat_user_tables as a metadata fallback.

    Args:
        dataset: Table name, optionally schema-qualified ("schema.table");
            bare names default to the "public" schema.
        timestamp_column: Optional timestamp column used to derive
            freshness from the data itself.

    Returns:
        FreshnessResult as a dict (source "data" or "metadata"), or an
        empty dict if freshness could not be determined.
    """
    ctx = get_logging_context()

    # Split "schema.table"; bare names go to the "public" schema.
    if "." in dataset:
        parts = dataset.split(".")
        schema = parts[0]
        table_name = parts[1]
    else:
        schema = "public"
        table_name = dataset

    ctx.debug("Getting freshness", schema=schema, table=table_name, column=timestamp_column)

    dataset_ref = DatasetRef(
        name=table_name,
        namespace=schema,
        kind="table",
        path=f"{schema}.{table_name}",
    )

    # Preferred source: freshness derived from the data itself.
    if timestamp_column:
        try:
            query = f'SELECT MAX("{timestamp_column}") AS max_ts FROM "{schema}"."{table_name}"'
            df = self.read_sql(query)

            if not df.empty and df["max_ts"].iloc[0] is not None:
                last_updated = pd.to_datetime(df["max_ts"].iloc[0])
                # NOTE(review): datetime.utcnow() is naive (and deprecated
                # since Python 3.12); this subtraction assumes last_updated
                # is also naive UTC — an aware timestamp would raise here.
                # Confirm the column's timezone handling upstream.
                age_hours = (datetime.utcnow() - last_updated).total_seconds() / 3600

                result = FreshnessResult(
                    dataset=dataset_ref,
                    last_updated=last_updated,
                    source="data",
                    age_hours=round(age_hours, 2),
                    details={"timestamp_column": timestamp_column},
                )

                ctx.info(
                    "Freshness retrieved from data",
                    schema=schema,
                    table=table_name,
                    age_hours=age_hours,
                )
                return result.model_dump()
        except Exception as e:
            # Best effort: fall through to the metadata fallback below.
            ctx.debug("Could not get freshness from data column", error=str(e))

    # Fallback to table metadata (last analyze/autoanalyze time)
    try:
        query = """
            SELECT GREATEST(
                COALESCE(last_analyze, '1970-01-01'),
                COALESCE(last_autoanalyze, '1970-01-01')
            ) AS modify_date
            FROM pg_stat_user_tables
            WHERE schemaname = :schema AND relname = :table
        """
        df = self.read_sql(query, params={"schema": schema, "table": table_name})

        if not df.empty and df["modify_date"].iloc[0] is not None:
            last_updated = pd.to_datetime(df["modify_date"].iloc[0])
            # The 1970 epoch sentinel from COALESCE means the table was
            # never analyzed — treat that as "unknown", not fresh data.
            if last_updated.year > 1970:
                age_hours = (datetime.utcnow() - last_updated).total_seconds() / 3600

                result = FreshnessResult(
                    dataset=dataset_ref,
                    last_updated=last_updated,
                    source="metadata",
                    age_hours=round(age_hours, 2),
                    details={"note": "Last analyze time from pg_stat_user_tables"},
                )

                ctx.info(
                    "Freshness retrieved from metadata",
                    schema=schema,
                    table=table_name,
                    age_hours=age_hours,
                )
                return result.model_dump()
    except Exception as e:
        ctx.error("Failed to get freshness", schema=schema, table=table_name, error=str(e))

    # Neither source yielded a usable timestamp.
    return {}

get_path(relative_path)

Get table reference for relative path.

In PostgreSQL, the relative path is the table reference itself (e.g., "schema.table" or "table"), so this method returns it as-is.

Source code in odibi/connections/postgres.py
def get_path(self, relative_path: str) -> str:
    """Return the full path for *relative_path*.

    PostgreSQL addresses datasets by table reference (e.g.
    "schema.table" or just "table"), so the relative path already IS
    the full path and is returned unchanged.
    """
    return relative_path

get_spark_options()

Get Spark JDBC options for PostgreSQL.

Returns:

Type Description
Dict[str, str]

Dictionary of Spark JDBC options (url, driver, user, password)

Note

Spark requires the PostgreSQL JDBC driver jar on the classpath. Add it via: --packages org.postgresql:postgresql:42.7.3

Source code in odibi/connections/postgres.py
def get_spark_options(self) -> Dict[str, str]:
    """Get Spark JDBC options for PostgreSQL.

    Returns:
        Dictionary of Spark JDBC options (url, driver, user, password)

    Note:
        Spark requires the PostgreSQL JDBC driver jar on the classpath.
        Add it via: --packages org.postgresql:postgresql:42.7.3
    """
    ctx = get_logging_context()
    ctx.info(
        "Building Spark JDBC options",
        server=self.host,
        database=self.database,
    )

    jdbc_url = f"jdbc:postgresql://{self.host}:{self.port}/{self.database}"
    # Only explicit, non-default SSL modes are encoded into the URL.
    if self.sslmode in ("prefer", "disable"):
        pass
    else:
        jdbc_url = f"{jdbc_url}?sslmode={self.sslmode}"

    options = {"url": jdbc_url, "driver": "org.postgresql.Driver"}

    # Credentials are optional; include them only when configured.
    if self.username:
        options["user"] = self.username
    if self.password:
        options["password"] = self.password

    ctx.info(
        "Spark JDBC options built successfully",
        server=self.host,
        database=self.database,
    )

    return options

get_table_info(table)

Get detailed schema information for a table.

Source code in odibi/connections/postgres.py
def get_table_info(self, table: str) -> Dict:
    """Get detailed schema information for a table.

    Args:
        table: Table name, optionally schema-qualified ("schema.table");
            bare names are looked up in the "public" schema.

    Returns:
        Schema dict with column details and, when available, an
        approximate row count; an empty dict on failure.
    """
    ctx = get_logging_context()

    # Resolve "schema.table" vs bare table name.
    parts = table.split(".")
    if len(parts) > 1:
        schema, table_name = parts[0], parts[1]
    else:
        schema, table_name = "public", table

    ctx.debug("Getting table info", schema=schema, table=table_name)

    query = """
        SELECT column_name, data_type, is_nullable, ordinal_position
        FROM information_schema.columns
        WHERE table_schema = :schema AND table_name = :table
        ORDER BY ordinal_position
    """

    try:
        col_df = self.read_sql(query, params={"schema": schema, "table": table_name})

        columns = [
            {
                "name": record["column_name"],
                "dtype": record["data_type"],
                "nullable": record["is_nullable"] == "YES",
            }
            for _, record in col_df.iterrows()
        ]

        # Best-effort approximate row count from table statistics.
        row_count = None
        try:
            count_query = """
                SELECT n_live_tup AS row_count
                FROM pg_stat_user_tables
                WHERE schemaname = :schema AND relname = :table
            """
            stats_df = self.read_sql(
                count_query, params={"schema": schema, "table": table_name}
            )
            if not stats_df.empty and stats_df["row_count"].iloc[0] is not None:
                row_count = int(stats_df["row_count"].iloc[0])
        except Exception:
            pass

        schema_obj = Schema(
            dataset=DatasetRef(
                name=table_name,
                namespace=schema,
                kind="table",
                path=f"{schema}.{table_name}",
                row_count=row_count,
            ),
            columns=[Column(**c) for c in columns],
        )

        ctx.info("Table info retrieved", schema=schema, table=table_name, columns=len(columns))
        return schema_obj.model_dump()

    except Exception as e:
        ctx.error("Failed to get table info", schema=schema, table=table_name, error=str(e))
        return {}

list_schemas()

List all schemas in the database.

Source code in odibi/connections/postgres.py
def list_schemas(self) -> List[str]:
    """List all non-system schemas in the database, sorted by name.

    Returns an empty list when the query fails.
    """
    ctx = get_logging_context()
    ctx.debug("Listing schemas", server=self.host, database=self.database)

    # Filter out PostgreSQL system and temp schemas.
    query = """
        SELECT schema_name
        FROM information_schema.schemata
        WHERE schema_name NOT IN (
            'pg_catalog', 'information_schema', 'pg_toast', 'pg_temp_1', 'pg_toast_temp_1'
        )
        AND schema_name NOT LIKE 'pg_temp_%'
        AND schema_name NOT LIKE 'pg_toast_temp_%'
        ORDER BY schema_name
    """

    try:
        schema_names = self.read_sql(query)["schema_name"].tolist()
        ctx.info("Schemas listed successfully", count=len(schema_names))
        return schema_names
    except Exception as e:
        ctx.error("Failed to list schemas", error=str(e))
        return []

list_tables(schema='public')

List tables and views in a schema.

Source code in odibi/connections/postgres.py
def list_tables(self, schema: str = "public") -> List[Dict]:
    """List tables and views in a schema.

    Returns a list of dicts with keys "name", "type" ("table" or
    "view"), and "schema"; an empty list on failure.
    """
    ctx = get_logging_context()
    ctx.debug("Listing tables", schema=schema)

    query = """
        SELECT table_name, table_type, table_schema
        FROM information_schema.tables
        WHERE table_schema = :schema
        ORDER BY table_name
    """

    try:
        df = self.read_sql(query, params={"schema": schema})
        # Anything other than BASE TABLE (e.g. VIEW) is reported as "view".
        tables = [
            {
                "name": entry["table_name"],
                "type": "table" if entry["table_type"] == "BASE TABLE" else "view",
                "schema": entry["table_schema"],
            }
            for _, entry in df.iterrows()
        ]
        ctx.info("Tables listed successfully", schema=schema, count=len(tables))
        return tables
    except Exception as e:
        ctx.error("Failed to list tables", schema=schema, error=str(e))
        return []

preview(dataset, rows=5, columns=None)

Preview sample rows from a PostgreSQL table.

Source code in odibi/connections/postgres.py
def preview(
    self, dataset: str, rows: int = 5, columns: Optional[List[str]] = None
) -> Dict[str, Any]:
    """Preview sample rows from a PostgreSQL table.

    Args:
        dataset: Table name, optionally schema-qualified ("schema.table");
            bare names default to the "public" schema.
        rows: Number of rows to fetch (capped at 100).
        columns: Optional subset of columns to select.

    Returns:
        PreviewResult as a dict; on failure, a minimal PreviewResult
        containing only the dataset reference.
    """
    ctx = get_logging_context()

    # Hard cap keeps previews cheap regardless of caller input.
    max_rows = min(rows, 100)

    # Resolve "schema.table" vs bare table name.
    if "." in dataset:
        parts = dataset.split(".")
        schema = parts[0]
        table_name = parts[1]
    else:
        schema = "public"
        table_name = dataset

    ctx.info("Previewing table", schema=schema, table=table_name, rows=max_rows)

    try:
        col_filter = "*"
        if columns:
            # Double-quote each column name for PostgreSQL identifiers.
            col_filter = ", ".join(f'"{c}"' for c in columns)

        query = self.build_select_query(table_name, schema, columns=col_filter, limit=max_rows)
        df = self.read_sql(query)

        # Get approximate row count (table statistics; best effort).
        total_rows = None
        try:
            count_query = """
                SELECT n_live_tup AS row_count
                FROM pg_stat_user_tables
                WHERE schemaname = :schema AND relname = :table
            """
            count_df = self.read_sql(
                count_query, params={"schema": schema, "table": table_name}
            )
            if not count_df.empty and count_df["row_count"].iloc[0] is not None:
                total_rows = int(count_df["row_count"].iloc[0])
        except Exception:
            pass

        result = PreviewResult(
            dataset=DatasetRef(
                name=table_name,
                namespace=schema,
                kind="table",
                path=f"{schema}.{table_name}",
                row_count=total_rows,
            ),
            columns=df.columns.tolist(),
            rows=df.to_dict(orient="records"),
            total_rows=total_rows,
            # total_rows is approximate (or None, treated as 0), so
            # "truncated" is a best-effort flag.
            truncated=(total_rows or 0) > max_rows,
        )

        ctx.info("Preview complete", schema=schema, table=table_name, rows_returned=len(df))
        return result.model_dump()

    except Exception as e:
        ctx.error("Failed to preview table", schema=schema, table=table_name, error=str(e))
        # Degrade gracefully: return an empty preview with the dataset ref.
        return PreviewResult(
            dataset=DatasetRef(name=table_name, namespace=schema, kind="table"),
        ).model_dump()

profile(dataset, sample_rows=1000, columns=None)

Profile a table with statistical analysis.

Source code in odibi/connections/postgres.py
def profile(
    self,
    dataset: str,
    sample_rows: int = 1000,
    columns: Optional[List[str]] = None,
) -> Dict:
    """Profile a table with statistical analysis.

    Samples up to ``sample_rows`` rows, then computes per-column null
    counts, cardinality buckets, and sample values, plus overall
    completeness and candidate key/watermark columns.

    Args:
        dataset: Table name, optionally schema-qualified ("schema.table");
            bare names default to the "public" schema.
        sample_rows: Maximum number of rows to sample.
        columns: Optional subset of columns to profile.

    Returns:
        TableProfile as a dict, or an empty dict on failure.
    """
    ctx = get_logging_context()

    # Resolve "schema.table" vs bare table name.
    if "." in dataset:
        parts = dataset.split(".")
        schema = parts[0]
        table_name = parts[1]
    else:
        schema = "public"
        table_name = dataset

    ctx.info("Profiling table", schema=schema, table=table_name, sample_rows=sample_rows)

    col_filter = ", ".join(f'"{c}"' for c in columns) if columns else "*"
    query = self.build_select_query(table_name, schema, columns=col_filter, limit=sample_rows)

    try:
        df = self.read_sql(query)

        # Get approximate total row count (table statistics; best effort).
        total_rows = None
        try:
            count_query = """
                SELECT n_live_tup AS row_count
                FROM pg_stat_user_tables
                WHERE schemaname = :schema AND relname = :table
            """
            count_df = self.read_sql(
                count_query, params={"schema": schema, "table": table_name}
            )
            if not count_df.empty and count_df["row_count"].iloc[0] is not None:
                total_rows = int(count_df["row_count"].iloc[0])
        except Exception:
            # Fall back to the sampled size when stats are unavailable.
            total_rows = len(df)

        profiled_columns = []
        candidate_keys = []
        candidate_watermarks = []

        for col in df.columns:
            null_count = int(df[col].isnull().sum())
            null_pct = null_count / len(df) if len(df) > 0 else 0
            distinct_count = int(df[col].nunique())

            # Cardinality buckets are relative to the SAMPLE, not the
            # full table; fully-distinct columns become candidate keys.
            if distinct_count == len(df):
                cardinality = "unique"
                candidate_keys.append(col)
            elif distinct_count > len(df) * 0.9:
                cardinality = "high"
            elif distinct_count < 10:
                cardinality = "low"
            else:
                cardinality = "medium"

            # Datetime columns are candidate watermarks for incremental loads.
            if pd.api.types.is_datetime64_any_dtype(df[col]):
                candidate_watermarks.append(col)

            sample_values = df[col].dropna().head(5).tolist()

            profiled_columns.append(
                Column(
                    name=col,
                    dtype=str(df[col].dtype),
                    nullable=null_count > 0,
                    null_count=null_count,
                    null_pct=round(null_pct, 4),
                    cardinality=cardinality,
                    distinct_count=distinct_count,
                    sample_values=sample_values,
                )
            )

        # Completeness = share of non-null cells across the sample.
        total_cells = len(df) * len(df.columns)
        null_cells = df.isnull().sum().sum()
        completeness = 1 - (null_cells / total_cells) if total_cells > 0 else 0

        dataset_ref = DatasetRef(
            name=table_name,
            namespace=schema,
            kind="table",
            path=f"{schema}.{table_name}",
            row_count=total_rows,
        )

        profile = TableProfile(
            dataset=dataset_ref,
            rows_sampled=len(df),
            total_rows=total_rows,
            columns=profiled_columns,
            candidate_keys=candidate_keys,
            candidate_watermarks=candidate_watermarks,
            completeness=round(completeness, 4),
            suggestions=[
                (
                    f"Sampled {len(df)} of {total_rows} rows"
                    if total_rows
                    else f"Sampled {len(df)} rows"
                ),
                (
                    f"Found {len(candidate_keys)} candidate key columns: {candidate_keys}"
                    if candidate_keys
                    else "No unique key columns found"
                ),
                (
                    f"Found {len(candidate_watermarks)} timestamp columns: {candidate_watermarks}"
                    if candidate_watermarks
                    else "No timestamp columns for incremental loading"
                ),
            ],
        )

        ctx.info(
            "Table profiling complete",
            schema=schema,
            table=table_name,
            rows_sampled=len(df),
            columns=len(profiled_columns),
            candidate_keys=len(candidate_keys),
        )

        return profile.model_dump()

    except Exception as e:
        ctx.error("Failed to profile table", schema=schema, table=table_name, error=str(e))
        return {}

qualify_table(table_name, schema='')

Build a PostgreSQL qualified table reference.

Source code in odibi/connections/postgres.py
def qualify_table(self, table_name: str, schema: str = "") -> str:
    """Build a PostgreSQL qualified table reference.

    Falls back to the connection's default schema when *schema* is
    empty; when neither is set, only the quoted table name is returned.
    """
    effective_schema = schema or self.default_schema
    if not effective_schema:
        return f'"{table_name}"'
    return f'"{effective_schema}"."{table_name}"'

quote_identifier(name)

Quote an identifier using PostgreSQL double-quote notation.

Source code in odibi/connections/postgres.py
def quote_identifier(self, name: str) -> str:
    """Wrap *name* in PostgreSQL double-quote identifier notation."""
    return '"{}"'.format(name)

read_sql(query, params=None)

Execute SQL query and return results as DataFrame.

Parameters:

Name Type Description Default
query str

SQL query string

required
params Optional[Dict[str, Any]]

Optional query parameters for parameterized queries

None

Returns:

Type Description
DataFrame

Query results as pandas DataFrame

Source code in odibi/connections/postgres.py
def read_sql(self, query: str, params: Optional[Dict[str, Any]] = None) -> pd.DataFrame:
    """
    Execute SQL query and return results as DataFrame.

    Args:
        query: SQL query string
        params: Optional query parameters for parameterized queries
            (bound as named parameters, e.g. ``:name``)

    Returns:
        Query results as pandas DataFrame

    Raises:
        ConnectionError: If the query fails to execute
    """
    ctx = get_logging_context()
    ctx.debug(
        "Executing SQL query",
        server=self.host,
        database=self.database,
        query_length=len(query),
    )

    try:
        engine = self.get_engine()
        with engine.connect() as conn:
            result = pd.read_sql(query, conn, params=params)

        ctx.info(
            "SQL query executed successfully",
            server=self.host,
            database=self.database,
            rows_returned=len(result),
        )
        return result
    except Exception as e:
        # Propagate our own ConnectionError unchanged (e.g. from get_engine()).
        if isinstance(e, ConnectionError):
            raise
        ctx.error(
            "SQL query execution failed",
            server=self.host,
            database=self.database,
            error=str(e),
        )
        # Chain the original exception so the root cause stays visible.
        raise ConnectionError(
            connection_name=f"PostgreSQL({self.host})",
            reason=f"Query execution failed: {self._sanitize_error(str(e))}",
            suggestions=self._get_error_suggestions(str(e)),
        ) from e

read_sql_query(query, params=None)

Execute a SQL query and return results as DataFrame.

Source code in odibi/connections/postgres.py
def read_sql_query(self, query: str, params: Optional[Dict[str, Any]] = None) -> pd.DataFrame:
    """Execute a SQL query and return results as DataFrame.

    Compatibility alias that delegates directly to :meth:`read_sql`.
    """
    return self.read_sql(query, params)

read_table(table_name, schema=None)

Read entire table into DataFrame.

Parameters:

Name Type Description Default
table_name str

Name of the table

required
schema Optional[str]

Schema name (default: public)

None

Returns:

Type Description
DataFrame

Table contents as pandas DataFrame

Source code in odibi/connections/postgres.py
def read_table(self, table_name: str, schema: Optional[str] = None) -> pd.DataFrame:
    """
    Read entire table into DataFrame.

    Args:
        table_name: Name of the table
        schema: Schema name (default: public)

    Returns:
        Table contents as pandas DataFrame
    """
    # Fall back to the connection's default schema when none is given.
    resolved_schema = schema or self.default_schema
    ctx = get_logging_context()
    ctx.info(
        "Reading table",
        server=self.host,
        database=self.database,
        table_name=table_name,
        schema=resolved_schema,
    )

    return self.read_sql(self.build_select_query(table_name, resolved_schema))

relationships(schema=None)

Discover foreign key relationships in the database.

Source code in odibi/connections/postgres.py
def relationships(self, schema: Optional[str] = None) -> List[Dict[str, Any]]:
    """Discover foreign key relationships in the database.

    Args:
        schema: Optional schema to scope the search; when omitted, all
            schemas are scanned.

    Returns:
        One Relationship dict per foreign-key constraint (composite
        keys grouped into a single entry); empty list when none exist
        or discovery fails.
    """
    ctx = get_logging_context()
    ctx.info("Discovering relationships", schema=schema or "all")

    query = """
        SELECT
            tc.constraint_name AS fk_name,
            ccu.table_schema AS parent_schema,
            ccu.table_name AS parent_table,
            ccu.column_name AS parent_column,
            kcu.table_schema AS child_schema,
            kcu.table_name AS child_table,
            kcu.column_name AS child_column
        FROM information_schema.table_constraints tc
        JOIN information_schema.key_column_usage kcu
            ON tc.constraint_name = kcu.constraint_name
            AND tc.table_schema = kcu.table_schema
        JOIN information_schema.constraint_column_usage ccu
            ON tc.constraint_name = ccu.constraint_name
            AND tc.table_schema = ccu.table_schema
        WHERE tc.constraint_type = 'FOREIGN KEY'
    """

    # Optionally scope to one schema on either side of the FK.
    if schema:
        query += "\n            AND (kcu.table_schema = :schema OR ccu.table_schema = :schema)"

    query += "\n            ORDER BY tc.constraint_name, kcu.ordinal_position"

    try:
        df = self.read_sql(query, params={"schema": schema} if schema else None)

        if df.empty:
            ctx.info("No foreign key relationships found", schema=schema or "all")
            return []

        discovered = []
        # Group rows by constraint so composite keys collapse into one
        # relationship with multiple column pairs.
        for constraint_name, rows in df.groupby("fk_name"):
            head = rows.iloc[0]
            column_pairs = [
                (r["parent_column"], r["child_column"]) for _, r in rows.iterrows()
            ]

            discovered.append(
                Relationship(
                    parent=DatasetRef(
                        name=head["parent_table"],
                        namespace=head["parent_schema"],
                        kind="table",
                        path=f"{head['parent_schema']}.{head['parent_table']}",
                    ),
                    child=DatasetRef(
                        name=head["child_table"],
                        namespace=head["child_schema"],
                        kind="table",
                        path=f"{head['child_schema']}.{head['child_table']}",
                    ),
                    keys=column_pairs,
                    source="declared",
                    confidence=1.0,
                    details={"constraint_name": constraint_name},
                ).model_dump()
            )

        ctx.info("Relationships discovered", count=len(discovered))
        return discovered

    except Exception as e:
        ctx.error("Failed to discover relationships", error=str(e))
        return []

validate()

Validate PostgreSQL connection configuration.

Source code in odibi/connections/postgres.py
def validate(self) -> None:
    """Validate PostgreSQL connection configuration.

    Raises:
        ValueError: If ``host`` or ``database`` is not configured.
    """
    ctx = get_logging_context()
    ctx.debug(
        "Validating PostgreSQL connection",
        server=self.host,
        database=self.database,
    )

    # Guard clauses: each required field raises immediately when missing.
    if not self.host:
        ctx.error("PostgreSQL validation failed: missing 'host'")
        raise ValueError(
            "PostgreSQL connection requires 'host'. "
            "Provide the server hostname (e.g., host: 'localhost')."
        )

    if not self.database:
        ctx.error(
            "PostgreSQL validation failed: missing 'database'",
            server=self.host,
        )
        raise ValueError(f"PostgreSQL connection requires 'database' for host '{self.host}'.")

    ctx.info(
        "PostgreSQL connection validated successfully",
        server=self.host,
        database=self.database,
    )

write_table(df, table_name, schema=None, if_exists='replace', index=False, chunksize=1000)

Write DataFrame to SQL table.

Parameters:

Name Type Description Default
df DataFrame

DataFrame to write

required
table_name str

Name of the table

required
schema Optional[str]

Schema name (default: public)

None
if_exists str

How to behave if table exists ('fail', 'replace', 'append')

'replace'
index bool

Whether to write DataFrame index as column

False
chunksize Optional[int]

Number of rows to write in each batch

1000

Returns:

Type Description
int

Number of rows written

Source code in odibi/connections/postgres.py
def write_table(
    self,
    df: pd.DataFrame,
    table_name: str,
    schema: Optional[str] = None,
    if_exists: str = "replace",
    index: bool = False,
    chunksize: Optional[int] = 1000,
) -> int:
    """
    Write DataFrame to SQL table.

    Args:
        df: DataFrame to write
        table_name: Name of the table
        schema: Schema name (default: public)
        if_exists: How to behave if table exists ('fail', 'replace', 'append')
        index: Whether to write DataFrame index as column
        chunksize: Number of rows to write in each batch

    Returns:
        Number of rows written

    Raises:
        ConnectionError: If the write fails
    """
    schema = schema or self.default_schema
    ctx = get_logging_context()
    ctx.info(
        "Writing DataFrame to table",
        server=self.host,
        database=self.database,
        table_name=table_name,
        schema=schema,
        rows=len(df),
        if_exists=if_exists,
    )

    try:
        engine = self.get_engine()

        # method="multi" batches many rows into each INSERT statement.
        rows_written = df.to_sql(
            name=table_name,
            con=engine,
            schema=schema,
            if_exists=if_exists,
            index=index,
            chunksize=chunksize,
            method="multi",
        )

        # to_sql may return None; fall back to the input length.
        result_rows = rows_written if rows_written is not None else len(df)
        ctx.info(
            "Table write completed successfully",
            server=self.host,
            database=self.database,
            table_name=table_name,
            rows_written=result_rows,
        )
        return result_rows
    except Exception as e:
        # Propagate our own ConnectionError unchanged (e.g. from get_engine()).
        if isinstance(e, ConnectionError):
            raise
        ctx.error(
            "Table write failed",
            server=self.host,
            database=self.database,
            table_name=table_name,
            error=str(e),
        )
        # Chain the original exception so the root cause stays visible.
        raise ConnectionError(
            connection_name=f"PostgreSQL({self.host})",
            reason=f"Write operation failed: {self._sanitize_error(str(e))}",
            suggestions=self._get_error_suggestions(str(e)),
        ) from e