# Copyright 2025-2026 Aerospike, Inc.
#
# Portions may be licensed to Aerospike, Inc. under one or more contributor
# license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Synchronous :class:`~aerospike_sdk.aio.session.Session` wrapper."""
from __future__ import annotations
import typing
from typing import Dict, List, Optional, TYPE_CHECKING, overload, Union
from aerospike_async import Key
from aerospike_sdk.aio.client import Client
from aerospike_sdk.aio.session import Session as AsyncSession
from aerospike_sdk.dataset import DataSet
from aerospike_sdk.policy.behavior import Behavior
from aerospike_sdk.sync.background import SyncBackgroundTaskSession
from aerospike_sdk.sync.client import _EventLoopManager
from aerospike_sdk.sync.info import SyncInfoCommands
from aerospike_sdk.sync.operations.batch import SyncBatchOperationBuilder
from aerospike_sdk.sync.operations.index import SyncIndexBuilder
from aerospike_sdk.sync.operations.query import SyncQueryBuilder, SyncWriteSegmentBuilder
from aerospike_sdk.sync.operations.udf import SyncUdfFunctionBuilder
if TYPE_CHECKING:
from aerospike_sdk.sync.transactional_session import SyncTransactionalSession
[docs]
class SyncSession:
"""Run session-scoped reads and writes without ``async``/``await``.
Constructed by :meth:`SyncClient.create_session
<aerospike_sdk.sync.client.SyncClient.create_session>`, not by
calling ``SyncSession(...)`` directly. Each method delegates to
:class:`~aerospike_sdk.aio.session.Session` on a shared per-thread loop;
return types are sync wrappers where the async API would return a coroutine
or async stream.
See Also:
:class:`~aerospike_sdk.aio.session.Session`: Async API and behavior
semantics.
"""
[docs]
def __init__(self, async_session: AsyncSession, loop_manager: _EventLoopManager) -> None:
"""Wrap ``async_session``; use :meth:`SyncClient.create_session` instead.
Args:
async_session: Connected async session (same behavior binding).
loop_manager: Loop manager shared with the parent
:class:`~aerospike_sdk.sync.client.SyncClient`.
"""
self._async_session = async_session
self._loop_manager = loop_manager
@property
def behavior(self) -> Behavior:
"""Get the behavior configuration for this session."""
return self._async_session.behavior
@property
def client(self) -> Client:
"""Get the underlying Client."""
return self._async_session.client
def _build_write_segment(
self,
op_type: str,
arg1: Optional[Union[Key, List[Key]]] = None,
arg2: Optional[Key] = None,
*more_keys: Key,
key: Optional[Key] = None,
dataset: Optional[DataSet] = None,
namespace: Optional[str] = None,
set_name: Optional[str] = None,
key_value: Optional[Union[str, int, bytes]] = None,
) -> "SyncWriteSegmentBuilder":
"""Delegate to async session's write segment builder and wrap in sync."""
wsb = self._async_session._build_write_segment(
op_type, arg1, arg2, *more_keys,
key=key, dataset=dataset, namespace=namespace,
set_name=set_name, key_value=key_value,
)
return SyncWriteSegmentBuilder(wsb, self._loop_manager)
def _fast_write_segment(self, op_type: str, key: Key) -> "SyncWriteSegmentBuilder":
"""Single-key write shortcut: delegate to async fast path and wrap."""
wsb = self._async_session._fast_write_segment(op_type, key)
return SyncWriteSegmentBuilder(wsb, self._loop_manager)
[docs]
def query(
self,
arg1: Optional[Union[DataSet, Key, List[Key], str]] = None,
arg2: Optional[Union[str, Key]] = None,
*keys: Key,
namespace: Optional[str] = None,
set_name: Optional[str] = None,
dataset: Optional[DataSet] = None,
key: Optional[Key] = None,
keys_list: Optional[List[Key]] = None,
behavior: Optional[Behavior] = None,
) -> "SyncQueryBuilder":
"""Start a read or secondary-index query (synchronous session).
Same shapes as :meth:`aerospike_sdk.aio.session.Session.query`, with
this session's behavior applied on the underlying async builder.
Args:
arg1: Positional dataset, key, list of keys, or namespace string
(when paired with ``arg2`` as set name).
arg2: When ``arg1`` is a namespace, the set name; otherwise may be
a second key when passing multiple keys positionally.
*keys: Additional keys when the first positional argument is a key.
namespace: Keyword namespace (with ``set_name``) when not using a
dataset.
set_name: Keyword set name (with ``namespace``).
dataset: Keyword :class:`~aerospike_sdk.dataset.DataSet`.
key: Keyword single key.
keys_list: Keyword list of keys for batch read.
behavior: Optional per-query behavior override (same as async session).
Returns:
A :class:`~aerospike_sdk.sync.operations.query.SyncQueryBuilder`.
"""
# Delegate to async session.query() - pass positional args as positional, keyword args as keyword
if arg1 is not None or arg2 is not None or keys:
# Has positional arguments - pass them positionally
async_builder = self._async_session.query( # type: ignore[call-overload]
arg1, arg2, *keys, behavior=behavior,
)
else:
# Only keyword arguments
async_builder = self._async_session.query( # type: ignore[call-overload]
namespace=namespace,
set_name=set_name,
dataset=dataset,
key=key,
keys_list=keys_list,
behavior=behavior,
)
return SyncQueryBuilder(
async_client=self._async_session._client,
namespace=async_builder._namespace,
set_name=async_builder._set_name,
loop_manager=self._loop_manager,
query_builder=async_builder,
)
[docs]
def batch(self) -> "SyncBatchOperationBuilder":
"""Start a multi-key batch of mixed write operations (synchronous).
Chain ``insert``, ``update``, ``upsert``, ``replace``, ``delete``, and bin
builders, then :meth:`~aerospike_sdk.sync.operations.batch.SyncBatchOperationBuilder.execute`
for a :class:`~aerospike_sdk.sync.record_stream.SyncRecordStream`.
Returns:
A :class:`~aerospike_sdk.sync.operations.batch.SyncBatchOperationBuilder`.
Raises:
RuntimeError: If the client is not connected (from the async session).
Example::
stream = (
session.batch()
.insert(key1).put({"name": "Alice", "age": 25})
.update(key2).bin("counter").add(1)
.execute()
)
for row in stream:
print(row.key, row.result_code)
See Also:
:meth:`~aerospike_sdk.aio.session.Session.batch`
"""
inner = self._async_session.batch()
return SyncBatchOperationBuilder(inner, self._loop_manager)
[docs]
def transaction_session(self) -> "SyncTransactionalSession":
"""Alias for :meth:`begin_transaction`.
Returns:
:class:`~aerospike_sdk.sync.transactional_session.SyncTransactionalSession`
bound to this session's client and behavior.
See Also:
:meth:`begin_transaction`: Preferred entry point.
:meth:`~aerospike_sdk.aio.session.Session.transaction_session`: Async equivalent.
"""
return self.begin_transaction()
[docs]
def begin_transaction(self) -> "SyncTransactionalSession":
"""Start a multi-record transaction (MRT) using this session's behavior.
Returns a context manager that allocates a fresh
:class:`~aerospike_async.Txn`. Every operation run on the returned
session auto-participates in the transaction — builders stamp
``policy.txn = tx.txn`` under the hood, so user code never touches a
policy object. On clean exit the transaction is committed; if an
exception propagates out of the ``with`` block the transaction is
aborted.
Example:
>>> with session.begin_transaction() as tx:
... tx.upsert(accounts.id("A")).bin("balance").set_to(100).execute()
... tx.upsert(accounts.id("B")).bin("balance").set_to(200).execute()
Returns:
:class:`~aerospike_sdk.sync.transactional_session.SyncTransactionalSession`
bound to this session's client and behavior.
See Also:
:meth:`transaction_session`: Alias for this method.
:meth:`do_in_transaction`: Run a callable inside a retrying MRT.
:meth:`~aerospike_sdk.aio.session.Session.begin_transaction`: Async equivalent.
"""
from aerospike_sdk.sync.transactional_session import SyncTransactionalSession
async_txn_session = self._async_session.begin_transaction()
return SyncTransactionalSession(async_txn_session, self._loop_manager)
[docs]
def do_in_transaction(
self,
operation: "typing.Callable[[SyncTransactionalSession], typing.Any]",
*,
max_attempts: int = 5,
sleep_between_retries: float = 0.0,
) -> "typing.Any":
"""Run a callable inside a retrying multi-record transaction.
Creates a :class:`SyncTransactionalSession`, invokes ``operation(tx)``
inside ``with``, and retries the whole block when the server signals
a transient conflict (``MRT_BLOCKED``, ``MRT_VERSION_MISMATCH``, or
``TXN_FAILED``). On any non-transient failure the transaction is
aborted and the exception re-raised.
Args:
operation: Synchronous callable accepting a
:class:`SyncTransactionalSession` and performing zero or more
operations on it. Its return value is returned from
:meth:`do_in_transaction`.
max_attempts: Maximum total attempts (initial + retries). Must
be ``>= 1``. Defaults to ``5``.
sleep_between_retries: Optional seconds to ``time.sleep`` between
retries. ``0`` (the default) retries immediately.
Returns:
Whatever ``operation`` returns on the successful attempt.
Raises:
ValueError: If ``max_attempts < 1``.
AerospikeError: The last-seen transient error after
``max_attempts`` exhausted retries, or any non-transient
error raised by ``operation``.
Example:
>>> def transfer(tx):
... tx.upsert(accounts.id("A")).bin("bal").add(-10).execute()
... tx.upsert(accounts.id("B")).bin("bal").add(10).execute()
... return "ok"
>>> result = session.do_in_transaction(transfer)
See Also:
:meth:`begin_transaction`: Manual MRT lifecycle.
:class:`SyncTransactionalSession`
:meth:`~aerospike_sdk.aio.session.Session.do_in_transaction`: Async equivalent.
"""
if max_attempts < 1:
raise ValueError("max_attempts must be >= 1")
import time
from aerospike_async import ResultCode
from aerospike_sdk.exceptions import AerospikeError
retryable_codes = {
ResultCode.MRT_BLOCKED,
ResultCode.MRT_VERSION_MISMATCH,
}
txn_failed = getattr(ResultCode, "TXN_FAILED", None)
if txn_failed is not None:
retryable_codes.add(txn_failed)
last_exc: Optional[BaseException] = None
for attempt in range(max_attempts):
try:
with self.begin_transaction() as tx_session:
return operation(tx_session)
except AerospikeError as exc:
last_exc = exc
if exc.result_code not in retryable_codes:
raise
if attempt + 1 >= max_attempts:
raise
if sleep_between_retries > 0:
time.sleep(sleep_between_retries)
assert last_exc is not None
raise last_exc
[docs]
def background_task(self) -> "SyncBackgroundTaskSession":
"""Start a background dataset task chain (synchronous).
Returns:
:class:`~aerospike_sdk.sync.background.SyncBackgroundTaskSession`.
See Also:
:meth:`~aerospike_sdk.aio.session.Session.background_task`.
"""
inner = self._async_session.background_task()
return SyncBackgroundTaskSession(inner, self._loop_manager)
[docs]
def execute_udf(self, *keys: Key) -> "SyncUdfFunctionBuilder":
"""Begin a foreground UDF invocation on the given keys (synchronous).
Returns:
:class:`~aerospike_sdk.sync.operations.udf.SyncUdfFunctionBuilder`.
See Also:
:meth:`~aerospike_sdk.aio.session.Session.execute_udf`.
"""
inner = self._async_session.execute_udf(*keys)
return SyncUdfFunctionBuilder(
inner, self._loop_manager, self._async_session.client)
[docs]
def index(
self,
namespace: Optional[str] = None,
set_name: Optional[str] = None,
*,
dataset: Optional[DataSet] = None,
behavior: Optional[Behavior] = None,
) -> "SyncIndexBuilder":
"""Create a secondary-index builder for this namespace/set (synchronous).
Raises:
ValueError: If ``namespace`` and ``set_name`` are missing and no
``dataset`` is provided.
Returns:
:class:`~aerospike_sdk.sync.operations.index.SyncIndexBuilder`.
See Also:
:meth:`~aerospike_sdk.aio.session.Session.index`.
"""
_ = behavior
# Resolve namespace and set_name from dataset if provided
if dataset:
namespace = dataset.namespace
set_name = dataset.set_name
if not namespace or not set_name:
raise ValueError("namespace and set_name are required (or provide dataset)")
return SyncIndexBuilder(
async_client=self._async_session._client,
namespace=namespace,
set_name=set_name,
loop_manager=self._loop_manager,
)
[docs]
def truncate(self, dataset: DataSet, before_nanos: Optional[int] = None) -> None:
"""Truncate (delete all records) from a set (synchronous)."""
async def _truncate():
await self._async_session.truncate(dataset, before_nanos)
self._loop_manager.run_async(_truncate())
[docs]
def is_namespace_sc(self, namespace: str) -> bool:
"""Check if a namespace is in strong-consistency (SC) mode.
Args:
namespace: The namespace name to check.
Returns:
``True`` if the namespace is configured for strong consistency,
``False`` otherwise.
Raises:
RuntimeError: If the underlying client is not connected.
ValueError: If the namespace is unknown or the info command fails.
Example::
if session.is_namespace_sc("test_sc"):
print("Namespace is SC — MRTs are supported here.")
See Also:
:meth:`~aerospike_sdk.aio.session.Session.is_namespace_sc`:
Async equivalent.
"""
return self._loop_manager.run_async(
self._async_session.is_namespace_sc(namespace)
)
@overload
def info(self) -> "SyncInfoCommands": ...
@overload
def info(self, command: str) -> Dict[str, str]: ...
[docs]
def info(
self, command: Optional[str] = None
) -> Union["SyncInfoCommands", Dict[str, str]]:
"""
Execute info commands or get the SyncInfoCommands helper (synchronous).
With no argument, returns SyncInfoCommands for high-level helpers and
info_on_all_nodes(). With a command string, runs the raw info command
and returns its result.
Example::
response = session.info("sindex-list")
info = session.info()
by_node = info.info_on_all_nodes("build")
"""
if command is not None:
async def _info():
return await self._async_session.info(command)
return self._loop_manager.run_async(_info())
return SyncInfoCommands(self._async_session.info(), self._loop_manager)
def _is_single_key(
self, arg1, arg2, keys, key, dataset, namespace, key_value,
) -> bool:
return (
isinstance(arg1, Key) and arg2 is None and not keys
and key is None and dataset is None
and namespace is None and key_value is None
)
[docs]
def upsert(
self,
arg1: Optional[Union[Key, List[Key]]] = None,
arg2: Optional[Key] = None,
*keys: Key,
key: Optional[Key] = None,
dataset: Optional[DataSet] = None,
namespace: Optional[str] = None,
set_name: Optional[str] = None,
key_value: Optional[Union[str, int, bytes]] = None,
) -> "SyncWriteSegmentBuilder":
"""Create an upsert write segment (synchronous)."""
if self._is_single_key(arg1, arg2, keys, key, dataset, namespace, key_value):
return self._fast_write_segment("upsert", arg1) # type: ignore[arg-type]
return self._build_write_segment(
"upsert", arg1, arg2, *keys,
key=key, dataset=dataset, namespace=namespace,
set_name=set_name, key_value=key_value,
)
[docs]
def insert(
self,
arg1: Optional[Union[Key, List[Key]]] = None,
arg2: Optional[Key] = None,
*keys: Key,
key: Optional[Key] = None,
dataset: Optional[DataSet] = None,
namespace: Optional[str] = None,
set_name: Optional[str] = None,
key_value: Optional[Union[str, int, bytes]] = None,
) -> "SyncWriteSegmentBuilder":
"""Create an insert write segment (synchronous)."""
if self._is_single_key(arg1, arg2, keys, key, dataset, namespace, key_value):
return self._fast_write_segment("insert", arg1) # type: ignore[arg-type]
return self._build_write_segment(
"insert", arg1, arg2, *keys,
key=key, dataset=dataset, namespace=namespace,
set_name=set_name, key_value=key_value,
)
[docs]
def update(
self,
arg1: Optional[Union[Key, List[Key]]] = None,
arg2: Optional[Key] = None,
*keys: Key,
key: Optional[Key] = None,
dataset: Optional[DataSet] = None,
namespace: Optional[str] = None,
set_name: Optional[str] = None,
key_value: Optional[Union[str, int, bytes]] = None,
) -> "SyncWriteSegmentBuilder":
"""Create an update write segment (synchronous)."""
if self._is_single_key(arg1, arg2, keys, key, dataset, namespace, key_value):
return self._fast_write_segment("update", arg1) # type: ignore[arg-type]
return self._build_write_segment(
"update", arg1, arg2, *keys,
key=key, dataset=dataset, namespace=namespace,
set_name=set_name, key_value=key_value,
)
[docs]
def replace(
self,
arg1: Optional[Union[Key, List[Key]]] = None,
arg2: Optional[Key] = None,
*keys: Key,
key: Optional[Key] = None,
dataset: Optional[DataSet] = None,
namespace: Optional[str] = None,
set_name: Optional[str] = None,
key_value: Optional[Union[str, int, bytes]] = None,
) -> "SyncWriteSegmentBuilder":
"""Create a replace write segment (synchronous)."""
if self._is_single_key(arg1, arg2, keys, key, dataset, namespace, key_value):
return self._fast_write_segment("replace", arg1) # type: ignore[arg-type]
return self._build_write_segment(
"replace", arg1, arg2, *keys,
key=key, dataset=dataset, namespace=namespace,
set_name=set_name, key_value=key_value,
)
[docs]
def replace_if_exists(
self,
arg1: Optional[Union[Key, List[Key]]] = None,
arg2: Optional[Key] = None,
*keys: Key,
key: Optional[Key] = None,
dataset: Optional[DataSet] = None,
namespace: Optional[str] = None,
set_name: Optional[str] = None,
key_value: Optional[Union[str, int, bytes]] = None,
) -> "SyncWriteSegmentBuilder":
"""Create a replace-if-exists write segment (synchronous)."""
if self._is_single_key(arg1, arg2, keys, key, dataset, namespace, key_value):
return self._fast_write_segment("replace_if_exists", arg1) # type: ignore[arg-type]
return self._build_write_segment(
"replace_if_exists", arg1, arg2, *keys,
key=key, dataset=dataset, namespace=namespace,
set_name=set_name, key_value=key_value,
)
[docs]
def delete(
self,
arg1: Optional[Union[Key, List[Key]]] = None,
arg2: Optional[Key] = None,
*keys: Key,
key: Optional[Key] = None,
dataset: Optional[DataSet] = None,
namespace: Optional[str] = None,
set_name: Optional[str] = None,
key_value: Optional[Union[str, int, bytes]] = None,
) -> "SyncWriteSegmentBuilder":
"""Create a delete write segment (synchronous)."""
if self._is_single_key(arg1, arg2, keys, key, dataset, namespace, key_value):
return self._fast_write_segment("delete", arg1) # type: ignore[arg-type]
return self._build_write_segment(
"delete", arg1, arg2, *keys,
key=key, dataset=dataset, namespace=namespace,
set_name=set_name, key_value=key_value,
)
[docs]
def touch(
self,
arg1: Optional[Union[Key, List[Key]]] = None,
arg2: Optional[Key] = None,
*keys: Key,
key: Optional[Key] = None,
dataset: Optional[DataSet] = None,
namespace: Optional[str] = None,
set_name: Optional[str] = None,
key_value: Optional[Union[str, int, bytes]] = None,
) -> "SyncWriteSegmentBuilder":
"""Create a touch write segment (synchronous)."""
if self._is_single_key(arg1, arg2, keys, key, dataset, namespace, key_value):
return self._fast_write_segment("touch", arg1) # type: ignore[arg-type]
return self._build_write_segment(
"touch", arg1, arg2, *keys,
key=key, dataset=dataset, namespace=namespace,
set_name=set_name, key_value=key_value,
)
[docs]
def exists(
self,
arg1: Optional[Union[Key, List[Key]]] = None,
arg2: Optional[Key] = None,
*keys: Key,
key: Optional[Key] = None,
dataset: Optional[DataSet] = None,
namespace: Optional[str] = None,
set_name: Optional[str] = None,
key_value: Optional[Union[str, int, bytes]] = None,
) -> "SyncWriteSegmentBuilder":
"""Create an exists-check write segment (synchronous)."""
if self._is_single_key(arg1, arg2, keys, key, dataset, namespace, key_value):
return self._fast_write_segment("exists", arg1) # type: ignore[arg-type]
return self._build_write_segment(
"exists", arg1, arg2, *keys,
key=key, dataset=dataset, namespace=namespace,
set_name=set_name, key_value=key_value,
)