RecordStream
- class aerospike_sdk.record_stream.RecordStream
Bases: object
Async iterator of RecordResult rows.
Produced by await session.query(...).execute() and similar APIs. Prefer async for row in stream, or helpers such as collect() and first(). Do not call RecordStream(...) directly; use factories like from_list() or from_batch_records().
Example
Typical consumption with async for:
stream = await session.query(key).bins(["name"]).execute()
async for row in stream:
    if row.is_ok and row.record:
        print(row.record.bins)
See also
first_or_raise(): Assert a single OK row.
- classmethod from_list(results)
Wrap an already-materialised list of results.
- Return type:
- Example::
stream = RecordStream.from_list([row1, row2])
rows = await stream.collect()
- classmethod chain(streams)
Yield all results from each stream in order.
- Return type:
- Example::
combined = RecordStream.chain([stream_a, stream_b])
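The ordering behaviour described for chain() can be illustrated without the SDK. The sketch below is a stand-in, not the SDK's implementation: `make_stream` and `chain_streams` are hypothetical helpers over plain async generators, and the assumption is only that each source is exhausted before the next one starts.

```python
import asyncio
from typing import AsyncIterator, Iterable, List


async def make_stream(items: List[str]) -> AsyncIterator[str]:
    # Hypothetical stand-in for a RecordStream: yields pre-built rows.
    for item in items:
        yield item


async def chain_streams(streams: Iterable[AsyncIterator[str]]) -> AsyncIterator[str]:
    # Exhaust each stream fully before starting the next one.
    for stream in streams:
        async for row in stream:
            yield row


async def main() -> List[str]:
    combined = chain_streams([make_stream(["a1", "a2"]), make_stream(["b1"])])
    return [row async for row in combined]


chained = asyncio.run(main())
print(chained)
```

Rows from the first source come out before any row from the second, which is the property a caller relies on when combining streams "in order".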
- classmethod from_batch_records(batch_records)
Wrap a sequence of async-client BatchRecord objects.
- Return type:
- Example::
stream = RecordStream.from_batch_records(batch_records)
- classmethod from_pac_batch_stream(pac_stream, on_error=None)
Lazy-feed adapter over a PAC BatchRecordStream.
The PAC stream yields (idx, BatchRecord) tuples in completion order (the node that responds first yields first), not input order. idx is the position of the originating op in the input ops list; it is mapped to RecordResult.index, so positional consumers can still recover input order via stream.collect() followed by results.sort(key=lambda r: r.index) if needed.
Per-key errors land on each BatchRecord.result_code and surface as RecordResult with is_ok=False. Cluster-level errors raise from __anext__ and are converted to PSDK exceptions via _convert_pac_exception().
- Parameters:
pac_stream (Any) – PAC BatchRecordStream to drain.
on_error (Optional[Callable[[Key, int, AerospikeError], None]]) – Optional (key, index, exception) -> None callback. When set, per-key failures are dispatched to the handler and excluded from the returned stream; cluster-level errors still raise from __anext__.
- Return type:
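Recovering input order from a completion-ordered batch, as described above, comes down to a sort on the index field. A minimal sketch, assuming a hypothetical `Row` dataclass in place of RecordResult (only `index` and `is_ok` are modelled) and a hypothetical `restore_input_order` helper:

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Row:
    # Hypothetical stand-in for RecordResult; only the fields used here.
    index: int
    is_ok: bool


def restore_input_order(rows: List[Row]) -> List[Row]:
    # Rows arrive in completion order; sort by the originating op position.
    return sorted(rows, key=lambda r: r.index)


completion_order = [
    Row(index=2, is_ok=True),
    Row(index=0, is_ok=False),
    Row(index=1, is_ok=True),
]
ordered = restore_input_order(completion_order)
print([r.index for r in ordered])
```

Failed rows keep their index, so a per-key failure still lines up with the op that produced it after sorting.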
- classmethod from_recordset(recordset)
Wrap a Recordset (async iterable of Record).
Each yielded Record is converted to a RecordResult with result_code=OK and index=-1 (queries have no positional index).
- Return type:
- Example::
stream = RecordStream.from_recordset(recordset)
- classmethod from_chunked_recordset(recordset, reexecute, limit=0)
Wrap a Recordset for chunked iteration.
The stream yields records from the current chunk. Call has_more_chunks() to advance to the next server chunk.
- Parameters:
- Return type:
- classmethod from_single(key, record)
Wrap a single-key result.
- Return type:
- Example::
stream = RecordStream.from_single(key, record)
- classmethod from_error(key, result_code, in_doubt=False, exception=None)
Wrap a single-key error as a one-element stream.
- Return type:
- Example::
stream = RecordStream.from_error(key, ResultCode.TIMEOUT)
- async has_more_chunks()
Check whether more server-side chunks remain.
On the first call this returns True so the caller enters the iteration loop for the already-loaded first chunk. Subsequent calls inspect the server’s PartitionFilter cursor: if more partitions remain, a new query round-trip is issued transparently and True is returned.
Returns False when:
* the server cursor is done (all partitions scanned), or
* the overall limit has been reached, or
* the stream was not created with from_chunked_recordset().
Example:
stream = await session.query(SET).chunk_size(10).execute()
chunk = 0
while await stream.has_more_chunks():
    chunk += 1
    print(f"Chunk: {chunk}")
    async for rr in stream:
        print(rr.record.bins)
- Return type:
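The chunk contract (first call returns True for the preloaded chunk, later calls load the next chunk or return False) can be mimicked with a small stand-in. `FakeChunkedStream` below is hypothetical and keeps its chunks in memory; in the SDK the second branch would be a query round-trip.

```python
import asyncio
from typing import List


class FakeChunkedStream:
    """Hypothetical stand-in that mimics the documented chunk contract."""

    def __init__(self, chunks: List[List[int]]) -> None:
        self._current = list(chunks[0]) if chunks else []
        self._pending = [list(c) for c in chunks[1:]]  # chunks not yet loaded
        self._first_call = True

    async def has_more_chunks(self) -> bool:
        if self._first_call:
            # First call: the first chunk is already loaded.
            self._first_call = False
            return True
        if not self._pending:
            return False
        # Later calls: load the next chunk before reporting True.
        self._current = self._pending.pop(0)
        return True

    def __aiter__(self) -> "FakeChunkedStream":
        return self

    async def __anext__(self) -> int:
        if not self._current:
            raise StopAsyncIteration
        return self._current.pop(0)


async def main() -> List[List[int]]:
    stream = FakeChunkedStream([[1, 2], [3, 4], [5]])
    seen: List[List[int]] = []
    while await stream.has_more_chunks():
        seen.append([row async for row in stream])
    return seen


chunks_seen = asyncio.run(main())
print(chunks_seen)
```

Each pass of the outer loop drains exactly one chunk, which is why the inner async for sits inside the while loop rather than after it.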
- async first()
Consume and return the first row, or None if there are no rows.
- Return type:
- Returns:
The first RecordResult, or None when the stream is empty.
Note
This advances the iterator; remaining rows are left for further async for or other helpers only if the underlying source allows partial consumption (most SDK streams are single-pass).
Example:
stream = await session.query(key).execute()
row = await stream.first()
if row is None:
    ...
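The note about advancing the iterator is easier to see on a toy source. In this sketch, `rows` and `first` are hypothetical stand-ins built on a plain async generator; they model only the single-pass behaviour, not the SDK's internals.

```python
import asyncio
from typing import AsyncIterator, List, Optional, Tuple


async def rows(names: List[str]) -> AsyncIterator[str]:
    # Hypothetical single-pass source standing in for a RecordStream.
    for name in names:
        yield name


async def first(stream: AsyncIterator[str]) -> Optional[str]:
    # Consume exactly one row; return None when the source is empty.
    try:
        return await stream.__anext__()
    except StopAsyncIteration:
        return None


async def main() -> Tuple[Optional[str], List[str], Optional[str]]:
    stream = rows(["r0", "r1", "r2"])
    head = await first(stream)
    rest = [row async for row in stream]  # continues after the consumed row
    empty_head = await first(rows([]))
    return head, rest, empty_head


head, rest, empty_head = asyncio.run(main())
print(head, rest, empty_head)
```

The row returned by `first` is gone from the source afterwards, so later iteration sees only what remains.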
- async first_or_raise()
Return the first row and require success (see RecordResult.or_raise()).
- Return type:
- Returns:
The first OK RecordResult.
- Raises:
StopAsyncIteration – If the stream yields no rows (empty).
AerospikeError – If the first row is not OK (from RecordResult.or_raise()).
Example
rec = (await stream.first_or_raise()).record_or_raise()
- async first_udf_result()
Scan forward for the first non-None udf_result.
- Example::
value = await stream.first_udf_result()
See also
Session.execute_udf(): Produces streams with UDF results.
- async collect()
Drain the stream into a list (order preserved).
- Return type:
- Returns:
All remaining RecordResult instances.
Example
rows = await stream.collect()
oks = [r for r in rows if r.is_ok]
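A common follow-up to collect() is splitting the drained rows into successes and failures. The sketch below assumes a hypothetical `Row` dataclass in place of RecordResult and a hypothetical `collect` helper over an async generator; it shows the pattern, not the SDK's own code.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, List


@dataclass
class Row:
    # Hypothetical stand-in for RecordResult; only the fields used here.
    index: int
    is_ok: bool


async def source() -> AsyncIterator[Row]:
    # Hypothetical single-pass stream containing one failed row.
    yield Row(index=0, is_ok=True)
    yield Row(index=1, is_ok=False)
    yield Row(index=2, is_ok=True)


async def collect(stream: AsyncIterator[Row]) -> List[Row]:
    # Drain everything that remains, preserving arrival order.
    return [row async for row in stream]


async def main() -> List[Row]:
    return await collect(source())


rows = asyncio.run(main())
oks = [r for r in rows if r.is_ok]
failed = [r for r in rows if not r.is_ok]
print(len(oks), len(failed))
```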
SyncRecordStream
- class aerospike_sdk.sync.record_stream.SyncRecordStream
Bases: object
Synchronous iterator of RecordResult.
Produced by sync builder terminals (execute()). Iterate with a regular for loop or use the helpers below (first, collect, failures, …). The stream is single-pass: most underlying sources do not support resetting.
Example:
for row in session.query(key).bins(["name"]).execute():
    if row.is_ok and row.record:
        print(row.record.bins)
See also
aerospike_sdk.record_stream.RecordStream: async counterpart.
- classmethod from_batch_records(batch_records)
Wrap a list of PAC BatchRecord objects.
- Return type:
- classmethod from_pac_batch_stream(pac_stream, on_error=None)
Lazy-feed adapter over a PAC BatchRecordStream (sync iter).
See aerospike_sdk.record_stream.RecordStream.from_pac_batch_stream() for the contract. This sync variant pulls (idx, BatchRecord) tuples via PAC’s blocking __iter__/__next__ and maps each to a RecordResult with index=idx (NOT enumeration; completion order can differ from input order).
- Parameters:
pac_stream (Any) – PAC BatchRecordStream (sync iter) to drain.
on_error (Optional[Callable[[Key, int, AerospikeError], None]]) – Optional (key, index, exception) -> None callback. When set, per-key failures are dispatched to the handler and excluded from the returned stream; cluster-level errors still raise from __next__.
- Return type:
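The on_error behaviour (failed rows go to the callback and are dropped from the stream) can be sketched with a plain generator. Everything here is hypothetical: `Row` stands in for RecordResult, `with_error_handler` stands in for the adapter, and the callback follows the documented (key, index, exception) -> None shape.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, Iterator, List, Optional, Tuple


@dataclass
class Row:
    # Hypothetical stand-in for RecordResult.
    key: str
    index: int
    error: Optional[Exception] = None

    @property
    def is_ok(self) -> bool:
        return self.error is None


def with_error_handler(
    rows: Iterable[Row],
    on_error: Optional[Callable[[str, int, Exception], None]] = None,
) -> Iterator[Row]:
    for row in rows:
        if row.is_ok or on_error is None:
            # No handler: failed rows stay in the stream with is_ok False.
            yield row
        else:
            # Handler set: dispatch the failure and skip the row.
            on_error(row.key, row.index, row.error)


failures: List[Tuple[str, int, str]] = []


def record_failure(key: str, index: int, exc: Exception) -> None:
    failures.append((key, index, str(exc)))


batch = [Row("k0", 0), Row("k1", 1, TimeoutError("timeout")), Row("k2", 2)]
delivered = [r.key for r in with_error_handler(batch, on_error=record_failure)]
unfiltered = [r.key for r in with_error_handler(batch)]
print(delivered, failures, unfiltered)
```

With a handler, consumers only ever see OK rows; without one, they are expected to check is_ok themselves.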
- classmethod from_pac_recordset(recordset)
Wrap a PAC Recordset (sync __iter__/__next__).
Each yielded Record becomes an OK RecordResult with index=-1 (queries have no positional index).
- Return type:
- classmethod from_chunked_pac_recordset(recordset, reexecute, limit=0)
Wrap a PAC Recordset for chunked iteration.
reexecute is a sync callable that takes the current PartitionFilter and returns the next Recordset (or None to stop). Use has_more_chunks() to advance.
- Return type:
- classmethod from_single(key, record)
Wrap a single-key result.
Sets result_code = OK when record is not None; otherwise KEY_NOT_FOUND_ERROR.
- Return type:
- classmethod from_error(key, result_code, in_doubt=False, exception=None)
Wrap a single-key error as a one-element stream.
- Return type:
- first_or_raise()
Return the first row, or raise if the stream is empty / not OK.
- Return type: