AsyncPool
- class aerospike_sdk.aio.pool.AsyncPool
Bases: object

Pool of event loops and paired clients for parallel async work.
Each loop runs on a dedicated OS thread with its own Client (and therefore its own PAC CompletionBridge). Submitted coroutines are dispatched across loops round-robin, or to an explicitly chosen loop by index.

Free-threading required for throughput gains. An AsyncPool is still correct on a GIL-enabled interpreter (any CPython 3.12 or earlier, and the default builds of 3.13/3.14). However, the N loops serialize on the GIL for Python work, so TPS does not scale with loop_count. The throughput benefit only materializes under a free-threaded build (3.13t / 3.14t).

Shared IndexesMonitor. Index metadata is cluster-scoped, so the pool runs one shared IndexesMonitor instead of one per client. That monitor is anchored to loop 0 and issues info commands through clients[0]. The factory's per-client monitor is replaced before connect(), so the cluster-side sindex-list load is independent of loop_count. Tune it via the index_refresh_interval kwarg on AsyncPool itself.

Per-Client Tokio runtime. When loop_count >= 4, AsyncPool automatically configures each Client to use its own dedicated PAC Tokio runtime instead of the shared global one. This eliminates the cross-loop scheduler contention that previously caused throughput to collapse beyond 4 loops. The behavior is controlled via the per_client_runtime kwarg; see its parameter description for the threshold rationale and how to override it.

Tuning notes (measured on an 8-core host against a remote cluster, free-threaded 3.14t):
- Tasks-per-loop floor. Below roughly 16–32 concurrent asyncio tasks per loop, per-call dispatch overhead (run_coroutine_threadsafe + asyncio.wrap_future) outweighs the savings from parallelism. For example, a 4-loop pool with 8 tasks per loop measured slower than a single-loop client with 32 tasks total. Keep tasks per loop in the same regime that saturates a single client.
- Throughput scales monotonically with loops under per-Client runtimes. Measured values (loops × tasks per loop) were 4×64 = 167K, 8×64 = 178K and 12×64 = 180K TPS. The TPS ceiling on 8-core hardware is about 180K, capped by Python interpreter self-time across loops.
- Tail latency degrades as loops are added. 4×64 has p99 = 4.3 ms; 12×64 has p99 = 15.5 ms. Latency-sensitive workloads should pick loop_count based on their p99 budget; throughput-only workloads can push higher.
- The sweet spot is hardware-dependent. With client and server colocated, the sweet spot shifts down because they share CPU; with more cores, the ceiling shifts up. Always validate against your target deployment.
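The p99 guidance above reduces to a constrained maximum: take the highest-throughput configuration whose tail latency fits the budget. The sketch below applies that rule to the two configurations in these notes that report both TPS and p99. `Measurement` and `pick_loop_count` are hypothetical helpers, not part of aerospike_sdk. Real decisions should use measurements from your own deployment.

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass(frozen=True)
class Measurement:
    loop_count: int
    tasks_per_loop: int
    tps: int        # transactions per second
    p99_ms: float   # 99th-percentile latency in milliseconds


# The two configurations from the tuning notes that report both TPS and p99
# (8-core host, remote cluster, free-threaded 3.14t).
MEASURED = [
    Measurement(loop_count=4, tasks_per_loop=64, tps=167_000, p99_ms=4.3),
    Measurement(loop_count=12, tasks_per_loop=64, tps=180_000, p99_ms=15.5),
]


def pick_loop_count(
    measurements: Sequence[Measurement], p99_budget_ms: float
) -> Optional[Measurement]:
    """Return the highest-TPS configuration whose p99 fits the budget, or None."""
    within_budget = [m for m in measurements if m.p99_ms <= p99_budget_ms]
    if not within_budget:
        return None
    return max(within_budget, key=lambda m: m.tps)


def tps_per_loop(m: Measurement) -> float:
    """Throughput contributed by each event loop; shrinks as loops are added."""
    return m.tps / m.loop_count
```

With these numbers, a 5 ms p99 budget selects the 4-loop configuration and a 20 ms budget allows 12 loops. Over that range, per-loop throughput falls from about 41.8K to 15K TPS, which is the diminishing return the notes describe.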
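Example:

```python
from aerospike_async import ClientPolicy
from aerospike_sdk import Client
from aerospike_sdk.aio.pool import AsyncPool

# loop_count >= 4 can enable per-Client runtimes, so every Client shares
# one ClientPolicy object (see the shared-policy invariant under client_factory).
policy = ClientPolicy()
pool = AsyncPool(
    client_factory=lambda: Client("127.0.0.1:3000", policy=policy),
    loop_count=4,
)

async def read(key):
    async with pool:
        return await pool.run(
            lambda client: client.create_session().get(key)
        )
```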
See also
Client: Single-loop async API.

- __init__(client_factory, loop_count=None, *, index_refresh_interval=5.0, per_client_runtime=None)
Configure the pool. Call start() or use async with.
- Parameters:
  - client_factory (Callable[[], Client]) – Zero-argument callable returning an unconnected Client. It is called loop_count times, once per pool thread. Each client connects on its own loop, which binds its PAC CompletionBridge to that loop.
    Shared-policy invariant: when per_client_runtime is enabled (automatically at loop_count >= 4 on free-threaded Python), the factory MUST return Clients that share a single ClientPolicy PyO3 object. The usual way is a closure that captures one policy: policy = ClientPolicy(); factory = lambda: Client(seeds, policy=policy). AsyncPool applies a one-shot mutation to that shared policy before any loop thread starts. If the factory constructed a fresh ClientPolicy per call, the mutation would reach client 0 only, and the remaining clients would silently stay on the shared global runtime. To prevent this, start() raises RuntimeError when the invariant is violated.
    Connection-pool sizing: with N clients, the total number of connections per server node is N × max_conns_per_node. To keep the aggregate budget constant, set ClientPolicy.max_conns_per_node on the policy the factory passes to each Client to the default divided by loop_count.
  - loop_count (Optional[int]) – Number of event loops / OS threads. Defaults to os.cpu_count(), or 4 if that is indeterminate.
  - index_refresh_interval (float) – Seconds between secondary-index cache refreshes for the pool's single shared IndexesMonitor (default 5.0). Index metadata is cluster-scoped, so one monitor serves all pool clients. The per-Client monitor that each client_factory() call would create is replaced before connect, which eliminates N× polling load.
  - per_client_runtime (Optional[bool]) – Whether each pool Client runs on its own dedicated PAC Tokio runtime. This isolates each loop's runtime and eliminates cross-loop scheduler contention.
    - None (default): auto-enable when loop_count >= 4. Below 4 loops, the shared global runtime wins on the per-loop worker budget. At 4+ loops, per-Client runtimes scale monotonically. In measurements, an 8×64 AsyncPool went from a collapsed ~59K TPS on the shared runtime to ~184K with per-Client runtimes.
    - True: always enable. The worker count is auto-sized to max(2, os.cpu_count() // loop_count).
    - False: never enable; use the shared global runtime regardless of loop_count.
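The defaulting rules for loop_count, per_client_runtime and the connection budget can be summarized as one small function. This is a sketch of the documented behavior, not the SDK's implementation:

- `PoolConfig` and `resolve_pool_config` are hypothetical names.
- The free-threading condition mentioned under client_factory is not modeled.
- Treating an unknown CPU count as 4 when sizing runtime workers is an assumption.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class PoolConfig:
    loop_count: int
    per_client_runtime: bool
    runtime_workers: Optional[int]  # None when the shared global runtime is used
    max_conns_per_client: int       # per-client, per-node connection cap


def resolve_pool_config(
    loop_count: Optional[int],
    per_client_runtime: Optional[bool],
    conns_budget_per_node: int,
    cpu_count: Optional[int],
) -> PoolConfig:
    """Apply the documented defaults; pass os.cpu_count() as cpu_count."""
    # loop_count=None falls back to the CPU count, or 4 if it is unknown.
    loops = loop_count if loop_count is not None else (cpu_count or 4)
    # per_client_runtime=None means "auto": enable at 4+ loops.
    enabled = (loops >= 4) if per_client_runtime is None else per_client_runtime
    # Dedicated runtimes get max(2, cpu_count // loop_count) workers.
    # Assumption: an unknown CPU count is treated as 4 here as well.
    workers = max(2, (cpu_count or 4) // loops) if enabled else None
    # Keep N x max_conns_per_node equal to the per-node budget.
    per_client = conns_budget_per_node // loops
    return PoolConfig(loops, enabled, workers, per_client)
```

For example, `resolve_pool_config(None, None, 300, 8)` yields 8 loops with per-Client runtimes of 2 workers each and 37 connections per client per node.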
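Example:

```python
from aerospike_async import ClientPolicy
from aerospike_sdk import Client
from aerospike_sdk.aio.pool import AsyncPool

N = 4

# Build ONE policy outside the factory so every Client shares it
# (required by the shared-policy invariant at loop_count >= 4).
policy = ClientPolicy()
# Split a per-node budget of 300 connections across the N clients.
policy.max_conns_per_node = 300 // N

def make_client() -> Client:
    return Client("127.0.0.1:3000", policy=policy)

pool = AsyncPool(client_factory=make_client, loop_count=N)
```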
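The invariant requires one shared object, not merely equal settings, so it can be checked by identity. The following is a hypothetical illustration of such a check; AsyncPool's own check in start() may work differently. `FakePolicy`, `FakeClient` and `build_clients` are stand-ins so the example runs without a cluster.

```python
from typing import Callable, List


class FakePolicy:
    """Stand-in for ClientPolicy (hypothetical; not the aerospike_async class)."""

    def __init__(self) -> None:
        self.max_conns_per_node = 100


class FakeClient:
    """Stand-in for an unconnected Client that keeps a reference to its policy."""

    def __init__(self, seeds: str, policy: FakePolicy) -> None:
        self.seeds = seeds
        self.policy = policy


def build_clients(factory: Callable[[], FakeClient], loop_count: int) -> List[FakeClient]:
    clients = [factory() for _ in range(loop_count)]
    first = clients[0].policy
    # Identity, not equality: a mutation applied to `first` must be visible
    # to every client, which only holds if they all share one object.
    if any(c.policy is not first for c in clients[1:]):
        raise RuntimeError(
            "client_factory must return Clients that share a single policy object"
        )
    return clients


# A closure capturing one policy passes; a fresh policy per call would not.
shared = FakePolicy()
clients = build_clients(lambda: FakeClient("127.0.0.1:3000", shared), 4)
```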
- async start()
Spin up the pool threads and event loops, and connect all clients.
Each thread starts an asyncio event loop. The pool then creates and connects one Client per loop via run_coroutine_threadsafe. Because Client.connect() calls await new_client(…) on the pool loop, the PAC CompletionBridge is naturally bound to the correct loop.
- Raises:
RuntimeError – If already started or closed.
- Return type:
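The startup sequence described for start() can be reproduced with plain asyncio:

1. Create one loop per thread and run it forever on that thread.
2. Schedule each client's connect() onto its loop with run_coroutine_threadsafe.
3. Await the result through asyncio.wrap_future.

This is a minimal sketch, not AsyncPool's source. `MiniPool` and `FakeClient` are hypothetical. FakeClient.connect() only records the loop it ran on, standing in for the CompletionBridge binding.

```python
import asyncio
import threading
from typing import Callable, List, Optional


class FakeClient:
    """Hypothetical stand-in for Client: records the loop it connected on."""

    def __init__(self) -> None:
        self.loop: Optional[asyncio.AbstractEventLoop] = None

    async def connect(self) -> None:
        # In the real SDK, the PAC CompletionBridge binds to this running loop.
        self.loop = asyncio.get_running_loop()


class MiniPool:
    def __init__(self, client_factory: Callable[[], FakeClient], loop_count: int) -> None:
        self._factory = client_factory
        self._loop_count = loop_count
        self.loops: List[asyncio.AbstractEventLoop] = []
        self.threads: List[threading.Thread] = []
        self.clients: List[FakeClient] = []

    async def start(self) -> None:
        if self.loops:
            raise RuntimeError("pool already started or closed")
        # One event loop per dedicated OS thread.
        for i in range(self._loop_count):
            loop = asyncio.new_event_loop()
            thread = threading.Thread(
                target=loop.run_forever, name=f"pool-loop-{i}", daemon=True
            )
            thread.start()
            self.loops.append(loop)
            self.threads.append(thread)
        # Create each client here, but run connect() on its own pool loop.
        for loop in self.loops:
            client = self._factory()
            fut = asyncio.run_coroutine_threadsafe(client.connect(), loop)
            await asyncio.wrap_future(fut)
            self.clients.append(client)

    def stop(self) -> None:
        """Stop every loop, wait for its thread, then release the loop."""
        for loop in self.loops:
            loop.call_soon_threadsafe(loop.stop)
        for thread in self.threads:
            thread.join()
        for loop in self.loops:
            loop.close()
```

Connecting clients one at a time keeps the sketch simple. A real pool could connect them concurrently with asyncio.gather.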
- async aclose()
Ordered shutdown.
Protocol:
1. Fence: reject new run/map calls.
2. Close each client: this stops new PAC operations and flushes connection pools. It runs on each client's own loop so that Client.close() awaits properly.
3. Stop event loops: loop.stop() is scheduled via call_soon_threadsafe, so any pending drain callbacks run first. These are callbacks for completions delivered between close and stop.
4. Join threads.
Completions that arrive after the loop stops hit the CompletionBridge.closed latch. They resolve their Python futures with RuntimeError("event loop is closed"), so callers fail fast instead of hanging.
- Return type:
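The four-step shutdown protocol maps onto asyncio primitives as follows. In this hypothetical sketch (`ShutdownSketch` and `FakeClient` are illustrative names), a callback queued just before loop.stop() stands in for a late completion drain. Callbacks scheduled with call_soon_threadsafe run in FIFO order, so the drain executes before the loop stops.

```python
import asyncio
import threading
from typing import List


class FakeClient:
    """Hypothetical stand-in for Client; close() must run on its own loop."""

    def __init__(self, loop: asyncio.AbstractEventLoop, log: List[str]) -> None:
        self.loop = loop
        self.log = log

    async def close(self) -> None:
        assert asyncio.get_running_loop() is self.loop
        self.log.append("client closed")


class ShutdownSketch:
    def __init__(self, loop_count: int) -> None:
        self.log: List[str] = []
        self.closed = False
        self.loops = [asyncio.new_event_loop() for _ in range(loop_count)]
        self.threads = [
            threading.Thread(target=lp.run_forever, daemon=True) for lp in self.loops
        ]
        for thread in self.threads:
            thread.start()
        self.clients = [FakeClient(lp, self.log) for lp in self.loops]

    def ensure_open(self) -> None:
        """What run()/map() would call first; fails once the fence is up."""
        if self.closed:
            raise RuntimeError("pool is closed")

    async def aclose(self) -> None:
        # 1. Fence: reject new run/map calls from here on.
        self.closed = True
        # 2. Close each client on its own loop and wait for it.
        for client in self.clients:
            fut = asyncio.run_coroutine_threadsafe(client.close(), client.loop)
            await asyncio.wrap_future(fut)
        # 3. Stop each loop. The "drain" callback simulates a completion that
        #    was queued before the stop request; FIFO ordering runs it first.
        for loop in self.loops:
            loop.call_soon_threadsafe(self.log.append, "drain")
            loop.call_soon_threadsafe(loop.stop)
        # 4. Join threads, then release loop resources.
        for thread in self.threads:
            thread.join()
        for loop in self.loops:
            loop.close()
        self.log.append("joined")
```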
- async run(fn, pick=None)
Dispatch fn(client_i) to one of the pool's loops.
- Parameters:
- Return type:
TypeVar(T)
- Returns:
The awaited result of fn.
RuntimeError – If the pool is not started, is closed, or if called from within one of the pool’s own loops (which would deadlock).
Example:

```python
result = await pool.run(
    lambda c: c.create_session().get(key)
)
```
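The following sketches how run() can route work, assuming the pool keeps one client per loop:

- Choose the loop by explicit pick, or otherwise by a round-robin counter.
- Reject calls made from inside a pool loop.
- Run fn(client) on the chosen loop so that its awaits happen there.

`DispatchSketch` and `whoami` are hypothetical. The clients are plain integers so the example runs without a cluster. Treating `pick` as a loop index is an assumption based on the class description.

```python
import asyncio
import itertools
import threading
from typing import Any, Awaitable, Callable, List, Optional, TypeVar

T = TypeVar("T")


class DispatchSketch:
    """Hypothetical sketch of run(): one client per loop, round-robin or pick."""

    def __init__(self, loop_count: int) -> None:
        self.loops = [asyncio.new_event_loop() for _ in range(loop_count)]
        self.threads = [
            threading.Thread(target=lp.run_forever, name=f"pool-loop-{i}", daemon=True)
            for i, lp in enumerate(self.loops)
        ]
        for thread in self.threads:
            thread.start()
        self.clients: List[int] = list(range(loop_count))  # stand-in: client i is int i
        self._counter = itertools.count()
        self._lock = threading.Lock()  # counter may be shared across threads

    async def run(self, fn: Callable[[Any], Awaitable[T]], pick: Optional[int] = None) -> T:
        # Mirror the documented guard: calls from a pool loop are rejected.
        if asyncio.get_running_loop() in self.loops:
            raise RuntimeError("run() called from inside a pool loop")
        if pick is None:
            with self._lock:
                pick = next(self._counter) % len(self.loops)
        loop, client = self.loops[pick], self.clients[pick]

        async def invoke() -> T:
            # fn is called on the pool loop, so it awaits that loop's client.
            return await fn(client)

        fut = asyncio.run_coroutine_threadsafe(invoke(), loop)
        return await asyncio.wrap_future(fut)

    def stop(self) -> None:
        for loop in self.loops:
            loop.call_soon_threadsafe(loop.stop)
        for thread in self.threads:
            thread.join()
        for loop in self.loops:
            loop.close()


async def whoami(client: int):
    """Demo fn: report which client and thread served the call."""
    return client, threading.current_thread().name
```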
- async map(fn, inputs)
Dispatch fn across inputs, sharded round-robin across loops.
- Parameters:
- Return type:
- Returns:
Results in the same order as inputs.
- Raises:
RuntimeError – If the pool is not usable or called from a pool loop.
Example:

```python
async def do_get(client: Client, key: Key) -> RecordResult:
    return await client.create_session().get(key)

results = await pool.map(do_get, keys)
```
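map() combines the same per-loop dispatch with ordering. In the hypothetical sketch below, input i goes to loop i mod N. asyncio.gather returns results in argument order, so the output matches the input order even when later inputs finish first. `MapSketch`, `square_on` and the string clients are illustrative stand-ins. The call shape fn(client, input) follows the do_get example.

```python
import asyncio
import threading
from typing import Any, Awaitable, Callable, Iterable, List, TypeVar

T = TypeVar("T")
U = TypeVar("U")


async def _invoke(fn: Callable[[Any, U], Awaitable[T]], client: Any, item: U) -> T:
    # Runs on the target pool loop, so fn(client, item) is called there.
    return await fn(client, item)


class MapSketch:
    def __init__(self, loop_count: int) -> None:
        self.loops = [asyncio.new_event_loop() for _ in range(loop_count)]
        self.threads = [
            threading.Thread(target=lp.run_forever, name=f"pool-loop-{i}", daemon=True)
            for i, lp in enumerate(self.loops)
        ]
        for thread in self.threads:
            thread.start()
        self.clients = [f"client-{i}" for i in range(loop_count)]  # stand-in clients

    async def map(self, fn: Callable[[Any, U], Awaitable[T]], inputs: Iterable[U]) -> List[T]:
        if asyncio.get_running_loop() in self.loops:
            raise RuntimeError("map() called from inside a pool loop")
        n = len(self.loops)
        futures = []
        for i, item in enumerate(inputs):
            shard = i % n  # round-robin: input i goes to loop i mod N
            fut = asyncio.run_coroutine_threadsafe(
                _invoke(fn, self.clients[shard], item), self.loops[shard]
            )
            futures.append(asyncio.wrap_future(fut))
        # gather preserves argument order, i.e. the order of `inputs`.
        return list(await asyncio.gather(*futures))

    def stop(self) -> None:
        for loop in self.loops:
            loop.call_soon_threadsafe(loop.stop)
        for thread in self.threads:
            thread.join()
        for loop in self.loops:
            loop.close()


async def square_on(client: str, x: int):
    # Later inputs sleep less and finish first; output order is still preserved.
    await asyncio.sleep(0.01 * max(0, 5 - x))
    return x * x, client
```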