Session¶
- class aerospike_sdk.aio.session.Session[source]¶
Bases:
objectPerform reads and writes against Aerospike with a fixed
Behavior.A session binds a connected
Clientto policy defaults (timeouts, retries, replica preferences) for every operation started from it. Create sessions withClient.create_session(); do not constructSessiondirectly.Example
- async with Client(“localhost:3000”) as client:
session = client.create_session(Behavior.DEFAULT) users = DataSet.of(“test”, “users”) stream = await session.query(users.id(1)).execute() first = await stream.first_or_raise() await session.upsert(users.id(2)).put({“name”: “Tim”}).execute()
See also
Client.create_session(): How to obtain a session.query(): Point reads, batch reads, and secondary-index queries.upsert(): Create-or-update writes.- __init__(client, behavior)[source]¶
Attach a client and behavior; prefer
Client.create_session().- Parameters:
Note
Application code should not call
Session(...)directly.See also
Client.create_session().
- get_current_transaction()[source]¶
Return the active transaction for this session, or
None.Regular
Sessioninstances always returnNone; onlyTransactionalSessioninside itsasync withblock returns a liveTxn. Builders created from this session call this hook at construction and thread the result through every policy they hand to the PAC.- Return type:
Optional[Txn]- Returns:
The active
Txn, orNoneoutside a transaction.
Example
>>> session = client.create_session() >>> session.get_current_transaction() is None True >>> async with session.begin_transaction() as tx: ... assert tx.get_current_transaction() is tx.txn
See also
begin_transaction(): Enter a multi-record transaction.TransactionalSession
- async get(key, bins=None)[source]¶
Direct single-key point read — returns
Recordor raises.Bypasses the builder chain (
session.query(key).execute()) and theRecordStreamwrapper: oneawaitreaches the underlying client and the resultingRecordis returned unwrapped. Use when you have a single key and want minimum per-op overhead; usequery()when you need filters, projections, or streaming.- Parameters:
- Return type:
Record- Returns:
The
Recordforkey.- Raises:
AerospikeError – Server or client errors (including
KEY_NOT_FOUND_ERROR) are raised from the underlying client without being wrapped in aRecordResult.
Example
>>> users = DataSet.of("test", "users") >>> rec = await session.get(users.id(1)) >>> name = rec.bins["name"]
- async put(key, bins)[source]¶
Direct single-key upsert — returns
Noneor raises.Bypasses the builder chain (
session.upsert(key).put(...).execute()) and theRecordStreamwrapper: oneawaitreaches the underlying client. Use when you have a single key and want minimum per-op overhead; useupsert()when you need atomic multi-op semantics, TTL overrides, generation checks, durable delete, or filter expressions.- Parameters:
- Return type:
- Returns:
Noneon success.- Raises:
AerospikeError – Server or client errors are raised from the underlying client.
Example
>>> users = DataSet.of("test", "users") >>> await session.put(users.id(1), {"name": "Tim", "age": 30})
- property behavior: Behavior¶
Policy bundle applied to operations created from this session.
- Returns:
The
Behaviorpassed toClient.create_session().
- property client: Client¶
SDK client that owns the connection used by this session.
- Returns:
The parent
Client.
- batch()[source]¶
Start a multi-key batch of mixed write operations executed in one server round trip.
Chain
insert,update,upsert,replace,delete, and related bin builders, thenawait ...execute()to obtain per-key outcomes.- Return type:
- Returns:
A
BatchOperationBuilderfor chaining operations.- Raises:
RuntimeError – If the client is not connected.
Example:
results = await ( session.batch() .insert(key1).put({"name": "Alice", "age": 25}) .update(key2).bin("counter").add(1) .upsert(key3).put({"status": "active"}) .delete(key4) .execute() ) for row in results: print(row.key, row.result_code)
See also
upsert(): Single-record writes without batching.
- background_task()[source]¶
Configure a server-side background job (query + scan scope) on a dataset.
Call
update,delete,touch, orexecute_udfon the returned object, add optional filters (for examplewhereon supported builders), thenawait ...execute()to start work and receive an async task handle.- Return type:
- Returns:
A
BackgroundTaskSessionfor chaining the operation type and execution.- Raises:
RuntimeError – If the client is not connected.
Example:
task = await ( session.background_task() .delete(DataSet.of("test", "scratch")) .where("$.flag == 1") .execute() ) await task.wait_till_complete(sleep_time=0.2, max_attempts=50)
See also
execute_udf(): Foreground UDF on explicit keys.
- execute_udf(*keys)[source]¶
Run a registered server-side UDF on one or more keys (foreground).
Chain
function(package, name)(package is the registered module name without.lua), optionalpassing(*args)for Lua parameters, optionalwherefor a filter expression, thenawait ...execute()to obtain aRecordStream. Multiple keys use a batch UDF; results preserve per-key order where applicable.- Parameters:
*keys (
Key) – One or moreKeytargets in the same namespace and set.- Return type:
- Returns:
UdfFunctionBuilder— callfunctionnext.- Raises:
ValueError – If no keys are given.
RuntimeError – If the client is not connected.
Example:
users = DataSet.of("test", "users") stream = await ( session.execute_udf(users.id("a")) .function("my_module", "my_fn") .passing("binName", 42) .execute() ) value = await stream.first_udf_result()
See also
query(): Read bins without UDF.background_task(): Dataset-scoped background UDF.
- query(arg1=None, arg2=None, *keys, namespace=None, set_name=None, dataset=None, key=None, keys_list=None, behavior=None)[source]¶
- Overloads:
self, dataset (DataSet), behavior (Optional[Behavior]) → QueryBuilder
self, key (Key), behavior (Optional[Behavior]) → QueryBuilder
self, keys (List[Key]), behavior (Optional[Behavior]) → QueryBuilder
self, keys (Key), behavior (Optional[Behavior]) → QueryBuilder
self, namespace (str), set_name (str), behavior (Optional[Behavior]) → QueryBuilder
Start a read or secondary-index query for keys or a whole set.
This session’s
behavioris applied to the underlyingQueryBuilder. Supported shapes include aDataSet(set-wide query), a singleKey, multiple keys (list or varargs), or explicitnamespace/set_namefor index scans.- Parameters:
arg1 (
Union[DataSet,Key,List[Key],str,None]) – Positional dataset, key, list of keys, or namespace string (when paired witharg2as set name).arg2 (
Union[str,Key,None]) – Whenarg1is a namespace, the set name; otherwise may be a second key when passing multiple keys positionally.*keys (
Key) – Additional keys when the first positional argument is a key.namespace (
Optional[str]) – Keyword namespace (withset_name) when not using a dataset.set_name (
Optional[str]) – Keyword set name (withnamespace).key (
Optional[Key]) – Keyword single key.keys_list (
Optional[List[Key]]) – Keyword list of keys when not usingarg1or varargs; forwarded to the client askeys.behavior (
Optional[Behavior]) – Optional override for this query; defaults to the session’sbehavior.
- Returns:
A
QueryBuilderto chainwhere,bins,execute, etc.- Raises:
TypeError – If positional types do not match the supported overloads.
ValueError – If a key list is empty or arguments are inconsistent.
Example
users = DataSet.of(“test”, “users”) rs = await session.query(users.id(1)).bins([“name”]).execute() row = await rs.first_or_raise()
Example
users = DataSet.of(“test”, “users”) rs = await session.query(users.ids(1, 2, 3)).bins([“name”]).execute() rows = await rs.collect()
See also
Client.query(): Same shapes without session behavior.upsert(): Writes for the same keys.
- index(namespace=None, set_name=None, *, dataset=None, behavior=None)[source]¶
- Overloads:
self, dataset (DataSet), behavior (Optional[Behavior]) → IndexBuilder
self, namespace (str), set_name (str), behavior (Optional[Behavior]) → IndexBuilder
Create a secondary index builder for a namespace and set.
- Parameters:
- Returns:
IndexBuilderforchaining index definition and creation.
- Raises:
ValueError – If
datasetis not given andnamespaceorset_nameis missing.
Example:
users = DataSet.of("test", "users") await session.index(dataset=users).on_bin("age").named("age_idx").numeric().create()
See also
Client.index()
- transaction_session()[source]¶
Create a transactional session using this session’s behavior.
Alias for
begin_transaction().- Return type:
- Returns:
TransactionalSessionbound to this session’s client and behavior.
See also
begin_transaction(): Preferred entry point.aerospike_sdk.aio.client.Client.transaction_session()
- begin_transaction()[source]¶
Start a multi-record transaction (MRT) using this session’s behavior.
Returns an async context manager that allocates a fresh
Txn. Every operation run on the returned session auto-participates in the transaction — builders stamppolicy.txn = tx.txnunder the hood, so user code never touches a policy object. On clean exit the transaction is committed; if an exception propagates out of theasync withblock the transaction is aborted.Example
>>> async with session.begin_transaction() as tx: ... await tx.upsert(accounts.id("A")).bin("balance").set_to(100).execute() ... await tx.upsert(accounts.id("B")).bin("balance").set_to(200).execute()
- Return type:
- Returns:
TransactionalSessionbound to this session’s client and behavior.
See also
transaction_session(): Alias for this method.do_in_transaction(): Run a callable inside a retrying MRT.
- info(command=None)[source]¶
- Overloads:
self → InfoCommands
self, command (str) → Awaitable[Dict[str, str]]
Execute info commands or get the InfoCommands helper.
With no argument, returns an InfoCommands instance for high-level helpers (namespaces(), namespace_details(), etc.) and for info_on_all_nodes().
With a command string, runs the raw info command and returns its result (awaitable).
- Parameters:
command (
Optional[str]) – Optional. If given, the raw info command to run (e.g. “sindex-list”, “build”).- Returns:
InfoCommands instance. If command is given: awaitable dict (node -> response).
- Return type:
If command is None
Example:
# Raw command (no double .info) response = await session.info("sindex-list") # High-level helpers info = session.info() namespaces = await info.namespaces() by_node = await info.info_on_all_nodes("build")
- async is_namespace_sc(namespace)[source]¶
Check if a namespace is in strong consistency (SC) mode.
Strong consistency mode provides linearizable reads and writes at the cost of availability during network partitions.
- Parameters:
namespace (
str) – The namespace name to check.- Return type:
- Returns:
True if the namespace is in strong consistency mode, False otherwise.
- Raises:
ValueError – If the namespace is unknown or the info command fails.
Example:
if await session.is_namespace_sc("test"): print("Namespace 'test' is in strong consistency mode") else: print("Namespace 'test' is in AP (availability) mode")
- async do_in_transaction(operation, *, max_attempts=5, sleep_between_retries=0.0)[source]¶
Run an async callable inside a retrying multi-record transaction.
Creates a
TransactionalSession, invokesoperation(tx)insideasync with, and retries the whole block when the server signals a transient conflict (MRT_BLOCKED,MRT_VERSION_MISMATCH, orTXN_FAILED). On any non-transient failure the transaction is aborted and the exception re-raised.- Parameters:
operation (
Callable[[TransactionalSession],Awaitable[Any]]) – Async callable accepting aTransactionalSessionand performing zero or more operations on it. Its return value is returned fromdo_in_transaction().max_attempts (
int) – Maximum total attempts (initial + retries). Must be>= 1. Defaults to5.sleep_between_retries (
float) – Optional seconds toawait asyncio.sleepbetween retries.0(the default) retries immediately.
- Return type:
- Returns:
Whatever
operationreturns on the successful attempt.- Raises:
ValueError – If
max_attempts < 1.AerospikeError – The last-seen transient error after
max_attemptsexhausted retries, or any non-transient error raised byoperation.
Example
>>> async def transfer(tx): ... await tx.upsert(accounts.id("A")).bin("bal").add(-10).execute() ... await tx.upsert(accounts.id("B")).bin("bal").add(10).execute() ... return "ok" >>> result = await session.do_in_transaction(transfer)
See also
begin_transaction(): Manual MRT lifecycle.TransactionalSession
- upsert(arg1=None, arg2=None, *keys, key=None, dataset=None, namespace=None, set_name=None, key_value=None)[source]¶
Start a create-or-replace write for one or more keys.
If the record exists, bins are merged according to the chained operations; if it does not exist, it is created. Use
insert()when the record must not already exist.- Parameters:
arg1 (
Union[Key,List[Key],None]) – A singleKey, a list of keys, or omit and passkey/dataset+key_value/namespace+set_name+key_value.arg2 (
Optional[Key]) – Optional second key when passing multiple keys positionally.*keys (
Key) – Additional keys when the first positional is a key.key (
Optional[Key]) – Single key (keyword form).dataset (
Optional[DataSet]) – Dataset used withkey_valueto build a key.namespace (
Optional[str]) – Namespace used withset_nameandkey_value.set_name (
Optional[str]) – Set name used withnamespaceandkey_value.key_value (
Union[str,int,bytes,None]) – User key value withdatasetornamespace/set_name.
- Return type:
- Returns:
A
WriteSegmentBuilderforput,bin,where,execute, etc.- Raises:
ValueError – If no keys are resolved or lists are empty.
TypeError – If positional arguments are not keys or lists of keys.
Example
users = DataSet.of(“test”, “users”) await session.upsert(users.id(1)).put({“name”: “Tim”, “age”: 30}).execute()
- insert(arg1=None, arg2=None, *keys, key=None, dataset=None, namespace=None, set_name=None, key_value=None)[source]¶
Start a create-only write; fails on execute if the record already exists.
Key resolution matches
upsert().- Return type:
- Returns:
- Raises:
ValueError – If no keys are resolved.
TypeError – If positional arguments are invalid.
Example
users = DataSet.of(“test”, “users”) await session.insert(users.id(99)).put({“name”: “new”}).execute()
See also
upsert(): Create or update.
- update(arg1=None, arg2=None, *keys, key=None, dataset=None, namespace=None, set_name=None, key_value=None)[source]¶
Start an update-only write; fails on execute if the record is missing.
Key resolution matches
upsert().- Return type:
- Returns:
- Raises:
ValueError – If no keys are resolved.
TypeError – If positional arguments are invalid.
See also
upsert(): Create if missing.replace_if_exists(): Replace semantics when the record exists.
- replace(arg1=None, arg2=None, *keys, key=None, dataset=None, namespace=None, set_name=None, key_value=None)[source]¶
Start a full-record replace write (bins replaced per builder rules).
Key resolution matches
upsert(). Preferreplace_if_exists()when the record must already exist.- Return type:
- Returns:
- Raises:
ValueError – If no keys are resolved.
TypeError – If positional arguments are invalid.
See also
replace_if_exists(): Replace only when the record exists.
- replace_if_exists(arg1=None, arg2=None, *keys, key=None, dataset=None, namespace=None, set_name=None, key_value=None)[source]¶
Start a replace write that requires an existing record.
Key resolution matches
upsert(). On execute, missing keys surface as errors according to error strategy (default may raise).- Return type:
- Returns:
- Raises:
ValueError – If no keys are resolved.
TypeError – If positional arguments are invalid.
See also
replace(): Unconditional replace semantics.
- delete(arg1=None, arg2=None, *keys, key=None, dataset=None, namespace=None, set_name=None, key_value=None)[source]¶
Start a delete for one or more keys.
Key resolution matches
upsert(). Chain filters or durable-delete options on the builder, thenawait ...execute().- Return type:
- Returns:
- Raises:
ValueError – If no keys are resolved.
TypeError – If positional arguments are invalid.
Example
users = DataSet.of(“test”, “users”) await session.delete(users.id(1)).execute() await session.delete(users.ids(10, 11)).execute()
See also
background_task(): Delete many records via a server job.
- touch(arg1=None, arg2=None, *keys, key=None, dataset=None, namespace=None, set_name=None, key_value=None)[source]¶
Start a touch to refresh TTL without changing bins.
Key resolution matches
upsert(). Use the builder to set TTL or related policy, thenawait ...execute().- Return type:
- Returns:
- Raises:
ValueError – If no keys are resolved.
TypeError – If positional arguments are invalid.
See also
upsert(): Writes that can also set expiration via the builder.
- exists(arg1=None, arg2=None, *keys, key=None, dataset=None, namespace=None, set_name=None, key_value=None)[source]¶
Start an existence check for one or more keys.
Key resolution matches
upsert(). Afterexecute, useas_bool()on eachRecordResultor inspectresult_code.- Return type:
- Returns:
- Raises:
ValueError – If no keys are resolved.
TypeError – If positional arguments are invalid.
Example
users = DataSet.of(“test”, “users”) rs = await session.exists(users.id(1)).execute() exists = (await rs.first()).as_bool()
See also
query(): Read record data when the key is known to exist.
- async truncate(dataset, before_nanos=None)[source]¶
Truncate (delete all records) from a set; this cannot be undone.
- Parameters:
- Return type:
- Returns:
None
- Raises:
RuntimeError – If the client is not connected.
Example:
users = DataSet.of("test", "users") await session.truncate(users) cutoff_time = time.time_ns() - (24 * 60 * 60 * 10**9) # 24 hours ago await session.truncate(users, before_nanos=cutoff_time)