-
Notifications
You must be signed in to change notification settings - Fork 44
/
Copy pathqgis_mcp_server.py
270 lines (226 loc) · 9.28 KB
/
qgis_mcp_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
#!/usr/bin/env python3
"""
QGIS MCP Server - exposes QGIS operations as MCP tools and relays each call
as a JSON command over a TCP socket to the QGIS plugin
"""
import json
import logging
import socket
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, Dict, Optional

from mcp.server.fastmcp import Context, FastMCP
# Process-wide logging setup: INFO and above, each record stamped with time,
# logger name and level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module logger used by the connection helper and the lifespan hook.
logger = logging.getLogger("QgisMCPServer")
class QgisMCPServer:
    """Blocking TCP client for the JSON command socket at ``host:port``.

    Despite the class name, this object is the connecting side of the socket:
    it opens a TCP connection, sends one JSON-encoded command at a time and
    waits for one JSON-encoded reply.

    Diagnostics are emitted through ``logging`` instead of ``print`` so that
    nothing is written to stdout.
    NOTE(review): stdout is presumably the MCP protocol channel when the
    server runs over stdio - confirm against the transport in use.
    """

    # Same logger name the rest of the module uses, looked up here so the
    # class does not depend on a module-level variable.
    _log = logging.getLogger("QgisMCPServer")

    # Number of bytes requested from the socket per recv() call.
    _RECV_SIZE = 4096

    def __init__(self, host='localhost', port=9876):
        """Remember the endpoint; no connection is opened yet."""
        self.host = host
        self.port = port
        self.socket = None

    def connect(self):
        """Open a TCP connection to ``self.host:self.port``.

        Any socket already held by this instance is closed first.

        Returns:
            True on success, False if the connection could not be made. On
            failure ``self.socket`` is left as ``None`` and the half-built
            socket is closed.
        """
        self.disconnect()
        sock = None
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.connect((self.host, self.port))
        except Exception as e:
            # Broad on purpose: callers rely on the boolean return rather
            # than on an exception.
            self._log.error("Error connecting to server: %s", e)
            if sock is not None:
                sock.close()
            return False
        self.socket = sock
        return True

    def disconnect(self):
        """Close the socket if one is open and forget it."""
        if self.socket is not None:
            try:
                self.socket.close()
            finally:
                self.socket = None

    def send_command(self, command_type, params=None):
        """Send one command and return the decoded JSON reply.

        Args:
            command_type: Value placed under the ``"type"`` key.
            params: Optional mapping placed under ``"params"``; a falsy value
                is sent as an empty object.

        Returns:
            The parsed JSON reply, or ``None`` if not connected or if the
            exchange failed. After a failed exchange the socket is closed,
            because a partially read reply would corrupt the next exchange.
        """
        if self.socket is None:
            self._log.error("Not connected to server")
            return None

        command = {
            "type": command_type,
            "params": params or {},
        }

        # Encode before touching the socket so that an unserialisable
        # argument does not cost us a healthy connection.
        try:
            payload = json.dumps(command).encode('utf-8')
        except Exception as e:
            self._log.error("Error sending command: %s", e)
            return None

        try:
            self.socket.sendall(payload)
            return self._receive_json()
        except Exception as e:
            self._log.error("Error sending command: %s", e)
            self.disconnect()
            return None

    def _receive_json(self):
        """Read from the socket until the buffered bytes form one JSON value.

        Raises:
            ConnectionError: the peer closed the connection before a complete
                JSON document arrived.
        """
        buffer = bytearray()
        while True:
            chunk = self.socket.recv(self._RECV_SIZE)
            if not chunk:
                raise ConnectionError(
                    "connection closed before a complete JSON response was received"
                )
            buffer += chunk
            try:
                return json.loads(buffer.decode('utf-8'))
            except ValueError:
                # Covers both json.JSONDecodeError (document incomplete) and
                # UnicodeDecodeError (chunk boundary fell inside a multi-byte
                # character); in either case more data is needed.
                continue
# Process-wide cached connection; created lazily by get_qgis_connection().
_qgis_connection = None


def get_qgis_connection():
    """Return the shared QgisMCPServer connection, creating it if needed.

    If a cached connection exists, its socket is probed with an empty send.
    When the probe raises, the cached connection is discarded and a new one
    is opened to ``localhost:9876``.

    Returns:
        The connected ``QgisMCPServer`` instance stored in the module global.

    Raises:
        ConnectionError: a new connection was needed and could not be opened.
            (A subclass of ``Exception``, so existing handlers still match.)
    """
    global _qgis_connection

    if _qgis_connection is not None:
        try:
            # The connection object stores its socket under ``socket``.
            sock = _qgis_connection.socket
            if sock is None:
                raise ConnectionError("socket is closed")
            # NOTE(review): an empty send only fails on a locally closed or
            # invalid socket; it may not notice a peer that has gone away.
            sock.sendall(b'')
            return _qgis_connection
        except Exception as e:
            logger.warning(f"Existing connection is no longer valid: {str(e)}")
            try:
                _qgis_connection.disconnect()
            except Exception as close_error:
                # Best-effort cleanup: the connection is being discarded
                # anyway, so record the failure and move on.
                logger.debug("Ignoring error while closing stale connection: %s", close_error)
            _qgis_connection = None

    connection = QgisMCPServer(host="localhost", port=9876)
    if not connection.connect():
        logger.error("Failed to connect to Qgis")
        raise ConnectionError("Could not connect to Qgis. Make sure the Qgis plugin is running.")

    _qgis_connection = connection
    logger.info("Created new persistent connection to Qgis")
    return _qgis_connection
@asynccontextmanager
async def server_lifespan(server: FastMCP) -> AsyncIterator[Dict[str, Any]]:
    """Startup/shutdown hook: warm up the shared connection, close it on exit.

    Yields an empty dict as the lifespan context; tools use the module-level
    connection rather than anything carried in the context.
    """
    global _qgis_connection

    try:
        logger.info("QgisMCPServer server starting up")

        # Opening the connection here is only an availability check; a
        # failure is logged and startup continues.
        try:
            get_qgis_connection()
        except Exception as startup_error:
            logger.warning(f"Could not connect to Qgis on startup: {str(startup_error)}")
            logger.warning("Make sure the Qgis addon is running before using Qgis resources or tools")
        else:
            logger.info("Successfully connected to Qgis on startup")

        yield {}
    finally:
        # Release the shared connection, if one was ever created.
        if _qgis_connection:
            logger.info("Disconnecting from Qgis on shutdown")
            _qgis_connection.disconnect()
            _qgis_connection = None
        logger.info("QgisMCPServer server shut down")
# The FastMCP application object. The functions decorated with @mcp.tool()
# below register themselves on it, and server_lifespan is passed as its
# lifespan handler.
mcp = FastMCP(
"Qgis_mcp",
description="Qgis integration through the Model Context Protocol",
lifespan=server_lifespan
)
@mcp.tool()
def ping(ctx: Context) -> str:
    """Simple ping command to check server connectivity"""
    # Forward a parameterless "ping" over the shared connection and return
    # the reply as indented JSON text.
    reply = get_qgis_connection().send_command("ping")
    return json.dumps(reply, indent=2)
@mcp.tool()
def get_qgis_info(ctx: Context) -> str:
    """Get QGIS information"""
    # Relay "get_qgis_info" with no parameters; the reply is returned as
    # indented JSON text.
    reply = get_qgis_connection().send_command("get_qgis_info")
    return json.dumps(reply, indent=2)
@mcp.tool()
def load_project(ctx: Context, path: str) -> str:
    """Load a QGIS project from the specified path."""
    # Relay "load_project" with the project path as its only parameter.
    connection = get_qgis_connection()
    request = {"path": path}
    return json.dumps(connection.send_command("load_project", request), indent=2)
@mcp.tool()
def create_new_project(ctx: Context, path: str) -> str:
    """Create a new project and save it"""
    # Relay "create_new_project" with the target path; the reply is returned
    # as indented JSON text.
    qgis = get_qgis_connection()
    result = qgis.send_command("create_new_project", {"path": path})
    return json.dumps(result, indent=2)
@mcp.tool()
def get_project_info(ctx: Context) -> str:
    """Get current project information"""
    # Relay "get_project_info" with no parameters.
    reply = get_qgis_connection().send_command("get_project_info")
    return json.dumps(reply, indent=2)
@mcp.tool()
def add_vector_layer(ctx: Context, path: str, provider: str = "ogr", name: Optional[str] = None) -> str:
    """Add a vector layer to the project.

    Args:
        path: Data source for the layer.
        provider: Provider key sent with the request (default "ogr").
        name: Optional layer name; omitted from the request when empty or None.

    Returns:
        The reply from QGIS as indented JSON text.
    """
    qgis = get_qgis_connection()
    params = {"path": path, "provider": provider}
    # Only send "name" when the caller supplied one.
    if name:
        params["name"] = name
    result = qgis.send_command("add_vector_layer", params)
    return json.dumps(result, indent=2)
@mcp.tool()
def add_raster_layer(ctx: Context, path: str, provider: str = "gdal", name: Optional[str] = None) -> str:
    """Add a raster layer to the project.

    Args:
        path: Data source for the layer.
        provider: Provider key sent with the request (default "gdal").
        name: Optional layer name; omitted from the request when empty or None.

    Returns:
        The reply from QGIS as indented JSON text.
    """
    qgis = get_qgis_connection()
    params = {"path": path, "provider": provider}
    # Only send "name" when the caller supplied one.
    if name:
        params["name"] = name
    result = qgis.send_command("add_raster_layer", params)
    return json.dumps(result, indent=2)
@mcp.tool()
def get_layers(ctx: Context) -> str:
    """Retrieve all layers in the current project."""
    # Relay "get_layers" with no parameters.
    reply = get_qgis_connection().send_command("get_layers")
    return json.dumps(reply, indent=2)
@mcp.tool()
def remove_layer(ctx: Context, layer_id: str) -> str:
    """Remove a layer from the project by its ID."""
    # Relay "remove_layer" carrying the layer ID.
    connection = get_qgis_connection()
    request = {"layer_id": layer_id}
    return json.dumps(connection.send_command("remove_layer", request), indent=2)
@mcp.tool()
def zoom_to_layer(ctx: Context, layer_id: str) -> str:
    """Zoom to the extent of a specified layer."""
    # Relay "zoom_to_layer" carrying the layer ID.
    connection = get_qgis_connection()
    request = {"layer_id": layer_id}
    return json.dumps(connection.send_command("zoom_to_layer", request), indent=2)
@mcp.tool()
def get_layer_features(ctx: Context, layer_id: str, limit: int = 10) -> str:
    """Retrieve features from a vector layer with an optional limit."""
    # Relay "get_layer_features" with the layer ID and the feature limit
    # (10 unless the caller overrides it).
    connection = get_qgis_connection()
    request = {"layer_id": layer_id, "limit": limit}
    reply = connection.send_command("get_layer_features", request)
    return json.dumps(reply, indent=2)
@mcp.tool()
def execute_processing(ctx: Context, algorithm: str, parameters: dict) -> str:
    """Execute a processing algorithm with the given parameters."""
    # Relay "execute_processing"; the algorithm identifier and its parameter
    # mapping are passed through unchanged.
    connection = get_qgis_connection()
    request = {"algorithm": algorithm, "parameters": parameters}
    reply = connection.send_command("execute_processing", request)
    return json.dumps(reply, indent=2)
@mcp.tool()
def save_project(ctx: Context, path: Optional[str] = None) -> str:
    """Save the current project to the given path, or to the current project path if not specified.

    Args:
        path: Optional destination; omitted from the request when empty or None.

    Returns:
        The reply from QGIS as indented JSON text.
    """
    qgis = get_qgis_connection()
    # Send an empty parameter object unless a path was supplied.
    params = {"path": path} if path else {}
    result = qgis.send_command("save_project", params)
    return json.dumps(result, indent=2)
@mcp.tool()
def render_map(ctx: Context, path: str, width: int = 800, height: int = 600) -> str:
    """Render the current map view to an image file with the specified dimensions."""
    # Relay "render_map" with the output path and the requested size
    # (800x600 unless overridden).
    connection = get_qgis_connection()
    request = {"path": path, "width": width, "height": height}
    reply = connection.send_command("render_map", request)
    return json.dumps(reply, indent=2)
@mcp.tool()
def execute_code(ctx: Context, code: str) -> str:
    """Execute arbitrary PyQGIS code provided as a string."""
    # NOTE(review): the code string is forwarded verbatim with no validation
    # here; whatever runs it on the other end of the socket executes
    # caller-supplied code. Confirm this tool is only exposed to trusted
    # clients.
    connection = get_qgis_connection()
    request = {"code": code}
    return json.dumps(connection.send_command("execute_code", request), indent=2)
def main():
    """Entry point: start the FastMCP application and block until it exits."""
    mcp.run()


# Run the server only when executed as a script, not when imported.
if __name__ == "__main__":
    main()