Source code for aerospike_sdk.sync.transactional_session
# Copyright 2025-2026 Aerospike, Inc.
#
# Portions may be licensed to Aerospike, Inc. under one or more contributor
# license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Synchronous multi-record transaction (MRT) session."""
from __future__ import annotations
import types
from typing import Optional, TYPE_CHECKING
from aerospike_async import AbortStatus, CommitStatus, Txn
from aerospike_sdk.sync.session import SyncSession
if TYPE_CHECKING:
from aerospike_sdk.policy.behavior import Behavior
from aerospike_sdk.sync.client import SyncClient
[docs]
class SyncTransactionalSession(SyncSession):
"""Sync context manager grouping operations into a multi-record transaction.
Every session API (``query``, ``upsert``, ``insert``, ``batch``, ...)
works unchanged inside ``with``; the active :class:`~aerospike_async.Txn`
is threaded onto every policy the builders hand to the PAC.
On clean exit the transaction commits; if an exception propagates out
the transaction aborts. Explicit :meth:`commit`, :meth:`abort`, and
:meth:`rollback` (alias for ``abort``) are available for manual control.
Example:
>>> with client.create_session().begin_transaction() as tx:
... tx.upsert(accounts.id("A")).bin("balance").set_to(100).execute()
... tx.upsert(accounts.id("B")).bin("balance").set_to(200).execute()
See Also:
:meth:`SyncSession.begin_transaction`:
Preferred construction entry.
"""
[docs]
def __init__(self, client: SyncClient, behavior: Behavior) -> None:
"""Construct via :meth:`SyncSession.begin_transaction` rather than directly."""
super().__init__(client, behavior)
self._finalized = False
@property
def txn(self) -> Txn:
"""Return the active :class:`~aerospike_async.Txn`.
Raises:
RuntimeError: If the session has not been entered (no active txn).
"""
if self._txn is None:
raise RuntimeError("TransactionalSession is not active.")
return self._txn
@property
def active(self) -> bool:
"""``True`` when a transaction has been started and not yet finalized."""
return self._txn is not None and not self._finalized
[docs]
def commit(self) -> CommitStatus:
"""Commit the transaction and return the server-reported status."""
if self._txn is None or self._finalized:
raise RuntimeError("No active transaction to commit.")
status = self._pac_client.commit_blocking(self._txn)
self._finalized = True
self._txn = None
return status
[docs]
def abort(self) -> AbortStatus:
"""Abort the transaction and return the server-reported status."""
if self._txn is None or self._finalized:
raise RuntimeError("No active transaction to abort.")
status = self._pac_client.abort_blocking(self._txn)
self._finalized = True
self._txn = None
return status
[docs]
def rollback(self) -> AbortStatus:
"""Alias for :meth:`abort`."""
return self.abort()
def __enter__(self) -> SyncTransactionalSession:
if self._txn is not None:
raise RuntimeError("TransactionalSession is already active.")
self._txn = Txn()
self._finalized = False
return self
def __exit__(
self,
exc_type: Optional[type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[types.TracebackType],
) -> None:
if self._txn is None or self._finalized:
return
try:
if exc_type is None:
self.commit()
else:
self.abort()
finally:
self._finalized = True
self._txn = None