RecordStream¶
- class aerospike_sdk.record_stream.RecordStream[source]¶
Bases:
objectAsync iterator of
RecordResultrows.Produced by
await session.query(...).execute()and similar APIs. Preferasync for row in stream, or helpers such ascollect()andfirst(). Do not callRecordStream(...)directly; use factories likefrom_list()orfrom_batch_records().Example
Typical consumption with
async for:stream = await session.query(key).bins(["name"]).execute() async for row in stream: if row.is_ok and row.record: print(row.record.bins)
See also
first_or_raise(): Assert a single OK row.- classmethod from_list(results)[source]¶
Wrap an already-materialised list of results.
- Return type:
- Example::
stream = RecordStream.from_list([row1, row2]) rows = await stream.collect()
- classmethod chain(streams)[source]¶
Yield all results from each stream in order.
- Return type:
- Example::
combined = RecordStream.chain([stream_a, stream_b])
- classmethod from_batch_records(batch_records)[source]¶
Wrap a sequence of async-client
BatchRecordobjects.- Return type:
- Example::
stream = RecordStream.from_batch_records(batch_records)
- classmethod from_recordset(recordset)[source]¶
Wrap a
Recordset(async iterable ofRecord).Each yielded
Recordis converted to aRecordResultwithresult_code=OKandindex=-1(queries have no positional index).- Return type:
- Example::
stream = RecordStream.from_recordset(recordset)
- classmethod from_chunked_recordset(recordset, reexecute, limit=0)[source]¶
Wrap a
Recordsetfor chunked iteration.The stream yields records from the current chunk. Call
has_more_chunks()to advance to the next server chunk.- Parameters:
- Return type:
- classmethod from_single(key, record)[source]¶
Wrap a single-key result.
- Return type:
- Example::
stream = RecordStream.from_single(key, record)
- classmethod from_error(key, result_code, in_doubt=False, exception=None)[source]¶
Wrap a single-key error as a one-element stream.
- Return type:
- Example::
stream = RecordStream.from_error(key, ResultCode.TIMEOUT)
- async has_more_chunks()[source]¶
Check whether more server-side chunks remain.
On the first call this returns
Trueso the caller enters the iteration loop for the already-loaded first chunk. Subsequent calls inspect the server’sPartitionFiltercursor: if more partitions remain, a new query round-trip is issued transparently andTrueis returned.Returns
Falsewhen: * the server cursor is done (all partitions scanned), or * the overalllimithas been reached, or * the stream was not created withfrom_chunked_recordset().Example:
stream = await session.query(SET).chunk_size(10).execute() chunk = 0 while await stream.has_more_chunks(): chunk += 1 print(f"Chunk: {chunk}") async for rr in stream: print(rr.record.bins)
- Return type:
- async first()[source]¶
Consume and return the first row, or
Noneif there are no rows.- Return type:
- Returns:
The first
RecordResult, orNonewhen the stream is empty.
Note
This advances the iterator; remaining rows are left for further
async foror other helpers only if the underlying source allows partial consumption (most SDK streams are single-pass).Example:
stream = await session.query(key).execute() row = await stream.first() if row is None: ...
- async first_or_raise()[source]¶
Return the first row and require success (see
RecordResult.or_raise()).- Return type:
- Returns:
The first OK
RecordResult.- Raises:
StopAsyncIteration – If the stream yields no rows (empty).
AerospikeError – If the first row is not OK (from
RecordResult.or_raise()).
Example
rec = (await stream.first_or_raise()).record_or_raise()
- async first_udf_result()[source]¶
Scan forward for the first non-
Noneudf_result.- Example::
value = await stream.first_udf_result()
See also
Session.execute_udf(): Produces streams with UDF results.
- async collect()[source]¶
Drain the stream into a list (order preserved).
- Return type:
- Returns:
All remaining
RecordResultinstances.
Example
rows = await stream.collect() oks = [r for r in rows if r.is_ok]
SyncRecordStream¶
- class aerospike_sdk.sync.record_stream.SyncRecordStream[source]¶
Bases:
objectBlocking view of
RecordStream.Iterator protocol and helpers (
first,collect, …) call into the underlying async stream via the parent client’s event-loop manager, so each step may block the calling thread until data is ready.Behavior vs async: Ordering and per-row semantics match
RecordStream; only the API surface is synchronous.close()closes the async stream directly without crossing the loop (releases resources; pending async iteration should stop).Example:
with SyncClient("localhost:3000") as client: session = client.create_session() for row in session.query(ns, set).bins(["name"]).execute(): if row.record: print(row.record.bins)
See also
- first_or_raise()[source]¶
Return the first row or raise if the stream is empty or not OK.
- Raises:
AerospikeError – On a failed row or empty-stream error from the async layer. See
first_or_raise().- Return type: