Source code for aerospike_sdk.sync.client

# Copyright 2025-2026 Aerospike, Inc.
#
# Portions may be licensed to Aerospike, Inc. under one or more contributor
# license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

"""Synchronous SDK client.

Owns a PAC ``aerospike_async.Client`` and a daemon-thread
:class:`~aerospike_sdk.index_monitor.IndexesMonitor`. Every lifecycle and
IO entry calls PAC's ``_blocking`` methods; no asyncio event loop is
constructed. Builder and session factories return synchronous wrappers
(:class:`~aerospike_sdk.sync.operations.query.SyncQueryBuilder`,
:class:`~aerospike_sdk.sync.session.SyncSession`).
"""

from __future__ import annotations

import logging
import types
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union, overload

from aerospike_async import (
    AdminPolicy,
    Client as AsyncClient,
    ClientPolicy,
    Key,
    RegisterTask,
    UDFLang,
    UdfRemoveTask,
    new_client_blocking,
)

from aerospike_sdk.dataset import DataSet
from aerospike_sdk.index_monitor import IndexesMonitor
from aerospike_sdk.policy.behavior import Behavior
from aerospike_sdk.policy.behavior_settings import Mode

if TYPE_CHECKING:  # avoid circular imports — type-only annotations
    from aerospike_sdk.aio.operations.query import QueryBuilder
    from aerospike_sdk.sync.operations.index import SyncIndexBuilder
    from aerospike_sdk.sync.operations.query import SyncQueryBuilder
    from aerospike_sdk.sync.session import SyncSession
    from aerospike_sdk.sync.transactional_session import SyncTransactionalSession

# Module-level logger, named after this module so applications can tune
# SDK sync-client logging independently of the rest of the package.
log = logging.getLogger(__name__)


class SyncClient:
    """Connect to Aerospike and run the SDK API without ``async``/``await``.

    Example::

        with SyncClient("localhost:3000") as client:
            for row in client.query(
                namespace="test", set_name="users"
            ).execute():
                if row.record:
                    print(row.record.bins)

    See Also:
        :class:`~aerospike_sdk.aio.client.Client`: Async equivalent.
        :meth:`create_session`: Session-scoped
            :class:`~aerospike_sdk.policy.behavior.Behavior`.
    """

    def __init__(
        self,
        seeds: str,
        policy: Optional[ClientPolicy] = None,
        index_refresh_interval: float = 5.0,
        *,
        max_error_rate: Optional[int] = None,
        error_rate_window: Optional[int] = None,
        indexes_monitor: Optional[IndexesMonitor] = None,
        current_thread_runtime: bool = False,
    ) -> None:
        """Initialize a SyncClient (no IO).

        Args:
            seeds: Aerospike cluster seed addresses (e.g., "localhost:3000").
            policy: Optional client policy. When not supplied, a fresh
                ``ClientPolicy`` is created with ``conn_pools_per_node = 8``
                (or ``1`` when ``current_thread_runtime`` is ``True``). PAC's
                default of 4 is tuned for async; sync workloads driven from
                many caller threads see real connection-pool mutex contention
                at 4. Pass an explicit ``ClientPolicy`` to override either
                this default or any other client-level setting. Note that a
                supplied policy object is modified in place when
                ``max_error_rate`` or ``error_rate_window`` is given.
            index_refresh_interval: Seconds between secondary index cache
                refreshes (default 5.0). Only used when ``indexes_monitor``
                is not supplied. The monitor is a daemon thread that starts
                lazily on the first AEL ``where()`` query — clients that
                never use AEL filters never spin up the thread.
            max_error_rate: Per-node circuit-breaker threshold (see
                :class:`aerospike_sdk.aio.client.Client`).
            error_rate_window: Tend iterations until each node's error
                counter resets.
            indexes_monitor: Optional pre-constructed :class:`IndexesMonitor`
                to share across clients (for example an :class:`AsyncPool`).
                When supplied, this client uses it but does not own its
                lifecycle.
            current_thread_runtime: **Experimental — opt-in, subject to
                removal.** When ``True``, each calling OS thread gets its own
                PAC ``_LocalClient`` (sync-only, backed by a per-thread
                ``current_thread`` Tokio runtime). Eliminates the
                cross-thread worker hop on every op for ~+30-40% TPS lift on
                32-thread sync workloads. Open caveat before production use:
                per-thread cluster tend multiplies info load on the cluster
                at high thread counts. Recommended pairing when opted in:
                set ``policy.conn_pools_per_node = 1`` so total connections
                per node stay modest (N threads × 1 ≈ shared client's 8).
        """
        self._seeds = seeds

        if policy is None:
            policy = ClientPolicy()
            if current_thread_runtime:
                # Per-thread Client = per-thread pool; one connection per
                # thread is enough for the per-thread blocking pattern.
                policy.conn_pools_per_node = 1
            else:
                # SyncClient drives PAC from many caller threads, so the
                # per-node connection-pool mutex sees real contention that
                # async (single- or per-Client-loop) workloads do not.
                # py-spy traces at conn_pools_per_node=4 showed ~65% of
                # lock-contended samples in put_back/get on a 32-thread
                # builder cell; 8 cuts the p99 tail roughly in half with no
                # TPS cost. User-supplied policies are respected as-is.
                policy.conn_pools_per_node = 8

        # Explicit keyword overrides win over whatever the policy carries.
        if max_error_rate is not None:
            policy.max_error_rate = max_error_rate
        if error_rate_window is not None:
            policy.error_rate_window = error_rate_window

        self._policy = policy
        self._current_thread_runtime = current_thread_runtime
        self._client: Optional[AsyncClient] = None
        self._connected = False

        if indexes_monitor is not None:
            # Shared monitor: the owner (not this client) stops it.
            self._indexes_monitor = indexes_monitor
            self._owns_monitor = False
        else:
            self._indexes_monitor = IndexesMonitor(refresh_interval=index_refresh_interval)
            self._owns_monitor = True

        # Shared by all sessions from this client; avoids repeated
        # namespace/<ns> info probes when callers use multiple sessions.
        self._namespace_mode_cache: Dict[str, Mode] = {}

    # -- Lifecycle ------------------------------------------------------------

    def connect(self) -> None:
        """Open a connection to the cluster synchronously.

        Calls :func:`aerospike_async.new_client_blocking` directly — no
        asyncio loop is constructed. The :class:`IndexesMonitor` daemon
        thread is not started here; it lazy-starts on the first AEL
        ``where()`` query.

        When ``current_thread_runtime=True``, no PAC Client is constructed
        here. Instead a thread-local proxy is installed; each calling OS
        thread lazy-constructs its own :class:`aerospike_async.LocalClient`
        on first op.

        Idempotent: returns early if already connected.
        """
        if self._connected and self._client is not None:
            return

        if log.isEnabledFor(logging.DEBUG):
            log.debug("Connecting (blocking) to cluster seeds=%r", self._seeds)

        if self._current_thread_runtime:
            # Imported lazily so the experimental path costs nothing unless
            # it is actually opted into.
            from aerospike_sdk.sync._threadlocal_client import _ThreadLocalLocalClient

            # type: ignore[assignment] — proxy duck-types as PAC Client.
            self._client = _ThreadLocalLocalClient(self._policy, self._seeds)  # type: ignore[assignment]
        else:
            self._client = new_client_blocking(self._policy, self._seeds)

        # Only flagged as connected once client construction has succeeded.
        self._connected = True

    def close(self) -> None:
        """Close the connection synchronously.

        Stops the :class:`IndexesMonitor` daemon thread (if owned) and calls
        PAC's ``close_blocking``. Safe to call when already closed.

        The client's own state (``_client``, ``_connected`` and the
        namespace-mode cache) is always reset, even if stopping the monitor
        or closing the PAC client raises; the exception still propagates to
        the caller.
        """
        try:
            if self._owns_monitor:
                self._indexes_monitor.stop()
        finally:
            # Detach the PAC client and reset state *before* closing it so a
            # failing close_blocking() cannot leave this object claiming to
            # be connected to a half-closed client.
            client, self._client = self._client, None
            self._connected = False
            self._namespace_mode_cache.clear()
            if client is not None:
                client.close_blocking()

    def __enter__(self) -> SyncClient:
        """Connect and return ``self`` for use in a ``with`` block."""
        self.connect()
        return self

    def __exit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[types.TracebackType],
    ) -> None:
        """Close the client on leaving the ``with`` block; never suppresses exceptions."""
        self.close()

    # -- State accessors ------------------------------------------------------

    @property
    def is_connected(self) -> bool:
        """``True`` once :meth:`connect` has succeeded and :meth:`close` hasn't run."""
        return self._connected

    @property
    def underlying_client(self) -> AsyncClient:
        """The underlying PAC ``aerospike_async.Client``.

        Use for PAC calls the SDK doesn't wrap (info, nodes, etc.).

        Raises:
            RuntimeError: If the client is not connected.
        """
        if not self._connected or self._client is None:
            raise RuntimeError("SyncClient is not connected. Call connect() first or use `with`.")
        return self._client

    @property
    def _async_client(self) -> AsyncClient:
        """Alias of :attr:`underlying_client` for parity with :class:`~aerospike_sdk.aio.client.Client`."""
        return self.underlying_client

    def _ensure_connected(self) -> SyncClient:
        """Connect if not already connected; return ``self`` for chaining."""
        if not self._connected:
            self.connect()
        return self

    def _pac_client(self) -> AsyncClient:
        """Return the underlying PAC ``aerospike_async.Client`` (post-connect)."""
        self._ensure_connected()
        return self.underlying_client

    def _resolve_namespace_mode_blocking(self, namespace: str) -> Mode:
        """Resolve AP vs SC for ``namespace`` synchronously; caches per-client.

        Issues a ``namespace/<ns>`` info request through the PAC client and
        reports :attr:`Mode.SC` if any node's non-empty response parses as an
        existing namespace with strong consistency enabled; otherwise
        :attr:`Mode.AP`.

        Best-effort: if the probe fails for any reason the namespace is
        treated (and cached) as AP, and the failure is logged rather than
        raised.

        Args:
            namespace: Namespace name to probe.

        Returns:
            The resolved (and now cached) mode for ``namespace``.
        """
        cached = self._namespace_mode_cache.get(namespace)
        if cached is not None:
            return cached

        try:
            from aerospike_sdk.aio.session import _parse_namespace_info_body

            result = self.underlying_client.info_blocking(f"namespace/{namespace}")
        except Exception:
            # Deliberate best-effort fallback, but leave a trace: silently
            # treating an SC namespace as AP is otherwise very hard to debug.
            log.warning(
                "Could not resolve consistency mode for namespace %r; assuming AP",
                namespace,
                exc_info=True,
            )
            mode = Mode.AP
            self._namespace_mode_cache[namespace] = mode
            return mode

        is_sc = False
        for node_result in result.values():
            if not node_result:
                # Node returned nothing for this namespace; ignore it.
                continue
            exists, sc_opt = _parse_namespace_info_body(node_result)
            if exists and sc_opt is True:
                is_sc = True
                break

        mode = Mode.SC if is_sc else Mode.AP
        self._namespace_mode_cache[namespace] = mode
        return mode

    # -- Factories: query / index / session ------------------------------------

    @overload
    def query(self, arg1: DataSet, *, behavior: Optional[Behavior] = None) -> SyncQueryBuilder: ...

    @overload
    def query(self, arg1: Key, *, behavior: Optional[Behavior] = None) -> SyncQueryBuilder: ...

    @overload
    def query(self, arg1: List[Key], *, behavior: Optional[Behavior] = None) -> SyncQueryBuilder: ...

    @overload
    def query(
        self,
        arg1: str,
        set_name: str,
        *,
        behavior: Optional[Behavior] = None,
    ) -> SyncQueryBuilder: ...

    def query(
        self,
        arg1: Optional[Union[DataSet, Key, List[Key], str]] = None,
        set_name: Optional[str] = None,
        namespace: Optional[str] = None,
        *,
        dataset: Optional[DataSet] = None,
        key: Optional[Key] = None,
        keys: Optional[List[Key]] = None,
        behavior: Optional[Behavior] = None,
        namespace_mode_resolver: Optional[Any] = None,
        namespace_mode_resolver_blocking: Optional[Any] = None,
    ) -> SyncQueryBuilder:
        """Create a synchronous query builder.

        Same calling shapes as :meth:`Client.query
        <aerospike_sdk.aio.client.Client.query>`. Returns
        :class:`SyncQueryBuilder` whose ``.execute()`` runs synchronously.
        Connects first if the client is not yet connected.

        Args:
            arg1: Positional target: a :class:`DataSet`, a single ``Key``, a
                non-empty list of ``Key``, or a namespace string (paired with
                ``set_name``). When given, it takes precedence over the
                matching keyword argument.
            set_name: Set name, used with a namespace string.
            namespace: Namespace, keyword form.
            dataset: Keyword form of the dataset target.
            key: Keyword form of the single-key target.
            keys: Keyword form of the multi-key target.
            behavior: Optional behavior forwarded to the builder.
            namespace_mode_resolver: Forwarded unchanged to the builder.
            namespace_mode_resolver_blocking: Blocking resolver forwarded to
                the builder; defaults to this client's cached resolver.

        Raises:
            ValueError: If a key list is empty, or no valid target is given.
            TypeError: If ``arg1`` has an unsupported type, or a list whose
                first element is not a ``Key``.
        """
        self._ensure_connected()

        # Normalize args: extract the right (namespace, set_name, key, keys, dataset).
        if arg1 is not None:
            if isinstance(arg1, DataSet):
                dataset = arg1
            elif isinstance(arg1, Key):
                key = arg1
            elif isinstance(arg1, list):
                if not arg1:
                    raise ValueError("keys list cannot be empty")
                # Only the first element is inspected, to keep validation O(1).
                if not isinstance(arg1[0], Key):
                    raise TypeError(
                        f"Expected List[Key], got first element {type(arg1[0])}",
                    )
                keys = arg1
            elif isinstance(arg1, str):
                namespace = arg1
                # set_name is the positional second arg in this calling style
            else:
                raise TypeError(f"Expected DataSet, Key, List[Key], or str; got {type(arg1)}")

        return self._build_sync_query_builder(
            namespace=namespace,
            set_name=set_name,
            dataset=dataset,
            key=key,
            keys=keys,
            behavior=behavior,
            namespace_mode_resolver=namespace_mode_resolver,
            namespace_mode_resolver_blocking=namespace_mode_resolver_blocking,
        )

    def _build_sync_query_builder(
        self,
        *,
        namespace: Optional[str],
        set_name: Optional[str],
        dataset: Optional[DataSet],
        key: Optional[Key],
        keys: Optional[List[Key]],
        behavior: Optional[Behavior],
        namespace_mode_resolver: Optional[Any] = None,
        namespace_mode_resolver_blocking: Optional[Any] = None,
    ) -> SyncQueryBuilder:
        """Construct a :class:`SyncQueryBuilder` with full context.

        Target precedence is ``key``, then ``keys``, then ``dataset``, then
        the explicit ``namespace`` / ``set_name`` pair. For ``keys`` the
        namespace and set are taken from the first key.

        Raises:
            ValueError: If ``keys`` is an empty list, or if no target was
                supplied (neither key(s), dataset, nor both namespace and
                set_name).
            RuntimeError: If the client is not connected.
        """
        from aerospike_sdk.sync.operations.query import SyncQueryBuilder as _SQB

        nmrb = namespace_mode_resolver_blocking or self._resolve_namespace_mode_blocking

        # Work out which namespace/set the builder is scoped to.
        if key is not None:
            namespace, set_name = key.namespace, key.set_name
        elif keys is not None:
            if not keys:
                # Same error as the positional query([]) form, instead of an
                # IndexError from keys[0].
                raise ValueError("keys list cannot be empty")
            namespace, set_name = keys[0].namespace, keys[0].set_name
        elif dataset is not None:
            namespace, set_name = dataset.namespace, dataset.set_name
        elif namespace is None or set_name is None:
            raise ValueError(
                "Invalid arguments. Use one of: query(dataset=...), query(key=...), "
                "query(keys=[...]), or query(namespace=..., set_name=...).",
            )

        builder = _SQB(
            client=self.underlying_client,
            namespace=namespace,
            set_name=set_name,
            behavior=behavior,
            indexes_monitor=self._indexes_monitor,
            namespace_mode_resolver=namespace_mode_resolver,
            namespace_mode_resolver_blocking=nmrb,
        )

        # Key-targeted queries carry their key(s) on the builder itself.
        if key is not None:
            builder._single_key = key
        elif keys is not None:
            builder._keys = keys
        return builder

    @overload
    def index(
        self,
        *,
        dataset: DataSet,
        behavior: Optional[Behavior] = None,
    ) -> SyncIndexBuilder: ...

    @overload
    def index(
        self,
        namespace: str,
        set_name: str,
        *,
        behavior: Optional[Behavior] = None,
    ) -> SyncIndexBuilder: ...

    def index(
        self,
        namespace: Optional[str] = None,
        set_name: Optional[str] = None,
        *,
        dataset: Optional[DataSet] = None,
        behavior: Optional[Behavior] = None,
    ) -> SyncIndexBuilder:
        """Create a secondary-index builder (synchronous).

        Connects first if the client is not yet connected.

        Args:
            namespace: Namespace of the set to index.
            set_name: Name of the set to index.
            dataset: Alternative to ``namespace`` / ``set_name``; when given
                it overrides both.
            behavior: Accepted for signature parity; NOTE(review): it is not
                currently forwarded to :class:`SyncIndexBuilder` — confirm
                whether the builder should receive it.

        Raises:
            ValueError: If namespace or set name is missing or empty.
        """
        from aerospike_sdk.sync.operations.index import SyncIndexBuilder

        self._ensure_connected()

        if dataset is not None:
            namespace = dataset.namespace
            set_name = dataset.set_name
        if not namespace or not set_name:
            raise ValueError("namespace and set_name are required (or provide dataset)")

        # The builder is handed this SyncClient (not the raw PAC client)
        # under the ``async_client`` keyword.
        return SyncIndexBuilder(
            async_client=self,
            namespace=namespace,
            set_name=set_name,
        )

    def truncate(
        self,
        dataset: DataSet,
        before_nanos: Optional[int] = None,
    ) -> None:
        """Truncate a set, synchronously (PAC ``truncate_blocking``).

        Does not auto-connect.

        Args:
            dataset: Supplies the namespace and set name to truncate.
            before_nanos: Forwarded unchanged to PAC ``truncate_blocking``.

        Raises:
            RuntimeError: If the client is not connected.
        """
        self.underlying_client.truncate_blocking(
            dataset.namespace,
            dataset.set_name,
            before_nanos,
        )

    def register_udf(
        self,
        body: bytes,
        server_path: str,
        language: UDFLang = UDFLang.LUA,
        *,
        policy: Optional[AdminPolicy] = None,
    ) -> RegisterTask:
        """Register a UDF module from bytes (synchronous).

        Does not auto-connect. All arguments are forwarded to PAC's
        ``register_udf_blocking``, whose task object is returned.

        Raises:
            RuntimeError: If the client is not connected.
        """
        return self.underlying_client.register_udf_blocking(
            body,
            server_path,
            language,
            policy=policy,
        )

    def register_udf_from_file(
        self,
        client_path: str,
        server_path: str,
        language: UDFLang = UDFLang.LUA,
        *,
        policy: Optional[AdminPolicy] = None,
    ) -> RegisterTask:
        """Register a UDF module from a local file (synchronous).

        Does not auto-connect. All arguments are forwarded to PAC's
        ``register_udf_from_file_blocking``, whose task object is returned.

        Raises:
            RuntimeError: If the client is not connected.
        """
        return self.underlying_client.register_udf_from_file_blocking(
            client_path,
            server_path,
            language,
            policy=policy,
        )

    def remove_udf(
        self,
        server_path: str,
        *,
        policy: Optional[AdminPolicy] = None,
    ) -> UdfRemoveTask:
        """Remove a UDF module from the cluster (synchronous).

        Does not auto-connect. Arguments are forwarded to PAC's
        ``remove_udf_blocking``, whose task object is returned.

        Raises:
            RuntimeError: If the client is not connected.
        """
        return self.underlying_client.remove_udf_blocking(server_path, policy=policy)

    def create_session(self, behavior: Optional[Behavior] = None) -> SyncSession:
        """Create a synchronous session with the specified behavior.

        Connects first if needed. Falls back to ``Behavior.DEFAULT`` when
        ``behavior`` is not given.
        """
        from aerospike_sdk.sync.session import SyncSession

        self._ensure_connected()
        return SyncSession(client=self, behavior=behavior or Behavior.DEFAULT)

    def create_transactional_session(
        self,
        behavior: Optional[Behavior] = None,
    ) -> SyncTransactionalSession:
        """Create a synchronous multi-record transaction session.

        Connects first if needed. Falls back to ``Behavior.DEFAULT`` when
        ``behavior`` is not given.
        """
        from aerospike_sdk.sync.transactional_session import SyncTransactionalSession

        self._ensure_connected()
        return SyncTransactionalSession(client=self, behavior=behavior or Behavior.DEFAULT)