Source code for aerospike_sdk.error_strategy
# Copyright 2025-2026 Aerospike, Inc.
#
# Portions may be licensed to Aerospike, Inc. under one or more contributor
# license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Error handling strategy for SDK operations.
Controls how per-record errors are surfaced during execution:
- **Default** (no argument): single-key operations raise immediately;
batch / multi-key operations embed errors in the ``RecordStream``.
- **ErrorStrategy.IN_STREAM**: always embed errors as ``RecordResult``
entries with non-OK result codes, even for single-key operations.
- **ErrorHandler** callback: errors are dispatched to the callback and
excluded from the returned stream.
"""
from __future__ import annotations
from enum import Enum
from typing import Callable, Union
from aerospike_async import Key
from aerospike_sdk.exceptions import AerospikeError
[docs]
class ErrorStrategy(Enum):
    """Public selector for how per-record errors are reported by ``execute()``.

    Supply a member as the ``on_error`` argument of ``execute()`` when the
    default error behavior should be replaced.

    Example::

        stream = await (
            session.query(k1, k2).execute(on_error=ErrorStrategy.IN_STREAM)
        )
    """

    IN_STREAM = "in_stream"
    """Report errors as ``RecordResult`` entries inside the ``RecordStream``."""
# Type alias for a per-record error callback: a callable taking a ``Key``, an
# ``int`` and an ``AerospikeError``, annotated as returning ``None``.
ErrorHandler = Callable[[Key, int, AerospikeError], None]
"""Callback ``(key, index, exception) -> None`` for per-record error handling.
The callback receives the failed record's key, its position in the batch
(0-based; 0 for single-key, -1 for queries), and the typed exception.
Errors dispatched to the handler are excluded from the returned stream.
"""
# Either an ``ErrorStrategy`` member or an ``ErrorHandler`` callback.
# ``_resolve_disposition`` additionally accepts ``None``, which selects the
# default behavior.
OnError = Union[ErrorStrategy, ErrorHandler]
"""Type alias for the ``on_error`` parameter of ``execute()``."""
# ---------------------------------------------------------------------------
# Internal disposition (not part of public API)
# ---------------------------------------------------------------------------
class _ErrorDisposition(Enum):
"""Resolved error routing decision, threaded through execution internals."""
THROW = "throw"
IN_STREAM = "in_stream"
HANDLER = "handler"
def _resolve_disposition(
    on_error: OnError | None,
    is_single_key: bool,
) -> _ErrorDisposition:
    """Map the caller's ``on_error`` argument onto an ``_ErrorDisposition``.

    Args:
        on_error: ``None`` for the default behavior,
            ``ErrorStrategy.IN_STREAM``, or an error-handler callback.
        is_single_key: Whether the operation targets a single key. Consulted
            only when ``on_error`` is ``None``.

    Returns:
        ``IN_STREAM`` when explicitly requested; ``HANDLER`` for any other
        non-``None`` value; otherwise the cardinality-based default, which is
        ``THROW`` for single-key operations and ``IN_STREAM`` for batches.
    """
    # An identity match against the enum member already implies the value is
    # an ``ErrorStrategy``, so no separate isinstance check is required.
    if on_error is ErrorStrategy.IN_STREAM:
        return _ErrorDisposition.IN_STREAM

    # Every other explicit value is routed to the handler disposition.
    if on_error is not None:
        return _ErrorDisposition.HANDLER

    # No explicit choice: the default depends on cardinality.
    if is_single_key:
        return _ErrorDisposition.THROW
    return _ErrorDisposition.IN_STREAM