# Source code for aerospike_sdk.sync.record_stream

# Copyright 2025-2026 Aerospike, Inc.
#
# Portions may be licensed to Aerospike, Inc. under one or more contributor
# license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

"""SyncRecordStream — pure-sync iterator of :class:`RecordResult` rows.

Does not wrap an async :class:`RecordStream`. Sources are sync iterables —
typically a PAC :class:`Recordset`, a list of ``BatchRecord``, or a
materialized list of :class:`RecordResult`.

Factory classmethods mirror :class:`aerospike_sdk.record_stream.RecordStream`
so callers that already use ``from_list`` / ``from_batch_records`` /
``from_recordset`` / ``from_single`` / ``from_error`` / ``chain`` keep
the same shape.
"""

from __future__ import annotations

from typing import TYPE_CHECKING, Any, Callable, Iterator, Optional, Sequence

from aerospike_async import Key, ResultCode
from aerospike_sdk.record_result import RecordResult, batch_records_to_results

if TYPE_CHECKING:
    from aerospike_async import Record
    from aerospike_sdk.error_strategy import ErrorHandler
    from aerospike_sdk.exceptions import AerospikeError


class SyncRecordStream:
    """Synchronous iterator of :class:`~aerospike_sdk.record_result.RecordResult`.

    Produced by sync builder terminals (``execute()``). Iterate with a
    regular ``for`` loop or use the helpers below (``first``, ``collect``,
    ``failures``, ...). Single-pass — most underlying sources do not
    support resetting.

    Example::

        for row in session.query(key).bins(["name"]).execute():
            if row.is_ok and row.record:
                print(row.record.bins)

    See Also:
        :class:`aerospike_sdk.record_stream.RecordStream`: async counterpart.
    """

    __slots__ = (
        "_source",
        "_closed",
        "_single_result",
        # Chunked-recordset state (set by from_chunked_pac_recordset; left
        # unset for non-chunked streams). Slotted to allow assignment.
        "_chunked",
        "_chunk_recordset",
        "_chunk_reexecute",
        "_chunk_limit",
        "_chunk_count",
        "_chunk_first",
        "_counter_ref",
    )

    # Annotation-only declarations (no values, so no clash with __slots__).
    # They let type checkers see the lazily assigned chunked state without
    # per-line ``type: ignore`` comments.
    _source: Iterator[RecordResult]
    _closed: bool
    _single_result: Optional[RecordResult]
    _chunked: bool
    _chunk_recordset: Any
    _chunk_reexecute: Optional[Callable[[Any], Any]]
    _chunk_limit: int
    _chunk_count: int
    _chunk_first: bool
    _counter_ref: list[int]

    def __init__(self, source: Iterator[RecordResult]) -> None:
        """Create a stream over ``source``.

        Args:
            source: Iterator producing the rows; consumed lazily.
        """
        self._source = source
        self._closed = False
        self._single_result: Optional[RecordResult] = None

    # -- factory constructors ------------------------------------------------

    @classmethod
    def from_list(cls, results: Sequence[RecordResult]) -> "SyncRecordStream":
        """Wrap an already-materialized list of results."""
        return cls(iter(results))

    @classmethod
    def from_batch_records(cls, batch_records: Sequence) -> "SyncRecordStream":
        """Wrap a list of PAC ``BatchRecord`` objects."""
        return cls.from_list(batch_records_to_results(list(batch_records)))

    @classmethod
    def from_pac_batch_stream(
        cls,
        pac_stream: Any,
        on_error: "ErrorHandler | None" = None,
    ) -> "SyncRecordStream":
        """Lazy-feed adapter over a PAC ``BatchRecordStream`` (sync iter).

        See :meth:`aerospike_sdk.record_stream.RecordStream.from_pac_batch_stream`
        for the contract. This sync variant pulls ``(idx, BatchRecord)``
        tuples via PAC's blocking ``__iter__``/``__next__`` and maps each
        to a :class:`RecordResult` with ``index=idx`` (NOT enumeration —
        completion order can differ from input order).

        Args:
            pac_stream: PAC ``BatchRecordStream`` (sync iter) to drain.
            on_error: Optional ``(key, index, exception) -> None`` callback.
                When set, per-key failures are dispatched to the handler
                and excluded from the returned stream; cluster-level errors
                still raise from ``__next__``.

        Note:
            Any exception raised while draining — including one raised by
            ``on_error`` itself — is passed through ``_convert_pac_exception``
            and re-raised chained to the original.
        """
        from aerospike_sdk.exceptions import (
            _convert_pac_exception,
            _result_code_to_exception,
        )

        def _gen() -> Iterator[RecordResult]:
            try:
                for idx, br in pac_stream:
                    # A missing result code is treated as success.
                    rc = (
                        br.result_code
                        if br.result_code is not None
                        else ResultCode.OK
                    )
                    if on_error is not None and rc != ResultCode.OK:
                        on_error(
                            br.key,
                            idx,
                            _result_code_to_exception(rc, str(rc), br.in_doubt),
                        )
                        continue
                    yield RecordResult(
                        key=br.key,
                        record=br.record,
                        result_code=rc,
                        in_doubt=br.in_doubt,
                        index=idx,
                    )
            except Exception as e:
                raise _convert_pac_exception(e) from e

        return cls(_gen())

    @classmethod
    def from_pac_recordset(cls, recordset: Any) -> "SyncRecordStream":
        """Wrap a PAC ``Recordset`` (sync ``__iter__`` / ``__next__``).

        Each yielded ``Record`` becomes an OK :class:`RecordResult` with
        ``index=-1`` (queries have no positional index). Records without
        a key get a placeholder ``Key("", "", 0)``.
        """

        def _gen() -> Iterator[RecordResult]:
            for record in recordset:
                key = (
                    record.key
                    if hasattr(record, "key") and record.key is not None
                    else Key("", "", 0)
                )
                yield RecordResult(
                    key=key,
                    record=record,
                    result_code=ResultCode.OK,
                )

        return cls(_gen())

    @classmethod
    def from_chunked_pac_recordset(
        cls,
        recordset: Any,
        reexecute: Callable[[Any], Any],
        limit: int = 0,
    ) -> "SyncRecordStream":
        """Wrap a PAC ``Recordset`` for chunked iteration.

        ``reexecute`` is a *sync* callable that takes the current
        :class:`PartitionFilter` and returns the next ``Recordset`` (or
        ``None`` to stop). Use :meth:`has_more_chunks` to advance.

        Args:
            recordset: The first, already-executed ``Recordset``.
            reexecute: Callable producing the next ``Recordset``.
            limit: Maximum total records across all chunks (0 = unlimited).
        """
        inst = cls(iter([]))  # placeholder source; replaced below
        inst._chunked = True
        inst._chunk_recordset = recordset
        inst._chunk_reexecute = reexecute
        inst._chunk_limit = limit
        inst._chunk_count = 0
        inst._chunk_first = True
        # Shared mutable counter so every per-chunk iterator adds to one total.
        inst._counter_ref = [0]
        inst._source = _chunked_iter(recordset, limit, inst._counter_ref)
        return inst

    @classmethod
    def from_single(
        cls,
        key: Key,
        record: Optional["Record"],
    ) -> "SyncRecordStream":
        """Wrap a single-key result.

        Sets ``result_code = OK`` when ``record is not None``; otherwise
        ``KEY_NOT_FOUND_ERROR``.
        """
        rc = ResultCode.OK if record is not None else ResultCode.KEY_NOT_FOUND_ERROR
        result = RecordResult(key=key, record=record, result_code=rc, index=0)
        inst = cls(iter([result]))
        inst._single_result = result
        return inst

    @classmethod
    def from_error(
        cls,
        key: Key,
        result_code: ResultCode,
        in_doubt: bool = False,
        exception: "Optional[AerospikeError]" = None,
    ) -> "SyncRecordStream":
        """Wrap a single-key error as a one-element stream."""
        return cls.from_list([RecordResult(
            key=key,
            record=None,
            result_code=result_code,
            in_doubt=in_doubt,
            index=0,
            exception=exception,
        )])

    @classmethod
    def chain(cls, streams: Sequence["SyncRecordStream"]) -> "SyncRecordStream":
        """Yield all results from each stream in order."""

        def _gen() -> Iterator[RecordResult]:
            for st in streams:
                yield from st

        return cls(_gen())

    # -- sync iteration ------------------------------------------------------

    def __iter__(self) -> "SyncRecordStream":
        return self

    def __next__(self) -> RecordResult:
        if self._closed:
            raise StopIteration
        result = next(self._source)
        # Once any row has been handed out by iteration, the single-result
        # shortcut used by first() must not replay it.
        self._single_result = None
        return result

    # -- convenience helpers -------------------------------------------------

    def first(self) -> Optional[RecordResult]:
        """Consume and return the first row, or ``None`` if empty.

        Returns ``None`` if the stream was closed or already drained,
        including single-key streams whose row was already consumed.
        """
        if self._closed:
            return None
        r = self._single_result
        if r is not None:
            self._single_result = None
            self._closed = True
            return r
        try:
            return next(self)
        except StopIteration:
            return None

    def first_or_raise(self) -> RecordResult:
        """Return the first row, or raise if the stream is empty / not OK.

        Raises:
            StopIteration: If the stream yields no rows.
        """
        result = self.first()
        if result is None:
            raise StopIteration("SyncRecordStream is empty")
        return result.or_raise()

    def first_udf_result(self) -> Any | None:
        """Scan forward for the first non-``None`` ``udf_result``."""
        for r in self:
            if r.udf_result is not None:
                return r.udf_result
        return None

    def collect(self) -> list[RecordResult]:
        """Drain the stream into a list."""
        return list(self)

    def failures(self) -> list[RecordResult]:
        """Drain the stream, returning only rows whose ``is_ok`` is false."""
        return [r for r in self if not r.is_ok]

    def close(self) -> None:
        """Mark the stream closed; further iteration raises ``StopIteration``.

        Note: on a chunked stream, a successful :meth:`has_more_chunks`
        fetch reopens the stream for the newly loaded chunk.
        """
        self._closed = True

    # -- chunked iteration (rare, only used by partition-resumable queries) --

    def has_more_chunks(self) -> bool:
        """Return whether more server-side chunks remain.

        First call always returns ``True`` so the caller enters the loop
        for the already-loaded first chunk. Subsequent calls inspect the
        cursor; new chunks are fetched transparently via ``reexecute``.
        Returns ``False`` once the configured ``limit`` has been reached.
        """
        if not getattr(self, "_chunked", False):
            # Non-chunked streams behave as exactly one chunk.
            if getattr(self, "_chunk_first", True):
                self._chunk_first = False
                return True
            return False

        if self._chunk_first:
            self._chunk_first = False
            return True

        # Use the live shared counter: it reflects records consumed from the
        # current chunk, whereas a stale count would let us fetch (and report)
        # another chunk after the limit was already hit.
        counted_so_far = self._counter_ref[0]
        self._chunk_count = counted_so_far
        if 0 < self._chunk_limit <= counted_so_far:
            return False

        # PAC Recordset's partition_filter() is currently async. The sync
        # path requires the recordset to expose `partition_filter_sync()`;
        # without it there is no cursor to resume from, so chunking stops.
        pf_getter = getattr(self._chunk_recordset, "partition_filter_sync", None)
        if pf_getter is None:
            return False
        pf = pf_getter()
        if pf is None or pf.done():
            return False

        if self._chunk_reexecute is None:
            return False
        new_recordset = self._chunk_reexecute(pf)
        if new_recordset is None:
            return False

        self._chunk_recordset = new_recordset
        self._source = _chunked_iter(
            new_recordset, self._chunk_limit, self._counter_ref,
        )
        self._closed = False
        return True
def _chunked_iter( recordset: Any, limit: int, counter: list, ) -> Iterator[RecordResult]: """Iterator that counts records and stops at ``limit`` (0 = unlimited).""" for record in recordset: if 0 < limit <= counter[0]: return key = ( record.key if hasattr(record, "key") and record.key is not None else Key("", "", 0) ) counter[0] += 1 yield RecordResult( key=key, record=record, result_code=ResultCode.OK, )