Client

class aerospike_sdk.aio.client.Client[source]

Bases: object

Async entry point for the SDK API over the Aerospike Python Async Client (PAC).

Use async with Client(...) as client (or await connect()) to open a connection, then create_session() for reads and writes with a chosen Behavior.

Example:

async with Client("127.0.0.1:3000") as client:
    session = client.create_session()
    stream = await client.query(
        namespace="test",
        set_name="users",
    ).execute()
    async for row in stream:
        if row.record is not None:
            print(row.record.bins)

See also

create_session(): Primary API for application code.

__init__(seeds, policy=None, index_refresh_interval=5.0, *, max_error_rate=None, error_rate_window=None, indexes_monitor=None)[source]

Store cluster seeds and policy; connection starts in connect() or async with.

Parameters:
  • seeds (str) – Seed address string understood by the async client (for example "127.0.0.1:3000" or a comma-separated host list if supported).

  • policy (Optional[ClientPolicy]) – Optional ClientPolicy; defaults to a new client policy when omitted.

  • index_refresh_interval (float) – Seconds between secondary index cache refreshes (default 5.0). The monitor is a daemon thread that starts lazily on the first AEL where() query and periodically refreshes cached index metadata so subsequent queries can transparently generate secondary index filters. Clients that never use where() never start the monitor thread.

  • max_error_rate (Optional[int]) – Per-node circuit-breaker threshold. When a node’s error count crosses this value within error_rate_window tend iterations, subsequent commands routed to that node fail fast with MaxErrorRate until the window resets. 0 disables the breaker. Defaults to the underlying ClientPolicy default (100).

  • error_rate_window (Optional[int]) – Number of cluster tend iterations after which each node’s error counter is reset. Defaults to the underlying ClientPolicy default (1).

  • indexes_monitor (Optional[IndexesMonitor]) – Optional pre-constructed IndexesMonitor to share across Clients (for example, all clients in an AsyncPool). When provided, this Client uses it for AEL filter generation but does not start or stop it — the caller that constructed the monitor owns its lifecycle. When None (default), the Client owns and manages a private monitor.

Note

No network I/O occurs here. The client connects when you await connect() or enter async with.
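The interaction between max_error_rate and error_rate_window can be sketched with a small stand-in counter. This is an illustration only, not the SDK's or PAC's implementation: NodeBreaker, its methods, and the local MaxErrorRate class are hypothetical names, and the sketch assumes that reaching the threshold (rather than exceeding it) trips the breaker and that error_rate_window is at least 1.

```python
from dataclasses import dataclass


class MaxErrorRate(Exception):
    """Stand-in for the fail-fast error; not the SDK's exception class."""


@dataclass
class NodeBreaker:
    """Hypothetical per-node error counter, for illustration only."""

    max_error_rate: int = 100   # 0 disables the breaker
    error_rate_window: int = 1  # tend iterations between counter resets
    errors: int = 0
    tends: int = 0

    def record_error(self) -> None:
        self.errors += 1

    def check(self) -> None:
        """Raise if the node has reached the threshold in this window."""
        if self.max_error_rate > 0 and self.errors >= self.max_error_rate:
            raise MaxErrorRate(f"{self.errors} errors in current window")

    def tend(self) -> None:
        """One tend iteration; reset the counter at the window boundary."""
        self.tends += 1
        if self.tends % self.error_rate_window == 0:
            self.errors = 0
```

With a window of 2, a tripped node keeps failing fast after the first tend and recovers after the second, when the counter is cleared.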

async connect()[source]

Open a connection to the cluster using the configured seeds and policy.

Idempotent: if already connected, returns immediately.

Raises:

ConnectionError – If the async client cannot reach the cluster (from PAC).

See also

close(): Release the connection.

Example:

client = Client("localhost:3000")
await client.connect()
Return type:

None

async close()[source]

Close the underlying async client and clear connection state.

Safe to call when already closed.

See also

connect().

Return type:

None
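The lifecycle contract described for connect() and close() (no I/O in the constructor, idempotent connect, close that is safe to repeat, and async with wrapping both) can be sketched with a stand-in class. FakeClient and demo are hypothetical names used only for illustration; the sketch performs no network I/O and does not reflect how the SDK is implemented.

```python
import asyncio


class FakeClient:
    """Stand-in with the same lifecycle shape; it performs no network I/O."""

    def __init__(self, seeds: str) -> None:
        self.seeds = seeds          # stored only, nothing is opened here
        self._connected = False
        self.connect_calls = 0      # counts connects that did real work

    @property
    def is_connected(self) -> bool:
        return self._connected

    async def connect(self) -> None:
        if self._connected:         # idempotent: a second call is a no-op
            return
        await asyncio.sleep(0)      # placeholder for the real handshake
        self.connect_calls += 1
        self._connected = True

    async def close(self) -> None:
        if not self._connected:     # safe to call when already closed
            return
        self._connected = False

    async def __aenter__(self) -> "FakeClient":
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        await self.close()


async def demo() -> tuple:
    client = FakeClient("127.0.0.1:3000")
    before = client.is_connected
    async with client:
        await client.connect()      # already connected, returns immediately
        during = client.is_connected
    await client.close()            # already closed, no error
    return before, during, client.is_connected, client.connect_calls
```

Running asyncio.run(demo()) exercises the redundant connect() and close() calls without raising.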

connect_blocking()[source]

Synchronously open a connection without requiring an asyncio loop.

Uses aerospike_async.new_client_blocking() to construct the underlying PAC client and sets _connected = True. The IndexesMonitor daemon thread is not started here; it lazy-starts on the first AEL where() query that needs cached secondary-index metadata.

Idempotent: returns early if already connected.

Raises:

ConnectionError – When the PAC blocking connect cannot reach the cluster.

Example:

client = Client("localhost:3000")
client.connect_blocking()
try:
    ...
finally:
    client.close_blocking()
Return type:

None
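The lazy-start behavior described for the IndexesMonitor daemon thread can be sketched as follows. LazyMonitor, ensure_started, and stop are hypothetical names, not the SDK's API; the sketch only shows a daemon thread that does not exist until first use and then refreshes once per interval.

```python
import threading
from typing import Callable, Optional


class LazyMonitor:
    """Hypothetical stand-in for a lazily started refresh thread."""

    def __init__(self, refresh: Callable[[], None], interval: float = 5.0) -> None:
        self._refresh = refresh
        self._interval = interval
        self._stop = threading.Event()
        self._lock = threading.Lock()
        self._thread: Optional[threading.Thread] = None

    @property
    def started(self) -> bool:
        return self._thread is not None

    def ensure_started(self) -> None:
        """Start the thread on first use; later calls do nothing."""
        with self._lock:
            if self._thread is None:
                self._thread = threading.Thread(target=self._run, daemon=True)
                self._thread.start()

    def _run(self) -> None:
        # Refresh once immediately, then once per interval until stopped.
        while True:
            self._refresh()
            if self._stop.wait(self._interval):
                return

    def stop(self) -> None:
        """Signal the loop to exit and wait for the thread, if one was started."""
        self._stop.set()
        if self._thread is not None:
            self._thread.join()
```

A caller that never invokes ensure_started() never creates the thread, which mirrors the statement that clients that never use where() never start the monitor.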

close_blocking()[source]

Synchronously close the underlying client. Pair with connect_blocking().

Safe to call when already closed.

Return type:

None

property is_connected: bool

Check if the client is connected.

Returns:

True when connect() (or connect_blocking()) has succeeded and the client has not been closed since.

property underlying_client: Client

The underlying aerospike_async (PAC) Client for direct API access.

Use this when you need PAC calls that are not wrapped by the SDK API, e.g. info(), nodes(), get_node(). The returned client is the same instance used internally by the SDK API.

Example:

async with Client("localhost:3000") as client:
    pac = client.underlying_client
    response = await pac.info("sindex-list")
    nodes = await pac.nodes()
    node = await pac.get_node(nodes[0].name)
    response = await node.info("build")
Returns:

The aerospike_async Client instance.

Raises:

RuntimeError – If the client is not connected.

query(arg1=None, set_name=None, namespace=None, *, dataset=None, key=None, keys=None, behavior=None, namespace_mode_resolver=None, namespace_mode_resolver_blocking=None)[source]
Overloads:
  • self, dataset (DataSet), behavior (Optional[Behavior]), namespace_mode_resolver (Optional[Callable[[str], Awaitable[Mode]]]), namespace_mode_resolver_blocking (Optional[Callable[[str], Mode]]) → QueryBuilder

  • self, key (Key), behavior (Optional[Behavior]), namespace_mode_resolver (Optional[Callable[[str], Awaitable[Mode]]]), namespace_mode_resolver_blocking (Optional[Callable[[str], Mode]]) → QueryBuilder

  • self, keys (List[Key]), behavior (Optional[Behavior]), namespace_mode_resolver (Optional[Callable[[str], Awaitable[Mode]]]), namespace_mode_resolver_blocking (Optional[Callable[[str], Mode]]) → QueryBuilder

  • self, keys (Key), behavior (Optional[Behavior]), namespace_mode_resolver (Optional[Callable[[str], Awaitable[Mode]]]), namespace_mode_resolver_blocking (Optional[Callable[[str], Mode]]) → QueryBuilder

  • self, namespace (str), set_name (str), behavior (Optional[Behavior]), namespace_mode_resolver (Optional[Callable[[str], Awaitable[Mode]]]), namespace_mode_resolver_blocking (Optional[Callable[[str], Mode]]) → QueryBuilder

Create a query builder.

Supports multiple calling styles:

  1. Using a DataSet (positional or keyword):

    users = DataSet.of("test", "users")
    stream = await client.query(users).execute()
    # or
    stream = await client.query(dataset=users).execute()
    async for row in stream:
        if row.record is not None:
            print(row.record.bins)
    
  2. Using a single Key (positional or keyword):

    users = DataSet.of("test", "users")
    key = users.id("user123")
    recordset = await client.query(key).execute()
    # or
    recordset = await client.query(key=key).execute()
    
  3. Using multiple Keys (positional or keyword):

    users = DataSet.of("test", "users")
    keys = users.ids("user1", "user2", "user3")
    recordset = await client.query(keys).execute()
    # or
    recordset = await client.query(keys=keys).execute()
    
  4. Explicit namespace/set (original style):

    stream = await client.query(
        namespace="test",
        set_name="users"
    ).execute()
    async for row in stream:
        if row.record is not None:
            print(row.record.bins)
    
Parameters:
  • arg1 (Union[DataSet, Key, List[Key], str, None]) – Optional first positional: DataSet, Key, list of keys, or namespace string for the ("namespace", "set") pair form.

  • set_name (Optional[str]) – When arg1 is a namespace string, the set name as the second positional (client.query("test", "users")).

  • namespace (Optional[str]) – Optional third positional; not used for the usual two-string namespace/set pair (that form uses arg1 and set_name).

  • dataset (Optional[DataSet]) – Keyword-only DataSet.

  • key (Optional[Key]) – Keyword-only single key for a point read.

  • keys (Optional[List[Key]]) – Keyword-only list of keys for a batch read.

  • behavior (Optional[Behavior]) – Optional Behavior for timeouts, retries, and replica settings on this builder. If None, the client uses generic defaults (unlike Session.query(), which applies the session’s behavior automatically).

Returns:

A QueryBuilder for chaining filters, bin selection, and execution.

Raises:
  • TypeError – If a positional argument is not a dataset, key, key list, or namespace string.

  • ValueError – If required namespace/set information is missing or key lists are empty.

See also

Session.query(): Same builder with session-scoped behavior.
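The calling styles and error conditions listed above can be sketched as a small dispatch function. This is a sketch under stated assumptions, not the SDK's code: DataSet and Key here are local stand-in dataclasses, resolve_target is a hypothetical helper, and the exact conditions under which the real query() raises may differ.

```python
from dataclasses import dataclass
from typing import Optional, Tuple, Union


@dataclass(frozen=True)
class DataSet:          # stand-in, not the SDK class
    namespace: str
    set_name: str


@dataclass(frozen=True)
class Key:              # stand-in, not the SDK class
    namespace: str
    set_name: str
    value: Union[str, int]


def resolve_target(
    arg1=None,
    set_name: Optional[str] = None,
    namespace: Optional[str] = None,
    *,
    dataset=None,
    key=None,
    keys=None,
) -> Tuple[str, object]:
    """Classify the arguments into one of the documented calling styles."""
    # Route the first positional argument by its type.
    if isinstance(arg1, DataSet):
        dataset = arg1
    elif isinstance(arg1, Key):
        key = arg1
    elif isinstance(arg1, list):
        keys = arg1
    elif isinstance(arg1, str):
        namespace = arg1
    elif arg1 is not None:
        raise TypeError(f"unsupported positional argument: {type(arg1).__name__}")

    if key is not None:
        return "point", key
    if keys is not None:
        if not keys:
            raise ValueError("key list is empty")
        return "batch", list(keys)
    if dataset is not None:
        return "dataset", dataset
    if namespace is not None and set_name is not None:
        return "dataset", DataSet(namespace, set_name)
    raise ValueError("namespace/set information is missing")
```

For example, resolve_target("test", "users") and resolve_target(namespace="test", set_name="users") both resolve to the same stand-in DataSet, while resolve_target(42) raises TypeError.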

index(namespace=None, set_name=None, *, dataset=None, behavior=None)[source]
Overloads:
  • self, dataset (DataSet), behavior (Optional[Behavior]) → IndexBuilder

  • self, namespace (str), set_name (str), behavior (Optional[Behavior]) → IndexBuilder

Create an index builder.

Supports multiple calling styles:

  1. Using a DataSet:

    users = DataSet.of("test", "users")
    await client.index(dataset=users).on_bin("age").named("age_idx").numeric().create()
    
  2. Explicit namespace/set (original style):

    await client.index(
        namespace="test",
        set_name="users"
    ).on_bin("age").named("age_idx").numeric().create()
    
Parameters:
  • namespace (Optional[str]) – The namespace name (if not using DataSet).

  • set_name (Optional[str]) – The set name (if not using DataSet).

  • dataset (Optional[DataSet]) – Optional DataSet to use for namespace/set.

  • behavior (Optional[Behavior]) – Reserved for symmetry with query(); not applied to index operations yet.

Returns:

An IndexBuilder for chaining index operations.

transaction_session(behavior=None)[source]

Create a multi-record transaction (MRT) session.

Allocates a fresh Txn on entry. Operations chained off the returned session (tx.upsert(...), tx.query(...), tx.batch(), …) auto-participate in the transaction — every builder stamps policy.txn = tx.txn under the hood. On clean exit the transaction is committed; if an exception propagates out of the block it is aborted.

Multi-record transactions require an Aerospike server running in strong-consistency (SC) mode on the target namespace.

Parameters:

behavior (Optional[Behavior]) – Optional Behavior for operations inside the transaction. Defaults to Behavior.DEFAULT when omitted.

Return type:

TransactionalSession

Returns:

A TransactionalSession bound to this client and behavior.

Example:

async with client.transaction_session() as tx:
    await tx.upsert(accounts.id("A")).bin("balance").set_to(100).execute()
    await tx.upsert(accounts.id("B")).bin("balance").set_to(200).execute()
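
The commit-on-clean-exit and abort-on-exception rule can be sketched with a stand-in async context manager. FakeTxnSession and run are hypothetical names; the sketch records events in a list instead of talking to a server, and it assumes the exception is re-raised to the caller after the abort.

```python
import asyncio
from typing import List


class FakeTxnSession:
    """Stand-in showing commit-on-success and abort-on-exception only."""

    def __init__(self, log: List[str]) -> None:
        self.log = log

    async def __aenter__(self) -> "FakeTxnSession":
        self.log.append("begin")     # a fresh transaction is allocated on entry
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        # Clean exit commits; an exception leaving the block aborts.
        self.log.append("commit" if exc_type is None else "abort")
        return False                 # never swallow the caller's exception


async def run(fail: bool) -> List[str]:
    log: List[str] = []
    try:
        async with FakeTxnSession(log) as tx:
            tx.log.append("write A")
            if fail:
                raise RuntimeError("boom")
            tx.log.append("write B")
    except RuntimeError:
        log.append("error seen by caller")
    return log
```

Calling asyncio.run(run(True)) shows the second write never happening and the abort being recorded before the caller handles the error.
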
create_session(behavior=None)[source]

Create a session with the specified behavior.

A session represents a logical connection to the cluster with specific behavior settings that control how operations are performed (timeouts, retry policies, consistency levels, etc.).

Parameters:

behavior (Optional[Behavior]) – The behavior configuration for the session. If None, uses Behavior.DEFAULT.

Return type:

Session

Returns:

A new Session bound to this client.

Example:

session = client.create_session()
users = DataSet.of("test", "users")
await session.upsert(users.id(1)).put({"k": 1}).execute()

Example:

from datetime import timedelta
fast = Behavior.DEFAULT.derive_with_changes(
    name="fast",
    total_timeout=timedelta(seconds=5),
)
session = client.create_session(fast)

See also

Behavior: Available presets.

async register_udf(body, server_path, language=UDFLang.LUA, *, policy=None)[source]

Register a UDF package from in-memory bytes on the cluster.

Parameters:
  • body (bytes) – Raw module source (for example UTF-8 encoded Lua).

  • server_path (str) – Path name stored on the server (often ends with .lua).

  • language (UDFLang) – UDFLang; default is Lua.

  • policy (Optional[AdminPolicy]) – Optional AdminPolicy. PAC takes it as the leading positional argument; here, pass it with the keyword policy=.

Return type:

RegisterTask

Returns:

A RegisterTask; await its wait_till_complete(...) to wait until propagation finishes.

See also

register_udf_from_file(): Load source from disk.

Example:

task = await client.register_udf(udf_source_code, "my_module.lua")
await task.wait_till_complete()
async register_udf_from_file(client_path, server_path, language=UDFLang.LUA, *, policy=None)[source]

Register a UDF by reading module bytes from a local path.

Parameters:
  • client_path (str) – Filesystem path to the module file on the client machine.

  • server_path (str) – Path name stored on the server.

  • language (UDFLang) – UDFLang; default is Lua.

  • policy (Optional[AdminPolicy]) – Optional admin policy; use keyword policy=.

Return type:

RegisterTask

Returns:

A RegisterTask for completion polling.

See also

register_udf(): Register from bytes.

Example:

task = await client.register_udf_from_file("scripts/my_module.lua", "my_module.lua")
await task.wait_till_complete()
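
The two registration calls differ in where the module bytes come from. A minimal sketch of the file-reading step follows, assuming the file is read whole as raw bytes; read_udf_body and write_sample_module are hypothetical helpers, and how the SDK reads the file internally is not specified here.

```python
import tempfile
from pathlib import Path


def read_udf_body(client_path: str) -> bytes:
    """Return the raw module bytes stored at client_path."""
    return Path(client_path).read_bytes()


def write_sample_module(directory: str) -> str:
    """Write a tiny Lua module to disk and return its path."""
    path = Path(directory) / "my_module.lua"
    path.write_text("function hello(rec)\n  return 'hi'\nend\n", encoding="utf-8")
    return str(path)
```

The bytes returned by read_udf_body have the same type as the body parameter of register_udf(), so a caller that already holds the source in memory can skip the file step.
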
async remove_udf(server_path, *, policy=None)[source]

Remove a registered UDF package from the cluster.

Parameters:
  • server_path (str) – Same server path used when registering the module.

  • policy (Optional[AdminPolicy]) – Optional admin policy; use keyword policy=.

Return type:

UdfRemoveTask

Returns:

A UdfRemoveTask; await its wait_till_complete() as with RegisterTask.

Example:

task = await client.remove_udf("my_module.lua")
await task.wait_till_complete()