Async Patterns
Yoda's Python bindings are built on pyo3-async-runtimes with a multi-threaded Tokio runtime that is created once at module import time. Every HtapEngine method returns a Python awaitable, so any async framework that drives an event loop can consume the API.
anyio backend constraint
The test suite declares anyio_backend = "asyncio" because pyo3-async-runtimes requires the asyncio event loop. anyio.run() defaults to asyncio, so in practice both asyncio.run() and anyio.run() work. The trio backend is not supported.
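Because the bindings need an asyncio loop, code that may be reached from another backend can fail fast with a clear message. The following is a minimal sketch using only the standard library; `require_asyncio_loop` is a hypothetical helper name, not part of yoda.

```python
import asyncio


def require_asyncio_loop() -> asyncio.AbstractEventLoop:
    """Return the running asyncio loop, or raise a descriptive error.

    asyncio.get_running_loop() raises RuntimeError when no asyncio loop
    is running in the current thread (for example under the trio backend).
    """
    try:
        return asyncio.get_running_loop()
    except RuntimeError as exc:
        raise RuntimeError(
            "an asyncio event loop is required; use asyncio.run() or "
            "anyio.run() with the default asyncio backend"
        ) from exc


async def main() -> bool:
    # Inside asyncio.run() the guard succeeds and returns the loop.
    loop = require_asyncio_loop()
    return isinstance(loop, asyncio.AbstractEventLoop)


print(asyncio.run(main()))
```

Calling the guard at the top of a service's startup coroutine turns an obscure runtime failure into an actionable error.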
asyncio
Use asyncio.run() for a top-level entry point:
import asyncio
import yoda

async def main():
    async with await yoda.open(oltp_path="app.db") as engine:
        await engine.execute(
            "CREATE TABLE events (id INTEGER PRIMARY KEY, msg TEXT)"
        )
        await engine.register_table(
            yoda.TableSchema("events", [("id", "int64"), ("msg", "utf8")], ["id"])
        )
        await engine.execute("INSERT INTO events VALUES (1, 'hello')")
        result = await engine.sync_now()
        print(result)
        batches = await engine.query("SELECT COUNT(*) as n FROM events")
        print(batches[0].to_pydict())

asyncio.run(main())

Inside an existing event loop (e.g. a Jupyter notebook or a framework that manages the loop for you), just await directly:
engine = await yoda.HtapEngine.create()
batches = await engine.query("SELECT 1 AS x")

anyio
anyio.run() uses asyncio by default, which is compatible with the bindings:
import anyio
import yoda

async def main():
    async with await yoda.open(oltp_path="app.db") as engine:
        await engine.execute(
            "CREATE TABLE metrics (id INTEGER PRIMARY KEY, value REAL)"
        )
        await engine.register_table(
            yoda.TableSchema(
                "metrics",
                [("id", "int64"), ("value", "float64")],
                ["id"],
            )
        )
        await engine.execute("INSERT INTO metrics VALUES (1, 3.14)")
        await engine.sync_now()
        batches = await engine.query("SELECT AVG(value) AS avg FROM metrics")
        print(batches[0].to_pydict())

anyio.run(main)

anyio task groups
Use an anyio.create_task_group() to fan out concurrent work:
import anyio
import yoda

async def write_batch(engine, start: int, n: int):
    stmts = [f"INSERT INTO t VALUES ({start + i})" for i in range(n)]
    await engine.execute_batch(stmts)

async def main():
    async with await yoda.open(oltp_path="fanout.db") as engine:
        await engine.execute("CREATE TABLE t (id INTEGER PRIMARY KEY)")
        await engine.register_table(
            yoda.TableSchema("t", [("id", "int64")], ["id"])
        )
        # Write three batches concurrently from different coroutines
        async with anyio.create_task_group() as tg:
            tg.start_soon(write_batch, engine, 0, 100)
            tg.start_soon(write_batch, engine, 100, 100)
            tg.start_soon(write_batch, engine, 200, 100)
        # The task group has exited, so all three batches are written
        result = await engine.sync_now()
        print(result)  # rows_inserted ≥ 300

anyio.run(main)

Concurrent queries
Multiple query() or query_olap() coroutines can run concurrently. On the OLAP side (DataFusion), each query gets its own execution context. On the OLTP side, the engine maintains a round-robin read pool (default: 4 connections) so concurrent point queries do not block each other.
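To make the round-robin idea concrete, here is a conceptual sketch built on the standard library's `sqlite3` and `itertools`. `RoundRobinReadPool` is a hypothetical class written for illustration only; it is not how the engine is implemented, and the pool size of 4 simply mirrors the documented default.

```python
import itertools
import sqlite3


class RoundRobinReadPool:
    """Toy pool that hands out connections in strict rotation."""

    def __init__(self, path: str, size: int = 4) -> None:
        # Note: each ":memory:" connection is its own database, which is
        # fine here because only the rotation order is being shown.
        self._conns = [sqlite3.connect(path) for _ in range(size)]
        self._next_index = itertools.cycle(range(size))

    def acquire(self) -> tuple[int, sqlite3.Connection]:
        """Return (slot index, connection) for the next slot in rotation."""
        idx = next(self._next_index)
        return idx, self._conns[idx]

    def close(self) -> None:
        for conn in self._conns:
            conn.close()


pool = RoundRobinReadPool(":memory:", size=4)
order = [pool.acquire()[0] for _ in range(6)]
print(order)  # [0, 1, 2, 3, 0, 1]
pool.close()
```

With four slots, the fifth reader wraps around to slot 0, which is why more than four simultaneous readers start sharing connections.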
asyncio.gather
import asyncio
import yoda

async def main():
    async with await yoda.open(oltp_path="app.db") as engine:
        # ... setup omitted ...
        # Fire three OLAP queries at the same time
        results = await asyncio.gather(
            engine.query("SELECT COUNT(*) FROM orders"),
            engine.query("SELECT SUM(amount) FROM orders"),
            engine.query("SELECT AVG(amount) FROM orders"),
        )
        count_batch, sum_batch, avg_batch = results
        print(count_batch[0].to_pydict())

anyio task group (equivalent)
import anyio
import yoda

async def main():
    async with await yoda.open(oltp_path="app.db") as engine:
        # ... setup omitted ...
        outputs = {}

        async def run_query(key: str, sql: str):
            # Task groups do not return values, so collect results by key
            outputs[key] = await engine.query(sql)

        async with anyio.create_task_group() as tg:
            tg.start_soon(run_query, "count", "SELECT COUNT(*) FROM orders")
            tg.start_soon(run_query, "total", "SELECT SUM(amount) FROM orders")
        # Both queries have finished once the task group exits
        print(outputs["count"][0].to_pydict())

anyio.run(main)

Read-pool concurrency
Each concurrent OLTP read (simple SELECT without aggregation) is served by a different connection from the round-robin pool. Increase read_pool_size in HtapConfig if you have more concurrent readers than the default of 4.
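If raising `read_pool_size` is not an option, one alternative (an assumption on our part, not something the library prescribes) is to cap the number of in-flight reads in application code. The sketch below uses `asyncio.Semaphore`; `fake_point_query` is a hypothetical stand-in for `await engine.query(...)` so the example runs without yoda installed.

```python
import asyncio

READ_POOL_SIZE = 4  # mirrors the documented default; adjust to your config


async def fake_point_query(i: int) -> int:
    """Stand-in for an OLTP point query (not part of yoda)."""
    await asyncio.sleep(0.01)
    return i


async def bounded_reads(n: int, limit: int = READ_POOL_SIZE) -> tuple[list[int], int]:
    """Run n reads with at most `limit` in flight; return (results, peak)."""
    sem = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def one(i: int) -> int:
        nonlocal in_flight, peak
        async with sem:  # waits here once `limit` reads are running
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                return await fake_point_query(i)
            finally:
                in_flight -= 1

    results = await asyncio.gather(*(one(i) for i in range(n)))
    return list(results), peak


results, peak = asyncio.run(bounded_reads(10))
print(results, "peak in flight:", peak)
```

`asyncio.gather` preserves argument order, so `results` lines up with the submitted queries even though they complete in batches.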
Long-running service with a background sync loop
For production services, start the engine once and run a background sync loop that periodically calls sync_now(). Use asyncio.create_task (or anyio's equivalent) to keep the loop running alongside request handling.
import asyncio
import signal
import yoda

async def sync_loop(engine: yoda.HtapEngine, interval: float = 0.5):
    """Sync CDC events to OLAP every `interval` seconds."""
    while True:
        try:
            result = await engine.sync_now()
            if result.events_processed:
                print(f"[sync] {result}")
        except Exception as e:
            # CancelledError is a BaseException, so cancellation still propagates
            print(f"[sync] error: {e}")
        await asyncio.sleep(interval)

async def main():
    engine = await yoda.HtapEngine.create(
        yoda.HtapConfig(oltp_path="service.db")
    )
    # Start background sync
    sync_task = asyncio.create_task(sync_loop(engine))
    # Graceful shutdown on SIGINT / SIGTERM (add_signal_handler is Unix-only).
    # An Event tolerates repeated signals; calling set_result() twice on a
    # Future would raise InvalidStateError.
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()
    loop.add_signal_handler(signal.SIGINT, stop.set)
    loop.add_signal_handler(signal.SIGTERM, stop.set)
    await stop.wait()  # wait for signal
    sync_task.cancel()
    try:
        await sync_task
    except asyncio.CancelledError:
        pass
    engine.shutdown()
    print("Engine shut down cleanly.")

asyncio.run(main())

Always call shutdown
Call engine.shutdown() (a synchronous method, so no await) or exit the async with block before your process ends. Skipping shutdown is safe for in-memory DataFusion, but with file-backed storage ("arrow_ipc" / "parquet") or DuckDB, buffered writes may not be flushed and the files on disk may be left incomplete.
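When the engine is managed by hand rather than through async with, a try/finally block keeps shutdown on every exit path. The sketch below assumes nothing beyond the synchronous `shutdown()` described above; `StubEngine` is a hypothetical stand-in so the example runs without yoda installed.

```python
import asyncio


class StubEngine:
    """Hypothetical stand-in for yoda.HtapEngine; models only shutdown()."""

    def __init__(self) -> None:
        self.closed = False

    async def execute(self, sql: str) -> None:
        if "BOOM" in sql:
            raise RuntimeError("simulated failure")

    def shutdown(self) -> None:  # synchronous, like the documented method
        self.closed = True


async def run_job(engine: StubEngine, sql: str) -> None:
    try:
        await engine.execute(sql)
    finally:
        # Runs on success, on error, and when the task is cancelled.
        engine.shutdown()


ok_engine = StubEngine()
asyncio.run(run_job(ok_engine, "SELECT 1"))

failing_engine = StubEngine()
try:
    asyncio.run(run_job(failing_engine, "BOOM"))
except RuntimeError:
    pass

print(ok_engine.closed, failing_engine.closed)  # True True
```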
Context manager
HtapEngine supports async with when obtained via yoda.open() or HtapEngine.create():
async with await yoda.open(oltp_path="app.db") as engine:
    await engine.execute("INSERT INTO t VALUES (1)")
    await engine.sync_now()
# engine.shutdown() called automatically here

The sync constructor HtapEngine(config) does not return an awaitable, so the async with await ... form does not apply to it. The Python wrapper class still implements __aenter__ / __aexit__, however, so you can also write:
engine = yoda.HtapEngine(config)
async with engine:
    await engine.execute("INSERT INTO t VALUES (1)")

If you need more control (e.g. storing the engine as an instance variable), use the explicit pattern:
class MyService:
    async def start(self):
        self.engine = await yoda.HtapEngine.create(yoda.HtapConfig(oltp_path="svc.db"))

    async def stop(self):
        self.engine.shutdown()

Web handler sketch
The following illustrates a minimal async handler that returns query results as JSON. It does not depend on FastAPI or any other web framework; adapt it to whichever async framework you use.
import json
import yoda
import pyarrow as pa

# Module-level engine, created once at startup
_engine: yoda.HtapEngine | None = None

async def startup():
    global _engine
    _engine = await yoda.HtapEngine.create(
        yoda.HtapConfig(oltp_path="app.db")
    )

async def shutdown():
    if _engine:
        _engine.shutdown()

async def handle_analytics(region: str) -> str:
    """Return JSON: total sales per product in a region."""
    assert _engine is not None
    batches = await _engine.query(
        f"SELECT product, SUM(amount) AS total "
        f"FROM sales WHERE region = '{region}' GROUP BY product"
    )
    table = pa.Table.from_batches(batches)
    return json.dumps(table.to_pydict())

SQL injection
The example above inlines region directly into SQL for brevity. In real code, use parameterised queries via the params argument of execute(), or validate/escape the input.
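The two mitigations can be sketched with the standard library alone. The exact shape of yoda's `params` argument is not shown here, so the binding half of this example uses `sqlite3` placeholders purely to illustrate the principle; `validate_region`, its pattern, and the sample rows are assumptions made for the demonstration.

```python
import re
import sqlite3

# Allow-list pattern for region codes; tighten or loosen for your data.
_REGION_RE = re.compile(r"[A-Za-z0-9_-]{1,32}")


def validate_region(region: str) -> str:
    """Reject anything that is not a short alphanumeric region code."""
    if not _REGION_RE.fullmatch(region):
        raise ValueError(f"invalid region: {region!r}")
    return region


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (product TEXT, region TEXT, amount REAL)")
conn.executemany(
    "INSERT INTO sales VALUES (?, ?, ?)",
    [("widget", "emea", 10.0), ("widget", "emea", 5.0), ("gadget", "apac", 7.0)],
)


def totals_for(region: str) -> list[tuple[str, float]]:
    # The value travels as a bound parameter, never as SQL text.
    return conn.execute(
        "SELECT product, SUM(amount) FROM sales WHERE region = ? GROUP BY product",
        (region,),
    ).fetchall()


print(totals_for("emea"))           # [('widget', 15.0)]
print(totals_for("x' OR '1'='1"))   # []
```

With binding, the hostile string is compared as a literal value and matches no rows; with validation, it is rejected before any SQL is built.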
Common pitfalls
Do not call engine methods from a synchronous context after the event loop is running. HtapEngine methods return awaitables; calling them without await does nothing and the coroutine is immediately garbage-collected.
# Wrong: coroutine is created but never executed
engine.execute("INSERT ...")
# Correct
await engine.execute("INSERT ...")

Do not share an HtapEngine across processes. The engine holds open file handles and a Tokio runtime. Fork-safety is not guaranteed. Create a new engine in each child process.
Do not skip engine.shutdown() (or the async with exit) in long-running processes. shutdown() is a synchronous call (no await); the sync loop background thread keeps running until shutdown is invoked. In short scripts that exit normally this is fine; in servers it causes resource leaks.
The sync constructor blocks the event loop. HtapEngine(config) runs SQLite PRAGMA setup, WAL enable, trigger installation, and OLAP engine init synchronously. Prefer await yoda.open(...) or await HtapEngine.create(config) from async code to avoid blocking other tasks.
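The effect of a blocking call on other tasks can be observed with a small standard-library experiment. In this sketch `time.sleep` stands in for synchronous setup work and `asyncio.sleep` for an awaited alternative; no yoda code is involved, and the durations are arbitrary.

```python
import asyncio
import time


async def heartbeat(ticks: list[float], period: float = 0.01) -> None:
    """Record a timestamp every `period` seconds while the loop is free."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(period)


async def measure_max_gap(blocking: bool) -> float:
    """Return the longest pause between heartbeats during simulated setup."""
    ticks: list[float] = []
    task = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0.03)         # let the heartbeat start
    if blocking:
        time.sleep(0.2)               # blocks the whole event loop
    else:
        await asyncio.sleep(0.2)      # yields to other tasks
    await asyncio.sleep(0.03)         # let the heartbeat resume
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return max(later - earlier for earlier, later in zip(ticks, ticks[1:]))


blocked = asyncio.run(measure_max_gap(blocking=True))
smooth = asyncio.run(measure_max_gap(blocking=False))
print(f"max heartbeat gap with blocking setup: {blocked:.3f}s")
print(f"max heartbeat gap with awaited setup:  {smooth:.3f}s")
```

The blocking run shows a heartbeat gap at least as long as the blocking call, which is the stall every other task on the loop would experience.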
sync_now() is not called automatically. CDC events accumulate in the OLTP log until you call sync_now(). Either call it explicitly after writes, or run a background loop as shown above.
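The write, sync, query ordering can be pictured with a toy model. `ToyEngine` below is entirely hypothetical: it imitates the behaviour described in this section (changes wait in a log until `sync_now()` moves them) and makes no claim about how the real engine stores or transfers data.

```python
import asyncio


class ToyEngine:
    """Hypothetical model of the write -> sync -> query flow (not yoda)."""

    def __init__(self) -> None:
        self._pending: list[int] = []  # stands in for the OLTP change log
        self._olap: list[int] = []     # stands in for the OLAP copy

    async def execute(self, value: int) -> None:
        self._pending.append(value)

    async def sync_now(self) -> int:
        """Move pending changes to the OLAP side; return how many moved."""
        moved = len(self._pending)
        self._olap.extend(self._pending)
        self._pending.clear()
        return moved

    async def olap_count(self) -> int:
        return len(self._olap)


async def demo() -> tuple[int, int, int]:
    engine = ToyEngine()
    await engine.execute(1)
    await engine.execute(2)
    before = await engine.olap_count()  # writes not yet visible to OLAP
    moved = await engine.sync_now()
    after = await engine.olap_count()   # visible after the explicit sync
    return before, moved, after


print(asyncio.run(demo()))  # (0, 2, 2)
```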