Source code for aerospike_sdk.sync.info

# Copyright 2025-2026 Aerospike, Inc.
#
# Portions may be licensed to Aerospike, Inc. under one or more contributor
# license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

"""SyncInfoCommands — synchronous info-command helpers using PAC ``_blocking``.

Never touches asyncio. Each call routes through PAC's ``info_blocking`` /
``info_on_all_nodes_blocking`` and parses the responses the same way the async
:class:`~aerospike_sdk.aio.info.InfoCommands` does.
"""

from __future__ import annotations

import logging
from typing import Any, Dict, List, Optional, Set

log = logging.getLogger("aerospike_sdk.sync.info")


def _merge_set_values(responses: Dict[str, Dict[str, str]]) -> Set[str]:
    """Flatten ``{node: {key: 'a;b;c'}}`` responses into a set of items."""
    out: Set[str] = set()
    for node_response in responses.values():
        for value in node_response.values():
            if isinstance(value, str) and value:
                out.update(s.strip() for s in value.split(";") if s.strip())
    return out


[docs] class SyncInfoCommands: """Synchronous high-level info-command helpers. Constructed by :meth:`SyncSession.info` (no args). Calls PAC's ``info_blocking`` / ``info_on_all_nodes_blocking`` directly — no asyncio loop is involved. """
[docs] def __init__(self, pac_client: Any) -> None: """Pair with the PAC ``aerospike_async.Client`` from the session.""" self._pac = pac_client
[docs] def build(self) -> Set[str]: """Build strings from every node.""" responses = self._pac.info_on_all_nodes_blocking("build") out: Set[str] = set() for node_response in responses.values(): for value in node_response.values(): if isinstance(value, str) and value: out.add(value.strip()) return out
[docs] def namespaces(self) -> Set[str]: """Namespace names across the cluster.""" responses = self._pac.info_on_all_nodes_blocking("namespaces") return _merge_set_values(responses)
[docs] def namespace_details(self, namespace: str) -> Optional[Dict[str, str]]: """Per-namespace info; ``None`` when the namespace is unknown.""" try: response = self._pac.info_blocking(f"namespace/{namespace}") except Exception: log.debug("namespace_details(%s) failed", namespace, exc_info=True) return None if not response: return None expected_key = f"namespace/{namespace}" if expected_key in response and str(response[expected_key]).strip() == "type=unknown": return None return response
[docs] def sets(self, namespace: str) -> List[str]: """Set names in ``namespace``.""" responses = self._pac.info_on_all_nodes_blocking(f"sets/{namespace}") out: Set[str] = set() for node_response in responses.values(): for value in node_response.values(): if isinstance(value, str) and value: out.update(s.strip() for s in value.split(",") if s.strip()) return sorted(out)
[docs] def secondary_indexes(self, namespace: Optional[str] = None) -> List[Dict[str, str]]: """All secondary indexes (optionally filtered by namespace).""" responses = self._pac.info_on_all_nodes_blocking("sindex-list") index_map: Dict[str, Dict[str, str]] = {} for node_response in responses.values(): for value in node_response.values(): if not isinstance(value, str) or not value: continue for entry in value.split(";"): entry = entry.strip() if not entry: continue fields: Dict[str, str] = {} for token in entry.split(":"): if "=" in token: k, v = token.split("=", 1) fields[k] = v index_name = fields.get("indexname", "") ns = fields.get("ns", "") if not index_name or not ns: continue if namespace and ns != namespace: continue if index_name not in index_map: entry_map = { "namespace": ns, "set": fields.get("set", ""), "bin": fields.get("bin", ""), "name": index_name, } if "type" in fields: entry_map["type"] = fields["type"] if "state" in fields: entry_map["state"] = fields["state"] index_map[index_name] = entry_map return list(index_map.values())
[docs] def secondary_index_details( self, namespace: str, index_name: str ) -> Optional[Dict[str, str]]: """Details for one secondary index; ``None`` when missing.""" try: response = self._pac.info_blocking(f"sindex/{namespace}/{index_name}") except Exception: log.debug( "secondary_index_details(%s, %s) failed", namespace, index_name, exc_info=True, ) return None if not response: return None expected_key = f"sindex/{namespace}/{index_name}" if expected_key in response and "ERROR:201:no index" in str(response[expected_key]): return None return response
[docs] def is_cluster_stable(self) -> bool: """``True`` when every node reports ``cluster-stable=true``.""" responses = self._pac.info_on_all_nodes_blocking("cluster-stable") if not responses: return False for node_response in responses.values(): for value in node_response.values(): if isinstance(value, str) and value.lower() != "true": return False return True
[docs] def get_cluster_size(self) -> int: """Number of cluster nodes.""" return len(self._pac.node_names_blocking())
[docs] def info(self, command: str) -> Dict[str, str]: """Raw info command against one random node.""" return self._pac.info_blocking(command)
[docs] def info_on_all_nodes(self, command: str) -> Dict[str, Dict[str, str]]: """Raw info command against every node.""" return self._pac.info_on_all_nodes_blocking(command)