Reading Data

All reads go through session.query(), which returns a QueryBuilder. Chain methods to configure the query, then call .execute() to get a RecordStream.

Point Read (Single Key)

users = DataSet.of("test", "users")

stream = await session.query(users.id(1)).execute()
result = await stream.first_or_raise()
print(result.record.bins)  # {'name': 'Alice', 'age': 30}

Set Scan (All Records)

stream = await session.query(users).execute()
async for result in stream:
    print(result.record.bins)
stream.close()

Batch Read (Multiple Keys)

stream = await session.query(*users.ids(1, 2, 3)).execute()
async for result in stream:
    print(result.record.key, result.record.bins)
stream.close()

Selecting Bins

Return only specific bins to reduce network transfer:

stream = await session.query(users).bins(["name", "age"]).execute()

Or exclude all bins (metadata only):

stream = await session.query(users).with_no_bins().execute()

Filtering with AEL

Use the Aerospike Expression Language to filter records server-side:

stream = await (
    session.query(users)
    .where("$.age > 25 and $.status == 'active'")
    .execute()
)

Or with a pre-built FilterExpression:

from aerospike_sdk import Exp

expr = Exp.and_([
    Exp.gt(Exp.int_bin("age"), Exp.int_val(25)),
    Exp.eq(Exp.string_bin("status"), Exp.string_val("active")),
])
stream = await session.query(users).where(expr).execute()

Partition Filtering

Query specific partitions for parallel consumption:

stream = await (
    session.query(users)
    .on_partitions(0, 1, 2)
    .execute()
)

Or a contiguous range:

stream = await (
    session.query(users)
    .on_partition_range(begin=0, count=1024)
    .execute()
)

Query Policies

Fine-tune query behavior:

from aerospike_async import QueryDuration

stream = await (
    session.query(users)
    .where("$.age > 18")
    .expected_duration(QueryDuration.LONG)
    .chunk_size(500)
    .execute()
)

RecordResult

Each item in the stream is a RecordResult:

async for result in stream:
    if result.is_ok:
        record = result.record
        print(record.key, record.bins, record.generation, record.expiration)
    else:
        print(f"Error: {result.result_code}")

Use record_or_raise() to raise on error results:

async for result in stream:
    record = result.record_or_raise()

Query Hints

Influence secondary index selection with QueryHint:

from aerospike_sdk import QueryHint

stream = await (
    session.query(users)
    .where("$.age > 25")
    .with_hint(QueryHint(index_name="age_idx"))
    .execute()
)