# Copyright 2025-2026 Aerospike, Inc.
#
# Portions may be licensed to Aerospike, Inc. under one or more contributor
# license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Synchronous SDK session.
IO methods call PAC's ``_blocking`` entries; builder factories return
synchronous wrappers (:class:`SyncQueryBuilder`,
:class:`SyncBatchOperationBuilder`, etc.).
"""
from __future__ import annotations
import time
import typing
from typing import Any, Dict, List, Optional, TYPE_CHECKING, Union, overload
from aerospike_async import Key, Record, Txn
from aerospike_sdk.dataset import DataSet
from aerospike_sdk.session_shared import NamespaceScStatus
from aerospike_sdk.policy.behavior import Behavior, OpKind, OpShape
from aerospike_sdk.policy.behavior_settings import Mode
from aerospike_sdk.policy.policy_mapper import to_read_policy, to_write_policy
from aerospike_sdk.sync.background import SyncBackgroundTaskSession
from aerospike_sdk.sync.info import SyncInfoCommands
from aerospike_sdk.sync.operations.batch import SyncBatchOperationBuilder
from aerospike_sdk.sync.operations.index import SyncIndexBuilder
from aerospike_sdk.sync.operations.query import (
SyncQueryBuilder, SyncWriteSegmentBuilder,
)
from aerospike_sdk.sync.operations.udf import SyncUdfFunctionBuilder
if TYPE_CHECKING:
from aerospike_sdk.sync.client import SyncClient
from aerospike_sdk.sync.transactional_session import SyncTransactionalSession
[docs]
class SyncSession:
"""Run session-scoped reads and writes without ``async``/``await``.
Construct via :meth:`SyncClient.create_session
<aerospike_sdk.sync.client.SyncClient.create_session>`, not directly.
See Also:
:class:`~aerospike_sdk.aio.session.Session`: Async equivalent.
"""
[docs]
def __init__(
self, client: SyncClient, behavior: Behavior,
) -> None:
"""Attach a client and behavior; prefer :meth:`SyncClient.create_session`."""
self._client = client
self._behavior = behavior
# Pre-compute base policies once per session so the fast-path
# get/put + builder bypasses skip the policy_mapper for the common
# no-override case. Cache both AP and SC variants so bypass paths
# can pick the right policy per resolved namespace mode without
# rebuilding. `_cached_*_policy` stays as the AP alias.
self._cached_read_policy = to_read_policy(
behavior.get_settings(OpKind.READ, OpShape.POINT, Mode.AP))
self._cached_write_policy = to_write_policy(
behavior.get_settings(OpKind.WRITE_NON_RETRYABLE, OpShape.POINT, Mode.AP))
self._cached_read_policy_sc = to_read_policy(
behavior.get_settings(OpKind.READ, OpShape.POINT, Mode.SC))
self._cached_write_policy_sc = to_write_policy(
behavior.get_settings(OpKind.WRITE_NON_RETRYABLE, OpShape.POINT, Mode.SC))
# Cache the PAC client for fast-path methods.
self._pac_client = client.underlying_client
# Non-transactional sessions always return None;
# SyncTransactionalSession overrides this to yield its active Txn.
self._txn: Optional[Txn] = None
# -- State accessors ------------------------------------------------------
@property
def behavior(self) -> Behavior:
"""The behavior configuration for this session."""
return self._behavior
@property
def client(self) -> SyncClient:
"""The owning :class:`SyncClient`."""
return self._client
def _resolve_namespace_mode_blocking(self, namespace: str) -> Mode:
"""Resolve AP vs SC for ``namespace`` synchronously (delegates to client)."""
return self._client._resolve_namespace_mode_blocking(namespace)
def _bind_txn(self, builder):
"""Stamp the session's current txn onto a builder if one is active."""
if self._txn is not None:
builder.with_txn(self._txn)
return builder
[docs]
def get_current_transaction(self) -> Optional[Txn]:
"""Return the active transaction for this session, or ``None``."""
return self._txn
# -- Direct single-key fast paths -----------------------------------------
[docs]
def get(
self, key: Key, *, bins: Optional[List[str]] = None,
) -> Optional[Record]:
"""Direct single-key read — no builder, no stream — synchronous.
Passes the AP + SC cached policies; PAC picks the right one based
on the key's namespace mode (from the in-memory partition map).
"""
if self._txn is None:
return self._pac_client.get_blocking(
key, bins,
policy=self._cached_read_policy,
policy_sc=self._cached_read_policy_sc,
)
# Under MRT the cached policies are skipped (txn not stamped);
# rebuild a per-call policy from behavior.
policy = to_read_policy(
self._behavior.get_settings(OpKind.READ, OpShape.POINT))
policy.txn = self._txn
return self._pac_client.get_blocking(key, bins, policy=policy)
[docs]
def put(self, key: Key, bins: Dict[str, Any]) -> None:
"""Direct single-key upsert — no builder, no stream — synchronous.
Passes the AP + SC cached policies; PAC picks the right one based
on the key's namespace mode.
"""
if self._txn is None:
self._pac_client.put_blocking(
key, bins,
policy=self._cached_write_policy,
policy_sc=self._cached_write_policy_sc,
)
return
policy = to_write_policy(
self._behavior.get_settings(OpKind.WRITE_NON_RETRYABLE, OpShape.POINT))
policy.txn = self._txn
self._pac_client.put_blocking(key, bins, policy=policy)
[docs]
def truncate(self, dataset: DataSet, before_nanos: Optional[int] = None) -> None:
"""Truncate a set, synchronously (PAC ``truncate_blocking``)."""
self._pac_client.truncate_blocking(
dataset.namespace, dataset.set_name, before_nanos,
)
# -- Info / namespace SC --------------------------------------------------
[docs]
def namespace_sc_status(self, namespace: str) -> NamespaceScStatus:
"""Describe whether a namespace is SC; includes a reason when it is not."""
from aerospike_sdk.aio.session import _parse_namespace_info_body
try:
result = self._pac_client.info_blocking(f"namespace/{namespace}")
except Exception as e:
raise ValueError(f"Failed to check namespace '{namespace}': {e}") from e
missing = False
sc_val: Optional[bool] = None
for node_result in result.values():
if not node_result:
continue
exists, sc_opt = _parse_namespace_info_body(node_result)
if not exists:
missing = True
break
if sc_opt is not None:
sc_val = sc_opt
if missing:
return NamespaceScStatus(
False,
f"Namespace {namespace!r} is not defined on this cluster "
"(info reports type=unknown). Create it or set "
"AEROSPIKE_SC_NAMESPACE to an existing SC namespace.",
)
if sc_val is True:
return NamespaceScStatus(True, "")
if sc_val is False:
return NamespaceScStatus(
False,
f"Namespace {namespace!r} exists but strong-consistency is false "
"(AP mode). Point AEROSPIKE_SC_NAMESPACE at a namespace with "
"strong-consistency enabled.",
)
return NamespaceScStatus(
False,
f"Namespace {namespace!r} info did not report strong-consistency; "
"treating as non-SC.",
)
[docs]
def is_namespace_sc(self, namespace: str) -> bool:
"""``True`` if ``namespace`` is in strong-consistency mode."""
return self.namespace_sc_status(namespace).is_sc
@overload
def info(self) -> SyncInfoCommands: ...
@overload
def info(self, command: str) -> Dict[str, str]: ...
[docs]
def info(
self, command: Optional[str] = None,
) -> Union[SyncInfoCommands, Dict[str, str]]:
"""Sync info: return :class:`SyncInfoCommands` or raw blocking result."""
if command is not None:
return self._pac_client.info_blocking(command)
return SyncInfoCommands(self._pac_client)
# -- Builder factories ----------------------------------------------------
[docs]
def query(
self,
arg1: Optional[Union[DataSet, Key, List[Key], str]] = None,
arg2: Optional[Union[str, Key]] = None,
*keys: Key,
namespace: Optional[str] = None,
set_name: Optional[str] = None,
dataset: Optional[DataSet] = None,
key: Optional[Key] = None,
keys_list: Optional[List[Key]] = None,
behavior: Optional[Behavior] = None,
) -> SyncQueryBuilder:
"""Start a synchronous read or secondary-index query.
Same shapes as :meth:`Session.query
<aerospike_sdk.aio.session.Session.query>`. Always returns
:class:`SyncQueryBuilder` whose ``execute()`` runs synchronously.
"""
b = self._behavior if behavior is None else behavior
# Normalize positional/kw args.
if arg1 is not None:
if isinstance(arg1, DataSet):
dataset = arg1
elif isinstance(arg1, Key):
all_keys = [arg1]
if isinstance(arg2, Key):
all_keys.append(arg2)
all_keys.extend(keys)
elif keys:
all_keys.extend(keys)
if len(all_keys) == 1:
key = arg1
else:
keys_list = all_keys
elif isinstance(arg1, list):
if not arg1:
raise ValueError("keys list cannot be empty")
if not isinstance(arg1[0], Key):
raise TypeError(
f"Expected List[Key], got first element {type(arg1[0])}",
)
keys_list = arg1
elif isinstance(arg1, str) and arg2 is not None and isinstance(arg2, str):
namespace = arg1
set_name = arg2
else:
raise TypeError(f"Unsupported arg1 type: {type(arg1)}")
sync_builder = self._build_sync_query_builder(
dataset=dataset, key=key, keys=keys_list,
namespace=namespace, set_name=set_name, behavior=b,
)
self._bind_txn(sync_builder)
return sync_builder
def _build_sync_query_builder(
self,
*,
dataset: Optional[DataSet],
key: Optional[Key],
keys: Optional[List[Key]],
namespace: Optional[str],
set_name: Optional[str],
behavior: Behavior,
) -> SyncQueryBuilder:
"""Construct a :class:`SyncQueryBuilder` with full session context.
Returns the builder pre-populated with behavior, indexes monitor,
cached policies, txn, and namespace-mode resolver.
"""
if key is not None:
builder = SyncQueryBuilder(
client=self._pac_client,
namespace=key.namespace,
set_name=key.set_name,
behavior=behavior,
indexes_monitor=self._client._indexes_monitor,
cached_read_policy=self._cached_read_policy,
cached_write_policy=self._cached_write_policy,
cached_read_policy_sc=self._cached_read_policy_sc,
cached_write_policy_sc=self._cached_write_policy_sc,
txn=self._txn,
namespace_mode_resolver=None,
namespace_mode_resolver_blocking=self._resolve_namespace_mode_blocking,
)
builder._single_key = key
return builder
if keys is not None:
ns = keys[0].namespace
sn = keys[0].set_name
builder = SyncQueryBuilder(
client=self._pac_client,
namespace=ns,
set_name=sn,
behavior=behavior,
indexes_monitor=self._client._indexes_monitor,
cached_read_policy=self._cached_read_policy,
cached_write_policy=self._cached_write_policy,
cached_read_policy_sc=self._cached_read_policy_sc,
cached_write_policy_sc=self._cached_write_policy_sc,
txn=self._txn,
namespace_mode_resolver=None,
namespace_mode_resolver_blocking=self._resolve_namespace_mode_blocking,
)
builder._keys = keys
return builder
if dataset is not None:
namespace = dataset.namespace
set_name = dataset.set_name
if namespace is None or set_name is None:
raise ValueError(
"Invalid arguments. Use one of: query(dataset=...), query(key=...), "
"query(keys=[...]), or query(namespace=..., set_name=...).",
)
return SyncQueryBuilder(
client=self._pac_client,
namespace=namespace,
set_name=set_name,
behavior=behavior,
indexes_monitor=self._client._indexes_monitor,
cached_read_policy=self._cached_read_policy,
cached_write_policy=self._cached_write_policy,
cached_read_policy_sc=self._cached_read_policy_sc,
cached_write_policy_sc=self._cached_write_policy_sc,
txn=self._txn,
namespace_mode_resolver=None,
namespace_mode_resolver_blocking=self._resolve_namespace_mode_blocking,
)
[docs]
def batch(self) -> SyncBatchOperationBuilder:
"""Start a multi-key batch of mixed write operations (synchronous)."""
from aerospike_sdk.aio.operations.batch import BatchOperationBuilder as _Batch
inner = _Batch(
client=self._pac_client,
behavior=self._behavior,
txn=self._txn,
namespace_mode_resolver_blocking=self._resolve_namespace_mode_blocking,
)
return SyncBatchOperationBuilder(inner)
[docs]
def background_task(self) -> SyncBackgroundTaskSession:
"""Start a background dataset task chain (synchronous)."""
from aerospike_sdk.aio.background import BackgroundTaskSession as _BTS
# BackgroundTaskSession needs a session-like parent for behavior etc.
# The aio variant accepts our sync session via duck typing; if not,
# we'd need a thin proxy. The aio constructor only reads state, no IO.
inner = _BTS(self) # type: ignore[arg-type]
return SyncBackgroundTaskSession(inner)
[docs]
def execute_udf(self, *keys: Key) -> SyncUdfFunctionBuilder:
"""Begin a foreground UDF invocation (synchronous)."""
from aerospike_sdk.aio.operations.udf import UdfFunctionBuilder as _UFB
if not keys:
raise ValueError("execute_udf requires at least one key")
builder = self._build_sync_query_builder(
dataset=None, key=keys[0] if len(keys) == 1 else None,
keys=list(keys) if len(keys) > 1 else None,
namespace=None, set_name=None, behavior=self._behavior,
)
self._bind_txn(builder)
builder._op_type = "execute_udf"
inner = _UFB(builder)
return SyncUdfFunctionBuilder(inner, self._client)
[docs]
def index(
self,
namespace: Optional[str] = None,
set_name: Optional[str] = None,
*,
dataset: Optional[DataSet] = None,
behavior: Optional[Behavior] = None,
) -> SyncIndexBuilder:
"""Synchronous secondary-index builder."""
_ = behavior
if dataset is not None:
namespace = dataset.namespace
set_name = dataset.set_name
if not namespace or not set_name:
raise ValueError("namespace and set_name are required (or provide dataset)")
return SyncIndexBuilder(
async_client=self._client,
namespace=namespace,
set_name=set_name,
)
[docs]
def transaction_session(self) -> SyncTransactionalSession:
"""Alias for :meth:`begin_transaction`."""
return self.begin_transaction()
[docs]
def begin_transaction(self) -> SyncTransactionalSession:
"""Start a multi-record transaction (synchronous)."""
from aerospike_sdk.sync.transactional_session import SyncTransactionalSession
return SyncTransactionalSession(client=self._client, behavior=self._behavior)
[docs]
def do_in_transaction(
self,
operation: "typing.Callable[[SyncTransactionalSession], typing.Any]",
*,
max_attempts: int = 5,
sleep_between_retries: float = 0.0,
) -> Any:
"""Run a callable inside a retrying multi-record transaction (synchronous)."""
if max_attempts < 1:
raise ValueError("max_attempts must be >= 1")
from aerospike_async import ResultCode
from aerospike_sdk.exceptions import AerospikeError
retryable_codes = {
ResultCode.MRT_BLOCKED,
ResultCode.MRT_VERSION_MISMATCH,
}
txn_failed = getattr(ResultCode, "TXN_FAILED", None)
if txn_failed is not None:
retryable_codes.add(txn_failed)
last_exc: Optional[BaseException] = None
for attempt in range(max_attempts):
try:
with self.begin_transaction() as tx_session:
return operation(tx_session)
except AerospikeError as exc:
last_exc = exc
if exc.result_code not in retryable_codes:
raise
if attempt + 1 >= max_attempts:
raise
if sleep_between_retries > 0:
time.sleep(sleep_between_retries)
assert last_exc is not None
raise last_exc
# -- Write-verb factories -------------------------------------------------
def _is_single_key(
self, arg1, arg2, keys, key, dataset, namespace, key_value,
) -> bool:
return (
isinstance(arg1, Key) and arg2 is None and not keys
and key is None and dataset is None
and namespace is None and key_value is None
)
def _fast_write_segment(self, op_type: str, key: Key) -> SyncWriteSegmentBuilder:
"""Single-key fast-path write segment (sync)."""
from aerospike_sdk.sync.operations.query import SyncSingleKeyWriteSegment
return SyncSingleKeyWriteSegment(
client=self._pac_client,
key=key,
op_type=op_type,
behavior=self._behavior,
write_policy=self._cached_write_policy,
read_policy=self._cached_read_policy,
write_policy_sc=self._cached_write_policy_sc,
read_policy_sc=self._cached_read_policy_sc,
txn=self._txn,
namespace_mode_resolver=None,
namespace_mode_resolver_blocking=self._resolve_namespace_mode_blocking,
)
def _build_write_segment(
self,
op_type: str,
arg1: Optional[Union[Key, List[Key]]] = None,
arg2: Optional[Key] = None,
*more_keys: Key,
key: Optional[Key] = None,
dataset: Optional[DataSet] = None,
namespace: Optional[str] = None,
set_name: Optional[str] = None,
key_value: Optional[Union[str, int, bytes]] = None,
) -> SyncWriteSegmentBuilder:
"""Build a multi-key / dataset write segment via aio QueryBuilder."""
# Reduce overload args to either a single key, a list of keys, or a dataset.
single_key: Optional[Key] = None
many_keys: Optional[List[Key]] = None
if key is not None:
single_key = key
elif isinstance(arg1, Key) and not more_keys and arg2 is None:
single_key = arg1
elif isinstance(arg1, list):
many_keys = list(arg1)
elif isinstance(arg1, Key):
many_keys = [arg1]
if isinstance(arg2, Key):
many_keys.append(arg2)
many_keys.extend(more_keys)
elif dataset is not None:
pass
elif namespace is not None and set_name is not None:
if key_value is not None:
ds = DataSet.of(namespace, set_name)
single_key = ds.id(key_value)
# else: keyless dataset op (rare for write segments)
elif key_value is not None and dataset is None:
raise ValueError("key_value requires dataset or namespace+set_name")
qb = self._build_sync_query_builder(
dataset=dataset, key=single_key, keys=many_keys,
namespace=namespace, set_name=set_name,
behavior=self._behavior,
)
self._bind_txn(qb)
qb._op_type = op_type
return SyncWriteSegmentBuilder(qb)
[docs]
def upsert(
self,
arg1: Optional[Union[Key, List[Key]]] = None,
arg2: Optional[Key] = None,
*keys: Key,
key: Optional[Key] = None,
dataset: Optional[DataSet] = None,
namespace: Optional[str] = None,
set_name: Optional[str] = None,
key_value: Optional[Union[str, int, bytes]] = None,
) -> SyncWriteSegmentBuilder:
"""Create an upsert write segment (synchronous)."""
if self._is_single_key(arg1, arg2, keys, key, dataset, namespace, key_value):
return self._fast_write_segment("upsert", arg1) # type: ignore[arg-type]
return self._build_write_segment(
"upsert", arg1, arg2, *keys,
key=key, dataset=dataset, namespace=namespace,
set_name=set_name, key_value=key_value,
)
[docs]
def insert(
self,
arg1: Optional[Union[Key, List[Key]]] = None,
arg2: Optional[Key] = None,
*keys: Key,
key: Optional[Key] = None,
dataset: Optional[DataSet] = None,
namespace: Optional[str] = None,
set_name: Optional[str] = None,
key_value: Optional[Union[str, int, bytes]] = None,
) -> SyncWriteSegmentBuilder:
"""Create an insert write segment (synchronous)."""
if self._is_single_key(arg1, arg2, keys, key, dataset, namespace, key_value):
return self._fast_write_segment("insert", arg1) # type: ignore[arg-type]
return self._build_write_segment(
"insert", arg1, arg2, *keys,
key=key, dataset=dataset, namespace=namespace,
set_name=set_name, key_value=key_value,
)
[docs]
def update(
self,
arg1: Optional[Union[Key, List[Key]]] = None,
arg2: Optional[Key] = None,
*keys: Key,
key: Optional[Key] = None,
dataset: Optional[DataSet] = None,
namespace: Optional[str] = None,
set_name: Optional[str] = None,
key_value: Optional[Union[str, int, bytes]] = None,
) -> SyncWriteSegmentBuilder:
"""Create an update write segment (synchronous)."""
if self._is_single_key(arg1, arg2, keys, key, dataset, namespace, key_value):
return self._fast_write_segment("update", arg1) # type: ignore[arg-type]
return self._build_write_segment(
"update", arg1, arg2, *keys,
key=key, dataset=dataset, namespace=namespace,
set_name=set_name, key_value=key_value,
)
[docs]
def replace(
self,
arg1: Optional[Union[Key, List[Key]]] = None,
arg2: Optional[Key] = None,
*keys: Key,
key: Optional[Key] = None,
dataset: Optional[DataSet] = None,
namespace: Optional[str] = None,
set_name: Optional[str] = None,
key_value: Optional[Union[str, int, bytes]] = None,
) -> SyncWriteSegmentBuilder:
"""Create a replace write segment (synchronous)."""
if self._is_single_key(arg1, arg2, keys, key, dataset, namespace, key_value):
return self._fast_write_segment("replace", arg1) # type: ignore[arg-type]
return self._build_write_segment(
"replace", arg1, arg2, *keys,
key=key, dataset=dataset, namespace=namespace,
set_name=set_name, key_value=key_value,
)
[docs]
def replace_if_exists(
self,
arg1: Optional[Union[Key, List[Key]]] = None,
arg2: Optional[Key] = None,
*keys: Key,
key: Optional[Key] = None,
dataset: Optional[DataSet] = None,
namespace: Optional[str] = None,
set_name: Optional[str] = None,
key_value: Optional[Union[str, int, bytes]] = None,
) -> SyncWriteSegmentBuilder:
"""Create a replace-if-exists write segment (synchronous)."""
if self._is_single_key(arg1, arg2, keys, key, dataset, namespace, key_value):
return self._fast_write_segment("replace_if_exists", arg1) # type: ignore[arg-type]
return self._build_write_segment(
"replace_if_exists", arg1, arg2, *keys,
key=key, dataset=dataset, namespace=namespace,
set_name=set_name, key_value=key_value,
)
[docs]
def delete(
self,
arg1: Optional[Union[Key, List[Key]]] = None,
arg2: Optional[Key] = None,
*keys: Key,
key: Optional[Key] = None,
dataset: Optional[DataSet] = None,
namespace: Optional[str] = None,
set_name: Optional[str] = None,
key_value: Optional[Union[str, int, bytes]] = None,
) -> SyncWriteSegmentBuilder:
"""Create a delete write segment (synchronous)."""
if self._is_single_key(arg1, arg2, keys, key, dataset, namespace, key_value):
return self._fast_write_segment("delete", arg1) # type: ignore[arg-type]
return self._build_write_segment(
"delete", arg1, arg2, *keys,
key=key, dataset=dataset, namespace=namespace,
set_name=set_name, key_value=key_value,
)
[docs]
def touch(
self,
arg1: Optional[Union[Key, List[Key]]] = None,
arg2: Optional[Key] = None,
*keys: Key,
key: Optional[Key] = None,
dataset: Optional[DataSet] = None,
namespace: Optional[str] = None,
set_name: Optional[str] = None,
key_value: Optional[Union[str, int, bytes]] = None,
) -> SyncWriteSegmentBuilder:
"""Create a touch write segment (synchronous)."""
if self._is_single_key(arg1, arg2, keys, key, dataset, namespace, key_value):
return self._fast_write_segment("touch", arg1) # type: ignore[arg-type]
return self._build_write_segment(
"touch", arg1, arg2, *keys,
key=key, dataset=dataset, namespace=namespace,
set_name=set_name, key_value=key_value,
)
[docs]
def exists(
self,
arg1: Optional[Union[Key, List[Key]]] = None,
arg2: Optional[Key] = None,
*keys: Key,
key: Optional[Key] = None,
dataset: Optional[DataSet] = None,
namespace: Optional[str] = None,
set_name: Optional[str] = None,
key_value: Optional[Union[str, int, bytes]] = None,
) -> SyncWriteSegmentBuilder:
"""Create an exists-check write segment (synchronous)."""
if self._is_single_key(arg1, arg2, keys, key, dataset, namespace, key_value):
return self._fast_write_segment("exists", arg1) # type: ignore[arg-type]
return self._build_write_segment(
"exists", arg1, arg2, *keys,
key=key, dataset=dataset, namespace=namespace,
set_name=set_name, key_value=key_value,
)