Source code for aerospike_sdk.aio.transactional_session

# Copyright 2025-2026 Aerospike, Inc.
#
# Portions may be licensed to Aerospike, Inc. under one or more contributor
# license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

"""TransactionalSession - Session for multi-record transactional operations (MRT)."""

from __future__ import annotations

from typing import Any, Optional, TYPE_CHECKING

from aerospike_async import AbortStatus, CommitStatus, Txn

from aerospike_sdk.aio.session import Session

if TYPE_CHECKING:
    from aerospike_sdk.aio.client import Client
    from aerospike_sdk.policy.behavior import Behavior


[docs] class TransactionalSession(Session): """Async context manager that groups operations into a multi-record transaction. Subclasses :class:`~aerospike_sdk.aio.session.Session`, so every session API (``query``, ``upsert``, ``insert``, ``batch``, ...) works unchanged inside ``async with``; builders capture the active :class:`~aerospike_async.Txn` via :meth:`~aerospike_sdk.aio.session.Session.get_current_transaction` and thread it onto every policy they hand to the PAC — the user never touches a policy. On clean exit the transaction is committed; if an exception propagates out of the block the transaction is aborted. Explicit :meth:`commit`, :meth:`abort`, and :meth:`rollback` (alias for ``abort``) are also available for manual control. Example: >>> async with client.create_session().begin_transaction() as tx: ... await tx.upsert(accounts.id("A")).bin("balance").set_to(100).execute() ... await tx.upsert(accounts.id("B")).bin("balance").set_to(200).execute() # Auto-committed on clean exit; auto-aborted on exception. See Also: :meth:`aerospike_sdk.aio.session.Session.begin_transaction` :meth:`aerospike_sdk.aio.client.Client.transaction_session` """
[docs] def __init__( self, client: "Client", behavior: Optional["Behavior"] = None, ) -> None: """Create a transactional session; prefer :meth:`Session.begin_transaction`. Args: client: Connected :class:`~aerospike_sdk.aio.client.Client`. behavior: Policy bundle for operations started from this session. Defaults to :attr:`Behavior.DEFAULT` when omitted. Note: Application code should not construct ``TransactionalSession`` directly; call :meth:`Session.begin_transaction` or :meth:`Client.transaction_session` instead. See Also: :meth:`aerospike_sdk.aio.session.Session.begin_transaction` """ if behavior is None: from aerospike_sdk.policy.behavior import Behavior as _Behavior behavior = _Behavior.DEFAULT super().__init__(client, behavior) # _txn is inherited from Session (initially None); __aenter__ sets it. self._finalized = False
@property def txn(self) -> Txn: """Return the underlying :class:`~aerospike_async.Txn`. Raises: RuntimeError: If the session has not been entered (no active txn). Returns: The active :class:`~aerospike_async.Txn`. """ if self._txn is None: raise RuntimeError( "TransactionalSession is not active; enter the 'async with' " "block before accessing .txn." ) return self._txn @property def active(self) -> bool: """``True`` when a transaction has been started and not yet finalized. Returns: Whether a transaction is currently active on this session. """ return self._txn is not None and not self._finalized
[docs] async def commit(self) -> CommitStatus: """Commit the transaction and return the server-reported status. Raises: RuntimeError: If the session has no active transaction. Returns: :class:`~aerospike_async.CommitStatus` reported by the server. See Also: :meth:`abort`: Undo the transaction instead of committing. """ if self._txn is None or self._finalized: raise RuntimeError("No active transaction to commit.") status = await self._client._async_client.commit(self._txn) self._finalized = True return status
[docs] async def abort(self) -> AbortStatus: """Abort the transaction and return the server-reported status. Raises: RuntimeError: If the session has no active transaction. Returns: :class:`~aerospike_async.AbortStatus` reported by the server. See Also: :meth:`commit`: Persist the transaction instead of aborting. :meth:`rollback`: Alias for this method. """ if self._txn is None or self._finalized: raise RuntimeError("No active transaction to abort.") status = await self._client._async_client.abort(self._txn) self._finalized = True return status
[docs] async def rollback(self) -> AbortStatus: """Alias for :meth:`abort`. Returns: :class:`~aerospike_async.AbortStatus` reported by the server. """ return await self.abort()
async def __aenter__(self) -> "TransactionalSession": if self._txn is not None: raise RuntimeError("TransactionalSession is already active.") self._txn = Txn() self._finalized = False return self async def __aexit__( self, exc_type: Optional[type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[Any], ) -> None: if self._txn is None or self._finalized: return try: if exc_type is None: await self.commit() else: await self.abort() finally: self._finalized = True # Drop the txn reference so builders created after exit don't # accidentally participate in a finalized transaction. self._txn = None