From c04e7208f2622c5c7d988fc0f42afaa8ef7a25c5 Mon Sep 17 00:00:00 2001 From: Kaushik Iska <iska.kaushik@gmail.com> Date: Mon, 31 Mar 2025 11:24:35 -0500 Subject: [PATCH 1/2] fix: prevent BrokenResourceError by returning structured responses for query errors fixes https://github.com/ClickHouse/mcp-clickhouse/issues/25 --- mcp_clickhouse/mcp_server.py | 31 +++++++++++++++++++++++-------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/mcp_clickhouse/mcp_server.py b/mcp_clickhouse/mcp_server.py index 7a7b1a0..f8f7201 100644 --- a/mcp_clickhouse/mcp_server.py +++ b/mcp_clickhouse/mcp_server.py @@ -128,21 +128,36 @@ def execute_query(query: str): return rows except Exception as err: logger.error(f"Error executing query: {err}") - return f"error running query: {err}" + # Return a structured dictionary rather than a string to ensure proper serialization + # by the MCP protocol. String responses for errors can cause BrokenResourceError. + return {"error": str(err)} @mcp.tool() def run_select_query(query: str): """Run a SELECT query in a ClickHouse database""" logger.info(f"Executing SELECT query: {query}") - future = QUERY_EXECUTOR.submit(execute_query, query) try: - result = future.result(timeout=SELECT_QUERY_TIMEOUT_SECS) - return result - except concurrent.futures.TimeoutError: - logger.warning(f"Query timed out after {SELECT_QUERY_TIMEOUT_SECS} seconds: {query}") - future.cancel() - return f"Queries taking longer than {SELECT_QUERY_TIMEOUT_SECS} seconds are currently not supported." + future = QUERY_EXECUTOR.submit(execute_query, query) + try: + result = future.result(timeout=SELECT_QUERY_TIMEOUT_SECS) + # Check if we received an error structure from execute_query + if isinstance(result, dict) and "error" in result: + logger.warning(f"Query failed: {result['error']}") + # MCP requires structured responses; string error messages can cause + # serialization issues leading to BrokenResourceError + return {"status": "error", "message": f"Query failed: {result['error']}"} + return result + except concurrent.futures.TimeoutError: + logger.warning(f"Query timed out after {SELECT_QUERY_TIMEOUT_SECS} seconds: {query}") + future.cancel() + # Return a properly structured response for timeout errors + return {"status": "error", "message": f"Query timed out after {SELECT_QUERY_TIMEOUT_SECS} seconds"} + except Exception as e: + logger.error(f"Unexpected error in run_select_query: {str(e)}") + # Catch all other exceptions and return them in a structured format + # to prevent MCP serialization failures + return {"status": "error", "message": f"Unexpected error: {str(e)}"} def create_clickhouse_client(): From 719647eff89d67425196e46d86d458c33e520ed1 Mon Sep 17 00:00:00 2001 From: Kaushik Iska <iska.kaushik@gmail.com> Date: Mon, 31 Mar 2025 11:29:50 -0500 Subject: [PATCH 2/2] fix test --- tests/test_tool.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/test_tool.py b/tests/test_tool.py index d8e71e3..e931c3f 100644 --- a/tests/test_tool.py +++ b/tests/test_tool.py @@ -71,8 +71,9 @@ def test_run_select_query_failure(self): """Test running a SELECT query with an error.""" query = f"SELECT * FROM {self.test_db}.non_existent_table" result = run_select_query(query) - self.assertIsInstance(result, str) - self.assertIn("error running query", result) + self.assertIsInstance(result, dict) + self.assertEqual(result["status"], "error") + self.assertIn("Query failed", result["message"]) def test_table_and_column_comments(self): """Test that table and column comments are correctly retrieved."""