Writing Data
All writes go through session entry points that return a
WriteSegmentBuilder. Chain bin operations, then
call .execute().
Write Verbs
| Method | Behavior | Record must exist? |
|---|---|---|
| upsert | Create or update | No |
| insert | Create only | No (fails if exists) |
| update | Update only | Yes (fails if missing) |
| replace | Replace all bins | Yes (fails if missing) |
| delete | Remove record | No |
| | Reset TTL | Yes |
| | Check existence | No |
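The existence rules in the table can be illustrated with a toy in-memory store. This is a sketch only, not the SDK: ToyStore and its use of KeyError are hypothetical, and the merge-on-upsert behavior is assumed to mirror the server's default update semantics.

```python
class ToyStore:
    """In-memory stand-in for a set of records; illustrative only."""

    def __init__(self):
        self.records = {}  # key -> dict of bins

    def upsert(self, key, bins):
        # Create or update: merges bins, never fails on existence.
        self.records.setdefault(key, {}).update(bins)

    def insert(self, key, bins):
        # Create only: fails if the record already exists.
        if key in self.records:
            raise KeyError(f"record {key!r} already exists")
        self.records[key] = dict(bins)

    def update(self, key, bins):
        # Update only: fails if missing; bins not mentioned are kept.
        if key not in self.records:
            raise KeyError(f"record {key!r} not found")
        self.records[key].update(bins)

    def replace(self, key, bins):
        # Replace all bins: fails if missing; bins not mentioned are dropped.
        if key not in self.records:
            raise KeyError(f"record {key!r} not found")
        self.records[key] = dict(bins)

    def delete(self, key):
        # Remove record: no error if it is already gone.
        self.records.pop(key, None)


store = ToyStore()
store.upsert(1, {"name": "Alice", "age": 30})
store.update(1, {"age": 31})
after_update = dict(store.records[1])

store.replace(1, {"name": "Alice Updated"})
after_replace = dict(store.records[1])

try:
    store.insert(1, {"name": "Bob"})
    insert_failed = False
except KeyError:
    insert_failed = True
```

In the toy model, update keeps the untouched name bin, while replace leaves only the bins passed to it.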
Bin Chaining
Set individual bins with the chainable .bin().set_to() pattern:
users = DataSet.of("test", "users")
await (
session.upsert(users.id(1))
.bin("name").set_to("Alice")
.bin("age").set_to(30)
.bin("active").set_to(True)
.execute()
)
Dict Pattern
Set multiple bins at once with .put():
await (
session.upsert(users.id(1))
.put({"name": "Alice", "age": 30, "active": True})
.execute()
)
Increment
await (
session.update(users.id(1))
.bin("login_count").increment_by(1)
.execute()
)
GeoJSON Bins
Use set_to_geo_json(...) to write a bin as a GeoJSON value. The bin’s
server-side particle type is GEOJSON, not STRING, which makes it eligible for
GEO2DSPHERE indexing and geoCompare(...) queries.
places = DataSet.of("test", "places")
await (
session.upsert(places.id("space_needle"))
.bin("loc").set_to_geo_json('{"type":"Point","coordinates":[-122.349,47.620]}')
.execute()
)
AeroCircle and Polygon values use the same method — only the GeoJSON string differs.
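Hand-typed GeoJSON strings are easy to get subtly wrong, so one option is to build them with json.dumps. The sketch below uses two hypothetical helpers, point and aero_circle, which are not part of the SDK. It relies on GeoJSON's [longitude, latitude] coordinate order and assumes the AeroCircle layout of [[longitude, latitude], radius in meters] described in Aerospike's geospatial documentation.

```python
import json


def point(lng: float, lat: float) -> str:
    """Serialize a GeoJSON Point; coordinates are [longitude, latitude]."""
    return json.dumps({"type": "Point", "coordinates": [lng, lat]})


def aero_circle(lng: float, lat: float, radius_m: float) -> str:
    """Serialize an AeroCircle: a center point plus a radius in meters (assumed layout)."""
    return json.dumps({"type": "AeroCircle", "coordinates": [[lng, lat], radius_m]})


space_needle = point(-122.349, 47.620)
nearby = aero_circle(-122.349, 47.620, 500)
```

Either string can then be passed to set_to_geo_json(...) in place of a literal.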
HyperLogLog Bins
HllConfig describes a sketch’s precision
(index_bit_count + optional min_hash_bit_count). Initialize a new sketch
with hll_init(...), then accumulate elements with hll_add(...):
from aerospike_sdk import HllConfig
visitors = DataSet.of("test", "visitors")
await (
session.upsert(visitors.id("day_1"))
.bin("h").hll_init(HllConfig.of(14))
.bin("h").hll_add(["user-1", "user-2", "user-3"])
.execute()
)
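To get a feel for what index_bit_count buys, the standard HyperLogLog analysis gives 2^index_bit_count registers and a relative standard error of roughly 1.04 / sqrt(registers). The helpers below are hypothetical and SDK-independent; they only evaluate that textbook estimate.

```python
import math


def hll_register_count(index_bit_count: int) -> int:
    """Number of registers in a sketch with the given index width."""
    return 1 << index_bit_count


def hll_relative_error(index_bit_count: int) -> float:
    """Textbook HyperLogLog standard error: 1.04 / sqrt(register count)."""
    return 1.04 / math.sqrt(hll_register_count(index_bit_count))


registers = hll_register_count(14)
error = hll_relative_error(14)
print(f"{registers} registers, ~{error:.2%} standard error")
```

Each extra index bit doubles the register count and shrinks the expected error by a factor of about 1.41.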
Each write method (hll_init, hll_add, hll_set_union) accepts four
keyword-only flags:
| Flag | Effect |
|---|---|
| create_only | Fail with |
| update_only | Fail with |
| no_fail | Suppress the mode-constraint error from |
| | ( |
Passing both create_only=True and update_only=True raises ValueError
immediately at the call site (before the wire request).
# Create the sketch only if it doesn't exist; don't error if it does.
await (
session.upsert(visitors.id("day_2"))
.bin("h").hll_init(HllConfig.of(14), create_only=True, no_fail=True)
.execute()
)
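The mutual-exclusion rule for create_only and update_only can be pictured as a guard that runs before any request is built. The function below is a hypothetical stand-in, not the SDK's implementation; it only shows the shape of a call-site check that raises ValueError.

```python
def validate_hll_flags(*, create_only: bool = False, update_only: bool = False) -> None:
    """Reject contradictory flags before anything is sent (illustrative only)."""
    if create_only and update_only:
        raise ValueError("create_only and update_only are mutually exclusive")


# A single flag passes silently.
validate_hll_flags(create_only=True)

# Both flags together are rejected immediately.
try:
    validate_hll_flags(create_only=True, update_only=True)
    rejected = False
except ValueError:
    rejected = True
```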
To inspect a sketch’s bit widths, call hll_describe() and decode the
two-element list result via RecordResult.get_hll_config(bin_name):
rs = await session.query(visitors.id("day_1")).bin("h").hll_describe().execute()
result = await rs.first_or_raise()
config = result.get_hll_config("h")
# config == HllConfig.of(14)
Insert (Fail if Exists)
stream = await (
session.insert(users.id(99))
.put({"name": "New User"})
.execute()
)
result = await stream.first_or_raise()
Replace (Overwrite All Bins)
await (
session.replace(users.id(1))
.put({"name": "Alice Updated"})
.execute()
)
# Only "name" bin remains; "age" and "active" are removed
Delete
# Single key
await session.delete(users.id(1)).execute()
# Multiple keys
await session.delete(*users.ids(1, 2, 3)).execute()
Durable delete
Aerospike supports two delete modes:
a normal delete that removes the record (and its lineage) outright, and
a durable delete that leaves a tombstone so a strongly-consistent (SC) cluster can resolve the deletion across partitions.
The SDK exposes both per-operation overrides and builder defaults:
| Method | Scope | Effect |
|---|---|---|
| with_durable_delete() | one operation | Force durable delete for this delete only |
| | one operation | Force a non-durable delete for this delete only |
| default_with_durable_delete() | builder | Prefer durable when resolving Behavior defaults (typical for SC namespaces) |
| | builder | Prefer non-durable when resolving Behavior defaults |
The override (with_* / without_*) wins over the default; the default
folds into Behavior settings resolution.
# Force durable on this single delete (per-op override)
await session.delete(users.id(5)).with_durable_delete().execute()
# Use durable as the default for every delete in this segment (SC-friendly)
await (
session.delete(*users.ids(1, 2, 3))
.default_with_durable_delete()
.execute()
)
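The precedence between overrides and defaults can be sketched as a small resolution function. This is an assumption-laden illustration: resolve_durable_delete is a hypothetical name, and the sketch assumes the builder default is consulted before the Behavior-level setting, which this section does not spell out. The only rule taken directly from the text is that a per-operation override wins.

```python
from typing import Optional


def resolve_durable_delete(
    override: Optional[bool],
    builder_default: Optional[bool],
    behavior_default: bool = False,
) -> bool:
    """Pick the effective durable-delete flag (illustrative precedence)."""
    if override is not None:
        # with_* / without_* on the operation always wins.
        return override
    if builder_default is not None:
        # Assumed: the builder-level preference comes next.
        return builder_default
    # Assumed: otherwise fall back to the Behavior-level setting.
    return behavior_default


override_wins = resolve_durable_delete(override=False, builder_default=True)
default_applies = resolve_durable_delete(override=None, builder_default=True)
fallback = resolve_durable_delete(override=None, builder_default=None)
```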
Conditional Writes
Filter writes server-side with .where():
await (
session.update(users.id(1))
.where("$.age >= 18")
.bin("verified").set_to(True)
.execute()
)
Records that don’t match the filter are skipped. Use .fail_on_filtered_out()
to raise an error instead:
await (
session.update(users.id(1))
.where("$.age >= 18")
.fail_on_filtered_out()
.bin("verified").set_to(True)
.execute()
)
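The difference between silently skipping a filtered-out record and raising can be modeled without a cluster. In this sketch, FilteredOut and conditional_update are hypothetical names, and a plain Python predicate stands in for the server-side filter expression.

```python
class FilteredOut(Exception):
    """Raised when a record does not match the filter (illustrative only)."""


def conditional_update(record, predicate, new_bins, fail_on_filtered_out=False):
    """Apply new_bins only if predicate(record) holds; return True if written."""
    if not predicate(record):
        if fail_on_filtered_out:
            raise FilteredOut("record did not match the filter")
        return False  # skipped silently
    record.update(new_bins)
    return True


def is_adult(rec):
    return rec["age"] >= 18


adult = {"age": 30}
minor = {"age": 12}

wrote_adult = conditional_update(adult, is_adult, {"verified": True})
wrote_minor = conditional_update(minor, is_adult, {"verified": True})

try:
    conditional_update(minor, is_adult, {"verified": True}, fail_on_filtered_out=True)
    raised = False
except FilteredOut:
    raised = True
```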
Generation Check (Optimistic Locking)
await (
session.update(users.id(1))
.ensure_generation_is(5)
.bin("balance").set_to(100)
.execute()
)
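A generation check is usually paired with a read-modify-write retry loop: read the record and its generation, compute the new value, and write only if the generation is unchanged. The toy model below shows that loop in memory. ToyRecord, GenerationMismatch, and add_to_balance are hypothetical names, not SDK types.

```python
class GenerationMismatch(Exception):
    """Raised when the stored generation differs from the expected one."""


class ToyRecord:
    """In-memory record whose generation increments on every successful write."""

    def __init__(self, bins):
        self.bins = dict(bins)
        self.generation = 1

    def write_if_generation_is(self, expected, new_bins):
        if self.generation != expected:
            raise GenerationMismatch(f"expected {expected}, found {self.generation}")
        self.bins.update(new_bins)
        self.generation += 1


def add_to_balance(record, amount, max_attempts=3):
    """Read-modify-write with retry on generation conflicts."""
    for _ in range(max_attempts):
        seen_generation = record.generation            # read
        new_balance = record.bins["balance"] + amount  # modify
        try:
            record.write_if_generation_is(seen_generation, {"balance": new_balance})
            return new_balance                         # write succeeded
        except GenerationMismatch:
            continue  # another writer got there first; re-read and retry
    raise RuntimeError("gave up after repeated generation conflicts")


account = ToyRecord({"balance": 100})
add_to_balance(account, 50)

# A writer still holding the old generation is rejected.
try:
    account.write_if_generation_is(1, {"balance": 0})
    conflict = False
except GenerationMismatch:
    conflict = True
```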
TTL / Expiration
await (
session.upsert(users.id(1))
.expire_record_after_seconds(3600)
.put({"session_token": "abc123"})
.execute()
)
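Since expire_record_after_seconds(...) takes a number of seconds, it can be convenient to derive that number from a timedelta rather than hard-coding it. The helper below is hypothetical and not part of the SDK; it rejects zero and negative durations because any special TTL values are outside what this section covers.

```python
from datetime import timedelta


def ttl_seconds(duration: timedelta) -> int:
    """Convert a timedelta to whole seconds, rejecting non-positive values."""
    seconds = int(duration.total_seconds())
    if seconds <= 0:
        raise ValueError("TTL must be at least one second")
    return seconds


one_hour = ttl_seconds(timedelta(hours=1))
one_week = ttl_seconds(timedelta(days=7))
```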
Batch Writes
Multiple keys with the same operation:
await (
session.upsert(*users.ids(1, 2, 3))
.bin("status").set_to("migrated")
.execute()
)
Mixed operations across different keys are handled automatically when you chain multiple write segments.
execute() vs execute_stream()
The batch builder exposes two terminal methods with different result-delivery semantics:
- execute(): buffered. Awaits every per-key result, then returns a RecordStream backed by a fully materialized list. Writes are guaranteed to have completed server-side by the time this method returns, so subsequent reads observe the new state without races. Safe for “fire-and-forget” use (await the call and discard the returned stream). Use this for most workloads.
- execute_stream(): lazy. Returns a RecordStream that yields one RecordResult per __anext__ (__next__ on sync) as the cluster responds. The first record arrives after the first round trip, not after all keys complete. Peak memory is bounded, which is useful for large batches where buffering the full result list would be expensive.

Caveats for execute_stream():

- Yields completion order, not input order. Each RecordResult carries its originating op’s input position in .index. Sort after collecting if you need positional results.
- No writes-complete-on-return guarantee. If you discard the returned stream without iterating, per-node tasks may still be in flight when subsequent code runs, so subsequent reads can race against pending writes.
- Per-key errors land inline on RecordResult (.is_ok=False, .exception); cluster-level errors raise mid-iteration.
# Lazy streaming — process records as they arrive
stream = await (
session.batch()
.upsert(key1).bin("v").set_to(1)
.upsert(key2).bin("v").set_to(2)
.execute_stream()
)
async for result in stream:
    print(result.index, result.is_ok, result.key.value)
# When positional ordering is needed, collect + sort
stream = await (...).execute_stream()
results = await stream.collect()
results.sort(key=lambda r: r.index)
Sync siblings: SyncBatchOperationBuilder.execute() and .execute_stream()
have the same contract; iterate the latter with for result in stream (rather
than async for).
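The completion-order caveat can be reproduced with plain asyncio, independent of the SDK. In this sketch, fake_write is a hypothetical stand-in for one per-key write whose delay simulates node latency; asyncio.as_completed plays the role of the lazy stream, and sorting by the carried index restores input order.

```python
import asyncio


async def fake_write(index: int, delay: float) -> dict:
    """Stand-in for one per-key write; the delay simulates node latency."""
    await asyncio.sleep(delay)
    return {"index": index, "is_ok": True}


async def run_batch(delays):
    """Start all writes, then collect results in the order they finish."""
    tasks = [asyncio.create_task(fake_write(i, d)) for i, d in enumerate(delays)]
    arrived = []
    for finished in asyncio.as_completed(tasks):
        arrived.append(await finished)
    return arrived


# Key 0 is the slowest, key 1 the fastest.
arrived = asyncio.run(run_batch([0.15, 0.05, 0.10]))

completion_order = [r["index"] for r in arrived]
input_order = [r["index"] for r in sorted(arrived, key=lambda r: r["index"])]
```

Here the fastest write is yielded first regardless of where it sat in the input, which is why positional consumers need the sort step.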