-
Notifications
You must be signed in to change notification settings - Fork 48
/
Copy pathqgis_mcp_plugin.py
611 lines (512 loc) · 21.7 KB
/
qgis_mcp_plugin.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
import os
import json
import socket
import traceback
from qgis.core import *
from qgis.gui import *
from qgis.PyQt.QtCore import QObject, pyqtSignal, QTimer, Qt, QSize
from qgis.PyQt.QtWidgets import QAction, QDockWidget, QVBoxLayout, QLabel, QPushButton, QSpinBox, QWidget
from qgis.PyQt.QtGui import QIcon, QColor
from qgis.utils import active_plugins
class QgisMCPServer(QObject):
    """Socket server that receives JSON commands and runs them against QGIS.

    A listening socket and at most one client socket are kept in
    non-blocking mode and polled from a ``QTimer`` callback
    (``process_server``).  Each request is a JSON value of the form
    ``{"type": <command name>, "params": {...}}``; each reply is either
    ``{"status": "success", "result": ...}`` or
    ``{"status": "error", "message": ...}``.
    """

    # Polling period of the timer that drives process_server().
    POLL_INTERVAL_MS = 100
    # Maximum number of bytes read from the client per poll.
    RECV_CHUNK_BYTES = 8192
    # Upper bound on how long a single response may take to send.
    SEND_TIMEOUT_SECONDS = 10.0

    def __init__(self, host='localhost', port=9876, iface=None):
        """Store the connection settings; nothing is opened until start().

        Args:
            host: Interface name or address to bind the listening socket to.
            port: TCP port to listen on.
            iface: QGIS interface object handed to the command handlers.
        """
        super().__init__()
        self.host = host
        self.port = port
        self.iface = iface
        self.running = False
        self.socket = None   # listening socket
        self.client = None   # currently connected client socket, if any
        self.buffer = b''    # bytes received but not yet parsed into a command
        self.timer = None    # QTimer that polls the sockets

    def start(self):
        """Bind the listening socket and start polling it.

        Returns:
            True when the server is listening.  False when setup failed; the
            failure is logged and the server is left in the stopped state.
        """
        self.running = True
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            self.socket.bind((self.host, self.port))
            self.socket.listen(1)
            self.socket.setblocking(False)
            # Create a timer to process server operations
            self.timer = QTimer()
            self.timer.timeout.connect(self.process_server)
            self.timer.start(self.POLL_INTERVAL_MS)
            QgsMessageLog.logMessage(f"QGIS MCP server started on {self.host}:{self.port}", "QGIS MCP")
            return True
        except Exception as e:
            QgsMessageLog.logMessage(f"Failed to start server: {str(e)}", "QGIS MCP", Qgis.Critical)
            self.stop()
            return False

    def stop(self):
        """Stop polling and close the listening and client sockets."""
        self.running = False
        if self.timer:
            self.timer.stop()
            self.timer = None
        if self.socket:
            self.socket.close()
        if self.client:
            self.client.close()
        self.socket = None
        self.client = None
        # Drop any half-received command so it cannot leak into a later run.
        self.buffer = b''
        QgsMessageLog.logMessage("QGIS MCP server stopped", "QGIS MCP")

    def process_server(self):
        """Poll the sockets once (called by the timer).

        Accepts a pending connection when no client is attached, then reads
        from the attached client and executes any complete commands.
        """
        if not self.running:
            return
        try:
            if not self.client and self.socket:
                self._accept_client()
            if self.client:
                self._service_client()
        except Exception as e:
            QgsMessageLog.logMessage(f"Server error: {str(e)}", "QGIS MCP", Qgis.Critical)

    def _accept_client(self):
        """Accept one waiting connection, if there is one."""
        try:
            self.client, address = self.socket.accept()
            self.client.setblocking(False)
            QgsMessageLog.logMessage(f"Connected to client: {address}", "QGIS MCP")
        except BlockingIOError:
            pass  # No connection waiting
        except Exception as e:
            QgsMessageLog.logMessage(f"Error accepting connection: {str(e)}", "QGIS MCP", Qgis.Warning)

    def _service_client(self):
        """Read available bytes from the client and run complete commands."""
        try:
            data = self.client.recv(self.RECV_CHUNK_BYTES)
        except BlockingIOError:
            return  # No data available
        except Exception as e:
            QgsMessageLog.logMessage(f"Error receiving data: {str(e)}", "QGIS MCP", Qgis.Warning)
            self._drop_client()
            return

        if not data:
            # Connection closed by client
            QgsMessageLog.logMessage("Client disconnected", "QGIS MCP")
            self._drop_client()
            return

        self.buffer += data
        try:
            self._process_buffer()
        except Exception as e:
            QgsMessageLog.logMessage(f"Error with client: {str(e)}", "QGIS MCP", Qgis.Warning)
            self._drop_client()

    def _drop_client(self):
        """Close the client socket (if any) and discard buffered bytes."""
        if self.client:
            self.client.close()
        self.client = None
        self.buffer = b''

    def _process_buffer(self):
        """Execute every complete JSON command currently in the buffer.

        Incomplete input is left in the buffer until more bytes arrive.
        Several commands received back-to-back are executed in order.

        Raises:
            UnicodeDecodeError: The buffer contains bytes that are invalid
                UTF-8 (as opposed to a character that is merely truncated).
        """
        decoder = json.JSONDecoder()
        while self.buffer and self.client:
            try:
                text = self.buffer.decode('utf-8')
            except UnicodeDecodeError as e:
                if e.end < len(self.buffer):
                    raise  # Invalid bytes in the middle of the stream
                return  # Multi-byte character split across reads; wait

            # raw_decode() does not skip leading whitespace the way
            # json.loads() does, so strip it here.
            text = text.lstrip(' \t\r\n')
            if not text:
                self.buffer = b''
                return
            try:
                command, end = decoder.raw_decode(text)
            except json.JSONDecodeError:
                return  # Incomplete data, keep in buffer

            # Keep whatever follows the parsed command for the next pass.
            self.buffer = text[end:].encode('utf-8')
            self._send_response(self.execute_command(command))

    def _send_response(self, response):
        """Serialize *response* as JSON and send it to the client."""
        client = self.client
        if client is None:
            return  # Client went away while the command was running
        # default=str: values json cannot encode are sent as their string
        # form instead of aborting the connection.
        payload = json.dumps(response, default=str).encode('utf-8')
        # sendall() on a non-blocking socket can fail with BlockingIOError
        # after a partial write, so switch to timeout mode while sending.
        client.settimeout(self.SEND_TIMEOUT_SECONDS)
        try:
            client.sendall(payload)
        finally:
            client.setblocking(False)

    def execute_command(self, command):
        """Dispatch a decoded command to its handler.

        Args:
            command: Mapping with a "type" key naming the command and an
                optional "params" mapping passed to the handler as keyword
                arguments.

        Returns:
            ``{"status": "success", "result": ...}`` on success, otherwise
            ``{"status": "error", "message": ...}``.  Exceptions are never
            propagated; they are logged and turned into an error reply.
        """
        try:
            cmd_type = command.get("type")
            params = command.get("params", {})
            handlers = {
                "ping": self.ping,
                "get_qgis_info": self.get_qgis_info,
                "load_project": self.load_project,
                "get_project_info": self.get_project_info,
                "execute_code": self.execute_code,
                "add_vector_layer": self.add_vector_layer,
                "add_raster_layer": self.add_raster_layer,
                "get_layers": self.get_layers,
                "remove_layer": self.remove_layer,
                "zoom_to_layer": self.zoom_to_layer,
                "get_layer_features": self.get_layer_features,
                "execute_processing": self.execute_processing,
                "save_project": self.save_project,
                "render_map": self.render_map,
                "create_new_project": self.create_new_project,
            }
            handler = handlers.get(cmd_type)
            if not handler:
                return {"status": "error", "message": f"Unknown command type: {cmd_type}"}
            try:
                QgsMessageLog.logMessage(f"Executing handler for {cmd_type}", "QGIS MCP")
                result = handler(**params)
                QgsMessageLog.logMessage(f"Handler execution complete", "QGIS MCP")
                return {"status": "success", "result": result}
            except Exception as e:
                QgsMessageLog.logMessage(f"Error in handler: {str(e)}", "QGIS MCP", Qgis.Critical)
                traceback.print_exc()
                return {"status": "error", "message": str(e)}
        except Exception as e:
            QgsMessageLog.logMessage(f"Error executing command: {str(e)}", "QGIS MCP", Qgis.Critical)
            traceback.print_exc()
            return {"status": "error", "message": str(e)}

    # Command handlers
    #
    # Every handler accepts **kwargs so that unexpected extra parameters in
    # a request are ignored instead of raising TypeError.

    def ping(self, **kwargs):
        """Return a fixed reply so clients can check the connection."""
        return {"pong": True}

    def get_qgis_info(self, **kwargs):
        """Return the QGIS version, settings directory and plugin count."""
        return {
            "qgis_version": Qgis.version(),
            "profile_folder": QgsApplication.qgisSettingsDirPath(),
            "plugins_count": len(active_plugins)
        }

    def get_project_info(self, **kwargs):
        """Summarize the current project.

        Returns:
            Dict with the project's file name, title, layer count, CRS auth
            id and a "layers" list describing at most the first 10 layers.
        """
        project = QgsProject.instance()
        info = {
            "filename": project.fileName(),
            "title": project.title(),
            "layer_count": len(project.mapLayers()),
            "crs": project.crs().authid(),
            "layers": []
        }
        # Add basic layer information (limit to 10 layers for performance)
        for layer in list(project.mapLayers().values())[:10]:
            info["layers"].append({
                "id": layer.id(),
                "name": layer.name(),
                "type": self._get_layer_type(layer),
                "visible": layer.isValid() and self._is_layer_visible(project, layer.id())
            })
        return info

    @staticmethod
    def _is_layer_visible(project, layer_id):
        """Return True when the layer has a visible node in the layer tree.

        A layer without a layer-tree node is reported as not visible.
        """
        node = project.layerTreeRoot().findLayer(layer_id)
        return node is not None and node.isVisible()

    def _get_layer_type(self, layer):
        """Return the layer type as a string.

        Vector layers give ``"vector_<geometryType>"``, raster layers give
        ``"raster"``; any other layer gives ``str(layer.type())``.
        """
        if layer.type() == QgsMapLayer.VectorLayer:
            return f"vector_{layer.geometryType()}"
        elif layer.type() == QgsMapLayer.RasterLayer:
            return "raster"
        else:
            return str(layer.type())

    def execute_code(self, code, **kwargs):
        """Execute arbitrary PyQGIS code sent by the client.

        SECURITY: *code* arrives over the socket and is passed to exec()
        unmodified, so any client that can connect can run arbitrary Python
        inside QGIS.  NOTE(review): confirm the server is only ever bound to
        a trusted interface.

        Args:
            code: Python source to execute.

        Returns:
            ``{"executed": True}`` when the code ran without raising.

        Raises:
            Exception: Wraps any exception raised by the executed code.
        """
        try:
            # Names made available to the executed code
            namespace = {
                "qgis": Qgis,
                "QgsProject": QgsProject,
                "iface": self.iface,
                "QgsApplication": QgsApplication,
                "QgsVectorLayer": QgsVectorLayer,
                "QgsRasterLayer": QgsRasterLayer,
                "QgsCoordinateReferenceSystem": QgsCoordinateReferenceSystem
            }
            exec(code, namespace)
            return {"executed": True}
        except Exception as e:
            raise Exception(f"Code execution error: {str(e)}") from e

    def add_vector_layer(self, path, name=None, provider="ogr", **kwargs):
        """Add a vector layer to the project.

        Args:
            path: Data source passed to ``QgsVectorLayer``.
            name: Layer name; defaults to the base name of *path*.
            provider: Data provider key.

        Returns:
            Dict with the new layer's id, name, type and feature count.

        Raises:
            Exception: The created layer is not valid.
        """
        if not name:
            name = os.path.basename(path)
        layer = QgsVectorLayer(path, name, provider)
        if not layer.isValid():
            raise Exception(f"Layer is not valid: {path}")
        QgsProject.instance().addMapLayer(layer)
        return {
            "id": layer.id(),
            "name": layer.name(),
            "type": self._get_layer_type(layer),
            "feature_count": layer.featureCount()
        }

    def add_raster_layer(self, path, name=None, provider="gdal", **kwargs):
        """Add a raster layer to the project.

        Args:
            path: Data source passed to ``QgsRasterLayer``.
            name: Layer name; defaults to the base name of *path*.
            provider: Data provider key.

        Returns:
            Dict with the new layer's id, name, type, width and height.

        Raises:
            Exception: The created layer is not valid.
        """
        if not name:
            name = os.path.basename(path)
        layer = QgsRasterLayer(path, name, provider)
        if not layer.isValid():
            raise Exception(f"Layer is not valid: {path}")
        QgsProject.instance().addMapLayer(layer)
        return {
            "id": layer.id(),
            "name": layer.name(),
            "type": "raster",
            "width": layer.width(),
            "height": layer.height()
        }

    def get_layers(self, **kwargs):
        """Describe every layer in the project.

        Returns:
            List of dicts with id, name, type and visibility, plus feature
            count and geometry type for vector layers or width and height
            for raster layers.
        """
        project = QgsProject.instance()
        layers = []
        for layer_id, layer in project.mapLayers().items():
            layer_info = {
                "id": layer_id,
                "name": layer.name(),
                "type": self._get_layer_type(layer),
                "visible": self._is_layer_visible(project, layer_id)
            }
            # Add type-specific information
            if layer.type() == QgsMapLayer.VectorLayer:
                layer_info.update({
                    "feature_count": layer.featureCount(),
                    "geometry_type": layer.geometryType()
                })
            elif layer.type() == QgsMapLayer.RasterLayer:
                layer_info.update({
                    "width": layer.width(),
                    "height": layer.height()
                })
            layers.append(layer_info)
        return layers

    def remove_layer(self, layer_id, **kwargs):
        """Remove a layer from the project.

        Raises:
            Exception: No layer with *layer_id* exists in the project.
        """
        project = QgsProject.instance()
        if layer_id not in project.mapLayers():
            raise Exception(f"Layer not found: {layer_id}")
        project.removeMapLayer(layer_id)
        return {"removed": layer_id}

    def zoom_to_layer(self, layer_id, **kwargs):
        """Make the layer active and zoom the interface to it.

        Raises:
            Exception: No layer with *layer_id* exists in the project.
        """
        project = QgsProject.instance()
        if layer_id not in project.mapLayers():
            raise Exception(f"Layer not found: {layer_id}")
        layer = project.mapLayer(layer_id)
        self.iface.setActiveLayer(layer)
        self.iface.zoomToActiveLayer()
        return {"zoomed_to": layer_id}

    def get_layer_features(self, layer_id, limit=10, **kwargs):
        """Return up to *limit* features of a vector layer.

        Args:
            layer_id: Id of the layer to read.
            limit: Maximum number of features to return.

        Returns:
            Dict with the layer id, total feature count, field names and a
            "features" list; each feature has its id, an attribute dict and
            either None or a geometry dict with "type" and "wkt".

        Raises:
            Exception: The layer does not exist or is not a vector layer.
        """
        project = QgsProject.instance()
        if layer_id not in project.mapLayers():
            raise Exception(f"Layer not found: {layer_id}")
        layer = project.mapLayer(layer_id)
        if layer.type() != QgsMapLayer.VectorLayer:
            raise Exception(f"Layer is not a vector layer: {layer_id}")

        # Field names do not change per feature, so collect them once.
        field_names = [field.name() for field in layer.fields()]
        features = []
        for i, feature in enumerate(layer.getFeatures()):
            if i >= limit:
                break
            geom = None
            if feature.hasGeometry():
                geom = {
                    "type": feature.geometry().type(),
                    "wkt": feature.geometry().asWkt(precision=4)
                }
            features.append({
                "id": feature.id(),
                "attributes": {name: feature.attribute(name) for name in field_names},
                "geometry": geom
            })
        return {
            "layer_id": layer_id,
            "feature_count": layer.featureCount(),
            "features": features,
            "fields": field_names
        }

    def execute_processing(self, algorithm, parameters, **kwargs):
        """Run a processing algorithm.

        Args:
            algorithm: Algorithm id passed to ``processing.run``.
            parameters: Parameter mapping passed to ``processing.run``.

        Returns:
            Dict with the algorithm id and the result mapping, whose values
            are converted to strings so they can be sent as JSON.

        Raises:
            Exception: Wraps any failure importing or running processing.
        """
        try:
            import processing
            result = processing.run(algorithm, parameters)
            return {
                "algorithm": algorithm,
                "result": {k: str(v) for k, v in result.items()}  # Convert values to strings for JSON
            }
        except Exception as e:
            raise Exception(f"Processing error: {str(e)}") from e

    def save_project(self, path=None, **kwargs):
        """Save the current project.

        Args:
            path: Destination file; defaults to the project's own file name.

        Raises:
            Exception: Neither *path* nor a project file name is available,
                or writing the project failed.
        """
        project = QgsProject.instance()
        if not path and not project.fileName():
            raise Exception("No project path specified and no current project path")
        save_path = path if path else project.fileName()
        if not project.write(save_path):
            raise Exception(f"Failed to save project to {save_path}")
        return {"saved": save_path}

    def load_project(self, path, **kwargs):
        """Load the project stored at *path* and refresh the map canvas.

        Raises:
            Exception: Reading the project failed.
        """
        project = QgsProject.instance()
        if not project.read(path):
            raise Exception(f"Failed to load project from {path}")
        self.iface.mapCanvas().refresh()
        return {
            "loaded": path,
            "layer_count": len(project.mapLayers())
        }

    def create_new_project(self, path, **kwargs):
        """Point the project at a new file and save it there.

        The current project is cleared first only when it already has a
        file name; an unsaved project keeps its contents.

        Args:
            path: Full path where the project will be saved
                (e.g., 'C:/path/to/project.qgz').

        Raises:
            Exception: Writing the project failed.
        """
        project = QgsProject.instance()
        if project.fileName():
            project.clear()
        project.setFileName(path)
        self.iface.mapCanvas().refresh()
        # Save the project
        if not project.write():
            raise Exception(f"Failed to save project to {path}")
        return {
            "created": f"Project created and saved successfully at: {path}",
            "layer_count": len(project.mapLayers())
        }

    def render_map(self, path, width=800, height=600, **kwargs):
        """Render the current map view to an image file.

        All project layers are drawn at the map canvas extent on a white
        background at 96 dpi.

        Args:
            path: Image file to write.
            width: Output width in pixels.
            height: Output height in pixels.

        Returns:
            Dict with "rendered", "path", "width" and "height".

        Raises:
            Exception: Wraps any rendering failure, including a failure to
                save the image.
        """
        try:
            ms = QgsMapSettings()
            ms.setLayers(list(QgsProject.instance().mapLayers().values()))
            ms.setExtent(self.iface.mapCanvas().extent())
            ms.setOutputSize(QSize(width, height))
            ms.setBackgroundColor(QColor(255, 255, 255))
            ms.setOutputDpi(96)
            render = QgsMapRendererParallelJob(ms)
            render.start()
            render.waitForFinished()
            img = render.renderedImage()
            if not img.save(path):
                raise Exception(f"Failed to save rendered image to {path}")
            return {
                "rendered": True,
                "path": path,
                "width": width,
                "height": height
            }
        except Exception as e:
            raise Exception(f"Render error: {str(e)}") from e
class QgisMCPDockWidget(QDockWidget):
    """Dock widget with the port selector and start/stop controls."""

    # Emitted from closeEvent() after the server has been stopped.
    closed = pyqtSignal()

    def __init__(self, iface):
        """Create the widget; no server is started yet.

        Args:
            iface: QGIS interface object passed on to the server.
        """
        super().__init__("QGIS MCP")
        self.iface = iface
        self.server = None  # QgisMCPServer while running, otherwise None
        self.setup_ui()

    def setup_ui(self):
        """Build the port spin box, the two buttons and the status label."""
        widget = QWidget()
        layout = QVBoxLayout()
        widget.setLayout(layout)

        # Port selection
        layout.addWidget(QLabel("Server Port:"))
        self.port_spin = QSpinBox()
        self.port_spin.setMinimum(1024)
        self.port_spin.setMaximum(65535)
        self.port_spin.setValue(9876)
        layout.addWidget(self.port_spin)

        # Server control buttons
        self.start_button = QPushButton("Start Server")
        self.start_button.clicked.connect(self.start_server)
        layout.addWidget(self.start_button)
        self.stop_button = QPushButton("Stop Server")
        self.stop_button.clicked.connect(self.stop_server)
        self.stop_button.setEnabled(False)
        layout.addWidget(self.stop_button)

        # Status label
        self.status_label = QLabel("Server: Stopped")
        layout.addWidget(self.status_label)

        self.setWidget(widget)

    def start_server(self):
        """Start a server on the selected port and update the controls.

        Does nothing when a server is already running.  When starting
        fails, the failed server object is discarded so that the next
        attempt uses the port currently shown in the spin box.
        """
        if self.server:
            return  # Already running
        server = QgisMCPServer(port=self.port_spin.value(), iface=self.iface)
        if not server.start():
            self.status_label.setText("Server: Failed to start (see log)")
            return
        self.server = server
        self.status_label.setText(f"Server: Running on port {self.server.port}")
        self.start_button.setEnabled(False)
        self.stop_button.setEnabled(True)
        self.port_spin.setEnabled(False)

    def stop_server(self):
        """Stop the running server, if any, and reset the controls."""
        if self.server:
            self.server.stop()
            self.server = None
            self.status_label.setText("Server: Stopped")
            self.start_button.setEnabled(True)
            self.stop_button.setEnabled(False)
            self.port_spin.setEnabled(True)

    def closeEvent(self, event):
        """Stop the server and emit ``closed`` when the dock is closed."""
        self.stop_server()
        self.closed.emit()
        super().closeEvent(event)
class QgisMCPPlugin:
    """Plugin object that wires the QGIS MCP dock widget into QGIS."""

    def __init__(self, iface):
        """Remember the QGIS interface; GUI objects are created later."""
        self.iface = iface
        self.dock_widget = None
        self.action = None

    def initGui(self):
        """Create the checkable menu/toolbar action that toggles the dock."""
        self.action = action = QAction("QGIS MCP", self.iface.mainWindow())
        action.setCheckable(True)
        action.triggered.connect(self.toggle_dock)
        # Register the action in the plugins menu and on the toolbar
        self.iface.addPluginToMenu("QGIS MCP", action)
        self.iface.addToolBarIcon(action)

    def toggle_dock(self, checked):
        """Show the dock when *checked* is true, otherwise hide it.

        The dock widget is created and docked on the right the first time
        it is shown.
        """
        if not checked:
            if self.dock_widget:
                self.dock_widget.hide()
            return
        if self.dock_widget:
            # Dock already exists: just bring it back
            self.dock_widget.show()
            return
        self.dock_widget = QgisMCPDockWidget(self.iface)
        self.iface.addDockWidget(Qt.RightDockWidgetArea, self.dock_widget)
        # Keep the action's check state in sync when the dock is closed
        self.dock_widget.closed.connect(self.dock_closed)

    def dock_closed(self):
        """Uncheck the action after the dock widget has been closed."""
        self.action.setChecked(False)

    def unload(self):
        """Stop the server and remove the dock, menu entry and toolbar icon."""
        dock = self.dock_widget
        if dock:
            dock.stop_server()
            self.iface.removeDockWidget(dock)
            self.dock_widget = None
        self.iface.removePluginMenu("QGIS MCP", self.action)
        self.iface.removeToolBarIcon(self.action)
# Plugin entry point
def classFactory(iface):
    """Build and return the plugin object for the given QGIS interface."""
    plugin = QgisMCPPlugin(iface)
    return plugin