Source code for aerospike_sdk.aio.operations.query

# Copyright 2025-2026 Aerospike, Inc.
#
# Portions may be licensed to Aerospike, Inc. under one or more contributor
# license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

"""Chainable builders for reads, writes, and chained multi-operation queries."""

from __future__ import annotations

import logging
from dataclasses import dataclass, field
from typing import (
    TYPE_CHECKING,
    Any,
    Generic,
    List,
    Optional,
    Sequence,
    TypeVar,
    Union,
    overload,
)

from aerospike_async import (
    BasePolicy,
    BatchDeleteOp,
    BatchDeletePolicy,
    BatchPolicy,
    BatchReadOp,
    BatchReadPolicy,
    BatchUDFPolicy,
    BatchWriteOp,
    BatchWritePolicy,
    BitOperation,
    BitPolicy,
    BitwiseResizeFlags,
    BitWriteFlags,
    CTX,
    Client,
    ExecuteTask,
    Expiration,
    ExpOperation,
    ExpReadFlags,
    ExpWriteFlags,
    Filter,
    FilterExpression,
    GenerationPolicy,
    HllOperation,
    Key,
    ListOperation,
    ListOrderType,
    ListReturnType,
    ListSortFlags,
    MapOperation,
    MapOrder,
    MapPolicy,
    MapReturnType,
    MapWriteFlags,
    Operation,
    PartitionFilter,
    QueryDuration,
    QueryPolicy,
    ReadPolicy,
    RecordExistsAction,
    Replica,
    Statement,
    Txn,
    WritePolicy,
)
from aerospike_async.exceptions import ResultCode

from aerospike_sdk.aio.operations.cdt_read import (
    CdtReadBuilder,
    CdtReadInvertableBuilder,
    _map_item_pairs,
)
from aerospike_sdk.aio.operations.cdt_write import (
    CdtWriteBuilder,
    CdtWriteInvertableBuilder,
    _UNORDERED_LIST_POLICY,
    _resolve_list_policy,
    _resolve_map_policy,
)

log = logging.getLogger("aerospike_sdk.query")

_bitwise_and = BitOperation.and_
_bitwise_not = BitOperation.not_
_bitwise_or = BitOperation.or_


def _bit_policy_or_default(policy: Optional[Any]) -> Any:
    if policy is None:
        return BitPolicy(BitWriteFlags.DEFAULT)
    return policy


def _resize_flags_or_default(resize_flags: Optional[Any]) -> Any:
    if resize_flags is None:
        return BitwiseResizeFlags.DEFAULT
    return resize_flags



_TTL_NEVER_EXPIRE = -1
_TTL_DONT_UPDATE = -2
_TTL_SERVER_DEFAULT = 0


def _to_expiration(ttl: int) -> Expiration:
    """Convert an integer TTL value to an ``Expiration`` object."""
    if ttl == _TTL_NEVER_EXPIRE:
        return Expiration.NEVER_EXPIRE
    if ttl == _TTL_DONT_UPDATE:
        return Expiration.DONT_UPDATE
    if ttl == _TTL_SERVER_DEFAULT:
        return Expiration.NAMESPACE_DEFAULT
    return Expiration.seconds(ttl)


from aerospike_sdk.policy.policy_mapper import (
    to_batch_policy,
    to_batch_read_policy,
    to_query_policy,
    to_read_policy,
    to_write_policy,
)

from aerospike_sdk.background_shared import (
    make_background_write_policy,
    reject_unsupported_background_write_ops,
)
from aerospike_sdk.ael.parser import parse_ael, parse_ael_with_index
from aerospike_sdk.error_strategy import (
    ErrorHandler,
    OnError,
    _ErrorDisposition,
    _resolve_disposition,
)
from aerospike_sdk.exceptions import (
    AerospikeError,
    _convert_pac_exception,
    _result_code_to_exception,
)
from aerospike_sdk.policy.behavior_settings import OpKind, OpShape
from aerospike_sdk.record_result import RecordResult, batch_records_to_results
from aerospike_sdk.record_stream import RecordStream

[docs] @dataclass(frozen=True) class QueryHint: """Hint for influencing secondary index selection and query scheduling. Provide ``index_name`` to force a specific named secondary index, or ``bin_name`` to redirect the filter to a different bin's index. These two are mutually exclusive. ``query_duration`` overrides the policy's ``expected_duration`` for this query only. Example:: hint = QueryHint( index_name="age_idx", query_duration=QueryDuration.SHORT, ) stream = await ( session.query(dataset) .filter(Filter.equal("age", 30)) .with_hint(hint) .execute() ) Args: index_name: Force the query to use the named secondary index. bin_name: Redirect the filter to use a different bin's index. query_duration: Override ``expected_duration`` on the query policy. Raises: ValueError: If both ``index_name`` and ``bin_name`` are provided. See Also: :meth:`QueryBuilder.with_hint` """ index_name: Optional[str] = None bin_name: Optional[str] = None query_duration: Optional[QueryDuration] = None def __post_init__(self) -> None: if self.index_name is not None and self.bin_name is not None: raise ValueError( "index_name and bin_name are mutually exclusive; " "provide one or neither, not both" )
@dataclass class _FilterRecord: """Internal: wraps a Filter with optional creation metadata for hint reconstruction.""" filter: Filter method: Optional[str] = None identifier: Optional[str] = None args: Optional[tuple] = None ctx: Optional[List[CTX]] = None def rebuild_for_hint(self, hint: QueryHint) -> Filter: """Reconstruct this filter with the hint's index_name or bin_name override.""" if self.method is None or self.args is None: raise ValueError( "Cannot apply index_name/bin_name hint to a pre-built Filter. " "Use Filter.*_by_index() directly or let the PFC generate the " "filter via parse_ael_with_index()." ) if hint.index_name is not None: factory = getattr(Filter, f"{self.method}_by_index") f = factory(hint.index_name, *self.args) elif hint.bin_name is not None: factory = getattr(Filter, self.method) f = factory(hint.bin_name, *self.args) else: return self.filter if self.ctx: f = f.context(self.ctx) return f if TYPE_CHECKING: from aerospike_sdk.ael.filter_gen import IndexContext from aerospike_sdk.index_monitor import IndexesMonitor from aerospike_sdk.policy.behavior import Behavior class _WriteVerbs: """Mixin exposing write verbs that open a :class:`WriteSegmentBuilder`. Implemented on :class:`QueryBuilder` (chain from a read query) and :class:`WriteBinBuilder` (chain from a bin-scoped write). Each method finalizes the prior segment when applicable and targets new key(s). """ def _start_write_verb( self, op_type: str, arg1: Union[Key, List[Key]], *more_keys: Key, ) -> WriteSegmentBuilder: raise NotImplementedError def upsert( self, arg1: Union[Key, List[Key]], *more_keys: Key, ) -> WriteSegmentBuilder: """Open a create-or-update segment for the given key(s). Example:: await session.upsert(key).put({"name": "Bob"}).execute() Returns: :class:`WriteSegmentBuilder` for ``put`` / ``bin`` / ``execute``. See Also: :meth:`~aerospike_sdk.aio.session.Session.upsert`: Session entry point. """ return self._start_write_verb("upsert", arg1, *more_keys) def insert( self, arg1: Union[Key, List[Key]], *more_keys: Key, ) -> WriteSegmentBuilder: """Open a create-only segment (fails if the record exists). Example:: await session.insert(key).put({"name": "Ada"}).execute() Returns: :class:`WriteSegmentBuilder` for further bins and :meth:`execute`. """ return self._start_write_verb("insert", arg1, *more_keys) def update( self, arg1: Union[Key, List[Key]], *more_keys: Key, ) -> WriteSegmentBuilder: """Open an update-only segment (fails if the record is missing). Example:: await session.update(key).bin("count").add(1).execute() Returns: :class:`WriteSegmentBuilder` for further bins and :meth:`execute`. """ return self._start_write_verb("update", arg1, *more_keys) def replace( self, arg1: Union[Key, List[Key]], *more_keys: Key, ) -> WriteSegmentBuilder: """Open a full replace segment (removes bins not written in this segment). Example:: await session.replace(key).put({"a": 1}).execute() Returns: :class:`WriteSegmentBuilder`. """ return self._start_write_verb("replace", arg1, *more_keys) def replace_if_exists( self, arg1: Union[Key, List[Key]], *more_keys: Key, ) -> WriteSegmentBuilder: """Open replace-if-exists semantics (like replace, but only if the record exists). Example:: await session.replace_if_exists(key).put({"a": 1}).execute() Returns: :class:`WriteSegmentBuilder`. """ return self._start_write_verb("replace_if_exists", arg1, *more_keys) def delete( self, arg1: Union[Key, List[Key]], *more_keys: Key, ) -> WriteSegmentBuilder: """Open a delete segment. Example:: await session.delete(key).execute() Returns: :class:`WriteSegmentBuilder` (often followed immediately by :meth:`execute`). """ return self._start_write_verb("delete", arg1, *more_keys) def touch( self, arg1: Union[Key, List[Key]], *more_keys: Key, ) -> WriteSegmentBuilder: """Open a touch segment (TTL refresh). Example:: await session.touch(key).execute() Returns: :class:`WriteSegmentBuilder`. """ return self._start_write_verb("touch", arg1, *more_keys) def exists( self, arg1: Union[Key, List[Key]], *more_keys: Key, ) -> WriteSegmentBuilder: """Open an exists-check segment. Example:: stream = await session.exists(key).execute() found = (await stream.first_or_raise()).as_bool() Returns: :class:`WriteSegmentBuilder`. """ return self._start_write_verb("exists", arg1, *more_keys) _OP_TYPE_TO_REA: dict[str, RecordExistsAction] = { "insert": RecordExistsAction.CREATE_ONLY, "update": RecordExistsAction.UPDATE_ONLY, "replace": RecordExistsAction.REPLACE, "replace_if_exists": RecordExistsAction.REPLACE_ONLY, } _FAST_WRITES_REQUIRING_KEY = frozenset({"update", "replace_if_exists"}) def _build_exp_write_flags( base: int, ignore_op_failure: bool, ignore_eval_failure: bool, delete_if_null: bool, ) -> int: """OR together ExpWriteFlags bitmask from boolean kwargs.""" flags = base if ignore_op_failure: flags |= ExpWriteFlags.POLICY_NO_FAIL if ignore_eval_failure: flags |= ExpWriteFlags.EVAL_NO_FAIL if delete_if_null: flags |= ExpWriteFlags.ALLOW_DELETE return flags @dataclass(slots=True) class _OperationSpec: """A single operation segment in a chained builder. Each spec captures the keys, accumulated bin operations, projected bins, optional filter expression, and the operation type for one segment in a chain. ``op_type`` is ``None`` for read/query segments. For write segments it is one of ``"upsert"``, ``"insert"``, ``"update"``, ``"replace"``, ``"replace_if_exists"``, ``"delete"``, ``"touch"``, or ``"exists"``. """ keys: List[Key] operations: List[Any] = field(default_factory=list) bins: Optional[List[str]] = None filter_expression: Optional[FilterExpression] = None op_type: Optional[str] = None generation: Optional[int] = None ttl_seconds: Optional[int] = None durable_delete: Optional[bool] = None udf_package: Optional[str] = None udf_function: Optional[str] = None udf_args: Optional[List[Any]] = None _T = TypeVar("_T")
[docs] class QueryBuilder(_WriteVerbs): """Chain reads, writes, UDF calls, filters, and policies before ``execute``. Start from :meth:`~aerospike_sdk.aio.session.Session.query` or :meth:`~aerospike_sdk.aio.client.Client.query`. Use :meth:`where` or :meth:`filter_expression` for server-side predicates, :meth:`bins` or :meth:`bin` for projections, and transition methods such as :meth:`upsert` for writes. Await :meth:`execute` for a :class:`~aerospike_sdk.record_stream.RecordStream`. Example: Set-wide read with filter and projection:: stream = await ( session.query(users) .where("$.status == 'active'") .bins(["user_id", "name"]) .execute() ) async for row in stream: if row.is_ok and row.record: print(row.record.bins) Point read on a key, then chain an upsert:: stream = await ( session.query(users.id("u1")) .bins(["name"]) .upsert(users.id("u1")) .put({"last_seen": 123}) .execute() ) See Also: :class:`WriteSegmentBuilder`: Bin writes after a write verb. :class:`QueryBinBuilder`: Per-bin read operations. """
[docs] def __init__( self, client: Client, namespace: str, set_name: str, behavior: Optional[Behavior] = None, indexes_monitor: Optional["IndexesMonitor"] = None, cached_read_policy: Optional[ReadPolicy] = None, cached_write_policy: Optional[WritePolicy] = None, txn: Optional[Txn] = None, ) -> None: """ Initialize a QueryBuilder. Args: client: The underlying async client. namespace: The namespace name. set_name: The set name. behavior: Optional Behavior for deriving policies. indexes_monitor: Optional monitor providing cached index metadata for transparent filter generation from AEL expressions. cached_read_policy: Pre-computed read policy from the session. cached_write_policy: Pre-computed write policy from the session. txn: Optional active :class:`~aerospike_async.Txn` captured from a transactional session at construction; every policy this builder hands to the PAC gets stamped with it. ``None`` means no transaction participation. Callers rarely pass this directly — transactional sessions thread it through automatically. """ self._client = client self._namespace = namespace self._set_name = set_name self._behavior = behavior self._indexes_monitor = indexes_monitor self._bins: Optional[List[str]] = None self._with_no_bins: bool = False self._filter_records: List[_FilterRecord] = [] self._filter_expression: Optional[FilterExpression] = None self._query_hint: Optional[QueryHint] = None self._where_ael: Optional[str] = None self._index_context: Optional["IndexContext"] = None self._policy: Optional[QueryPolicy] = None self._partition_filter: Optional[PartitionFilter] = None self._chunk_size: Optional[int] = None self._fail_on_filtered_out: bool = False self._respond_all_keys: bool = False self._operations: List[Any] = [] self._specs: List[_OperationSpec] = [] self._single_key: Optional[Key] = None self._keys: Optional[List[Key]] = None self._read_policy: Optional[ReadPolicy] = None self._op_type: Optional[str] = None self._generation: Optional[int] = None self._ttl_seconds: Optional[int] = None self._durable_delete: Optional[bool] = None self._default_filter_expression: Optional[FilterExpression] = None self._default_ttl_seconds: Optional[int] = None self._udf_package: Optional[str] = None self._udf_function: Optional[str] = None self._udf_args: Optional[List[Any]] = None # Reuse session-cached policies when available; fall back to # computing them lazily from the behavior on first use. # MRT participation: when set, every policy produced by this # builder is stamped via _apply_txn. The cached policies can't be # reused under MRT because they were pre-computed without a txn, so # we null them out to force re-derivation from behavior. self._txn: Optional[Txn] = txn if txn is None: self._base_read_policy: Optional[ReadPolicy] = cached_read_policy self._base_write_policy: Optional[WritePolicy] = cached_write_policy else: self._base_read_policy = None self._base_write_policy = None
def _apply_txn(self, policy: Any) -> Any: """Stamp this builder's captured txn on an outer policy in place. No-op when the builder was constructed outside a transactional session. Applied at every policy-construction site so the txn propagates uniformly without the caller touching the policy. Args: policy: A :class:`~aerospike_async.ReadPolicy`, :class:`~aerospike_async.WritePolicy`, :class:`~aerospike_async.QueryPolicy`, or :class:`~aerospike_async.BatchPolicy` (or ``None``). Returns: The same ``policy`` object, for fluent use. """ if self._txn is not None and policy is not None: policy.txn = self._txn return policy def _make_batch_policy( self, settings: Optional[Any], ) -> Optional[BatchPolicy]: """Build a BatchPolicy from settings and stamp the captured txn. Returns ``None`` when neither a settings bundle nor an active transaction is in play (the PAC tolerates a ``None`` batch policy in that case). Under MRT, always materializes a policy so the txn can ride along. Args: settings: Settings bundle from behavior (may be ``None``). Returns: A txn-stamped :class:`~aerospike_async.BatchPolicy`, or ``None`` when no policy is needed. """ bp = to_batch_policy(settings) if settings is not None else None if self._txn is not None and bp is None: bp = BatchPolicy() return self._apply_txn(bp) def _batch_policy_for( self, op_kind: "OpKind", op_shape: "OpShape", ) -> Optional[BatchPolicy]: """Shorthand: :meth:`_make_batch_policy` keyed off behavior settings.""" settings = ( self._behavior.get_settings(op_kind, op_shape) if self._behavior is not None else None ) return self._make_batch_policy(settings)
[docs] def with_txn(self, txn: Optional[Txn]) -> "QueryBuilder": """Opt this builder into (or out of) a specific transaction. Overrides any transaction captured at construction. Pass ``None`` to opt out of an ambient transaction (useful inside a :class:`~aerospike_sdk.aio.transactional_session.TransactionalSession` when a single operation must run outside the MRT). Args: txn: The :class:`~aerospike_async.Txn` to participate in, or ``None`` to run without a transaction. Returns: This builder for method chaining. Example: >>> async with session.begin_transaction() as tx: ... await tx.upsert(k1).bin("v").set_to(1).execute() ... # Run this one write outside the transaction: ... await tx.upsert(k2).with_txn(None).bin("v").set_to(2).execute() See Also: :meth:`aerospike_sdk.aio.session.Session.get_current_transaction` """ self._txn = txn # Cached policies were built without this override — drop them so # subsequent policy lookups re-derive from behavior with the right # txn stamped on. self._base_read_policy = None self._base_write_policy = None return self
[docs] def bins(self, bin_names: List[str]) -> QueryBuilder: """Restrict the read to a non-empty set of bin names. Mutually exclusive with :meth:`with_no_bins`. Args: bin_names: Non-empty list of bin names to return. Returns: This builder for method chaining. Raises: ValueError: If ``bin_names`` is empty or :meth:`with_no_bins` was already called. Example: Restrict a query or key read to specific bins:: stream = await session.query(users.id(1)).bins(["name", "email"]).execute() See Also: :meth:`with_no_bins`: Metadata-only reads without bin payloads. :meth:`bin`: Per-bin operations (CDT, expressions). """ if self._with_no_bins: raise ValueError("Cannot specify both 'with_no_bins' and provide a list of bin names") if not bin_names: raise ValueError("bin_names must not be empty; use with_no_bins() for metadata-only reads") self._bins = bin_names self._with_no_bins = False return self
[docs] def bin(self, bin_name: str) -> QueryBinBuilder[QueryBuilder]: """Start a bin-level read operation. Returns a :class:`QueryBinBuilder` for specifying how to read from the named bin (simple get, CDT navigation, or expression read). Args: bin_name: The bin to operate on. Returns: A QueryBinBuilder for method chaining. Example:: rs = await session.query(users.id(1)) \\ .bin("settings").on_map_key("theme").get_values() \\ .bin("age").get() \\ .execute() """ return QueryBinBuilder(self, bin_name)
[docs] def add_operation(self, op: Any) -> None: """Append a read operation produced by a bin or CDT builder.""" self._operations.append(op)
[docs] def with_write_operations( self, operations: Sequence[Any], ) -> QueryBuilder: """Attach scalar write operations for a background dataset task. Prefer :meth:`aerospike_sdk.aio.session.Session.background_task` for chained bin writes. Use with :meth:`execute_background_task` on a dataset query (no keys). Only ``Operation`` and ``ExpOperation.write``-style writes are valid; list, map, bit, and HLL operations are rejected before calling the client. Args: operations: Sequence of write operations (e.g. ``Operation.put``, ``Operation.touch``). Returns: self for method chaining. """ self._operations.extend(operations) return self
[docs] def with_no_bins(self) -> QueryBuilder: """ Specify that no bins should be read (header-only query). This method is useful when you only need to check for record existence or get metadata like generation numbers, without reading the actual data. This method cannot be used together with bins(). Returns: self for method chaining. Raises: ValueError: If used together with bins(). """ if self._bins is not None: raise ValueError("Cannot specify both 'with_no_bins' and provide a list of bin names") self._with_no_bins = True self._bins = [] return self
[docs] def filter(self, filter_obj: Filter) -> QueryBuilder: """Add a secondary index filter to the query. Args: filter_obj: The filter to add. Returns: This builder for method chaining. """ self._filter_records.append(_FilterRecord(filter=filter_obj)) return self
[docs] def filter_expression(self, expression: FilterExpression) -> QueryBuilder: """ Set a FilterExpression for server-side filtering. FilterExpression allows complex server-side filtering that doesn't require secondary indexes. This is more efficient than client-side filtering as it reduces network traffic and processing. Args: expression: The FilterExpression to apply. Returns: self for method chaining. Example:: # Filter by multiple conditions server-side filter_exp = FilterExpression.and_([ FilterExpression.eq( FilterExpression.string_bin("category"), FilterExpression.string_val("Shoes") ), FilterExpression.eq( FilterExpression.string_bin("usage"), FilterExpression.string_val("Sports") ) ]) recordset = await client.query("test", "products").filter_expression(filter_exp).execute() """ self._filter_expression = expression return self
@overload def where(self, expression: str) -> QueryBuilder: ... @overload def where(self, expression: FilterExpression) -> QueryBuilder: ...
[docs] def where( self, expression: Union[str, FilterExpression], ) -> QueryBuilder: """Apply a server-side filter for dataset queries or keyed reads that support it. String arguments are parsed with the AEL; prefer f-strings for dynamic literals. Pass a pre-built :class:`~aerospike_async.FilterExpression` when constructing filters programmatically. Args: expression: AEL string or ``FilterExpression``. Returns: This builder for chaining. Example: qb = session.query(ds).where("$.status == 'active'") qb = session.query(ds).where(f"$.score > {min_score}") See Also: :meth:`default_where`: Default filter for chained operations without their own. :meth:`filter_expression`: Attach an expression without AEL parsing. """ if isinstance(expression, str): self._where_ael = expression self._filter_expression = parse_ael(expression) else: self._where_ael = None self._filter_expression = expression return self
[docs] def with_index_context(self, index_context: "IndexContext") -> QueryBuilder: """Explicitly override the secondary index metadata used for filter generation. Most applications do **not** need this method. The client automatically discovers and caches secondary index metadata from the cluster in the background. Use this only when you need to force a specific index context that differs from the live cluster state. Args: index_context: Index metadata for the query's namespace. Returns: This builder for method chaining. See Also: :class:`~aerospike_sdk.ael.filter_gen.IndexContext` """ self._index_context = index_context return self
[docs] def with_policy(self, policy: QueryPolicy) -> QueryBuilder: """ Set the query policy. Args: policy: The query policy to use. Returns: self for method chaining. """ self._policy = policy return self
[docs] def with_read_policy(self, policy: ReadPolicy) -> QueryBuilder: """ Set the read policy (for single key or batch key queries). Args: policy: The read policy to use. Returns: self for method chaining. """ self._read_policy = policy return self
[docs] def partition(self, partition_filter: PartitionFilter) -> QueryBuilder: """Restrict a dataset query using a PAC :class:`~aerospike_async.PartitionFilter`. Prefer :meth:`on_partition` or :meth:`on_partition_range` for common cases. Args: partition_filter: Built filter (all partitions, by id, by range, etc.). Returns: This builder for chaining. See Also: :meth:`on_partition_range`: Inclusive start, exclusive end partition ids. """ self._partition_filter = partition_filter return self
[docs] def on_partitions(self, *partition_ids: int) -> QueryBuilder: """ Set partitions to query by partition IDs. Args: *partition_ids: One or more partition IDs to query. Returns: self for method chaining. Example:: query = session.query(dataset).on_partitions(1, 2, 3) """ if len(partition_ids) == 1: self._partition_filter = PartitionFilter.by_id(partition_ids[0]) else: # For multiple partitions, we need to use a range or multiple filters # Since PartitionFilter.by_id only takes one ID, we'll use by_range # for now. This is a limitation of the underlying client. min_id = min(partition_ids) max_id = max(partition_ids) self._partition_filter = PartitionFilter.by_range(min_id, max_id + 1) return self
[docs] def on_partition(self, part_id: int) -> QueryBuilder: """ Target a specific partition for the query. This method restricts the query to a single partition. This can be useful for load balancing or when you know the data distribution across partitions. Args: part_id: The partition ID to target (0-4095) Returns: self for method chaining Raises: ValueError: If part_id is out of range Example:: query = session.query(dataset).on_partition(5) """ return self.on_partition_range(part_id, part_id + 1)
[docs] def on_partition_range(self, start_incl: int, end_excl: int) -> QueryBuilder: """ Target a range of partitions for the query. This method restricts the query to a specific range of partitions. This can be useful for load balancing, parallel processing, or when you know the data distribution across partitions. The partition range can only be set once per query. Subsequent calls with different ranges will overwrite the previous range. Args: start_incl: Start partition (inclusive, 0-4095) end_excl: End partition (exclusive, 1-4096) Returns: self for method chaining Raises: ValueError: If partition range is invalid Example:: # Query partitions 0-2047 (first half) query = session.query(dataset).on_partition_range(0, 2048) # Query partitions 100-199 query = session.query(dataset).on_partition_range(100, 200) """ # Partition range validation if start_incl < 0 or start_incl >= 4096: raise ValueError(f"Start partition must be in range 0-4095, not {start_incl}") if end_excl < 1 or end_excl > 4096: raise ValueError(f"End partition must be in range 1-4096, not {end_excl}") if start_incl >= end_excl: raise ValueError( f"Start partition ({start_incl}) must be < end partition ({end_excl})" ) self._partition_filter = PartitionFilter.by_range(start_incl, end_excl) return self
[docs] def chunk_size(self, chunk_size: int) -> QueryBuilder: """Tune server-side streaming chunk size (maps to query policy ``max_records`` chunking). This method controls how many records are fetched per chunk from the server when using server-side streaming. The chunk size affects memory usage and network round trips (Larger values reduce round trips; smaller values bound memory per fetch). This is distinct from client-side pagination. Args: chunk_size: Records per chunk; must be positive. Returns: This builder for chaining. Raises: ValueError: If ``chunk_size <= 0``. Example:: query = session.query(dataset).chunk_size(100) See Also: :meth:`max_records`: Cap total records returned. """ if chunk_size <= 0: raise ValueError(f"Chunk size must be > 0, not {chunk_size}") self._chunk_size = chunk_size return self
[docs] def records_per_second(self, rps: int) -> QueryBuilder: """ Set the maximum records per second for the query. Args: rps: Maximum records per second to process. Returns: self for method chaining. Example:: query = session.query(dataset).records_per_second(1000) """ self._ensure_policy().records_per_second = rps return self
[docs] def max_records(self, max_records: int) -> QueryBuilder: """ Set the maximum number of records to return. Args: max_records: Maximum number of records to return. Returns: self for method chaining. Example:: query = session.query(dataset).max_records(10000) """ self._ensure_policy().max_records = max_records return self
[docs] def limit(self, limit: int) -> QueryBuilder: """ Set the maximum number of records to return (alias for max_records). This method is an alias for max_records(). It limits the total number of records returned by the query. Once the limit is reached, the query will stop processing. Args: limit: Maximum number of records to return (must be > 0). Returns: self for method chaining. Raises: ValueError: If limit is <= 0. Example:: query = session.query(dataset).limit(100) """ if limit <= 0: raise ValueError(f"Limit must be > 0, not {limit}") return self.max_records(limit)
[docs] def expected_duration(self, duration: "QueryDuration") -> QueryBuilder: """ Set the expected duration of the query. Args: duration: Expected duration (QueryDuration.LONG, QueryDuration.SHORT, or QueryDuration.LONG_RELAX_AP). Returns: self for method chaining. Example:: from aerospike_async import QueryDuration query = session.query(dataset).expected_duration(QueryDuration.SHORT) """ self._ensure_policy().expected_duration = duration return self
[docs] def with_hint(self, hint: QueryHint) -> QueryBuilder: """Attach a query hint for secondary index selection or scheduling. A hint can redirect which secondary index is used (``index_name``), remap the filter to a different bin (``bin_name``), or override the expected query duration (``query_duration``). Only one call to ``with_hint`` is allowed per builder. Example:: stream = await ( session.query(dataset) .filter(Filter.equal("age", 30)) .with_hint(QueryHint(index_name="age_idx")) .execute() ) Args: hint: A :class:`QueryHint` instance. Returns: This builder for method chaining. Raises: ValueError: If ``with_hint`` has already been called on this builder. See Also: :class:`QueryHint` """ if self._query_hint is not None: raise ValueError("with_hint() can only be called once per query builder") self._query_hint = hint return self
[docs] def replica(self, replica: "Replica") -> QueryBuilder: """ Set the replica preference for the query. Args: replica: Replica preference. One of ``Replica.MASTER``, ``Replica.MASTER_PROLES``, ``Replica.RANDOM``, ``Replica.SEQUENCE``, or ``Replica.PREFER_RACK``. Returns: self for method chaining. Example:: from aerospike_async import Replica query = session.query(dataset).replica(Replica.SEQUENCE) """ self._ensure_policy().replica = replica return self
[docs] def base_policy(self, base_policy: "BasePolicy") -> QueryBuilder: """ Set the base policy for the query. Args: base_policy: The base policy to use. Returns: self for method chaining. Example:: from aerospike_async import BasePolicy base = BasePolicy() query = session.query(dataset).base_policy(base) """ self._ensure_policy().base_policy = base_policy return self
[docs] def fail_on_filtered_out(self) -> QueryBuilder: """Surface rows that fail a filter as ``FILTERED_OUT`` instead of omitting them. Applies to key-based reads where a filter excludes the record. Without this flag, filtered keys may be absent from the stream depending on policy. Returns: This builder for chaining. See Also: :meth:`respond_all_keys`: Include missing-key rows in batch reads. """ self._fail_on_filtered_out = True return self
[docs] def respond_all_keys(self) -> QueryBuilder: """Ensure batch/point reads emit one row per requested key, including not-found. Missing keys appear as non-OK :class:`~aerospike_sdk.record_result.RecordResult` entries (typically ``KEY_NOT_FOUND``) instead of being skipped. Returns: This builder for chaining. See Also: :meth:`fail_on_filtered_out`: Filter mismatch vs missing key. """ self._respond_all_keys = True return self
# -- Chain-level defaults ------------------------------------------------- @overload def default_where(self, expression: str) -> QueryBuilder: ... @overload def default_where(self, expression: FilterExpression) -> QueryBuilder: ...
[docs] def default_where( self, expression: Union[str, FilterExpression], ) -> QueryBuilder: """Set a filter applied to any chained operation that does not call :meth:`where`. When a chain contains multiple operations (reads, writes, UDFs), each operation inherits this filter unless it supplies its own :meth:`where`. Example:: stream = await ( session.upsert(k1) .bin("status").set_to("active") .where(f"$.age >= {min_age}") .delete(k2, k3) .upsert(k4) .bin("flag").set_to(True) .default_where("$.active == true") .execute() ) # upsert(k1) keeps its own where(); the delete and # second upsert inherit default_where. Args: expression: AEL string or ``FilterExpression``. Returns: This builder for chaining. See Also: :meth:`where`: Per-operation filter on the current operation. """ if isinstance(expression, str): self._default_filter_expression = parse_ael(expression) else: self._default_filter_expression = expression return self
[docs] def default_expire_record_after_seconds(self, seconds: int) -> QueryBuilder: """Set a default TTL applied to chained operations that lack their own. Args: seconds: Time-to-live in seconds (must be > 0). Returns: self for method chaining. Raises: ValueError: If seconds is <= 0. """ if seconds <= 0: raise ValueError("seconds must be greater than 0") self._default_ttl_seconds = seconds return self
[docs] def default_never_expire(self) -> QueryBuilder: """Set the default TTL to never expire (TTL = -1).""" self._default_ttl_seconds = _TTL_NEVER_EXPIRE return self
[docs] def default_with_no_change_in_expiration(self) -> QueryBuilder: """Set the default to preserve each record's existing TTL (TTL = -2).""" self._default_ttl_seconds = _TTL_DONT_UPDATE return self
[docs] def default_expiry_from_server_default(self) -> QueryBuilder: """Set the default TTL to the namespace's server default (TTL = 0).""" self._default_ttl_seconds = _TTL_SERVER_DEFAULT return self
# -- Query stacking -------------------------------------------------------
[docs] def query( self, arg1: Union[Key, List[Key]], *more_keys: Key, ) -> QueryBuilder: """Chain another query with new key(s) for batch/point stacking. Finalizes the current query segment and begins a new one with the given key(s). Each segment can have its own bins, operations, and filter expression. Dataset (index) queries cannot be stacked. Args: arg1: A single :class:`Key` or a ``List[Key]``. *more_keys: Additional keys (varargs). Returns: ``self`` for method chaining. Raises: ValueError: If the current query is a dataset query (no keys). Example:: rs = await session.query(users.ids(1, 2, 3)) \\ .bin("map").get() \\ .query(users.ids(4, 5, 6)) \\ .bin("name").get() \\ .execute() """ if (self._single_key is None and self._keys is None and not self._specs): raise ValueError( "Dataset (index) queries cannot be stacked. " "Query stacking is only supported for key-based queries." ) self._finalize_current_spec() self._op_type = None self._set_current_keys(arg1, *more_keys) return self
# -- Write transitions (QueryBuilder -> WriteSegmentBuilder) --------------- def _start_write_segment( self, op_type: str, arg1: Union[Key, List[Key]], *more_keys: Key, ) -> WriteSegmentBuilder: """Finalize current spec, set up a write segment, return builder.""" self._finalize_current_spec() self._op_type = op_type self._set_current_keys(arg1, *more_keys) return WriteSegmentBuilder(self) def _start_write_verb( self, op_type: str, arg1: Union[Key, List[Key]], *more_keys: Key, ) -> WriteSegmentBuilder: return self._start_write_segment(op_type, arg1, *more_keys) def _ensure_policy(self) -> QueryPolicy: """Return the existing policy or create a default one.""" if self._policy is None: self._policy = self._apply_txn(QueryPolicy()) return self._policy def _build_statement(self) -> Statement: """Build a Statement object from the builder configuration.""" bins = self._bins statement = Statement(self._namespace, self._set_name, bins) if self._filter_records: hint = self._query_hint needs_rebuild = ( hint is not None and (hint.index_name is not None or hint.bin_name is not None) ) filters = [] for rec in self._filter_records: if needs_rebuild and hint is not None and rec.method is not None: filters.append(rec.rebuild_for_hint(hint)) else: filters.append(rec.filter) statement.filters = filters return statement
[docs] async def execute( self, on_error: OnError | None = None, ) -> RecordStream: """Execute the query and return a :class:`RecordStream`. Handles single-key, batch-key, and dataset queries. When a chain contains multiple operations, each operation is executed and results are combined into a single stream. Args: on_error: Controls how per-record errors are surfaced. - ``None`` (default): single-key operations raise on error; batch / multi-key operations embed errors in the stream. - ``ErrorStrategy.IN_STREAM``: always embed errors in the stream as ``RecordResult`` entries. - A callable ``(key, index, exception) -> None``: errors are dispatched to the callback and excluded from the stream. Returns: A :class:`~aerospike_sdk.record_stream.RecordStream` of :class:`~aerospike_sdk.record_result.RecordResult` rows. Raises: AerospikeError: If the builder mixes dataset query with per-bin read operations (unsupported combination). AerospikeError: Typed subclasses for timeouts, connection failures, etc. when the client raises instead of embedding errors. Example: Single-key read with default error handling:: stream = await session.query(key).bins(["x"]).execute() row = await stream.first_or_raise() Multi-key read, keep errors in the stream:: stream = await ( session.query(k1, k2, k3) .execute(on_error=ErrorStrategy.IN_STREAM) ) rows = await stream.collect() See Also: :class:`~aerospike_sdk.error_strategy.ErrorStrategy`: ``on_error`` options. """ self._finalize_current_spec() if self._specs: # Fast path for the common single-spec case: skip the # sum/log overhead that only benefits multi-spec debugging. if len(self._specs) == 1: spec0 = self._specs[0] is_single = len(spec0.keys) == 1 # Ultra-fast path: single-key operations with no spec-level # overrides bypass the full _execute_spec → policy-build → # RecordStream chain and call the PAC directly. if ( is_single and on_error is None and spec0.filter_expression is None and spec0.generation is None and spec0.ttl_seconds is None and not spec0.durable_delete ): result = await self._execute_single_key_direct(spec0) if result is not None: return result if __debug__ and log.isEnabledFor(logging.DEBUG): log.debug( "execute: %s.%s specs=1 keys=%d", self._namespace, self._set_name, len(spec0.keys), ) disp = _resolve_disposition(on_error, is_single) handler = on_error if callable(on_error) else None return await self._execute_spec(spec0, disp, handler) total_keys = sum(len(s.keys) for s in self._specs) log.debug( "execute: %s.%s specs=%d keys=%d", self._namespace, self._set_name, len(self._specs), total_keys, ) is_single = False disp = _resolve_disposition(on_error, is_single) handler = on_error if callable(on_error) else None if self._specs_require_sequential_run(): sub_disp = _resolve_disposition(on_error, is_single_key=False) streams: List[RecordStream] = [] for spec in self._specs: streams.append( await self._execute_spec(spec, sub_disp, handler)) return RecordStream.chain(streams) batch_policy = self._batch_policy_for( OpKind.WRITE_NON_RETRYABLE, OpShape.BATCH) all_ops: list = [] all_keys: List[Key] = [] for spec in self._specs: all_keys.extend(spec.keys) all_ops.extend(self._spec_to_batch_ops(spec)) try: batch_records = await self._client.batch(batch_policy, all_ops) except Exception as e: return self._handle_batch_error(all_keys, e, disp, handler) return self._filtered_batch_stream(batch_records, disp, handler) # Dataset query path (no keys were specified) if self._operations: raise AerospikeError( "Bin-level read operations are not supported on dataset/index " "queries (requires Advanced Bin Projection, not yet available)", result_code=ResultCode.OP_NOT_APPLICABLE, ) return await self._execute_dataset_query()
@staticmethod def _reject_unsupported_background_write_ops( operations: Sequence[Any], ) -> None: reject_unsupported_background_write_ops(operations) def _make_background_write_policy(self) -> WritePolicy: return make_background_write_policy( self._behavior, self._filter_expression, None, None, )
[docs] async def execute_background_task(self) -> ExecuteTask: """Run a background write against all records matching this dataset query. Returns a server task handle; poll with ``wait_till_complete`` or ``query_status``. Requires :meth:`with_write_operations`; only scalar ``Operation`` / expression writes are allowed. Raises: ValueError: If the builder targets keys or has no write operations. AerospikeError: If unsupported operation types are present. """ self._finalize_current_spec() if self._specs: raise ValueError( "Background task execution applies only to dataset queries.", ) if not self._operations: raise ValueError( "At least one write operation is required; use " "with_write_operations(...).", ) self._reject_unsupported_background_write_ops(self._operations) log.debug( "background task: %s.%s ops=%d", self._namespace, self._set_name, len(self._operations), ) wp = self._make_background_write_policy() statement = self._build_statement() try: return await self._client.query_operate( wp, statement, list(self._operations)) except Exception as e: raise _convert_pac_exception(e) from e
[docs] async def execute_udf_background_task( self, package_name: str, function_name: str, args: Optional[Sequence[Any]] = None, ) -> ExecuteTask: """Apply a registered UDF to matching records as a background task. Do not use :meth:`with_write_operations` on the same builder. Raises: ValueError: If the builder targets keys or has write operations set. """ self._finalize_current_spec() if self._specs: raise ValueError( "Background task execution applies only to dataset queries.", ) if self._operations: raise ValueError( "Do not combine with_write_operations with " "execute_udf_background_task.", ) log.debug( "background UDF: %s.%s %s.%s", self._namespace, self._set_name, package_name, function_name, ) wp = self._make_background_write_policy() statement = self._build_statement() py_args: Optional[List[Any]] = list(args) if args is not None else None try: return await self._client.query_execute_udf( wp, statement, package_name, function_name, py_args) except Exception as e: raise _convert_pac_exception(e) from e
# -- Private helpers ------------------------------------------------------- def _set_current_keys( self, arg1: Union[Key, List[Key]], *more_keys: Key, ) -> None: """Parse key argument(s) and set ``_single_key`` or ``_keys``.""" if isinstance(arg1, list): if not arg1: raise ValueError("keys list cannot be empty") self._keys = list(arg1) + list(more_keys) if more_keys else arg1 elif isinstance(arg1, Key): if more_keys: self._keys = [arg1, *more_keys] else: self._single_key = arg1 else: raise TypeError( f"requires a Key or List[Key], got {type(arg1).__name__}" ) def _finalize_current_spec(self) -> None: """Package the current key/ops/bins/filter/op_type state into an _OperationSpec.""" if self._single_key is not None: keys = [self._single_key] elif self._keys is not None: keys = self._keys else: return filt = self._filter_expression or self._default_filter_expression ttl = self._ttl_seconds if self._ttl_seconds is not None else self._default_ttl_seconds # Hand off the current operations list directly; allocate a fresh # one for the next spec instead of copying. self._specs.append(_OperationSpec( keys=keys, operations=self._operations, bins=self._bins, filter_expression=filt, op_type=self._op_type, generation=self._generation, ttl_seconds=ttl, durable_delete=self._durable_delete, udf_package=None, udf_function=None, udf_args=None, )) self._single_key = None self._keys = None self._operations = [] self._bins = None self._with_no_bins = False self._filter_expression = None self._op_type = None self._generation = None self._ttl_seconds = None self._durable_delete = None def _set_current_keys_from_varargs(self, keys: tuple[Key, ...]) -> None: if len(keys) == 1: self._single_key = keys[0] self._keys = None else: self._keys = list(keys) self._single_key = None def _clear_pending_udf_state(self) -> None: self._udf_package = None self._udf_function = None self._udf_args = None def _finalize_udf_spec(self) -> None: if self._udf_function is None: return if self._udf_package is None: raise ValueError("UDF package name is required") if self._single_key is not None: keys: List[Key] = [self._single_key] elif self._keys is not None: keys = list(self._keys) else: return filt = self._filter_expression or self._default_filter_expression udf_args: Optional[List[Any]] = ( list(self._udf_args) if self._udf_args is not None else None ) self._specs.append(_OperationSpec( keys=keys, operations=[], bins=None, filter_expression=filt, op_type="udf", generation=None, ttl_seconds=None, durable_delete=None, udf_package=self._udf_package, udf_function=self._udf_function, udf_args=udf_args, )) self._single_key = None self._keys = None self._operations = [] self._bins = None self._with_no_bins = False self._filter_expression = None self._op_type = None self._generation = None self._ttl_seconds = None self._durable_delete = None self._clear_pending_udf_state() def _specs_require_sequential_run(self) -> bool: return any(spec.op_type == "udf" for spec in self._specs) async def _execute_spec( self, spec: _OperationSpec, disp: _ErrorDisposition, handler: ErrorHandler | None, ) -> RecordStream: """Execute a single :class:`_OperationSpec`.""" keys = spec.keys op_type = spec.op_type if __debug__ and log.isEnabledFor(logging.DEBUG): log.debug( "_execute_spec: op=%s keys=%d ops=%d", op_type or "read", len(keys), len(spec.operations), ) if op_type is None: has_ops = bool(spec.operations) if len(keys) == 1: if has_ops: return await self._execute_single_key_operate( spec, disp, handler) return await self._execute_single_key_read( spec, disp, handler) if has_ops: return await self._execute_batch_read_operate( spec, disp, handler) return await self._execute_batch_read(spec, disp, handler) if op_type == "udf": if len(keys) == 1: return await self._execute_single_key_udf(spec, disp, handler) return await self._execute_batch_udf(spec, disp, handler) if op_type == "delete": if len(keys) == 1: return await self._execute_single_key_delete(spec, disp, handler) return await self._execute_batch_delete(spec, disp, handler) if op_type == "touch": if len(keys) == 1: return await self._execute_single_key_touch(spec, disp, handler) return await self._execute_batch_touch(spec, disp, handler) if op_type == "exists": if len(keys) == 1: return await self._execute_single_key_exists(spec, disp, handler) return await self._execute_batch_exists(spec, disp, handler) if len(keys) == 1: return await self._execute_single_key_write(spec, disp, handler) return await self._execute_batch_write(spec, disp, handler) def _make_udf_write_policy(self, spec: _OperationSpec) -> WritePolicy: if self._behavior is not None: wp = to_write_policy( self._behavior.get_settings( OpKind.WRITE_NON_RETRYABLE, OpShape.POINT)) else: wp = WritePolicy() self._apply_txn(wp) if spec.filter_expression is not None: wp.filter_expression = spec.filter_expression return wp async def _execute_single_key_udf( self, spec: _OperationSpec, disp: _ErrorDisposition, handler: ErrorHandler | None, ) -> RecordStream: key = spec.keys[0] pkg = spec.udf_package fn = spec.udf_function if pkg is None or fn is None: raise ValueError("UDF spec missing package or function name") wp = self._make_udf_write_policy(spec) try: val = await self._client.execute_udf( wp, key, pkg, fn, spec.udf_args) except Exception as e: return self._handle_error(key, e, disp, handler, op_type="udf") return RecordStream.from_list([ RecordResult( key=key, record=None, result_code=ResultCode.OK, index=0, udf_result=val, ), ]) async def _execute_batch_udf( self, spec: _OperationSpec, disp: _ErrorDisposition, handler: ErrorHandler | None, ) -> RecordStream: pkg = spec.udf_package fn = spec.udf_function if pkg is None or fn is None: raise ValueError("UDF spec missing package or function name") batch_policy = self._batch_policy_for( OpKind.WRITE_NON_RETRYABLE, OpShape.BATCH) udf_policy: Optional[BatchUDFPolicy] = None fe = spec.filter_expression if fe is not None: up = BatchUDFPolicy() up.filter_expression = fe udf_policy = up try: batch_records = await self._client.batch_apply( batch_policy, udf_policy, spec.keys, pkg, fn, spec.udf_args, ) except Exception as e: return self._handle_batch_error(spec.keys, e, disp, handler) return self._filtered_batch_stream( batch_records, disp, handler, op_type="udf") @staticmethod def _should_include_result( result_code: ResultCode, respond_all_keys: bool, fail_on_filtered_out: bool, ) -> bool: """Decide whether to include a result in the stream. Decides whether to include a per-key result in the stream. """ if result_code == ResultCode.OK: return True if result_code == ResultCode.KEY_NOT_FOUND_ERROR: return respond_all_keys if result_code == ResultCode.FILTERED_OUT: return fail_on_filtered_out or respond_all_keys return True def _filtered_batch_stream( self, batch_records, disp: _ErrorDisposition = _ErrorDisposition.IN_STREAM, handler: ErrorHandler | None = None, op_type: Optional[str] = None, ) -> RecordStream: """Convert batch records to a filtered RecordStream. Applies the error disposition to each per-record result: THROW raises on the first error, HANDLER dispatches errors to the callback (excluding them from the stream), IN_STREAM includes them. """ all_results = batch_records_to_results(list(batch_records)) filtered: list[RecordResult] = [] for r in all_results: if not r.is_ok and self._is_actionable(r.result_code, op_type): if disp is _ErrorDisposition.THROW: raise _result_code_to_exception( r.result_code, str(r.result_code), r.in_doubt) if disp is _ErrorDisposition.HANDLER and handler is not None: handler(r.key, r.index, _result_code_to_exception( r.result_code, str(r.result_code), r.in_doubt)) continue if not self._should_include_result( r.result_code, self._respond_all_keys, self._fail_on_filtered_out ): continue filtered.append(r) return RecordStream.from_list(filtered) _WRITES_REQUIRING_EXISTING_KEY = frozenset({"update", "replace_if_exists"}) def _is_actionable(self, rc: ResultCode, op_type: Optional[str]) -> bool: """Whether *rc* should be routed through disposition logic. ``KEY_NOT_FOUND_ERROR`` is only actionable when the operation explicitly requires an existing record (update, replace_if_exists). ``FILTERED_OUT`` is only actionable when ``fail_on_filtered_out`` has been set. All other non-OK codes are always actionable. """ if rc == ResultCode.KEY_NOT_FOUND_ERROR: return op_type in self._WRITES_REQUIRING_EXISTING_KEY if rc == ResultCode.FILTERED_OUT: return self._fail_on_filtered_out return True def _handle_error( self, key: Key, exc: Exception, disp: _ErrorDisposition, handler: ErrorHandler | None, index: int = 0, op_type: Optional[str] = None, ) -> RecordStream: """Route a per-key error according to the resolved disposition. The PAC raises ``ServerError`` for ``KEY_NOT_FOUND_ERROR`` and ``FILTERED_OUT`` rather than returning a sentinel. Whether these codes are routed through disposition depends on the operation context (see ``_is_actionable``). """ pfc_exc = _convert_pac_exception(exc) rc = pfc_exc.result_code or ResultCode.OK in_doubt = pfc_exc.in_doubt if self._is_actionable(rc, op_type): if disp is _ErrorDisposition.THROW: raise pfc_exc from exc if disp is _ErrorDisposition.HANDLER and handler is not None: handler(key, index, pfc_exc) return RecordStream.from_list([]) if not self._should_include_result( rc, self._respond_all_keys, self._fail_on_filtered_out ): return RecordStream.from_list([]) return RecordStream.from_error(key, rc, in_doubt, exception=pfc_exc) @staticmethod def _handle_batch_error( keys: List[Key], exc: Exception, disp: _ErrorDisposition, handler: ErrorHandler | None, ) -> RecordStream: """Route a batch-level error according to the resolved disposition. When the entire batch call fails (e.g. timeout, connection error), we create one error result per key. """ pfc_exc = _convert_pac_exception(exc) rc = pfc_exc.result_code or ResultCode.OK in_doubt = pfc_exc.in_doubt if disp is _ErrorDisposition.THROW: raise pfc_exc from exc if disp is _ErrorDisposition.HANDLER and handler is not None: for i, key in enumerate(keys): handler(key, i, pfc_exc) return RecordStream.from_list([]) results = [ RecordResult( key=key, record=None, result_code=rc, in_doubt=in_doubt, index=i, exception=pfc_exc, ) for i, key in enumerate(keys) ] return RecordStream.from_list(results) def _make_read_policy( self, spec: _OperationSpec, ) -> ReadPolicy: """Build a ``ReadPolicy`` for single-key reads.""" if self._read_policy is not None: rp = self._read_policy elif self._behavior is not None: if self._base_read_policy is None: self._base_read_policy = self._apply_txn(to_read_policy( self._behavior.get_settings(OpKind.READ, OpShape.POINT))) if spec.filter_expression is None: return self._base_read_policy rp = self._apply_txn(to_read_policy( self._behavior.get_settings(OpKind.READ, OpShape.POINT))) else: rp = self._apply_txn(ReadPolicy()) if spec.filter_expression is not None: rp.filter_expression = spec.filter_expression return rp async def _execute_single_key_direct( self, spec: _OperationSpec, ) -> Optional[RecordStream]: """Ultra-fast path for single-key reads and writes. Calls the PAC directly, bypassing _execute_spec, policy construction, and full RecordStream wrapping. Returns ``None`` if the operation type is not supported by this path (caller falls back to the normal chain). On PAC exceptions, uses the standard error disposition (single-key default = THROW). """ key = spec.keys[0] op_type = spec.op_type has_ops = bool(spec.operations) if op_type is None and not has_ops: # Simple read — all bins or projected bins. if self._base_read_policy is None and self._behavior is not None: self._base_read_policy = self._apply_txn(to_read_policy( self._behavior.get_settings(OpKind.READ, OpShape.POINT))) rp = self._apply_txn(self._base_read_policy or ReadPolicy()) try: record = await self._client.get(rp, key, spec.bins) except Exception as e: return self._handle_error( key, e, _ErrorDisposition.THROW, None) return RecordStream.from_single(key, record) if has_ops and op_type not in ("delete", "touch", "exists", "udf"): # Write via operate — upsert or verb with REA override. if self._base_write_policy is None and self._behavior is not None: self._base_write_policy = self._apply_txn(to_write_policy( self._behavior.get_settings( OpKind.WRITE_NON_RETRYABLE, OpShape.POINT))) rea = _OP_TYPE_TO_REA.get(op_type) if op_type else None if rea is not None: if self._behavior is not None: wp = self._apply_txn(to_write_policy( self._behavior.get_settings( OpKind.WRITE_NON_RETRYABLE, OpShape.POINT))) else: wp = self._apply_txn(WritePolicy()) wp.record_exists_action = rea else: wp = self._apply_txn(self._base_write_policy or WritePolicy()) try: record = await self._client.operate(wp, key, spec.operations) except Exception as e: return self._handle_error( key, e, _ErrorDisposition.THROW, None, op_type=spec.op_type) return RecordStream.from_single(key, record) # Not a simple case — fall back to normal chain. return None async def _execute_single_key_read( self, spec: _OperationSpec, disp: _ErrorDisposition, handler: ErrorHandler | None, ) -> RecordStream: key = spec.keys[0] read_policy = self._make_read_policy(spec) try: record = await self._client.get(read_policy, key, spec.bins) except Exception as e: return self._handle_error(key, e, disp, handler) return RecordStream.from_single(key, record) async def _execute_single_key_operate( self, spec: _OperationSpec, disp: _ErrorDisposition, handler: ErrorHandler | None, ) -> RecordStream: key = spec.keys[0] policy = self._apply_txn(WritePolicy()) if spec.filter_expression is not None: policy.filter_expression = spec.filter_expression try: record = await self._client.operate(policy, key, spec.operations) except Exception as e: return self._handle_error(key, e, disp, handler) return RecordStream.from_single(key, record) async def _execute_batch_read( self, spec: _OperationSpec, disp: _ErrorDisposition, handler: ErrorHandler | None, ) -> RecordStream: batch_read_policy = None if self._behavior is not None: settings = self._behavior.get_settings(OpKind.READ, OpShape.BATCH) batch_read_policy = to_batch_read_policy(settings) batch_policy = self._batch_policy_for(OpKind.READ, OpShape.BATCH) if spec.filter_expression is not None: if batch_read_policy is None: batch_read_policy = BatchReadPolicy() batch_read_policy.filter_expression = spec.filter_expression try: batch_records = await self._client.batch_read( batch_policy, batch_read_policy, spec.keys, spec.bins) except Exception as e: return self._handle_batch_error(spec.keys, e, disp, handler) return self._filtered_batch_stream(batch_records, disp, handler) async def _execute_batch_read_operate( self, spec: _OperationSpec, disp: _ErrorDisposition, handler: ErrorHandler | None, ) -> RecordStream: batch_policy = self._batch_policy_for(OpKind.READ, OpShape.BATCH) ops_per_key = [spec.operations] * len(spec.keys) bwp = None if spec.filter_expression is not None: bwp = BatchWritePolicy() bwp.filter_expression = spec.filter_expression try: batch_records = await self._client.batch_operate( batch_policy, bwp, spec.keys, ops_per_key) except Exception as e: return self._handle_batch_error(spec.keys, e, disp, handler) return self._filtered_batch_stream(batch_records, disp, handler) # -- Write execution helpers ---------------------------------------------- def _make_write_policy(self, spec: _OperationSpec) -> WritePolicy: """Build a ``WritePolicy`` for single-key writes.""" op_type = spec.op_type or "upsert" rea = _OP_TYPE_TO_REA.get(op_type) # Fast path: reuse the cached base policy when no spec-level # overrides exist and the op type doesn't require a REA change. if self._behavior is not None: if self._base_write_policy is None: self._base_write_policy = self._apply_txn(to_write_policy( self._behavior.get_settings( OpKind.WRITE_NON_RETRYABLE, OpShape.POINT))) if ( rea is None and spec.filter_expression is None and spec.generation is None and spec.ttl_seconds is None and not spec.durable_delete ): return self._base_write_policy wp = self._apply_txn(to_write_policy( self._behavior.get_settings( OpKind.WRITE_NON_RETRYABLE, OpShape.POINT))) else: wp = self._apply_txn(WritePolicy()) if rea is not None: wp.record_exists_action = rea if spec.filter_expression is not None: wp.filter_expression = spec.filter_expression if spec.generation is not None: wp.generation_policy = GenerationPolicy.EXPECT_GEN_EQUAL wp.generation = spec.generation if spec.ttl_seconds is not None: wp.expiration = _to_expiration(spec.ttl_seconds) if spec.durable_delete: wp.durable_delete = True return wp @staticmethod def _make_batch_write_policy(spec: _OperationSpec) -> Optional[BatchWritePolicy]: """Build a ``BatchWritePolicy`` for multi-key batch writes.""" has_settings = ( spec.filter_expression is not None or spec.generation is not None or spec.ttl_seconds is not None or spec.durable_delete ) if not has_settings: return None bwp = BatchWritePolicy() if spec.filter_expression is not None: bwp.filter_expression = spec.filter_expression if spec.generation is not None: bwp.generation = spec.generation if spec.ttl_seconds is not None: bwp.expiration = _to_expiration(spec.ttl_seconds) if spec.durable_delete: bwp.durable_delete = True return bwp @staticmethod def _make_batch_delete_policy(spec: _OperationSpec) -> Optional[BatchDeletePolicy]: """Build a ``BatchDeletePolicy`` for multi-key batch deletes.""" has_settings = ( spec.filter_expression is not None or spec.generation is not None or spec.durable_delete ) if not has_settings: return None bdp = BatchDeletePolicy() if spec.filter_expression is not None: bdp.filter_expression = spec.filter_expression if spec.generation is not None: bdp.generation = spec.generation if spec.durable_delete: bdp.durable_delete = True return bdp async def _execute_single_key_write( self, spec: _OperationSpec, disp: _ErrorDisposition, handler: ErrorHandler | None, ) -> RecordStream: key = spec.keys[0] wp = self._make_write_policy(spec) try: record = await self._client.operate(wp, key, spec.operations) except Exception as e: return self._handle_error( key, e, disp, handler, op_type=spec.op_type) return RecordStream.from_single(key, record) async def _execute_single_key_delete( self, spec: _OperationSpec, disp: _ErrorDisposition, handler: ErrorHandler | None, ) -> RecordStream: key = spec.keys[0] wp = self._make_write_policy(spec) try: existed = await self._client.delete(wp, key) except Exception as e: return self._handle_error( key, e, disp, handler, op_type="delete") rc = ResultCode.OK if existed else ResultCode.KEY_NOT_FOUND_ERROR if self._should_include_result( rc, self._respond_all_keys, self._fail_on_filtered_out ): return RecordStream.from_error(key, rc) return RecordStream.from_list([]) async def _execute_batch_write( self, spec: _OperationSpec, disp: _ErrorDisposition, handler: ErrorHandler | None, ) -> RecordStream: batch_policy = self._batch_policy_for( OpKind.WRITE_NON_RETRYABLE, OpShape.BATCH) bwp = self._make_batch_write_policy(spec) ops_per_key = [spec.operations] * len(spec.keys) try: batch_records = await self._client.batch_operate( batch_policy, bwp, spec.keys, ops_per_key) except Exception as e: return self._handle_batch_error(spec.keys, e, disp, handler) return self._filtered_batch_stream( batch_records, disp, handler, op_type=spec.op_type) async def _execute_batch_delete( self, spec: _OperationSpec, disp: _ErrorDisposition, handler: ErrorHandler | None, ) -> RecordStream: batch_policy = self._batch_policy_for( OpKind.WRITE_NON_RETRYABLE, OpShape.BATCH) bdp = self._make_batch_delete_policy(spec) try: batch_records = await self._client.batch_delete( batch_policy, bdp, spec.keys) except Exception as e: return self._handle_batch_error(spec.keys, e, disp, handler) return self._filtered_batch_stream( batch_records, disp, handler, op_type="delete") async def _execute_single_key_touch( self, spec: _OperationSpec, disp: _ErrorDisposition, handler: ErrorHandler | None, ) -> RecordStream: key = spec.keys[0] wp = self._make_write_policy(spec) try: await self._client.touch(wp, key) except Exception as e: return self._handle_error(key, e, disp, handler, op_type="touch") if self._should_include_result( ResultCode.OK, self._respond_all_keys, self._fail_on_filtered_out ): return RecordStream.from_error(key, ResultCode.OK) return RecordStream.from_list([]) async def _execute_batch_touch( self, spec: _OperationSpec, disp: _ErrorDisposition, handler: ErrorHandler | None, ) -> RecordStream: batch_policy = self._batch_policy_for( OpKind.WRITE_NON_RETRYABLE, OpShape.BATCH) bwp = self._make_batch_write_policy(spec) touch_ops = [Operation.touch()] ops_per_key = [touch_ops] * len(spec.keys) try: batch_records = await self._client.batch_operate( batch_policy, bwp, spec.keys, ops_per_key) except Exception as e: return self._handle_batch_error(spec.keys, e, disp, handler) return self._filtered_batch_stream( batch_records, disp, handler, op_type="touch") async def _execute_single_key_exists( self, spec: _OperationSpec, disp: _ErrorDisposition, handler: ErrorHandler | None, ) -> RecordStream: key = spec.keys[0] if self._read_policy is not None: rp = self._read_policy elif self._behavior is not None: rp = self._apply_txn(to_read_policy( self._behavior.get_settings(OpKind.READ, OpShape.POINT))) else: rp = self._apply_txn(ReadPolicy()) if spec.filter_expression is not None: rp.filter_expression = spec.filter_expression try: found = await self._client.exists(rp, key) except Exception as e: return self._handle_error( key, e, disp, handler, op_type="exists") rc = ResultCode.OK if found else ResultCode.KEY_NOT_FOUND_ERROR if self._should_include_result( rc, self._respond_all_keys, self._fail_on_filtered_out ): return RecordStream.from_error(key, rc) return RecordStream.from_list([]) async def _execute_batch_exists( self, spec: _OperationSpec, disp: _ErrorDisposition, handler: ErrorHandler | None, ) -> RecordStream: batch_policy = self._batch_policy_for(OpKind.READ, OpShape.BATCH) brp = self._make_batch_read_policy(spec) try: found_list = await self._client.batch_exists( batch_policy, brp, spec.keys) except Exception as e: return self._handle_batch_error(spec.keys, e, disp, handler) results = [] for key, found in zip(spec.keys, found_list): rc = ResultCode.OK if found else ResultCode.KEY_NOT_FOUND_ERROR if self._should_include_result( rc, self._respond_all_keys, self._fail_on_filtered_out ): results.append(RecordResult(key, None, rc)) return RecordStream.from_list(results) # -- Mixed-batch execution (multi-spec chains) ---------------------------- def _spec_to_batch_ops( self, spec: _OperationSpec, ) -> list: """Convert one spec into a list of ``BatchReadOp`` / ``BatchWriteOp`` / ``BatchDeleteOp`` objects for the PAC mixed-batch API.""" ops: list = [] op_type = spec.op_type if op_type is None: brp = self._make_batch_read_policy(spec) for key in spec.keys: if spec.operations: ops.append(BatchReadOp( key, operations=list(spec.operations), policy=brp)) else: ops.append(BatchReadOp(key, bins=spec.bins, policy=brp)) elif op_type == "delete": bdp = self._make_batch_delete_policy(spec) for key in spec.keys: ops.append(BatchDeleteOp(key, policy=bdp)) elif op_type == "touch": bwp = self._make_batch_write_policy_mixed(spec) touch_ops = [Operation.touch()] for key in spec.keys: ops.append(BatchWriteOp(key, touch_ops, policy=bwp)) elif op_type == "exists": brp = self._make_batch_read_policy(spec) for key in spec.keys: ops.append(BatchReadOp(key, bins=[], policy=brp)) else: bwp = self._make_batch_write_policy_mixed(spec) for key in spec.keys: ops.append(BatchWriteOp( key, list(spec.operations), policy=bwp)) return ops @staticmethod def _make_batch_read_policy( spec: _OperationSpec, ) -> Optional[BatchReadPolicy]: """Build a ``BatchReadPolicy`` from per-spec settings.""" if spec.filter_expression is None: return None brp = BatchReadPolicy() brp.filter_expression = spec.filter_expression return brp @staticmethod def _make_batch_write_policy_mixed( spec: _OperationSpec, ) -> Optional[BatchWritePolicy]: """Build a ``BatchWritePolicy`` that includes ``record_exists_action`` for use in mixed-batch calls.""" op_type = spec.op_type or "upsert" rea = _OP_TYPE_TO_REA.get(op_type) has_settings = ( rea is not None or spec.filter_expression is not None or spec.generation is not None or spec.ttl_seconds is not None or spec.durable_delete ) if not has_settings: return None bwp = BatchWritePolicy() if rea is not None: bwp.record_exists_action = rea if spec.filter_expression is not None: bwp.filter_expression = spec.filter_expression if spec.generation is not None: bwp.generation_policy = GenerationPolicy.EXPECT_GEN_EQUAL bwp.generation = spec.generation if spec.ttl_seconds is not None: bwp.expiration = _to_expiration(spec.ttl_seconds) if spec.durable_delete: bwp.durable_delete = True return bwp async def _execute_dataset_query(self) -> RecordStream: log.debug( "dataset query: %s.%s filter=%s chunk=%s hint=%s", self._namespace, self._set_name, self._filter_expression is not None or bool(self._filter_records), self._chunk_size, self._query_hint is not None, ) if self._policy is not None: policy = self._policy elif self._behavior is not None: policy = self._apply_txn(to_query_policy( self._behavior.get_settings(OpKind.READ, OpShape.QUERY))) else: policy = self._apply_txn(QueryPolicy()) if self._chunk_size is not None and self._chunk_size > 0: policy.max_records = self._chunk_size if self._filter_expression is not None: policy.filter_expression = self._filter_expression hint = self._query_hint if hint is not None and hint.query_duration is not None: policy.expected_duration = hint.query_duration self._resolve_index_context() partition_filter = self._partition_filter or PartitionFilter.all() if self._where_ael is not None and self._index_context is not None: self._auto_generate_filters(hint, policy) statement = self._build_statement() try: recordset = await self._client.query( policy, partition_filter, statement) except Exception as e: raise _convert_pac_exception(e) from e if self._chunk_size is not None and self._chunk_size > 0: client = self._client async def _reexecute(pf: PartitionFilter) -> Any: return await client.query(policy, pf, statement) return RecordStream.from_chunked_recordset( recordset, reexecute=_reexecute, limit=0, ) return RecordStream.from_recordset(recordset) def _resolve_index_context(self) -> None: """Auto-populate ``_index_context`` from the monitor when not set.""" if self._index_context is not None: return if self._indexes_monitor is None: return ctx = self._indexes_monitor.get_index_context(self._namespace) if ctx is not None: self._index_context = ctx def _auto_generate_filters( self, hint: Optional[QueryHint], policy: QueryPolicy, ) -> None: """Parse AEL with index context to generate Filter + Exp. When a hint provides ``index_name`` or ``bin_name``, those overrides are forwarded to the filter generation pipeline. """ if self._where_ael is None or self._index_context is None: return hint_index = hint.index_name if hint is not None else None hint_bin = hint.bin_name if hint is not None else None result = parse_ael_with_index( self._where_ael, self._index_context, hint_index_name=hint_index, hint_bin_name=hint_bin, ) if result.filter is not None: self._filter_records.append(_FilterRecord(filter=result.filter)) log.debug( "Auto-selected secondary index filter for query on %s.%s", self._namespace, self._set_name, ) if result.exp is not None: policy.filter_expression = result.exp
[docs] class WriteSegmentBuilder(_WriteVerbs): """Accumulate scalar and CDT writes for the current operation's key(s). Obtained from :class:`QueryBuilder` after a write verb or from :class:`WriteBinBuilder` when chaining. Call :meth:`put`, :meth:`bin`, expression helpers, optional :meth:`where` / TTL / generation guards, then :meth:`execute` on this object or transition with :meth:`query` / another write verb on the mixin. Example: Upsert two bins, then read the stream of results:: stream = await ( session.upsert(key) .put({"name": "Ada", "score": 100}) .execute() ) See Also: :meth:`QueryBuilder.execute`: Runs all chained operations. """ __slots__ = ("_qb",)
[docs] def __init__(self, qb: QueryBuilder) -> None: self._qb = qb
[docs] def with_txn(self, txn: Optional[Txn]) -> "WriteSegmentBuilder": """Opt this write into (or out of) a specific transaction. Delegates to the underlying :class:`QueryBuilder`; see :meth:`QueryBuilder.with_txn`. Args: txn: The :class:`~aerospike_async.Txn` to participate in, or ``None`` to run without a transaction. Returns: This segment for chaining. See Also: :meth:`QueryBuilder.with_txn` """ self._qb.with_txn(txn) return self
# -- Bin operations -------------------------------------------------------
[docs] def bin(self, bin_name: str) -> WriteBinBuilder: """Start a bin-level write operation. Args: bin_name: The bin to operate on. Returns: A WriteBinBuilder for method chaining. """ return WriteBinBuilder(self, bin_name)
[docs] def put(self, bins: dict) -> WriteSegmentBuilder: """Apply ``Operation.put`` for each bin in the mapping. Args: bins: Map of bin name to value. Returns: This segment for chaining. Example:: await session.upsert(key).put({"email": "a@b.com", "age": 30}).execute() See Also: :meth:`bin`: Per-bin CDT or scalar follow-ups. """ for bin_name, value in bins.items(): self._qb._operations.append(Operation.put(bin_name, value)) return self
[docs] def set_bins(self, bins: dict) -> WriteSegmentBuilder: """Alias for :meth:`put`.""" return self.put(bins)
def _add_op(self, op: Any) -> WriteSegmentBuilder: self._qb._operations.append(op) return self
[docs] def add_operation(self, op: Any) -> None: """Append an operation (used by CDT action builders).""" self._qb._operations.append(op)
# -- Scalar bin operations (direct on segment) ----------------------------
[docs] def set_to(self, bin_name: str, value: Any) -> WriteSegmentBuilder: """Set a bin to *value*.""" return self._add_op(Operation.put(bin_name, value))
[docs] def add(self, bin_name: str, value: Any) -> WriteSegmentBuilder: """Add a numeric *value* to a bin.""" return self._add_op(Operation.add(bin_name, value))
[docs] def increment_by(self, bin_name: str, value: Any) -> WriteSegmentBuilder: """Alias for :meth:`add`.""" return self.add(bin_name, value)
[docs] def get(self, bin_name: str) -> WriteSegmentBuilder: """Read a bin value back within a write operate.""" return self._add_op(Operation.get_bin(bin_name))
[docs] def append(self, bin_name: str, value: str) -> WriteSegmentBuilder: """Append a string to a bin.""" return self._add_op(Operation.append(bin_name, value))
[docs] def prepend(self, bin_name: str, value: str) -> WriteSegmentBuilder: """Prepend a string to a bin.""" return self._add_op(Operation.prepend(bin_name, value))
[docs] def remove_bin(self, bin_name: str) -> WriteSegmentBuilder: """Delete a bin from the record.""" return self._add_op(Operation.put(bin_name, None))
# -- Record-level operations ----------------------------------------------
[docs] def delete_record(self) -> WriteSegmentBuilder: """Add a record-level delete to the current operate call. Unlike :meth:`~_WriteVerbs.delete` which targets a different key, this deletes the record being operated on as part of the same atomic operation. Example:: stream = await ( session.upsert(key) .bin("name").get() .delete_record() .execute() ) Returns: This segment for chaining. See Also: :meth:`~_WriteVerbs.delete`: Start a new delete segment for a key. """ return self._add_op(Operation.delete())
[docs] def touch_record(self) -> WriteSegmentBuilder: """Add a record-level touch to the current operate call. Resets the record's TTL as part of an atomic multi-operation call. Combine with :meth:`expire_record_after_seconds` to set a new TTL. Example:: stream = await ( session.upsert(key) .bin("score").get() .touch_record() .expire_record_after_seconds(120) .execute() ) Returns: This segment for chaining. See Also: :meth:`~_WriteVerbs.touch`: Start a new touch segment for a key. """ return self._add_op(Operation.touch())
# -- Expression operations (direct on segment) ----------------------------
[docs] def select_from( self, bin_name: str, expression: Union[str, FilterExpression], *, ignore_eval_failure: bool = False, ) -> WriteSegmentBuilder: """Read a computed value into a bin using an AEL expression.""" flags = ExpReadFlags.EVAL_NO_FAIL if ignore_eval_failure else ExpReadFlags.DEFAULT expr = parse_ael(expression) if isinstance(expression, str) else expression return self._add_op(ExpOperation.read(bin_name, expr, flags))
[docs] def insert_from( self, bin_name: str, expression: Union[str, FilterExpression], *, ignore_op_failure: bool = False, ignore_eval_failure: bool = False, delete_if_null: bool = False, ) -> WriteSegmentBuilder: """Write expression result only if bin does not already exist.""" flags = _build_exp_write_flags( ExpWriteFlags.CREATE_ONLY, ignore_op_failure, ignore_eval_failure, delete_if_null, ) expr = parse_ael(expression) if isinstance(expression, str) else expression return self._add_op(ExpOperation.write(bin_name, expr, flags))
[docs] def update_from( self, bin_name: str, expression: Union[str, FilterExpression], *, ignore_op_failure: bool = False, ignore_eval_failure: bool = False, delete_if_null: bool = False, ) -> WriteSegmentBuilder: """Write expression result only if bin already exists.""" flags = _build_exp_write_flags( ExpWriteFlags.UPDATE_ONLY, ignore_op_failure, ignore_eval_failure, delete_if_null, ) expr = parse_ael(expression) if isinstance(expression, str) else expression return self._add_op(ExpOperation.write(bin_name, expr, flags))
[docs] def upsert_from( self, bin_name: str, expression: Union[str, FilterExpression], *, ignore_op_failure: bool = False, ignore_eval_failure: bool = False, delete_if_null: bool = False, ) -> WriteSegmentBuilder: """Write expression result, creating or overwriting the bin.""" flags = _build_exp_write_flags( ExpWriteFlags.DEFAULT, ignore_op_failure, ignore_eval_failure, delete_if_null, ) expr = parse_ael(expression) if isinstance(expression, str) else expression return self._add_op(ExpOperation.write(bin_name, expr, flags))
# -- Transition methods ---------------------------------------------------
[docs] def query( self, arg1: Union[Key, List[Key]], *more_keys: Key, ) -> QueryBuilder: """Finalize current write segment and start a read segment. Args: arg1: A single Key or List[Key]. *more_keys: Additional keys (varargs). Returns: The parent QueryBuilder for method chaining. """ self._qb._finalize_current_spec() self._qb._op_type = None self._qb._set_current_keys(arg1, *more_keys) return self._qb
def _start_write_verb( self, op_type: str, arg1: Union[Key, List[Key]], *more_keys: Key, ) -> WriteSegmentBuilder: self._qb._finalize_current_spec() self._qb._op_type = op_type self._qb._set_current_keys(arg1, *more_keys) return self # -- Per-operation settings ------------------------------------------------ @overload def where(self, expression: str) -> WriteSegmentBuilder: ... @overload def where(self, expression: FilterExpression) -> WriteSegmentBuilder: ...
[docs] def where( self, expression: Union[str, FilterExpression], ) -> WriteSegmentBuilder: """Set a filter expression on the current write segment. Args: expression: AEL string or pre-built FilterExpression. Returns: self for method chaining. """ if isinstance(expression, str): self._qb._filter_expression = parse_ael(expression) else: self._qb._filter_expression = expression return self
[docs] def expire_record_after_seconds(self, seconds: int) -> WriteSegmentBuilder: """Set the TTL on the current write segment. Args: seconds: Time-to-live in seconds (must be > 0). Returns: self for method chaining. Raises: ValueError: If seconds is <= 0. """ if seconds <= 0: raise ValueError("seconds must be greater than 0") self._qb._ttl_seconds = seconds return self
[docs] def never_expire(self) -> WriteSegmentBuilder: """Set this record to never expire (TTL = -1). Returns: self for method chaining. """ self._qb._ttl_seconds = _TTL_NEVER_EXPIRE return self
[docs] def with_no_change_in_expiration(self) -> WriteSegmentBuilder: """Preserve the record's existing TTL (TTL = -2). Returns: self for method chaining. """ self._qb._ttl_seconds = _TTL_DONT_UPDATE return self
[docs] def expiry_from_server_default(self) -> WriteSegmentBuilder: """Use the namespace's default TTL for this record (TTL = 0). Returns: self for method chaining. """ self._qb._ttl_seconds = _TTL_SERVER_DEFAULT return self
[docs] def ensure_generation_is(self, generation: int) -> WriteSegmentBuilder: """Set expected generation for optimistic locking on the current segment. Args: generation: The expected generation number (must be > 0). Returns: self for method chaining. Raises: ValueError: If generation is <= 0. """ if generation <= 0: raise ValueError("Generation must be greater than 0") self._qb._generation = generation return self
[docs] def durably_delete(self) -> WriteSegmentBuilder: """Enable durable delete on the current segment. Returns: self for method chaining. """ self._qb._durable_delete = True return self
[docs] def respond_all_keys(self) -> WriteSegmentBuilder: """Include results for missing keys in the stream. Returns: self for method chaining. """ self._qb._respond_all_keys = True return self
[docs] def fail_on_filtered_out(self) -> WriteSegmentBuilder: """Mark filtered-out records with ``FILTERED_OUT`` result code. Returns: self for method chaining. """ self._qb._fail_on_filtered_out = True return self
[docs] def replace_only(self) -> WriteSegmentBuilder: """Change the current segment to replace-if-exists semantics. The record must already exist; the operation fails with ``KEY_NOT_FOUND_ERROR`` if it does not. All existing bins are removed and only the bins specified in this segment are written. Returns: self for method chaining. """ self._qb._op_type = "replace_if_exists" return self
# -- Execution ------------------------------------------------------------
[docs] async def execute( self, on_error: OnError | None = None, ) -> RecordStream: """Run the parent :class:`QueryBuilder` stack (same as ``self._qb.execute``). Args: on_error: Optional :class:`~aerospike_sdk.error_strategy.ErrorStrategy` or error callback; see :meth:`QueryBuilder.execute`. Returns: :class:`~aerospike_sdk.record_stream.RecordStream` of results. Example:: stream = await session.upsert(key).put({"x": 1}).execute() await stream.first_or_raise() Raises: Same as :meth:`QueryBuilder.execute`. """ return await self._qb.execute(on_error)
class _SingleKeyWriteSegment(WriteSegmentBuilder): """Lightweight single-key write path that bypasses QueryBuilder overhead. On the hot path (put + execute), calls the PAC directly without ``_finalize_current_spec``, ``_OperationSpec``, or ``execute()`` dispatch. Advanced features (``where``, TTL, generation, chaining) trigger in-place promotion: ``self._qb`` is populated so all inherited ``WriteSegmentBuilder`` methods work naturally. """ __slots__ = ( "_client_fast", "_key", "_op_type_fast", "_ops", "_write_policy", "_behavior_fast", "_read_policy", "_txn", ) def __init__( self, client: Client, key: Key, op_type: str, behavior: Any, write_policy: WritePolicy | None, read_policy: ReadPolicy | None = None, txn: Optional[Txn] = None, ) -> None: self._qb = None # type: ignore[assignment] self._client_fast = client self._key = key self._op_type_fast = op_type self._ops: list[Any] = [] # Under MRT we can't reuse the session's cached write/read policies # (they were built without a txn), so null them here and force the # fast path to derive fresh policies from behavior on each execute. if txn is None: self._write_policy = write_policy self._read_policy = read_policy else: self._write_policy = None self._read_policy = None self._behavior_fast = behavior self._txn: Optional[Txn] = txn def _apply_txn(self, policy: Any) -> Any: """Stamp this segment's captured txn on an outer policy in place.""" if self._txn is not None and policy is not None: policy.txn = self._txn return policy def with_txn(self, txn: Optional[Txn]) -> "_SingleKeyWriteSegment": """Opt this write into (or out of) a specific transaction. See :meth:`QueryBuilder.with_txn` for semantics. """ self._txn = txn self._write_policy = None self._read_policy = None if self._qb is not None: self._qb.with_txn(txn) return self # -- Operation methods --------------------------------------------------- # On the fast path (_qb is None) these use self._ops directly. # After promotion (_qb is set) they delegate to the QB's list. def put(self, bins: dict) -> WriteSegmentBuilder: if self._qb is not None: return super().put(bins) ops = self._ops for bin_name, value in bins.items(): ops.append(Operation.put(bin_name, value)) return self def _add_op(self, op: Any) -> WriteSegmentBuilder: if self._qb is not None: self._qb._operations.append(op) else: self._ops.append(op) return self def add_operation(self, op: Any) -> None: if self._qb is not None: self._qb._operations.append(op) else: self._ops.append(op) def replace_only(self) -> WriteSegmentBuilder: if self._qb is not None: return super().replace_only() self._op_type_fast = "replace_if_exists" return self # -- In-place promotion -------------------------------------------------- def _promote(self) -> None: """Populate ``self._qb`` so inherited WriteSegmentBuilder methods work.""" if self._qb is not None: return qb = QueryBuilder( client=self._client_fast, namespace=self._key.namespace, set_name=self._key.set_name, behavior=self._behavior_fast, cached_write_policy=self._write_policy, cached_read_policy=self._read_policy, txn=self._txn, ) qb._op_type = self._op_type_fast qb._single_key = self._key qb._operations = self._ops self._qb = qb def where(self, expression): self._promote() return super().where(expression) def expire_record_after_seconds(self, seconds): self._promote() return super().expire_record_after_seconds(seconds) def never_expire(self): self._promote() return super().never_expire() def with_no_change_in_expiration(self): self._promote() return super().with_no_change_in_expiration() def expiry_from_server_default(self): self._promote() return super().expiry_from_server_default() def ensure_generation_is(self, generation): self._promote() return super().ensure_generation_is(generation) def durably_delete(self): self._promote() return super().durably_delete() def respond_all_keys(self): self._promote() return super().respond_all_keys() def fail_on_filtered_out(self): self._promote() return super().fail_on_filtered_out() def query(self, arg1, *more_keys): self._promote() return super().query(arg1, *more_keys) def _start_write_verb(self, op_type, arg1, *more_keys): self._promote() return super()._start_write_verb(op_type, arg1, *more_keys) # -- Error handling ------------------------------------------------------ @staticmethod def _handle_fast_error( exc: Exception, op_type: str, ) -> RecordStream: pfc_exc = _convert_pac_exception(exc) rc = pfc_exc.result_code or ResultCode.OK if rc == ResultCode.KEY_NOT_FOUND_ERROR: if op_type in _FAST_WRITES_REQUIRING_KEY: raise pfc_exc from exc elif rc != ResultCode.FILTERED_OUT: raise pfc_exc from exc return RecordStream.from_list([]) # -- Policy helpers ------------------------------------------------------ def _get_write_policy(self) -> WritePolicy: wp = self._write_policy if wp is None and self._behavior_fast is not None: wp = self._apply_txn(to_write_policy( self._behavior_fast.get_settings( OpKind.WRITE_NON_RETRYABLE, OpShape.POINT))) self._write_policy = wp return self._apply_txn(wp or WritePolicy()) # -- Execution ----------------------------------------------------------- async def execute( # type: ignore[override] self, on_error: OnError | None = None, ) -> RecordStream: if self._qb is not None: return await self._qb.execute(on_error) if on_error is not None: self._promote() return await self._qb.execute(on_error) # type: ignore[union-attr] key = self._key op_type = self._op_type_fast # -- delete (PAC returns bool, no record) -- if op_type == "delete": wp = self._get_write_policy() try: existed = await self._client_fast.delete(wp, key) except Exception as exc: return self._handle_fast_error(exc, "delete") if existed: return RecordStream.from_error(key, ResultCode.OK) return RecordStream.from_list([]) # -- touch (no record returned) -- if op_type == "touch": wp = self._get_write_policy() try: await self._client_fast.touch(wp, key) except Exception as exc: return self._handle_fast_error(exc, "touch") return RecordStream.from_error(key, ResultCode.OK) # -- exists (uses ReadPolicy, returns bool) -- if op_type == "exists": rp = self._read_policy if rp is None and self._behavior_fast is not None: rp = self._apply_txn(to_read_policy( self._behavior_fast.get_settings( OpKind.READ, OpShape.POINT))) self._read_policy = rp if rp is None: rp = self._apply_txn(ReadPolicy()) try: found = await self._client_fast.exists(rp, key) except Exception as exc: return self._handle_fast_error(exc, "exists") if found: return RecordStream.from_error(key, ResultCode.OK) return RecordStream.from_list([]) # -- operate-based: upsert, insert, update, replace, replace_if_exists -- rea = _OP_TYPE_TO_REA.get(op_type) if op_type else None if rea is not None: if self._behavior_fast is not None: wp = self._apply_txn(to_write_policy( self._behavior_fast.get_settings( OpKind.WRITE_NON_RETRYABLE, OpShape.POINT))) else: wp = self._apply_txn(WritePolicy()) wp.record_exists_action = rea else: wp = self._get_write_policy() try: record = await self._client_fast.operate( wp, key, self._ops) except Exception as exc: return self._handle_fast_error(exc, op_type or "upsert") return RecordStream.from_single(key, record)
[docs] class WriteBinBuilder(_WriteVerbs): """Per-bin write builder inside a :class:`WriteSegmentBuilder`. Start with :meth:`WriteSegmentBuilder.bin`. Scalar methods delegate to the segment; ``map_*`` and ``list_*`` append collection operations; ``hll_*`` and ``bit_*`` append HyperLogLog and blob bit operations; nested CDT builders capture context for maps and lists. Write verbs on this class finalize the segment and start a new one on new keys. Example: Set a map key and append to a list within the same write:: await ( session.upsert(key) .bin("config").on_map_key("level").set_to(5) .bin("tags").list_append(value="new_tag") .execute() ) See Also: :class:`QueryBinBuilder`: Read-side analogue for queries. """ __slots__ = ("_segment", "_bin")
[docs] def __init__(self, segment: WriteSegmentBuilder, bin_name: str) -> None: self._segment = segment self._bin = bin_name
# -- Scalar writes --------------------------------------------------------
[docs] def set_to(self, value: Any) -> WriteSegmentBuilder: """Set the bin to *value* (``Operation.put``).""" return self._segment.set_to(self._bin, value)
[docs] def add(self, value: Any) -> WriteSegmentBuilder: """Add a numeric *value* to the bin (``Operation.add``).""" return self._segment.add(self._bin, value)
[docs] def increment_by(self, value: Any) -> WriteSegmentBuilder: """Alias of :meth:`add`.""" return self.add(value)
[docs] def append(self, value: str) -> WriteSegmentBuilder: """String append (``Operation.append``).""" return self._segment.append(self._bin, value)
[docs] def prepend(self, value: str) -> WriteSegmentBuilder: """String prepend (``Operation.prepend``).""" return self._segment.prepend(self._bin, value)
[docs] def remove(self) -> WriteSegmentBuilder: """Drop the bin (write ``None``).""" return self._segment.remove_bin(self._bin)
[docs] def get(self) -> WriteSegmentBuilder: """Return the bin value after writes complete (``Operation.get_bin``).""" return self._segment.get(self._bin)
# -- CDT list structural operations ---------------------------------------
[docs] def list_add( self, value: Any, *, unique: bool = False, bounded: bool = False, no_fail: bool = False, ) -> WriteSegmentBuilder: """Add *value* to an ordered list (sorted insert). Args: value: Element to insert in sorted order. unique: Reject if the value already exists in the list. bounded: Reject if index is beyond the current list bounds. no_fail: Do not raise on write failures. Returns: The parent :class:`WriteSegmentBuilder`. """ policy = _resolve_list_policy( ListOrderType.ORDERED, unique=unique, bounded=bounded, no_fail=no_fail, ) return self._segment._add_op( ListOperation.append(self._bin, value, policy), )
[docs] def list_append( self, value: Any, *, unique: bool = False, bounded: bool = False, no_fail: bool = False, ) -> WriteSegmentBuilder: """Append *value* to the end of an unordered list. Args: value: Value to append. unique: Reject if the value already exists in the list. bounded: Reject if index is beyond the current list bounds. no_fail: Do not raise on write failures. Example:: .bin("tags").list_append(value="python") """ policy = _resolve_list_policy( None, unique=unique, bounded=bounded, no_fail=no_fail, ) return self._segment._add_op( ListOperation.append(self._bin, value, policy), )
# -- Collection-level map -------------------------------------------------
[docs] def map_clear(self) -> WriteSegmentBuilder: """Remove all entries from the map bin. Returns: The parent :class:`WriteSegmentBuilder`. """ return self._segment._add_op(MapOperation.clear(self._bin))
[docs] def map_size(self) -> WriteSegmentBuilder: """Return the map element count (read within operate). Returns: The parent :class:`WriteSegmentBuilder`. """ return self._segment._add_op(MapOperation.size(self._bin))
[docs] def map_upsert_items( self, items: Any, *, order: MapOrder | None = None, persist_index: bool = False, no_fail: bool = False, partial: bool = False, ) -> WriteSegmentBuilder: """Put multiple map entries (create or update each key). Args: items: Mapping or sequence of ``(key, value)`` pairs. order: Map key order for the policy. persist_index: Maintain a persistent index on the map. no_fail: Do not raise on write failures. partial: Allow partial success for bulk operations. Example:: .bin("settings").map_upsert_items({"theme": "dark", "lang": "en"}) """ pairs = _map_item_pairs(items) policy = _resolve_map_policy( MapWriteFlags.DEFAULT, order=order, persist_index=persist_index, no_fail=no_fail, partial=partial, ) return self._segment._add_op( MapOperation.put_items(self._bin, pairs, policy), )
[docs] def map_insert_items( self, items: Any, *, order: MapOrder | None = None, persist_index: bool = False, no_fail: bool = False, partial: bool = False, ) -> WriteSegmentBuilder: """Put map entries only for keys that do not yet exist. Args: items: Mapping or sequence of ``(key, value)`` pairs. order: Map key order for the policy. persist_index: Maintain a persistent index on the map. no_fail: Do not raise on write failures. partial: Allow partial success for bulk operations. """ pairs = _map_item_pairs(items) policy = _resolve_map_policy( MapWriteFlags.CREATE_ONLY, order=order, persist_index=persist_index, no_fail=no_fail, partial=partial, ) return self._segment._add_op( MapOperation.put_items(self._bin, pairs, policy), )
[docs] def map_update_items( self, items: Any, *, order: MapOrder | None = None, persist_index: bool = False, no_fail: bool = False, partial: bool = False, ) -> WriteSegmentBuilder: """Update existing map entries only (no new keys). Args: items: Key-value pairs to update for existing keys only. order: Map key order for the policy. persist_index: Maintain a persistent index on the map. no_fail: Do not raise on write failures. partial: Allow partial success for bulk operations. Returns: The parent :class:`WriteSegmentBuilder`. """ pairs = _map_item_pairs(items) policy = _resolve_map_policy( MapWriteFlags.UPDATE_ONLY, order=order, persist_index=persist_index, no_fail=no_fail, partial=partial, ) return self._segment._add_op( MapOperation.put_items(self._bin, pairs, policy), )
[docs] def map_create(self, order: MapOrder) -> WriteSegmentBuilder: """Create an empty map with the given key order. Args: order: Map key sort order. Returns: The parent :class:`WriteSegmentBuilder`. """ return self._segment._add_op(MapOperation.create(self._bin, order))
[docs] def map_set_policy(self, order: MapOrder) -> WriteSegmentBuilder: """Set map sort order policy without changing entries. Args: order: Map key sort order policy. Returns: The parent :class:`WriteSegmentBuilder`. """ return self._segment._add_op( MapOperation.set_map_policy(self._bin, MapPolicy(order, None)), )
# -- Collection-level list ------------------------------------------------
[docs] def list_clear(self) -> WriteSegmentBuilder: """Remove all elements from the list bin. Returns: The parent :class:`WriteSegmentBuilder`. """ return self._segment._add_op(ListOperation.clear(self._bin))
[docs] def list_sort( self, flags: ListSortFlags = ListSortFlags.DEFAULT, ) -> WriteSegmentBuilder: """Sort the list bin. Args: flags: Sort behavior flags. Returns: The parent :class:`WriteSegmentBuilder`. """ return self._segment._add_op(ListOperation.sort(self._bin, flags))
[docs] def list_size(self) -> WriteSegmentBuilder: """Return the list element count (read within operate). Returns: The parent :class:`WriteSegmentBuilder`. """ return self._segment._add_op(ListOperation.size(self._bin))
[docs] def list_append_items( self, items: Any, *, unique: bool = False, bounded: bool = False, no_fail: bool = False, partial: bool = False, ) -> WriteSegmentBuilder: """Append values to an unordered list. Args: items: Values to append. unique: Reject items that already exist in the list. bounded: Reject inserts beyond the current list bounds. no_fail: Do not raise on write failures. partial: Allow partial success for bulk operations. """ policy = _resolve_list_policy( None, unique=unique, bounded=bounded, no_fail=no_fail, partial=partial, ) return self._segment._add_op( ListOperation.append_items(self._bin, items, policy), )
[docs] def list_add_items( self, items: Any, *, unique: bool = False, bounded: bool = False, no_fail: bool = False, partial: bool = False, ) -> WriteSegmentBuilder: """Insert values into an ordered list (sorted positions). Args: items: Sequence of values to insert in sorted order. unique: Reject items that already exist in the list. bounded: Reject inserts beyond the current list bounds. no_fail: Do not raise on write failures. partial: Allow partial success for bulk operations. Returns: The parent :class:`WriteSegmentBuilder`. """ policy = _resolve_list_policy( ListOrderType.ORDERED, unique=unique, bounded=bounded, no_fail=no_fail, partial=partial, ) return self._segment._add_op( ListOperation.append_items(self._bin, items, policy), )
[docs] def list_create( self, order: ListOrderType, *, pad: bool = False, persist_index: bool = False, ) -> WriteSegmentBuilder: """Create an empty list with the given order. Args: order: List element order. pad: Whether to pad with None entries. persist_index: Whether to persist element indices. Returns: The parent :class:`WriteSegmentBuilder`. """ return self._segment._add_op( ListOperation.create(self._bin, order, pad, persist_index), )
[docs] def list_set_order(self, order: ListOrderType) -> WriteSegmentBuilder: """Set list sort order without changing elements. Args: order: List element order. Returns: The parent :class:`WriteSegmentBuilder`. """ return self._segment._add_op(ListOperation.set_order(self._bin, order))
# -- Index-based list (whole-bin) ----------------------------------------
[docs] def list_insert( self, index: int, value: Any, *, unique: bool = False, bounded: bool = False, no_fail: bool = False, ) -> WriteSegmentBuilder: """Insert *value* at *index* in an unordered list. Args: index: List index (0-based; negative counts from the end). value: Element to insert. unique: Reject if the value already exists in the list. bounded: Reject if index is beyond the current list bounds. no_fail: Do not raise on write failures. Returns: The parent :class:`WriteSegmentBuilder`. See Also: :meth:`list_append`, :meth:`QueryBinBuilder.list_get` """ policy = _resolve_list_policy( None, unique=unique, bounded=bounded, no_fail=no_fail, ) return self._segment._add_op( ListOperation.insert(self._bin, index, value, policy), )
[docs] def list_insert_items( self, index: int, items: Sequence[Any], *, unique: bool = False, bounded: bool = False, no_fail: bool = False, partial: bool = False, ) -> WriteSegmentBuilder: """Insert a sequence of values starting at *index*. Args: index: List index at which to insert the first element. items: Values to insert in order. unique: Reject items that already exist in the list. bounded: Reject inserts beyond the current list bounds. no_fail: Do not raise on write failures. partial: Allow partial success for bulk operations. Returns: The parent :class:`WriteSegmentBuilder`. See Also: :meth:`list_insert`, :meth:`list_append_items` """ policy = _resolve_list_policy( None, unique=unique, bounded=bounded, no_fail=no_fail, partial=partial, ) return self._segment._add_op( ListOperation.insert_items(self._bin, index, items, policy), )
[docs] def list_set(self, index: int, value: Any) -> WriteSegmentBuilder: """Replace the element at *index* with *value*. Args: index: List index (0-based; negative counts from the end). value: New element value. Returns: The parent :class:`WriteSegmentBuilder`. See Also: :meth:`list_get` """ return self._segment._add_op(ListOperation.set(self._bin, index, value))
[docs] def list_increment(self, index: int, value: int = 1) -> WriteSegmentBuilder: """Add *value* to the numeric element at *index* (default increment is ``1``). Args: index: List index (0-based; negative counts from the end). value: Amount to add; ``1`` uses a dedicated server path. Returns: The parent :class:`WriteSegmentBuilder`. See Also: :meth:`list_set` """ if value == 1: return self._segment._add_op( ListOperation.increment_by_one(self._bin, index), ) return self._segment._add_op( ListOperation.increment( self._bin, index, value, _UNORDERED_LIST_POLICY, ), )
[docs] def list_remove(self, index: int) -> WriteSegmentBuilder: """Remove the element at *index*. Args: index: List index (0-based; negative counts from the end). Returns: The parent :class:`WriteSegmentBuilder`. See Also: :meth:`list_remove_range` """ return self._segment._add_op(ListOperation.remove(self._bin, index))
[docs] def list_remove_range( self, index: int, count: Optional[int] = None, ) -> WriteSegmentBuilder: """Remove *count* elements starting at *index*, or all from *index* onward. Args: index: Starting list index. count: Number of elements to remove; ``None`` removes through the end. Returns: The parent :class:`WriteSegmentBuilder`. See Also: :meth:`list_remove` """ if count is None: op = ListOperation.remove_range_from(self._bin, index) else: op = ListOperation.remove_range(self._bin, index, count) return self._segment._add_op(op)
[docs] def list_pop(self, index: int) -> WriteSegmentBuilder: """Remove and return the element at *index* (read in the operate result). Args: index: List index (0-based; negative counts from the end). Returns: The parent :class:`WriteSegmentBuilder`. See Also: :meth:`list_pop_range` """ return self._segment._add_op(ListOperation.pop(self._bin, index))
[docs] def list_pop_range( self, index: int, count: Optional[int] = None, ) -> WriteSegmentBuilder: """Pop *count* elements from *index*, or from *index* through the end. Args: index: Starting list index. count: Number of elements; ``None`` pops through the end. Returns: The parent :class:`WriteSegmentBuilder`. See Also: :meth:`list_pop` """ if count is None: op = ListOperation.pop_range_from(self._bin, index) else: op = ListOperation.pop_range(self._bin, index, count) return self._segment._add_op(op)
[docs] def list_trim(self, index: int, count: int) -> WriteSegmentBuilder: """Keep only *count* elements starting at *index*; remove the rest. Args: index: Starting list index of the range to keep. count: Number of elements to keep. Returns: The parent :class:`WriteSegmentBuilder`. See Also: :meth:`list_remove_range` """ return self._segment._add_op( ListOperation.trim(self._bin, index, count), )
# -- HyperLogLog ----------------------------------------------------------
[docs] def hll_init( self, index_bit_count: int, min_hash_bit_count: int = -1, flags: int = 0, ) -> WriteSegmentBuilder: """Initialize an empty HyperLogLog sketch in this bin. Use before :meth:`hll_add` on a new bin. Pass ``-1`` for ``min_hash_bit_count`` to use the server default. Combine ``flags`` with values from :class:`~aerospike_sdk.HLLWriteFlags`. Example:: await ( session.upsert(key) .bin("visitors") .hll_init(12) .execute() ) Args: index_bit_count: Register width index bits (precision); typical values are in the 4–16 range per server documentation. min_hash_bit_count: Minimum hash bits, or ``-1`` for default. flags: Optional HLL write flags (often ``0`` or :attr:`~aerospike_sdk.HLLWriteFlags.DEFAULT`). Returns: The parent :class:`WriteSegmentBuilder` for chaining. See Also: :meth:`hll_add`: Add distinct values to the sketch. :meth:`QueryBinBuilder.hll_get_count`: Read cardinality in a query. :class:`~aerospike_sdk.HLLWriteFlags` """ return self._segment._add_op( HllOperation.init( self._bin, index_bit_count, min_hash_bit_count, flags, ), )
[docs] def hll_add( self, values: Sequence[Any], index_bit_count: int = -1, min_hash_bit_count: int = -1, flags: int = 0, ) -> WriteSegmentBuilder: """Add distinct values to the HyperLogLog sketch in this bin. The server hashes each element into the sketch. Use ``-1`` for index or min-hash bit counts to inherit defaults. Example:: await ( session.upsert(key) .bin("visitors") .hll_add(["user-1", "user-2"]) .execute() ) Args: values: Sequence of values (for example strings or blobs) to add. index_bit_count: Index bits, or ``-1`` for default. min_hash_bit_count: Min-hash bits, or ``-1`` for default. flags: Optional HLL write flags (often ``0``). Returns: The parent :class:`WriteSegmentBuilder` for chaining. See Also: :meth:`hll_init`: Create an empty sketch. :meth:`hll_get_count`: Read cardinality in the same operate batch. :class:`~aerospike_sdk.HLLWriteFlags` """ return self._segment._add_op( HllOperation.add( self._bin, list(values), index_bit_count, min_hash_bit_count, flags, ), )
[docs] def hll_set_union(self, hll_list: Sequence[Any], flags: int = 0) -> WriteSegmentBuilder: """Merge other HyperLogLog sketches into this bin (destructive union). Each entry in ``hll_list`` is typically another HLL blob (``bytes``) returned from a prior read. Example:: await ( session.upsert(key) .bin("merged") .hll_set_union([other_hll_blob]) .execute() ) Args: hll_list: Sketches to union into the target bin. flags: Optional HLL write flags (often ``0``). Returns: The parent :class:`WriteSegmentBuilder` for chaining. See Also: :meth:`hll_get_union`: Non-destructive union read. :meth:`hll_add`: Add raw values instead of whole sketches. """ return self._segment._add_op( HllOperation.set_union(self._bin, list(hll_list), flags), )
[docs] def hll_fold(self, index_bit_count: int) -> WriteSegmentBuilder: """Reduce sketch precision to a lower ``index_bit_count`` (merge registers). Example:: await session.update(key).bin("hll").hll_fold(10).execute() Args: index_bit_count: New (smaller) index bit width after folding. Returns: The parent :class:`WriteSegmentBuilder` for chaining. See Also: :meth:`hll_init`: Initial precision when creating a sketch. """ return self._segment._add_op(HllOperation.fold(self._bin, index_bit_count))
[docs] def hll_refresh_count(self) -> WriteSegmentBuilder: """Refresh the cached cardinality estimate stored with the sketch. Example:: await session.update(key).bin("hll").hll_refresh_count().execute() Returns: The parent :class:`WriteSegmentBuilder` for chaining. See Also: :meth:`hll_get_count`: Read the estimate in the same batch. """ return self._segment._add_op(HllOperation.refresh_count(self._bin))
[docs] def hll_get_count(self) -> WriteSegmentBuilder: """Read the estimated cardinality in a multi-operation write (``operate``). The result is returned for this bin when the write completes. For a read-only path, use :meth:`QueryBinBuilder.hll_get_count`. Example:: stream = await ( session.update(key) .bin("hll") .hll_get_count() .execute() ) Returns: The parent :class:`WriteSegmentBuilder` for chaining. See Also: :meth:`QueryBinBuilder.hll_get_count`: Same read on a query builder. :meth:`hll_add`: Populate the sketch before counting. """ return self._segment._add_op(HllOperation.get_count(self._bin))
[docs] def hll_describe(self) -> WriteSegmentBuilder: """Read index and min-hash bit counts describing the stored sketch. Example:: await session.update(key).bin("hll").hll_describe().execute() Returns: The parent :class:`WriteSegmentBuilder` for chaining. See Also: :meth:`QueryBinBuilder.hll_describe`: Same read on a query builder. """ return self._segment._add_op(HllOperation.describe(self._bin))
[docs] def hll_get_union(self, hll_list: Sequence[Any]) -> WriteSegmentBuilder: """Read the union sketch without modifying the stored bin. Example:: await ( session.update(key) .bin("hll") .hll_get_union([peer_blob]) .execute() ) Args: hll_list: Other sketches (blobs) to include in the union result. Returns: The parent :class:`WriteSegmentBuilder` for chaining. See Also: :meth:`QueryBinBuilder.hll_get_union`: Same read on a query builder. :meth:`hll_set_union`: Persist a union into the bin. """ return self._segment._add_op( HllOperation.get_union(self._bin, list(hll_list)), )
[docs] def hll_get_union_count(self, hll_list: Sequence[Any]) -> WriteSegmentBuilder: """Read the estimated cardinality of the union with other sketches. Example:: await ( session.update(key) .bin("hll") .hll_get_union_count([peer_blob]) .execute() ) Args: hll_list: Other sketches to union for the estimate. Returns: The parent :class:`WriteSegmentBuilder` for chaining. See Also: :meth:`QueryBinBuilder.hll_get_union_count`: Same read on a query. """ return self._segment._add_op( HllOperation.get_union_count(self._bin, list(hll_list)), )
[docs] def hll_get_intersect_count(self, hll_list: Sequence[Any]) -> WriteSegmentBuilder: """Read the estimated intersection cardinality with other sketches. Example:: await ( session.update(key) .bin("hll") .hll_get_intersect_count([peer_blob]) .execute() ) Args: hll_list: Other sketches included in the intersection estimate. Returns: The parent :class:`WriteSegmentBuilder` for chaining. See Also: :meth:`QueryBinBuilder.hll_get_intersect_count`: Same read on a query. """ return self._segment._add_op( HllOperation.get_intersect_count(self._bin, list(hll_list)), )
[docs] def hll_get_similarity(self, hll_list: Sequence[Any]) -> WriteSegmentBuilder: """Read Jaccard similarity between this sketch and other sketches. Example:: await ( session.update(key) .bin("hll") .hll_get_similarity([peer_blob]) .execute() ) Args: hll_list: Other sketches to compare. Returns: The parent :class:`WriteSegmentBuilder` for chaining. See Also: :meth:`QueryBinBuilder.hll_get_similarity`: Same read on a query. """ return self._segment._add_op( HllOperation.get_similarity(self._bin, list(hll_list)), )
# -- Bit (blob) -----------------------------------------------------------
[docs] def bit_resize( self, byte_size: int, resize_flags: Optional[Any] = None, policy: Optional[Any] = None, ) -> WriteSegmentBuilder: """Grow or shrink the raw bytes backing this bin. When ``resize_flags`` is ``None``, :attr:`~aerospike_sdk.BitwiseResizeFlags.DEFAULT` is used. When ``policy`` is ``None``, a default :class:`~aerospike_sdk.BitPolicy` is built from :attr:`~aerospike_sdk.BitWriteFlags.DEFAULT`. Example:: await session.upsert(key).bin("flags").bit_resize(4).execute() Args: byte_size: Target size of the blob in bytes. resize_flags: Optional :class:`~aerospike_sdk.BitwiseResizeFlags` value; ``None`` selects ``DEFAULT``. policy: Optional :class:`~aerospike_sdk.BitPolicy`; ``None`` selects a default policy. Returns: The parent :class:`WriteSegmentBuilder` for chaining. See Also: :meth:`bit_insert`: Insert raw bytes at an offset. :meth:`QueryBinBuilder.bit_get`: Read bits in a query. """ return self._segment._add_op( BitOperation.resize( self._bin, byte_size, _resize_flags_or_default(resize_flags), _bit_policy_or_default(policy), ), )
[docs] def bit_insert( self, byte_offset: int, value: Any, policy: Optional[Any] = None, ) -> WriteSegmentBuilder: """Insert ``value`` (bytes) at a byte offset in the blob bin. Example:: await session.update(key).bin("blob").bit_insert(0, b"\\x01\\x02").execute() Args: byte_offset: Byte position at which to insert. value: Bytes to insert. policy: Optional :class:`~aerospike_sdk.BitPolicy`; ``None`` selects a default policy. Returns: The parent :class:`WriteSegmentBuilder` for chaining. See Also: :meth:`bit_remove`: Remove a byte range. """ return self._segment._add_op( BitOperation.insert( self._bin, byte_offset, value, _bit_policy_or_default(policy), ), )
[docs] def bit_remove( self, byte_offset: int, byte_size: int, policy: Optional[Any] = None, ) -> WriteSegmentBuilder: """Remove ``byte_size`` bytes starting at ``byte_offset``. Example:: await session.update(key).bin("blob").bit_remove(0, 2).execute() Args: byte_offset: Start of the range to remove. byte_size: Number of bytes to remove. policy: Optional :class:`~aerospike_sdk.BitPolicy`; ``None`` selects a default policy. Returns: The parent :class:`WriteSegmentBuilder` for chaining. See Also: :meth:`bit_insert`: Insert bytes at an offset. """ return self._segment._add_op( BitOperation.remove( self._bin, byte_offset, byte_size, _bit_policy_or_default(policy), ), )
[docs] def bit_set( self, bit_offset: int, bit_size: int, value: Any, policy: Optional[Any] = None, ) -> WriteSegmentBuilder: """Overwrite ``bit_size`` bits at ``bit_offset`` with ``value``. ``value`` is typically a small ``bytes`` object whose bits replace the range (see server documentation for encoding). Example:: await session.update(key).bin("blob").bit_set(0, 8, b"\\xff").execute() Args: bit_offset: Starting bit index within the blob. bit_size: Width of the field in bits. value: Bits to write (commonly ``bytes``). policy: Optional :class:`~aerospike_sdk.BitPolicy`; ``None`` selects a default policy. Returns: The parent :class:`WriteSegmentBuilder` for chaining. See Also: :meth:`bit_get`: Read the same range in an operate or query. :meth:`bit_or`, :meth:`bit_xor`, :meth:`bit_and`, :meth:`bit_not` """ return self._segment._add_op( BitOperation.set( self._bin, bit_offset, bit_size, value, _bit_policy_or_default(policy), ), )
[docs] def bit_or( self, bit_offset: int, bit_size: int, value: Any, policy: Optional[Any] = None, ) -> WriteSegmentBuilder: """Bitwise OR ``value`` into the ``bit_size`` bits at ``bit_offset``. Example:: await session.update(key).bin("blob").bit_or(0, 8, b"\\x0f").execute() Args: bit_offset: Starting bit index. bit_size: Field width in bits. value: Right-hand side of the OR (typically ``bytes``). policy: Optional :class:`~aerospike_sdk.BitPolicy`; ``None`` selects a default policy. Returns: The parent :class:`WriteSegmentBuilder` for chaining. See Also: :meth:`bit_and` :meth:`bit_xor` :meth:`bit_not` """ return self._segment._add_op( _bitwise_or( self._bin, bit_offset, bit_size, value, _bit_policy_or_default(policy), ), )
[docs] def bit_xor( self, bit_offset: int, bit_size: int, value: Any, policy: Optional[Any] = None, ) -> WriteSegmentBuilder: """Bitwise XOR ``value`` into the ``bit_size`` bits at ``bit_offset``. Example:: await session.update(key).bin("blob").bit_xor(0, 8, b"\\xff").execute() Args: bit_offset: Starting bit index. bit_size: Field width in bits. value: Right-hand side of the XOR (typically ``bytes``). policy: Optional :class:`~aerospike_sdk.BitPolicy`; ``None`` selects a default policy. Returns: The parent :class:`WriteSegmentBuilder` for chaining. See Also: :meth:`bit_or` :meth:`bit_and` :meth:`bit_not` """ return self._segment._add_op( BitOperation.xor( self._bin, bit_offset, bit_size, value, _bit_policy_or_default(policy), ), )
[docs] def bit_and( self, bit_offset: int, bit_size: int, value: Any, policy: Optional[Any] = None, ) -> WriteSegmentBuilder: """Bitwise AND ``value`` into the ``bit_size`` bits at ``bit_offset``. Example:: await session.update(key).bin("blob").bit_and(0, 8, b"\\xf0").execute() Args: bit_offset: Starting bit index. bit_size: Field width in bits. value: Right-hand side of the AND (typically ``bytes``). policy: Optional :class:`~aerospike_sdk.BitPolicy`; ``None`` selects a default policy. Returns: The parent :class:`WriteSegmentBuilder` for chaining. See Also: :meth:`bit_or` :meth:`bit_xor` :meth:`bit_not` """ return self._segment._add_op( _bitwise_and( self._bin, bit_offset, bit_size, value, _bit_policy_or_default(policy), ), )
[docs] def bit_not( self, bit_offset: int, bit_size: int, policy: Optional[Any] = None, ) -> WriteSegmentBuilder: """Invert every bit in the range ``[bit_offset, bit_offset + bit_size)``. Example:: await session.update(key).bin("blob").bit_not(0, 8).execute() Args: bit_offset: Starting bit index. bit_size: Field width in bits. policy: Optional :class:`~aerospike_sdk.BitPolicy`; ``None`` selects a default policy. Returns: The parent :class:`WriteSegmentBuilder` for chaining. See Also: :meth:`bit_or` :meth:`bit_xor` :meth:`bit_and` """ return self._segment._add_op( _bitwise_not( self._bin, bit_offset, bit_size, _bit_policy_or_default(policy), ), )
[docs] def bit_lshift( self, bit_offset: int, bit_size: int, shift: int, policy: Optional[Any] = None, ) -> WriteSegmentBuilder: """Left-shift the ``bit_size`` bits at ``bit_offset`` by ``shift`` bits. Example:: await session.update(key).bin("blob").bit_lshift(0, 16, 2).execute() Args: bit_offset: Starting bit index. bit_size: Field width in bits. shift: Number of bits to shift left. policy: Optional :class:`~aerospike_sdk.BitPolicy`; ``None`` selects a default policy. Returns: The parent :class:`WriteSegmentBuilder` for chaining. See Also: :meth:`bit_rshift` """ return self._segment._add_op( BitOperation.lshift( self._bin, bit_offset, bit_size, shift, _bit_policy_or_default(policy), ), )
[docs] def bit_rshift( self, bit_offset: int, bit_size: int, shift: int, policy: Optional[Any] = None, ) -> WriteSegmentBuilder: """Right-shift the ``bit_size`` bits at ``bit_offset`` by ``shift`` bits. Example:: await session.update(key).bin("blob").bit_rshift(0, 16, 2).execute() Args: bit_offset: Starting bit index. bit_size: Field width in bits. shift: Number of bits to shift right. policy: Optional :class:`~aerospike_sdk.BitPolicy`; ``None`` selects a default policy. Returns: The parent :class:`WriteSegmentBuilder` for chaining. See Also: :meth:`bit_lshift` """ return self._segment._add_op( BitOperation.rshift( self._bin, bit_offset, bit_size, shift, _bit_policy_or_default(policy), ), )
[docs] def bit_add( self, bit_offset: int, bit_size: int, value: int, signed: bool, action: Any, policy: Optional[Any] = None, ) -> WriteSegmentBuilder: """Add ``value`` to the integer encoded in ``bit_size`` bits at ``bit_offset``. ``action`` selects overflow behavior (for example :attr:`~aerospike_sdk.BitwiseOverflowActions.WRAP`). Example:: from aerospike_sdk import BitwiseOverflowActions await ( session.update(key) .bin("blob") .bit_add(0, 16, 1, False, BitwiseOverflowActions.WRAP) .execute() ) Args: bit_offset: Starting bit index of the integer field. bit_size: Width of the integer in bits. value: Amount to add. signed: ``True`` if the stored integer is signed. action: Overflow policy (:class:`~aerospike_sdk.BitwiseOverflowActions`). policy: Optional :class:`~aerospike_sdk.BitPolicy`; ``None`` selects a default policy. Returns: The parent :class:`WriteSegmentBuilder` for chaining. See Also: :meth:`bit_subtract` :meth:`bit_set_int` :meth:`bit_get_int` """ return self._segment._add_op( BitOperation.add( self._bin, bit_offset, bit_size, value, signed, action, _bit_policy_or_default(policy), ), )
[docs] def bit_subtract( self, bit_offset: int, bit_size: int, value: int, signed: bool, action: Any, policy: Optional[Any] = None, ) -> WriteSegmentBuilder: """Subtract ``value`` from the integer in ``bit_size`` bits at ``bit_offset``. Example:: from aerospike_sdk import BitwiseOverflowActions await ( session.update(key) .bin("blob") .bit_subtract(0, 16, 1, False, BitwiseOverflowActions.SATURATE) .execute() ) Args: bit_offset: Starting bit index of the integer field. bit_size: Width of the integer in bits. value: Amount to subtract. signed: ``True`` if the stored integer is signed. action: Overflow policy (:class:`~aerospike_sdk.BitwiseOverflowActions`). policy: Optional :class:`~aerospike_sdk.BitPolicy`; ``None`` selects a default policy. Returns: The parent :class:`WriteSegmentBuilder` for chaining. See Also: :meth:`bit_add` :meth:`bit_set_int` """ return self._segment._add_op( BitOperation.subtract( self._bin, bit_offset, bit_size, value, signed, action, _bit_policy_or_default(policy), ), )
[docs] def bit_set_int( self, bit_offset: int, bit_size: int, value: int, policy: Optional[Any] = None, ) -> WriteSegmentBuilder: """Write integer ``value`` into ``bit_size`` bits at ``bit_offset``. Example:: await session.update(key).bin("blob").bit_set_int(0, 16, 42).execute() Args: bit_offset: Starting bit index. bit_size: Width of the integer in bits. value: Integer to store. policy: Optional :class:`~aerospike_sdk.BitPolicy`; ``None`` selects a default policy. Returns: The parent :class:`WriteSegmentBuilder` for chaining. See Also: :meth:`bit_get_int` :meth:`bit_add` """ return self._segment._add_op( BitOperation.set_int( self._bin, bit_offset, bit_size, value, _bit_policy_or_default(policy), ), )
[docs] def bit_get(self, bit_offset: int, bit_size: int) -> WriteSegmentBuilder: """Read ``bit_size`` bits at ``bit_offset`` as raw bytes in a write operate. For read-only access, use :meth:`QueryBinBuilder.bit_get`. Example:: await session.update(key).bin("blob").bit_get(0, 8).execute() Args: bit_offset: Starting bit index. bit_size: Number of bits to read. Returns: The parent :class:`WriteSegmentBuilder` for chaining. See Also: :meth:`QueryBinBuilder.bit_get` :meth:`bit_set` """ return self._segment._add_op(BitOperation.get(self._bin, bit_offset, bit_size))
[docs] def bit_count(self, bit_offset: int, bit_size: int) -> WriteSegmentBuilder: """Count bits set to ``1`` in ``bit_size`` bits starting at ``bit_offset``. Example:: await session.update(key).bin("blob").bit_count(0, 8).execute() Args: bit_offset: Starting bit index. bit_size: Number of bits to scan. Returns: The parent :class:`WriteSegmentBuilder` for chaining. See Also: :meth:`QueryBinBuilder.bit_count` """ return self._segment._add_op(BitOperation.count(self._bin, bit_offset, bit_size))
[docs] def bit_lscan(self, bit_offset: int, bit_size: int, value: bool) -> WriteSegmentBuilder: """Return the leftmost bit index in the range matching ``value``. ``value`` is ``True`` to search for a set bit (``1``) or ``False`` for an unset bit (``0``). Example:: await session.update(key).bin("blob").bit_lscan(0, 8, True).execute() Args: bit_offset: Starting bit index. bit_size: Number of bits to scan. value: ``True`` for set bits, ``False`` for unset bits. Returns: The parent :class:`WriteSegmentBuilder` for chaining. See Also: :meth:`bit_rscan` :meth:`QueryBinBuilder.bit_lscan` """ return self._segment._add_op( BitOperation.lscan(self._bin, bit_offset, bit_size, value), )
[docs] def bit_rscan(self, bit_offset: int, bit_size: int, value: bool) -> WriteSegmentBuilder: """Return the rightmost bit index in the range matching ``value``. Example:: await session.update(key).bin("blob").bit_rscan(0, 8, False).execute() Args: bit_offset: Starting bit index. bit_size: Number of bits to scan. value: ``True`` for set bits, ``False`` for unset bits. Returns: The parent :class:`WriteSegmentBuilder` for chaining. See Also: :meth:`bit_lscan` :meth:`QueryBinBuilder.bit_rscan` """ return self._segment._add_op( BitOperation.rscan(self._bin, bit_offset, bit_size, value), )
[docs] def bit_get_int( self, bit_offset: int, bit_size: int, signed: bool, ) -> WriteSegmentBuilder: """Decode an integer from ``bit_size`` bits at ``bit_offset``. Example:: await session.update(key).bin("blob").bit_get_int(0, 16, False).execute() Args: bit_offset: Starting bit index. bit_size: Width of the integer in bits. signed: ``True`` to interpret as two's-complement signed. Returns: The parent :class:`WriteSegmentBuilder` for chaining. See Also: :meth:`QueryBinBuilder.bit_get_int` :meth:`bit_set_int` """ return self._segment._add_op( BitOperation.get_int(self._bin, bit_offset, bit_size, signed), )
# -- Expression operations ------------------------------------------------
[docs] def select_from( self, expression: Union[str, FilterExpression], *, ignore_eval_failure: bool = False, ) -> WriteSegmentBuilder: """Read a computed value into this bin using an AEL expression.""" return self._segment.select_from( self._bin, expression, ignore_eval_failure=ignore_eval_failure, )
[docs] def insert_from( self, expression: Union[str, FilterExpression], *, ignore_op_failure: bool = False, ignore_eval_failure: bool = False, delete_if_null: bool = False, ) -> WriteSegmentBuilder: """Write expression result only if bin does not already exist.""" return self._segment.insert_from( self._bin, expression, ignore_op_failure=ignore_op_failure, ignore_eval_failure=ignore_eval_failure, delete_if_null=delete_if_null, )
[docs] def update_from( self, expression: Union[str, FilterExpression], *, ignore_op_failure: bool = False, ignore_eval_failure: bool = False, delete_if_null: bool = False, ) -> WriteSegmentBuilder: """Write expression result only if bin already exists.""" return self._segment.update_from( self._bin, expression, ignore_op_failure=ignore_op_failure, ignore_eval_failure=ignore_eval_failure, delete_if_null=delete_if_null, )
[docs] def upsert_from( self, expression: Union[str, FilterExpression], *, ignore_op_failure: bool = False, ignore_eval_failure: bool = False, delete_if_null: bool = False, ) -> WriteSegmentBuilder: """Write expression result, creating or overwriting the bin.""" return self._segment.upsert_from( self._bin, expression, ignore_op_failure=ignore_op_failure, ignore_eval_failure=ignore_eval_failure, delete_if_null=delete_if_null, )
# -- Map navigation (singular -> CdtWriteBuilder) --------------------------
[docs] def on_map_index(self, index: int) -> CdtWriteBuilder[WriteSegmentBuilder]: """Navigate to a map element by index. Args: index: List index (0-based, negative counts from end). Returns: :class:`CdtWriteBuilder` for writing the targeted element(s). """ b = self._bin return CdtWriteBuilder( self._segment, lambda rt: MapOperation.get_by_index(b, index, rt), lambda rt: MapOperation.remove_by_index(b, index, rt), MapReturnType, is_map=True, bin_name=b, to_ctx=lambda: CTX.map_index(index), )
[docs] def on_map_key( self, key: Any, *, create_type: Optional[MapOrder] = None, ) -> CdtWriteBuilder[WriteSegmentBuilder]: """Navigate to a map element by key. Args: key: Map key to target. create_type: If set, use a create-on-missing context for this key with the given map key order. Returns: :class:`CdtWriteBuilder` for writing the targeted element(s). """ b = self._bin _mp = MapPolicy(None, None) if create_type is not None: to_ctx = lambda: CTX.map_key_create(key, create_type) else: to_ctx = lambda: CTX.map_key(key) return CdtWriteBuilder( self._segment, lambda rt: MapOperation.get_by_key(b, key, rt), lambda rt: MapOperation.remove_by_key(b, key, rt), MapReturnType, is_map=True, bin_name=b, to_ctx=to_ctx, set_to_factory=lambda v: MapOperation.put(b, key, v, _mp), add_factory=lambda v: MapOperation.increment_value(b, key, v, _mp), )
[docs] def on_map_rank(self, rank: int) -> CdtWriteBuilder[WriteSegmentBuilder]: """Navigate to a map element by rank (0 = lowest value).""" b = self._bin return CdtWriteBuilder( self._segment, lambda rt: MapOperation.get_by_rank(b, rank, rt), lambda rt: MapOperation.remove_by_rank(b, rank, rt), MapReturnType, is_map=True, bin_name=b, to_ctx=lambda: CTX.map_rank(rank), )
# -- Map navigation (invertable -> CdtWriteInvertableBuilder) -------------
[docs] def on_map_value(self, value: Any) -> CdtWriteInvertableBuilder[WriteSegmentBuilder]: """Navigate to map elements matching a value. Args: value: Value to match. Returns: :class:`CdtWriteInvertableBuilder` for writing the targeted element(s). """ b = self._bin return CdtWriteInvertableBuilder( self._segment, lambda rt: MapOperation.get_by_value(b, value, rt), lambda rt: MapOperation.remove_by_value(b, value, rt), MapReturnType, is_map=True, bin_name=b, to_ctx=lambda: CTX.map_value(value), )
[docs] def on_map_index_range( self, index: int, count: Optional[int] = None, ) -> CdtWriteInvertableBuilder[WriteSegmentBuilder]: """Navigate to map elements by index range. Args: index: List index (0-based, negative counts from end). count: Maximum entries to select; ``None`` for all remaining. Returns: :class:`CdtWriteInvertableBuilder` for writing the targeted element(s). """ b = self._bin if count is None: get_f = lambda rt: MapOperation.get_by_index_range_from(b, index, rt) rm_f = lambda rt: MapOperation.remove_by_index_range_from(b, index, rt) else: get_f = lambda rt: MapOperation.get_by_index_range(b, index, count, rt) rm_f = lambda rt: MapOperation.remove_by_index_range(b, index, count, rt) return CdtWriteInvertableBuilder( self._segment, get_f, rm_f, MapReturnType, is_map=True, bin_name=b, to_ctx=None, )
[docs] def on_map_key_range( self, start: Any, end: Any, ) -> CdtWriteInvertableBuilder[WriteSegmentBuilder]: """Navigate to map elements by key range [start, end). Args: start: Inclusive range start. end: Exclusive range end. Returns: :class:`CdtWriteInvertableBuilder` for writing the targeted element(s). """ b = self._bin return CdtWriteInvertableBuilder( self._segment, lambda rt: MapOperation.get_by_key_range(b, start, end, rt), lambda rt: MapOperation.remove_by_key_range(b, start, end, rt), MapReturnType, is_map=True, bin_name=b, to_ctx=None, )
[docs] def on_map_rank_range( self, rank: int, count: Optional[int] = None, ) -> CdtWriteInvertableBuilder[WriteSegmentBuilder]: """Navigate to map elements by rank range. Args: rank: Rank position (0 = lowest value). count: Maximum entries to select; ``None`` for all remaining. Returns: :class:`CdtWriteInvertableBuilder` for writing the targeted element(s). """ b = self._bin if count is None: get_f = lambda rt: MapOperation.get_by_rank_range_from(b, rank, rt) rm_f = lambda rt: MapOperation.remove_by_rank_range_from(b, rank, rt) else: get_f = lambda rt: MapOperation.get_by_rank_range(b, rank, count, rt) rm_f = lambda rt: MapOperation.remove_by_rank_range(b, rank, count, rt) return CdtWriteInvertableBuilder( self._segment, get_f, rm_f, MapReturnType, is_map=True, bin_name=b, to_ctx=None, )
[docs] def on_map_value_range( self, start: Any, end: Any, ) -> CdtWriteInvertableBuilder[WriteSegmentBuilder]: """Navigate to map elements by value range [start, end). Args: start: Inclusive range start. end: Exclusive range end. Returns: :class:`CdtWriteInvertableBuilder` for writing the targeted element(s). """ b = self._bin return CdtWriteInvertableBuilder( self._segment, lambda rt: MapOperation.get_by_value_range(b, start, end, rt), lambda rt: MapOperation.remove_by_value_range(b, start, end, rt), MapReturnType, is_map=True, bin_name=b, to_ctx=None, )
[docs] def on_map_key_relative_index_range( self, key: Any, index: int, count: Optional[int] = None, ) -> CdtWriteInvertableBuilder[WriteSegmentBuilder]: """Navigate to map entries by index range relative to an anchor key. Args: key: Map key to target. index: Index offset from the anchor key. count: Maximum entries to select; ``None`` for all remaining. Returns: :class:`CdtWriteInvertableBuilder` for writing the targeted element(s). """ b = self._bin return CdtWriteInvertableBuilder( self._segment, lambda rt: MapOperation.get_by_key_relative_index_range( b, key, index, count, rt, ), lambda rt: MapOperation.remove_by_key_relative_index_range( b, key, index, count, rt, ), MapReturnType, is_map=True, bin_name=b, to_ctx=None, )
[docs] def on_map_value_relative_rank_range( self, value: Any, rank: int, count: Optional[int] = None, ) -> CdtWriteInvertableBuilder[WriteSegmentBuilder]: """Navigate to map entries by value rank range relative to an anchor value. Args: value: Value to match. rank: Rank offset from the anchor value. count: Maximum entries to select; ``None`` for all remaining. Returns: :class:`CdtWriteInvertableBuilder` for writing the targeted element(s). """ b = self._bin return CdtWriteInvertableBuilder( self._segment, lambda rt: MapOperation.get_by_value_relative_rank_range( b, value, rank, count, rt, ), lambda rt: MapOperation.remove_by_value_relative_rank_range( b, value, rank, count, rt, ), MapReturnType, is_map=True, bin_name=b, to_ctx=None, )
[docs] def on_map_key_list(self, keys: List[Any]) -> CdtWriteInvertableBuilder[WriteSegmentBuilder]: """Navigate to map elements matching a list of keys. Args: keys: Map keys to match. Returns: :class:`CdtWriteInvertableBuilder` for writing the targeted element(s). """ b = self._bin return CdtWriteInvertableBuilder( self._segment, lambda rt: MapOperation.get_by_key_list(b, keys, rt), lambda rt: MapOperation.remove_by_key_list(b, keys, rt), MapReturnType, is_map=True, bin_name=b, to_ctx=None, )
[docs] def on_map_value_list(self, values: List[Any]) -> CdtWriteInvertableBuilder[WriteSegmentBuilder]: """Navigate to map elements matching a list of values. Args: values: Values to match. Returns: :class:`CdtWriteInvertableBuilder` for writing the targeted element(s). """ b = self._bin return CdtWriteInvertableBuilder( self._segment, lambda rt: MapOperation.get_by_value_list(b, values, rt), lambda rt: MapOperation.remove_by_value_list(b, values, rt), MapReturnType, is_map=True, bin_name=b, to_ctx=None, )
# -- List navigation (singular -> CdtWriteBuilder) ------------------------
[docs] def on_list_index( self, index: int, *, order: Optional[ListOrderType] = None, pad: bool = False, ) -> CdtWriteBuilder[WriteSegmentBuilder]: """Navigate to a list element by index. Args: index: List index (0-based, negative counts from end). order: If set (or if *pad* is ``True``), use create-on-missing list context with this order; when only *pad* is ``True``, defaults to :data:`~aerospike_async.ListOrderType.UNORDERED`. pad: When using create-on-missing context, allow sparse indexes. Returns: :class:`CdtWriteBuilder` for writing the targeted element(s). Example:: .bin("items").on_list_index(0).set_to("first") """ b = self._bin use_create = order is not None or pad if use_create: eff_order = order if order is not None else ListOrderType.UNORDERED to_ctx = lambda: CTX.list_index_create(index, eff_order, pad) else: to_ctx = lambda: CTX.list_index(index) return CdtWriteBuilder( self._segment, lambda rt: ListOperation.get_by_index(b, index, rt), lambda rt: ListOperation.remove_by_index(b, index, rt), ListReturnType, is_map=False, bin_name=b, to_ctx=to_ctx, )
[docs] def on_list_rank(self, rank: int) -> CdtWriteBuilder[WriteSegmentBuilder]: """Navigate to a list element by rank (0 = lowest value). Args: rank: Rank position (0 = lowest value). Returns: :class:`CdtWriteBuilder` for writing the targeted element(s). """ b = self._bin return CdtWriteBuilder( self._segment, lambda rt: ListOperation.get_by_rank(b, rank, rt), lambda rt: ListOperation.remove_by_rank(b, rank, rt), ListReturnType, is_map=False, bin_name=b, to_ctx=lambda: CTX.list_rank(rank), )
# -- List navigation (invertable -> CdtWriteInvertableBuilder) ------------
[docs] def on_list_value(self, value: Any) -> CdtWriteInvertableBuilder[WriteSegmentBuilder]: """Navigate to list elements matching a value. Args: value: Value to match. Returns: :class:`CdtWriteInvertableBuilder` for writing the targeted element(s). """ b = self._bin return CdtWriteInvertableBuilder( self._segment, lambda rt: ListOperation.get_by_value(b, value, rt), lambda rt: ListOperation.remove_by_value(b, value, rt), ListReturnType, is_map=False, bin_name=b, to_ctx=lambda: CTX.list_value(value), )
[docs] def on_list_index_range( self, index: int, count: Optional[int] = None, ) -> CdtWriteInvertableBuilder[WriteSegmentBuilder]: """Navigate to list elements by index range. Args: index: List index (0-based, negative counts from end). count: Maximum entries to select; ``None`` for all remaining. Returns: :class:`CdtWriteInvertableBuilder` for writing the targeted element(s). """ b = self._bin return CdtWriteInvertableBuilder( self._segment, lambda rt: ListOperation.get_by_index_range(b, index, count, rt), lambda rt: ListOperation.remove_by_index_range(b, index, count, rt), ListReturnType, is_map=False, bin_name=b, to_ctx=None, )
[docs] def on_list_rank_range( self, rank: int, count: Optional[int] = None, ) -> CdtWriteInvertableBuilder[WriteSegmentBuilder]: """Navigate to list elements by rank range. Args: rank: Rank position (0 = lowest value). count: Maximum entries to select; ``None`` for all remaining. Returns: :class:`CdtWriteInvertableBuilder` for writing the targeted element(s). """ b = self._bin return CdtWriteInvertableBuilder( self._segment, lambda rt: ListOperation.get_by_rank_range(b, rank, count, rt), lambda rt: ListOperation.remove_by_rank_range(b, rank, count, rt), ListReturnType, is_map=False, bin_name=b, to_ctx=None, )
[docs] def on_list_value_range( self, start: Any, end: Any, ) -> CdtWriteInvertableBuilder[WriteSegmentBuilder]: """Navigate to list elements by value range [start, end). Args: start: Inclusive range start. end: Exclusive range end. Returns: :class:`CdtWriteInvertableBuilder` for writing the targeted element(s). """ b = self._bin return CdtWriteInvertableBuilder( self._segment, lambda rt: ListOperation.get_by_value_range(b, start, end, rt), lambda rt: ListOperation.remove_by_value_range(b, start, end, rt), ListReturnType, is_map=False, bin_name=b, to_ctx=None, )
[docs] def on_list_value_relative_rank_range( self, value: Any, rank: int, count: Optional[int] = None, ) -> CdtWriteInvertableBuilder[WriteSegmentBuilder]: """Navigate to list elements by value rank range relative to an anchor value. Args: value: Value to match. rank: Rank offset from the anchor value. count: Maximum entries to select; ``None`` for all remaining. Returns: :class:`CdtWriteInvertableBuilder` for writing the targeted element(s). """ b = self._bin return CdtWriteInvertableBuilder( self._segment, lambda rt: ListOperation.get_by_value_relative_rank_range( b, value, rank, count, rt, ), lambda rt: ListOperation.remove_by_value_relative_rank_range( b, value, rank, count, rt, ), ListReturnType, is_map=False, bin_name=b, to_ctx=None, )
[docs] def on_list_value_list(self, values: List[Any]) -> CdtWriteInvertableBuilder[WriteSegmentBuilder]: """Navigate to list elements matching a list of values. Args: values: Values to match. Returns: :class:`CdtWriteInvertableBuilder` for writing the targeted element(s). """ b = self._bin return CdtWriteInvertableBuilder( self._segment, lambda rt: ListOperation.get_by_value_list(b, values, rt), lambda rt: ListOperation.remove_by_value_list(b, values, rt), ListReturnType, is_map=False, bin_name=b, to_ctx=None, )
# -- Convenience transitions (delegate to segment) ------------------------
[docs] def bin(self, bin_name: str) -> WriteBinBuilder: """Start the next bin operation without leaving the write segment.""" return WriteBinBuilder(self._segment, bin_name)
[docs] def query( self, arg1: Union[Key, List[Key]], *more_keys: Key, ) -> QueryBuilder: """Shortcut: finalize write segment and start a read segment.""" return self._segment.query(arg1, *more_keys)
def _start_write_verb( self, op_type: str, arg1: Union[Key, List[Key]], *more_keys: Key, ) -> WriteSegmentBuilder: return self._segment._start_write_verb(op_type, arg1, *more_keys)
[docs] async def execute( self, on_error: OnError | None = None, ) -> RecordStream: """Shortcut: execute all accumulated specs.""" return await self._segment.execute(on_error)
[docs] class QueryBinBuilder(_WriteVerbs, Generic[_T]): """Per-bin reads and CDT navigation for :class:`QueryBuilder` (and sync twin). The type parameter is the parent builder type; the parent must implement ``add_operation``. Use :meth:`get` for whole-bin reads, :meth:`select_from` for expression reads, ``on_map_*`` / ``on_list_*`` for paths, ``hll_*`` / ``bit_*`` for HyperLogLog and blob bit reads, then :meth:`QueryBuilder.execute`. Write verbs delegate to the parent to chain writes after reads. Example: Read map keys and list size in a single query:: stream = await ( session.query(key) .bin("settings").on_map_key("theme").get_values() .bin("tags").list_size() .execute() ) See Also: :class:`WriteBinBuilder`: Per-bin write builder. :class:`~aerospike_sdk.aio.operations.cdt_read.CdtReadBuilder`: Nested reads. """ __slots__ = ("_parent", "_bin")
[docs] def __init__(self, parent: _T, bin_name: str) -> None: self._parent = parent self._bin = bin_name
# -- Simple read ----------------------------------------------------------
[docs] def get(self) -> _T: """Include the bin value in the read result. Returns: The parent builder for chaining. See Also: :meth:`select_from`: Virtual bin from an expression. """ self._parent.add_operation(Operation.get_bin(self._bin)) # type: ignore[union-attr] return self._parent
[docs] def map_size(self) -> _T: """Return the number of entries in the map.""" self._parent.add_operation(MapOperation.size(self._bin)) # type: ignore[union-attr] return self._parent
[docs] def list_size(self) -> _T: """Read list length into the operate/read result.""" self._parent.add_operation(ListOperation.size(self._bin)) # type: ignore[union-attr] return self._parent
[docs] def list_get(self, index: int) -> _T: """Read the list element at *index* into the query result. Args: index: List index (0-based; negative counts from the end). Returns: The parent builder for chaining. See Also: :meth:`list_get_range`, :meth:`WriteBinBuilder.list_set` """ self._parent.add_operation(ListOperation.get(self._bin, index)) # type: ignore[union-attr] return self._parent
[docs] def list_get_range(self, index: int, count: Optional[int] = None) -> _T: """Read a contiguous slice of the list starting at *index*. Args: index: Starting list index. count: Number of elements; ``None`` reads through the end. Returns: The parent builder for chaining. See Also: :meth:`list_get` """ if count is None: op = ListOperation.get_range_from(self._bin, index) else: op = ListOperation.get_range(self._bin, index, count) self._parent.add_operation(op) # type: ignore[union-attr] return self._parent
# -- HyperLogLog reads ----------------------------------------------------
[docs] def hll_get_count(self) -> _T: """Read the estimated HyperLogLog cardinality for this bin. The estimate appears under this bin's name in the record returned from :meth:`QueryBuilder.execute`. To read during a multi-op write, use :meth:`WriteBinBuilder.hll_get_count`. Example:: stream = await session.query(key).bin("visitors").hll_get_count().execute() Returns: The parent query builder for chaining. See Also: :meth:`WriteBinBuilder.hll_get_count` :meth:`WriteBinBuilder.hll_add` """ self._parent.add_operation(HllOperation.get_count(self._bin)) # type: ignore[union-attr] return self._parent
[docs] def hll_describe(self) -> _T: """Read index and min-hash bit parameters describing the stored sketch. Example:: stream = await session.query(key).bin("visitors").hll_describe().execute() Returns: The parent query builder for chaining. See Also: :meth:`WriteBinBuilder.hll_describe` """ self._parent.add_operation(HllOperation.describe(self._bin)) # type: ignore[union-attr] return self._parent
[docs] def hll_get_union(self, hll_list: Sequence[Any]) -> _T: """Read the union sketch of this bin and ``hll_list`` without updating storage. Example:: stream = await ( session.query(key).bin("hll").hll_get_union([peer_blob]).execute() ) Args: hll_list: Other HLL blobs to include in the union result. Returns: The parent query builder for chaining. See Also: :meth:`WriteBinBuilder.hll_get_union` :meth:`hll_get_union_count` """ self._parent.add_operation( HllOperation.get_union(self._bin, list(hll_list)), ) # type: ignore[union-attr] return self._parent
[docs] def hll_get_union_count(self, hll_list: Sequence[Any]) -> _T: """Read the estimated cardinality of the union with other sketches. Example:: stream = await ( session.query(key).bin("hll").hll_get_union_count([peer_blob]).execute() ) Args: hll_list: Other sketches included in the union estimate. Returns: The parent query builder for chaining. See Also: :meth:`WriteBinBuilder.hll_get_union_count` :meth:`hll_get_intersect_count` """ self._parent.add_operation( HllOperation.get_union_count(self._bin, list(hll_list)), ) # type: ignore[union-attr] return self._parent
[docs] def hll_get_intersect_count(self, hll_list: Sequence[Any]) -> _T: """Read the estimated intersection cardinality with other sketches. Example:: stream = await ( session.query(key) .bin("hll") .hll_get_intersect_count([peer_blob]) .execute() ) Args: hll_list: Other sketches included in the intersection estimate. Returns: The parent query builder for chaining. See Also: :meth:`WriteBinBuilder.hll_get_intersect_count` :meth:`hll_get_union_count` """ self._parent.add_operation( HllOperation.get_intersect_count(self._bin, list(hll_list)), ) # type: ignore[union-attr] return self._parent
[docs] def hll_get_similarity(self, hll_list: Sequence[Any]) -> _T: """Read Jaccard similarity between this sketch and other sketches. Example:: stream = await ( session.query(key).bin("hll").hll_get_similarity([peer_blob]).execute() ) Args: hll_list: Other sketches to compare. Returns: The parent query builder for chaining. See Also: :meth:`WriteBinBuilder.hll_get_similarity` """ self._parent.add_operation( HllOperation.get_similarity(self._bin, list(hll_list)), ) # type: ignore[union-attr] return self._parent
# -- Bit (blob) reads -----------------------------------------------------
[docs] def bit_get(self, bit_offset: int, bit_size: int) -> _T: """Read ``bit_size`` bits at ``bit_offset`` as raw bytes. Example:: stream = await session.query(key).bin("blob").bit_get(0, 8).execute() Args: bit_offset: Starting bit index. bit_size: Number of bits to read. Returns: The parent query builder for chaining. See Also: :meth:`WriteBinBuilder.bit_get` :meth:`bit_get_int` """ self._parent.add_operation( BitOperation.get(self._bin, bit_offset, bit_size), ) # type: ignore[union-attr] return self._parent
[docs] def bit_count(self, bit_offset: int, bit_size: int) -> _T: """Count bits set to ``1`` in the given range. Example:: stream = await session.query(key).bin("blob").bit_count(0, 8).execute() Args: bit_offset: Starting bit index. bit_size: Number of bits to scan. Returns: The parent query builder for chaining. See Also: :meth:`WriteBinBuilder.bit_count` """ self._parent.add_operation( BitOperation.count(self._bin, bit_offset, bit_size), ) # type: ignore[union-attr] return self._parent
[docs] def bit_lscan(self, bit_offset: int, bit_size: int, value: bool) -> _T: """Scan from the left for the first set (``True``) or unset (``False``) bit. Example:: stream = await session.query(key).bin("blob").bit_lscan(0, 8, True).execute() Args: bit_offset: Starting bit index. bit_size: Number of bits to scan. value: ``True`` to find a ``1`` bit, ``False`` to find a ``0`` bit. Returns: The parent query builder for chaining. See Also: :meth:`bit_rscan` :meth:`WriteBinBuilder.bit_lscan` """ self._parent.add_operation( BitOperation.lscan(self._bin, bit_offset, bit_size, value), ) # type: ignore[union-attr] return self._parent
[docs] def bit_rscan(self, bit_offset: int, bit_size: int, value: bool) -> _T: """Scan from the right for the first set (``True``) or unset (``False``) bit. Example:: stream = await session.query(key).bin("blob").bit_rscan(0, 8, False).execute() Args: bit_offset: Starting bit index. bit_size: Number of bits to scan. value: ``True`` to find a ``1`` bit, ``False`` to find a ``0`` bit. Returns: The parent query builder for chaining. See Also: :meth:`bit_lscan` :meth:`WriteBinBuilder.bit_rscan` """ self._parent.add_operation( BitOperation.rscan(self._bin, bit_offset, bit_size, value), ) # type: ignore[union-attr] return self._parent
[docs] def bit_get_int(self, bit_offset: int, bit_size: int, signed: bool) -> _T: """Decode an integer from ``bit_size`` bits at ``bit_offset``. Example:: stream = await ( session.query(key).bin("blob").bit_get_int(0, 16, False).execute() ) Args: bit_offset: Starting bit index. bit_size: Width of the integer in bits. signed: ``True`` for two's-complement signed decoding. Returns: The parent query builder for chaining. See Also: :meth:`WriteBinBuilder.bit_get_int` :meth:`WriteBinBuilder.bit_set_int` """ self._parent.add_operation( BitOperation.get_int(self._bin, bit_offset, bit_size, signed), ) # type: ignore[union-attr] return self._parent
# -- Expression read ------------------------------------------------------
[docs] def select_from( self, expression: Union[str, FilterExpression], *, ignore_eval_failure: bool = False, ) -> _T: """Read a computed value into this bin using an AEL expression. The result appears as a virtual bin in the returned record. Args: expression: AEL string or pre-built FilterExpression. ignore_eval_failure: If True, silently return None when the expression cannot be evaluated (e.g. missing bin). Returns: The parent builder for method chaining. """ flags = ExpReadFlags.EVAL_NO_FAIL if ignore_eval_failure else ExpReadFlags.DEFAULT expr = parse_ael(expression) if isinstance(expression, str) else expression self._parent.add_operation(ExpOperation.read(self._bin, expr, flags)) # type: ignore[union-attr] return self._parent
# -- Write transitions (delegate to parent) ------------------------------- def _start_write_verb( self, op_type: str, arg1: Union[Key, List[Key]], *more_keys: Key, ) -> WriteSegmentBuilder: return self._parent._start_write_verb(op_type, arg1, *more_keys) # type: ignore[union-attr] # -- Map navigation (singular -> CdtReadBuilder) --------------------------
[docs] def on_map_index(self, index: int) -> CdtReadBuilder[_T]: """Navigate to a map element by index. Args: index: List index (0-based, negative counts from end). Returns: :class:`CdtReadBuilder` for reading the targeted element(s). """ b = self._bin return CdtReadBuilder( self._parent, lambda rt: MapOperation.get_by_index(b, index, rt), MapReturnType, is_map=True, bin_name=b, to_ctx=lambda: CTX.map_index(index), )
[docs] def on_map_key( self, key: Any, *, create_type: Optional[MapOrder] = None, ) -> CdtReadBuilder[_T]: """Navigate to a map element by key. Args: key: Map key to target. create_type: If set, use a create-on-missing context for this key with the given map key order. Returns: :class:`CdtReadBuilder` for reading the targeted element(s). """ b = self._bin if create_type is not None: to_ctx = lambda: CTX.map_key_create(key, create_type) else: to_ctx = lambda: CTX.map_key(key) return CdtReadBuilder( self._parent, lambda rt: MapOperation.get_by_key(b, key, rt), MapReturnType, is_map=True, bin_name=b, to_ctx=to_ctx, )
[docs] def on_map_rank(self, rank: int) -> CdtReadBuilder[_T]: """Navigate to a map element by rank (0 = lowest value).""" b = self._bin return CdtReadBuilder( self._parent, lambda rt: MapOperation.get_by_rank(b, rank, rt), MapReturnType, is_map=True, bin_name=b, to_ctx=lambda: CTX.map_rank(rank), )
# -- Map navigation (singular invertable -> CdtReadInvertableBuilder) -----
[docs] def on_map_value(self, value: Any) -> CdtReadInvertableBuilder[_T]: """Navigate to map elements matching a value.""" b = self._bin return CdtReadInvertableBuilder( self._parent, lambda rt: MapOperation.get_by_value(b, value, rt), MapReturnType, is_map=True, bin_name=b, to_ctx=lambda: CTX.map_value(value), )
# -- Map navigation (range -> CdtReadInvertableBuilder) -------------------
[docs] def on_map_index_range( self, index: int, count: Optional[int] = None, ) -> CdtReadInvertableBuilder[_T]: """Navigate to map elements by index range.""" b = self._bin if count is None: factory = lambda rt: MapOperation.get_by_index_range_from(b, index, rt) else: factory = lambda rt: MapOperation.get_by_index_range(b, index, count, rt) return CdtReadInvertableBuilder( self._parent, factory, MapReturnType, is_map=True, bin_name=b, to_ctx=None, )
[docs] def on_map_key_range( self, start: Any, end: Any, ) -> CdtReadInvertableBuilder[_T]: """Navigate to map elements by key range [start, end).""" b = self._bin return CdtReadInvertableBuilder( self._parent, lambda rt: MapOperation.get_by_key_range(b, start, end, rt), MapReturnType, is_map=True, bin_name=b, to_ctx=None, )
[docs] def on_map_rank_range( self, rank: int, count: Optional[int] = None, ) -> CdtReadInvertableBuilder[_T]: """Navigate to map elements by rank range.""" b = self._bin if count is None: factory = lambda rt: MapOperation.get_by_rank_range_from(b, rank, rt) else: factory = lambda rt: MapOperation.get_by_rank_range(b, rank, count, rt) return CdtReadInvertableBuilder( self._parent, factory, MapReturnType, is_map=True, bin_name=b, to_ctx=None, )
[docs] def on_map_value_range( self, start: Any, end: Any, ) -> CdtReadInvertableBuilder[_T]: """Navigate to map elements by value range [start, end).""" b = self._bin return CdtReadInvertableBuilder( self._parent, lambda rt: MapOperation.get_by_value_range(b, start, end, rt), MapReturnType, is_map=True, bin_name=b, to_ctx=None, )
[docs] def on_map_key_relative_index_range( self, key: Any, index: int, count: Optional[int] = None, ) -> CdtReadInvertableBuilder[_T]: """Navigate to map entries by index range relative to an anchor key. Args: key: Map key to target. index: List index (0-based, negative counts from end). count: Maximum entries to select; ``None`` for all remaining. Returns: :class:`CdtReadInvertableBuilder` for reading the targeted element(s). """ b = self._bin return CdtReadInvertableBuilder( self._parent, lambda rt: MapOperation.get_by_key_relative_index_range( b, key, index, count, rt, ), MapReturnType, is_map=True, bin_name=b, to_ctx=None, )
[docs] def on_map_value_relative_rank_range( self, value: Any, rank: int, count: Optional[int] = None, ) -> CdtReadInvertableBuilder[_T]: """Navigate to map entries by value rank range relative to an anchor value.""" b = self._bin return CdtReadInvertableBuilder( self._parent, lambda rt: MapOperation.get_by_value_relative_rank_range( b, value, rank, count, rt, ), MapReturnType, is_map=True, bin_name=b, to_ctx=None, )
# -- Map navigation (list selectors -> CdtReadInvertableBuilder) ----------
[docs] def on_map_key_list(self, keys: List[Any]) -> CdtReadInvertableBuilder[_T]: """Navigate to map elements matching a list of keys.""" b = self._bin return CdtReadInvertableBuilder( self._parent, lambda rt: MapOperation.get_by_key_list(b, keys, rt), MapReturnType, is_map=True, bin_name=b, to_ctx=None, )
[docs] def on_map_value_list(self, values: List[Any]) -> CdtReadInvertableBuilder[_T]: """Navigate to map elements matching a list of values.""" b = self._bin return CdtReadInvertableBuilder( self._parent, lambda rt: MapOperation.get_by_value_list(b, values, rt), MapReturnType, is_map=True, bin_name=b, to_ctx=None, )
# -- List navigation (singular -> CdtReadBuilder) -------------------------
[docs] def on_list_index( self, index: int, *, order: Optional[ListOrderType] = None, pad: bool = False, ) -> CdtReadBuilder[_T]: """Navigate to a list element by index. Args: order: If set (or if *pad* is ``True``), use create-on-missing list context with this order; when only *pad* is ``True``, defaults to :data:`~aerospike_async.ListOrderType.UNORDERED`. pad: When using create-on-missing context, allow sparse indexes. Example:: .bin("items").on_list_index(-1).get_values() """ b = self._bin use_create = order is not None or pad if use_create: eff_order = order if order is not None else ListOrderType.UNORDERED to_ctx = lambda: CTX.list_index_create(index, eff_order, pad) else: to_ctx = lambda: CTX.list_index(index) return CdtReadBuilder( self._parent, lambda rt: ListOperation.get_by_index(b, index, rt), ListReturnType, is_map=False, bin_name=b, to_ctx=to_ctx, )
[docs] def on_list_rank(self, rank: int) -> CdtReadBuilder[_T]: """Navigate to a list element by rank (0 = lowest value).""" b = self._bin return CdtReadBuilder( self._parent, lambda rt: ListOperation.get_by_rank(b, rank, rt), ListReturnType, is_map=False, bin_name=b, to_ctx=lambda: CTX.list_rank(rank), )
# -- List navigation (singular invertable -> CdtReadInvertableBuilder) ----
[docs] def on_list_value(self, value: Any) -> CdtReadInvertableBuilder[_T]: """Navigate to list elements matching a value.""" b = self._bin return CdtReadInvertableBuilder( self._parent, lambda rt: ListOperation.get_by_value(b, value, rt), ListReturnType, is_map=False, bin_name=b, to_ctx=lambda: CTX.list_value(value), )
# -- List navigation (range -> CdtReadInvertableBuilder) ------------------
[docs] def on_list_index_range( self, index: int, count: Optional[int] = None, ) -> CdtReadInvertableBuilder[_T]: """Navigate to list elements by index range.""" b = self._bin return CdtReadInvertableBuilder( self._parent, lambda rt: ListOperation.get_by_index_range(b, index, count, rt), ListReturnType, is_map=False, bin_name=b, to_ctx=None, )
[docs] def on_list_rank_range( self, rank: int, count: Optional[int] = None, ) -> CdtReadInvertableBuilder[_T]: """Navigate to list elements by rank range.""" b = self._bin return CdtReadInvertableBuilder( self._parent, lambda rt: ListOperation.get_by_rank_range(b, rank, count, rt), ListReturnType, is_map=False, bin_name=b, to_ctx=None, )
[docs] def on_list_value_range( self, start: Any, end: Any, ) -> CdtReadInvertableBuilder[_T]: """Navigate to list elements by value range [start, end).""" b = self._bin return CdtReadInvertableBuilder( self._parent, lambda rt: ListOperation.get_by_value_range(b, start, end, rt), ListReturnType, is_map=False, bin_name=b, to_ctx=None, )
[docs] def on_list_value_relative_rank_range( self, value: Any, rank: int, count: Optional[int] = None, ) -> CdtReadInvertableBuilder[_T]: """Navigate to list elements by value rank range relative to an anchor value.""" b = self._bin return CdtReadInvertableBuilder( self._parent, lambda rt: ListOperation.get_by_value_relative_rank_range( b, value, rank, count, rt, ), ListReturnType, is_map=False, bin_name=b, to_ctx=None, )
# -- List navigation (list selector -> CdtReadInvertableBuilder) ----------
[docs] def on_list_value_list(self, values: List[Any]) -> CdtReadInvertableBuilder[_T]: """Navigate to list elements matching a list of values.""" b = self._bin return CdtReadInvertableBuilder( self._parent, lambda rt: ListOperation.get_by_value_list(b, values, rt), ListReturnType, is_map=False, bin_name=b, to_ctx=None, )