Source code for aerospike_sdk.sync.info

# Copyright 2025-2026 Aerospike, Inc.
#
# Portions may be licensed to Aerospike, Inc. under one or more contributor
# license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

"""SyncInfoCommands - Synchronous wrapper for InfoCommands."""

from __future__ import annotations

from typing import Dict, List, Optional, Set

from aerospike_sdk.aio.info import InfoCommands as AsyncInfoCommands
from aerospike_sdk.sync.client import _EventLoopManager


[docs] class SyncInfoCommands: """ Synchronous wrapper for InfoCommands. Provides the same API as the async InfoCommands but executes operations synchronously by running them on an event loop. """
[docs] def __init__(self, async_info: AsyncInfoCommands, loop_manager: _EventLoopManager) -> None: """ Initialize a SyncInfoCommands. Args: async_info: The async InfoCommands to wrap. loop_manager: The event loop manager for executing async operations. """ self._async_info = async_info self._loop_manager = loop_manager
[docs] def build(self) -> Set[str]: """Get the build information from all nodes in the cluster (synchronous).""" async def _build(): return await self._async_info.build() return self._loop_manager.run_async(_build())
[docs] def namespaces(self) -> Set[str]: """Get the list of namespaces from all nodes in the cluster (synchronous).""" async def _namespaces(): return await self._async_info.namespaces() return self._loop_manager.run_async(_namespaces())
[docs] def namespace_details(self, namespace: str) -> Optional[Dict[str, str]]: """Get detailed information about a specific namespace (synchronous).""" async def _namespace_details(): return await self._async_info.namespace_details(namespace) return self._loop_manager.run_async(_namespace_details())
[docs] def sets(self, namespace: str) -> List[str]: """Get the list of sets in a specific namespace (synchronous).""" async def _sets(): return await self._async_info.sets(namespace) return self._loop_manager.run_async(_sets())
[docs] def secondary_indexes(self, namespace: Optional[str] = None) -> List[Dict[str, str]]: """Get information about all secondary indexes (synchronous).""" async def _secondary_indexes(): return await self._async_info.secondary_indexes(namespace) return self._loop_manager.run_async(_secondary_indexes())
[docs] def secondary_index_details( self, namespace: str, index_name: str ) -> Optional[Dict[str, str]]: """Get detailed information about a specific secondary index (synchronous).""" async def _secondary_index_details(): return await self._async_info.secondary_index_details(namespace, index_name) return self._loop_manager.run_async(_secondary_index_details())
[docs] def is_cluster_stable(self) -> bool: """Check if all nodes agree on the current cluster state (synchronous).""" async def _is_cluster_stable(): return await self._async_info.is_cluster_stable() return self._loop_manager.run_async(_is_cluster_stable())
[docs] def get_cluster_size(self) -> int: """Get the number of nodes in the cluster (synchronous).""" async def _get_cluster_size(): return await self._async_info.get_cluster_size() return self._loop_manager.run_async(_get_cluster_size())
[docs] def info(self, command: str) -> Dict[str, str]: """Execute a raw info command against the cluster (synchronous).""" async def _info(): return await self._async_info.info(command) return self._loop_manager.run_async(_info())
[docs] def info_on_all_nodes(self, command: str) -> Dict[str, Dict[str, str]]: """Execute a raw info command against all nodes in the cluster (synchronous).""" async def _info_on_all_nodes(): return await self._async_info.info_on_all_nodes(command) return self._loop_manager.run_async(_info_on_all_nodes())