Source code for aerospike_sdk.aio.info

# Copyright 2025-2026 Aerospike, Inc.
#
# Portions may be licensed to Aerospike, Inc. under one or more contributor
# license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

"""InfoCommands - High-level interface for Aerospike info commands."""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Dict, List, Optional, Set

if TYPE_CHECKING:  # Not unused — avoids circular import; used in type annotations only.
    from aerospike_sdk.aio.session import Session

log = logging.getLogger("aerospike_sdk.info")


[docs] class InfoCommands: """ Provides high-level methods to execute common Aerospike info commands. This class encapsulates the most commonly used Aerospike info commands and provides a convenient API for retrieving cluster information. Example:: info = session.info() # Get all namespaces namespaces = await info.namespaces() # Get namespace details ns_detail = await info.namespace_details("test") # Get all secondary indexes indexes = await info.secondary_indexes() """
[docs] def __init__(self, session: "Session") -> None: """ Initialize InfoCommands. Args: session: The Session to use for info commands. """ self._session = session
[docs] async def build(self) -> Set[str]: """ Get the build information from all nodes in the cluster. Returns: A set of build strings from all nodes. """ # Get info from all nodes and merge the results all_responses = await self._session._client._client.info_on_all_nodes("build") # Extract build strings from all node responses build_set: Set[str] = set() for node_response in all_responses.values(): # Response format is typically {"build": "8.1.0.1"} for value in node_response.values(): if isinstance(value, str) and value: build_set.add(value.strip()) return build_set
[docs] async def namespaces(self) -> Set[str]: """ Get the list of namespaces from all nodes in the cluster. Returns: A set of namespace names from all nodes. """ # Get info from all nodes and merge the results all_responses = await self._session._client._client.info_on_all_nodes("namespaces") # Extract namespace names from all node responses namespace_set: Set[str] = set() for node_response in all_responses.values(): # Response format is typically {"namespaces": "ns1,ns2,ns3"} for value in node_response.values(): if isinstance(value, str) and value: # Split comma-separated namespace list namespace_set.update([ns.strip() for ns in value.split(",") if ns.strip()]) return namespace_set
[docs] async def namespace_details(self, namespace: str) -> Optional[Dict[str, str]]: """ Get detailed information about a specific namespace. Args: namespace: The name of the namespace. Returns: A dictionary containing namespace details, or None if not found. """ try: response = await self._session._client._client.info(f"namespace/{namespace}") if not response: return None # Check if response indicates namespace doesn't exist # Response format for non-existent: {'namespace/name': 'type=unknown'} expected_key = f"namespace/{namespace}" if expected_key in response and str(response[expected_key]).strip() == "type=unknown": return None return response except Exception: log.debug("namespace_details(%s) failed", namespace, exc_info=True) return None
[docs] async def sets(self, namespace: str) -> List[str]: """ Get the list of sets in a specific namespace. Args: namespace: The name of the namespace. Returns: A list of set names in the namespace. """ # Get info from all nodes and merge the results all_responses = await self._session._client._client.info_on_all_nodes(f"sets/{namespace}") # Extract set names from all node responses set_set: Set[str] = set() for node_response in all_responses.values(): # Response format is typically {"sets": "set1,set2,set3"} for value in node_response.values(): if isinstance(value, str) and value: # Split comma-separated set list set_set.update([s.strip() for s in value.split(",") if s.strip()]) return sorted(list(set_set))
[docs] async def secondary_indexes(self, namespace: Optional[str] = None) -> List[Dict[str, str]]: """ Get information about all secondary indexes. Args: namespace: Optional namespace filter. If provided, only returns indexes for that namespace. Returns: A list of dictionaries containing secondary index information. """ all_responses = await self._session._client._client.info_on_all_nodes("sindex-list") index_map: Dict[str, Dict[str, str]] = {} for node_response in all_responses.values(): for value in node_response.values(): if isinstance(value, str) and value: for entry in value.split(";"): entry = entry.strip() if not entry: continue fields: Dict[str, str] = {} for token in entry.split(":"): if "=" in token: k, v = token.split("=", 1) fields[k] = v index_name = fields.get("indexname", "") ns = fields.get("ns", "") if not index_name or not ns: continue if namespace and ns != namespace: continue if index_name not in index_map: index_map[index_name] = { "namespace": ns, "set": fields.get("set", ""), "bin": fields.get("bin", ""), "name": index_name, } if "type" in fields: index_map[index_name]["type"] = fields["type"] if "state" in fields: index_map[index_name]["state"] = fields["state"] return list(index_map.values())
[docs] async def secondary_index_details( self, namespace: str, index_name: str ) -> Optional[Dict[str, str]]: """ Get detailed information about a specific secondary index. Args: namespace: The namespace containing the index. index_name: The name of the index. Returns: A dictionary containing index details, or None if not found. """ try: response = await self._session._client._client.info(f"sindex/{namespace}/{index_name}") if not response: return None # Check if response indicates index doesn't exist # Response format for non-existent: {'sindex/ns/name': 'ERROR:201:no index'} expected_key = f"sindex/{namespace}/{index_name}" if expected_key in response and "ERROR:201:no index" in str(response[expected_key]): return None return response except Exception: log.debug( "secondary_index_details(%s, %s) failed", namespace, index_name, exc_info=True, ) return None
[docs] async def is_cluster_stable(self) -> bool: """ Check if all nodes agree on the current cluster state. Returns: True if the cluster is stable, False otherwise. """ # Get cluster state from all nodes all_responses = await self._session._client._client.info_on_all_nodes("cluster-stable") if not all_responses: return False # Check if all nodes report "true" for cluster-stable for node_response in all_responses.values(): for value in node_response.values(): if isinstance(value, str): # cluster-stable returns "true" or "false" if value.lower() != "true": return False return True
[docs] async def get_cluster_size(self) -> int: """ Get the number of nodes in the cluster. Returns: The number of nodes in the cluster. """ node_names = await self._session._client._client.node_names() return len(node_names)
[docs] async def info(self, command: str) -> Dict[str, str]: """ Execute a raw info command against the cluster. Args: command: The info command to execute (e.g., "statistics", "build"). Returns: A dictionary containing the info command response as key-value pairs. """ return await self._session._client._client.info(command)
[docs] async def info_on_all_nodes(self, command: str) -> Dict[str, Dict[str, str]]: """ Execute a raw info command against all nodes in the cluster. Args: command: The info command to execute (e.g., "statistics", "build"). Returns: A dictionary mapping node names to their response dictionaries. """ return await self._session._client._client.info_on_all_nodes(command)