# Copyright 2025-2026 Aerospike, Inc.
#
# Portions may be licensed to Aerospike, Inc. under one or more contributor
# license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Background monitor that caches secondary index metadata for transparent
filter generation.
The :class:`IndexesMonitor` periodically queries the cluster via
``sindex-list`` and ``sindex-stat`` info commands, converts the responses into
:class:`~aerospike_sdk.ael.filter_gen.Index` objects, and stores them in
per-namespace :class:`~aerospike_sdk.ael.filter_gen.IndexContext` caches.
This module is intentionally at the SDK layer (not in PAC or core) — index
metadata is only needed by the AEL filter-generation pipeline.
"""
from __future__ import annotations
import asyncio
import logging
from typing import TYPE_CHECKING, Dict, List, Optional
from aerospike_async.exceptions import AerospikeError as PacError
from aerospike_sdk.ael.filter_gen import Index, IndexContext, IndexTypeEnum
if TYPE_CHECKING: # Not unused — avoids circular import; used in type annotations only.
from aerospike_async import Client
# Module logger; the logger name is a fixed string rather than ``__name__``.
log = logging.getLogger("aerospike_sdk.index_monitor")

# Seconds between background refreshes of the index cache.
_DEFAULT_REFRESH_INTERVAL: float = 5.0
# Seconds ``IndexesMonitor.start`` waits for the first fetch before returning.
_INITIAL_FETCH_TIMEOUT: float = 1.0

# Lower-cased ``type`` field of a ``sindex-list`` entry -> filter-gen index
# type. Lookups that miss this table are handled by the caller.
_SINDEX_TYPE_MAP: Dict[str, IndexTypeEnum] = dict(
    numeric=IndexTypeEnum.NUMERIC,
    string=IndexTypeEnum.STRING,
    geo2dsphere=IndexTypeEnum.GEO2D_SPHERE,
    blob=IndexTypeEnum.BLOB,
)
def _parse_sindex_list(raw_responses: Dict[str, Dict[str, str]]) -> List[Dict[str, str]]:
    """Parse ``info_on_all_nodes("sindex-list")`` into deduplicated index dicts.

    Server response is semicolon-separated entries where each entry contains
    colon-separated ``key=value`` pairs, e.g.::

        ns=test:indexname=age_idx:set=users:bin=age:type=numeric:indextype=default:...

    The same index can be reported by several nodes, so entries are
    deduplicated. Indexes are addressed by namespace *and* name (see the
    ``sindex-stat:namespace=...;indexname=...`` request), so the
    deduplication key is ``(ns, indexname)``. Two namespaces that each define
    an index with the same name both keep their entry.

    Args:
        raw_responses: Mapping of node -> (command -> raw response string).
            Non-string or empty response values are ignored.

    Returns:
        One dict per distinct ``(ns, indexname)`` pair, in first-seen order.
        Each dict always has ``ns``, ``set`` (``""`` when absent), ``bin`` and
        ``indexname``. It also has ``type``, ``indextype`` and ``context`` when
        the server reported them. Entries missing ``indexname``, ``ns`` or
        ``bin`` are skipped.
    """
    seen = set()
    records: List[Dict[str, str]] = []
    for node_response in raw_responses.values():
        for value in node_response.values():
            if not isinstance(value, str) or not value:
                continue
            for entry in value.split(";"):
                entry = entry.strip()
                if not entry:
                    continue
                fields: Dict[str, str] = {}
                for token in entry.split(":"):
                    if "=" in token:
                        k, v = token.split("=", 1)
                        fields[k] = v
                index_name = fields.get("indexname")
                ns = fields.get("ns")
                bin_name = fields.get("bin")
                if not index_name or not ns or not bin_name:
                    continue
                # Index names are only unique per namespace, so keying on the
                # name alone would drop same-named indexes in other namespaces.
                key = (ns, index_name)
                if key in seen:
                    continue
                seen.add(key)
                rec: Dict[str, str] = {
                    "ns": ns,
                    "set": fields.get("set", ""),
                    "bin": bin_name,
                    "indexname": index_name,
                }
                for optional in ("type", "indextype", "context"):
                    if optional in fields:
                        rec[optional] = fields[optional]
                records.append(rec)
    return records
def _parse_entries_per_bval(raw_response: Dict[str, str]) -> Optional[float]:
    """Return the first parseable ``entries_per_bval`` in an ``sindex-stat`` reply.

    Each string value of *raw_response* is scanned in order as
    ``;``-separated ``key=value`` tokens. Non-string values are skipped. A
    matching token whose value is not a valid float is ignored, so a later
    token can still supply the result.

    Returns:
        The parsed value, or ``None`` when no token yields a float.
    """
    prefix = "entries_per_bval="
    text_values = (v for v in raw_response.values() if isinstance(v, str))
    for text in text_values:
        for piece in text.split(";"):
            piece = piece.strip()
            if not piece.startswith(prefix):
                continue
            try:
                return float(piece[len(prefix):])
            except ValueError:
                continue
    return None
async def _fetch_indexes(client: "Client") -> Dict[str, IndexContext]:
    """Query the cluster and build one ``IndexContext`` per namespace.

    ``sindex-list`` is sent to every node via ``info_on_all_nodes`` and parsed
    with ``_parse_sindex_list``. For each resulting entry, a ``sindex-stat``
    request is awaited one at a time to obtain ``entries_per_bval``. If that
    request raises ``PacError``, the failure is logged at DEBUG level and
    ``bin_values_ratio`` stays ``None``. Any other exception propagates to the
    caller.

    Args:
        client: Client used to issue the info commands.

    Returns:
        Mapping from namespace to its ``IndexContext``. Only namespaces with
        at least one parsed index appear.
    """
    listing = await client.info_on_all_nodes("sindex-list")
    grouped: Dict[str, List[Index]] = {}
    for record in _parse_sindex_list(listing):
        namespace = record["ns"]
        index_name = record["indexname"]
        # Missing or unrecognised ``type`` values fall back to NUMERIC.
        type_key = record.get("type", "").lower()
        index_type = _SINDEX_TYPE_MAP.get(type_key, IndexTypeEnum.NUMERIC)

        ratio: Optional[float] = None
        try:
            stat_reply = await client.info(
                f"sindex-stat:namespace={namespace};indexname={index_name}",
            )
        except PacError:
            log.debug("Failed to fetch sindex-stat for %s.%s",
                      namespace, index_name, exc_info=True)
        else:
            ratio = _parse_entries_per_bval(stat_reply)

        built = Index(
            bin=record["bin"],
            index_type=index_type,
            namespace=namespace,
            name=index_name,
            bin_values_ratio=ratio,
        )
        bucket = grouped.get(namespace)
        if bucket is None:
            bucket = grouped[namespace] = []
        bucket.append(built)

    contexts: Dict[str, IndexContext] = {}
    for namespace, members in grouped.items():
        contexts[namespace] = IndexContext.of(namespace, members)
    return contexts
class IndexesMonitor:
    """Async background task that caches secondary index metadata.

    Start via :meth:`start` (typically called by ``Client.connect``).
    Retrieve cached data with :meth:`get_index_context`. Stop via
    :meth:`stop` (called by ``Client.close``).

    Example::

        monitor = IndexesMonitor()
        await monitor.start(client)
        ctx = monitor.get_index_context("test")
        await monitor.stop()

    Args:
        refresh_interval: Seconds between cache refreshes (default 5.0).
    """

    def __init__(self, refresh_interval: float = _DEFAULT_REFRESH_INTERVAL) -> None:
        self._refresh_interval = refresh_interval
        self._cache: Dict[str, IndexContext] = {}
        self._task: Optional[asyncio.Task[None]] = None
        # The event is created in start(), inside the running loop. On
        # Python < 3.10, asyncio.Event binds to the loop that is current at
        # construction, so creating it here could tie it to the wrong loop.
        self._initial_ready: Optional[asyncio.Event] = None

    async def start(self, client: "Client") -> None:
        """Begin background refresh. Waits up to 1 s for the first fetch.

        This is a no-op while a refresh task is still running. If a previous
        task has already finished without :meth:`stop` (e.g. it was cancelled
        externally), a new task is started.

        Args:
            client: Client used for the info requests.
        """
        if self._task is not None and not self._task.done():
            return
        ready = asyncio.Event()
        self._initial_ready = ready
        self._task = asyncio.create_task(self._run(client))
        try:
            await asyncio.wait_for(ready.wait(), timeout=_INITIAL_FETCH_TIMEOUT)
        except asyncio.TimeoutError:
            log.warning(
                "Initial index fetch did not complete within %.1f s; "
                "monitoring continues in background",
                _INITIAL_FETCH_TIMEOUT,
            )

    async def stop(self) -> None:
        """Cancel the background refresh task and wait for it to finish.

        Safe to call when not started. The cached index data is kept.
        """
        task = self._task
        if task is None:
            return
        # Clear first so the attribute never points at a dead task, even if
        # awaiting it raises, and so a concurrent start() can begin afresh.
        self._task = None
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    def get_index_context(self, namespace: str) -> Optional[IndexContext]:
        """Return the cached ``IndexContext`` for *namespace*, or ``None``."""
        return self._cache.get(namespace)

    async def _run(self, client: "Client") -> None:
        """Refresh the cache every ``refresh_interval`` seconds until cancelled.

        A failed refresh is logged at DEBUG level and the previous cache is
        kept. The ready event captured at task start is set after every
        attempt, successful or not, so :meth:`start` can return as soon as the
        first attempt finishes.
        """
        # Capture this task's own event so a restarted monitor is not
        # signalled by an older task.
        ready = self._initial_ready
        while True:
            try:
                self._cache = await _fetch_indexes(client)
                total = sum(len(ctx.indexes) for ctx in self._cache.values())
                log.debug(
                    "Index cache refreshed: %d index(es) across %d namespace(s)",
                    total,
                    len(self._cache),
                )
            except asyncio.CancelledError:
                raise
            except Exception:
                log.debug("Error refreshing index cache", exc_info=True)
            finally:
                if ready is not None:
                    ready.set()
            await asyncio.sleep(self._refresh_interval)