Source code for aerospike_sdk.aio.session

# Copyright 2025-2026 Aerospike, Inc.
#
# Portions may be licensed to Aerospike, Inc. under one or more contributor
# license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

"""Session - Main interface for database operations with Behavior configuration."""

from __future__ import annotations

import typing
from typing import Any, Awaitable, Dict, List, Optional, overload, TYPE_CHECKING, Union

if TYPE_CHECKING:
    from aerospike_sdk.aio.transactional_session import TransactionalSession
    from aerospike_sdk.record_result import RecordResult

from aerospike_async import Key, Record, ResultCode, Txn

from aerospike_sdk.aio.background import BackgroundTaskSession
from aerospike_sdk.aio.client import Client
from aerospike_sdk.aio.info import InfoCommands
from aerospike_sdk.aio.operations.batch import BatchOperationBuilder
from aerospike_sdk.aio.operations.index import IndexBuilder
from aerospike_sdk.aio.operations.query import (
    QueryBuilder,
    WriteSegmentBuilder,
    _SingleKeyWriteSegment,
)
from aerospike_sdk.aio.operations.udf import UdfFunctionBuilder
from aerospike_sdk.dataset import DataSet
from aerospike_sdk.policy.behavior import Behavior, OpKind, OpShape
from aerospike_sdk.policy.policy_mapper import to_read_policy, to_write_policy


[docs] class Session: """Perform reads and writes against Aerospike with a fixed :class:`~aerospike_sdk.policy.behavior.Behavior`. A session binds a connected :class:`Client` to policy defaults (timeouts, retries, replica preferences) for every operation started from it. Create sessions with :meth:`Client.create_session`; do not construct ``Session`` directly. Example: async with Client("localhost:3000") as client: session = client.create_session(Behavior.DEFAULT) users = DataSet.of("test", "users") stream = await session.query(users.id(1)).execute() first = await stream.first_or_raise() await session.upsert(users.id(2)).put({"name": "Tim"}).execute() See Also: :meth:`Client.create_session`: How to obtain a session. :meth:`query`: Point reads, batch reads, and secondary-index queries. :meth:`upsert`: Create-or-update writes. """
[docs] def __init__(self, client: Client, behavior: Behavior) -> None: """Attach a client and behavior; prefer :meth:`Client.create_session`. Args: client: Connected (or not yet connected) :class:`Client`. behavior: Policy bundle for operations from this session. Note: Application code should not call ``Session(...)`` directly. See Also: :meth:`Client.create_session`. """ self._client = client self._behavior = behavior # Pre-compute base policies once per session so QueryBuilders # skip per-op policy_mapper calls for the common no-override path. self._cached_read_policy = to_read_policy( behavior.get_settings(OpKind.READ, OpShape.POINT)) self._cached_write_policy = to_write_policy( behavior.get_settings(OpKind.WRITE_NON_RETRYABLE, OpShape.POINT)) # Cache the raw PAC client for fast-path methods. self._pac_client = client._async_client # Transaction hook. Non-transactional sessions always return None; # TransactionalSession overrides this to yield its active Txn so every # builder spawned from the session auto-participates. self._txn: Optional[Txn] = None
def _bind_txn(self, builder): """Stamp the session's current txn onto a builder if one is active. Fast-path helper used by every builder factory on :class:`Session` so that operations started inside a :class:`~aerospike_sdk.aio.transactional_session.TransactionalSession` auto-participate in the transaction. Returns the builder for fluent use; no-op outside an MRT. """ if self._txn is not None: builder.with_txn(self._txn) return builder
[docs] def get_current_transaction(self) -> Optional[Txn]: """Return the active transaction for this session, or ``None``. Regular :class:`Session` instances always return ``None``; only :class:`~aerospike_sdk.aio.transactional_session.TransactionalSession` inside its ``async with`` block returns a live :class:`~aerospike_async.Txn`. Builders created from this session call this hook at construction and thread the result through every policy they hand to the PAC. Returns: The active :class:`~aerospike_async.Txn`, or ``None`` outside a transaction. Example: >>> session = client.create_session() >>> session.get_current_transaction() is None True >>> async with session.begin_transaction() as tx: ... assert tx.get_current_transaction() is tx.txn See Also: :meth:`begin_transaction`: Enter a multi-record transaction. :class:`~aerospike_sdk.aio.transactional_session.TransactionalSession` """ return self._txn
# -- Fast-path single-key operations ------------------------------------ # These bypass the QueryBuilder/OperationSpec/RecordStream chain for # simple single-key reads and writes, calling the PAC directly.
[docs] async def get( self, key: Key, bins: Optional[List[str]] = None, ) -> Record: """Direct single-key point read — returns ``Record`` or raises. Bypasses the builder chain (``session.query(key).execute()``) and the :class:`~aerospike_sdk.record_stream.RecordStream` wrapper: one ``await`` reaches the underlying client and the resulting :class:`~aerospike_async.Record` is returned unwrapped. Use when you have a single key and want minimum per-op overhead; use :meth:`query` when you need filters, projections, or streaming. Args: key: Target :class:`~aerospike_async.Key`. bins: Optional bin-name projection. ``None`` (default) reads all bins. Returns: The :class:`~aerospike_async.Record` for ``key``. Raises: AerospikeError: Server or client errors (including ``KEY_NOT_FOUND_ERROR``) are raised from the underlying client without being wrapped in a :class:`~aerospike_sdk.record_result.RecordResult`. Example: >>> users = DataSet.of("test", "users") >>> rec = await session.get(users.id(1)) >>> name = rec.bins["name"] See Also: :meth:`query`: Builder-based reads for projections, streams, and secondary-index queries. :meth:`put`: Direct single-key upsert. """ if self._txn is None: return await self._pac_client.get( self._cached_read_policy, key, bins) policy = to_read_policy( self._behavior.get_settings(OpKind.READ, OpShape.POINT)) policy.txn = self._txn return await self._pac_client.get(policy, key, bins)
[docs] async def put( self, key: Key, bins: Dict[str, Any], ) -> None: """Direct single-key upsert — returns ``None`` or raises. Bypasses the builder chain (``session.upsert(key).put(...).execute()``) and the :class:`~aerospike_sdk.record_stream.RecordStream` wrapper: one ``await`` reaches the underlying client. Use when you have a single key and want minimum per-op overhead; use :meth:`upsert` when you need atomic multi-op semantics, TTL overrides, generation checks, durable delete, or filter expressions. Args: key: Target :class:`~aerospike_async.Key`. bins: Mapping of bin name to value to write. An empty mapping is permitted. Returns: ``None`` on success. Raises: AerospikeError: Server or client errors are raised from the underlying client. Example: >>> users = DataSet.of("test", "users") >>> await session.put(users.id(1), {"name": "Tim", "age": 30}) See Also: :meth:`upsert`: Builder-based writes with full feature set. :meth:`get`: Direct single-key point read. """ if self._txn is None: await self._pac_client.put( self._cached_write_policy, key, bins) return policy = to_write_policy( self._behavior.get_settings( OpKind.WRITE_NON_RETRYABLE, OpShape.POINT)) policy.txn = self._txn await self._pac_client.put(policy, key, bins)
@property def behavior(self) -> Behavior: """Policy bundle applied to operations created from this session. Returns: The :class:`~aerospike_sdk.policy.behavior.Behavior` passed to :meth:`Client.create_session`. """ return self._behavior @property def client(self) -> Client: """SDK client that owns the connection used by this session. Returns: The parent :class:`Client`. """ return self._client # Delegate all Client operations to maintain same API
[docs] def batch(self) -> "BatchOperationBuilder": """Start a multi-key batch of mixed write operations executed in one server round trip. Chain ``insert``, ``update``, ``upsert``, ``replace``, ``delete``, and related bin builders, then ``await ...execute()`` to obtain per-key outcomes. Returns: A :class:`~aerospike_sdk.aio.operations.batch.BatchOperationBuilder` for chaining operations. Raises: RuntimeError: If the client is not connected. Example:: results = await ( session.batch() .insert(key1).put({"name": "Alice", "age": 25}) .update(key2).bin("counter").add(1) .upsert(key3).put({"status": "active"}) .delete(key4) .execute() ) for row in results: print(row.key, row.result_code) See Also: :meth:`upsert`: Single-record writes without batching. """ if self._client._client is None: raise RuntimeError("Client is not connected") return BatchOperationBuilder( self._client._client, self._behavior, txn=self._txn, )
[docs] def background_task(self) -> "BackgroundTaskSession": """Configure a server-side background job (query + scan scope) on a dataset. Call ``update``, ``delete``, ``touch``, or ``execute_udf`` on the returned object, add optional filters (for example ``where`` on supported builders), then ``await ...execute()`` to start work and receive an async task handle. Returns: A :class:`~aerospike_sdk.aio.background.BackgroundTaskSession` for chaining the operation type and execution. Raises: RuntimeError: If the client is not connected. Example:: task = await ( session.background_task() .delete(DataSet.of("test", "scratch")) .where("$.flag == 1") .execute() ) await task.wait_till_complete(sleep_time=0.2, max_attempts=50) See Also: :meth:`execute_udf`: Foreground UDF on explicit keys. """ if self._client._client is None: raise RuntimeError("Client is not connected") return BackgroundTaskSession(self)
[docs] def execute_udf(self, *keys: Key) -> "UdfFunctionBuilder": """Run a registered server-side UDF on one or more keys (foreground). Chain ``function(package, name)`` (package is the registered module name without ``.lua``), optional ``passing(*args)`` for Lua parameters, optional ``where`` for a filter expression, then ``await ...execute()`` to obtain a :class:`~aerospike_sdk.record_stream.RecordStream`. Multiple keys use a batch UDF; results preserve per-key order where applicable. Args: *keys: One or more :class:`~aerospike_async.Key` targets in the same namespace and set. Returns: :class:`~aerospike_sdk.aio.operations.udf.UdfFunctionBuilder` — call ``function`` next. Raises: ValueError: If no keys are given. RuntimeError: If the client is not connected. Example:: users = DataSet.of("test", "users") stream = await ( session.execute_udf(users.id("a")) .function("my_module", "my_fn") .passing("binName", 42) .execute() ) value = await stream.first_udf_result() See Also: :meth:`query`: Read bins without UDF. :meth:`background_task`: Dataset-scoped background UDF. """ if not keys: raise ValueError("At least one key is required") if self._client._client is None: raise RuntimeError("Client is not connected") first = keys[0] qb = QueryBuilder( self._client._client, first.namespace, first.set_name, self._behavior, indexes_monitor=self._client._indexes_monitor, cached_read_policy=self._cached_read_policy, cached_write_policy=self._cached_write_policy, txn=self._txn, ) qb._set_current_keys_from_varargs(keys) return UdfFunctionBuilder(qb)
# -- Internal helpers ----------------------------------------------------- @staticmethod def _resolve_keys( arg1: Optional[Union[Key, List[Key]]] = None, arg2: Optional[Key] = None, *more_keys: Key, key: Optional[Key] = None, dataset: Optional[DataSet] = None, namespace: Optional[str] = None, set_name: Optional[str] = None, key_value: Optional[Union[str, int, bytes]] = None, ) -> List[Key]: """Resolve mixed positional/keyword arguments into a flat list of Keys.""" all_keys: List[Key] = [] if arg1 is not None: if isinstance(arg1, Key): all_keys.append(arg1) if isinstance(arg2, Key): all_keys.append(arg2) all_keys.extend(more_keys) elif isinstance(arg1, list): if not arg1: raise ValueError("keys list cannot be empty") all_keys.extend(arg1) else: raise TypeError(f"Expected Key or List[Key], got {type(arg1)}") elif key is not None: all_keys.append(key) elif key_value is not None: if dataset is not None: all_keys.append(dataset.id(key_value)) elif namespace is not None and set_name is not None: all_keys.append(Key(namespace, set_name, key_value)) else: raise ValueError( "Either dataset or (namespace and set_name) must be provided with key_value" ) if not all_keys: raise ValueError("At least one key must be provided") return all_keys def _build_write_segment( self, op_type: str, arg1: Optional[Union[Key, List[Key]]] = None, arg2: Optional[Key] = None, *more_keys: Key, key: Optional[Key] = None, dataset: Optional[DataSet] = None, namespace: Optional[str] = None, set_name: Optional[str] = None, key_value: Optional[Union[str, int, bytes]] = None, ) -> WriteSegmentBuilder: """Resolve keys and create a :class:`WriteSegmentBuilder`.""" all_keys = self._resolve_keys( arg1, arg2, *more_keys, key=key, dataset=dataset, namespace=namespace, set_name=set_name, key_value=key_value, ) first = all_keys[0] qb = QueryBuilder( client=self._client._client, namespace=first.namespace, set_name=first.set_name, behavior=self._behavior, indexes_monitor=self._client._indexes_monitor, cached_read_policy=self._cached_read_policy, cached_write_policy=self._cached_write_policy, txn=self._txn, ) target: Union[Key, List[Key]] = all_keys[0] if len(all_keys) == 1 else all_keys return qb._start_write_verb(op_type, target) def _fast_write_segment(self, op_type: str, key: Key) -> WriteSegmentBuilder: """Single-key write shortcut: bypass QueryBuilder entirely.""" return _SingleKeyWriteSegment( client=self._client._async_client, key=key, op_type=op_type, behavior=self._behavior, write_policy=self._cached_write_policy, read_policy=self._cached_read_policy, txn=self._txn, ) # -- Read entry point ----------------------------------------------------- @typing.overload def query( self, dataset: DataSet, *, behavior: Optional[Behavior] = None, ) -> QueryBuilder: """Create a query builder from a DataSet.""" ... @typing.overload def query( self, key: Key, *, behavior: Optional[Behavior] = None, ) -> QueryBuilder: """Create a query builder for a single Key (point read).""" ... @typing.overload def query( self, keys: List[Key], *, behavior: Optional[Behavior] = None, ) -> QueryBuilder: """Create a query builder for multiple Keys (batch read).""" ... @typing.overload def query( self, *keys: Key, behavior: Optional[Behavior] = None, ) -> QueryBuilder: """Create a query builder for multiple Keys (varargs).""" ... @typing.overload def query( self, namespace: str, set_name: str, *, behavior: Optional[Behavior] = None, ) -> QueryBuilder: """Create a query builder with explicit namespace/set.""" ...
[docs] def query( self, arg1: Optional[Union[DataSet, Key, List[Key], str]] = None, arg2: Optional[Union[str, Key]] = None, *keys: Key, namespace: Optional[str] = None, set_name: Optional[str] = None, dataset: Optional[DataSet] = None, key: Optional[Key] = None, keys_list: Optional[List[Key]] = None, behavior: Optional[Behavior] = None, ) -> QueryBuilder: """Start a read or secondary-index query for keys or a whole set. This session's :attr:`behavior` is applied to the underlying :class:`~aerospike_sdk.aio.operations.query.QueryBuilder`. Supported shapes include a :class:`~aerospike_sdk.dataset.DataSet` (set-wide query), a single :class:`~aerospike_async.Key`, multiple keys (list or varargs), or explicit ``namespace`` / ``set_name`` for index scans. Args: arg1: Positional dataset, key, list of keys, or namespace string (when paired with ``arg2`` as set name). arg2: When ``arg1`` is a namespace, the set name; otherwise may be a second key when passing multiple keys positionally. *keys: Additional keys when the first positional argument is a key. namespace: Keyword namespace (with ``set_name``) when not using a dataset. set_name: Keyword set name (with ``namespace``). dataset: Keyword :class:`~aerospike_sdk.dataset.DataSet`. key: Keyword single key. keys_list: Keyword list of keys when not using ``arg1`` or varargs; forwarded to the client as ``keys``. behavior: Optional override for this query; defaults to the session's :attr:`behavior`. Returns: A :class:`~aerospike_sdk.aio.operations.query.QueryBuilder` to chain ``where``, ``bins``, ``execute``, etc. Raises: TypeError: If positional types do not match the supported overloads. ValueError: If a key list is empty or arguments are inconsistent. Example: users = DataSet.of("test", "users") rs = await session.query(users.id(1)).bins(["name"]).execute() row = await rs.first_or_raise() Example: users = DataSet.of("test", "users") rs = await session.query(users.ids(1, 2, 3)).bins(["name"]).execute() rows = await rs.collect() See Also: :meth:`Client.query`: Same shapes without session behavior. :meth:`upsert`: Writes for the same keys. """ b = self._behavior if behavior is None else behavior # Handle positional arguments (SDK API) if arg1 is not None: if isinstance(arg1, DataSet): return self._bind_txn( self._client.query(dataset=arg1, behavior=b)) elif isinstance(arg1, Key): all_keys = [arg1] if isinstance(arg2, Key): all_keys.append(arg2) all_keys.extend(keys) elif keys: all_keys.extend(keys) else: # Fast path for single-key queries: construct the # QueryBuilder directly with cached policies to skip # Client.query() overhead and per-op policy rebuilds. builder = QueryBuilder( client=self._client._async_client, namespace=arg1.namespace, set_name=arg1.set_name, behavior=b, indexes_monitor=self._client._indexes_monitor, cached_read_policy=self._cached_read_policy, cached_write_policy=self._cached_write_policy, txn=self._txn, ) builder._single_key = arg1 return builder return self._bind_txn( self._client.query(keys=all_keys, behavior=b)) elif isinstance(arg1, list): if len(arg1) == 0: raise ValueError("keys list cannot be empty") if not isinstance(arg1[0], Key): raise TypeError(f"Expected List[Key], but first element is {type(arg1[0])}") return self._bind_txn( self._client.query(keys=arg1, behavior=b)) elif isinstance(arg1, str) and arg2 is not None: return self._bind_txn( self._client.query(namespace=arg1, set_name=arg2, behavior=b)) if keys: keys_list = list(keys) if arg1 is not None and isinstance(arg1, Key): keys_list.insert(0, arg1) if arg2 is not None and isinstance(arg2, Key): keys_list.insert(1 if arg1 is not None and isinstance(arg1, Key) else 0, arg2) return self._bind_txn( self._client.query(keys=keys_list, behavior=b)) return self._bind_txn(self._client.query( # type: ignore[call-overload] namespace=namespace, set_name=set_name, dataset=dataset, key=key, keys=keys_list, behavior=b, ))
@typing.overload def index( self, *, dataset: DataSet, behavior: Optional[Behavior] = None, ) -> IndexBuilder: """Create an index builder from a DataSet.""" ... @typing.overload def index( self, namespace: str, set_name: str, *, behavior: Optional[Behavior] = None, ) -> IndexBuilder: """Create an index builder with explicit namespace/set.""" ...
[docs] def index( self, namespace: Optional[str] = None, set_name: Optional[str] = None, *, dataset: Optional[DataSet] = None, behavior: Optional[Behavior] = None, ) -> IndexBuilder: """ Create a secondary index builder for a namespace and set. Args: namespace: Namespace name when not using ``dataset``. set_name: Set name when not using ``dataset``. dataset: Optional :class:`~aerospike_sdk.dataset.DataSet` that supplies namespace and set. behavior: Reserved for symmetry with :meth:`query`; forwarded to :meth:`Client.index` but not used by index operations yet. Returns: :class:`~aerospike_sdk.aio.operations.index.IndexBuilder` for chaining index definition and creation. Raises: ValueError: If ``dataset`` is not given and ``namespace`` or ``set_name`` is missing. Example:: users = DataSet.of("test", "users") await session.index(dataset=users).on_bin("age").named("age_idx").numeric().create() See Also: :meth:`Client.index` """ if dataset is not None: return self._client.index(dataset=dataset, behavior=behavior) elif namespace is not None and set_name is not None: return self._client.index( namespace, set_name, behavior=behavior, ) else: raise ValueError( "Invalid arguments. Use either:\n" " - index(dataset=DataSet(...))\n" " - index(namespace=..., set_name=...)" )
[docs] def transaction_session(self) -> "TransactionalSession": """Create a transactional session using this session's behavior. Alias for :meth:`begin_transaction`. Returns: :class:`~aerospike_sdk.aio.transactional_session.TransactionalSession` bound to this session's client and behavior. See Also: :meth:`begin_transaction`: Preferred entry point. :meth:`aerospike_sdk.aio.client.Client.transaction_session` """ return self.begin_transaction()
[docs] def begin_transaction(self) -> "TransactionalSession": """Start a multi-record transaction (MRT) using this session's behavior. Returns an async context manager that allocates a fresh :class:`~aerospike_async.Txn`. Every operation run on the returned session auto-participates in the transaction — builders stamp ``policy.txn = tx.txn`` under the hood, so user code never touches a policy object. On clean exit the transaction is committed; if an exception propagates out of the ``async with`` block the transaction is aborted. Example: >>> async with session.begin_transaction() as tx: ... await tx.upsert(accounts.id("A")).bin("balance").set_to(100).execute() ... await tx.upsert(accounts.id("B")).bin("balance").set_to(200).execute() Returns: :class:`~aerospike_sdk.aio.transactional_session.TransactionalSession` bound to this session's client and behavior. See Also: :meth:`transaction_session`: Alias for this method. :meth:`do_in_transaction`: Run a callable inside a retrying MRT. """ return self._client.transaction_session(behavior=self._behavior)
@overload def info(self) -> InfoCommands: ... @overload def info(self, command: str) -> Awaitable[Dict[str, str]]: ...
[docs] def info( self, command: Optional[str] = None ) -> Union[InfoCommands, Awaitable[Dict[str, str]]]: """ Execute info commands or get the InfoCommands helper. With no argument, returns an InfoCommands instance for high-level helpers (namespaces(), namespace_details(), etc.) and for info_on_all_nodes(). With a command string, runs the raw info command and returns its result (awaitable). Args: command: Optional. If given, the raw info command to run (e.g. "sindex-list", "build"). Returns: If command is None: InfoCommands instance. If command is given: awaitable dict (node -> response). Example:: # Raw command (no double .info) response = await session.info("sindex-list") # High-level helpers info = session.info() namespaces = await info.namespaces() by_node = await info.info_on_all_nodes("build") """ if command is not None: return self._client._async_client.info(command) return InfoCommands(self)
[docs] async def is_namespace_sc(self, namespace: str) -> bool: """ Check if a namespace is in strong consistency (SC) mode. Strong consistency mode provides linearizable reads and writes at the cost of availability during network partitions. Args: namespace: The namespace name to check. Returns: True if the namespace is in strong consistency mode, False otherwise. Raises: ValueError: If the namespace is unknown or the info command fails. Example:: if await session.is_namespace_sc("test"): print("Namespace 'test' is in strong consistency mode") else: print("Namespace 'test' is in AP (availability) mode") """ if self._client._client is None: raise RuntimeError("Client is not connected") try: # Query namespace configuration via info command result = await self._client._client.info(f"namespace/{namespace}") # Parse the result - it's a dict with node addresses as keys for node_result in result.values(): # Parse semicolon-separated key=value pairs for pair in node_result.split(";"): if "=" in pair: key, value = pair.split("=", 1) if key == "strong-consistency": return value.lower() == "true" # If we didn't find the strong-consistency key, default to False (AP mode) return False except Exception as e: raise ValueError(f"Failed to check namespace '{namespace}': {e}") from e
[docs] async def do_in_transaction( self, operation: typing.Callable[["TransactionalSession"], typing.Awaitable[typing.Any]], *, max_attempts: int = 5, sleep_between_retries: float = 0.0, ) -> typing.Any: """Run an async callable inside a retrying multi-record transaction. Creates a :class:`TransactionalSession`, invokes ``operation(tx)`` inside ``async with``, and retries the whole block when the server signals a transient conflict (``MRT_BLOCKED``, ``MRT_VERSION_MISMATCH``, or ``TXN_FAILED``). On any non-transient failure the transaction is aborted and the exception re-raised. Args: operation: Async callable accepting a :class:`TransactionalSession` and performing zero or more operations on it. Its return value is returned from :meth:`do_in_transaction`. max_attempts: Maximum total attempts (initial + retries). Must be ``>= 1``. Defaults to ``5``. sleep_between_retries: Optional seconds to ``await asyncio.sleep`` between retries. ``0`` (the default) retries immediately. Returns: Whatever ``operation`` returns on the successful attempt. Raises: ValueError: If ``max_attempts < 1``. AerospikeError: The last-seen transient error after ``max_attempts`` exhausted retries, or any non-transient error raised by ``operation``. Example: >>> async def transfer(tx): ... await tx.upsert(accounts.id("A")).bin("bal").add(-10).execute() ... await tx.upsert(accounts.id("B")).bin("bal").add(10).execute() ... return "ok" >>> result = await session.do_in_transaction(transfer) See Also: :meth:`begin_transaction`: Manual MRT lifecycle. :class:`TransactionalSession` """ if max_attempts < 1: raise ValueError("max_attempts must be >= 1") import asyncio from aerospike_sdk.exceptions import AerospikeError # Transient MRT conflicts that are safe to retry automatically. retryable_codes = { ResultCode.MRT_BLOCKED, ResultCode.MRT_VERSION_MISMATCH, } # TXN_FAILED is a rolled-up code used when the MRT monitor reports # that one or more ops failed — retrying is safe because we abort # and start fresh on each attempt. txn_failed = getattr(ResultCode, "TXN_FAILED", None) if txn_failed is not None: retryable_codes.add(txn_failed) last_exc: Optional[BaseException] = None for attempt in range(max_attempts): try: async with self.begin_transaction() as tx_session: return await operation(tx_session) except AerospikeError as exc: last_exc = exc if exc.result_code not in retryable_codes: raise if attempt + 1 >= max_attempts: raise if sleep_between_retries > 0: await asyncio.sleep(sleep_between_retries) # Unreachable — last iteration always raises — but keep mypy happy. assert last_exc is not None raise last_exc
# -- Write entry points ---------------------------------------------------
[docs] def upsert( self, arg1: Optional[Union[Key, List[Key]]] = None, arg2: Optional[Key] = None, *keys: Key, key: Optional[Key] = None, dataset: Optional[DataSet] = None, namespace: Optional[str] = None, set_name: Optional[str] = None, key_value: Optional[Union[str, int, bytes]] = None, ) -> WriteSegmentBuilder: """Start a create-or-replace write for one or more keys. If the record exists, bins are merged according to the chained operations; if it does not exist, it is created. Use :meth:`insert` when the record must not already exist. Args: arg1: A single :class:`~aerospike_async.Key`, a list of keys, or omit and pass ``key`` / ``dataset`` + ``key_value`` / ``namespace`` + ``set_name`` + ``key_value``. arg2: Optional second key when passing multiple keys positionally. *keys: Additional keys when the first positional is a key. key: Single key (keyword form). dataset: Dataset used with ``key_value`` to build a key. namespace: Namespace used with ``set_name`` and ``key_value``. set_name: Set name used with ``namespace`` and ``key_value``. key_value: User key value with ``dataset`` or ``namespace``/``set_name``. Returns: A :class:`~aerospike_sdk.aio.operations.query.WriteSegmentBuilder` for ``put``, ``bin``, ``where``, ``execute``, etc. Raises: ValueError: If no keys are resolved or lists are empty. TypeError: If positional arguments are not keys or lists of keys. Example: users = DataSet.of("test", "users") await session.upsert(users.id(1)).put({"name": "Tim", "age": 30}).execute() See Also: :meth:`insert`: Fails if the record already exists. :meth:`update`: Fails if the record does not exist. :meth:`replace`: Replace-entire-record semantics when configured. """ if ( isinstance(arg1, Key) and arg2 is None and not keys and key is None and dataset is None and namespace is None and key_value is None ): return self._fast_write_segment("upsert", arg1) return self._build_write_segment( "upsert", arg1, arg2, *keys, key=key, dataset=dataset, namespace=namespace, set_name=set_name, key_value=key_value, )
[docs] def insert( self, arg1: Optional[Union[Key, List[Key]]] = None, arg2: Optional[Key] = None, *keys: Key, key: Optional[Key] = None, dataset: Optional[DataSet] = None, namespace: Optional[str] = None, set_name: Optional[str] = None, key_value: Optional[Union[str, int, bytes]] = None, ) -> WriteSegmentBuilder: """Start a create-only write; fails on execute if the record already exists. Key resolution matches :meth:`upsert`. Returns: A :class:`~aerospike_sdk.aio.operations.query.WriteSegmentBuilder`. Raises: ValueError: If no keys are resolved. TypeError: If positional arguments are invalid. Example: users = DataSet.of("test", "users") await session.insert(users.id(99)).put({"name": "new"}).execute() See Also: :meth:`upsert`: Create or update. """ if ( isinstance(arg1, Key) and arg2 is None and not keys and key is None and dataset is None and namespace is None and key_value is None ): return self._fast_write_segment("insert", arg1) return self._build_write_segment( "insert", arg1, arg2, *keys, key=key, dataset=dataset, namespace=namespace, set_name=set_name, key_value=key_value, )
[docs] def update( self, arg1: Optional[Union[Key, List[Key]]] = None, arg2: Optional[Key] = None, *keys: Key, key: Optional[Key] = None, dataset: Optional[DataSet] = None, namespace: Optional[str] = None, set_name: Optional[str] = None, key_value: Optional[Union[str, int, bytes]] = None, ) -> WriteSegmentBuilder: """Start an update-only write; fails on execute if the record is missing. Key resolution matches :meth:`upsert`. Returns: A :class:`~aerospike_sdk.aio.operations.query.WriteSegmentBuilder`. Raises: ValueError: If no keys are resolved. TypeError: If positional arguments are invalid. See Also: :meth:`upsert`: Create if missing. :meth:`replace_if_exists`: Replace semantics when the record exists. """ if ( isinstance(arg1, Key) and arg2 is None and not keys and key is None and dataset is None and namespace is None and key_value is None ): return self._fast_write_segment("update", arg1) return self._build_write_segment( "update", arg1, arg2, *keys, key=key, dataset=dataset, namespace=namespace, set_name=set_name, key_value=key_value, )
[docs] def replace( self, arg1: Optional[Union[Key, List[Key]]] = None, arg2: Optional[Key] = None, *keys: Key, key: Optional[Key] = None, dataset: Optional[DataSet] = None, namespace: Optional[str] = None, set_name: Optional[str] = None, key_value: Optional[Union[str, int, bytes]] = None, ) -> WriteSegmentBuilder: """Start a full-record replace write (bins replaced per builder rules). Key resolution matches :meth:`upsert`. Prefer :meth:`replace_if_exists` when the record must already exist. Returns: A :class:`~aerospike_sdk.aio.operations.query.WriteSegmentBuilder`. Raises: ValueError: If no keys are resolved. TypeError: If positional arguments are invalid. See Also: :meth:`replace_if_exists`: Replace only when the record exists. """ if ( isinstance(arg1, Key) and arg2 is None and not keys and key is None and dataset is None and namespace is None and key_value is None ): return self._fast_write_segment("replace", arg1) return self._build_write_segment( "replace", arg1, arg2, *keys, key=key, dataset=dataset, namespace=namespace, set_name=set_name, key_value=key_value, )
[docs] def replace_if_exists( self, arg1: Optional[Union[Key, List[Key]]] = None, arg2: Optional[Key] = None, *keys: Key, key: Optional[Key] = None, dataset: Optional[DataSet] = None, namespace: Optional[str] = None, set_name: Optional[str] = None, key_value: Optional[Union[str, int, bytes]] = None, ) -> WriteSegmentBuilder: """Start a replace write that requires an existing record. Key resolution matches :meth:`upsert`. On execute, missing keys surface as errors according to error strategy (default may raise). Returns: A :class:`~aerospike_sdk.aio.operations.query.WriteSegmentBuilder`. Raises: ValueError: If no keys are resolved. TypeError: If positional arguments are invalid. See Also: :meth:`replace`: Unconditional replace semantics. """ if ( isinstance(arg1, Key) and arg2 is None and not keys and key is None and dataset is None and namespace is None and key_value is None ): return self._fast_write_segment("replace_if_exists", arg1) return self._build_write_segment( "replace_if_exists", arg1, arg2, *keys, key=key, dataset=dataset, namespace=namespace, set_name=set_name, key_value=key_value, )
[docs] def delete( self, arg1: Optional[Union[Key, List[Key]]] = None, arg2: Optional[Key] = None, *keys: Key, key: Optional[Key] = None, dataset: Optional[DataSet] = None, namespace: Optional[str] = None, set_name: Optional[str] = None, key_value: Optional[Union[str, int, bytes]] = None, ) -> WriteSegmentBuilder: """Start a delete for one or more keys. Key resolution matches :meth:`upsert`. Chain filters or durable-delete options on the builder, then ``await ...execute()``. Returns: A :class:`~aerospike_sdk.aio.operations.query.WriteSegmentBuilder`. Raises: ValueError: If no keys are resolved. TypeError: If positional arguments are invalid. Example: users = DataSet.of("test", "users") await session.delete(users.id(1)).execute() await session.delete(users.ids(10, 11)).execute() See Also: :meth:`background_task`: Delete many records via a server job. """ if ( isinstance(arg1, Key) and arg2 is None and not keys and key is None and dataset is None and namespace is None and key_value is None ): return self._fast_write_segment("delete", arg1) return self._build_write_segment( "delete", arg1, arg2, *keys, key=key, dataset=dataset, namespace=namespace, set_name=set_name, key_value=key_value, )
[docs] def touch( self, arg1: Optional[Union[Key, List[Key]]] = None, arg2: Optional[Key] = None, *keys: Key, key: Optional[Key] = None, dataset: Optional[DataSet] = None, namespace: Optional[str] = None, set_name: Optional[str] = None, key_value: Optional[Union[str, int, bytes]] = None, ) -> WriteSegmentBuilder: """Start a touch to refresh TTL without changing bins. Key resolution matches :meth:`upsert`. Use the builder to set TTL or related policy, then ``await ...execute()``. Returns: A :class:`~aerospike_sdk.aio.operations.query.WriteSegmentBuilder`. Raises: ValueError: If no keys are resolved. TypeError: If positional arguments are invalid. See Also: :meth:`upsert`: Writes that can also set expiration via the builder. """ if ( isinstance(arg1, Key) and arg2 is None and not keys and key is None and dataset is None and namespace is None and key_value is None ): return self._fast_write_segment("touch", arg1) return self._build_write_segment( "touch", arg1, arg2, *keys, key=key, dataset=dataset, namespace=namespace, set_name=set_name, key_value=key_value, )
[docs] def exists( self, arg1: Optional[Union[Key, List[Key]]] = None, arg2: Optional[Key] = None, *keys: Key, key: Optional[Key] = None, dataset: Optional[DataSet] = None, namespace: Optional[str] = None, set_name: Optional[str] = None, key_value: Optional[Union[str, int, bytes]] = None, ) -> WriteSegmentBuilder: """Start an existence check for one or more keys. Key resolution matches :meth:`upsert`. After ``execute``, use :meth:`~aerospike_sdk.record_result.RecordResult.as_bool` on each :class:`~aerospike_sdk.record_result.RecordResult` or inspect ``result_code``. Returns: A :class:`~aerospike_sdk.aio.operations.query.WriteSegmentBuilder`. Raises: ValueError: If no keys are resolved. TypeError: If positional arguments are invalid. Example: users = DataSet.of("test", "users") rs = await session.exists(users.id(1)).execute() exists = (await rs.first()).as_bool() See Also: :meth:`query`: Read record data when the key is known to exist. """ if ( isinstance(arg1, Key) and arg2 is None and not keys and key is None and dataset is None and namespace is None and key_value is None ): return self._fast_write_segment("exists", arg1) return self._build_write_segment( "exists", arg1, arg2, *keys, key=key, dataset=dataset, namespace=namespace, set_name=set_name, key_value=key_value, )
[docs] async def truncate(self, dataset: DataSet, before_nanos: Optional[int] = None) -> None: """ Truncate (delete all records) from a set; this cannot be undone. Args: dataset: The DataSet to truncate. before_nanos: Optional timestamp in nanoseconds. Only records with last update time (LUT) less than this value are truncated. If None, all records in the set are truncated. Returns: None Raises: RuntimeError: If the client is not connected. Example:: users = DataSet.of("test", "users") await session.truncate(users) cutoff_time = time.time_ns() - (24 * 60 * 60 * 10**9) # 24 hours ago await session.truncate(users, before_nanos=cutoff_time) """ # Access the underlying async client and call its truncate method if self._client._client is None: raise RuntimeError("Client is not connected") await self._client._client.truncate( dataset.namespace, dataset.set_name, before_nanos )
def __repr__(self) -> str: """String representation of the session.""" return f"Session(behavior={self._behavior.name!r})"