# Copyright 2025-2026 Aerospike, Inc.
#
# Portions may be licensed to Aerospike, Inc. under one or more contributor
# license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Synchronous foreground UDF builders delegating to ``aio.operations.udf``."""
from __future__ import annotations
from typing import Any, List, Union, overload
from aerospike_async import FilterExpression, Key
from aerospike_sdk.aio.client import Client
from aerospike_sdk.aio.operations.udf import (
UdfBuilder as AsyncUdfBuilder,
UdfFunctionBuilder as AsyncUdfFunctionBuilder,
)
from aerospike_sdk.error_strategy import OnError
from aerospike_sdk.sync.client import _EventLoopManager
from aerospike_sdk.sync.operations.query import SyncQueryBuilder, SyncWriteSegmentBuilder
from aerospike_sdk.sync.record_stream import SyncRecordStream
[docs]
class SyncUdfFunctionBuilder:
"""First step after ``execute_udf``: select package and function name.
See Also:
:class:`~aerospike_sdk.aio.operations.udf.UdfFunctionBuilder`
Examples:
session.execute_udf(key).function("pkg", "fn")
"""
__slots__ = ("_inner", "_loop_manager", "_sdk_client")
[docs]
def __init__(
self,
inner: AsyncUdfFunctionBuilder,
loop_manager: _EventLoopManager,
sdk_client: Client,
) -> None:
self._inner = inner
self._loop_manager = loop_manager
self._sdk_client = sdk_client
[docs]
def function(self, package: str, function_name: str) -> SyncUdfBuilder:
"""Select the UDF package and Lua function."""
b = self._inner.function(package, function_name)
return SyncUdfBuilder(b, self._loop_manager, self._sdk_client)
[docs]
class SyncUdfBuilder:
"""Chain UDF arguments, optional filter, and execution (sync).
See Also:
:class:`~aerospike_sdk.aio.operations.udf.UdfBuilder`
Examples:
session.execute_udf(key).function("pkg", "fn").passing(1, 2).execute()
session.execute_udf(key).function("pkg", "fn").query(key).where("true").execute()
"""
__slots__ = ("_inner", "_loop_manager", "_sdk_client")
[docs]
def __init__(
self,
inner: AsyncUdfBuilder,
loop_manager: _EventLoopManager,
sdk_client: Client,
) -> None:
self._inner = inner
self._loop_manager = loop_manager
self._sdk_client = sdk_client
[docs]
def passing(self, *args: Any) -> SyncUdfBuilder:
"""Forward arguments to the server UDF (chainable)."""
self._inner.passing(*args)
return self
@overload
def where(self, expression: str) -> SyncUdfBuilder: ...
@overload
def where(self, expression: FilterExpression) -> SyncUdfBuilder: ...
[docs]
def where(
self,
expression: Union[str, FilterExpression],
) -> SyncUdfBuilder:
"""Restrict rows with an AEL string or :class:`~aerospike_async.FilterExpression`."""
self._inner.where(expression)
return self
[docs]
def respond_all_keys(self) -> SyncUdfBuilder:
"""Include results for missing keys in the stream.
Returns:
self for method chaining.
"""
self._inner.respond_all_keys()
return self
[docs]
def execute_udf(self, *keys: Key) -> SyncUdfFunctionBuilder:
"""Finalize this UDF spec and start another on *keys*."""
fb = self._inner.execute_udf(*keys)
return SyncUdfFunctionBuilder(fb, self._loop_manager, self._sdk_client)
[docs]
def query(
self,
arg1: Union[Key, List[Key]],
*more_keys: Key,
) -> SyncQueryBuilder:
qb = self._inner.query(arg1, *more_keys)
return SyncQueryBuilder(
async_client=self._sdk_client,
namespace=qb._namespace,
set_name=qb._set_name,
loop_manager=self._loop_manager,
query_builder=qb,
)
[docs]
def upsert(
self, arg1: Union[Key, List[Key]], *more_keys: Key,
) -> SyncWriteSegmentBuilder:
"""Finalize the UDF spec and start an upsert write segment."""
wsb = self._inner.upsert(arg1, *more_keys)
return SyncWriteSegmentBuilder(wsb, self._loop_manager)
[docs]
def insert(
self, arg1: Union[Key, List[Key]], *more_keys: Key,
) -> SyncWriteSegmentBuilder:
"""Finalize the UDF spec and start an insert-only write segment."""
wsb = self._inner.insert(arg1, *more_keys)
return SyncWriteSegmentBuilder(wsb, self._loop_manager)
[docs]
def update(
self, arg1: Union[Key, List[Key]], *more_keys: Key,
) -> SyncWriteSegmentBuilder:
wsb = self._inner.update(arg1, *more_keys)
return SyncWriteSegmentBuilder(wsb, self._loop_manager)
[docs]
def replace(
self, arg1: Union[Key, List[Key]], *more_keys: Key,
) -> SyncWriteSegmentBuilder:
"""Finalize the UDF spec and start a replace write segment."""
wsb = self._inner.replace(arg1, *more_keys)
return SyncWriteSegmentBuilder(wsb, self._loop_manager)
[docs]
def replace_if_exists(
self, arg1: Union[Key, List[Key]], *more_keys: Key,
) -> SyncWriteSegmentBuilder:
wsb = self._inner.replace_if_exists(arg1, *more_keys)
return SyncWriteSegmentBuilder(wsb, self._loop_manager)
[docs]
def delete(
self, arg1: Union[Key, List[Key]], *more_keys: Key,
) -> SyncWriteSegmentBuilder:
"""Finalize the UDF spec and start a delete segment."""
wsb = self._inner.delete(arg1, *more_keys)
return SyncWriteSegmentBuilder(wsb, self._loop_manager)
[docs]
def touch(
self, arg1: Union[Key, List[Key]], *more_keys: Key,
) -> SyncWriteSegmentBuilder:
"""Finalize the UDF spec and start a touch segment."""
wsb = self._inner.touch(arg1, *more_keys)
return SyncWriteSegmentBuilder(wsb, self._loop_manager)
[docs]
def exists(
self, arg1: Union[Key, List[Key]], *more_keys: Key,
) -> SyncWriteSegmentBuilder:
"""Finalize the UDF spec and start an exists-check segment."""
wsb = self._inner.exists(arg1, *more_keys)
return SyncWriteSegmentBuilder(wsb, self._loop_manager)
[docs]
def execute(self, on_error: OnError | None = None) -> SyncRecordStream:
"""Run the UDF and return a :class:`~aerospike_sdk.sync.record_stream.SyncRecordStream`.
Args:
on_error: Same semantics as query/write
:meth:`~aerospike_sdk.sync.operations.query.SyncQueryBuilder.execute`.
See Also:
:meth:`~aerospike_sdk.aio.operations.udf.UdfBuilder.execute`
"""
inner = self._inner
async def _run():
return await inner.execute(on_error)
stream = self._loop_manager.run_async(_run())
return SyncRecordStream(stream, self._loop_manager)