Source code for aerospike_sdk.sync.operations.batch

# Copyright 2025-2026 Aerospike, Inc.
#
# Portions may be licensed to Aerospike, Inc. under one or more contributor
# license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

"""Synchronous batch operation builders delegating to ``aio.operations.batch``."""

from __future__ import annotations

from typing import Any, Dict, Union

from aerospike_async import FilterExpression, Key

from aerospike_sdk.aio.operations.batch import (
    BatchBinBuilder as AsyncBatchBinBuilder,
    BatchKeyOperationBuilder as AsyncBatchKeyOperationBuilder,
    BatchOperationBuilder as AsyncBatchOperationBuilder,
)
from aerospike_sdk.sync.client import _EventLoopManager
from aerospike_sdk.sync.record_stream import SyncRecordStream


[docs] class SyncBatchBinBuilder: """Sync wrapper for :class:`~aerospike_sdk.aio.operations.batch.BatchBinBuilder`. See Also: :class:`~aerospike_sdk.aio.operations.batch.BatchBinBuilder` """ __slots__ = ("_inner", "_loop_manager")
[docs] def __init__( self, inner: AsyncBatchBinBuilder, loop_manager: _EventLoopManager, ) -> None: self._inner = inner self._loop_manager = loop_manager
[docs] def set_to(self, value: Any) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder(self._inner.set_to(value), self._loop_manager)
[docs] def add(self, value: int) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder(self._inner.add(value), self._loop_manager)
[docs] def increment_by(self, value: int) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder(self._inner.increment_by(value), self._loop_manager)
[docs] def append(self, value: str) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder(self._inner.append(value), self._loop_manager)
[docs] def prepend(self, value: str) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder(self._inner.prepend(value), self._loop_manager)
[docs] def select_from( self, expression: Union[str, FilterExpression], *, ignore_eval_failure: bool = False, ) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder( self._inner.select_from(expression, ignore_eval_failure=ignore_eval_failure), self._loop_manager, )
[docs] def insert_from( self, expression: Union[str, FilterExpression], *, ignore_op_failure: bool = False, ignore_eval_failure: bool = False, delete_if_null: bool = False, ) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder( self._inner.insert_from( expression, ignore_op_failure=ignore_op_failure, ignore_eval_failure=ignore_eval_failure, delete_if_null=delete_if_null, ), self._loop_manager, )
[docs] def update_from( self, expression: Union[str, FilterExpression], *, ignore_op_failure: bool = False, ignore_eval_failure: bool = False, delete_if_null: bool = False, ) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder( self._inner.update_from( expression, ignore_op_failure=ignore_op_failure, ignore_eval_failure=ignore_eval_failure, delete_if_null=delete_if_null, ), self._loop_manager, )
[docs] def upsert_from( self, expression: Union[str, FilterExpression], *, ignore_op_failure: bool = False, ignore_eval_failure: bool = False, delete_if_null: bool = False, ) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder( self._inner.upsert_from( expression, ignore_op_failure=ignore_op_failure, ignore_eval_failure=ignore_eval_failure, delete_if_null=delete_if_null, ), self._loop_manager, )
[docs] class SyncBatchKeyOperationBuilder: """Sync wrapper for :class:`~aerospike_sdk.aio.operations.batch.BatchKeyOperationBuilder`. See Also: :class:`~aerospike_sdk.aio.operations.batch.BatchKeyOperationBuilder` """ __slots__ = ("_inner", "_loop_manager")
[docs] def __init__( self, inner: AsyncBatchKeyOperationBuilder, loop_manager: _EventLoopManager, ) -> None: self._inner = inner self._loop_manager = loop_manager
[docs] def bin(self, bin_name: str) -> SyncBatchBinBuilder: return SyncBatchBinBuilder(self._inner.bin(bin_name), self._loop_manager)
[docs] def put(self, bins: Dict[str, Any]) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder(self._inner.put(bins), self._loop_manager)
[docs] def insert(self, key: Key) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder(self._inner.insert(key), self._loop_manager)
[docs] def update(self, key: Key) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder(self._inner.update(key), self._loop_manager)
[docs] def upsert(self, key: Key) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder(self._inner.upsert(key), self._loop_manager)
[docs] def replace(self, key: Key) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder(self._inner.replace(key), self._loop_manager)
[docs] def replace_if_exists(self, key: Key) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder(self._inner.replace_if_exists(key), self._loop_manager)
[docs] def delete(self, key: Key) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder(self._inner.delete(key), self._loop_manager)
[docs] def execute(self) -> SyncRecordStream: stream = self._loop_manager.run_async(self._inner.execute()) return SyncRecordStream(stream, self._loop_manager)
[docs] class SyncBatchOperationBuilder: """Sync wrapper for :class:`~aerospike_sdk.aio.operations.batch.BatchOperationBuilder`. See Also: :class:`~aerospike_sdk.aio.operations.batch.BatchOperationBuilder` :meth:`~aerospike_sdk.sync.session.SyncSession.batch` """ __slots__ = ("_inner", "_loop_manager")
[docs] def __init__( self, inner: AsyncBatchOperationBuilder, loop_manager: _EventLoopManager, ) -> None: self._inner = inner self._loop_manager = loop_manager
[docs] def insert(self, key: Key) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder(self._inner.insert(key), self._loop_manager)
[docs] def update(self, key: Key) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder(self._inner.update(key), self._loop_manager)
[docs] def upsert(self, key: Key) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder(self._inner.upsert(key), self._loop_manager)
[docs] def replace(self, key: Key) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder(self._inner.replace(key), self._loop_manager)
[docs] def replace_if_exists(self, key: Key) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder(self._inner.replace_if_exists(key), self._loop_manager)
[docs] def delete(self, key: Key) -> SyncBatchKeyOperationBuilder: return SyncBatchKeyOperationBuilder(self._inner.delete(key), self._loop_manager)
[docs] def execute(self) -> SyncRecordStream: stream = self._loop_manager.run_async(self._inner.execute()) return SyncRecordStream(stream, self._loop_manager)