Source code for aerospike_sdk.sync.operations.batch

# Copyright 2025-2026 Aerospike, Inc.
#
# Portions may be licensed to Aerospike, Inc. under one or more contributor
# license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

"""Synchronous batch operation builders delegating to ``aio.operations.batch``."""

from __future__ import annotations

from typing import Any, Dict, Optional, Sequence, Union

from aerospike_async import (
    BatchDeletePolicy,
    FilterExpression,
    Key,
)

from aerospike_sdk.aio.operations.batch import (
    BatchBinBuilder as AsyncBatchBinBuilder,
    BatchKeyOperationBuilder as AsyncBatchKeyOperationBuilder,
    BatchOperationBuilder as AsyncBatchOperationBuilder,
)
from aerospike_sdk.operations_shared import BatchOpType, _build_pac_batch_ops
from aerospike_sdk.error_strategy import ErrorHandler, _filter_records_with_handler
from aerospike_sdk.exceptions import _convert_pac_exception
from aerospike_sdk.hll_config import HllConfig
from aerospike_sdk.policy.behavior_settings import OpKind, OpShape
from aerospike_sdk.policy.policy_mapper import resolve_durable_delete, to_batch_policy
from aerospike_sdk.record_result import batch_records_to_results
from aerospike_sdk.sync.record_stream import SyncRecordStream


# The sync wrappers in this module hold an :class:`AsyncBatchOperationBuilder`
# as ``self._inner`` purely as a state bag — fluent chaining
# (``.insert(k).bin(...).set_to(...)``) mutates that inner builder's
# ``_key_operations`` list. At ``execute()`` / ``execute_stream()`` time the
# sync wrappers call PAC's ``*_blocking`` entries directly; no asyncio loop is
# ever entered. Pure-Python op-construction helpers
# (:func:`_build_pac_batch_ops`, :func:`_write_policy_for_op_type`) live in
# :mod:`aerospike_sdk.operations_shared` and are imported by both surfaces — they
# aren't "async" code, just shared.


def _dispatch_batch_stream_blocking(inner: AsyncBatchOperationBuilder) -> Any:
    """Build a mixed PAC ops list from `inner`'s accumulated state and call
    ``batch_stream_blocking``. Returns the raw PAC ``BatchRecordStream``.

    Single source of truth for the streaming-sync dispatch shape — both the
    multi-key and single-key sync wrappers call this so the spec logic
    (per-op policies, ops-list construction, error conversion) lives in one
    place on the sync side.
    """
    if not inner._key_operations:
        raise ValueError(
            "No operations to execute. Add operations with insert(), update(), etc.")

    all_keys = [key_op._key for key_op in inner._key_operations]
    batch_mode = inner._resolved_mode_for_keys_blocking(all_keys)

    batch_policy = None
    if inner._behavior is not None:
        batch_policy = to_batch_policy(
            inner._behavior.get_settings(
                OpKind.WRITE_NON_RETRYABLE, OpShape.BATCH, batch_mode))
    if inner._txn is not None and batch_policy is None:
        from aerospike_async import BatchPolicy
        batch_policy = BatchPolicy()
    inner._apply_txn(batch_policy)

    delete_policy: Optional[BatchDeletePolicy] = None
    has_delete = any(k._op_type == BatchOpType.DELETE for k in inner._key_operations)
    if has_delete and inner._behavior is not None:
        delete_keys = [k._key for k in inner._key_operations
                       if k._op_type == BatchOpType.DELETE]
        bs = inner._behavior.get_settings(
            OpKind.WRITE_NON_RETRYABLE,
            OpShape.BATCH,
            inner._resolved_mode_for_keys_blocking(delete_keys),
        )
        if resolve_durable_delete(bs.durable_delete, None, None):
            delete_policy = BatchDeletePolicy()
            delete_policy.durable_delete = True

    ops = _build_pac_batch_ops(inner._key_operations, delete_policy)

    try:
        return inner._client.batch_stream_blocking(
            ops, batch_policy=batch_policy,
        )
    except Exception as e:
        raise _convert_pac_exception(e) from e


[docs] class SyncBatchBinBuilder: """Sync wrapper for :class:`~aerospike_sdk.aio.operations.batch.BatchBinBuilder`. See Also: :class:`~aerospike_sdk.aio.operations.batch.BatchBinBuilder` """ __slots__ = ("_inner",)
[docs] def __init__(self, inner: AsyncBatchBinBuilder) -> None: self._inner = inner
[docs] def set_to(self, value: Any) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder(self._inner.set_to(value))
[docs] def set_to_geo_json(self, geo_json: str) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder( self._inner.set_to_geo_json(geo_json), )
[docs] def add(self, value: int) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder(self._inner.add(value))
[docs] def increment_by(self, value: int) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder(self._inner.increment_by(value))
[docs] def append(self, value: str) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder(self._inner.append(value))
[docs] def prepend(self, value: str) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder(self._inner.prepend(value))
[docs] def select_from( self, expression: Union[str, FilterExpression], *, ignore_eval_failure: bool = False, ) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder( self._inner.select_from(expression, ignore_eval_failure=ignore_eval_failure), )
[docs] def insert_from( self, expression: Union[str, FilterExpression], *, ignore_op_failure: bool = False, ignore_eval_failure: bool = False, delete_if_null: bool = False, ) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder( self._inner.insert_from( expression, ignore_op_failure=ignore_op_failure, ignore_eval_failure=ignore_eval_failure, delete_if_null=delete_if_null, ), )
[docs] def update_from( self, expression: Union[str, FilterExpression], *, ignore_op_failure: bool = False, ignore_eval_failure: bool = False, delete_if_null: bool = False, ) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder( self._inner.update_from( expression, ignore_op_failure=ignore_op_failure, ignore_eval_failure=ignore_eval_failure, delete_if_null=delete_if_null, ), )
[docs] def upsert_from( self, expression: Union[str, FilterExpression], *, ignore_op_failure: bool = False, ignore_eval_failure: bool = False, delete_if_null: bool = False, ) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder( self._inner.upsert_from( expression, ignore_op_failure=ignore_op_failure, ignore_eval_failure=ignore_eval_failure, delete_if_null=delete_if_null, ), )
# -- HyperLogLog ----------------------------------------------------------
[docs] def hll_init( self, config: HllConfig, *, create_only: bool = False, update_only: bool = False, no_fail: bool = False, allow_fold: bool = False, ) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder( self._inner.hll_init( config, create_only=create_only, update_only=update_only, no_fail=no_fail, allow_fold=allow_fold, ), )
[docs] def hll_add( self, values: Sequence[Any], *, config: Optional[HllConfig] = None, create_only: bool = False, update_only: bool = False, no_fail: bool = False, allow_fold: bool = False, ) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder( self._inner.hll_add( values, config=config, create_only=create_only, update_only=update_only, no_fail=no_fail, allow_fold=allow_fold, ), )
[docs] def hll_set_union( self, hll_list: Sequence[Any], *, create_only: bool = False, update_only: bool = False, no_fail: bool = False, allow_fold: bool = False, ) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder( self._inner.hll_set_union( hll_list, create_only=create_only, update_only=update_only, no_fail=no_fail, allow_fold=allow_fold, ), )
[docs] def hll_fold(self, index_bit_count: int) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder( self._inner.hll_fold(index_bit_count), )
[docs] def hll_refresh_count(self) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder( self._inner.hll_refresh_count(), )
[docs] def hll_get_count(self) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder( self._inner.hll_get_count(), )
[docs] def hll_describe(self) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder( self._inner.hll_describe(), )
[docs] def hll_get_union(self, hll_list: Sequence[Any]) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder( self._inner.hll_get_union(hll_list), )
[docs] def hll_get_union_count(self, hll_list: Sequence[Any]) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder( self._inner.hll_get_union_count(hll_list), )
[docs] def hll_get_intersect_count(self, hll_list: Sequence[Any]) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder( self._inner.hll_get_intersect_count(hll_list), )
[docs] def hll_get_similarity(self, hll_list: Sequence[Any]) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder( self._inner.hll_get_similarity(hll_list), )
[docs] class SyncBatchKeyOperationBuilder: """Sync wrapper for :class:`~aerospike_sdk.aio.operations.batch.BatchKeyOperationBuilder`. See Also: :class:`~aerospike_sdk.aio.operations.batch.BatchKeyOperationBuilder` """ __slots__ = ("_inner",)
[docs] def __init__(self, inner: AsyncBatchKeyOperationBuilder) -> None: self._inner = inner
[docs] def bin(self, bin_name: str) -> SyncBatchBinBuilder: return SyncBatchBinBuilder(self._inner.bin(bin_name))
[docs] def put(self, bins: Dict[str, Any]) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder(self._inner.put(bins))
[docs] def insert(self, key: Key) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder(self._inner.insert(key))
[docs] def update(self, key: Key) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder(self._inner.update(key))
[docs] def upsert(self, key: Key) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder(self._inner.upsert(key))
[docs] def replace(self, key: Key) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder(self._inner.replace(key))
[docs] def replace_if_exists(self, key: Key) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder(self._inner.replace_if_exists(key))
[docs] def delete(self, key: Key) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder(self._inner.delete(key))
[docs] def execute(self, on_error: Optional[ErrorHandler] = None) -> SyncRecordStream: """Buffered sync batch execute — writes-complete-on-return. Mirrors :meth:`aerospike_sdk.aio.operations.batch.BatchOperationBuilder.execute`. For lazy per-record streaming, see :meth:`execute_stream`. Args: on_error: Optional ``(key, index, exception) -> None`` callback. Failed per-key results are dispatched to the handler and excluded from the returned stream. """ raw = self._inner.execute_blocking() if on_error is not None: return SyncRecordStream.from_list( _filter_records_with_handler( batch_records_to_results(raw), on_error, ), ) return SyncRecordStream.from_batch_records(raw)
[docs] def execute_stream( self, on_error: Optional[ErrorHandler] = None, ) -> SyncRecordStream: """Streaming sync batch execute — yields records in completion order. See :meth:`SyncBatchOperationBuilder.execute_stream` for full documentation, including the trade-offs vs :meth:`execute`. Args: on_error: Optional ``(key, index, exception) -> None`` callback. Per-key failures are dispatched to the handler and excluded from the returned stream; cluster-level errors still raise. """ # Key-level builder holds no state of its own; the multi-key parent # at `self._inner._batch` is the source of truth for accumulated ops. pac_stream = _dispatch_batch_stream_blocking(self._inner._batch) return SyncRecordStream.from_pac_batch_stream(pac_stream, on_error=on_error)
[docs] class SyncBatchOperationBuilder: """Sync wrapper for :class:`~aerospike_sdk.aio.operations.batch.BatchOperationBuilder`. See Also: :class:`~aerospike_sdk.aio.operations.batch.BatchOperationBuilder` :meth:`~aerospike_sdk.sync.session.SyncSession.batch` """ __slots__ = ("_inner",)
[docs] def __init__(self, inner: AsyncBatchOperationBuilder) -> None: self._inner = inner
[docs] def insert(self, key: Key) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder(self._inner.insert(key))
[docs] def update(self, key: Key) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder(self._inner.update(key))
[docs] def upsert(self, key: Key) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder(self._inner.upsert(key))
[docs] def replace(self, key: Key) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder(self._inner.replace(key))
[docs] def replace_if_exists(self, key: Key) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder(self._inner.replace_if_exists(key))
[docs] def delete(self, key: Key) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder(self._inner.delete(key))
[docs] def execute(self, on_error: Optional[ErrorHandler] = None) -> SyncRecordStream: """Buffered sync batch execute — writes-complete-on-return. Awaits all per-key results before returning a :class:`SyncRecordStream` backed by a fully-materialized list. Safe for "fire-and-forget" use; subsequent reads observe the new state without races. For true per-record streaming, see :meth:`execute_stream`. Args: on_error: Optional ``(key, index, exception) -> None`` callback. When set, failed per-key results are dispatched to the callback and excluded from the returned stream — the stream contains only successes. Cluster-level errors still raise. Returns: A :class:`SyncRecordStream` of per-key :class:`RecordResult` items (positional via :attr:`RecordResult.index`). """ raw = self._inner.execute_blocking() if on_error is not None: return SyncRecordStream.from_list( _filter_records_with_handler( batch_records_to_results(raw), on_error, ), ) return SyncRecordStream.from_batch_records(raw)
[docs] def execute_stream( self, on_error: Optional[ErrorHandler] = None, ) -> SyncRecordStream: """Lazy sync batch execute — yields records in completion order. Dispatches all ops in a single mixed ``batch_stream_blocking`` call and returns a :class:`SyncRecordStream` whose ``__next__`` pulls ``(idx, BatchRecord)`` tuples from the PAC stream one at a time. **Caveats** — differ from :meth:`execute`: - **Yields completion order, not input order.** Each :class:`RecordResult` carries its originating op's input position in :attr:`RecordResult.index`; sort after collecting if you need positional results. - **Per-key errors inline** on :class:`RecordResult` (when ``on_error`` is unset); cluster-level errors raise from ``__next__``. - **No writes-complete-on-return guarantee.** Per-node tasks dispatch in the background; if the caller discards the stream without draining, server-side writes may still be in-flight. Tests / callers that follow "execute then immediately read" should use :meth:`execute` instead. Args: on_error: Optional ``(key, index, exception) -> None`` callback. When set, per-key failures are dispatched to the handler as records arrive and excluded from the returned stream; cluster-level errors still raise from ``__next__``. Returns: A lazy :class:`SyncRecordStream`. Iterate to drive PAC's per-record yield. Raises: ValueError: If no operations have been added. """ pac_stream = _dispatch_batch_stream_blocking(self._inner) return SyncRecordStream.from_pac_batch_stream(pac_stream, on_error=on_error)