Source code for aerospike_sdk.aio.operations.cdt_write

# Copyright 2025-2026 Aerospike, Inc.
#
# Portions may be licensed to Aerospike, Inc. under one or more contributor
# license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

"""CDT action builders for write contexts.

Extends the read-only builders from ``cdt_read`` with:

- ``remove()`` / ``remove_all_others()`` terminals
- ``set_to(value)`` / ``add(value)`` write terminals (map key navigation)
- Collection-level map/list terminals (``map_clear``, ``map_upsert_items``, …)
- Nested navigation that preserves write capability through the chain
"""

from __future__ import annotations

from typing import Any, Callable, Optional, Sequence

from aerospike_async import (
    CTX,
    ListOperation,
    ListOrderType,
    ListPolicy,
    ListReturnType,
    ListSortFlags,
    ListWriteFlags,
    MapOperation,
    MapOrder,
    MapPolicy,
    MapReturnType,
    MapWriteFlags,
)

from aerospike_sdk.aio.operations.cdt_read import (
    CdtReadBuilder,
    CdtReadInvertableBuilder,
    T,
    _ReturnTypeCls,
    _map_item_pairs,
)

_DEFAULT_MAP_POLICY = MapPolicy(None, None)
_UNORDERED_LIST_POLICY = ListPolicy(None, None)
_ORDERED_LIST_POLICY = ListPolicy(ListOrderType.ORDERED, None)


def _resolve_list_policy(
    order: ListOrderType | None,
    *,
    unique: bool = False,
    bounded: bool = False,
    no_fail: bool = False,
    partial: bool = False,
) -> ListPolicy:
    """Build a ``ListPolicy`` from caller-supplied write-flag booleans.

    Flags are combined as a bitmask (same semantics as the server's list
    write flags). When none are set, returns a pre-built constant for the
    given *order* when possible (zero allocation).
    """
    flags = 0
    if unique:
        flags |= ListWriteFlags.ADD_UNIQUE
    if bounded:
        flags |= ListWriteFlags.INSERT_BOUNDED
    if no_fail:
        flags |= ListWriteFlags.NO_FAIL
    if partial:
        flags |= ListWriteFlags.PARTIAL
    if not flags:
        if order is None:
            return _UNORDERED_LIST_POLICY
        if order is ListOrderType.ORDERED:
            return _ORDERED_LIST_POLICY
        return ListPolicy(order, None)
    return ListPolicy(order, flags)


def _resolve_map_policy(
    base_flags: int,
    *,
    order: MapOrder | None = None,
    persist_index: bool = False,
    no_fail: bool = False,
    partial: bool = False,
) -> MapPolicy:
    """Build a ``MapPolicy`` from base flags plus caller-supplied options."""
    flags = base_flags
    if no_fail:
        flags |= MapWriteFlags.NO_FAIL
    if partial:
        flags |= MapWriteFlags.PARTIAL
    if persist_index:
        return MapPolicy.new_with_flags_and_persisted_index(order, flags)
    if flags:
        return MapPolicy.new_with_flags(order, flags)
    return MapPolicy(order, None)


class _RemoveMixin:
    """Shared remove logic for CDT write builders."""

    _remove_factory: Callable[[Any], Any]
    _parent: Any
    _rt: Any

    def _emit_remove(self, return_type: Any) -> Any:
        op = self._remove_factory(return_type)
        self._parent.add_operation(op)  # type: ignore[union-attr]
        return self._parent

    def remove(self, *, return_type: Any = None) -> Any:
        """Remove the selected CDT element(s).

        Example::

            await session.update(key).bin("m").on_map_key("x").remove().execute()

            stream = await (
                session.update(key)
                    .bin("scores")
                    .on_map_value_range(80, 100)
                    .remove(return_type=MapReturnType.VALUE)
                    .execute()
            )

        Args:
            return_type: What the server should return about the removed
                elements. Use :class:`~aerospike_async.MapReturnType` or
                :class:`~aerospike_async.ListReturnType` (for example
                ``MapReturnType.VALUE``, ``ListReturnType.COUNT``). When
                omitted, the server returns no removal metadata (``NONE``).

        Returns:
            The parent builder for chaining.

        See Also:
            :meth:`CdtWriteInvertableBuilder.remove_all_others`: Remove
            everything except the current selection (inverted).
        """
        rt = self._rt.NONE if return_type is None else return_type
        return self._emit_remove(rt)


[docs] class CdtWriteBuilder(_RemoveMixin, CdtReadBuilder[T]): """CDT action builder with read, remove, and write terminals. Inherits all read terminals and navigation from ``CdtReadBuilder``. Adds ``remove()``, and on map-key paths ``set_to()`` / ``add()``. Navigation returns ``CdtWriteBuilder`` to preserve write capability. Example: Set a nested map value and remove another key:: await ( session.upsert(key) .bin("settings").on_map_key("theme").set_to("dark") .bin("settings").on_map_key("old_key").remove() .execute() ) """ __slots__ = ("_remove_factory", "_set_to_factory", "_add_factory")
[docs] def __init__( self, parent: T, op_factory: Callable[[Any], Any], remove_factory: Callable[[Any], Any], return_type_cls: _ReturnTypeCls, *, is_map: bool, bin_name: str = "", ctx: Sequence[Any] = (), to_ctx: Callable[[], Any] | None = None, set_to_factory: Callable[[Any], Any] | None = None, add_factory: Callable[[Any], Any] | None = None, ) -> None: super().__init__( parent, op_factory, return_type_cls, is_map=is_map, bin_name=bin_name, ctx=ctx, to_ctx=to_ctx, ) self._remove_factory = remove_factory self._set_to_factory = set_to_factory self._add_factory = add_factory
def _build_navigated( self, *, op_factory: Callable[[Any], Any], rt_cls: _ReturnTypeCls, is_map: bool, ctx: Sequence[Any], to_ctx: Callable[[], Any], **extra: Any, ) -> CdtWriteBuilder[T]: return CdtWriteBuilder( self._parent, op_factory, extra.get("remove_factory", self._remove_factory), rt_cls, is_map=is_map, bin_name=self._bin_name, ctx=ctx, to_ctx=to_ctx, set_to_factory=extra.get("set_to_factory"), add_factory=extra.get("add_factory"), )
[docs] def on_map_key( self, key: Any, *, create_type: Optional[MapOrder] = None, ) -> CdtWriteBuilder[T]: """Navigate into a map element by key (supports set_to / add). Args: key: Map key to target. create_type: If set, use a create-on-missing context for this key with the given map key order. Returns: :class:`CdtWriteBuilder` for writing the targeted element. """ b, new_ctx, ctx_l = self._push_ctx() if create_type is not None: to_ctx = lambda: CTX.map_key_create(key, create_type) else: to_ctx = lambda: CTX.map_key(key) return self._build_navigated( op_factory=lambda rt: MapOperation.get_by_key(b, key, rt).set_context(ctx_l), remove_factory=lambda rt: MapOperation.remove_by_key(b, key, rt).set_context(ctx_l), set_to_factory=lambda v: MapOperation.put(b, key, v, _DEFAULT_MAP_POLICY).set_context(ctx_l), add_factory=lambda v: MapOperation.increment_value(b, key, v, _DEFAULT_MAP_POLICY).set_context(ctx_l), rt_cls=MapReturnType, is_map=True, ctx=new_ctx, to_ctx=to_ctx, )
[docs] def on_map_index(self, index: int) -> CdtWriteBuilder[T]: # type: ignore[override] """Navigate into a map element by index.""" return super().on_map_index(index) # type: ignore[return-value]
[docs] def on_map_rank(self, rank: int) -> CdtWriteBuilder[T]: # type: ignore[override] """Navigate into a map element by rank (0 = lowest value).""" return super().on_map_rank(rank) # type: ignore[return-value]
[docs] def on_list_index( self, index: int, *, order: Optional[ListOrderType] = None, pad: bool = False, ) -> CdtWriteBuilder[T]: # type: ignore[override] """Navigate into a list element by index.""" return super().on_list_index(index, order=order, pad=pad) # type: ignore[return-value]
[docs] def on_list_rank(self, rank: int) -> CdtWriteBuilder[T]: # type: ignore[override] """Navigate into a list element by rank (0 = lowest value).""" return super().on_list_rank(rank) # type: ignore[return-value]
def _build_invertable( self, op_factory: Callable[[Any], Any], rt_cls: _ReturnTypeCls, *, is_map: bool, ctx: Sequence[Any], to_ctx: Callable[[], Any] | None = None, remove_factory: Callable[[Any], Any] | None = None, ) -> CdtWriteInvertableBuilder[T]: assert remove_factory is not None return CdtWriteInvertableBuilder( self._parent, op_factory, remove_factory, rt_cls, is_map=is_map, bin_name=self._bin_name, ctx=ctx, to_ctx=to_ctx, )
[docs] def on_map_value(self, value: Any) -> CdtWriteInvertableBuilder[T]: """Navigate into map elements matching a value (may match multiple).""" b, new_ctx, ctx_l = self._push_ctx() return self._build_invertable( lambda rt: MapOperation.get_by_value(b, value, rt).set_context(ctx_l), MapReturnType, is_map=True, ctx=new_ctx, to_ctx=lambda: CTX.map_value(value), remove_factory=lambda rt: MapOperation.remove_by_value(b, value, rt).set_context(ctx_l), )
[docs] def on_map_index_range( self, index: int, count: Optional[int] = None, ) -> CdtWriteInvertableBuilder[T]: """Navigate into map elements by index range.""" b, new_ctx, ctx_l = self._push_ctx() if count is None: get_f = lambda rt: MapOperation.get_by_index_range_from( b, index, rt, ).set_context(ctx_l) rm_f = lambda rt: MapOperation.remove_by_index_range_from( b, index, rt, ).set_context(ctx_l) else: get_f = lambda rt: MapOperation.get_by_index_range( b, index, count, rt, ).set_context(ctx_l) rm_f = lambda rt: MapOperation.remove_by_index_range( b, index, count, rt, ).set_context(ctx_l) return self._build_invertable( get_f, MapReturnType, is_map=True, ctx=new_ctx, remove_factory=rm_f, )
[docs] def on_map_key_range( self, start: Any, end: Any, ) -> CdtWriteInvertableBuilder[T]: """Navigate into map elements by key range ``[start, end)``.""" b, new_ctx, ctx_l = self._push_ctx() return self._build_invertable( lambda rt: MapOperation.get_by_key_range( b, start, end, rt, ).set_context(ctx_l), MapReturnType, is_map=True, ctx=new_ctx, remove_factory=lambda rt: MapOperation.remove_by_key_range( b, start, end, rt, ).set_context(ctx_l), )
[docs] def on_map_rank_range( self, rank: int, count: Optional[int] = None, ) -> CdtWriteInvertableBuilder[T]: """Navigate into map elements by rank range.""" b, new_ctx, ctx_l = self._push_ctx() if count is None: get_f = lambda rt: MapOperation.get_by_rank_range_from( b, rank, rt, ).set_context(ctx_l) rm_f = lambda rt: MapOperation.remove_by_rank_range_from( b, rank, rt, ).set_context(ctx_l) else: get_f = lambda rt: MapOperation.get_by_rank_range( b, rank, count, rt, ).set_context(ctx_l) rm_f = lambda rt: MapOperation.remove_by_rank_range( b, rank, count, rt, ).set_context(ctx_l) return self._build_invertable( get_f, MapReturnType, is_map=True, ctx=new_ctx, remove_factory=rm_f, )
[docs] def on_map_value_range( self, start: Any, end: Any, ) -> CdtWriteInvertableBuilder[T]: """Navigate into map elements by value range ``[start, end)``.""" b, new_ctx, ctx_l = self._push_ctx() return self._build_invertable( lambda rt: MapOperation.get_by_value_range( b, start, end, rt, ).set_context(ctx_l), MapReturnType, is_map=True, ctx=new_ctx, remove_factory=lambda rt: MapOperation.remove_by_value_range( b, start, end, rt, ).set_context(ctx_l), )
[docs] def on_map_key_relative_index_range( self, key: Any, index: int, count: Optional[int] = None, ) -> CdtWriteInvertableBuilder[T]: """Navigate into map entries by index range relative to an anchor key.""" b, new_ctx, ctx_l = self._push_ctx() return self._build_invertable( lambda rt: MapOperation.get_by_key_relative_index_range( b, key, index, count, rt, ).set_context(ctx_l), MapReturnType, is_map=True, ctx=new_ctx, remove_factory=lambda rt: MapOperation.remove_by_key_relative_index_range( b, key, index, count, rt, ).set_context(ctx_l), )
[docs] def on_map_value_relative_rank_range( self, value: Any, rank: int, count: Optional[int] = None, ) -> CdtWriteInvertableBuilder[T]: """Navigate into map entries by value rank range relative to an anchor.""" b, new_ctx, ctx_l = self._push_ctx() return self._build_invertable( lambda rt: MapOperation.get_by_value_relative_rank_range( b, value, rank, count, rt, ).set_context(ctx_l), MapReturnType, is_map=True, ctx=new_ctx, remove_factory=lambda rt: MapOperation.remove_by_value_relative_rank_range( b, value, rank, count, rt, ).set_context(ctx_l), )
[docs] def on_map_key_list(self, keys: Sequence[Any]) -> CdtWriteInvertableBuilder[T]: """Navigate into map elements matching a list of keys.""" b, new_ctx, ctx_l = self._push_ctx() return self._build_invertable( lambda rt: MapOperation.get_by_key_list(b, keys, rt).set_context(ctx_l), MapReturnType, is_map=True, ctx=new_ctx, remove_factory=lambda rt: MapOperation.remove_by_key_list(b, keys, rt).set_context(ctx_l), )
[docs] def on_map_value_list( self, values: Sequence[Any], ) -> CdtWriteInvertableBuilder[T]: """Navigate into map elements matching a list of values.""" b, new_ctx, ctx_l = self._push_ctx() return self._build_invertable( lambda rt: MapOperation.get_by_value_list( b, values, rt, ).set_context(ctx_l), MapReturnType, is_map=True, ctx=new_ctx, remove_factory=lambda rt: MapOperation.remove_by_value_list( b, values, rt, ).set_context(ctx_l), )
[docs] def on_list_value(self, value: Any) -> CdtWriteInvertableBuilder[T]: """Navigate into list elements matching a value.""" b, new_ctx, ctx_l = self._push_ctx() return self._build_invertable( lambda rt: ListOperation.get_by_value(b, value, rt).set_context(ctx_l), ListReturnType, is_map=False, ctx=new_ctx, to_ctx=lambda: CTX.list_value(value), remove_factory=lambda rt: ListOperation.remove_by_value(b, value, rt).set_context(ctx_l), )
[docs] def on_list_index_range( self, index: int, count: Optional[int] = None, ) -> CdtWriteInvertableBuilder[T]: """Navigate into list elements by index range.""" b, new_ctx, ctx_l = self._push_ctx() return self._build_invertable( lambda rt: ListOperation.get_by_index_range( b, index, count, rt, ).set_context(ctx_l), ListReturnType, is_map=False, ctx=new_ctx, remove_factory=lambda rt: ListOperation.remove_by_index_range( b, index, count, rt, ).set_context(ctx_l), )
[docs] def on_list_rank_range( self, rank: int, count: Optional[int] = None, ) -> CdtWriteInvertableBuilder[T]: """Navigate into list elements by rank range.""" b, new_ctx, ctx_l = self._push_ctx() return self._build_invertable( lambda rt: ListOperation.get_by_rank_range( b, rank, count, rt, ).set_context(ctx_l), ListReturnType, is_map=False, ctx=new_ctx, remove_factory=lambda rt: ListOperation.remove_by_rank_range( b, rank, count, rt, ).set_context(ctx_l), )
[docs] def on_list_value_range( self, start: Any, end: Any, ) -> CdtWriteInvertableBuilder[T]: """Navigate into list elements by value range ``[start, end)``.""" b, new_ctx, ctx_l = self._push_ctx() return self._build_invertable( lambda rt: ListOperation.get_by_value_range( b, start, end, rt, ).set_context(ctx_l), ListReturnType, is_map=False, ctx=new_ctx, remove_factory=lambda rt: ListOperation.remove_by_value_range( b, start, end, rt, ).set_context(ctx_l), )
[docs] def on_list_value_relative_rank_range( self, value: Any, rank: int, count: Optional[int] = None, ) -> CdtWriteInvertableBuilder[T]: """Navigate into list elements by value rank range relative to anchor.""" b, new_ctx, ctx_l = self._push_ctx() return self._build_invertable( lambda rt: ListOperation.get_by_value_relative_rank_range( b, value, rank, count, rt, ).set_context(ctx_l), ListReturnType, is_map=False, ctx=new_ctx, remove_factory=lambda rt: ListOperation.remove_by_value_relative_rank_range( b, value, rank, count, rt, ).set_context(ctx_l), )
[docs] def on_list_value_list( self, values: Sequence[Any], ) -> CdtWriteInvertableBuilder[T]: """Navigate into list elements matching a list of values.""" b, new_ctx, ctx_l = self._push_ctx() return self._build_invertable( lambda rt: ListOperation.get_by_value_list( b, values, rt, ).set_context(ctx_l), ListReturnType, is_map=False, ctx=new_ctx, remove_factory=lambda rt: ListOperation.remove_by_value_list( b, values, rt, ).set_context(ctx_l), )
# -- Write terminals ------------------------------------------------------
[docs] def set_to(self, value: Any) -> T: """Set the value at the current CDT path. Requires prior ``on_map_key()`` navigation. Args: value: New value to store. Example:: .bin("m").on_map_key("color").set_to("blue") Raises: TypeError: If not preceded by ``on_map_key()`` navigation. """ if self._set_to_factory is None: raise TypeError( "set_to() requires on_map_key() navigation" ) op = self._set_to_factory(value) self._parent.add_operation(op) # type: ignore[union-attr] return self._parent
[docs] def add(self, value: Any) -> T: """Increment the value at the current CDT path. Args: value: Numeric value to add. Returns: The parent builder for chaining. Raises: TypeError: If not preceded by ``on_map_key()`` navigation. """ if self._add_factory is None: raise TypeError( "add() requires on_map_key() navigation" ) op = self._add_factory(value) self._parent.add_operation(op) # type: ignore[union-attr] return self._parent
# -- Collection-level map -------------------------------------------------
[docs] def map_clear(self) -> T: """Remove all entries from the map at the current CDT path. Returns: The parent builder for chaining. """ ctx = self._context_list_for_nested_ops() op = MapOperation.clear(self._bin_name).set_context(ctx) self._parent.add_operation(op) # type: ignore[union-attr] return self._parent
[docs] def map_upsert_items( self, items: Any, *, order: MapOrder | None = None, persist_index: bool = False, no_fail: bool = False, partial: bool = False, ) -> T: """Put multiple map entries (create or update each key). Args: items: Mapping or sequence of ``(key, value)`` pairs. order: Map key order for the policy. persist_index: Maintain a persistent index on the map. no_fail: Do not raise on write failures. partial: Allow partial success for bulk operations. Returns: The parent builder for chaining. """ ctx = self._context_list_for_nested_ops() pairs = _map_item_pairs(items) policy = _resolve_map_policy( MapWriteFlags.DEFAULT, order=order, persist_index=persist_index, no_fail=no_fail, partial=partial, ) op = MapOperation.put_items( self._bin_name, pairs, policy, ).set_context(ctx) self._parent.add_operation(op) # type: ignore[union-attr] return self._parent
[docs] def map_insert_items( self, items: Any, *, order: MapOrder | None = None, persist_index: bool = False, no_fail: bool = False, partial: bool = False, ) -> T: """Put map entries only for keys that do not yet exist. Args: items: Mapping or sequence of ``(key, value)`` pairs. order: Map key order for the policy. persist_index: Maintain a persistent index on the map. no_fail: Do not raise on write failures. partial: Allow partial success for bulk operations. Returns: The parent builder for chaining. """ ctx = self._context_list_for_nested_ops() pairs = _map_item_pairs(items) policy = _resolve_map_policy( MapWriteFlags.CREATE_ONLY, order=order, persist_index=persist_index, no_fail=no_fail, partial=partial, ) op = MapOperation.put_items( self._bin_name, pairs, policy, ).set_context(ctx) self._parent.add_operation(op) # type: ignore[union-attr] return self._parent
[docs] def map_update_items( self, items: Any, *, order: MapOrder | None = None, persist_index: bool = False, no_fail: bool = False, partial: bool = False, ) -> T: """Update existing map entries only (no new keys). Args: items: Mapping or sequence of ``(key, value)`` pairs. order: Map key order for the policy. persist_index: Maintain a persistent index on the map. no_fail: Do not raise on write failures. partial: Allow partial success for bulk operations. Returns: The parent builder for chaining. """ ctx = self._context_list_for_nested_ops() pairs = _map_item_pairs(items) policy = _resolve_map_policy( MapWriteFlags.UPDATE_ONLY, order=order, persist_index=persist_index, no_fail=no_fail, partial=partial, ) op = MapOperation.put_items( self._bin_name, pairs, policy, ).set_context(ctx) self._parent.add_operation(op) # type: ignore[union-attr] return self._parent
[docs] def map_create(self, order: MapOrder) -> T: """Create an empty map with the given key order. Args: order: Key sort order for the map. Returns: The parent builder for chaining. """ ctx = self._context_list_for_nested_ops() op = MapOperation.create(self._bin_name, order).set_context(ctx) self._parent.add_operation(op) # type: ignore[union-attr] return self._parent
[docs] def map_set_policy(self, order: MapOrder) -> T: """Set map sort order policy without changing entries. Args: order: Key sort order to apply. Returns: The parent builder for chaining. """ ctx = self._context_list_for_nested_ops() op = MapOperation.set_map_policy( self._bin_name, MapPolicy(order, None), ).set_context(ctx) self._parent.add_operation(op) # type: ignore[union-attr] return self._parent
# -- Collection-level list ------------------------------------------------
[docs] def list_clear(self) -> T: """Remove all elements from the list at the current CDT path. Returns: The parent builder for chaining. """ ctx = self._context_list_for_nested_ops() op = ListOperation.clear(self._bin_name).set_context(ctx) self._parent.add_operation(op) # type: ignore[union-attr] return self._parent
[docs] def list_sort(self, flags: ListSortFlags = ListSortFlags.DEFAULT) -> T: """Sort the list at the current path. Args: flags: Sort behaviour flags (default ``DEFAULT``). Returns: The parent builder for chaining. """ ctx = self._context_list_for_nested_ops() op = ListOperation.sort(self._bin_name, flags).set_context(ctx) self._parent.add_operation(op) # type: ignore[union-attr] return self._parent
[docs] def list_append_items( self, items: Sequence[Any], *, unique: bool = False, bounded: bool = False, no_fail: bool = False, partial: bool = False, ) -> T: """Append values to an unordered list. Args: items: Values to append. unique: Reject items that already exist in the list. bounded: Reject inserts beyond the current list bounds. no_fail: Do not raise on write failures. partial: Allow partial success for bulk operations. Returns: The parent builder for chaining. Example:: .bin("tags").on_map_key("items").list_append_items([10, 20]) """ ctx = self._context_list_for_nested_ops() policy = _resolve_list_policy( None, unique=unique, bounded=bounded, no_fail=no_fail, partial=partial, ) op = ListOperation.append_items( self._bin_name, items, policy, ).set_context(ctx) self._parent.add_operation(op) # type: ignore[union-attr] return self._parent
[docs] def list_add_items( self, items: Sequence[Any], *, unique: bool = False, bounded: bool = False, no_fail: bool = False, partial: bool = False, ) -> T: """Insert values into an ordered list (sorted positions). Args: items: Values to insert. unique: Reject items that already exist in the list. bounded: Reject inserts beyond the current list bounds. no_fail: Do not raise on write failures. partial: Allow partial success for bulk operations. Returns: The parent builder for chaining. """ ctx = self._context_list_for_nested_ops() policy = _resolve_list_policy( ListOrderType.ORDERED, unique=unique, bounded=bounded, no_fail=no_fail, partial=partial, ) op = ListOperation.append_items( self._bin_name, items, policy, ).set_context(ctx) self._parent.add_operation(op) # type: ignore[union-attr] return self._parent
[docs] def list_create( self, order: ListOrderType, *, pad: bool = False, persist_index: bool = False, ) -> T: """Create an empty list with the given order. Args: order: Element ordering for the list. pad: If ``True``, allow sparse indexes. persist_index: If ``True``, maintain a persistent index. Returns: The parent builder for chaining. """ ctx = self._context_list_for_nested_ops() op = ListOperation.create( self._bin_name, order, pad, persist_index, ).set_context(ctx) self._parent.add_operation(op) # type: ignore[union-attr] return self._parent
[docs] def list_set_order(self, order: ListOrderType) -> T: """Set list sort order without changing elements. Args: order: Sort order to apply. Returns: The parent builder for chaining. """ ctx = self._context_list_for_nested_ops() op = ListOperation.set_order(self._bin_name, order).set_context(ctx) self._parent.add_operation(op) # type: ignore[union-attr] return self._parent
# -- Index-based list (nested CDT) ---------------------------------------
[docs] def list_insert( self, index: int, value: Any, *, unique: bool = False, bounded: bool = False, no_fail: bool = False, ) -> T: """Insert *value* at *index* in the list at the current CDT path. Args: index: List index (0-based; negative counts from the end). value: Element to insert. unique: Reject if the value already exists in the list. bounded: Reject if index is beyond the current list bounds. no_fail: Do not raise on write failures. """ ctx = self._context_list_for_nested_ops() policy = _resolve_list_policy( None, unique=unique, bounded=bounded, no_fail=no_fail, ) op = ListOperation.insert( self._bin_name, index, value, policy, ).set_context(ctx) self._parent.add_operation(op) # type: ignore[union-attr] return self._parent
[docs] def list_insert_items( self, index: int, items: Sequence[Any], *, unique: bool = False, bounded: bool = False, no_fail: bool = False, partial: bool = False, ) -> T: """Insert *items* starting at *index* in the list at the current path. Args: index: List index at which to insert the first element. items: Values to insert in order. unique: Reject items that already exist in the list. bounded: Reject inserts beyond the current list bounds. no_fail: Do not raise on write failures. partial: Allow partial success for bulk operations. """ ctx = self._context_list_for_nested_ops() policy = _resolve_list_policy( None, unique=unique, bounded=bounded, no_fail=no_fail, partial=partial, ) op = ListOperation.insert_items( self._bin_name, index, items, policy, ).set_context(ctx) self._parent.add_operation(op) # type: ignore[union-attr] return self._parent
[docs] def list_set(self, index: int, value: Any) -> T: """Replace the list element at *index* with *value*.""" ctx = self._context_list_for_nested_ops() op = ListOperation.set(self._bin_name, index, value).set_context(ctx) self._parent.add_operation(op) # type: ignore[union-attr] return self._parent
[docs] def list_increment(self, index: int, value: int = 1) -> T: """Add *value* to the numeric element at *index* (default ``1``).""" ctx = self._context_list_for_nested_ops() if value == 1: op = ListOperation.increment_by_one(self._bin_name, index).set_context( ctx, ) else: op = ListOperation.increment( self._bin_name, index, value, _UNORDERED_LIST_POLICY, ).set_context(ctx) self._parent.add_operation(op) # type: ignore[union-attr] return self._parent
[docs] def list_remove(self, index: int) -> T: """Remove the element at *index* from the list at the current path.""" ctx = self._context_list_for_nested_ops() op = ListOperation.remove(self._bin_name, index).set_context(ctx) self._parent.add_operation(op) # type: ignore[union-attr] return self._parent
[docs] def list_remove_range( self, index: int, count: Optional[int] = None, ) -> T: """Remove *count* elements from *index*, or through the end if *count* is ``None``.""" ctx = self._context_list_for_nested_ops() if count is None: op = ListOperation.remove_range_from(self._bin_name, index).set_context( ctx, ) else: op = ListOperation.remove_range( self._bin_name, index, count, ).set_context(ctx) self._parent.add_operation(op) # type: ignore[union-attr] return self._parent
[docs] def list_pop(self, index: int) -> T: """Pop the element at *index* (returned in the operate result).""" ctx = self._context_list_for_nested_ops() op = ListOperation.pop(self._bin_name, index).set_context(ctx) self._parent.add_operation(op) # type: ignore[union-attr] return self._parent
[docs] def list_pop_range( self, index: int, count: Optional[int] = None, ) -> T: """Pop *count* elements from *index*, or through the end if *count* is ``None``.""" ctx = self._context_list_for_nested_ops() if count is None: op = ListOperation.pop_range_from(self._bin_name, index).set_context( ctx, ) else: op = ListOperation.pop_range( self._bin_name, index, count, ).set_context(ctx) self._parent.add_operation(op) # type: ignore[union-attr] return self._parent
[docs] def list_trim(self, index: int, count: int) -> T: """Keep *count* elements starting at *index*; remove the rest.""" ctx = self._context_list_for_nested_ops() op = ListOperation.trim(self._bin_name, index, count).set_context(ctx) self._parent.add_operation(op) # type: ignore[union-attr] return self._parent
[docs] class CdtWriteInvertableBuilder(CdtWriteBuilder[T], CdtReadInvertableBuilder[T]): """CDT action builder with read, inverted-read, remove, and nested write paths. Combines :class:`CdtWriteBuilder` navigation (including invertable selectors) with inverted read terminals from :class:`CdtReadInvertableBuilder`. Example: Remove everything except the selected range:: .bin("m").on_map_value_range(0, 100).remove_all_others() """ __slots__ = ()
[docs] def __init__( self, parent: T, op_factory: Callable[[Any], Any], remove_factory: Callable[[Any], Any], return_type_cls: _ReturnTypeCls, *, is_map: bool, bin_name: str = "", ctx: Sequence[Any] = (), to_ctx: Callable[[], Any] | None = None, ) -> None: CdtWriteBuilder.__init__( self, parent, op_factory, remove_factory, return_type_cls, is_map=is_map, bin_name=bin_name, ctx=ctx, to_ctx=to_ctx, set_to_factory=None, add_factory=None, )
[docs] def remove_all_others(self, *, return_type: Any = None) -> T: """Remove all CDT elements *except* the selected ones. The inverted selection flag is applied automatically; pass only the base return type (for example ``MapReturnType.COUNT``), not OR'd with ``INVERTED``. Example:: await session.update(key).bin("m").on_map_key_range("b", "d").remove_all_others().execute() stream = await ( session.update(key) .bin("m") .on_map_key_range("b", "d") .remove_all_others(return_type=MapReturnType.VALUE) .execute() ) Args: return_type: What the server should return about the removed elements. When omitted, the server returns no removal metadata (``NONE``). Returns: The parent builder for chaining. See Also: :meth:`remove`: Remove only the current selection. """ base = self._rt.NONE if return_type is None else return_type return self._emit_remove(base | self._rt.INVERTED)