Source code for aerospike_sdk.record_stream

# Copyright 2025-2026 Aerospike, Inc.
#
# Portions may be licensed to Aerospike, Inc. under one or more contributor
# license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

"""RecordStream — async iterable of RecordResult for batch and query operations."""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Any, AsyncIterator, Awaitable, Callable, Sequence

from aerospike_async import Key, PartitionFilter, Record
from aerospike_async.exceptions import ResultCode

from aerospike_sdk.record_result import RecordResult, batch_records_to_results

if TYPE_CHECKING:  # Not unused — needed for forward-reference type annotations and Sphinx autodoc.
    from aerospike_sdk.exceptions import AerospikeError

log = logging.getLogger(__name__)


class _SingleResultIter:
    """Lightweight async iterator that yields exactly one RecordResult."""

    __slots__ = ("_result",)

    def __init__(self, result: RecordResult) -> None:
        self._result: RecordResult | None = result

    def __aiter__(self) -> _SingleResultIter:
        return self

    async def __anext__(self) -> RecordResult:
        r = self._result
        if r is None:
            raise StopAsyncIteration
        self._result = None
        return r


[docs] class RecordStream: """Async iterator of :class:`~aerospike_sdk.record_result.RecordResult` rows. Produced by ``await session.query(...).execute()`` and similar APIs. Prefer ``async for row in stream``, or helpers such as :meth:`collect` and :meth:`first`. Do not call ``RecordStream(...)`` directly; use factories like :meth:`from_list` or :meth:`from_batch_records`. Example: Typical consumption with ``async for``:: stream = await session.query(key).bins(["name"]).execute() async for row in stream: if row.is_ok and row.record: print(row.record.bins) See Also: :meth:`first_or_raise`: Assert a single OK row. """
[docs] def __init__(self, source: AsyncIterator[RecordResult]) -> None: self._source = source self._closed = False self._chunked = False self._chunk_first = True self._chunk_recordset: Any = None self._chunk_reexecute: Callable[ [PartitionFilter], Awaitable[Any] ] | None = None self._chunk_limit: int = 0 self._chunk_count: int = 0 self._counter_ref: list[int] = [0] # Fast-path cache for single-result streams: avoids async # iteration overhead in first() / first_or_raise(). self._single_result: RecordResult | None = None
# -- factory constructors ------------------------------------------------
[docs] @classmethod def from_list(cls, results: Sequence[RecordResult]) -> RecordStream: """Wrap an already-materialised list of results. Example:: stream = RecordStream.from_list([row1, row2]) rows = await stream.collect() """ async def _iter() -> AsyncIterator[RecordResult]: for r in results: yield r return cls(_iter())
[docs] @classmethod def chain(cls, streams: Sequence[RecordStream]) -> RecordStream: """Yield all results from each stream in order. Example:: combined = RecordStream.chain([stream_a, stream_b]) """ async def _iter() -> AsyncIterator[RecordResult]: for st in streams: async for r in st: yield r return cls(_iter())
[docs] @classmethod def from_batch_records(cls, batch_records: Sequence) -> RecordStream: """Wrap a sequence of async-client ``BatchRecord`` objects. Example:: stream = RecordStream.from_batch_records(batch_records) """ return cls.from_list(batch_records_to_results(list(batch_records)))
[docs] @classmethod def from_recordset(cls, recordset) -> RecordStream: """Wrap a ``Recordset`` (async iterable of ``Record``). Each yielded ``Record`` is converted to a :class:`RecordResult` with ``result_code=OK`` and ``index=-1`` (queries have no positional index). Example:: stream = RecordStream.from_recordset(recordset) """ async def _iter() -> AsyncIterator[RecordResult]: async for record in recordset: key = record.key if hasattr(record, "key") and record.key is not None else Key("", "", 0) yield RecordResult( key=key, record=record, result_code=ResultCode.OK, ) return cls(_iter())
[docs] @classmethod def from_chunked_recordset( cls, recordset: Any, reexecute: Callable[[PartitionFilter], Awaitable[Any]], limit: int = 0, ) -> RecordStream: """Wrap a ``Recordset`` for chunked iteration. The stream yields records from the current chunk. Call :meth:`has_more_chunks` to advance to the next server chunk. Args: recordset: The PAC ``Recordset`` from the first query call. reexecute: An async callable that accepts an updated ``PartitionFilter`` and returns a new ``Recordset``. limit: Optional overall record limit (0 = unlimited). """ stream = cls._make_chunk_iter(recordset, limit, 0) stream._chunked = True stream._chunk_recordset = recordset stream._chunk_reexecute = reexecute stream._chunk_limit = limit return stream
@classmethod def _make_chunk_iter( cls, recordset: Any, limit: int, already_counted: int, ) -> RecordStream: """Build a RecordStream that counts records against *limit*.""" counter = [already_counted] async def _iter() -> AsyncIterator[RecordResult]: async for record in recordset: if 0 < limit <= counter[0]: break key = ( record.key if hasattr(record, "key") and record.key is not None else Key("", "", 0) ) counter[0] += 1 yield RecordResult( key=key, record=record, result_code=ResultCode.OK, ) inst = cls(_iter()) inst._chunk_count = already_counted inst._counter_ref = counter return inst
[docs] @classmethod def from_single(cls, key: Key, record: Record | None) -> RecordStream: """Wrap a single-key result. Example:: stream = RecordStream.from_single(key, record) """ rc = ResultCode.OK if record is not None else ResultCode.KEY_NOT_FOUND_ERROR result = RecordResult(key=key, record=record, result_code=rc, index=0) inst = cls(_SingleResultIter(result)) inst._single_result = result return inst
[docs] @classmethod def from_error( cls, key: Key, result_code: ResultCode, in_doubt: bool = False, exception: AerospikeError | None = None, ) -> RecordStream: """Wrap a single-key error as a one-element stream. Example:: stream = RecordStream.from_error(key, ResultCode.TIMEOUT) """ return cls.from_list([RecordResult( key=key, record=None, result_code=result_code, in_doubt=in_doubt, index=0, exception=exception, )])
# -- async iteration ----------------------------------------------------- def __aiter__(self) -> RecordStream: """Return ``self`` for ``async for`` iteration.""" return self async def __anext__(self) -> RecordResult: """Yield the next :class:`~aerospike_sdk.record_result.RecordResult`. Raises: StopAsyncIteration: When the stream is exhausted or :meth:`close` was called before more rows were read. """ if self._closed: raise StopAsyncIteration return await self._source.__anext__() # -- chunked iteration ---------------------------------------------------
[docs] async def has_more_chunks(self) -> bool: """Check whether more server-side chunks remain. On the first call this returns ``True`` so the caller enters the iteration loop for the already-loaded first chunk. Subsequent calls inspect the server's ``PartitionFilter`` cursor: if more partitions remain, a new query round-trip is issued transparently and ``True`` is returned. Returns ``False`` when: * the server cursor is done (all partitions scanned), or * the overall ``limit`` has been reached, or * the stream was not created with :meth:`from_chunked_recordset`. Example:: stream = await session.query(SET).chunk_size(10).execute() chunk = 0 while await stream.has_more_chunks(): chunk += 1 print(f"Chunk: {chunk}") async for rr in stream: print(rr.record.bins) """ if not self._chunked: if self._chunk_first: self._chunk_first = False return True return False if self._chunk_first: self._chunk_first = False return True if 0 < self._chunk_limit <= self._chunk_count: return False pf = await self._chunk_recordset.partition_filter() if pf is None or pf.done(): return False counted_so_far = self._counter_ref[0] log.debug("fetching next chunk (counted=%d)", counted_so_far) if self._chunk_reexecute is None: return False recordset = await self._chunk_reexecute(pf) self._chunk_recordset = recordset self._chunk_count = counted_so_far new_stream = self._make_chunk_iter( recordset, self._chunk_limit, counted_so_far, ) self._source = new_stream._source self._counter_ref = new_stream._counter_ref self._closed = False return True
# -- convenience methods -------------------------------------------------
[docs] async def first(self) -> RecordResult | None: """Consume and return the first row, or ``None`` if there are no rows. Returns: The first :class:`~aerospike_sdk.record_result.RecordResult`, or ``None`` when the stream is empty. Note: This advances the iterator; remaining rows are left for further ``async for`` or other helpers only if the underlying source allows partial consumption (most SDK streams are single-pass). Example:: stream = await session.query(key).execute() row = await stream.first() if row is None: ... """ # Fast path: skip async iteration for single-result streams. r = self._single_result if r is not None: self._single_result = None self._closed = True return r try: return await self.__anext__() except StopAsyncIteration: return None
[docs] async def first_or_raise(self) -> RecordResult: """Return the first row and require success (see :meth:`RecordResult.or_raise`). Returns: The first OK :class:`~aerospike_sdk.record_result.RecordResult`. Raises: StopAsyncIteration: If the stream yields no rows (empty). AerospikeError: If the first row is not OK (from :meth:`RecordResult.or_raise`). Example: rec = (await stream.first_or_raise()).record_or_raise() """ result = await self.first() if result is None: raise StopAsyncIteration("RecordStream is empty") return result.or_raise()
[docs] async def first_udf_result(self) -> Any | None: """Scan forward for the first non-``None`` :attr:`~aerospike_sdk.record_result.RecordResult.udf_result`. Returns: The UDF return value, or ``None`` if no row carries a UDF result. Example:: value = await stream.first_udf_result() See Also: :meth:`Session.execute_udf`: Produces streams with UDF results. """ async for r in self: if r.udf_result is not None: return r.udf_result return None
[docs] async def collect(self) -> list[RecordResult]: """Drain the stream into a list (order preserved). Returns: All remaining :class:`~aerospike_sdk.record_result.RecordResult` instances. Example: rows = await stream.collect() oks = [r for r in rows if r.is_ok] """ results: list[RecordResult] = [] async for r in self: results.append(r) return results
[docs] async def failures(self) -> list[RecordResult]: """Drain the stream and return rows where :attr:`~aerospike_sdk.record_result.RecordResult.is_ok` is false. Returns: Only error or non-OK rows. Note: Like :meth:`collect`, this consumes the entire stream. """ return [r async for r in self if not r.is_ok]
[docs] def close(self) -> None: """Mark the stream closed; further :meth:`__anext__` calls stop iteration. Idempotent. Use when abandoning a stream early to cooperate with resource cleanup where supported. """ self._closed = True