# Copyright 2025-2026 Aerospike, Inc.
#
# Portions may be licensed to Aerospike, Inc. under one or more contributor
# license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Background monitor that caches secondary index metadata for transparent
filter generation.
The :class:`IndexesMonitor` runs a daemon thread that periodically queries
the cluster via ``sindex-list`` and ``sindex-stat`` info commands, converts
the responses into :class:`~aerospike_sdk.ael.filter_gen.Index` objects, and
stores them in per-namespace
:class:`~aerospike_sdk.ael.filter_gen.IndexContext` caches.
The thread uses the blocking info APIs of the underlying PAC client
(``aerospike_async.Client``), so the monitor works identically for the
async :class:`~aerospike_sdk.aio.client.Client` and the synchronous
:class:`~aerospike_sdk.sync.client.SyncClient` — no event loop required.
"""
from __future__ import annotations

import logging
import threading
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple

from aerospike_async.exceptions import AerospikeError as PacError

from aerospike_sdk.ael.filter_gen import Index, IndexContext, IndexTypeEnum

if TYPE_CHECKING:  # avoids circular import; used in type annotations only.
    from aerospike_async import Client as PacClient
log = logging.getLogger("aerospike_sdk.index_monitor")

# Seconds the refresh thread waits between two index-metadata fetches.
_DEFAULT_REFRESH_INTERVAL: float = 5.0
# Seconds ``IndexesMonitor.wait_until_ready`` blocks when called without an
# explicit timeout.
_DEFAULT_READY_TIMEOUT: float = 30.0

# Lower-cased ``type=`` value of a ``sindex-list`` entry -> internal index
# type. Server 8.1.2+ reports integer indexes as ``type=integer`` whereas
# older servers report ``type=numeric``; both resolve to the same member.
_SINDEX_TYPE_MAP: Dict[str, IndexTypeEnum] = dict(
    numeric=IndexTypeEnum.NUMERIC,
    integer=IndexTypeEnum.NUMERIC,
    string=IndexTypeEnum.STRING,
    geo2dsphere=IndexTypeEnum.GEO2D_SPHERE,
    blob=IndexTypeEnum.BLOB,
)
def _parse_sindex_list(raw_responses: Dict[str, Dict[str, str]]) -> List[Dict[str, str]]:
"""Parse ``info_on_all_nodes("sindex-list")`` into deduplicated index dicts.
Server response is semicolon-separated entries where each entry contains
colon-separated ``key=value`` pairs, e.g.::
ns=test:indexname=age_idx:set=users:bin=age:type=numeric:indextype=default:...
"""
index_map: Dict[str, Dict[str, str]] = {}
for node_response in raw_responses.values():
for value in node_response.values():
if not isinstance(value, str) or not value:
continue
for entry in value.split(";"):
entry = entry.strip()
if not entry:
continue
fields: Dict[str, str] = {}
for token in entry.split(":"):
if "=" in token:
k, v = token.split("=", 1)
fields[k] = v
index_name = fields.get("indexname")
if not index_name or index_name in index_map:
continue
ns = fields.get("ns")
bin_name = fields.get("bin")
if not ns or not bin_name:
continue
rec: Dict[str, str] = {
"ns": ns,
"set": fields.get("set", ""),
"bin": bin_name,
"indexname": index_name,
}
if "type" in fields:
rec["type"] = fields["type"]
if "indextype" in fields:
rec["indextype"] = fields["indextype"]
if "context" in fields:
rec["context"] = fields["context"]
index_map[index_name] = rec
return list(index_map.values())
def _parse_entries_per_bval(raw_response: Dict[str, str]) -> Optional[float]:
"""Extract ``entries_per_bval`` from an ``sindex-stat`` info response."""
for value in raw_response.values():
if not isinstance(value, str):
continue
for token in value.split(";"):
token = token.strip()
if token.startswith("entries_per_bval="):
try:
return float(token.split("=", 1)[1])
except (ValueError, IndexError):
pass
return None
def _stat_entries_per_bval_blocking(
    pac_client: "PacClient", ns: str, indexname: str,
) -> Optional[float]:
    """Fetch the ``entries_per_bval`` statistic of one secondary index.

    Sends a single blocking ``sindex-stat`` info request for *indexname* in
    namespace *ns* through *pac_client* and parses the reply.

    Returns:
        The parsed ratio, or ``None`` when the request raises a PAC
        ``AerospikeError`` (logged at debug level) or the reply holds no
        parsable value.
    """
    command = f"sindex-stat:namespace={ns};indexname={indexname}"
    try:
        response = pac_client.info_blocking(command)
    except PacError:
        log.debug(
            "Failed to fetch sindex-stat for %s.%s",
            ns, indexname, exc_info=True,
        )
        return None
    return _parse_entries_per_bval(response)
def _fetch_indexes_blocking(pac_client: "PacClient") -> Dict[str, IndexContext]:
    """Fetch all secondary indexes and return per-namespace ``IndexContext`` caches.

    Sends ``sindex-list`` to every node. It then issues one ``sindex-stat``
    request per cached index to obtain its ``entries_per_bval`` ratio.

    Only entries that map onto the ``Index`` built below are cached: a bin,
    a known scalar type and an optional set. An entry is skipped, with a
    debug log, in any of these cases:

    * its ``type`` is not in ``_SINDEX_TYPE_MAP`` (previously it silently
      became NUMERIC);
    * its ``indextype`` is anything other than ``default``/``none`` (a
      collection index);
    * it carries a non-``NULL`` CDT ``context``.

    The ``Index`` created here records neither the collection type nor the
    context. Caching such an index as a plain bin index would presumably let
    filter generation target it with a filter the server cannot serve.

    Args:
        pac_client: PAC client used for the blocking info requests.

    Returns:
        Namespace -> ``IndexContext``. Namespaces without any cacheable
        index are omitted.

    Raises:
        Whatever ``info_on_all_nodes_blocking`` raises. A PAC
        ``AerospikeError`` from ``sindex-stat`` only leaves that index's
        ``bin_values_ratio`` as ``None``.
    """
    raw = pac_client.info_on_all_nodes_blocking("sindex-list")
    indexes_by_ns: Dict[str, List[Index]] = {}
    for entry in _parse_sindex_list(raw):
        ns = entry["ns"]
        name = entry["indexname"]

        raw_type = entry.get("type", "")
        idx_type = _SINDEX_TYPE_MAP.get(raw_type.lower())
        if idx_type is None:
            log.debug(
                "Skipping index %s.%s: unsupported type %r", ns, name, raw_type,
            )
            continue

        collection = entry.get("indextype", "default").lower()
        if collection not in ("default", "none"):
            log.debug(
                "Skipping index %s.%s: collection index type %r",
                ns, name, collection,
            )
            continue

        context = entry.get("context", "")
        if context and context != "NULL":
            log.debug("Skipping index %s.%s: CDT context index", ns, name)
            continue

        # Set-less indexes may be reported with the literal placeholder
        # ``set=NULL``; treat that like a missing set.
        set_name = entry.get("set") or None
        if set_name == "NULL":
            set_name = None

        # Only cacheable indexes cost a ``sindex-stat`` round trip.
        bval = _stat_entries_per_bval_blocking(pac_client, ns, name)
        index = Index(
            bin=entry["bin"],
            index_type=idx_type,
            namespace=ns,
            name=name,
            bin_values_ratio=bval,
            set_name=set_name,
        )
        indexes_by_ns.setdefault(ns, []).append(index)
    return {
        ns: IndexContext.of(ns, idxs) for ns, idxs in indexes_by_ns.items()
    }
class IndexesMonitor:
    """Daemon-thread background monitor that caches secondary index metadata.

    Matches the JSDK ``IndexesMonitor`` design: a single daemon thread polls
    the cluster's info APIs at a fixed interval and refreshes an in-memory
    cache. Readers (sync or async builders) consult the cache through
    :meth:`get_index_context`, which is non-blocking.

    The monitor starts lazily: :meth:`start` is invoked by the query
    builders on the first AEL ``where()`` query that needs cached metadata,
    not by ``Client.connect``. Callers that never use AEL filters pay zero
    daemon-thread cost. :meth:`stop` is called from the matching ``close``
    paths and is a no-op when the monitor never started.

    :meth:`start` and :meth:`stop` are serialized by an internal lock, and
    every :meth:`start` hands its refresh thread private stop/ready events.
    A thread that outlived ``stop()``'s join timeout (e.g. still blocked in
    an info call) therefore exits once that call returns, instead of being
    revived by a later restart.

    Example::

        monitor = IndexesMonitor()
        monitor.start(pac_client)  # idempotent; safe to call repeatedly
        monitor.wait_until_ready()
        ctx = monitor.get_index_context("test")
        monitor.stop()

    Args:
        refresh_interval: Seconds between cache refreshes (default 5.0).
    """

    def __init__(self, refresh_interval: float = _DEFAULT_REFRESH_INTERVAL) -> None:
        self._refresh_interval = refresh_interval
        # Replaced wholesale by the refresh thread, never mutated in place.
        self._cache: Dict[str, IndexContext] = {}
        self._thread: Optional[threading.Thread] = None
        # Events of the current run; start() replaces them with fresh ones.
        self._stop_event = threading.Event()
        self._initial_ready = threading.Event()
        # Serializes start()/stop() so concurrent callers cannot spawn two
        # refresh threads or interleave event replacement.
        self._lifecycle_lock = threading.Lock()

    def start(self, pac_client: "PacClient") -> None:
        """Begin background refresh (idempotent; lazy-start friendly).

        Safe to call from multiple builders on every AEL query — if the
        daemon thread is already running, this is a no-op. The first
        metadata fetch runs asynchronously on the daemon thread; callers
        that need index metadata immediately (for example AEL ``where()``
        filter generation on a dataset query) should call
        :meth:`wait_until_ready` before relying on :meth:`get_index_context`.

        Args:
            pac_client: Client whose blocking info APIs the thread uses.
        """
        with self._lifecycle_lock:
            if self._thread is not None and self._thread.is_alive():
                return
            # Fresh events per run: clearing shared events would let a
            # previous thread that is still finishing a slow fetch keep
            # running alongside the new one and signal readiness for it.
            stop_event = threading.Event()
            ready_event = threading.Event()
            self._stop_event = stop_event
            self._initial_ready = ready_event
            self._thread = threading.Thread(
                target=self._run,
                args=(pac_client, stop_event, ready_event),
                name="aerospike-sdk-index-monitor",
                daemon=True,
            )
            self._thread.start()

    def wait_until_ready(self, timeout: Optional[float] = None) -> None:
        """Block until the first index refresh attempt has finished.

        After this returns, :meth:`get_index_context` reflects the latest fetch
        (possibly empty if the cluster reports no secondary indexes).

        Args:
            timeout: Seconds to wait; ``None`` uses ``_DEFAULT_READY_TIMEOUT``.

        Raises:
            RuntimeError: If :meth:`start` was not called.
            TimeoutError: If the first refresh does not complete in time.
        """
        if self._thread is None:
            raise RuntimeError("IndexesMonitor.start must be called first")
        limit = _DEFAULT_READY_TIMEOUT if timeout is None else timeout
        if not self._initial_ready.wait(limit):
            raise TimeoutError(
                f"IndexesMonitor first refresh did not complete in {limit:.1f}s",
            )

    def stop(self) -> None:
        """Stop the background refresh thread.

        Idempotent: safe to call when not running. Joins the thread with a
        short timeout (refresh interval + 1 s) so a stuck refresh doesn't
        block shutdown. A thread still blocked in an info call after that
        exits on its own once the call returns, without touching the cache.
        """
        with self._lifecycle_lock:
            self._stop_event.set()
            thread, self._thread = self._thread, None
        # Join outside the lock so a concurrent start() is not held up.
        if thread is not None:
            thread.join(timeout=self._refresh_interval + 1.0)

    def get_index_context(self, namespace: str) -> Optional[IndexContext]:
        """Return the cached ``IndexContext`` for *namespace*, or ``None``."""
        return self._cache.get(namespace)

    def _run(
        self,
        pac_client: "PacClient",
        stop_event: Optional[threading.Event] = None,
        ready_event: Optional[threading.Event] = None,
    ) -> None:
        """Periodic refresh loop. Runs on the daemon thread.

        Args:
            pac_client: Client used for the blocking info requests.
            stop_event: This run's stop signal; defaults to the monitor's
                current one.
            ready_event: Set after this run's first refresh attempt;
                defaults to the monitor's current one.
        """
        stop = self._stop_event if stop_event is None else stop_event
        ready = self._initial_ready if ready_event is None else ready_event
        while not stop.is_set():
            try:
                fresh = _fetch_indexes_blocking(pac_client)
                # stop() may have run while the fetch was blocked (possibly
                # followed by a restart with another client); a retired run
                # must not overwrite the cache.
                if stop.is_set():
                    break
                self._cache = fresh
                total = sum(len(ctx.indexes) for ctx in fresh.values())
                log.debug(
                    "Index cache refreshed: %d index(es) across %d namespace(s)",
                    total,
                    len(fresh),
                )
            except Exception:
                # Best-effort: a failed refresh keeps the previous cache and
                # is retried after the next interval.
                log.debug("Error refreshing index cache", exc_info=True)
            finally:
                # Idempotent: only the first fetch completion signals readiness.
                ready.set()
            # Sleep with stop awareness so shutdown is prompt.
            if stop.wait(self._refresh_interval):
                break