Source code for aerospike_sdk.aio.operations.udf

# Copyright 2025-2026 Aerospike, Inc.
#
# Portions may be licensed to Aerospike, Inc. under one or more contributor
# license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Foreground UDF execution builders (single-key, batch, and chained operations)."""

from __future__ import annotations

from typing import Any, List, Union, overload

from aerospike_async import FilterExpression, Key

from aerospike_sdk.aio.operations.query import QueryBuilder, WriteSegmentBuilder
from aerospike_sdk.ael.parser import parse_ael
from aerospike_sdk.error_strategy import OnError
from aerospike_sdk.record_stream import RecordStream


[docs] class UdfFunctionBuilder: """First step of foreground UDF chaining: choose package and Lua function name. Produced by :meth:`~aerospike_sdk.aio.session.Session.execute_udf` or :meth:`UdfBuilder.execute_udf`. Call :meth:`function` before :meth:`UdfBuilder.passing` or :meth:`execute`. Example:: stream = await ( session.execute_udf(key) .function("my_module", "my_func") .execute() ) """ __slots__ = ("_qb",)
[docs] def __init__(self, qb: QueryBuilder) -> None: self._qb = qb
[docs] def function(self, package: str, function_name: str) -> UdfBuilder: """Select the registered module and function to invoke. Args: package: Server-side module name (no ``.lua`` suffix). function_name: Lua function symbol exported by the module. Returns: :class:`UdfBuilder` for arguments and execution. Raises: ValueError: If ``package`` or ``function_name`` is empty. """ if not package: raise ValueError("package must be a non-empty string") if not function_name: raise ValueError("function_name must be a non-empty string") self._qb._udf_package = package self._qb._udf_function = function_name self._qb._udf_args = None self._qb._op_type = "udf" return UdfBuilder(self._qb)
[docs] class UdfBuilder: """Supply UDF arguments, optional filter, then execute or chain another operation. After :meth:`UdfFunctionBuilder.function`, call :meth:`passing` with values passed to Lua (after the implicit record argument). Use :meth:`execute_udf` to append another UDF segment, or :meth:`query` / write verbs to switch operation type. Await :meth:`execute` to run the accumulated chain. Example:: stream = await ( session.execute_udf(key) .function("my_pkg", "my_func") .passing(1, "x") .execute() ) See Also: :meth:`~aerospike_sdk.aio.session.Session.execute_udf`: Entry point. """ __slots__ = ("_qb",)
[docs] def __init__(self, qb: QueryBuilder) -> None: self._qb = qb
[docs] def passing(self, *args: Any) -> UdfBuilder: """Set positional arguments forwarded to the Lua function. The Aerospike server automatically passes the record as the first argument to the UDF; values provided here follow it. Args: *args: Values serialized by the async client (scalars, lists, maps, bytes). Returns: This builder for chaining. Example:: builder.passing("binName", 42) """ self._qb._udf_args = list(args) return self
@overload def where(self, expression: str) -> UdfBuilder: ... @overload def where(self, expression: FilterExpression) -> UdfBuilder: ...
[docs] def where( self, expression: Union[str, FilterExpression], ) -> UdfBuilder: """Apply a filter expression so the UDF runs only when the predicate matches. Args: expression: AEL string or ``FilterExpression``. Returns: This builder for chaining. See Also: :meth:`QueryBuilder.where`: Same AEL for reads. """ if isinstance(expression, str): self._qb._filter_expression = parse_ael(expression) else: self._qb._filter_expression = expression return self
[docs] def respond_all_keys(self) -> UdfBuilder: """For batch UDF, emit a row per requested key (including not-found). Returns: This builder for chaining. See Also: :meth:`QueryBuilder.respond_all_keys`: Same flag for reads. """ self._qb._respond_all_keys = True return self
[docs] def execute_udf(self, *keys: Key) -> UdfFunctionBuilder: """Finalize this UDF operation and start another on *keys*. Args: *keys: One or more keys for the next UDF segment. Returns: A new :class:`UdfFunctionBuilder` to call :meth:`function` again. Raises: ValueError: If no keys are provided. """ if not keys: raise ValueError("At least one key is required") self._qb._finalize_udf_spec() self._qb._set_current_keys_from_varargs(keys) return UdfFunctionBuilder(self._qb)
[docs] def query( self, arg1: Union[Key, List[Key]], *more_keys: Key, ) -> QueryBuilder: """Close the UDF operation and begin a read :class:`QueryBuilder` segment. Args: arg1: One key or a list of keys. *more_keys: Additional keys when ``arg1`` is a single key. Returns: :class:`QueryBuilder` for chaining. """ self._qb._finalize_udf_spec() self._qb._op_type = None self._qb._set_current_keys(arg1, *more_keys) return self._qb
[docs] def upsert( self, arg1: Union[Key, List[Key]], *more_keys: Key, ) -> WriteSegmentBuilder: """Finalize the UDF operation and start an upsert :class:`WriteSegmentBuilder`. Returns: :class:`WriteSegmentBuilder` for chaining. """ self._qb._finalize_udf_spec() return self._qb._start_write_verb("upsert", arg1, *more_keys)
[docs] def insert( self, arg1: Union[Key, List[Key]], *more_keys: Key, ) -> WriteSegmentBuilder: """Finalize the UDF operation and start an insert-only write segment. Returns: :class:`WriteSegmentBuilder` for chaining. """ self._qb._finalize_udf_spec() return self._qb._start_write_verb("insert", arg1, *more_keys)
[docs] def update( self, arg1: Union[Key, List[Key]], *more_keys: Key, ) -> WriteSegmentBuilder: """Finalize the UDF operation and start an update-only write segment. Returns: :class:`WriteSegmentBuilder` for chaining. """ self._qb._finalize_udf_spec() return self._qb._start_write_verb("update", arg1, *more_keys)
[docs] def replace( self, arg1: Union[Key, List[Key]], *more_keys: Key, ) -> WriteSegmentBuilder: """Finalize the UDF operation and start a replace write segment. Returns: :class:`WriteSegmentBuilder` for chaining. """ self._qb._finalize_udf_spec() return self._qb._start_write_verb("replace", arg1, *more_keys)
[docs] def replace_if_exists( self, arg1: Union[Key, List[Key]], *more_keys: Key, ) -> WriteSegmentBuilder: """Finalize the UDF operation and start a replace-if-exists segment. Returns: :class:`WriteSegmentBuilder` for chaining. """ self._qb._finalize_udf_spec() return self._qb._start_write_verb("replace_if_exists", arg1, *more_keys)
[docs] def delete( self, arg1: Union[Key, List[Key]], *more_keys: Key, ) -> WriteSegmentBuilder: """Finalize the UDF operation and start a delete segment. Returns: :class:`WriteSegmentBuilder` for chaining. """ self._qb._finalize_udf_spec() return self._qb._start_write_verb("delete", arg1, *more_keys)
[docs] def touch( self, arg1: Union[Key, List[Key]], *more_keys: Key, ) -> WriteSegmentBuilder: """Finalize the UDF operation and start a touch segment. Returns: :class:`WriteSegmentBuilder` for chaining. """ self._qb._finalize_udf_spec() return self._qb._start_write_verb("touch", arg1, *more_keys)
[docs] def exists( self, arg1: Union[Key, List[Key]], *more_keys: Key, ) -> WriteSegmentBuilder: """Finalize the UDF operation and start an exists-check segment. Returns: :class:`WriteSegmentBuilder` for chaining. """ self._qb._finalize_udf_spec() return self._qb._start_write_verb("exists", arg1, *more_keys)
[docs] async def execute(self, on_error: OnError | None = None) -> RecordStream: """Run the current builder state and return a :class:`~aerospike_sdk.record_stream.RecordStream`. Requires :meth:`function` to have been called for the pending UDF operation. Args: on_error: Same as :meth:`QueryBuilder.execute`. Returns: Stream of per-key results and optional ``udf_result`` fields. Example:: stream = await ( session.execute_udf(k1, k2) .function("pkg", "fn") .execute() ) Raises: ValueError: If no UDF function was selected before execute. """ if self._qb._udf_function is None: raise ValueError( "function(package, name) must be called before execute()", ) self._qb._finalize_udf_spec() return await self._qb.execute(on_error)