-
Notifications
You must be signed in to change notification settings - Fork 387
/
Copy pathwebsocket_server.py
503 lines (414 loc) · 16.4 KB
/
websocket_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
# Author: Johan Hanssen Seferidis
# License: MIT
import sys
import struct
import ssl
from base64 import b64encode
from hashlib import sha1
import logging
from socket import error as SocketError
import errno
import threading
from socketserver import ThreadingMixIn, TCPServer, StreamRequestHandler
from urllib.parse import urlparse, parse_qs
from websocket_server.thread import WebsocketServerThread
logger = logging.getLogger(__name__)
logging.basicConfig()
'''
+-+-+-+-+-------+-+-------------+-------------------------------+
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|I|S|S|S| (4) |A| (7) | (16/64) |
|N|V|V|V| |S| | (if payload len==126/127) |
| |1|2|3| |K| | |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
| Extended payload length continued, if payload len == 127 |
+ - - - - - - - - - - - - - - - +-------------------------------+
| Payload Data continued ... |
+---------------------------------------------------------------+
'''
FIN = 0x80
OPCODE = 0x0f
MASKED = 0x80
PAYLOAD_LEN = 0x7f
PAYLOAD_LEN_EXT16 = 0x7e
PAYLOAD_LEN_EXT64 = 0x7f
OPCODE_CONTINUATION = 0x0
OPCODE_TEXT = 0x1
OPCODE_BINARY = 0x2
OPCODE_CLOSE_CONN = 0x8
OPCODE_PING = 0x9
OPCODE_PONG = 0xA
CLOSE_STATUS_NORMAL = 1000
DEFAULT_CLOSE_REASON = bytes('', encoding='utf-8')
class API():
def run_forever(self, threaded=False):
return self._run_forever(threaded)
def new_client(self, client, server):
pass
def client_left(self, client, server):
pass
def message_received(self, client, server, message):
pass
def set_fn_new_client(self, fn):
self.new_client = fn
def set_fn_client_left(self, fn):
self.client_left = fn
def set_fn_message_received(self, fn):
self.message_received = fn
def send_message(self, client, msg):
self._unicast(client, msg)
def send_message_to_all(self, msg):
self._multicast(msg)
def deny_new_connections(self, status=CLOSE_STATUS_NORMAL, reason=DEFAULT_CLOSE_REASON):
self._deny_new_connections(status, reason)
def allow_new_connections(self):
self._allow_new_connections()
def shutdown_gracefully(self, status=CLOSE_STATUS_NORMAL, reason=DEFAULT_CLOSE_REASON):
self._shutdown_gracefully(status, reason)
def shutdown_abruptly(self):
self._shutdown_abruptly()
def disconnect_clients_gracefully(self, status=CLOSE_STATUS_NORMAL, reason=DEFAULT_CLOSE_REASON):
self._disconnect_clients_gracefully(status, reason)
def disconnect_clients_abruptly(self):
self._disconnect_clients_abruptly()
class WebsocketServer(ThreadingMixIn, TCPServer, API):
"""
A websocket server waiting for clients to connect.
Args:
port(int): Port to bind to
host(str): Hostname or IP to listen for connections. By default 127.0.0.1
is being used. To accept connections from any client, you should use
0.0.0.0.
loglevel: Logging level from logging module to use for logging. By default
warnings and errors are being logged.
Properties:
clients(list): A list of connected clients. A client is a dictionary
like below.
{
'id' : id,
'handler' : handler,
'address' : (addr, port)
}
"""
allow_reuse_address = True
daemon_threads = True # comment to keep threads alive until finished
def __init__(self, host='127.0.0.1', port=0, loglevel=logging.WARNING, key=None, cert=None):
logger.setLevel(loglevel)
TCPServer.__init__(self, (host, port), WebSocketHandler)
self.host = host
self.port = self.socket.getsockname()[1]
self.key = key
self.cert = cert
self.clients = []
self.id_counter = 0
self.thread = None
self._deny_clients = False
def _run_forever(self, threaded):
cls_name = self.__class__.__name__
try:
logger.info("Listening on port %d for clients.." % self.port)
if threaded:
self.daemon = True
self.thread = WebsocketServerThread(target=super().serve_forever, daemon=True, logger=logger)
logger.info(f"Starting {cls_name} on thread {self.thread.getName()}.")
self.thread.start()
else:
self.thread = threading.current_thread()
logger.info(f"Starting {cls_name} on main thread.")
super().serve_forever()
except KeyboardInterrupt:
self.server_close()
logger.info("Server terminated.")
except Exception as e:
logger.error(str(e), exc_info=True)
sys.exit(1)
def _message_received_(self, handler, msg):
self.message_received(self.handler_to_client(handler), self, msg)
def _ping_received_(self, handler, msg):
handler.send_pong(msg)
def _pong_received_(self, handler, msg):
pass
def _new_client_(self, handler):
if self._deny_clients:
status = self._deny_clients["status"]
reason = self._deny_clients["reason"]
handler.send_close(status, reason)
self._terminate_client_handler(handler)
return
self.id_counter += 1
client = {
'id': self.id_counter,
'handler': handler,
'address': handler.client_address
}
self.clients.append(client)
self.new_client(client, self)
def _client_left_(self, handler):
client = self.handler_to_client(handler)
self.client_left(client, self)
if client in self.clients:
self.clients.remove(client)
def _unicast(self, receiver_client, msg):
receiver_client['handler'].send_message(msg)
def _multicast(self, msg):
for client in self.clients:
self._unicast(client, msg)
def handler_to_client(self, handler):
for client in self.clients:
if client['handler'] == handler:
return client
def _terminate_client_handler(self, handler):
handler.keep_alive = False
handler.finish()
handler.connection.close()
def _terminate_client_handlers(self):
"""
Ensures request handler for each client is terminated correctly
"""
for client in self.clients:
self._terminate_client_handler(client["handler"])
def _shutdown_gracefully(self, status=CLOSE_STATUS_NORMAL, reason=DEFAULT_CLOSE_REASON):
"""
Send a CLOSE handshake to all connected clients before terminating server
"""
self.keep_alive = False
self._disconnect_clients_gracefully(status, reason)
self.server_close()
self.shutdown()
def _shutdown_abruptly(self):
"""
Terminate server without sending a CLOSE handshake
"""
self.keep_alive = False
self._disconnect_clients_abruptly()
self.server_close()
self.shutdown()
def _disconnect_clients_gracefully(self, status=CLOSE_STATUS_NORMAL, reason=DEFAULT_CLOSE_REASON):
"""
Terminate clients gracefully without shutting down the server
"""
for client in self.clients:
client["handler"].send_close(status, reason)
self._terminate_client_handlers()
def _disconnect_clients_abruptly(self):
"""
Terminate clients abruptly (no CLOSE handshake) without shutting down the server
"""
self._terminate_client_handlers()
def _deny_new_connections(self, status, reason):
self._deny_clients = {
"status": status,
"reason": reason,
}
def _allow_new_connections(self):
self._deny_clients = False
class WebSocketHandler(StreamRequestHandler):
def __init__(self, socket, addr, server):
self.server = server
self.headers = {}
self.query_params = {}
assert not hasattr(self, "_send_lock"), "_send_lock already exists"
self._send_lock = threading.Lock()
if server.key and server.cert:
try:
socket = ssl.wrap_socket(socket, server_side=True, certfile=server.cert, keyfile=server.key)
except: # Not sure which exception it throws if the key/cert isn't found
logger.warning("SSL not available (are the paths {} and {} correct for the key and cert?)".format(server.key, server.cert))
StreamRequestHandler.__init__(self, socket, addr, server)
def setup(self):
StreamRequestHandler.setup(self)
self.keep_alive = True
self.handshake_done = False
self.valid_client = False
def handle(self):
while self.keep_alive:
if not self.handshake_done:
self.handshake()
elif self.valid_client:
self.read_next_message()
def read_bytes(self, num):
return self.rfile.read(num)
def read_next_message(self):
try:
b1, b2 = self.read_bytes(2)
except SocketError as e: # to be replaced with ConnectionResetError for py3
if e.errno == errno.ECONNRESET:
logger.info("Client closed connection.")
self.keep_alive = 0
return
b1, b2 = 0, 0
except ValueError as e:
b1, b2 = 0, 0
fin = b1 & FIN
opcode = b1 & OPCODE
masked = b2 & MASKED
payload_length = b2 & PAYLOAD_LEN
if opcode == OPCODE_CLOSE_CONN:
logger.info("Client asked to close connection.")
self.keep_alive = 0
return
if not masked:
logger.warning("Client must always be masked.")
self.keep_alive = 0
return
if opcode == OPCODE_CONTINUATION:
logger.warning("Continuation frames are not supported.")
return
elif opcode == OPCODE_BINARY:
logger.warning("Binary frames are not supported.")
return
elif opcode == OPCODE_TEXT:
opcode_handler = self.server._message_received_
elif opcode == OPCODE_PING:
opcode_handler = self.server._ping_received_
elif opcode == OPCODE_PONG:
opcode_handler = self.server._pong_received_
else:
logger.warning("Unknown opcode %#x." % opcode)
self.keep_alive = 0
return
if payload_length == 126:
payload_length = struct.unpack(">H", self.rfile.read(2))[0]
elif payload_length == 127:
payload_length = struct.unpack(">Q", self.rfile.read(8))[0]
masks = self.read_bytes(4)
message_bytes = bytearray()
for message_byte in self.read_bytes(payload_length):
message_byte ^= masks[len(message_bytes) % 4]
message_bytes.append(message_byte)
opcode_handler(self, message_bytes.decode('utf8'))
def send_message(self, message):
self.send_text(message)
def send_pong(self, message):
self.send_text(message, OPCODE_PONG)
def send_close(self, status=CLOSE_STATUS_NORMAL, reason=DEFAULT_CLOSE_REASON):
"""
Send CLOSE to client
Args:
status: Status as defined in https://datatracker.ietf.org/doc/html/rfc6455#section-7.4.1
reason: Text with reason of closing the connection
"""
if status < CLOSE_STATUS_NORMAL or status > 1015:
raise Exception(f"CLOSE status must be between 1000 and 1015, got {status}")
header = bytearray()
payload = struct.pack('!H', status) + reason
payload_length = len(payload)
assert payload_length <= 125, "We only support short closing reasons at the moment"
# Send CLOSE with status & reason
header.append(FIN | OPCODE_CLOSE_CONN)
header.append(payload_length)
with self._send_lock:
self.request.send(header + payload)
def send_text(self, message, opcode=OPCODE_TEXT):
"""
Important: Fragmented(=continuation) messages are not supported since
their usage cases are limited - when we don't know the payload length.
"""
# Validate message
if isinstance(message, bytes):
message = try_decode_UTF8(message) # this is slower but ensures we have UTF-8
if not message:
logger.warning("Can\'t send message, message is not valid UTF-8")
return False
elif not isinstance(message, str):
logger.warning('Can\'t send message, message has to be a string or bytes. Got %s' % type(message))
return False
header = bytearray()
payload = encode_to_UTF8(message)
payload_length = len(payload)
# Normal payload
if payload_length <= 125:
header.append(FIN | opcode)
header.append(payload_length)
# Extended payload
elif payload_length >= 126 and payload_length <= 65535:
header.append(FIN | opcode)
header.append(PAYLOAD_LEN_EXT16)
header.extend(struct.pack(">H", payload_length))
# Huge extended payload
elif payload_length < 18446744073709551616:
header.append(FIN | opcode)
header.append(PAYLOAD_LEN_EXT64)
header.extend(struct.pack(">Q", payload_length))
else:
raise Exception("Message is too big. Consider breaking it into chunks.")
return
with self._send_lock:
self.request.send(header + payload)
@staticmethod
def parse_query(http_get):
"""
Parses the query parameters from the first line.
Example: "GET /?q=hello HTTP/1.1" will be parsed to {'q': ['hello']}
"""
query = http_get.split(" ")[1] # example: http_get = "GET /?q=hello HTTP/1.1"
parsed_url = urlparse(query)
return parse_qs(parsed_url.query)
def read_http_headers(self):
headers = {}
# first line should be HTTP GET
http_get = self.rfile.readline().decode().strip()
assert http_get.upper().startswith('GET')
# remaining should be headers
while True:
header = self.rfile.readline().decode().strip()
if not header:
break
head, value = header.split(':', 1)
headers[head.lower().strip()] = value.strip()
self.headers = headers
self.query_params = WebSocketHandler.parse_query(http_get)
return headers
def handshake(self):
headers = self.read_http_headers()
try:
assert headers['upgrade'].lower() == 'websocket'
except AssertionError:
self.keep_alive = False
return
try:
key = headers['sec-websocket-key']
except KeyError:
logger.warning("Client tried to connect but was missing a key")
self.keep_alive = False
return
response = self.make_handshake_response(key)
with self._send_lock:
self.handshake_done = self.request.send(response.encode())
self.valid_client = True
self.server._new_client_(self)
@classmethod
def make_handshake_response(cls, key):
return \
'HTTP/1.1 101 Switching Protocols\r\n'\
'Upgrade: websocket\r\n' \
'Connection: Upgrade\r\n' \
'Sec-WebSocket-Accept: %s\r\n' \
'\r\n' % cls.calculate_response_key(key)
@classmethod
def calculate_response_key(cls, key):
GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
hash = sha1(key.encode() + GUID.encode())
response_key = b64encode(hash.digest()).strip()
return response_key.decode('ASCII')
def finish(self):
self.server._client_left_(self)
def encode_to_UTF8(data):
try:
return data.encode('UTF-8')
except UnicodeEncodeError as e:
logger.error("Could not encode data to UTF-8 -- %s" % e)
return False
except Exception as e:
raise(e)
return False
def try_decode_UTF8(data):
try:
return data.decode('utf-8')
except UnicodeDecodeError:
return False
except Exception as e:
raise(e)