# Connections

Unified connection system for accessing local filesystems, cloud storage, databases, and HTTP endpoints with pluggable authentication.
## Overview

Odibi's connection system provides:

- **Multiple backends**: Local filesystem, Azure ADLS, Azure SQL, PostgreSQL, HTTP APIs
- **Flexible authentication**: Service principals, managed identity, Key Vault, connection strings
- **Environment variables**: Secure secret injection via `${VAR}` syntax
- **Plugin architecture**: Register custom connection types via a factory pattern
> **Note:** This page documents the runtime connection factory API (`odibi.connections.factory`), which uses type names like `azure_adls` and `azure_sql` and flat `auth_mode` fields. For YAML config-level connections validated by Pydantic (`odibi.config`), use `type: azure_blob`, `type: sql_server`, and nested `auth:` blocks. See the Azure Setup Guide and Configuration Reference for Pydantic-valid YAML examples.
## Built-in Connection Types

| Type | Description |
|---|---|
| `local` | Local filesystem or URI-based paths |
| `azure_adls` | Azure Data Lake Storage Gen2 |
| `azure_sql` | Azure SQL Database |
| `postgres` | PostgreSQL database |
| `http` | HTTP/REST API endpoints |
| `delta` | Delta Lake tables (path-based or catalog) |
## Configuration

### Basic Structure

```yaml
connections:
  bronze:
    type: local
    base_path: ./data/bronze
  silver:
    type: azure_adls
    account_name: mystorageaccount
    container: data
    path_prefix: silver
    auth:
      key_vault_name: my-keyvault
      secret_name: storage-key
```
### Connection Config Options

| Field | Type | Required | Description |
|---|---|---|---|
| `type` | string | Yes | Connection type (see table above) |
| `auth` | object | No | Authentication configuration |
| `auth_mode` | string | No | Authentication mode (auto-detected if omitted) |
| `validation_mode` | string | No | `eager` or `lazy` validation (default: `lazy`) |
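For example, eager validation can be requested per connection; a minimal sketch using the fields from the table above:

```yaml
connections:
  bronze:
    type: local
    base_path: ./data/bronze
    validation_mode: eager  # validate at config load instead of first use
```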
## Local Connection

Simple filesystem connection for local development or mounted volumes.

```yaml
connections:
  raw_data:
    type: local
    base_path: ./data/raw
  mounted_volume:
    type: local
    base_path: /mnt/storage/data
```
### URI-Based Paths

Paths may also use URI schemes such as `file://` or `dbfs:/`.
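A minimal sketch of URI-based base paths (the specific `file://` and `dbfs:/` locations here are assumptions for illustration):

```yaml
connections:
  uri_data:
    type: local
    base_path: file:///tmp/odibi-data
  dbfs_data:
    type: local
    base_path: dbfs:/mnt/data
```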
### Config Options

| Field | Type | Default | Description |
|---|---|---|---|
| `base_path` | string | `./data` | Base directory for all paths |
## Azure Data Lake Storage (ADLS) Connection

Azure Data Lake Storage Gen2 with multi-mode authentication.

```yaml
connections:
  datalake:
    type: azure_adls
    account_name: mystorageaccount
    container: datalake
    path_prefix: bronze
    auth_mode: key_vault
    auth:
      key_vault_name: my-keyvault
      secret_name: storage-account-key
```
### Config Options

| Field | Type | Required | Description |
|---|---|---|---|
| `account_name` | string | Yes | Storage account name |
| `container` | string | Yes | Container/filesystem name |
| `path_prefix` | string | No | Optional prefix for all paths |
| `auth_mode` | string | No | Authentication mode (auto-detected) |
### Authentication Modes

#### Key Vault (Recommended)

Retrieves the storage account key from Azure Key Vault:

```yaml
connections:
  secure_storage:
    type: azure_adls
    account_name: mystorageaccount
    container: data
    auth_mode: key_vault
    auth:
      key_vault_name: my-keyvault
      secret_name: storage-account-key
```
#### Service Principal

OAuth authentication with an Azure AD service principal:

```yaml
connections:
  sp_storage:
    type: azure_adls
    account_name: mystorageaccount
    container: data
    auth_mode: service_principal
    auth:
      tenant_id: ${AZURE_TENANT_ID}
      client_id: ${AZURE_CLIENT_ID}
      client_secret: ${AZURE_CLIENT_SECRET}
```
#### Managed Identity

Use Azure Managed Identity (recommended for Azure-hosted workloads):

```yaml
connections:
  msi_storage:
    type: azure_adls
    account_name: mystorageaccount
    container: data
    auth_mode: managed_identity
```
#### SAS Token

Shared Access Signature for time-limited access:

```yaml
connections:
  sas_storage:
    type: azure_adls
    account_name: mystorageaccount
    container: data
    auth_mode: sas_token
    auth:
      sas_token: ${STORAGE_SAS_TOKEN}
```
#### Direct Key (Development Only)

⚠️ Not recommended for production.

```yaml
connections:
  dev_storage:
    type: azure_adls
    account_name: mystorageaccount
    container: data
    auth_mode: direct_key
    auth:
      account_key: ${STORAGE_ACCOUNT_KEY}
```
### Path Resolution

ADLS connections generate `abfss://` URIs:

```python
conn.get_path("folder/file.parquet")
# Returns: abfss://data@mystorageaccount.dfs.core.windows.net/bronze/folder/file.parquet
```
## Azure SQL Connection

Azure SQL Database with SQL auth, Managed Identity, or Key Vault.

```yaml
connections:
  warehouse:
    type: azure_sql
    host: myserver.database.windows.net
    database: analytics
    auth_mode: aad_msi
```
### Config Options

| Field | Type | Default | Description |
|---|---|---|---|
| `host` / `server` | string | Required | SQL Server hostname |
| `database` | string | Required | Database name |
| `driver` | string | `ODBC Driver 18 for SQL Server` | ODBC driver |
| `port` | int | `1433` | SQL Server port |
| `timeout` | int | `30` | Connection timeout (seconds) |
| `auth_mode` | string | Auto-detected | Auto-detected from provided fields; explicit options: `sql`, `aad_msi`, `key_vault` |
### Authentication Modes

#### SQL Authentication

```yaml
connections:
  sql_auth:
    type: azure_sql
    host: myserver.database.windows.net
    database: mydb
    auth_mode: sql
    auth:
      username: ${SQL_USERNAME}
      password: ${SQL_PASSWORD}
```
#### Managed Identity

```yaml
connections:
  msi_sql:
    type: azure_sql
    host: myserver.database.windows.net
    database: mydb
    auth_mode: aad_msi
```
#### Key Vault

```yaml
connections:
  keyvault_sql:
    type: azure_sql
    host: myserver.database.windows.net
    database: mydb
    auth_mode: key_vault
    auth:
      username: sqladmin
      key_vault_name: my-keyvault
      secret_name: sql-password
```
### Usage

```python
from odibi.connections.azure_sql import AzureSQL

conn = AzureSQL(
    server="myserver.database.windows.net",
    database="analytics",
    auth_mode="aad_msi",
)

# Read data
df = conn.read_sql("SELECT * FROM customers WHERE region = 'US'")

# Read entire table
df = conn.read_table("orders", schema="dbo")

# Write data
conn.write_table(df, "processed_orders", if_exists="replace")

# Execute statements
conn.execute("DELETE FROM staging WHERE processed = 1")
```
## PostgreSQL Connection

PostgreSQL database with standard username/password authentication and optional SSL.

```yaml
connections:
  pg_warehouse:
    type: postgres
    host: localhost
    database: analytics
    port: 5432
    auth:
      username: ${PG_USERNAME}
      password: ${PG_PASSWORD}
```
### Config Options

| Field | Type | Default | Description |
|---|---|---|---|
| `host` / `server` | string | Required | PostgreSQL hostname |
| `database` | string | Required | Database name |
| `port` | int | `5432` | PostgreSQL port |
| `timeout` | int | `30` | Connection timeout (seconds) |
| `sslmode` | string | `prefer` | SSL mode (see below) |
### SSL Modes

| Mode | Behavior |
|---|---|
| `disable` | No SSL |
| `allow` | Try non-SSL first, fall back to SSL |
| `prefer` | Try SSL first, fall back to non-SSL (default) |
| `require` | Must use SSL, skip certificate verification |
| `verify-ca` | Must use SSL + verify server CA certificate |
| `verify-full` | Must use SSL + verify CA + verify hostname matches |

For local development, `prefer` or `disable` is fine. For production or cloud-hosted PostgreSQL, use `require` or `verify-full`.
### Authentication

PostgreSQL uses standard username/password authentication. Credentials can be provided inline or via the `auth` block.

#### Auth Block (Recommended)

```yaml
connections:
  pg_prod:
    type: postgres
    host: pg-prod.example.com
    database: warehouse
    sslmode: require
    auth:
      username: ${PG_USERNAME}
      password: ${PG_PASSWORD}
```
#### Inline (Development Only)

```yaml
connections:
  pg_dev:
    type: postgres
    host: localhost
    database: devdb
    username: devuser
    password: devpass
```
### Reading from PostgreSQL

```yaml
pipelines:
  - pipeline: ingest_from_postgres
    nodes:
      - name: read_orders
        read:
          connection: pg_warehouse
          format: postgres
          path: public.orders
        write:
          connection: delta_lake
          format: delta
          path: bronze/orders
          mode: overwrite
```

The `path` uses `schema.table` notation. If no schema is specified, `public` is used by default.
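The schema-splitting rule can be sketched in a few lines. This is illustrative only; `split_table_path` is a hypothetical helper, not part of the Odibi API:

```python
def split_table_path(path: str, default_schema: str = "public") -> tuple[str, str]:
    """Split 'schema.table' into (schema, table); default the schema to 'public'."""
    if "." in path:
        schema, table = path.split(".", 1)
        return schema, table
    return default_schema, path

print(split_table_path("public.orders"))  # ('public', 'orders')
print(split_table_path("orders"))         # ('public', 'orders')
```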
### Writing to PostgreSQL

```yaml
- name: write_summary
  read:
    connection: delta_lake
    format: delta
    path: gold/order_summary
  write:
    connection: pg_warehouse
    format: postgres
    path: public.order_summary
    mode: append
```

Supported write modes: `append`, `overwrite`. The `merge` mode is not supported for PostgreSQL; it is only available for SQL Server (T-SQL `MERGE` syntax).
### Spark JDBC Integration

When using the Spark engine, Odibi automatically builds JDBC options for PostgreSQL:

```python
options = conn.get_spark_options()
# Returns: {"url": "jdbc:postgresql://host:5432/db", "driver": "org.postgresql.Driver", ...}
```
> **Note:** Spark requires the PostgreSQL JDBC driver on the classpath. Add it via `--packages org.postgresql:postgresql:42.7.3`.
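The shape of the JDBC URL in those options can be illustrated with a small sketch; `postgres_jdbc_url` is a hypothetical helper for explanation, while the real options come from `get_spark_options()`:

```python
def postgres_jdbc_url(host: str, database: str, port: int = 5432,
                      sslmode: str = "prefer") -> str:
    """Build a PostgreSQL JDBC URL of the form Spark's JDBC reader expects."""
    return f"jdbc:postgresql://{host}:{port}/{database}?sslmode={sslmode}"

print(postgres_jdbc_url("localhost", "analytics"))
# jdbc:postgresql://localhost:5432/analytics?sslmode=prefer
```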
### Installation

The PostgreSQL backend depends on `psycopg2-binary` and `sqlalchemy`; install them with pip if they are not already present.
## HTTP Connection

Connect to REST APIs with various authentication methods.
### Config Options

| Field | Type | Required | Description |
|---|---|---|---|
| `base_url` | string | Yes | Base URL for the API |
| `headers` | object | No | Default request headers |
| `auth` | object | No | Authentication configuration |
### Authentication Methods

#### Bearer Token

```yaml
connections:
  bearer_api:
    type: http
    base_url: https://api.example.com/
    auth:
      token: ${API_BEARER_TOKEN}
```
#### Basic Auth

```yaml
connections:
  basic_api:
    type: http
    base_url: https://api.example.com/
    auth:
      username: ${API_USER}
      password: ${API_PASSWORD}
```
#### API Key

```yaml
connections:
  apikey_api:
    type: http
    base_url: https://api.example.com/
    auth:
      api_key: ${API_KEY}
      header_name: X-API-Key  # Optional, defaults to X-API-Key
```
#### Custom Headers

```yaml
connections:
  custom_api:
    type: http
    base_url: https://api.example.com/
    headers:
      Content-Type: application/json
      X-Custom-Header: custom-value
    auth:
      token: ${API_TOKEN}
```
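How the auth options above translate into request headers can be sketched as follows. This is a hypothetical illustration of the mapping, not Odibi's actual HTTP client code:

```python
def build_headers(base_headers=None, token=None, api_key=None,
                  header_name="X-API-Key"):
    """Merge default headers with auth-derived headers (illustrative)."""
    headers = dict(base_headers or {})
    if token:
        # Bearer token auth becomes an Authorization header
        headers["Authorization"] = f"Bearer {token}"
    if api_key:
        # API-key auth goes into a configurable header (default X-API-Key)
        headers[header_name] = api_key
    return headers

print(build_headers({"Content-Type": "application/json"}, token="abc123"))
```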
## Delta Connection

Delta Lake tables via path or Unity Catalog.
### Path-Based Delta
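No example survived here; a minimal sketch, assuming path-based Delta takes the same `base_path` field as `local` (the factory table below maps `delta` to `LocalConnection` for path-based use):

```yaml
connections:
  delta_local:
    type: delta
    base_path: ./data/delta
```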
### Catalog-Based Delta (Spark)
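No example survived here either; a sketch under stated assumptions — the `catalog` and `schema` field names below are guesses at what `DeltaCatalogConnection` accepts, so check the Configuration Reference for the actual fields:

```yaml
connections:
  delta_catalog:
    type: delta
    catalog: main        # assumed field name
    schema: analytics    # assumed field name
```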
## Environment Variables

Use `${VAR}` syntax to inject secrets from environment variables:

```yaml
connections:
  secure:
    type: azure_adls
    account_name: ${STORAGE_ACCOUNT}
    container: data
    auth:
      client_id: ${AZURE_CLIENT_ID}
      client_secret: ${AZURE_CLIENT_SECRET}
      tenant_id: ${AZURE_TENANT_ID}
```
Environment variables are resolved at runtime, keeping secrets out of configuration files.
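The substitution semantics can be sketched in a few lines. This is illustrative; Odibi's actual resolver may differ, for example in how it handles missing variables:

```python
import os
import re

_ENV_PATTERN = re.compile(r"\$\{(\w+)\}")

def resolve_env_vars(value: str) -> str:
    """Replace ${VAR} references with environment variable values at runtime."""
    def _sub(match: re.Match) -> str:
        name = match.group(1)
        if name not in os.environ:
            raise KeyError(f"Environment variable {name!r} is not set")
        return os.environ[name]
    return _ENV_PATTERN.sub(_sub, value)

os.environ["STORAGE_ACCOUNT"] = "mystorageaccount"
print(resolve_env_vars("account_name: ${STORAGE_ACCOUNT}"))
# account_name: mystorageaccount
```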
## Connection Factory

Odibi uses a plugin system for connection types. Built-in types are registered automatically.
### Registering Custom Connections

```python
from odibi.plugins import register_connection_factory
from odibi.connections.base import BaseConnection


class MyCustomConnection(BaseConnection):
    def __init__(self, endpoint: str, api_key: str):
        self.endpoint = endpoint
        self.api_key = api_key

    def get_path(self, relative_path: str) -> str:
        return f"{self.endpoint}/{relative_path}"

    def validate(self) -> None:
        if not self.endpoint:
            raise ValueError("Endpoint is required")


def create_custom_connection(name: str, config: dict):
    return MyCustomConnection(
        endpoint=config["endpoint"],
        api_key=config.get("api_key", ""),
    )


# Register the factory
register_connection_factory("my_custom", create_custom_connection)
```
Then use in YAML:

```yaml
connections:
  custom:
    type: my_custom
    endpoint: https://custom-service.example.com
    api_key: ${CUSTOM_API_KEY}
```
### Built-in Factory Registration

Built-in connections are registered via `register_builtins()`:

| Factory Name | Connection Class |
|---|---|
| `local` | `LocalConnection` |
| `http` | `HttpConnection` |
| `azure_blob` | `AzureADLS` |
| `azure_adls` | `AzureADLS` |
| `delta` | `LocalConnection` or `DeltaCatalogConnection` |
| `sql_server` | `AzureSQL` |
| `azure_sql` | `AzureSQL` |
| `postgres` | `PostgreSQLConnection` |
| `postgresql` | `PostgreSQLConnection` |
## Complete Examples

### Multi-Environment Setup

```yaml
project: DataPipeline
engine: spark

connections:
  # Local development
  local_bronze:
    type: local
    base_path: ./data/bronze
  local_silver:
    type: local
    base_path: ./data/silver

  # Azure production
  azure_bronze:
    type: azure_adls
    account_name: ${STORAGE_ACCOUNT}
    container: datalake
    path_prefix: bronze
    auth_mode: managed_identity
  azure_silver:
    type: azure_adls
    account_name: ${STORAGE_ACCOUNT}
    container: datalake
    path_prefix: silver
    auth_mode: managed_identity

  # SQL database
  warehouse:
    type: azure_sql
    host: ${SQL_SERVER}
    database: analytics
    auth_mode: aad_msi

  # PostgreSQL database
  postgres_db:
    type: postgres
    host: ${PG_HOST}
    database: analytics
    auth:
      username: ${PG_USERNAME}
      password: ${PG_PASSWORD}

  # External API
  weather_api:
    type: http
    base_url: https://api.weather.com/v1/
    auth:
      api_key: ${WEATHER_API_KEY}

pipelines:
  - pipeline: ingest_orders
    nodes:
      - name: read_orders
        source:
          connection: azure_bronze
          path: orders/
        # ...
```
### Service Principal Authentication

```yaml
connections:
  adls_sp:
    type: azure_adls
    account_name: mystorageaccount
    container: data
    path_prefix: ingestion
    auth_mode: service_principal
    auth:
      tenant_id: ${AZURE_TENANT_ID}
      client_id: ${AZURE_CLIENT_ID}
      client_secret: ${AZURE_CLIENT_SECRET}
  sql_sp:
    type: azure_sql
    host: myserver.database.windows.net
    database: warehouse
    auth_mode: sql
    auth:
      username: ${SQL_USER}
      password: ${SQL_PASSWORD}
```
### Key Vault Integration

```yaml
connections:
  secure_storage:
    type: azure_adls
    account_name: mystorageaccount
    container: sensitive-data
    auth_mode: key_vault
    auth:
      key_vault_name: my-keyvault
      secret_name: storage-account-key
  secure_sql:
    type: azure_sql
    host: myserver.database.windows.net
    database: secure_db
    auth_mode: key_vault
    auth:
      username: sqladmin
      key_vault_name: my-keyvault
      secret_name: sql-admin-password
```
## Best Practices

- **Use Managed Identity** - Preferred for Azure-hosted workloads (no secrets to manage)
- **Use Key Vault** - Store secrets in Key Vault, not in config files
- **Environment variables** - Use `${VAR}` for any sensitive values
- **Lazy validation** - The default `validation_mode: lazy` defers validation until first use
- **Separate connections** - Use different connections for different security zones
- **Register secrets** - Secrets are automatically registered for log redaction
- **Use SSL for remote PostgreSQL** - Set `sslmode: require` or `verify-full` for non-localhost connections
## Troubleshooting

### "Connection not found" error

**Symptom:** `ConnectionError: Connection 'my_conn' not found`

**Causes:**

- Typo in the connection name (names are case-sensitive)
- Connection defined in the wrong environment block
- YAML indentation error

**Fix:** Verify that the name referenced in the pipeline node exactly matches a key under `connections:` and that the YAML indentation is correct.
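A minimal sketch of the matching rule (connection names here are placeholders):

```yaml
connections:
  my_conn:                    # name defined here...
    type: local
    base_path: ./data

pipelines:
  - pipeline: example
    nodes:
      - name: read_data
        read:
          connection: my_conn  # ...must match exactly (case-sensitive)
```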
### Azure authentication failures

**Symptom:** `AuthenticationError: DefaultAzureCredential failed`

**Causes:**

- Service principal credentials incorrect or expired
- Managed Identity not enabled on the compute
- Missing RBAC permissions on the storage account

**Fixes:**

```shell
# Check if Azure CLI is authenticated
az account show

# For Service Principal, verify credentials
az login --service-principal -u $CLIENT_ID -p $CLIENT_SECRET --tenant $TENANT_ID

# For Managed Identity, ensure it's enabled and has the Storage Blob Data Contributor role
```
"Path not found" on Azure ADLS¶
Symptom: File reads fail with path errors
Causes: - Container name missing or incorrect - Path prefix doesn't match actual structure - SAS token doesn't have read permissions
Fix: Verify the full path:
connections:
adls_data:
type: azure_adls
account_name: mystorageaccount
container: data # Container name
path_prefix: bronze # Prefix within container
The actual path read will be: abfss://data@mystorageaccount.dfs.core.windows.net/bronze/<your_path>
### Environment variable not substituted

**Symptom:** Literal `${VAR}` appears in logs or errors

**Causes:**

- Environment variable not set
- Variable name typo
- Running in the wrong shell/environment

**Fix:**

```shell
# Check if the variable is set
echo $MY_SECRET

# Use odibi secrets to validate
odibi secrets validate config.yaml
```
## Related

- Data Exploration — Preview, profile, and explore data sources
- YAML Schema Reference
- Pipeline Configuration
- Secrets Management