# Copyright 2025-2026 Aerospike, Inc.
#
# Portions may be licensed to Aerospike, Inc. under one or more contributor
# license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Chainable builders for server-side background operations on datasets."""
from __future__ import annotations
import enum
import logging
from typing import TYPE_CHECKING, Any, List, Optional, Union, overload
log = logging.getLogger("aerospike_sdk.background")
from aerospike_async import (
Client,
ExecuteTask,
FilterExpression,
Operation,
RecordExistsAction,
)
from aerospike_sdk.background_shared import (
dataset_statement,
make_background_write_policy,
reject_unsupported_background_write_ops,
)
from aerospike_sdk.dataset import DataSet
from aerospike_sdk.ael.parser import parse_ael
from aerospike_sdk.exceptions import _convert_pac_exception
if TYPE_CHECKING: # Not unused — avoids circular import; used in type annotations only.
from aerospike_sdk.aio.session import Session
class _OpType(enum.Enum):
UPDATE = enum.auto()
DELETE = enum.auto()
TOUCH = enum.auto()
_BG_UNSUPPORTED = (
"fail_on_filtered_out and respond_all_keys apply to foreground reads; "
"they are not supported for background tasks."
)
[docs]
class BackgroundTaskSession:
"""Choose a dataset-wide background job (update, delete, touch, or UDF).
From :meth:`~aerospike_sdk.aio.session.Session.background_task`. Each
method returns a builder to add filters, bin operations or UDF arguments,
then ``await ...execute()`` for a server :class:`~aerospike_async.ExecuteTask`.
Example:
Background update with a filter::
task = await (
session.background_task()
.update(users)
.where("$.active == true")
.bin("score").add(1)
.execute()
)
See Also:
:meth:`~aerospike_sdk.aio.session.Session.execute_udf`: Foreground UDF on keys.
"""
[docs]
def __init__(self, session: Session) -> None:
"""Bind to *session*; prefer :meth:`Session.background_task`."""
self._session = session
[docs]
def update(self, dataset: DataSet) -> BackgroundOperationBuilder:
"""Start a ``query_operate`` update over records in *dataset*.
Args:
dataset: Namespace/set scope for the scan.
Returns:
:class:`BackgroundOperationBuilder` — add ``where``, ``bin``, then
:meth:`BackgroundOperationBuilder.execute`.
Raises:
ValueError: On execute if no bin operations were added.
"""
return BackgroundOperationBuilder(self._session, dataset, _OpType.UPDATE)
[docs]
def delete(self, dataset: DataSet) -> BackgroundOperationBuilder:
"""Start a background delete of all records matching optional filters.
Args:
dataset: Namespace/set to scan.
Returns:
:class:`BackgroundOperationBuilder` (no bin ops required for delete).
"""
return BackgroundOperationBuilder(self._session, dataset, _OpType.DELETE)
[docs]
def touch(self, dataset: DataSet) -> BackgroundOperationBuilder:
"""Start a background touch (TTL refresh) for matching records.
Args:
dataset: Namespace/set to scan.
Returns:
:class:`BackgroundOperationBuilder` — optional ``expire_record_after_seconds``.
"""
return BackgroundOperationBuilder(self._session, dataset, _OpType.TOUCH)
[docs]
def execute_udf(self, dataset: DataSet) -> BackgroundUdfFunctionBuilder:
"""Start a background UDF executed via ``query_execute_udf``.
Args:
dataset: Namespace/set scope.
Returns:
:class:`BackgroundUdfFunctionBuilder` — call :meth:`BackgroundUdfFunctionBuilder.function`
then :meth:`BackgroundUdfBuilder.passing` and :meth:`BackgroundUdfBuilder.execute`.
"""
return BackgroundUdfFunctionBuilder(self._session, dataset)
[docs]
class BackgroundWriteBinBuilder:
"""Per-bin write helper for background updates (``put`` / ``add`` only).
Obtained from :meth:`BackgroundOperationBuilder.bin`. Call :meth:`set_to`
or :meth:`add`, which return the parent builder for further chaining.
Example::
builder.bin("score").add(10)
"""
__slots__ = ("_parent", "_bin")
[docs]
def __init__(self, parent: BackgroundOperationBuilder, bin_name: str) -> None:
"""Capture the bin name; prefer :meth:`BackgroundOperationBuilder.bin`."""
self._parent = parent
self._bin = bin_name
[docs]
def set_to(self, value: Any) -> BackgroundOperationBuilder:
"""Set the bin to *value* (``Operation.put``).
Args:
value: The value to write.
Returns:
The parent :class:`BackgroundOperationBuilder`.
"""
self._parent._operations.append(Operation.put(self._bin, value))
return self._parent
[docs]
def add(self, value: Any) -> BackgroundOperationBuilder:
"""Add a numeric *value* to the bin (``Operation.add``).
Args:
value: Numeric amount to add (may be negative).
Returns:
The parent :class:`BackgroundOperationBuilder`.
"""
self._parent._operations.append(Operation.add(self._bin, value))
return self._parent
[docs]
class BackgroundOperationBuilder:
"""Configure filters, TTL, and operations for ``query_operate``.
Not all query-policy knobs are wired through to PAC for background jobs;
``records_per_second`` is stored for API parity but may not affect the
underlying call.
See Also:
:meth:`BackgroundTaskSession.update`: Typical construction path.
"""
__slots__ = (
"_session",
"_dataset",
"_op_type",
"_operations",
"_filter_expression",
"_ttl_seconds",
"_records_per_second",
)
[docs]
def __init__(
self,
session: Session,
dataset: DataSet,
op_type: _OpType,
) -> None:
self._session = session
self._dataset = dataset
self._op_type = op_type
self._operations: List[Any] = []
self._filter_expression: Optional[FilterExpression] = None
self._ttl_seconds: Optional[int] = None
self._records_per_second: Optional[int] = None
@overload
def where(self, expression: str) -> BackgroundOperationBuilder: ...
@overload
def where(self, expression: FilterExpression) -> BackgroundOperationBuilder: ...
[docs]
def where(
self,
expression: Union[str, FilterExpression],
) -> BackgroundOperationBuilder:
"""Restrict the scan with an AEL or ``FilterExpression`` predicate.
Returns:
This builder for chaining.
Example::
builder.where("$.status == 'inactive'")
"""
if isinstance(expression, str):
self._filter_expression = parse_ael(expression)
else:
self._filter_expression = expression
return self
[docs]
def bin(self, name: str) -> BackgroundWriteBinBuilder:
"""Start a scalar write on *name* (update jobs only).
Example::
builder.bin("score").add(10)
"""
return BackgroundWriteBinBuilder(self, name)
[docs]
def expire_record_after_seconds(self, seconds: int) -> BackgroundOperationBuilder:
"""Set record TTL in seconds for touches/updates when supported by policy."""
self._ttl_seconds = seconds
return self
[docs]
def records_per_second(self, rps: int) -> BackgroundOperationBuilder:
"""Store a throttle hint (may be unused depending on PAC background API)."""
self._records_per_second = rps
return self
[docs]
def fail_on_filtered_out(self) -> BackgroundOperationBuilder:
"""Unsupported for background tasks (raises ``TypeError``)."""
raise TypeError(_BG_UNSUPPORTED)
[docs]
def respond_all_keys(self) -> BackgroundOperationBuilder:
"""Unsupported for background tasks (raises ``TypeError``)."""
raise TypeError(_BG_UNSUPPORTED)
def _pac_client(self) -> Client:
fc = self._session.client
if fc._client is None:
raise RuntimeError("Client is not connected")
return fc._client
def _final_operations(self) -> List[Any]:
ops = list(self._operations)
if self._op_type is _OpType.DELETE:
if not ops:
ops = [Operation.delete()]
elif self._op_type is _OpType.TOUCH:
if not ops:
ops = [Operation.touch()]
elif self._op_type is _OpType.UPDATE:
if not ops:
raise ValueError(
"Background update requires at least one bin operation; "
"use .bin(name).set_to(...) or .add(...).",
)
return ops
def _record_exists_action(self) -> Optional[RecordExistsAction]:
if self._op_type is _OpType.UPDATE:
return RecordExistsAction.UPDATE_ONLY
if self._op_type is _OpType.TOUCH:
return RecordExistsAction.UPDATE_ONLY
return None
[docs]
async def execute(self) -> ExecuteTask:
"""Start the server job and return an :class:`~aerospike_async.ExecuteTask`.
Raises:
ValueError: For update without bin operations.
RuntimeError: If the SDK client is not connected.
AerospikeError: On PAC errors (converted).
Example::
task = await (
session.background_task()
.update(users)
.bin("visits").add(1)
.execute()
)
await task.wait_till_complete()
"""
ops = self._final_operations()
reject_unsupported_background_write_ops(ops)
log.debug(
"background %s: %s.%s ops=%d",
self._op_type.name if self._op_type else "WRITE",
self._dataset.namespace, self._dataset.set_name, len(ops),
)
wp = make_background_write_policy(
self._session.behavior,
self._filter_expression,
self._ttl_seconds,
self._record_exists_action(),
)
statement = dataset_statement(
self._dataset.namespace,
self._dataset.set_name,
)
client = self._pac_client()
try:
return await client.query_operate(wp, statement, ops)
except Exception as e:
raise _convert_pac_exception(e) from e
[docs]
class BackgroundUdfFunctionBuilder:
"""Pick module and function for a dataset background UDF."""
__slots__ = ("_session", "_dataset")
[docs]
def __init__(self, session: Session, dataset: DataSet) -> None:
self._session = session
self._dataset = dataset
[docs]
def function(
self,
package_name: str,
function_name: str,
) -> BackgroundUdfBuilder:
"""Select the registered package and Lua entrypoint.
Args:
package_name: Server module name (no ``.lua`` suffix).
function_name: Lua function to invoke.
Returns:
:class:`BackgroundUdfBuilder` for arguments and execution.
Raises:
ValueError: If either string is empty.
"""
if not package_name:
raise ValueError("package_name must be a non-empty string")
if not function_name:
raise ValueError("function_name must be a non-empty string")
return BackgroundUdfBuilder(
self._session,
self._dataset,
package_name,
function_name,
)
[docs]
class BackgroundUdfBuilder:
"""Arguments, optional filter, and execution for ``query_execute_udf``."""
__slots__ = (
"_session",
"_dataset",
"_package_name",
"_function_name",
"_args",
"_filter_expression",
"_records_per_second",
)
[docs]
def __init__(
self,
session: Session,
dataset: DataSet,
package_name: str,
function_name: str,
) -> None:
self._session = session
self._dataset = dataset
self._package_name = package_name
self._function_name = function_name
self._args: Optional[List[Any]] = None
self._filter_expression: Optional[FilterExpression] = None
self._records_per_second: Optional[int] = None
[docs]
def passing(self, *args: Any) -> BackgroundUdfBuilder:
"""Set Lua arguments after the implicit record parameter.
Returns:
This builder for chaining.
Example::
builder.passing("arg1", 42)
"""
self._args = list(args)
return self
@overload
def where(self, expression: str) -> BackgroundUdfBuilder: ...
@overload
def where(self, expression: FilterExpression) -> BackgroundUdfBuilder: ...
[docs]
def where(
self,
expression: Union[str, FilterExpression],
) -> BackgroundUdfBuilder:
"""Optional predicate limiting which records invoke the UDF."""
if isinstance(expression, str):
self._filter_expression = parse_ael(expression)
else:
self._filter_expression = expression
return self
[docs]
def records_per_second(self, rps: int) -> BackgroundUdfBuilder:
"""Throttle hint stored for API parity (may not affect PAC)."""
self._records_per_second = rps
return self
[docs]
def fail_on_filtered_out(self) -> BackgroundUdfBuilder:
"""Unsupported (raises ``TypeError``)."""
raise TypeError(_BG_UNSUPPORTED)
[docs]
def respond_all_keys(self) -> BackgroundUdfBuilder:
"""Unsupported (raises ``TypeError``)."""
raise TypeError(_BG_UNSUPPORTED)
def _pac_client(self) -> Client:
fc = self._session.client
if fc._client is None:
raise RuntimeError("Client is not connected")
return fc._client
[docs]
async def execute(self) -> ExecuteTask:
"""Start the background UDF job.
Raises:
RuntimeError: If the client is not connected.
AerospikeError: On PAC errors (converted).
Example::
task = await (
session.background_task()
.execute_udf(users)
.function("mypkg", "expire_old")
.passing(30)
.execute()
)
await task.wait_till_complete()
"""
log.debug(
"background UDF: %s.%s %s.%s",
self._dataset.namespace, self._dataset.set_name,
self._package_name, self._function_name,
)
wp = make_background_write_policy(
self._session.behavior,
self._filter_expression,
None,
None,
)
statement = dataset_statement(
self._dataset.namespace,
self._dataset.set_name,
)
client = self._pac_client()
py_args: Optional[List[Any]] = (
list(self._args) if self._args is not None else None
)
try:
return await client.query_execute_udf(
wp,
statement,
self._package_name,
self._function_name,
py_args,
)
except Exception as e:
raise _convert_pac_exception(e) from e