Client
- class aerospike_sdk.aio.client.Client
Bases: object
Async entry point for the SDK API over the Aerospike Python Async Client.
Use async with Client(...) as client (or await connect()) to open a connection, then create_session() for reads and writes with a chosen Behavior.
Example:
    async with Client("127.0.0.1:3000") as client:
        session = client.create_session()
        stream = await client.query(
            namespace="test",
            set_name="users",
        ).execute()
        async for row in stream:
            if row.record is not None:
                print(row.record.bins)
See also
create_session(): Primary API for application code.
- __init__(seeds, policy=None, index_refresh_interval=5.0, *, max_error_rate=None, error_rate_window=None, indexes_monitor=None)
Store cluster seeds and policy; the connection starts in connect() or async with.
- Parameters:
seeds (str) – Seed address string understood by the async client (for example "127.0.0.1:3000" or a comma-separated host list if supported).
policy (Optional[ClientPolicy]) – Optional ClientPolicy; defaults to a new client policy when omitted.
index_refresh_interval (float) – Seconds between secondary index cache refreshes (default 5.0). The monitor is a daemon thread that starts lazily on the first AEL where() query and periodically refreshes cached index metadata so subsequent queries can transparently generate secondary index filters. Clients that never use where() never start the monitor thread.
max_error_rate (Optional[int]) – Per-node circuit-breaker threshold. When a node’s error count crosses this value within error_rate_window tend iterations, subsequent commands routed to that node fail fast with MaxErrorRate until the window resets. 0 disables the breaker. Defaults to the underlying ClientPolicy default (100).
error_rate_window (Optional[int]) – Number of cluster tend iterations after which each node’s error counter is reset. Defaults to the underlying ClientPolicy default (1).
indexes_monitor (Optional[IndexesMonitor]) – Optional pre-constructed IndexesMonitor to share across Clients (for example, all clients in an AsyncPool). When provided, this Client uses it for AEL filter generation but does not start or stop it; the caller that constructed the monitor owns its lifecycle. When None (default), the Client owns and manages a private monitor.
Note
No network I/O occurs here. The client connects when you await connect() or enter async with.
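The interaction between max_error_rate and error_rate_window described above can be pictured with a small standalone model. This is only a sketch of the documented semantics, not the client's implementation: NodeBreaker, its method names, and the local MaxErrorRate exception are hypothetical, and tripping at "greater than or equal to" the threshold is an assumption.

```python
class MaxErrorRate(Exception):
    """Local stand-in for the SDK's fail-fast error (hypothetical)."""


class NodeBreaker:
    """Toy per-node breaker: counts errors, resets every `window` tends."""

    def __init__(self, max_error_rate=100, error_rate_window=1):
        self.max_error_rate = max_error_rate
        self.error_rate_window = error_rate_window
        self.errors = 0
        self.tends = 0

    def record_error(self):
        self.errors += 1

    def check(self):
        # A threshold of 0 disables the breaker entirely.
        if self.max_error_rate and self.errors >= self.max_error_rate:
            raise MaxErrorRate("node error budget exhausted")

    def on_tend(self):
        # Called once per cluster tend iteration; the error counter is
        # reset after `error_rate_window` iterations.
        self.tends += 1
        if self.tends >= self.error_rate_window:
            self.tends = 0
            self.errors = 0


def demo():
    breaker = NodeBreaker(max_error_rate=3, error_rate_window=2)
    for _ in range(3):
        breaker.record_error()
    try:
        breaker.check()
        tripped = False
    except MaxErrorRate:
        tripped = True
    breaker.on_tend()                 # first tend: window not yet elapsed
    still_counting = breaker.errors == 3
    breaker.on_tend()                 # second tend: counter resets
    breaker.check()                   # no longer raises
    return tripped, still_counting, breaker.errors
```

In this model a larger error_rate_window keeps a tripped node failing fast for longer, because the counter survives more tend iterations before it is cleared.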
- async connect()
Open a connection to the cluster using the configured seeds and policy.
Idempotent: if already connected, returns immediately.
- Raises:
ConnectionError – If the async client cannot reach the cluster (raised by the underlying Python Async Client, PAC).
See also
close(): Release the connection.
Example:
    client = Client("localhost:3000")
    await client.connect()
- Return type:
- async close()
Close the underlying async client and clear connection state.
Safe to call when already closed.
See also
- Return type:
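As a rough illustration of the lifecycle described for connect() and close(), the sketch below uses a hypothetical stand-in class (FakeClient, not part of the SDK) to show idempotent connect and close calls and how async with pairs them. It assumes the context manager calls connect() on entry and close() on exit, as the class description suggests; no real network I/O is involved.

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in; models only the connect/close state machine."""

    def __init__(self, seeds):
        self.seeds = seeds          # stored only, no I/O in __init__
        self.connected = False
        self.connect_calls = 0      # counts real connection attempts

    async def connect(self):
        if self.connected:          # idempotent: already connected
            return
        self.connect_calls += 1
        self.connected = True

    async def close(self):
        self.connected = False      # safe to call when already closed

    async def __aenter__(self):
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.close()


async def lifecycle():
    client = FakeClient("127.0.0.1:3000")
    before = client.connected
    async with client:
        await client.connect()      # second call is a no-op
        during = client.connected
    await client.close()            # closing twice is harmless
    return before, during, client.connected, client.connect_calls


def demo():
    return asyncio.run(lifecycle())
```

The explicit form (await connect(), then close() in a finally block) follows the same state transitions; async with simply guarantees the close step.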
- connect_blocking()
Synchronously open a connection without requiring an asyncio loop.
Uses aerospike_async.new_client_blocking() to construct the underlying PAC client and sets _connected = True. The IndexesMonitor daemon thread is not started here; it lazy-starts on the first AEL where() query that needs cached secondary-index metadata.
Idempotent: returns early if already connected.
- Raises:
ConnectionError – When the PAC blocking connect cannot reach the cluster.
Example:
    client = Client("localhost:3000")
    client.connect_blocking()
    try:
        ...
    finally:
        client.close_blocking()
- Return type:
- close_blocking()
Synchronously close the underlying client. Pair with connect_blocking().
Safe to call when already closed.
- Return type:
- property underlying_client: Client
The underlying aerospike_async (PAC) Client for direct API access.
Use this when you need PAC calls that are not wrapped by the SDK API, e.g. info(), nodes(), get_node(). The returned client is the same instance used internally by the SDK API.
Example:
    async with Client("localhost:3000") as client:
        pac = client.underlying_client
        response = await pac.info("sindex-list")
        nodes = await pac.nodes()
        node = await pac.get_node(nodes[0].name)
        response = await node.info("build")
- Returns:
The aerospike_async Client instance.
- Raises:
RuntimeError – If the client is not connected.
- query(arg1=None, set_name=None, namespace=None, *, dataset=None, key=None, keys=None, behavior=None, namespace_mode_resolver=None, namespace_mode_resolver_blocking=None)
- Overloads:
self, dataset (DataSet), behavior (Optional[Behavior]), namespace_mode_resolver (Optional[Callable[[str], Awaitable[Mode]]]), namespace_mode_resolver_blocking (Optional[Callable[[str], Mode]]) → QueryBuilder
self, key (Key), behavior (Optional[Behavior]), namespace_mode_resolver (Optional[Callable[[str], Awaitable[Mode]]]), namespace_mode_resolver_blocking (Optional[Callable[[str], Mode]]) → QueryBuilder
self, keys (List[Key]), behavior (Optional[Behavior]), namespace_mode_resolver (Optional[Callable[[str], Awaitable[Mode]]]), namespace_mode_resolver_blocking (Optional[Callable[[str], Mode]]) → QueryBuilder
self, keys (Key), behavior (Optional[Behavior]), namespace_mode_resolver (Optional[Callable[[str], Awaitable[Mode]]]), namespace_mode_resolver_blocking (Optional[Callable[[str], Mode]]) → QueryBuilder
self, namespace (str), set_name (str), behavior (Optional[Behavior]), namespace_mode_resolver (Optional[Callable[[str], Awaitable[Mode]]]), namespace_mode_resolver_blocking (Optional[Callable[[str], Mode]]) → QueryBuilder
Create a query builder.
Supports multiple calling styles:
Using a DataSet (positional or keyword):
    users = DataSet.of("test", "users")
    stream = await client.query(users).execute()
    # or: stream = await client.query(dataset=users).execute()
    async for row in stream:
        if row.record is not None:
            print(row.record.bins)
Using a single Key (positional or keyword):
    users = DataSet.of("test", "users")
    key = users.id("user123")
    recordset = await client.query(key).execute()
    # or: recordset = await client.query(key=key).execute()
Using multiple Keys (positional or keyword):
    users = DataSet.of("test", "users")
    keys = users.ids("user1", "user2", "user3")
    recordset = await client.query(keys).execute()
    # or: recordset = await client.query(keys=keys).execute()
Explicit namespace/set (original style):
    stream = await client.query(
        namespace="test",
        set_name="users",
    ).execute()
    async for row in stream:
        if row.record is not None:
            print(row.record.bins)
- Parameters:
arg1 (Union[DataSet, Key, List[Key], str, None]) – Optional first positional: DataSet, Key, list of keys, or namespace string for the ("namespace", "set") pair form.
set_name (Optional[str]) – When arg1 is a namespace string, the set name as the second positional (client.query("test", "users")).
namespace (Optional[str]) – Optional third positional; not used for the usual two-string namespace/set pair (that form uses arg1 and set_name).
key (Optional[Key]) – Keyword-only single key for a point read.
keys (Optional[List[Key]]) – Keyword-only list of keys for a batch read.
behavior (Optional[Behavior]) – Optional Behavior for timeouts, retries, and replica settings on this builder. If None, the client uses generic defaults (unlike Session.query(), which applies the session’s behavior automatically).
- Returns:
A QueryBuilder for chaining filters, bin selection, and execution.
- Raises:
TypeError – If a positional argument is not a dataset, key, key list, or namespace string.
ValueError – If required namespace/set information is missing or key lists are empty.
See also
Session.query(): Same builder with session-scoped behavior.
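The calling styles and error conditions listed for query() can be modelled with a small dispatch function. The sketch below is not the SDK's code: DataSet and Key are hypothetical stand-in dataclasses, resolve_query_target is an invented helper, and the precedence between positional and keyword forms is an assumption made only to keep the example short.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DataSet:                      # hypothetical stand-in for the SDK type
    namespace: str
    set_name: str


@dataclass(frozen=True)
class Key:                          # hypothetical stand-in for the SDK type
    namespace: str
    set_name: str
    value: object


def resolve_query_target(arg1=None, set_name=None, namespace=None, *,
                         dataset=None, key=None, keys=None):
    """Classify a query()-style call as a dataset, key, or keys target."""
    # Positional dispatch on the type of the first argument.
    if isinstance(arg1, DataSet):
        dataset = arg1
    elif isinstance(arg1, Key):
        key = arg1
    elif isinstance(arg1, list):
        keys = arg1
    elif isinstance(arg1, str):
        namespace = arg1            # ("namespace", "set") pair form
    elif arg1 is not None:
        raise TypeError(f"unsupported positional argument: {type(arg1).__name__}")

    if key is not None:
        return ("key", [key])
    if keys is not None:
        if not keys:
            raise ValueError("key list is empty")
        return ("keys", list(keys))
    if dataset is not None:
        return ("dataset", dataset)
    if namespace is not None and set_name is not None:
        return ("dataset", DataSet(namespace, set_name))
    raise ValueError("namespace/set information is missing")
```

With these stand-ins, resolve_query_target("test", "users") and resolve_query_target(namespace="test", set_name="users") resolve to the same dataset target, which mirrors how the positional pair and the explicit keyword style are described as equivalent.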
- index(namespace=None, set_name=None, *, dataset=None, behavior=None)
- Overloads:
self, dataset (DataSet), behavior (Optional[Behavior]) → IndexBuilder
self, namespace (str), set_name (str), behavior (Optional[Behavior]) → IndexBuilder
Create an index builder.
Supports multiple calling styles:
Using a DataSet:
    users = DataSet.of("test", "users")
    await client.index(dataset=users).on_bin("age").named("age_idx").numeric().create()
Explicit namespace/set (original style):
    await client.index(
        namespace="test",
        set_name="users",
    ).on_bin("age").named("age_idx").numeric().create()
- Parameters:
namespace (Optional[str]) – The namespace name (if not using DataSet).
set_name (Optional[str]) – The set name (if not using DataSet).
dataset (Optional[DataSet]) – Optional DataSet to use for namespace/set.
behavior (Optional[Behavior]) – Reserved for symmetry with query(); not applied to index operations yet.
- Returns:
An IndexBuilder for chaining index operations.
- transaction_session(behavior=None)
Create a multi-record transaction (MRT) session.
Allocates a fresh Txn on entry. Operations chained off the returned session (tx.upsert(...), tx.query(...), tx.batch(), …) auto-participate in the transaction: every builder stamps policy.txn = tx.txn under the hood. On clean exit the transaction is committed; if an exception propagates out of the block, it is aborted.
Multi-record transactions require an Aerospike server running in strong-consistency (SC) mode on the target namespace.
- Parameters:
behavior (Optional[Behavior]) – Optional Behavior for operations inside the transaction. Defaults to Behavior.DEFAULT when omitted.
- Return type:
TransactionalSession
- Returns:
A TransactionalSession bound to this client and behavior.
Example:
    async with client.transaction_session() as tx:
        await tx.upsert(accounts.id("A")).bin("balance").set_to(100).execute()
        await tx.upsert(accounts.id("B")).bin("balance").set_to(200).execute()
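The commit-on-clean-exit and abort-on-exception behavior described for transaction_session() follows the usual async context manager pattern. The sketch below models only that control flow with a hypothetical FakeTxnSession that logs its decisions; it does not talk to a server and is not the SDK's TransactionalSession.

```python
import asyncio


class FakeTxnSession:
    """Hypothetical stand-in that records commit/abort decisions."""

    def __init__(self, log):
        self.log = log

    async def __aenter__(self):
        self.log.append("begin")        # a fresh transaction per entry
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Clean exit commits; a propagating exception aborts.
        self.log.append("abort" if exc_type else "commit")
        return False                    # never swallow the exception


async def transfer(log, fail):
    async with FakeTxnSession(log):
        log.append("write A")
        if fail:
            raise RuntimeError("simulated failure between writes")
        log.append("write B")


def demo():
    ok_log, bad_log = [], []
    asyncio.run(transfer(ok_log, fail=False))
    try:
        asyncio.run(transfer(bad_log, fail=True))
    except RuntimeError:
        bad_log.append("error seen by caller")
    return ok_log, bad_log
```

Because __aexit__ returns False, the original exception still reaches the caller after the abort, so application code can log or retry the whole block.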
- create_session(behavior=None)
Create a session with the specified behavior.
A session represents a logical connection to the cluster with specific behavior settings that control how operations are performed (timeouts, retry policies, consistency levels, etc.).
- Parameters:
behavior (Optional[Behavior]) – The behavior configuration for the session. If None, uses Behavior.DEFAULT.
- Return type:
Session
- Returns:
A new Session bound to this client.
Example:
    session = client.create_session()
    users = DataSet.of("test", "users")
    await session.upsert(users.id(1)).put({"k": 1}).execute()
Example:
    from datetime import timedelta

    fast = Behavior.DEFAULT.derive_with_changes(
        name="fast",
        total_timeout=timedelta(seconds=5),
    )
    session = client.create_session(fast)
See also
Behavior: Available presets.
- async register_udf(body, server_path, language=UDFLang.LUA, *, policy=None)
Register a UDF package from in-memory bytes on the cluster.
- Parameters:
- Return type:
RegisterTask
- Returns:
A RegisterTask; await wait_till_complete(...) until propagation finishes.
- Raises:
RuntimeError – If not connected.
AerospikeError – On cluster or admin errors (via PAC).
See also
register_udf_from_file(): Load source from disk.
Example:
    task = await client.register_udf(udf_source_code, "my_module")
    await task.wait_till_complete()
- async register_udf_from_file(client_path, server_path, language=UDFLang.LUA, *, policy=None)
Register a UDF by reading module bytes from a local path.
- Parameters:
- Return type:
RegisterTask
- Returns:
A RegisterTask for completion polling.
- Raises:
RuntimeError – If not connected.
OSError – If client_path cannot be read.
AerospikeError – On cluster or admin errors (via PAC).
See also
register_udf(): Register from bytes.
Example:
    task = await client.register_udf_from_file("scripts/my_module.lua", "my_module.lua")
    await task.wait_till_complete()
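The relationship between the two registration calls (bytes in memory versus a local path) can be sketched as "read the file, then register the bytes". The FakeUdfClient below is a hypothetical stand-in that stores what it would send instead of contacting a cluster; whether the SDK delegates internally in this way is an assumption.

```python
import asyncio
import tempfile
from pathlib import Path


class FakeUdfClient:
    """Hypothetical stand-in that records what would be registered."""

    def __init__(self):
        self.registered = {}

    async def register_udf(self, body, server_path):
        self.registered[server_path] = body

    async def register_udf_from_file(self, client_path, server_path):
        # Path.read_bytes raises OSError (for example FileNotFoundError)
        # when the file cannot be read, before anything is registered.
        body = Path(client_path).read_bytes()
        await self.register_udf(body, server_path)


def demo():
    client = FakeUdfClient()
    with tempfile.TemporaryDirectory() as tmp:
        source = Path(tmp) / "my_module.lua"
        source.write_bytes(b"function hello(rec) return 1 end\n")
        asyncio.run(client.register_udf_from_file(str(source), "my_module.lua"))
        try:
            missing = str(Path(tmp) / "missing.lua")
            asyncio.run(client.register_udf_from_file(missing, "missing.lua"))
            missing_raised = False
        except OSError:
            missing_raised = True
    return client.registered, missing_raised
```

In this model an unreadable client_path fails locally with OSError and leaves nothing registered, which matches the Raises list above.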
- async remove_udf(server_path, *, policy=None)
Remove a registered UDF package from the cluster.
- Parameters:
- Return type:
UdfRemoveTask
- Returns:
A UdfRemoveTask; await wait_till_complete() on it, as with a RegisterTask.
- Raises:
RuntimeError – If not connected.
AerospikeError – On cluster or admin errors (via PAC).
Example:
    task = await client.remove_udf("my_module")
    await task.wait_till_complete()