Source code for aerospike_sdk.sync.record_stream

# Copyright 2025-2026 Aerospike, Inc.
#
# Portions may be licensed to Aerospike, Inc. under one or more contributor
# license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

"""SyncRecordStream — synchronous wrapper around :class:`RecordStream`."""

from __future__ import annotations

from typing import TYPE_CHECKING, Any

from aerospike_sdk.record_result import RecordResult

if TYPE_CHECKING:  # Not unused — avoids circular import; used in type annotations only.
    from aerospike_sdk.record_stream import RecordStream
    from aerospike_sdk.sync.client import _EventLoopManager


[docs] class SyncRecordStream: """Blocking view of :class:`~aerospike_sdk.record_stream.RecordStream`. Iterator protocol and helpers (``first``, ``collect``, …) call into the underlying async stream via the parent client's event-loop manager, so each step may block the calling thread until data is ready. **Behavior vs async:** Ordering and per-row semantics match :class:`~aerospike_sdk.record_stream.RecordStream`; only the API surface is synchronous. ``close()`` closes the async stream directly without crossing the loop (releases resources; pending async iteration should stop). Example:: with SyncClient("localhost:3000") as client: session = client.create_session() for row in session.query(ns, set).bins(["name"]).execute(): if row.record: print(row.record.bins) See Also: :class:`~aerospike_sdk.record_stream.RecordStream` """
[docs] def __init__(self, stream: RecordStream, loop_manager: _EventLoopManager) -> None: """Wrap ``stream`` for synchronous consumption (internal use).""" self._stream = stream self._loop_manager = loop_manager
# -- sync iteration ------------------------------------------------------ def __iter__(self) -> SyncRecordStream: return self def __next__(self) -> RecordResult: async def _next() -> RecordResult: return await self._stream.__anext__() try: return self._loop_manager.run_async(_next()) except StopAsyncIteration: raise StopIteration # -- convenience methods -------------------------------------------------
[docs] def first(self) -> RecordResult | None: """Return the first row, or ``None`` if the stream is empty. See Also: :meth:`~aerospike_sdk.record_stream.RecordStream.first` """ return self._loop_manager.run_async(self._stream.first())
[docs] def first_or_raise(self) -> RecordResult: """Return the first row or raise if the stream is empty or not OK. Raises: AerospikeError: On a failed row or empty-stream error from the async layer. See :meth:`~aerospike_sdk.record_stream.RecordStream.first_or_raise`. """ return self._loop_manager.run_async(self._stream.first_or_raise())
[docs] def first_udf_result(self) -> Any | None: """Return the first non-``None`` :attr:`~RecordResult.udf_result`.""" return self._loop_manager.run_async(self._stream.first_udf_result())
[docs] def collect(self) -> list[RecordResult]: """Drain the stream into a list (blocks until complete).""" return self._loop_manager.run_async(self._stream.collect())
[docs] def failures(self) -> list[RecordResult]: """Collect rows whose :attr:`~RecordResult.result_code` is not OK.""" return self._loop_manager.run_async(self._stream.failures())
[docs] def close(self) -> None: """Release the underlying async stream.""" self._stream.close()