File tree 2 files changed +38
-0
lines changed
src/elasticsearch_mcp_server
2 files changed +38
-0
lines changed Original file line number Diff line number Diff line change 4
4
from .tools .index import IndexTools
5
5
from .tools .document import DocumentTools
6
6
from .tools .cluster import ClusterTools
7
+ from .tools .alias import AliasTools
7
8
8
9
class ElasticsearchMCPServer :
9
10
def __init__ (self ):
@@ -26,11 +27,13 @@ def _register_tools(self):
26
27
index_tools = IndexTools (self .logger )
27
28
document_tools = DocumentTools (self .logger )
28
29
cluster_tools = ClusterTools (self .logger )
30
+ alias_tools = AliasTools (self .logger )
29
31
30
32
# Register tools from each module
31
33
index_tools .register_tools (self .mcp )
32
34
document_tools .register_tools (self .mcp )
33
35
cluster_tools .register_tools (self .mcp )
36
+ alias_tools .register_tools (self .mcp )
34
37
35
38
def run (self ):
36
39
"""Run the MCP server."""
Original file line number Diff line number Diff line change
1
+ from typing import Any
2
+ from ..es_client import ElasticsearchClient
3
+ from mcp .types import TextContent
4
+
5
+
6
+ class AliasTools (ElasticsearchClient ):
7
+ def register_tools (self , mcp : Any ):
8
+ """Register alias-related tools."""
9
+
10
+ @mcp .tool (description = "List all aliases in the Elasticsearch cluster" )
11
+ async def list_aliases () -> list [TextContent ]:
12
+ """List all aliases in the Elasticsearch cluster."""
13
+ self .logger .info ("Listing aliases..." )
14
+ try :
15
+ aliases = self .es_client .cat .aliases (format = "json" )
16
+ return [TextContent (type = "text" , text = str (aliases ))]
17
+ except Exception as e :
18
+ self .logger .error (f"Error listing aliases: { e } " )
19
+ return [TextContent (type = "text" , text = f"Error: { str (e )} " )]
20
+
21
+ @mcp .tool (description = "Get alias information for an index" )
22
+ async def get_alias (index : str ) -> list [TextContent ]:
23
+ """
24
+ Get alias information for a specific index.
25
+
26
+ Args:
27
+ index: Name of the index
28
+ """
29
+ self .logger .info (f"Getting alias information for index: { index } " )
30
+ try :
31
+ response = self .es_client .indices .get_alias (index = index )
32
+ return [TextContent (type = "text" , text = str (response ))]
33
+ except Exception as e :
34
+ self .logger .error (f"Error getting alias information: { e } " )
35
+ return [TextContent (type = "text" , text = f"Error: { str (e )} " )]
You can’t perform that action at this time.
0 commit comments