# Context API
The Context API provides access to DataFrames, engine-specific features, and execution state within transforms.
## Overview
Every transform function receives a `context` argument first, followed by `current` (the node's input DataFrame) and any transform parameters:
```python
from odibi.registry import transform

@transform
def my_transform(context, current, param1: str):
    # context: ExecutionContext with DataFrame access
    # current: The input DataFrame for this node
    return current
```
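To make the calling convention concrete, here is a minimal, self-contained sketch of how a runner *might* invoke a registered transform. The `REGISTRY` dict, this `transform` decorator, and `run_node` are hypothetical stand-ins written for illustration, not odibi's implementation. The only point it demonstrates is that the framework supplies `context` and `current` positionally, while the remaining parameters come from node configuration.

```python
from typing import Any, Callable, Dict

# Hypothetical registry: transform name -> function (not odibi's real registry)
REGISTRY: Dict[str, Callable[..., Any]] = {}


def transform(func: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical decorator that registers a transform under its function name."""
    REGISTRY[func.__name__] = func
    return func


def run_node(name: str, context: Any, current: Any, params: Dict[str, Any]) -> Any:
    """Hypothetical runner: framework supplies context/current, config supplies params."""
    return REGISTRY[name](context, current, **params)


@transform
def add_suffix(context, current, suffix: str):
    # `current` is a plain list of strings here, standing in for a DataFrame
    return [value + suffix for value in current]


result = run_node("add_suffix", context=None, current=["a", "b"], params={"suffix": "_x"})
print(result)  # ['a_x', 'b_x']
```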
## Context Classes
Odibi has engine-specific context implementations:
| Engine | Context Class | Key Features |
|---|---|---|
| Pandas | `PandasContext` | In-memory DataFrames |
| Polars | `PolarsContext` | Lazy/eager DataFrames |
| Spark | `SparkContext` | Distributed DataFrames, SQL |
All contexts implement the base Context interface.
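This page does not show the base `Context` class itself. As an assumption for illustration only, an interface exposing the core `get`/`set` methods could be modeled with `abc.ABC`, with a dictionary-backed class standing in for an engine-specific implementation such as `PandasContext`. `BaseContextSketch` and `InMemoryContext` are hypothetical names, and raising `KeyError` for an unknown name is a choice of this sketch, not documented behavior.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict


class BaseContextSketch(ABC):
    """Hypothetical model of a shared Context interface (not odibi's source)."""

    @abstractmethod
    def get(self, name: str) -> Any:
        """Return the DataFrame registered under `name`."""

    @abstractmethod
    def set(self, name: str, df: Any) -> None:
        """Register `df` under `name` for downstream nodes."""


class InMemoryContext(BaseContextSketch):
    """Hypothetical dict-backed implementation, standing in for an engine context."""

    def __init__(self) -> None:
        self._frames: Dict[str, Any] = {}

    def get(self, name: str) -> Any:
        # Raising KeyError on unknown names is an assumption of this sketch
        if name not in self._frames:
            raise KeyError(f"No DataFrame registered as {name!r}")
        return self._frames[name]

    def set(self, name: str, df: Any) -> None:
        self._frames[name] = df


ctx = InMemoryContext()
ctx.set("load_customers", [{"customer_id": 1, "name": "Ada"}])
print(ctx.get("load_customers"))
```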
## Core Methods
### `get(name)`
Retrieve a DataFrame by node name:
```python
from odibi.registry import transform

@transform
def join_data(context, current):
    # Get another node's output
    customers = context.get("load_customers")
    # Spark/Polars-style join on a shared column
    return current.join(customers, on="customer_id")
```
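`current.join(customers, on="customer_id")` is Spark/Polars-style syntax. With the Pandas engine, `DataFrame.join` matches `on` against the *index* of the other frame, so a column-to-column join is usually written with `merge`. The sketch below is a hedged pandas variant; `join_data_pandas`, the `_StubContext` helper, and the sample columns are illustrative, and `how="inner"` is spelled out to mirror the default inner join of the Spark and Polars `join` methods.

```python
import pandas as pd


def join_data_pandas(context, current):
    # Hypothetical pandas variant of join_data: merge on a shared column
    customers = context.get("load_customers")
    return current.merge(customers, on="customer_id", how="inner")


class _StubContext:
    """Illustrative stand-in for a context; only get() is modeled."""

    def __init__(self, frames):
        self._frames = frames

    def get(self, name):
        return self._frames[name]


orders = pd.DataFrame({"order_id": [10, 11, 12], "customer_id": [1, 2, 3]})
customers = pd.DataFrame({"customer_id": [1, 2], "name": ["Ada", "Linus"]})
joined = join_data_pandas(_StubContext({"load_customers": customers}), orders)
print(joined)
```

Customer `3` has no match, so the inner join drops order `12`.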
### `set(name, df)`
Register a DataFrame for downstream nodes:
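```python
from odibi.registry import transform

@transform
def split_data(context, current):
    # String filter expressions like these are Spark SQL syntax
    valid = current.filter("is_valid = true")
    invalid = current.filter("is_valid = false")
    # Register an additional output that downstream nodes can fetch with context.get()
    context.set("invalid_records", invalid)
    return valid  # Primary output
```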
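The string predicate `"is_valid = true"` is a Spark SQL expression. Under the Pandas engine the same split is usually done with a boolean mask. This sketch assumes `is_valid` is a boolean column and uses a hypothetical dict-backed `_StubContext` to show a downstream step reading the extra output registered with `set`; `split_data_pandas` is an illustrative name.

```python
import pandas as pd


class _StubContext:
    """Illustrative stand-in for a context: a name -> DataFrame dict."""

    def __init__(self):
        self._frames = {}

    def get(self, name):
        return self._frames[name]

    def set(self, name, df):
        self._frames[name] = df


def split_data_pandas(context, current):
    # Assumes `is_valid` holds booleans; ~mask selects the complementary rows
    mask = current["is_valid"].astype(bool)
    context.set("invalid_records", current[~mask])
    return current[mask]  # primary output


ctx = _StubContext()
df = pd.DataFrame({"id": [1, 2, 3], "is_valid": [True, False, True]})
valid = split_data_pandas(ctx, df)
invalid = ctx.get("invalid_records")  # what a downstream node would retrieve
```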
### `sql(query)`
Execute SQL against registered DataFrames (Spark only):
```python
from odibi.registry import transform

@transform
def sql_transform(context, current):
    # Register the current DataFrame as a view named input_data
    context.set("input_data", current)
    # Execute SQL against the registered DataFrames
    result = context.sql("""
        SELECT customer_id, SUM(amount) AS total
        FROM input_data
        GROUP BY customer_id
    """)
    return result
```
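Because `sql()` is Spark-only, a Pandas pipeline has to express the same aggregation directly. A hedged sketch using `groupby`, with `total_by_customer` as an illustrative name; `as_index=False` keeps `customer_id` as a regular column, the way the SQL result would have it.

```python
import pandas as pd


def total_by_customer(current: pd.DataFrame) -> pd.DataFrame:
    # Equivalent of: SELECT customer_id, SUM(amount) AS total ... GROUP BY customer_id
    return (
        current.groupby("customer_id", as_index=False)["amount"]
        .sum()
        .rename(columns={"amount": "total"})
    )


df = pd.DataFrame({"customer_id": [1, 1, 2], "amount": [10.0, 5.0, 7.5]})
out = total_by_customer(df)
print(out)
```

Unlike SQL `GROUP BY`, pandas sorts the groups by key by default.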
## Engine Context
Access engine-specific features via `context.engine_context`:
### Spark
```python
from odibi.registry import transform

@transform
def spark_specific(context, current):
    spark = context.engine_context.spark
    # Use the Spark session directly
    df = spark.read.parquet("/path/to/data")
    # Access the catalog
    tables = spark.catalog.listTables()
    return current
```
### Pandas
```python
from odibi.registry import transform

@transform
def pandas_specific(context, current):
    # current is already a pd.DataFrame
    # No special engine context needed
    return current.groupby("category").sum()
```
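`groupby("category").sum()` returns `category` as the index rather than as a column. Summing object columns also concatenates their values, and some dtypes (such as datetimes) raise. If downstream nodes expect `category` as a regular column, a variant like the one below may be safer. `pandas_specific_flat` is an illustrative name, and dropping non-numeric columns with `numeric_only=True` is an assumption about what the example intends.

```python
import pandas as pd


def pandas_specific_flat(context, current: pd.DataFrame) -> pd.DataFrame:
    # as_index=False keeps "category" as a column; numeric_only skips text columns
    return current.groupby("category", as_index=False).sum(numeric_only=True)


df = pd.DataFrame(
    {"category": ["a", "b", "a"], "amount": [1, 2, 3], "note": ["x", "y", "z"]}
)
out = pandas_specific_flat(None, df)
print(out)
```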
### Polars
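```python
import polars as pl
from odibi.registry import transform

@transform
def polars_specific(context, current):
    # current is a Polars DataFrame
    # Add a per-category total without collapsing rows
    return current.with_columns(
        pl.col("amount").sum().over("category").alias("category_total")
    )
```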
## EngineContext Class
The EngineContext provides engine metadata:
```python
from pyspark.sql import SparkSession

class EngineContext:
    engine_type: str     # "pandas", "polars", "spark"
    spark: SparkSession  # Only for Spark engine

    @property
    def is_spark(self) -> bool: ...

    @property
    def is_pandas(self) -> bool: ...

    @property
    def is_polars(self) -> bool: ...
```
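To unit-test transforms without a running engine, an approximation of this interface could be written as a dataclass. `EngineContextSketch` is a hypothetical name. That `spark` is `None` for non-Spark engines is an assumption drawn from the "Only for Spark engine" comment, not something this page confirms.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class EngineContextSketch:
    """Hypothetical stand-in for EngineContext, for tests without an engine."""

    engine_type: str  # "pandas", "polars", or "spark"
    spark: Optional[Any] = None  # assumed None unless engine_type == "spark"

    @property
    def is_spark(self) -> bool:
        return self.engine_type == "spark"

    @property
    def is_pandas(self) -> bool:
        return self.engine_type == "pandas"

    @property
    def is_polars(self) -> bool:
        return self.engine_type == "polars"


ctx = EngineContextSketch(engine_type="polars")
print(ctx.is_polars, ctx.is_spark)  # True False
```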
## Example: Engine-Agnostic Transform
Write transforms that work on all engines:
```python
from odibi.registry import transform

@transform
def engine_agnostic(context, current, threshold: float = 100):
    engine = context.engine_context
    # Engine libraries are imported inside each branch so unused engines are never loaded
    if engine.is_spark:
        from pyspark.sql import functions as F
        return current.filter(F.col("amount") > threshold)
    elif engine.is_polars:
        import polars as pl
        return current.filter(pl.col("amount") > threshold)
    else:  # pandas
        return current[current["amount"] > threshold]
```
## Available in Context
| Property | Description |
|---|---|
| `engine_context` | Engine-specific context with `spark`, `engine_type` |
| `config` | Current node configuration |
| `connections` | Connection registry |
| `state_manager` | Access to HWM (high-water mark) and run state |
## Related
- Writing Transformations: transform authoring guide