# Copyright 2025-2026 Aerospike, Inc.
#
# Portions may be licensed to Aerospike, Inc. under one or more contributor
# license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""BatchOperationBuilder - Builder for chaining operations across multiple keys."""
from __future__ import annotations
from enum import Enum
from typing import Any, Dict, List, Optional, TYPE_CHECKING, Union
from aerospike_async import (
Client,
ExpOperation,
ExpReadFlags,
ExpWriteFlags,
FilterExpression,
Key,
Operation,
Txn,
)
from aerospike_sdk.aio.operations.query import _build_exp_write_flags
from aerospike_sdk.ael.parser import parse_ael
from aerospike_sdk.exceptions import _convert_pac_exception
from aerospike_sdk.policy.behavior_settings import OpKind, OpShape
from aerospike_sdk.policy.policy_mapper import to_batch_policy
from aerospike_sdk.record_stream import RecordStream
if TYPE_CHECKING: # Not unused — avoids circular import; used in type annotations only.
from aerospike_sdk.policy.behavior import Behavior
class BatchOpType(Enum):
    """Kind of operation queued for a single key of a batch.

    Each member's value is the name of the builder method that registers it
    (``insert()`` -> ``"insert"``, ``replace_if_exists()`` ->
    ``"replace_if_exists"``, and so on).
    """

    # Write-style operations; queued together with their bin operations.
    INSERT = "insert"
    UPDATE = "update"
    UPSERT = "upsert"
    REPLACE = "replace"
    REPLACE_IF_EXISTS = "replace_if_exists"

    # Record removal.
    DELETE = "delete"
class BatchBinBuilder:
    """Fluent helper that queues operations against one bin of one batch key.

    Every public method appends exactly one operation to the parent
    :class:`BatchKeyOperationBuilder` and returns that parent, so the chain
    can continue with another ``bin(...)`` call or move on to another key.

    Example:
        batch.insert(key).bin("name").set_to("Alice").bin("age").set_to(25)
    """

    def __init__(self, key_op: BatchKeyOperationBuilder, bin_name: str) -> None:
        """Bind this builder to its parent key operation and target bin.

        Args:
            key_op: Key-level builder that receives the queued operations.
            bin_name: Name of the bin every method of this builder acts on.
        """
        self._key_op = key_op
        self._bin_name = bin_name

    # ------------------------------------------------------------------
    # Private helpers shared by the public methods below.
    # ------------------------------------------------------------------

    def _queue(self, operation: Union[Operation, ExpOperation]) -> BatchKeyOperationBuilder:
        """Append *operation* to the parent's operation list and return the parent."""
        self._key_op._operations.append(operation)
        return self._key_op

    @staticmethod
    def _as_expression(expression: Union[str, FilterExpression]) -> FilterExpression:
        """Run ``parse_ael`` on string input; return any other input unchanged."""
        if isinstance(expression, str):
            return parse_ael(expression)
        return expression

    def _queue_exp_write(
        self,
        base_flag: Any,
        expression: Union[str, FilterExpression],
        ignore_op_failure: bool,
        ignore_eval_failure: bool,
        delete_if_null: bool,
    ) -> BatchKeyOperationBuilder:
        """Queue an expression write to this bin; shared by the ``*_from`` writers.

        Args:
            base_flag: The ``ExpWriteFlags`` member selecting the write mode.
            expression: AEL string or pre-built FilterExpression.
            ignore_op_failure: Forwarded to ``_build_exp_write_flags``.
            ignore_eval_failure: Forwarded to ``_build_exp_write_flags``.
            delete_if_null: Forwarded to ``_build_exp_write_flags``.

        Returns:
            The parent BatchKeyOperationBuilder for chaining.
        """
        # The flags are combined before the expression is parsed, so a failure
        # in either step surfaces in the same order as it always has.
        flags = _build_exp_write_flags(
            base_flag, ignore_op_failure, ignore_eval_failure, delete_if_null,
        )
        expr = self._as_expression(expression)
        return self._queue(ExpOperation.write(self._bin_name, expr, flags))

    # ------------------------------------------------------------------
    # Plain value operations.
    # ------------------------------------------------------------------

    def set_to(self, value: Any) -> BatchKeyOperationBuilder:
        """Set the bin to *value*.

        The value is also recorded in the parent's ``_bins`` mapping, in
        addition to queuing the ``Operation.put``.

        Args:
            value: The value to set.

        Returns:
            The parent BatchKeyOperationBuilder for chaining.
        """
        self._key_op._bins[self._bin_name] = value
        return self._queue(Operation.put(self._bin_name, value))

    def add(self, value: int) -> BatchKeyOperationBuilder:
        """Add *value* to the bin (numeric increment).

        Args:
            value: The amount to add.

        Returns:
            The parent BatchKeyOperationBuilder for chaining.
        """
        return self._queue(Operation.add(self._bin_name, value))

    def increment_by(self, value: int) -> BatchKeyOperationBuilder:
        """Alias for :meth:`add`."""
        return self.add(value)

    def append(self, value: str) -> BatchKeyOperationBuilder:
        """Append a string to the bin value.

        Args:
            value: The string to append.

        Returns:
            The parent BatchKeyOperationBuilder for chaining.
        """
        return self._queue(Operation.append(self._bin_name, value))

    def prepend(self, value: str) -> BatchKeyOperationBuilder:
        """Prepend a string to the bin value.

        Args:
            value: The string to prepend.

        Returns:
            The parent BatchKeyOperationBuilder for chaining.
        """
        return self._queue(Operation.prepend(self._bin_name, value))

    # ------------------------------------------------------------------
    # Expression operations.
    # ------------------------------------------------------------------

    def select_from(
        self,
        expression: Union[str, FilterExpression],
        *,
        ignore_eval_failure: bool = False,
    ) -> BatchKeyOperationBuilder:
        """Read the result of an expression into this bin.

        Args:
            expression: AEL string or pre-built FilterExpression.
            ignore_eval_failure: If True, use ``ExpReadFlags.EVAL_NO_FAIL``
                (suppress evaluation errors) instead of
                ``ExpReadFlags.DEFAULT``.

        Returns:
            The parent BatchKeyOperationBuilder for chaining.
        """
        if ignore_eval_failure:
            flags = ExpReadFlags.EVAL_NO_FAIL
        else:
            flags = ExpReadFlags.DEFAULT
        expr = self._as_expression(expression)
        return self._queue(ExpOperation.read(self._bin_name, expr, flags))

    def insert_from(
        self,
        expression: Union[str, FilterExpression],
        *,
        ignore_op_failure: bool = False,
        ignore_eval_failure: bool = False,
        delete_if_null: bool = False,
    ) -> BatchKeyOperationBuilder:
        """Write the expression result to this bin only if the bin does not exist.

        Uses ``ExpWriteFlags.CREATE_ONLY`` as the base write flag.

        Args:
            expression: AEL string or pre-built FilterExpression.
            ignore_op_failure: If True, suppress BIN_EXISTS_ERROR.
            ignore_eval_failure: If True, suppress evaluation errors.
            delete_if_null: If True, delete bin when expression evaluates to nil.

        Returns:
            The parent BatchKeyOperationBuilder for chaining.
        """
        return self._queue_exp_write(
            ExpWriteFlags.CREATE_ONLY,
            expression,
            ignore_op_failure,
            ignore_eval_failure,
            delete_if_null,
        )

    def update_from(
        self,
        expression: Union[str, FilterExpression],
        *,
        ignore_op_failure: bool = False,
        ignore_eval_failure: bool = False,
        delete_if_null: bool = False,
    ) -> BatchKeyOperationBuilder:
        """Write the expression result to this bin only if the bin already exists.

        Uses ``ExpWriteFlags.UPDATE_ONLY`` as the base write flag.

        Args:
            expression: AEL string or pre-built FilterExpression.
            ignore_op_failure: If True, suppress BIN_NOT_FOUND.
            ignore_eval_failure: If True, suppress evaluation errors.
            delete_if_null: If True, delete bin when expression evaluates to nil.

        Returns:
            The parent BatchKeyOperationBuilder for chaining.
        """
        return self._queue_exp_write(
            ExpWriteFlags.UPDATE_ONLY,
            expression,
            ignore_op_failure,
            ignore_eval_failure,
            delete_if_null,
        )

    def upsert_from(
        self,
        expression: Union[str, FilterExpression],
        *,
        ignore_op_failure: bool = False,
        ignore_eval_failure: bool = False,
        delete_if_null: bool = False,
    ) -> BatchKeyOperationBuilder:
        """Write the expression result to this bin (create or update).

        Uses ``ExpWriteFlags.DEFAULT`` as the base write flag.

        Args:
            expression: AEL string or pre-built FilterExpression.
            ignore_op_failure: If True, suppress policy errors.
            ignore_eval_failure: If True, suppress evaluation errors.
            delete_if_null: If True, delete bin when expression evaluates to nil.

        Returns:
            The parent BatchKeyOperationBuilder for chaining.
        """
        return self._queue_exp_write(
            ExpWriteFlags.DEFAULT,
            expression,
            ignore_op_failure,
            ignore_eval_failure,
            delete_if_null,
        )
class BatchKeyOperationBuilder:
    """Collects the operations queued for one key of a batch.

    Bin operations are chained through :meth:`bin` / :meth:`put`; the
    key-level methods (``insert``, ``update``, ...) and :meth:`execute` hand
    control back to the owning batch so the chain can move on to another key.

    Example:
        batch.insert(key1).bin("name").set_to("Alice") \\
            .update(key2).bin("counter").add(1)
    """

    def __init__(
        self,
        batch: BatchOperationBuilder,
        key: Key,
        op_type: BatchOpType,
    ) -> None:
        """Create the per-key builder.

        Args:
            batch: The owning batch builder that key-level calls delegate to.
            key: The record key this builder queues operations for.
            op_type: The kind of operation registered for ``key``.
        """
        self._batch = batch
        self._key = key
        self._op_type = op_type
        # Operations in the order they were chained.
        self._operations: List[Union[Operation, ExpOperation]] = []
        # Bin name -> value for plain sets made via set_to()/put().
        self._bins: Dict[str, Any] = {}

    def bin(self, bin_name: str) -> BatchBinBuilder:
        """Start a chain of operations on one bin.

        Args:
            bin_name: The name of the bin.

        Returns:
            A BatchBinBuilder bound to this key operation and ``bin_name``.

        Example:
            batch.insert(key).bin("name").set_to("Alice").bin("age").set_to(25)
        """
        return BatchBinBuilder(self, bin_name)

    def put(self, bins: Dict[str, Any]) -> BatchKeyOperationBuilder:
        """Set several bins in one call.

        Args:
            bins: Mapping of bin name to value.

        Returns:
            self, for method chaining.

        Example:
            batch.insert(key).put({"name": "Alice", "age": 25})
        """
        self._bins.update(bins)
        # One put operation per entry, in the mapping's iteration order.
        self._operations.extend(
            Operation.put(name, value) for name, value in bins.items()
        )
        return self

    # ------------------------------------------------------------------
    # Continue the chain with another key (delegates to the owning batch).
    # ------------------------------------------------------------------

    def insert(self, key: Key) -> BatchKeyOperationBuilder:
        """Add an insert operation for another key."""
        owner = self._batch
        return owner.insert(key)

    def update(self, key: Key) -> BatchKeyOperationBuilder:
        """Add an update operation for another key."""
        owner = self._batch
        return owner.update(key)

    def upsert(self, key: Key) -> BatchKeyOperationBuilder:
        """Add an upsert operation for another key."""
        owner = self._batch
        return owner.upsert(key)

    def replace(self, key: Key) -> BatchKeyOperationBuilder:
        """Add a replace operation for another key."""
        owner = self._batch
        return owner.replace(key)

    def replace_if_exists(self, key: Key) -> BatchKeyOperationBuilder:
        """Add a replace-if-exists operation for another key."""
        owner = self._batch
        return owner.replace_if_exists(key)

    def delete(self, key: Key) -> BatchKeyOperationBuilder:
        """Add a delete operation for another key."""
        owner = self._batch
        return owner.delete(key)

    async def execute(self) -> RecordStream:
        """Execute every operation queued on the owning batch."""
        stream = await self._batch.execute()
        return stream
class BatchOperationBuilder:
    """Builder for chaining operations across multiple keys.

    Operations on different keys are collected through method chaining and
    sent when :meth:`execute` is awaited.

    Example::

        results = await session.batch() \\
            .insert(key1).bin("name").set_to("Alice").bin("age").set_to(25) \\
            .update(key2).bin("counter").add(1) \\
            .delete(key3) \\
            .execute()

    At execute time the queued keys are split in two groups: DELETE keys go
    through the client's ``batch_delete`` and every other key goes through
    ``batch_operate``.
    """

    def __init__(
        self,
        client: Client,
        behavior: Optional[Behavior] = None,
        txn: Optional[Txn] = None,
    ) -> None:
        """Initialize a BatchOperationBuilder.

        Args:
            client: The underlying async client.
            behavior: Optional Behavior for deriving policies.
            txn: Optional active :class:`~aerospike_async.Txn` captured from
                a transactional session; stamped on the outer batch policy
                at execute. ``None`` means no transaction participation.
        """
        self._client = client
        self._behavior = behavior
        self._txn: Optional[Txn] = txn
        # Per-key builders in the order they were registered.
        self._key_operations: List[BatchKeyOperationBuilder] = []

    # ------------------------------------------------------------------
    # Transaction handling.
    # ------------------------------------------------------------------

    def _apply_txn(self, policy: Any) -> Any:
        """Stamp this builder's captured txn on the outer batch policy.

        Nothing is changed when either the txn or the policy is ``None``.

        Args:
            policy: The batch policy object, or ``None``.

        Returns:
            The same ``policy`` object that was passed in.
        """
        if self._txn is not None and policy is not None:
            policy.txn = self._txn
        return policy

    def with_txn(self, txn: Optional[Txn]) -> "BatchOperationBuilder":
        """Opt this batch into (or out of) a specific transaction.

        See :meth:`aerospike_sdk.aio.operations.query.QueryBuilder.with_txn`
        for semantics.

        Args:
            txn: The :class:`~aerospike_async.Txn` to participate in, or
                ``None`` to opt out.

        Returns:
            This builder for chaining.
        """
        self._txn = txn
        return self

    # ------------------------------------------------------------------
    # Key registration.
    # ------------------------------------------------------------------

    def _register(self, key: Key, op_type: BatchOpType) -> BatchKeyOperationBuilder:
        """Create the per-key builder for *key*, record it, and return it."""
        key_op = BatchKeyOperationBuilder(self, key, op_type)
        self._key_operations.append(key_op)
        return key_op

    def insert(self, key: Key) -> BatchKeyOperationBuilder:
        """Add an insert (create only) operation for a key.

        Args:
            key: The key for the record.

        Returns:
            A BatchKeyOperationBuilder for chaining bin operations.

        Example:
            batch.insert(key).bin("name").set_to("Alice")
        """
        return self._register(key, BatchOpType.INSERT)

    def update(self, key: Key) -> BatchKeyOperationBuilder:
        """Add an update (update only) operation for a key.

        Args:
            key: The key for the record.

        Returns:
            A BatchKeyOperationBuilder for chaining bin operations.

        Example:
            batch.update(key).bin("counter").add(1)
        """
        return self._register(key, BatchOpType.UPDATE)

    def upsert(self, key: Key) -> BatchKeyOperationBuilder:
        """Add an upsert (create or update) operation for a key.

        Args:
            key: The key for the record.

        Returns:
            A BatchKeyOperationBuilder for chaining bin operations.

        Example:
            batch.upsert(key).bin("name").set_to("Bob")
        """
        return self._register(key, BatchOpType.UPSERT)

    def replace(self, key: Key) -> BatchKeyOperationBuilder:
        """Add a replace (create or replace) operation for a key.

        Args:
            key: The key for the record.

        Returns:
            A BatchKeyOperationBuilder for chaining bin operations.

        Example:
            batch.replace(key).put({"name": "Charlie", "age": 35})
        """
        return self._register(key, BatchOpType.REPLACE)

    def replace_if_exists(self, key: Key) -> BatchKeyOperationBuilder:
        """Add a replace-if-exists operation for a key.

        Intended to fail when the record does not exist (see the review note
        on :meth:`execute` about how op types are applied).

        Args:
            key: The key for the record.

        Returns:
            A BatchKeyOperationBuilder for chaining bin operations.

        Example:
            batch.replace_if_exists(key).put({"name": "Updated", "status": "active"})
        """
        return self._register(key, BatchOpType.REPLACE_IF_EXISTS)

    def delete(self, key: Key) -> BatchKeyOperationBuilder:
        """Add a delete operation for a key.

        Args:
            key: The key for the record.

        Returns:
            A BatchKeyOperationBuilder for continuing the chain.

        Example:
            batch.delete(key1).delete(key2).execute()
        """
        return self._register(key, BatchOpType.DELETE)

    # ------------------------------------------------------------------
    # Execution.
    # ------------------------------------------------------------------

    def _partition(self) -> tuple:
        """Split the queued key operations into the two client calls' inputs.

        Returns:
            ``(delete_keys, operate_keys, operate_ops)`` where
            ``operate_ops[i]`` is the operation list for ``operate_keys[i]``.
        """
        delete_keys: List[Key] = []
        operate_keys: List[Key] = []
        operate_ops: List[List[Union[Operation, ExpOperation]]] = []

        for key_op in self._key_operations:
            if key_op._op_type == BatchOpType.DELETE:
                # Only the key is used; bin operations chained on a delete
                # are not sent.
                delete_keys.append(key_op._key)
                continue

            # Work on a copy so repeated execute() calls see the same input.
            ops = key_op._operations.copy()
            if not ops and key_op._bins:
                # Fallback for bins recorded without a matching operation.
                # set_to() and put() always queue an operation as well, so
                # this only triggers if _bins was filled some other way.
                ops.extend(
                    Operation.put(bin_name, value)
                    for bin_name, value in key_op._bins.items()
                )
            if not ops:
                # A key with nothing chained still needs one operation.
                ops.append(Operation.touch())

            operate_keys.append(key_op._key)
            operate_ops.append(ops)

        return delete_keys, operate_keys, operate_ops

    def _build_batch_policy(self) -> Any:
        """Derive the outer batch policy and stamp the txn on it, if any.

        Returns:
            A batch policy, or ``None`` when there is neither a behavior nor
            a txn.
        """
        batch_policy = None
        if self._behavior is not None:
            settings = self._behavior.get_settings(
                OpKind.WRITE_NON_RETRYABLE, OpShape.BATCH)
            batch_policy = to_batch_policy(settings)
        # Under MRT the PAC rejects a null BatchPolicy, so materialize one
        # just to carry the txn reference when the behavior path didn't.
        if self._txn is not None and batch_policy is None:
            from aerospike_async import BatchPolicy
            batch_policy = BatchPolicy()
        # NOTE(review): the txn is assigned onto whatever to_batch_policy()
        # returned; confirm that object is not shared between builders.
        return self._apply_txn(batch_policy)

    async def execute(self) -> RecordStream:
        """Execute all batch operations.

        Example::

            stream = await (
                session.batch()
                .insert(key1)
                .bin("name").set_to("Ada")
                .upsert(key2)
                .bin("n").set_to(1)
                .execute()
            )
            rows = await stream.collect()

        Results are gathered deletes first, then all other keys, each group
        in registration order. This is not necessarily the order in which the
        keys were chained.

        NOTE(review): only DELETE is treated specially here. INSERT, UPDATE,
        UPSERT, REPLACE and REPLACE_IF_EXISTS keys are all sent through the
        same ``batch_operate`` call with ``None`` as its second argument, and
        ``_op_type`` is not otherwise consulted in this method. Confirm
        whether the create-only / update-only / replace semantics are
        enforced somewhere else.

        Returns:
            A :class:`RecordStream` of per-key :class:`RecordResult` items.

        Raises:
            ValueError: If no operations have been added.
        """
        if not self._key_operations:
            raise ValueError("No operations to execute. Add operations with insert(), update(), etc.")

        delete_keys, operate_keys, operate_ops = self._partition()
        batch_policy = self._build_batch_policy()

        raw_results: list = []
        try:
            if delete_keys:
                delete_results = await self._client.batch_delete(
                    batch_policy,
                    None,
                    delete_keys,
                )
                raw_results.extend(delete_results)
            if operate_keys:
                operate_results = await self._client.batch_operate(
                    batch_policy,
                    None,
                    operate_keys,
                    operate_ops,
                )
                raw_results.extend(operate_results)
        except Exception as e:
            # Client failures are re-raised as the SDK's exception types.
            raise _convert_pac_exception(e) from e

        return RecordStream.from_batch_records(raw_results)