RecordStream

class aerospike_sdk.record_stream.RecordStream[source]

Bases: object

Async iterator of RecordResult rows.

Produced by await session.query(...).execute() and similar APIs. Prefer async for row in stream, or helpers such as collect() and first(). Do not call RecordStream(...) directly; use factories like from_list() or from_batch_records().

Example

Typical consumption with async for:

stream = await session.query(key).bins(["name"]).execute()
async for row in stream:
    if row.is_ok and row.record:
        print(row.record.bins)

See also

first_or_raise(): Assert a single OK row.

__init__(source)[source]

classmethod from_list(results)[source]

Wrap an already-materialised list of results.

Return type:

RecordStream

Example:

stream = RecordStream.from_list([row1, row2])
rows = await stream.collect()

classmethod chain(streams)[source]

Yield all results from each stream in order.

Return type:

RecordStream

Example:

combined = RecordStream.chain([stream_a, stream_b])
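
The "in order" guarantee can be illustrated without the SDK. The following is a minimal sketch using plain async generators; `numbers` and `chain_in_order` are hypothetical stand-ins for illustration and are not part of aerospike_sdk.

```python
import asyncio
from typing import AsyncIterator, Iterable, List


async def numbers(start: int, count: int) -> AsyncIterator[int]:
    """Hypothetical stand-in for a stream: yields `count` ints from `start`."""
    for value in range(start, start + count):
        yield value


async def chain_in_order(
    streams: Iterable[AsyncIterator[int]],
) -> AsyncIterator[int]:
    """Exhaust each stream fully before moving on to the next one."""
    for stream in streams:
        async for item in stream:
            yield item


async def demo_chain() -> List[int]:
    combined = chain_in_order([numbers(0, 2), numbers(10, 3)])
    return [item async for item in combined]


chained = asyncio.run(demo_chain())
print(chained)
```

Every item of the first source appears before any item of the second, which is the behavior the description above attributes to chain().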

classmethod from_batch_records(batch_records)[source]

Wrap a sequence of async-client BatchRecord objects.

Return type:

RecordStream

Example:

stream = RecordStream.from_batch_records(batch_records)

classmethod from_recordset(recordset)[source]

Wrap a Recordset (async iterable of Record).

Each yielded Record is converted to a RecordResult with result_code=OK and index=-1 (queries have no positional index).

Return type:

RecordStream

Example:

stream = RecordStream.from_recordset(recordset)
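
To make the conversion rule concrete, here is a rough sketch of wrapping each record with an OK code and index -1. `FakeRecord`, `FakeResult`, the numeric `OK` value, and `wrap` are all assumptions made for illustration; the real RecordResult and result-code types are not reproduced here.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator, Dict, List

OK = 0  # hypothetical value standing in for the SDK's OK result code


@dataclass
class FakeRecord:
    bins: Dict[str, Any]


@dataclass
class FakeResult:
    record: FakeRecord
    result_code: int
    index: int


async def fake_recordset() -> AsyncIterator[FakeRecord]:
    """Hypothetical async iterable of records, like a query result."""
    yield FakeRecord({"name": "a"})
    yield FakeRecord({"name": "b"})


async def wrap(records: AsyncIterator[FakeRecord]) -> AsyncIterator[FakeResult]:
    # Query records carry no positional index, so every row gets index -1.
    async for record in records:
        yield FakeResult(record=record, result_code=OK, index=-1)


async def demo_wrap() -> List[FakeResult]:
    return [row async for row in wrap(fake_recordset())]


wrapped = asyncio.run(demo_wrap())
print([(row.record.bins, row.index) for row in wrapped])
```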

classmethod from_chunked_recordset(recordset, reexecute, limit=0)[source]

Wrap a Recordset for chunked iteration.

The stream yields records from the current chunk. Call has_more_chunks() to advance to the next server chunk.

Parameters:
  • recordset (Any) – The PAC Recordset from the first query call.

  • reexecute (Callable[[PartitionFilter], Awaitable[Any]]) – An async callable that accepts an updated PartitionFilter and returns a new Recordset.

  • limit (int) – Optional overall record limit (0 = unlimited).

Return type:

RecordStream

classmethod from_single(key, record)[source]

Wrap a single-key result.

Return type:

RecordStream

Example:

stream = RecordStream.from_single(key, record)

classmethod from_error(key, result_code, in_doubt=False, exception=None)[source]

Wrap a single-key error as a one-element stream.

Return type:

RecordStream

Example:

stream = RecordStream.from_error(key, ResultCode.TIMEOUT)

async has_more_chunks()[source]

Check whether more server-side chunks remain.

On the first call this returns True so the caller enters the iteration loop for the already-loaded first chunk. Subsequent calls inspect the server’s PartitionFilter cursor: if more partitions remain, a new query round-trip is issued transparently and True is returned.

Returns False when:
  • the server cursor is done (all partitions scanned), or

  • the overall limit has been reached, or

  • the stream was not created with from_chunked_recordset().

Example:

stream = await session.query(SET).chunk_size(10).execute()
chunk = 0
while await stream.has_more_chunks():
    chunk += 1
    print(f"Chunk: {chunk}")
    async for rr in stream:
        print(rr.record.bins)

Return type:

bool
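
The calling contract described above (first call returns True for the already-loaded chunk, later calls fetch the next chunk or return False) can be sketched with an in-memory stand-in. `FakeChunkedStream` is hypothetical and only mimics the documented contract; it performs no server round-trips and is not how the SDK is implemented.

```python
import asyncio
from typing import List


class FakeChunkedStream:
    """Hypothetical stand-in that mimics the documented chunk contract."""

    def __init__(self, chunks: List[List[int]]) -> None:
        self._current = list(chunks[0]) if chunks else []
        self._pending = [list(chunk) for chunk in chunks[1:]]
        self._first_call = True

    async def has_more_chunks(self) -> bool:
        if self._first_call:
            # The first chunk is already loaded, so just let the caller in.
            self._first_call = False
            return True
        if not self._pending:
            return False
        # Simulates the transparent round-trip for the next chunk.
        self._current = self._pending.pop(0)
        return True

    def __aiter__(self) -> "FakeChunkedStream":
        return self

    async def __anext__(self) -> int:
        if not self._current:
            raise StopAsyncIteration
        return self._current.pop(0)


async def demo_chunks() -> List[List[int]]:
    stream = FakeChunkedStream([[1, 2], [3, 4], [5]])
    seen: List[List[int]] = []
    while await stream.has_more_chunks():
        seen.append([item async for item in stream])
    return seen


chunks_seen = asyncio.run(demo_chunks())
print(chunks_seen)
```

The inner loop ends at each chunk boundary, and the outer loop ends once no chunks remain, matching the shape of the documented example.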

async first()[source]

Consume and return the first row, or None if there are no rows.

Return type:

RecordResult | None

Returns:

The first RecordResult, or None when the stream is empty.

Note

This advances the iterator; remaining rows are left for further async for or other helpers only if the underlying source allows partial consumption (most SDK streams are single-pass).
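
The single-pass caveat can be illustrated with a plain async generator. This is a sketch under the assumption that the source permits partial consumption; `rows` and `take_first` are hypothetical helpers, not SDK functions.

```python
import asyncio
from typing import AsyncIterator, List, Optional, Tuple


async def rows() -> AsyncIterator[str]:
    """Hypothetical single-pass source standing in for a stream."""
    for name in ("a", "b", "c"):
        yield name


async def take_first(source: AsyncIterator[str]) -> Optional[str]:
    """Return the first item or None, consuming exactly one item."""
    async for item in source:
        return item
    return None


async def demo_first() -> Tuple[Optional[str], List[str], Optional[str]]:
    source = rows()
    head = await take_first(source)
    rest = [item async for item in source]  # resumes after the first item
    again = await take_first(source)        # the source is now exhausted
    return head, rest, again


first_demo = asyncio.run(demo_first())
print(first_demo)
```

Once the first item is taken it cannot be replayed; a later call on the exhausted source yields None.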

Example:

stream = await session.query(key).execute()
row = await stream.first()
if row is None:
    ...

async first_or_raise()[source]

Return the first row and require success (see RecordResult.or_raise()).

Return type:

RecordResult

Returns:

The first OK RecordResult.

Raises:

AerospikeError – If the stream is empty or the first row is not OK.

Example:

rec = (await stream.first_or_raise()).record_or_raise()

async first_udf_result()[source]

Scan forward for the first non-None udf_result.

Return type:

Any | None

Returns:

The UDF return value, or None if no row carries a UDF result.

Example:

value = await stream.first_udf_result()

See also

Session.execute_udf(): Produces streams with UDF results.

async collect()[source]

Drain the stream into a list (order preserved).

Return type:

list[RecordResult]

Returns:

All remaining RecordResult instances.

Example:

rows = await stream.collect()
oks = [r for r in rows if r.is_ok]

async failures()[source]

Drain the stream and return rows where is_ok is false.

Return type:

list[RecordResult]

Returns:

Only error or non-OK rows.

Note

Like collect(), this consumes the entire stream.
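
As a rough illustration of what "consumes the entire stream" means in practice, the sketch below filters non-OK rows from a single-pass source and then shows that nothing is left afterwards. `FakeRow`, `fake_rows`, and `drain_failures` are hypothetical stand-ins, not SDK types.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, List, Tuple


@dataclass
class FakeRow:
    key: str
    is_ok: bool


async def fake_rows() -> AsyncIterator[FakeRow]:
    """Hypothetical single-pass source with one failed row."""
    yield FakeRow("k1", True)
    yield FakeRow("k2", False)
    yield FakeRow("k3", True)


async def drain_failures(source: AsyncIterator[FakeRow]) -> List[FakeRow]:
    """Read every row, keeping only those that are not OK."""
    return [row async for row in source if not row.is_ok]


async def demo_failures() -> Tuple[List[FakeRow], List[FakeRow]]:
    source = fake_rows()
    failed = await drain_failures(source)
    leftover = [row async for row in source]  # nothing remains after draining
    return failed, leftover


failed_rows, leftover_rows = asyncio.run(demo_failures())
print([row.key for row in failed_rows], leftover_rows)
```

If both the OK and failed rows are needed, collecting once and splitting the list afterwards avoids losing the OK rows.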

close()[source]

Mark the stream closed; further __anext__() calls stop iteration.

Idempotent. Use when abandoning a stream early to cooperate with resource cleanup where supported.
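
The close semantics can be sketched with a small wrapper. `ClosableStream` and `counter` are hypothetical and only model the two documented properties (iteration stops after close, and repeated calls are harmless); real resource cleanup is not modeled.

```python
import asyncio
from typing import AsyncIterator, List


class ClosableStream:
    """Hypothetical stand-in: iteration stops once close() has been called."""

    def __init__(self, source: AsyncIterator[int]) -> None:
        self._source = source
        self._closed = False

    def close(self) -> None:
        self._closed = True  # setting the flag again has no further effect

    def __aiter__(self) -> "ClosableStream":
        return self

    async def __anext__(self) -> int:
        if self._closed:
            raise StopAsyncIteration
        return await self._source.__anext__()


async def counter() -> AsyncIterator[int]:
    for value in range(100):
        yield value


async def demo_close() -> List[int]:
    stream = ClosableStream(counter())
    seen: List[int] = []
    async for value in stream:
        seen.append(value)
        if value == 2:
            stream.close()
            stream.close()  # second call is a no-op
    return seen


closed_demo = asyncio.run(demo_close())
print(closed_demo)
```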

Return type:

None

SyncRecordStream

class aerospike_sdk.sync.record_stream.SyncRecordStream[source]

Bases: object

Blocking view of RecordStream.

Iterator protocol and helpers (first, collect, …) call into the underlying async stream via the parent client’s event-loop manager, so each step may block the calling thread until data is ready.

Behavior vs async: Ordering and per-row semantics match RecordStream; only the API surface is synchronous. close() closes the async stream directly without crossing the loop (releases resources; pending async iteration should stop).

Example:

with SyncClient("localhost:3000") as client:
    session = client.create_session()
    for row in session.query(ns, set).bins(["name"]).execute():
        if row.record:
            print(row.record.bins)
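
One common way to build a blocking view over an async iterator is to run an event loop on a background thread and wait on each step from the calling thread. The sketch below shows that general pattern with the standard library only; `LoopThread` and `BlockingView` are hypothetical names, and nothing here is taken from the SDK's actual loop manager.

```python
import asyncio
import threading
from typing import Any, AsyncIterator, Iterator

_DONE = object()  # sentinel marking the end of the async source


class LoopThread:
    """Hypothetical loop manager: runs an event loop on a daemon thread."""

    def __init__(self) -> None:
        self.loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self.loop.run_forever, daemon=True)
        self._thread.start()

    def run(self, coro: Any) -> Any:
        # Blocks the calling thread until the coroutine finishes on the loop.
        return asyncio.run_coroutine_threadsafe(coro, self.loop).result()

    def stop(self) -> None:
        self.loop.call_soon_threadsafe(self.loop.stop)
        self._thread.join()
        self.loop.close()


async def _next_or_done(source: AsyncIterator[int]) -> Any:
    try:
        return await source.__anext__()
    except StopAsyncIteration:
        return _DONE


class BlockingView:
    """Hypothetical synchronous iterator over an async iterator."""

    def __init__(self, source: AsyncIterator[int], manager: LoopThread) -> None:
        self._source = source
        self._manager = manager

    def __iter__(self) -> Iterator[int]:
        return self

    def __next__(self) -> int:
        item = self._manager.run(_next_or_done(self._source))
        if item is _DONE:
            raise StopIteration
        return item


async def ticks() -> AsyncIterator[int]:
    for value in range(3):
        await asyncio.sleep(0)
        yield value


manager = LoopThread()
try:
    sync_items = list(BlockingView(ticks(), manager))
finally:
    manager.stop()
print(sync_items)
```

Each `__next__` call blocks until the loop thread produces the next item, which mirrors the note above that every step may block the calling thread.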

See also

RecordStream

__init__(stream, loop_manager)[source]

Wrap stream for synchronous consumption (internal use).

first()[source]

Return the first row, or None if the stream is empty.

See also

RecordStream.first()

Return type:

RecordResult | None

first_or_raise()[source]

Return the first row or raise if the stream is empty or not OK.

Raises:

AerospikeError – On a failed row or empty-stream error from the async layer. See RecordStream.first_or_raise().

Return type:

RecordResult

first_udf_result()[source]

Return the first non-None udf_result.

Return type:

Any | None

collect()[source]

Drain the stream into a list (blocks until complete).

Return type:

list[RecordResult]

failures()[source]

Collect rows whose result_code is not OK.

Return type:

list[RecordResult]

close()[source]

Release the underlying async stream.

Return type:

None