AsyncPool

class aerospike_sdk.aio.pool.AsyncPool[source]

Bases: object

Pool of event loops + paired clients for parallel async work.

Each loop runs on a dedicated OS thread with its own Client (and therefore its own PAC CompletionBridge). Submitted coroutines are dispatched round-robin (or by explicit index) across loops.

Free-threading required for throughput gains. On a GIL-enabled interpreter (every CPython ≤ 3.12, and the default builds of 3.13+) an AsyncPool is still correct, but the N loops serialize on the GIL for Python work, so TPS does not scale with loop_count. The throughput benefit only materializes under a free-threaded build (3.13t / 3.14t).
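Before sizing a pool it can help to confirm which kind of interpreter is running. The following is a minimal sketch using only the standard library; gil_status is a hypothetical helper name and is not part of aerospike_sdk.

```python
import sys
import sysconfig


def gil_status() -> tuple[bool, bool]:
    """Return (built_free_threaded, gil_currently_enabled)."""
    # Py_GIL_DISABLED is 1 on free-threaded builds (3.13t / 3.14t),
    # and 0 or None on regular builds.
    built_free_threaded = bool(sysconfig.get_config_var("Py_GIL_DISABLED"))

    # sys._is_gil_enabled() exists from CPython 3.13 onward. Older
    # interpreters always run with the GIL, so default to True.
    checker = getattr(sys, "_is_gil_enabled", None)
    gil_enabled = bool(checker()) if checker is not None else True
    return built_free_threaded, gil_enabled


if __name__ == "__main__":
    built, gil = gil_status()
    print(f"free-threaded build: {built}, GIL enabled at runtime: {gil}")
```

A free-threaded build can still re-enable the GIL at runtime (for example when an extension module requests it), which is why the sketch reports both values.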

Shared IndexesMonitor. Index metadata is cluster-scoped, so the pool runs one shared IndexesMonitor (anchored to loop 0, issuing info commands through clients[0]) instead of one per client. The factory’s per-client monitor is replaced before connect(), so cluster-side sindex-list load is independent of loop_count. Tune via the index_refresh_interval kwarg on AsyncPool itself.

Per-Client Tokio runtime. When loop_count >= 4, AsyncPool automatically configures each Client to use its own dedicated PAC Tokio runtime instead of the shared global one. This eliminates the cross-loop scheduler contention that previously caused throughput to collapse beyond 4 loops. Controlled via the per_client_runtime kwarg; see its docstring for the threshold rationale and override.

Tuning notes (8-core remote-cluster measurement, FT 3.14t):

  • Tasks-per-loop floor. Below ~16–32 concurrent asyncio tasks per loop, per-call dispatch overhead (run_coroutine_threadsafe + asyncio.wrap_future) dominates the savings from parallelism — a 4-loop pool with 8 tasks/loop measured slower than a 1-loop client with 32 tasks total. Keep tasks-per-loop in the same regime that saturates a single client.

  • Throughput scales monotonically with loops under per-Client runtime (4×64 = 167K, 8×64 = 178K, 12×64 = 180K TPS measured). TPS ceiling on 8-core hardware is ~180K, capped by Python interpreter self-time across loops.

  • Tail latency degrades with loops. 4×64 has p99 = 4.3 ms; 12×64 has p99 = 15.5 ms. Latency-sensitive workloads should pick loop_count based on the p99 budget; throughput-only workloads can push higher.

  • Sweet spot is hardware-dependent. With colocated client+server the sweet spot shifts down because they share CPU; with more cores the ceiling shifts up. Always validate against your target deployment.
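The trade-off in the tuning notes can be turned into a small selection rule: among the configurations you have measured, take the one with the highest TPS whose p99 fits the latency budget. This is only a sketch. pick_loop_count is a hypothetical helper, and the table holds just the two configurations for which the notes above quote both TPS and p99; replace it with measurements from your own deployment.

```python
from typing import Optional, Sequence, Tuple

# (loop_count, TPS, p99 in ms) at 64 tasks per loop, as quoted in the
# tuning notes. These are not a substitute for measuring your own cluster.
MEASURED: Sequence[Tuple[int, int, float]] = (
    (4, 167_000, 4.3),
    (12, 180_000, 15.5),
)


def pick_loop_count(
    p99_budget_ms: float,
    measurements: Sequence[Tuple[int, int, float]] = MEASURED,
) -> Optional[int]:
    """Return the loop_count with the highest TPS within the p99 budget."""
    within_budget = [m for m in measurements if m[2] <= p99_budget_ms]
    if not within_budget:
        return None  # no measured configuration meets the budget
    best = max(within_budget, key=lambda m: m[1])
    return best[0]
```

With the table above, a 5 ms budget selects 4 loops and a 20 ms budget selects 12 loops.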

Example:

pool = AsyncPool(
    client_factory=lambda: Client("127.0.0.1:3000"),
    loop_count=4,
)
async with pool:
    result = await pool.run(
        lambda client: client.create_session().get(key)
    )

See also

Client: Single-loop async API.

__init__(client_factory, loop_count=None, *, index_refresh_interval=5.0, per_client_runtime=None)[source]

Configure the pool. Call start() or use async with.

Parameters:
  • client_factory (Callable[[], Client]) –

    Zero-argument callable returning an unconnected Client. Called loop_count times — once per pool thread. Each client connects on its own loop, binding its PAC CompletionBridge to that loop.

    Shared-policy invariant: when per_client_runtime is enabled (auto at loop_count >= 4 on free-threaded Python), the factory MUST return Clients sharing a single ClientPolicy PyO3 object — typically via a closure that captures one policy: policy = ClientPolicy(); factory = lambda: Client(seeds, policy=policy). AsyncPool applies a one-shot mutation to that shared policy before any loop thread starts; constructing a fresh ClientPolicy per call would land the mutation on client 0 only and silently disable the per-Client runtime for the rest. Violations raise RuntimeError from start().

    Connection-pool sizing: with N clients, total connections per server node = N × max_conns_per_node. To keep the aggregate budget constant, set max_conns_per_node on the ClientPolicy that the factory passes to each Client to the single-client value divided by loop_count.

  • loop_count (Optional[int]) – Number of event loops / OS threads. Defaults to os.cpu_count() (or 4 if indeterminate).

  • index_refresh_interval (float) – Seconds between secondary-index cache refreshes for the pool’s single shared IndexesMonitor (default 5.0). Index metadata is cluster-scoped, so one monitor serves all pool clients — the per-Client monitor each client_factory() would create is replaced before connect, eliminating N×polling load.

  • per_client_runtime (Optional[bool]) –

    Whether each pool Client should run on its own dedicated PAC Tokio runtime (per-loop runtime isolation, eliminates cross-loop scheduler contention).

    • None (default): auto-enable when loop_count >= 4. Below 4 loops the shared global runtime wins on the per-loop worker budget; at 4+ loops per-Client runtimes scale monotonically (measured: AsyncPool 8×64 rises from a collapsed ~59K TPS to ~184K TPS with per-Client runtimes).

    • True: always enable. Worker count auto-sized to max(2, os.cpu_count() // loop_count).

    • False: never enable; use the shared global runtime regardless of loop_count.
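The per_client_runtime rules above reduce to two small calculations. The sketch below restates them in plain Python for illustration; resolve_per_client_runtime and workers_per_runtime are hypothetical names, not AsyncPool internals, and the sketch ignores the free-threaded-build condition mentioned under client_factory.

```python
import os
from typing import Optional

AUTO_THRESHOLD = 4  # loop_count at which None auto-enables, per the docs above


def resolve_per_client_runtime(
    loop_count: int, per_client_runtime: Optional[bool]
) -> bool:
    """Apply the None / True / False rules for per_client_runtime."""
    if per_client_runtime is None:
        return loop_count >= AUTO_THRESHOLD
    return per_client_runtime


def workers_per_runtime(loop_count: int, cpu_count: Optional[int] = None) -> int:
    """Worker count per dedicated runtime: max(2, cpu_count // loop_count)."""
    cpus = cpu_count if cpu_count is not None else (os.cpu_count() or 1)
    return max(2, cpus // loop_count)
```

On 8 cores, 4 loops give 8 // 4 = 2 workers per runtime. On 16 cores the same pool gets 4 workers each, and 12 loops on 8 cores fall back to the floor of 2.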

Example:

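from aerospike_async import ClientPolicy
from aerospike_sdk import Client
from aerospike_sdk.aio.pool import AsyncPool

N = 4

# One policy shared by every Client, as the shared-policy invariant requires
# (per_client_runtime auto-enables at loop_count >= 4).
policy = ClientPolicy()
policy.max_conns_per_node = 300 // N  # aggregate budget of 300 per node

def make_client() -> Client:
    return Client("127.0.0.1:3000", policy=policy)

pool = AsyncPool(client_factory=make_client, loop_count=N)

async start()[source]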

Spin up pool threads, event loops, and connect all clients.

Each thread starts an asyncio event loop, then the pool creates and connects one Client per loop (via run_coroutine_threadsafe). Because Client.connect() calls await new_client(…) on the pool loop, the PAC CompletionBridge is naturally bound to the correct loop.

Raises:

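RuntimeError – If the pool is already started or closed, or if client_factory violates the shared-policy invariant while per_client_runtime is enabled.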

Return type:

None
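The thread-per-loop mechanism that start() describes can be illustrated with the standard library alone. The sketch below is not AsyncPool's implementation; start_loop_thread, where_am_i, and demo are hypothetical names. It shows one event loop running on a dedicated thread and a coroutine handed to it with run_coroutine_threadsafe, then awaited from the caller's loop through asyncio.wrap_future.

```python
import asyncio
import threading
from typing import Tuple


def start_loop_thread() -> Tuple[asyncio.AbstractEventLoop, threading.Thread]:
    """Start an event loop on its own OS thread and wait until it runs."""
    loop = asyncio.new_event_loop()
    ready = threading.Event()

    def runner() -> None:
        asyncio.set_event_loop(loop)
        loop.call_soon(ready.set)  # fires once the loop is actually running
        loop.run_forever()

    thread = threading.Thread(target=runner, daemon=True)
    thread.start()
    ready.wait()
    return loop, thread


async def where_am_i() -> int:
    """Stand-in for per-loop work such as connecting a client."""
    return threading.get_ident()


async def demo() -> Tuple[int, int]:
    loop, thread = start_loop_thread()
    try:
        # Schedule the coroutine on the worker loop, then await the
        # resulting concurrent future from the caller's loop.
        future = asyncio.run_coroutine_threadsafe(where_am_i(), loop)
        worker_ident = await asyncio.wrap_future(future)
    finally:
        loop.call_soon_threadsafe(loop.stop)
        await asyncio.to_thread(thread.join)
        loop.close()
    return worker_ident, threading.get_ident()
```

Running asyncio.run(demo()) returns two different thread identifiers, which confirms that the coroutine executed on the worker thread's loop and not on the caller's.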

async aclose()[source]

Ordered shutdown.

Protocol:

  1. Fence — reject new run/map calls.

  2. Close each client — stops new PAC operations, flushes connection pools. Runs on each client’s own loop so Client.close() awaits properly.

  3. Stop event loops: loop.stop() is scheduled via call_soon_threadsafe, so any pending drain callbacks (from completions delivered between close and stop) run first.

  4. Join threads.

Completions that arrive after the loop stops hit the CompletionBridge.closed latch and resolve their Python futures with RuntimeError("event loop is closed") — callers fail fast instead of hanging.

Return type:

None
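The ordering in the shutdown protocol can be sketched with a single worker loop and the standard library. This is an illustration under stated assumptions, not the pool's code: LoopWorker is a hypothetical class, the "client" is a stub coroutine, and shutdown() blocks synchronously where the real aclose() is a coroutine.

```python
import asyncio
import threading
from typing import Any, Coroutine, List


class LoopWorker:
    """One event loop on one thread, with a four-step ordered shutdown."""

    def __init__(self) -> None:
        self.loop = asyncio.new_event_loop()
        self.events: List[str] = []
        self.accepting = True
        ready = threading.Event()
        self.thread = threading.Thread(target=self._run, args=(ready,), daemon=True)
        self.thread.start()
        ready.wait()

    def _run(self, ready: threading.Event) -> None:
        asyncio.set_event_loop(self.loop)
        self.loop.call_soon(ready.set)
        self.loop.run_forever()

    def submit(self, coro: Coroutine[Any, Any, Any]):
        if not self.accepting:
            coro.close()  # avoid a "never awaited" warning
            raise RuntimeError("pool is closed")
        return asyncio.run_coroutine_threadsafe(coro, self.loop)

    async def _close_client(self) -> None:
        self.events.append("client closed")  # stub for Client.close()

    def shutdown(self) -> None:
        # 1. Fence: reject new work.
        self.accepting = False
        # 2. Close the client on its own loop and wait for it.
        asyncio.run_coroutine_threadsafe(self._close_client(), self.loop).result()
        # A callback queued between close and stop (simulated late completion).
        self.loop.call_soon_threadsafe(self.events.append, "late callback drained")
        # 3. Stop the loop; callbacks queued earlier run before stop takes effect.
        self.loop.call_soon_threadsafe(self.loop.stop)
        # 4. Join the thread.
        self.thread.join()
        self.loop.close()
        self.events.append("thread joined")
```

Because call_soon_threadsafe queues callbacks in FIFO order, the late callback is drained before the stop request is processed.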

async run(fn, pick=None)[source]

Dispatch fn(client_i) to one of the pool’s loops.

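Parameters:
  • fn – Callable that receives the selected pool Client and returns an awaitable.

  • pick – Optional index of the loop to dispatch to. Defaults to None, which selects a loop round-robin.

Return type: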

TypeVar(T)

Returns:

The awaited result of fn.

Raises:

RuntimeError – If the pool is not started, is closed, or if called from within one of the pool’s own loops (which would deadlock).

Example:

result = await pool.run(
    lambda c: c.create_session().get(key)
)

async map(fn, inputs)[source]

Dispatch fn across inputs, sharded round-robin across loops.

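Parameters:
  • fn – Callable invoked as fn(client, item) for each item; returns an awaitable.

  • inputs – Items to process. Each item is passed to fn together with the Client of the loop it is sharded to.

Return type: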

List[TypeVar(T)]

Returns:

Results in the same order as inputs.

Raises:

RuntimeError – If the pool is not started, is closed, or if called from within one of the pool’s own loops.
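Round-robin sharding with order-preserving results can be sketched in a few lines. This illustration runs on a single event loop and uses plain strings as stand-ins for clients, so it shows only the assignment and ordering logic, not cross-thread dispatch; map_round_robin and tag are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable, List, Sequence, Tuple, TypeVar

C = TypeVar("C")
I = TypeVar("I")
T = TypeVar("T")


async def map_round_robin(
    fn: Callable[[C, I], Awaitable[T]],
    clients: Sequence[C],
    inputs: Sequence[I],
) -> List[T]:
    """Assign inputs[i] to clients[i % len(clients)]; keep results in input order."""
    pending = [fn(clients[i % len(clients)], item) for i, item in enumerate(inputs)]
    # asyncio.gather returns results in the order the awaitables were passed,
    # regardless of which one finishes first.
    return list(await asyncio.gather(*pending))


async def tag(client: str, item: str) -> Tuple[str, str]:
    """Stand-in for real work: report which client handled which item."""
    await asyncio.sleep(0)
    return client, item
```

With three stand-in clients and four inputs, the fourth input wraps around to the first client, and the result list lines up index for index with the inputs.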

Example:

async def do_get(client: Client, key: Key) -> RecordResult:
    return await client.create_session().get(key)

results = await pool.map(do_get, keys)

property loop_count: int

Number of event loops / OS threads in the pool.

property is_started: bool

True after start() succeeds.

property is_closed: bool

True after aclose() is called.