# Copyright 2025-2026 Aerospike, Inc.
#
# Portions may be licensed to Aerospike, Inc. under one or more contributor
# license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Synchronous background dataset task builders (delegate to ``aio.background``)."""
from __future__ import annotations
from typing import Any, Union, overload
from aerospike_async import ExecuteTask, FilterExpression
from aerospike_sdk.aio.background import (
BackgroundOperationBuilder as AsyncBackgroundOperationBuilder,
BackgroundTaskSession as AsyncBackgroundTaskSession,
BackgroundUdfBuilder as AsyncBackgroundUdfBuilder,
BackgroundUdfFunctionBuilder as AsyncBackgroundUdfFunctionBuilder,
BackgroundWriteBinBuilder as AsyncBackgroundWriteBinBuilder,
)
from aerospike_sdk.dataset import DataSet
from aerospike_sdk.sync.client import _EventLoopManager
[docs]
class SyncBackgroundWriteBinBuilder:
"""Per-bin scalar write inside a background operation (sync).
See Also:
:class:`~aerospike_sdk.aio.background.BackgroundWriteBinBuilder`
"""
__slots__ = ("_inner", "_loop_manager")
[docs]
def __init__(
self,
inner: AsyncBackgroundWriteBinBuilder,
loop_manager: _EventLoopManager,
) -> None:
self._inner = inner
self._loop_manager = loop_manager
[docs]
def set_to(self, value: Any) -> SyncBackgroundOperationBuilder:
"""Set the bin to *value* (sync wrapper)."""
self._inner.set_to(value)
return SyncBackgroundOperationBuilder(
self._inner._parent, self._loop_manager)
[docs]
def add(self, value: Any) -> SyncBackgroundOperationBuilder:
"""Numeric increment (sync wrapper)."""
self._inner.add(value)
return SyncBackgroundOperationBuilder(
self._inner._parent, self._loop_manager)
[docs]
class SyncBackgroundOperationBuilder:
"""Configure a background update/delete/touch job (sync).
See Also:
:class:`~aerospike_sdk.aio.background.BackgroundOperationBuilder`
"""
__slots__ = ("_inner", "_loop_manager")
[docs]
def __init__(
self,
inner: AsyncBackgroundOperationBuilder,
loop_manager: _EventLoopManager,
) -> None:
self._inner = inner
self._loop_manager = loop_manager
@overload
def where(self, expression: str) -> SyncBackgroundOperationBuilder: ...
@overload
def where(self, expression: FilterExpression) -> SyncBackgroundOperationBuilder: ...
[docs]
def where(
self,
expression: Union[str, FilterExpression],
) -> SyncBackgroundOperationBuilder:
"""Restrict the scan with an AEL or filter predicate."""
self._inner.where(expression)
return self
[docs]
def bin(self, name: str) -> SyncBackgroundWriteBinBuilder:
return SyncBackgroundWriteBinBuilder(
self._inner.bin(name), self._loop_manager)
[docs]
def expire_record_after_seconds(self, seconds: int) -> SyncBackgroundOperationBuilder:
"""Set record TTL for the background job."""
self._inner.expire_record_after_seconds(seconds)
return self
[docs]
def records_per_second(self, rps: int) -> SyncBackgroundOperationBuilder:
"""Store a throttle hint."""
self._inner.records_per_second(rps)
return self
[docs]
def fail_on_filtered_out(self) -> SyncBackgroundOperationBuilder:
self._inner.fail_on_filtered_out()
return self
[docs]
def respond_all_keys(self) -> SyncBackgroundOperationBuilder:
"""Unsupported for background tasks."""
self._inner.respond_all_keys()
return self
[docs]
def execute(self) -> ExecuteTask:
"""Submit the job and return a task handle (blocks until accepted).
See Also:
:meth:`~aerospike_sdk.aio.background.BackgroundOperationBuilder.execute`
"""
inner = self._inner
async def _run() -> ExecuteTask:
return await inner.execute()
return self._loop_manager.run_async(_run())
[docs]
class SyncBackgroundUdfFunctionBuilder:
"""Select UDF package/function for a background dataset run (sync).
See Also:
:class:`~aerospike_sdk.aio.background.BackgroundUdfFunctionBuilder`
"""
__slots__ = ("_inner", "_loop_manager")
[docs]
def __init__(
self,
inner: AsyncBackgroundUdfFunctionBuilder,
loop_manager: _EventLoopManager,
) -> None:
self._inner = inner
self._loop_manager = loop_manager
[docs]
def function(
self,
package_name: str,
function_name: str,
) -> SyncBackgroundUdfBuilder:
"""Select the UDF package and Lua function."""
b = self._inner.function(package_name, function_name)
return SyncBackgroundUdfBuilder(b, self._loop_manager)
[docs]
class SyncBackgroundUdfBuilder:
"""Arguments, filters, and throttle for background UDF execution (sync).
See Also:
:class:`~aerospike_sdk.aio.background.BackgroundUdfBuilder`
"""
__slots__ = ("_inner", "_loop_manager")
[docs]
def __init__(
self,
inner: AsyncBackgroundUdfBuilder,
loop_manager: _EventLoopManager,
) -> None:
self._inner = inner
self._loop_manager = loop_manager
[docs]
def passing(self, *args: Any) -> SyncBackgroundUdfBuilder:
self._inner.passing(*args)
return self
@overload
def where(self, expression: str) -> SyncBackgroundUdfBuilder: ...
@overload
def where(self, expression: FilterExpression) -> SyncBackgroundUdfBuilder: ...
[docs]
def where(
self,
expression: Union[str, FilterExpression],
) -> SyncBackgroundUdfBuilder:
"""Optional predicate limiting which records invoke the UDF."""
self._inner.where(expression)
return self
[docs]
def records_per_second(self, rps: int) -> SyncBackgroundUdfBuilder:
self._inner.records_per_second(rps)
return self
[docs]
def fail_on_filtered_out(self) -> SyncBackgroundUdfBuilder:
"""Unsupported for background tasks."""
self._inner.fail_on_filtered_out()
return self
[docs]
def respond_all_keys(self) -> SyncBackgroundUdfBuilder:
"""Unsupported for background tasks."""
self._inner.respond_all_keys()
return self
[docs]
def execute(self) -> ExecuteTask:
"""Submit the background UDF job (blocks until accepted).
See Also:
:meth:`~aerospike_sdk.aio.background.BackgroundUdfBuilder.execute`
"""
inner = self._inner
async def _run() -> ExecuteTask:
return await inner.execute()
return self._loop_manager.run_async(_run())
[docs]
class SyncBackgroundTaskSession:
"""Sync entry for server-side dataset background operations.
Obtained from :meth:`~aerospike_sdk.sync.session.SyncSession.background_task`.
Each method returns a sync builder that mirrors
:class:`~aerospike_sdk.aio.background.BackgroundTaskSession`.
See Also:
:class:`~aerospike_sdk.aio.background.BackgroundTaskSession`
Examples:
session.background_task().update(dataset).bin("x").set_to(1).execute()
session.background_task().execute_udf(dataset).function("pkg", "fn").execute()
"""
__slots__ = ("_inner", "_loop_manager")
[docs]
def __init__(
self,
inner: AsyncBackgroundTaskSession,
loop_manager: _EventLoopManager,
) -> None:
self._inner = inner
self._loop_manager = loop_manager
[docs]
def update(self, dataset: DataSet) -> SyncBackgroundOperationBuilder:
b = self._inner.update(dataset)
return SyncBackgroundOperationBuilder(b, self._loop_manager)
[docs]
def delete(self, dataset: DataSet) -> SyncBackgroundOperationBuilder:
"""Start a background delete over *dataset*."""
b = self._inner.delete(dataset)
return SyncBackgroundOperationBuilder(b, self._loop_manager)
[docs]
def touch(self, dataset: DataSet) -> SyncBackgroundOperationBuilder:
"""Start a background touch (TTL refresh) over *dataset*."""
b = self._inner.touch(dataset)
return SyncBackgroundOperationBuilder(b, self._loop_manager)
[docs]
def execute_udf(self, dataset: DataSet) -> SyncBackgroundUdfFunctionBuilder:
"""Start a background UDF over *dataset*."""
b = self._inner.execute_udf(dataset)
return SyncBackgroundUdfFunctionBuilder(b, self._loop_manager)