# Source code for aerospike_sdk.aio.client

# Copyright 2025-2026 Aerospike, Inc.
#
# Portions may be licensed to Aerospike, Inc. under one or more contributor
# license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

"""Client - Main entry point for the Aerospike SDK API."""

from __future__ import annotations

import types
import typing
from typing import List, Optional, Union, overload

from aerospike_async import (
    AdminPolicy,
    Client as AsyncClient,
    ClientPolicy,
    Key,
    RegisterTask,
    UDFLang,
    UdfRemoveTask,
    new_client,
)

from aerospike_sdk.dataset import DataSet
from aerospike_sdk.aio.operations.index import IndexBuilder
from aerospike_sdk.aio.operations.query import QueryBuilder
from aerospike_sdk.index_monitor import IndexesMonitor
from aerospike_sdk.policy.behavior import Behavior

if typing.TYPE_CHECKING:
    from aerospike_sdk.aio.session import Session
    from aerospike_sdk.aio.transactional_session import TransactionalSession


class Client:
    """Async entry point for the SDK API over the Aerospike Python Async Client.

    Use ``async with Client(...) as client`` (or ``await connect()``) to open a
    connection, then :meth:`create_session` for reads and writes with a chosen
    :class:`~aerospike_sdk.policy.behavior.Behavior`.

    Example::

        async with Client("127.0.0.1:3000") as client:
            session = client.create_session()
            stream = await client.query(
                namespace="test",
                set_name="users",
            ).execute()
            async for row in stream:
                if row.record is not None:
                    print(row.record.bins)

    See Also:
        :meth:`create_session`: Primary API for application code.
    """

    def __init__(
        self,
        seeds: str,
        policy: Optional[ClientPolicy] = None,
        index_refresh_interval: float = 5.0,
    ) -> None:
        """Store cluster seeds and policy; connection starts in :meth:`connect` or ``async with``.

        Args:
            seeds: Seed address string understood by the async client (for
                example ``"127.0.0.1:3000"`` or a comma-separated host list if
                supported).
            policy: Optional :class:`~aerospike_async.ClientPolicy`; defaults
                to a new client policy when omitted.
            index_refresh_interval: Seconds between secondary index cache
                refreshes (default 5.0). The monitor periodically fetches index
                metadata from the cluster so that AEL-based ``where()`` calls
                can transparently generate secondary index filters.

        Note:
            No network I/O occurs here. The client connects when you
            ``await connect()`` or enter ``async with``.
        """
        self._seeds = seeds
        self._policy = policy if policy is not None else ClientPolicy()
        # Populated by connect(); reset to None by close().
        self._client: Optional[AsyncClient] = None
        self._connected = False
        self._indexes_monitor = IndexesMonitor(refresh_interval=index_refresh_interval)

    async def connect(self) -> None:
        """Open a connection to the cluster using the configured seeds and policy.

        Idempotent: if already connected, returns immediately. If the index
        monitor fails to start, the freshly opened connection is closed again
        and the client is left disconnected before the error propagates.

        Raises:
            ConnectionError: If the async client cannot reach the cluster
                (from PAC).

        See Also:
            :meth:`close`: Release the connection.

        Example::

            client = Client("localhost:3000")
            await client.connect()
        """
        if self._connected and self._client is not None:
            return
        self._client = await new_client(self._policy, self._seeds)
        self._connected = True
        try:
            await self._indexes_monitor.start(self._client)
        except BaseException:
            # Without this, a failed monitor start would leak the open
            # connection: under ``async with`` __aexit__ never runs when
            # __aenter__ raises.
            await self.close()
            raise

    async def close(self) -> None:
        """Close the underlying async client and clear connection state.

        Safe to call when already closed. The connection state is cleared and
        the async client is closed even if stopping the index monitor fails.

        See Also:
            :meth:`connect`.
        """
        try:
            await self._indexes_monitor.stop()
        finally:
            # Detach first so the state is consistent even if close() raises.
            client, self._client = self._client, None
            self._connected = False
            if client is not None:
                await client.close()

    async def __aenter__(self) -> Client:
        """Async context manager entry: connect and return this client."""
        await self.connect()
        return self

    async def __aexit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[types.TracebackType],
    ) -> None:
        """Async context manager exit: close the client; exceptions propagate."""
        await self.close()

    @property
    def is_connected(self) -> bool:
        """Check if the client is connected.

        Returns:
            ``True`` when :meth:`connect` has succeeded and :meth:`close` has
            not been called.
        """
        return self._connected

    @property
    def _async_client(self) -> AsyncClient:
        """Get the underlying async client.

        Raises:
            RuntimeError: If the client is not connected.
        """
        if not self._connected or self._client is None:
            raise RuntimeError("Client is not connected. Call connect() first or use async with.")
        return self._client

    @property
    def underlying_client(self) -> AsyncClient:
        """The underlying aerospike_async (PAC) Client for direct API access.

        Use this when you need PAC calls that are not wrapped by the SDK API,
        e.g. info(), nodes(), get_node(). The returned client is the same
        instance used internally by the SDK API.

        Example::

            async with Client("localhost:3000") as client:
                pac = client.underlying_client
                response = await pac.info("sindex-list")
                nodes = await pac.nodes()
                node = await pac.get_node(nodes[0].name)
                response = await node.info("build")

        Returns:
            The aerospike_async Client instance.

        Raises:
            RuntimeError: If the client is not connected.
        """
        return self._async_client

    @overload
    def query(
        self,
        *,
        dataset: DataSet,
        behavior: Optional[Behavior] = None,
    ) -> QueryBuilder:
        """Create a query builder from a DataSet."""
        ...

    @overload
    def query(
        self,
        *,
        key: Key,
        behavior: Optional[Behavior] = None,
    ) -> QueryBuilder:
        """Create a query builder for a single Key (point read)."""
        ...

    @overload
    def query(
        self,
        *,
        keys: List[Key],
        behavior: Optional[Behavior] = None,
    ) -> QueryBuilder:
        """Create a query builder for multiple Keys (batch read)."""
        ...

    @overload
    def query(
        self,
        dataset: DataSet,
        /,
        *,
        behavior: Optional[Behavior] = None,
    ) -> QueryBuilder:
        """Create a query builder from a positional DataSet."""
        ...

    @overload
    def query(
        self,
        keys: List[Key],
        /,
        *,
        behavior: Optional[Behavior] = None,
    ) -> QueryBuilder:
        """Create a query builder from a positional list of Keys (batch read)."""
        ...

    @overload
    def query(
        self,
        *keys: Key,
        behavior: Optional[Behavior] = None,
    ) -> QueryBuilder:
        """Create a query builder for multiple Keys (varargs)."""
        ...

    @overload
    def query(
        self,
        namespace: str,
        set_name: str,
        *,
        behavior: Optional[Behavior] = None,
    ) -> QueryBuilder:
        """Create a query builder with explicit namespace/set."""
        ...

    def query(
        self,
        arg1: Optional[Union[DataSet, Key, List[Key], str]] = None,
        set_name: Optional[Union[str, Key]] = None,
        namespace: Optional[Union[str, Key]] = None,
        *extra_keys: Key,
        dataset: Optional[DataSet] = None,
        key: Optional[Key] = None,
        keys: Optional[List[Key]] = None,
        behavior: Optional[Behavior] = None,
    ) -> QueryBuilder:
        """Create a query builder.

        Supports multiple calling styles:

        1. Using a DataSet (positional or keyword)::

            users = DataSet.of("test", "users")
            stream = await client.query(users).execute()
            # or
            stream = await client.query(dataset=users).execute()
            async for row in stream:
                if row.record is not None:
                    print(row.record.bins)

        2. Using a single Key (positional or keyword)::

            users = DataSet.of("test", "users")
            key = users.id("user123")
            recordset = await client.query(key).execute()
            # or
            recordset = await client.query(key=key).execute()

        3. Using multiple Keys (list, keyword, or varargs)::

            users = DataSet.of("test", "users")
            keys = users.ids("user1", "user2", "user3")
            recordset = await client.query(keys).execute()
            # or
            recordset = await client.query(keys=keys).execute()
            # or
            recordset = await client.query(*keys).execute()

        4. Explicit namespace/set (original style)::

            stream = await client.query(
                namespace="test",
                set_name="users",
            ).execute()
            async for row in stream:
                if row.record is not None:
                    print(row.record.bins)

        Args:
            arg1: Optional first positional:
                :class:`~aerospike_sdk.dataset.DataSet`,
                :class:`~aerospike_async.Key`, list of keys, or namespace
                string for the ``("namespace", "set")`` pair form.
            set_name: When ``arg1`` is a namespace string, the set name as the
                second positional (``client.query("test", "users")``). In the
                varargs form this slot receives the second key.
            namespace: Optional third positional; not used for the usual
                two-string namespace/set pair (that form uses ``arg1`` and
                ``set_name``). In the varargs form this slot receives the
                third key.
            *extra_keys: Fourth and later keys of the varargs form
                ``client.query(k1, k2, k3, k4, ...)``.
            dataset: Keyword-only :class:`~aerospike_sdk.dataset.DataSet`.
            key: Keyword-only single key for a point read.
            keys: Keyword-only list of keys for a batch read.
            behavior: Optional
                :class:`~aerospike_sdk.policy.behavior.Behavior` for timeouts,
                retries, and replica settings on this builder. If ``None``,
                the client uses generic defaults (unlike
                :meth:`~aerospike_sdk.aio.session.Session.query`, which
                applies the session's behavior automatically).

        Returns:
            A :class:`~aerospike_sdk.aio.operations.query.QueryBuilder` for
            chaining filters, bin selection, and execution.

        Raises:
            TypeError: If a positional argument is not a dataset, key, or key
                list, or if the varargs form mixes keys with other values.
            ValueError: If required namespace/set information is missing or
                key lists are empty.
            RuntimeError: If the client is not connected.

        See Also:
            :meth:`~aerospike_sdk.aio.session.Session.query`: Same builder
            with session-scoped behavior.
        """
        if extra_keys or isinstance(set_name, Key) or isinstance(namespace, Key):
            # Varargs form query(k1, k2, ...): the first three positional
            # slots are named arg1/set_name/namespace, so additional keys land
            # there and in *extra_keys. Fold them into one batch instead of
            # silently dropping everything after the first key.
            positional = [
                item
                for item in (arg1, set_name, namespace, *extra_keys)
                if item is not None
            ]
            for item in positional:
                if not isinstance(item, Key):
                    raise TypeError(
                        f"Expected only Key positional arguments, got {type(item)}"
                    )
            keys = positional
            set_name = None
            namespace = None
        elif isinstance(arg1, str) and set_name is not None:
            # ("namespace", "set") pair; set_name is already bound.
            namespace = arg1
        elif arg1 is not None:
            # Single positional argument: DataSet, Key, or List[Key].
            if isinstance(arg1, DataSet):
                dataset = arg1
            elif isinstance(arg1, Key):
                key = arg1
            elif isinstance(arg1, list):
                keys = arg1
            else:
                raise TypeError(f"Expected DataSet, Key, or List[Key], got {type(arg1)}")

        # Single key: the builder carries the key for a point read.
        if key is not None:
            builder = self._new_query_builder(key.namespace, key.set_name, behavior)
            builder._single_key = key
            return builder

        # Multiple keys: namespace/set are taken from the first key.
        if keys is not None:
            if not keys:
                raise ValueError("keys list cannot be empty")
            first = keys[0]
            builder = self._new_query_builder(first.namespace, first.set_name, behavior)
            builder._keys = keys
            return builder

        if dataset is not None:
            return self._new_query_builder(dataset.namespace, dataset.set_name, behavior)

        # Explicit namespace/set (original style).
        if namespace is not None and set_name is not None:
            return self._new_query_builder(namespace, set_name, behavior)

        raise ValueError(
            "Invalid arguments. Use either:\n"
            "  - query(dataset=DataSet(...))\n"
            "  - query(key=Key(...))\n"
            "  - query(keys=[Key(...), ...])\n"
            "  - query(namespace=..., set_name=...)"
        )

    def _new_query_builder(
        self,
        namespace: str,
        set_name: str,
        behavior: Optional[Behavior],
    ) -> QueryBuilder:
        """Build a QueryBuilder bound to this client's connection and index monitor.

        Raises:
            RuntimeError: If the client is not connected.
        """
        return QueryBuilder(
            client=self._async_client,
            namespace=namespace,
            set_name=set_name,
            behavior=behavior,
            indexes_monitor=self._indexes_monitor,
        )

    @overload
    def index(
        self,
        *,
        dataset: DataSet,
        behavior: Optional[Behavior] = None,
    ) -> IndexBuilder:
        """Create an index builder from a DataSet."""
        ...

    @overload
    def index(
        self,
        namespace: str,
        set_name: str,
        *,
        behavior: Optional[Behavior] = None,
    ) -> IndexBuilder:
        """Create an index builder with explicit namespace/set."""
        ...

    def index(
        self,
        namespace: Optional[str] = None,
        set_name: Optional[str] = None,
        *,
        dataset: Optional[DataSet] = None,
        behavior: Optional[Behavior] = None,
    ) -> IndexBuilder:
        """Create an index builder.

        Supports multiple calling styles:

        1. Using a DataSet::

            users = DataSet.of("test", "users")
            await client.index(dataset=users).on_bin("age").named("age_idx").numeric().create()

        2. Explicit namespace/set (original style)::

            await client.index(
                namespace="test",
                set_name="users"
            ).on_bin("age").named("age_idx").numeric().create()

        Args:
            namespace: The namespace name (if not using DataSet).
            set_name: The set name (if not using DataSet).
            dataset: Optional DataSet to use for namespace/set; takes
                precedence over ``namespace``/``set_name`` when given.
            behavior: Reserved for symmetry with :meth:`query`; not applied to
                index operations yet.

        Returns:
            An IndexBuilder for chaining index operations.

        Raises:
            ValueError: If neither a dataset nor both namespace and set name
                are supplied.
            RuntimeError: If the client is not connected.
        """
        _ = behavior  # accepted for API symmetry only; intentionally unused

        if dataset is not None:
            namespace = dataset.namespace
            set_name = dataset.set_name
        elif namespace is None or set_name is None:
            raise ValueError(
                "Invalid arguments. Use either:\n"
                "  - index(dataset=DataSet(...))\n"
                "  - index(namespace=..., set_name=...)"
            )

        return IndexBuilder(
            client=self._async_client,
            namespace=namespace,
            set_name=set_name,
        )

    def transaction_session(
        self,
        behavior: Optional[Behavior] = None,
    ) -> TransactionalSession:
        """Create a multi-record transaction (MRT) session.

        Allocates a fresh :class:`~aerospike_async.Txn` on entry. Operations
        chained off the returned session (``tx.upsert(...)``,
        ``tx.query(...)``, ``tx.batch()``, ...) auto-participate in the
        transaction — every builder stamps ``policy.txn = tx.txn`` under the
        hood. On clean exit the transaction is committed; if an exception
        propagates out of the block it is aborted.

        Multi-record transactions require an Aerospike server running in
        strong-consistency (SC) mode on the target namespace.

        Args:
            behavior: Optional
                :class:`~aerospike_sdk.policy.behavior.Behavior` for
                operations inside the transaction. ``None`` is forwarded
                unchanged; the session is documented to fall back to
                :attr:`Behavior.DEFAULT` in that case.

        Returns:
            A
            :class:`~aerospike_sdk.aio.transactional_session.TransactionalSession`
            bound to this client and behavior.

        Example::

            async with client.transaction_session() as tx:
                await tx.upsert(accounts.id("A")).bin("balance").set_to(100).execute()
                await tx.upsert(accounts.id("B")).bin("balance").set_to(200).execute()
        """
        # Late import breaks the client -> transactional_session -> session ->
        # client cycle (TransactionalSession subclasses Session, and Session
        # imports Client at module level).
        from aerospike_sdk.aio.transactional_session import TransactionalSession

        return TransactionalSession(client=self, behavior=behavior)

    def create_session(self, behavior: Optional[Behavior] = None) -> Session:
        """Create a session with the specified behavior.

        A session represents a logical connection to the cluster with specific
        behavior settings that control how operations are performed (timeouts,
        retry policies, consistency levels, etc.).

        Args:
            behavior: The behavior configuration for the session. If None,
                uses Behavior.DEFAULT.

        Returns:
            A new :class:`~aerospike_sdk.aio.session.Session` bound to this
            client.

        Example::

            session = client.create_session()
            users = DataSet.of("test", "users")
            await session.upsert(users.id(1)).put({"k": 1}).execute()

        Example::

            from datetime import timedelta

            fast = Behavior.DEFAULT.derive_with_changes(
                name="fast",
                total_timeout=timedelta(seconds=5),
            )
            session = client.create_session(fast)

        See Also:
            :class:`~aerospike_sdk.policy.behavior.Behavior`: Available presets.
        """
        # Imported here rather than at module level to avoid an import cycle
        # with the session module.
        from aerospike_sdk.aio.session import Session

        if behavior is None:
            behavior = Behavior.DEFAULT
        return Session(client=self, behavior=behavior)

    async def register_udf(
        self,
        body: bytes,
        server_path: str,
        language: UDFLang = UDFLang.LUA,
        *,
        policy: Optional[AdminPolicy] = None,
    ) -> RegisterTask:
        """Register a UDF package from in-memory bytes on the cluster.

        Args:
            body: Raw module source (for example UTF-8 encoded Lua).
            server_path: Path name stored on the server (often ends with
                ``.lua``).
            language: :class:`~aerospike_async.UDFLang`; default is Lua.
            policy: Optional :class:`~aerospike_async.AdminPolicy` (PAC
                leading argument); use keyword ``policy=``.

        Returns:
            A :class:`~aerospike_async.RegisterTask`; await
            ``wait_till_complete(...)`` until propagation finishes.

        Raises:
            RuntimeError: If not connected.
            AerospikeError: On cluster or admin errors (via PAC).

        See Also:
            :meth:`register_udf_from_file`: Load source from disk.

        Example::

            task = await client.register_udf(udf_source_code, "my_module.lua")
            await task.wait_till_complete()
        """
        # PAC takes the policy as its first positional argument.
        return await self._async_client.register_udf(
            policy, body, server_path, language)

    async def register_udf_from_file(
        self,
        client_path: str,
        server_path: str,
        language: UDFLang = UDFLang.LUA,
        *,
        policy: Optional[AdminPolicy] = None,
    ) -> RegisterTask:
        """Register a UDF by reading module bytes from a local path.

        Args:
            client_path: Filesystem path to the module file on the client
                machine.
            server_path: Path name stored on the server.
            language: :class:`~aerospike_async.UDFLang`; default is Lua.
            policy: Optional admin policy; use keyword ``policy=``.

        Returns:
            A :class:`~aerospike_async.RegisterTask` for completion polling.

        Raises:
            RuntimeError: If not connected.
            OSError: If ``client_path`` cannot be read.
            AerospikeError: On cluster or admin errors (via PAC).

        See Also:
            :meth:`register_udf`: Register from bytes.

        Example::

            task = await client.register_udf_from_file("scripts/my_module.lua", "my_module.lua")
            await task.wait_till_complete()
        """
        # PAC takes the policy as its first positional argument.
        return await self._async_client.register_udf_from_file(
            policy, client_path, server_path, language)

    async def remove_udf(
        self,
        server_path: str,
        *,
        policy: Optional[AdminPolicy] = None,
    ) -> UdfRemoveTask:
        """Remove a registered UDF package from the cluster.

        Args:
            server_path: Same server path used when registering the module.
            policy: Optional admin policy; use keyword ``policy=``.

        Returns:
            A :class:`~aerospike_async.UdfRemoveTask`; await completion like
            register.

        Raises:
            RuntimeError: If not connected.
            AerospikeError: On cluster or admin errors (via PAC).

        Example::

            task = await client.remove_udf("my_module")
            await task.wait_till_complete()
        """
        # PAC takes the policy as its first positional argument.
        return await self._async_client.remove_udf(policy, server_path)