RecordStream

class aerospike_sdk.record_stream.RecordStream[source]

Bases: object

Async iterator of RecordResult rows.

Produced by await session.query(...).execute() and similar APIs. Prefer async for row in stream, or helpers such as collect() and first(). Do not call RecordStream(...) directly; use factories like from_list() or from_batch_records().

Example

Typical consumption with async for:

stream = await session.query(key).bins(["name"]).execute()
async for row in stream:
    if row.is_ok and row.record:
        print(row.record.bins)

See also

first_or_raise(): Assert a single OK row.

__init__(source)[source]

classmethod from_list(results)[source]

Wrap an already-materialized list of results.

Return type:

RecordStream

Example:

stream = RecordStream.from_list([row1, row2])
rows = await stream.collect()

classmethod chain(streams)[source]

Yield all results from each stream in order.

Return type:

RecordStream

Example:

combined = RecordStream.chain([stream_a, stream_b])

classmethod from_batch_records(batch_records)[source]

Wrap a sequence of async-client BatchRecord objects.

Return type:

RecordStream

Example:

stream = RecordStream.from_batch_records(batch_records)

classmethod from_pac_batch_stream(pac_stream, on_error=None)[source]

Lazy-feed adapter over a PAC BatchRecordStream.

The PAC stream yields (idx, BatchRecord) tuples in completion order (the node that responds first yields first), not input order. idx is the position of the originating op in the input ops list and is mapped to RecordResult.index, so positional consumers can recover input order by calling stream.collect() and then results.sort(key=lambda r: r.index).

Per-key errors land on each BatchRecord.result_code and surface as RecordResult with is_ok=False. Cluster-level errors raise from __anext__ and are converted to PSDK exceptions via _convert_pac_exception().

Parameters:
  • pac_stream (Any) – PAC BatchRecordStream to drain.

  • on_error (Optional[Callable[[Key, int, AerospikeError], None]]) – Optional (key, index, exception) -> None callback. When set, per-key failures are dispatched to the handler and excluded from the returned stream; cluster-level errors still raise from __anext__.

Return type:

RecordStream
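
The ordering and error-callback contract above can be sketched without a cluster. This is a minimal illustration, not SDK code: FakeResult is a hypothetical stand-in for RecordResult that carries only the fields used here, the key "user:1" is made up, and the call to record_failure simulates the dispatch that the SDK is documented to perform for per-key failures.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class FakeResult:
    """Hypothetical stand-in for RecordResult; only the fields used here."""
    index: int
    is_ok: bool
    record: Optional[Any] = None


# Rows as they might arrive in completion order (fastest node first).
arrived = [
    FakeResult(index=2, is_ok=True, record={"name": "c"}),
    FakeResult(index=0, is_ok=True, record={"name": "a"}),
    FakeResult(index=1, is_ok=False),
]

# Same step as: results = await stream.collect(); results.sort(key=lambda r: r.index)
in_input_order = sorted(arrived, key=lambda r: r.index)
positions = [r.index for r in in_input_order]

# A callback with the documented on_error shape: (key, index, exception) -> None.
failed = []


def record_failure(key, index, exception):
    failed.append((index, key, type(exception).__name__))


# Simulated dispatch of one per-key failure (assumption: made-up key and error).
record_failure("user:1", 1, TimeoutError("simulated"))
```

With a real stream, a handler such as record_failure would be passed as on_error, and the failed rows would then be absent from the returned stream.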

classmethod from_recordset(recordset)[source]

Wrap a Recordset (async iterable of Record).

Each yielded Record is converted to a RecordResult with result_code=OK and index=-1 (queries have no positional index).

Return type:

RecordStream

Example:

stream = RecordStream.from_recordset(recordset)

classmethod from_chunked_recordset(recordset, reexecute, limit=0)[source]

Wrap a Recordset for chunked iteration.

The stream yields records from the current chunk. Call has_more_chunks() to advance to the next server chunk.

Parameters:
  • recordset (Any) – The PAC Recordset from the first query call.

  • reexecute (Callable[[PartitionFilter], Awaitable[Any]]) – An async callable that accepts an updated PartitionFilter and returns a new Recordset.

  • limit (int) – Optional overall record limit (0 = unlimited).

Return type:

RecordStream

classmethod from_single(key, record)[source]

Wrap a single-key result.

Return type:

RecordStream

Example:

stream = RecordStream.from_single(key, record)

classmethod from_error(key, result_code, in_doubt=False, exception=None)[source]

Wrap a single-key error as a one-element stream.

Return type:

RecordStream

Example:

stream = RecordStream.from_error(key, ResultCode.TIMEOUT)

async has_more_chunks()[source]

Check whether more server-side chunks remain.

On the first call this returns True so the caller enters the iteration loop for the already-loaded first chunk. Subsequent calls inspect the server’s PartitionFilter cursor: if more partitions remain, a new query round-trip is issued transparently and True is returned.

Returns False when:
  • the server cursor is done (all partitions scanned), or

  • the overall limit has been reached, or

  • the stream was not created with from_chunked_recordset().

Example:

stream = await session.query(SET).chunk_size(10).execute()
chunk = 0
while await stream.has_more_chunks():
    chunk += 1
    print(f"Chunk: {chunk}")
    async for rr in stream:
        print(rr.record.bins)

Return type:

bool
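
When rows need to be grouped by chunk rather than printed, the same loop can be wrapped in a small helper. The sketch below is duck-typed: collect_chunks is a hypothetical helper, not part of the SDK, and FakeChunkedStream is a stand-in that only imitates the documented contract (the first has_more_chunks() call returns True for the already-loaded chunk) so the example runs without a server.

```python
import asyncio


async def collect_chunks(stream):
    """Group rows by server chunk using the has_more_chunks() loop."""
    chunks = []
    while await stream.has_more_chunks():
        chunks.append([row async for row in stream])
    return chunks


class FakeChunkedStream:
    """Hypothetical stand-in that serves pre-built chunks one at a time."""

    def __init__(self, chunks):
        self._pending = list(chunks)
        self._current = iter(())

    async def has_more_chunks(self):
        if not self._pending:
            return False
        # Load the next chunk so iteration yields its rows.
        self._current = iter(self._pending.pop(0))
        return True

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            return next(self._current)
        except StopIteration:
            raise StopAsyncIteration from None


chunks = asyncio.run(collect_chunks(FakeChunkedStream([[1, 2], [3]])))
```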

async first()[source]

Consume and return the first row, or None if there are no rows.

Return type:

RecordResult | None

Returns:

The first RecordResult, or None when the stream is empty.

Note

This advances the iterator; remaining rows are left for further async for or other helpers only if the underlying source allows partial consumption (most SDK streams are single-pass).
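
Single-pass behaviour can be illustrated with a plain async generator. This sketch does not use RecordStream: rows and take_first are hypothetical stand-ins, and whether a real stream lets the remaining rows be read after first() depends on its underlying source, as noted above.

```python
import asyncio


async def rows():
    # Hypothetical single-pass source standing in for an SDK stream.
    for value in ("a", "b", "c"):
        yield value


async def take_first(source):
    """Return the first item, or None when the source is empty."""
    async for row in source:
        return row
    return None


async def demo():
    source = rows()
    first = await take_first(source)
    rest = [row async for row in source]    # continues after the first item
    again = [row async for row in source]   # an exhausted source yields nothing
    return first, rest, again


result = asyncio.run(demo())
```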

Example:

stream = await session.query(key).execute()
row = await stream.first()
if row is None:
    ...

async first_or_raise()[source]

Return the first row and require success (see RecordResult.or_raise()).

Return type:

RecordResult

Returns:

The first OK RecordResult.

Raises:

Example

rec = (await stream.first_or_raise()).record_or_raise()

async first_udf_result()[source]

Scan forward for the first non-None udf_result.

Return type:

Any | None

Returns:

The UDF return value, or None if no row carries a UDF result.

Example:

value = await stream.first_udf_result()

See also

Session.execute_udf(): Produces streams with UDF results.

async collect()[source]

Drain the stream into a list (order preserved).

Return type:

list[RecordResult]

Returns:

All remaining RecordResult instances.

Example

rows = await stream.collect()
oks = [r for r in rows if r.is_ok]

async failures()[source]

Drain the stream and return rows where is_ok is false.

Return type:

list[RecordResult]

Returns:

Only error or non-OK rows.

Note

Like collect(), this consumes the entire stream.
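
Because both helpers drain the stream, calling collect() and then failures() on the same stream would leave the second call with nothing to read. One way to get both groups is to collect once and split the list. In this sketch, FakeRow is a hypothetical stand-in for RecordResult and partition is an illustrative helper, not an SDK function.

```python
from dataclasses import dataclass


@dataclass
class FakeRow:
    """Hypothetical stand-in for RecordResult with only the fields used here."""
    name: str
    is_ok: bool


def partition(rows):
    """Split already-collected rows into (ok, failed) in one pass."""
    ok, failed = [], []
    for row in rows:
        (ok if row.is_ok else failed).append(row)
    return ok, failed


# In real code the list would come from: rows = await stream.collect()
rows = [FakeRow("a", True), FakeRow("b", False), FakeRow("c", True)]
ok, failed = partition(rows)
```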

close()[source]

Mark the stream closed; further __anext__() calls stop iteration.

Idempotent. Use when abandoning a stream early to cooperate with resource cleanup where supported.

Return type:

None

SyncRecordStream

class aerospike_sdk.sync.record_stream.SyncRecordStream[source]

Bases: object

Synchronous iterator of RecordResult.

Produced by sync builder terminals (execute()). Iterate with a regular for loop or use the helpers below (first, collect, failures, …). The stream is single-pass: most underlying sources do not support resetting.

Example:

for row in session.query(key).bins(["name"]).execute():
    if row.is_ok and row.record:
        print(row.record.bins)

See also

aerospike_sdk.record_stream.RecordStream: async counterpart.

__init__(source)[source]

classmethod from_list(results)[source]

Wrap an already-materialized list of results.

Return type:

SyncRecordStream

classmethod from_batch_records(batch_records)[source]

Wrap a list of PAC BatchRecord objects.

Return type:

SyncRecordStream

classmethod from_pac_batch_stream(pac_stream, on_error=None)[source]

Lazy-feed adapter over a PAC BatchRecordStream (sync iter).

See aerospike_sdk.record_stream.RecordStream.from_pac_batch_stream() for the contract. This sync variant pulls (idx, BatchRecord) tuples via PAC’s blocking __iter__/__next__ and maps each to a RecordResult with index=idx. The index is not the enumeration position, because completion order can differ from input order.

Parameters:
  • pac_stream (Any) – PAC BatchRecordStream (sync iter) to drain.

  • on_error (Optional[Callable[[Key, int, AerospikeError], None]]) – Optional (key, index, exception) -> None callback. When set, per-key failures are dispatched to the handler and excluded from the returned stream; cluster-level errors still raise from __next__.

Return type:

SyncRecordStream

classmethod from_pac_recordset(recordset)[source]

Wrap a PAC Recordset (sync __iter__ / __next__).

Each yielded Record becomes an OK RecordResult with index=-1 (queries have no positional index).

Return type:

SyncRecordStream

classmethod from_chunked_pac_recordset(recordset, reexecute, limit=0)[source]

Wrap a PAC Recordset for chunked iteration.

reexecute is a sync callable that takes the current PartitionFilter and returns the next Recordset (or None to stop). Use has_more_chunks() to advance.

Return type:

SyncRecordStream

classmethod from_single(key, record)[source]

Wrap a single-key result.

Sets result_code = OK when record is not None; otherwise KEY_NOT_FOUND_ERROR.

Return type:

SyncRecordStream

classmethod from_error(key, result_code, in_doubt=False, exception=None)[source]

Wrap a single-key error as a one-element stream.

Return type:

SyncRecordStream

classmethod chain(streams)[source]

Yield all results from each stream in order.

Return type:

SyncRecordStream

first()[source]

Consume and return the first row, or None if empty.

Return type:

Optional[RecordResult]

first_or_raise()[source]

Return the first row, or raise if the stream is empty or the row is not OK.

Return type:

RecordResult

first_udf_result()[source]

Scan forward for the first non-None udf_result.

Return type:

Any | None

collect()[source]

Drain the stream into a list.

Return type:

list[RecordResult]

failures()[source]

Drain the stream, returning only rows whose is_ok is false.

Return type:

list[RecordResult]

close()[source]

Mark the stream closed; further iteration raises StopIteration.

Return type:

None

has_more_chunks()[source]

Return whether more server-side chunks remain.

First call always returns True so the caller enters the loop for the already-loaded first chunk. Subsequent calls inspect the cursor; new chunks are fetched transparently via reexecute.

Return type:

bool
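
A sync chunk loop mirrors the async one. The sketch below is duck-typed and runs without a server: iter_chunks is a hypothetical helper, and FakeSyncChunkedStream is a stand-in that imitates only the documented behaviour (the first has_more_chunks() call returns True for the already-loaded chunk).

```python
def iter_chunks(stream):
    """Yield one list of rows per server chunk from a sync stream."""
    while stream.has_more_chunks():
        yield list(stream)


class FakeSyncChunkedStream:
    """Hypothetical stand-in that serves pre-built chunks one at a time."""

    def __init__(self, chunks):
        self._pending = list(chunks)
        self._current = iter(())

    def has_more_chunks(self):
        if not self._pending:
            return False
        # Load the next chunk so iteration yields its rows.
        self._current = iter(self._pending.pop(0))
        return True

    def __iter__(self):
        return self

    def __next__(self):
        return next(self._current)


chunk_sizes = [len(chunk) for chunk in iter_chunks(FakeSyncChunkedStream([[1, 2], [3]]))]
```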