Python API Guide
The yoda package exposes Yoda's HTAP engine to Python via PyO3 bindings with a fully async API compatible with both asyncio and anyio.
Installation
From PyPI
pip install yoda
# or with uv
uv add yoda
From source
Build and install an editable (development) wheel using maturin:
git clone https://github.com/ValerioL29/Yoda.git
cd Yoda
uv run maturin develop # debug build
uv run maturin develop --release # optimised build
Prerequisites
Building from source requires Rust (stable toolchain), a C compiler, and uv. The DataFusion backend is enabled by default. The DuckDB backend requires the duckdb-backend Cargo feature and a C++ compiler.
Quick-start example
import anyio
import pyarrow as pa
import yoda
async def main():
# Open engine via the async factory (recommended)
async with await yoda.open(oltp_path="app.db") as engine:
# 1. Create the table in OLTP (SQLite)
await engine.execute(
"CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)"
)
# 2. Register the table for CDC replication to OLAP
schema = yoda.TableSchema(
"users",
[("id", "int64"), ("name", "utf8"), ("age", "int32")],
["id"],
)
await engine.register_table(schema)
# 3. Write rows (routed to OLTP / SQLite)
await engine.execute("INSERT INTO users VALUES (1, 'Alice', 30)")
await engine.execute("INSERT INTO users VALUES (2, 'Bob', 25)")
# 4. Sync CDC events to OLAP
result = await engine.sync_now()
print(result) # e.g. SyncResult(events=2, inserted=2, updated=0, deleted=0, pruned=2)
# 5. Run an analytical query (routed to OLAP / DataFusion)
batches = await engine.query("SELECT AVG(age) as avg_age FROM users")
# 6. Convert to pandas via PyArrow
table = pa.Table.from_batches(batches)
df = table.to_pandas()
print(df)
anyio.run(main)
Engine shutdown
When the engine is opened with async with await yoda.open(...) as engine, shutdown() is called automatically when the context manager exits. If you instantiate HtapEngine directly, call engine.shutdown() yourself when done.
yoda.open()
The recommended entry point for async code. Creates and returns an HtapEngine without blocking the event loop.
async def open(
oltp_path: str = "yoda.db",
*,
sync_mode: str = "destructive",
olap_backend: str = "datafusion",
storage_mode: str = "inmemory",
storage_path: str | None = None,
prune_after_sync: bool = True,
sync_batch_size: int = 1000,
read_pool_size: int = 4,
sidecar_source: str | None = None,
sidecar_tables: list[TimestampTableConfig] | None = None,
sidecar_poll_batch_size: int = 500,
sidecar_delete_detection: str = "disabled",
sidecar_enable_oltp: bool = False,
) -> HtapEngine
All parameters have the same meaning and defaults as in HtapConfig (documented below); every parameter after oltp_path is keyword-only. Returns an HtapEngine that supports async with.
# Pattern: open, use, auto-close
async with await yoda.open(oltp_path="my.db") as engine:
...
HtapConfig
Configuration value object. Construct once and pass to HtapEngine().
class HtapConfig:
def __init__(
self,
oltp_path: str = "yoda.db",
prune_after_sync: bool = True,
sync_batch_size: int = 1000,
read_pool_size: int = 4,
sync_mode: str = "destructive",
olap_backend: str = "datafusion",
storage_mode: str = "inmemory",
storage_path: str | None = None,
sidecar_source: str | None = None,
sidecar_tables: list[TimestampTableConfig] | None = None,
sidecar_poll_batch_size: int = 500,
sidecar_delete_detection: str = "disabled",
sidecar_enable_oltp: bool = False,
) -> None
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
oltp_path | str | "yoda.db" | Path to the SQLite database file used for OLTP writes. |
prune_after_sync | bool | True | Delete CDC log entries after each successful sync cycle. |
sync_batch_size | int | 1000 | Maximum CDC events processed per sync_now() call. |
read_pool_size | int | 4 | Number of concurrent OLTP read connections (round-robin pool). |
sync_mode | str | "destructive" | "destructive" — mirror semantics (UPDATE overwrites, DELETE removes). "temporal" — SCD Type 2 append-only history. |
olap_backend | str | "datafusion" | "datafusion" (default, always available) or "duckdb" (requires duckdb-backend feature). |
storage_mode | str | "inmemory" | DataFusion storage: "inmemory", "arrow_ipc", or "parquet". "arrow_ipc" and "parquet" require storage_path. |
storage_path | str \| None | None | Directory for file-backed DataFusion storage (Arrow IPC or Parquet). Also used as the DuckDB file path when olap_backend="duckdb". |
sidecar_source | str \| None | None | Connection string / path for the external DB to follow in sidecar mode. Setting this activates sidecar CDC. |
sidecar_tables | list[TimestampTableConfig] \| None | None | Per-table sidecar configuration. Required when sidecar_source is set. |
sidecar_poll_batch_size | int | 500 | Rows fetched from the external DB per poll cycle. |
sidecar_delete_detection | str | "disabled" | "disabled" or "soft_delete:<column>", e.g. "soft_delete:deleted_at". |
sidecar_enable_oltp | bool | False | Also create a local OLTP (SQLite) alongside the sidecar source. Rarely needed. |
Eager validation
HtapConfig.__init__ validates sync_mode, olap_backend, storage_mode, and sidecar_delete_detection immediately and raises ValueError if an unknown value is passed. No silent defaults.
HtapEngine
The main engine class. All I/O methods are async and must be awaited.
class HtapEngine:
def __init__(self, config: HtapConfig | None = None) -> None
@classmethod
async def create(cls, config: HtapConfig | None = None) -> HtapEngine
async def __aenter__(self) -> HtapEngine
async def __aexit__(self, *exc: object) -> None
Constructor vs. factory
| Form | Blocks event loop? | Recommended? |
|---|---|---|
HtapEngine(config) | Yes (blocking init) | Only in sync startup code or tests |
await HtapEngine.create(config) | No (uses anyio.to_thread.run_sync) | Yes, from async code |
await yoda.open(...) | No | Yes, preferred one-liner |
Write operations
execute
async def execute(self, sql: str, params: list | None = None) -> int
Execute a single DML or DDL statement. Returns the number of rows affected.
SQL is routed by SqlParserRouter: writes (INSERT / UPDATE / DELETE) and DDL go to OLTP (SQLite). Analytical queries with aggregates, GROUP BY, JOINs, or window functions go to OLAP.
params are typed: bool, int, float, None, and str are preserved. Wide Python ints outside the signed 64-bit range are serialised as strings.
rows = await engine.execute(
"INSERT INTO users (id, name, age) VALUES (?, ?, ?)",
[1, "Alice", 30],
)
Error handling
All engine methods raise RuntimeError on Rust-side failures (e.g. constraint violations, SQL parse errors). Wrap calls in try/except RuntimeError where needed.
execute_batch
async def execute_batch(self, statements: list[str]) -> None
Execute a list of SQL strings in a single OLTP transaction. Avoids per-statement fsync overhead and is substantially faster for bulk loads (~18K ops/sec vs ~400 ops/sec for individual execute() calls with CDC triggers active).
await engine.execute_batch([
"INSERT INTO users VALUES (1, 'Alice', 30)",
"INSERT INTO users VALUES (2, 'Bob', 25)",
"INSERT INTO users VALUES (3, 'Charlie', 35)",
])
Query operations
query
async def query(self, sql: str) -> list[pyarrow.RecordBatch]
Execute a SQL query. The router automatically sends analytical queries (aggregates, GROUP BY, window functions, CTEs, JOINs, set operations, subqueries) to OLAP, and simple point queries (SELECT … WHERE pk =) to OLTP.
Returns a list[pyarrow.RecordBatch] via the PyArrow FFI bridge. The conversion is zero-copy on the Arrow side.
No bound parameters
query does not accept a params argument — only execute does, and execute returns a row count rather than result rows, so it cannot be used to fetch filtered data. When user input must appear in a query string, never interpolate it raw: convert numeric values with int() or float(), check identifiers (table and column names) against an allowlist, and escape string literals by doubling single quotes (' becomes ''). Treat any untrusted input that must reach query as unsafe and validate it at the boundary.
batches = await engine.query("SELECT AVG(age) as avg_age FROM users")
table = pa.Table.from_batches(batches)
query_olap
async def query_olap(self, sql: str) -> list[pyarrow.RecordBatch]
Force a query directly to the OLAP backend, bypassing the router. Useful for analytical queries that the router might misclassify, or when you know the data is only in OLAP.
Sync operations
sync_now
async def sync_now(self) -> SyncResult
Drain the pending CDC event log and apply all events to OLAP. In sidecar mode this polls the external database. Returns a SyncResult describing what happened.
sync_status
async def sync_status(self) -> SyncStatus
Return the current sync engine status without triggering a sync cycle.
initial_sync
async def initial_sync(self, table_name: str) -> int
Bulk-load all existing OLTP rows for a table into OLAP. Use this after calling register_table() on a table that already has data. Returns the number of rows loaded.
# Register a pre-populated table
await engine.register_table(schema)
rows = await engine.initial_sync("orders")
print(f"Loaded {rows} existing rows")
Schema operations
register_table
async def register_table(self, schema: TableSchema) -> None
Register a table for CDC-based replication. Installs triggers on the OLTP side and creates the corresponding table in OLAP. Must be called before any CDC events for the table are consumed.
Raises RuntimeError if the table is already registered.
add_column
async def add_column(self, table_name: str, column_name: str, data_type: str) -> None
Add a column to a registered table. Propagates the schema change to the OLAP table and rebuilds the CDC triggers. data_type uses the same type strings as TableSchema (see below).
await engine.execute("ALTER TABLE users ADD COLUMN email TEXT")
await engine.add_column("users", "email", "utf8")
drop_column
async def drop_column(self, table_name: str, column_name: str) -> None
Drop a column from a registered table. Tears down CDC triggers (SQLite rejects DROP COLUMN while triggers reference the column), alters the OLTP schema, and updates OLAP.
Raises RuntimeError if you attempt to drop a primary key column.
Lifecycle
shutdown
def shutdown(self) -> None
Release engine resources. Currently a no-op — resources are freed when the Python object is garbage-collected. Safe to call explicitly in cleanup code. Called automatically by __aexit__ when using async with.
TableSchema
Describes the schema of a table to be registered for HTAP replication.
class TableSchema:
def __init__(
self,
name: str,
columns: list[tuple[str, str]],
pk: list[str],
) -> None
| Parameter | Type | Description |
|---|---|---|
name | str | Table name (must match the actual SQL table name). |
columns | list[tuple[str, str]] | Ordered list of (column_name, type_string) pairs. |
pk | list[str] | List of primary key column names. PK columns are non-nullable. |
Supported type strings
| String(s) | Arrow type |
|---|---|
"int8" | Int8 |
"int16" | Int16 |
"int32", "int" | Int32 |
"int64", "bigint" | Int64 |
"uint8" | UInt8 |
"uint16" | UInt16 |
"uint32" | UInt32 |
"uint64" | UInt64 |
"float16", "half" | Float16 |
"float32", "float" | Float32 |
"float64", "double" | Float64 |
"utf8", "string", "text" | Utf8 |
"bool", "boolean" | Boolean |
"binary", "bytes" | Binary |
Type strings are case-insensitive. An unknown type raises ValueError immediately at construction time.
Date / Timestamp / Decimal / List / Struct
These Arrow types are not available in TableSchema. Columns with those SQLite affinities fall back to the SQL path inside the sync engine and are not accelerated by the Arrow bulk-INSERT path. Use "int64" for Unix epoch timestamps.
schema = yoda.TableSchema(
"orders",
[
("order_id", "int64"),
("customer_id", "int64"),
("total", "float64"),
("status", "utf8"),
],
["order_id"],
)
SyncResult
Returned by engine.sync_now(). Immutable (frozen).
class SyncResult:
events_processed: int # CDC events consumed this cycle
rows_inserted: int # rows INSERTed into OLAP
rows_updated: int # rows UPDATEd in OLAP
rows_deleted: int # rows DELETEd from OLAP
pruned_count: int | None # CDC log entries pruned (None if pruning disabled)
result = await engine.sync_now()
print(f"Processed {result.events_processed} events: "
f"+{result.rows_inserted} ~{result.rows_updated} -{result.rows_deleted}")SyncStatus
Returned by engine.sync_status(). Immutable (frozen).
class SyncStatus:
lag: int # number of unsynced CDC events
last_synced_seq: int | None # sequence number of last synced event (None before first sync)
A lag of 0 means OLAP is fully up-to-date. last_synced_seq is None before the first successful sync cycle.
TimestampTableConfig
Per-table configuration for sidecar mode. Only used when sidecar_source is set in HtapConfig.
class TimestampTableConfig:
def __init__(
self,
table_name: str,
primary_key: list[str],
created_at_column: str = "created_at",
updated_at_column: str = "updated_at",
columns: list[str] = [],
) -> None
| Parameter | Type | Default | Description |
|---|---|---|---|
table_name | str | — | Table name in the external source database. |
primary_key | list[str] | — | Primary key columns (supports composite PKs). |
created_at_column | str | "created_at" | Column used to detect new inserts (compared to updated_at). |
updated_at_column | str | "updated_at" | Watermark column: polls rows where updated_at > last_watermark. |
columns | list[str] | [] | Columns to SELECT (empty list means SELECT *). |
See Sidecar Mode → INSERT vs UPDATE Heuristic for how rows from the source DB are classified as inserts vs updates.
config = yoda.HtapConfig(
oltp_path="/tmp/local.db",
sidecar_source="/data/production.db",
sidecar_tables=[
yoda.TimestampTableConfig(
table_name="orders",
primary_key=["order_id"],
columns=["order_id", "customer_id", "total", "created_at", "updated_at"],
),
],
sidecar_enable_oltp=False,
)
For more details on sidecar mode, see Sidecar Mode.
End-to-end example with Pandas
import anyio
import pyarrow as pa
import yoda
async def analytics_report():
async with await yoda.open(oltp_path="sales.db") as engine:
# Setup
await engine.execute(
"CREATE TABLE sales "
"(id INTEGER PRIMARY KEY, product TEXT, amount REAL, region TEXT)"
)
await engine.register_table(
yoda.TableSchema(
"sales",
[
("id", "int64"),
("product", "utf8"),
("amount", "float64"),
("region", "utf8"),
],
["id"],
)
)
# Insert rows in one transaction
await engine.execute_batch([
"INSERT INTO sales VALUES (1, 'Widget', 29.99, 'EMEA')",
"INSERT INTO sales VALUES (2, 'Gadget', 49.99, 'APAC')",
"INSERT INTO sales VALUES (3, 'Widget', 29.99, 'APAC')",
"INSERT INTO sales VALUES (4, 'Gadget', 49.99, 'EMEA')",
])
# Sync to OLAP
result = await engine.sync_now()
print(result)
# Analytical query -> list[pyarrow.RecordBatch]
batches = await engine.query(
"SELECT region, SUM(amount) AS total "
"FROM sales GROUP BY region ORDER BY total DESC"
)
# Convert to pandas
df = pa.Table.from_batches(batches).to_pandas()
print(df)
# region total
# EMEA 79.98
# APAC 79.98
# (EMEA and APAC tie at 79.98, so the relative order of these two rows is not guaranteed)
anyio.run(analytics_report)
Error handling
All engine methods raise RuntimeError for Rust-side errors. HtapConfig and TableSchema constructors raise ValueError for bad input (unknown type strings, invalid parameter values).
try:
await engine.execute("INSERT INTO users VALUES (1, 'Alice', 30)")
except RuntimeError as e:
print(f"Engine error: {e}")
try:
schema = yoda.TableSchema("t", [("id", "nonexistent_type")], ["id"])
except ValueError as e:
print(f"Config error: {e}") # prints: Config error: unknown type: nonexistent_type