# Copyright 2025-2026 Aerospike, Inc.
#
# Portions may be licensed to Aerospike, Inc. under one or more contributor
# license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Synchronous batch operation builders delegating to ``aio.operations.batch``."""
from __future__ import annotations
from typing import Any, Dict, Union
from aerospike_async import FilterExpression, Key
from aerospike_sdk.aio.operations.batch import (
BatchBinBuilder as AsyncBatchBinBuilder,
BatchKeyOperationBuilder as AsyncBatchKeyOperationBuilder,
BatchOperationBuilder as AsyncBatchOperationBuilder,
)
from aerospike_sdk.sync.client import _EventLoopManager
from aerospike_sdk.sync.record_stream import SyncRecordStream
[docs]
class SyncBatchBinBuilder:
"""Sync wrapper for :class:`~aerospike_sdk.aio.operations.batch.BatchBinBuilder`.
See Also:
:class:`~aerospike_sdk.aio.operations.batch.BatchBinBuilder`
"""
__slots__ = ("_inner", "_loop_manager")
[docs]
def __init__(
self,
inner: AsyncBatchBinBuilder,
loop_manager: _EventLoopManager,
) -> None:
self._inner = inner
self._loop_manager = loop_manager
[docs]
def set_to(self, value: Any) -> SyncBatchKeyOperationBuilder:
return SyncBatchKeyOperationBuilder(self._inner.set_to(value), self._loop_manager)
[docs]
def add(self, value: int) -> SyncBatchKeyOperationBuilder:
return SyncBatchKeyOperationBuilder(self._inner.add(value), self._loop_manager)
[docs]
def increment_by(self, value: int) -> SyncBatchKeyOperationBuilder:
return SyncBatchKeyOperationBuilder(self._inner.increment_by(value), self._loop_manager)
[docs]
def append(self, value: str) -> SyncBatchKeyOperationBuilder:
return SyncBatchKeyOperationBuilder(self._inner.append(value), self._loop_manager)
[docs]
def prepend(self, value: str) -> SyncBatchKeyOperationBuilder:
return SyncBatchKeyOperationBuilder(self._inner.prepend(value), self._loop_manager)
[docs]
def select_from(
self,
expression: Union[str, FilterExpression],
*,
ignore_eval_failure: bool = False,
) -> SyncBatchKeyOperationBuilder:
return SyncBatchKeyOperationBuilder(
self._inner.select_from(expression, ignore_eval_failure=ignore_eval_failure),
self._loop_manager,
)
[docs]
def insert_from(
self,
expression: Union[str, FilterExpression],
*,
ignore_op_failure: bool = False,
ignore_eval_failure: bool = False,
delete_if_null: bool = False,
) -> SyncBatchKeyOperationBuilder:
return SyncBatchKeyOperationBuilder(
self._inner.insert_from(
expression,
ignore_op_failure=ignore_op_failure,
ignore_eval_failure=ignore_eval_failure,
delete_if_null=delete_if_null,
),
self._loop_manager,
)
[docs]
def update_from(
self,
expression: Union[str, FilterExpression],
*,
ignore_op_failure: bool = False,
ignore_eval_failure: bool = False,
delete_if_null: bool = False,
) -> SyncBatchKeyOperationBuilder:
return SyncBatchKeyOperationBuilder(
self._inner.update_from(
expression,
ignore_op_failure=ignore_op_failure,
ignore_eval_failure=ignore_eval_failure,
delete_if_null=delete_if_null,
),
self._loop_manager,
)
[docs]
def upsert_from(
self,
expression: Union[str, FilterExpression],
*,
ignore_op_failure: bool = False,
ignore_eval_failure: bool = False,
delete_if_null: bool = False,
) -> SyncBatchKeyOperationBuilder:
return SyncBatchKeyOperationBuilder(
self._inner.upsert_from(
expression,
ignore_op_failure=ignore_op_failure,
ignore_eval_failure=ignore_eval_failure,
delete_if_null=delete_if_null,
),
self._loop_manager,
)
[docs]
class SyncBatchKeyOperationBuilder:
"""Sync wrapper for :class:`~aerospike_sdk.aio.operations.batch.BatchKeyOperationBuilder`.
See Also:
:class:`~aerospike_sdk.aio.operations.batch.BatchKeyOperationBuilder`
"""
__slots__ = ("_inner", "_loop_manager")
[docs]
def __init__(
self,
inner: AsyncBatchKeyOperationBuilder,
loop_manager: _EventLoopManager,
) -> None:
self._inner = inner
self._loop_manager = loop_manager
[docs]
def bin(self, bin_name: str) -> SyncBatchBinBuilder:
return SyncBatchBinBuilder(self._inner.bin(bin_name), self._loop_manager)
[docs]
def put(self, bins: Dict[str, Any]) -> SyncBatchKeyOperationBuilder:
return SyncBatchKeyOperationBuilder(self._inner.put(bins), self._loop_manager)
[docs]
def insert(self, key: Key) -> SyncBatchKeyOperationBuilder:
return SyncBatchKeyOperationBuilder(self._inner.insert(key), self._loop_manager)
[docs]
def update(self, key: Key) -> SyncBatchKeyOperationBuilder:
return SyncBatchKeyOperationBuilder(self._inner.update(key), self._loop_manager)
[docs]
def upsert(self, key: Key) -> SyncBatchKeyOperationBuilder:
return SyncBatchKeyOperationBuilder(self._inner.upsert(key), self._loop_manager)
[docs]
def replace(self, key: Key) -> SyncBatchKeyOperationBuilder:
return SyncBatchKeyOperationBuilder(self._inner.replace(key), self._loop_manager)
[docs]
def replace_if_exists(self, key: Key) -> SyncBatchKeyOperationBuilder:
return SyncBatchKeyOperationBuilder(self._inner.replace_if_exists(key), self._loop_manager)
[docs]
def delete(self, key: Key) -> SyncBatchKeyOperationBuilder:
return SyncBatchKeyOperationBuilder(self._inner.delete(key), self._loop_manager)
[docs]
def execute(self) -> SyncRecordStream:
stream = self._loop_manager.run_async(self._inner.execute())
return SyncRecordStream(stream, self._loop_manager)
[docs]
class SyncBatchOperationBuilder:
"""Sync wrapper for :class:`~aerospike_sdk.aio.operations.batch.BatchOperationBuilder`.
See Also:
:class:`~aerospike_sdk.aio.operations.batch.BatchOperationBuilder`
:meth:`~aerospike_sdk.sync.session.SyncSession.batch`
"""
__slots__ = ("_inner", "_loop_manager")
[docs]
def __init__(
self,
inner: AsyncBatchOperationBuilder,
loop_manager: _EventLoopManager,
) -> None:
self._inner = inner
self._loop_manager = loop_manager
[docs]
def insert(self, key: Key) -> SyncBatchKeyOperationBuilder:
return SyncBatchKeyOperationBuilder(self._inner.insert(key), self._loop_manager)
[docs]
def update(self, key: Key) -> SyncBatchKeyOperationBuilder:
return SyncBatchKeyOperationBuilder(self._inner.update(key), self._loop_manager)
[docs]
def upsert(self, key: Key) -> SyncBatchKeyOperationBuilder:
return SyncBatchKeyOperationBuilder(self._inner.upsert(key), self._loop_manager)
[docs]
def replace(self, key: Key) -> SyncBatchKeyOperationBuilder:
return SyncBatchKeyOperationBuilder(self._inner.replace(key), self._loop_manager)
[docs]
def replace_if_exists(self, key: Key) -> SyncBatchKeyOperationBuilder:
return SyncBatchKeyOperationBuilder(self._inner.replace_if_exists(key), self._loop_manager)
[docs]
def delete(self, key: Key) -> SyncBatchKeyOperationBuilder:
return SyncBatchKeyOperationBuilder(self._inner.delete(key), self._loop_manager)
[docs]
def execute(self) -> SyncRecordStream:
stream = self._loop_manager.run_async(self._inner.execute())
return SyncRecordStream(stream, self._loop_manager)