1
1
import logging
2
2
from typing import Sequence
3
+ import concurrent .futures
4
+ import atexit
3
5
4
6
import clickhouse_connect
5
7
from clickhouse_connect .driver .binding import quote_identifier , format_query_value
16
18
)
17
19
logger = logging .getLogger (MCP_SERVER_NAME )
18
20
21
+ QUERY_EXECUTOR = concurrent .futures .ThreadPoolExecutor (max_workers = 10 )
22
+ atexit .register (lambda : QUERY_EXECUTOR .shutdown (wait = True ))
23
+ SELECT_QUERY_TIMEOUT_SECS = 30
24
+
19
25
load_dotenv ()
20
26
21
27
deps = [
@@ -105,9 +111,7 @@ def get_table_info(table):
105
111
return tables
106
112
107
113
108
- @mcp .tool ()
109
- def run_select_query (query : str ):
110
- logger .info (f"Executing SELECT query: { query } " )
114
+ def execute_query (query : str ):
111
115
client = create_clickhouse_client ()
112
116
try :
113
117
res = client .query (query , settings = {"readonly" : 1 })
@@ -125,6 +129,19 @@ def run_select_query(query: str):
125
129
return f"error running query: { err } "
126
130
127
131
132
+ @mcp .tool ()
133
+ def run_select_query (query : str ):
134
+ logger .info (f"Executing SELECT query: { query } " )
135
+ future = QUERY_EXECUTOR .submit (execute_query , query )
136
+ try :
137
+ result = future .result (timeout = SELECT_QUERY_TIMEOUT_SECS )
138
+ return result
139
+ except concurrent .futures .TimeoutError :
140
+ logger .warning (f"Query timed out after { SELECT_QUERY_TIMEOUT_SECS } seconds: { query } " )
141
+ future .cancel ()
142
+ return f"Queries taking longer than { SELECT_QUERY_TIMEOUT_SECS } seconds are currently not supported."
143
+
144
+
128
145
def create_clickhouse_client ():
129
146
client_config = config .get_client_config ()
130
147
logger .info (
0 commit comments