-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathclient.py
122 lines (93 loc) · 3.46 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import json
import logging
import socket
import struct
import time
from typing import Callable
import cv2
import numpy as np
from cogstream.engine import EngineResult
from cogstream.engine.channel import JpegSendChannel, ClientChannel, JsonResultReceiveChannel
from cogstream.engine.engine import Frame
from cogstream.engine.io import SocketFrameWriter, SocketResultScanner
from cogstream.mediator.client import StreamSpec
from cogstream.typing import deep_to_dict
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def _send_stream_spec(sock, data: bytes):
"""
Sends the given data over the socket and prefixes the packet with an appropriate packet header.
:param sock: the socket to send the data over
:param data: the data to send
:return:
"""
# Prefix each message with a 4-byte length (little endian)
arr = struct.pack('<i', len(data)) + data
sock.sendall(arr)
class EngineClient:
    """
    Client side of a single engine connection described by a ``StreamSpec``.

    ``open()`` connects a TCP socket to the address obtained from the stream spec, sends the spec as a
    length-prefixed JSON document, and wraps the socket in a ``ClientChannel`` (a ``JpegSendChannel`` for outgoing
    frames and a ``JsonResultReceiveChannel`` for incoming results). Frames are then sent with ``request()`` or
    ``request_async()``; each successfully sent frame gets the next value of ``frame_cnt`` as its frame id.
    """
    channel: ClientChannel

    def __init__(self, stream_spec: StreamSpec) -> None:
        """
        :param stream_spec: the stream spec to initialize the stream with; also provides the engine address
        """
        super().__init__()
        self.stream_spec = stream_spec
        self.address = stream_spec.get_socket_address()
        self.sock = None
        self.channel = None
        self.acknowledged = False
        self.frame_cnt = 0

    def open(self):
        """
        Connects to the engine, sends the stream spec and sets up the channel.

        If any step fails, the socket is closed again and the client stays in its unconnected state, so ``open()``
        can be retried.

        :raises ValueError: if the client is already connected
        """
        if self.sock is not None:
            raise ValueError('already connected')

        address = self.address
        logger.info('connecting to engine at %s', address)

        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect(address)

            doc = json.dumps(deep_to_dict(self.stream_spec))
            logger.info("initializing stream with stream spec: %s", doc)
            _send_stream_spec(sock, doc.encode('UTF-8'))

            in_channel = JsonResultReceiveChannel(0, SocketResultScanner(sock))
            out_channel = JpegSendChannel(0, SocketFrameWriter(sock))
            channel = ClientChannel(in_channel, out_channel)
        except BaseException:
            # do not leak the file descriptor of a half-initialized connection; the error is re-raised unchanged
            sock.close()
            raise

        # only publish the connection once it is fully set up
        self.sock = sock
        self.channel = channel
        self.acknowledged = True

    def _send_frame(self, frame: np.ndarray) -> None:
        """Sends the frame with the current frame id; the counter only advances if the send did not raise."""
        self.channel.send(Frame(frame, frame_id=self.frame_cnt))
        self.frame_cnt += 1

    def request(self, frame: np.ndarray) -> EngineResult:
        """
        Sends the frame and then reads one result from the channel.

        :param frame: the frame to send
        :return: the result returned by the channel's ``recv_result()``
        """
        # TODO: determine whether to read result synchronously from stream spec
        self._send_frame(frame)
        return self.channel.recv_result()

    def request_async(self, frame: np.ndarray):
        """
        Sends the frame without reading a result.

        :param frame: the frame to send
        """
        self._send_frame(frame)

    def close(self):
        """
        Closes the socket if one is open; does nothing otherwise.

        The socket reference is cleared before closing, so calling ``close()`` again is a no-op and the client can
        be re-opened with ``open()``. The ``channel`` attribute is left as is until the next ``open()`` replaces it.
        """
        if self.sock is None:
            return

        # detach first so the client is back in the unconnected state even if close() raises
        sock, self.sock = self.sock, None
        self.acknowledged = False

        logger.info('closing socket')
        sock.close()
# Signature of the per-frame callback accepted by stream_camera: it is invoked as
# on_result(frame, result) and its return value is ignored.
EngineResultCallback = Callable[[np.ndarray, EngineResult], None]
def stream_camera(cam, client, show=True, max_fps=0, on_result: EngineResultCallback = None):
    """
    Reads frames from ``cam`` in a loop and sends each one to the engine through ``client``.

    The loop ends when ``cam.read()`` reports that no frame was read or, if ``show`` is enabled, when ``q`` is
    pressed in the preview window.

    :param cam: frame source whose ``read()`` returns a ``(check, frame)`` pair (presumably a ``cv2.VideoCapture``)
    :param client: object providing ``request(frame)`` and ``request_async(frame)``, e.g. an ``EngineClient``
    :param show: if truthy, display each frame in a window named "capture" and poll the keyboard
    :param max_fps: upper bound on frames per second; a falsy value disables rate limiting
    :param on_result: if given, frames are sent with ``client.request`` and the callback is invoked with
        ``(frame, result)``; if None, frames are sent with ``client.request_async`` and no results are read
    :return: None
    """
    # target frame inter-arrival time
    ia = 1 / max_fps if max_fps else 0

    while True:
        # perf_counter is monotonic and high-resolution, so the measured durations cannot go negative
        start = time.perf_counter()

        check, frame = cam.read()
        if not check:
            logger.info('no more frames to read')
            break

        if show:
            cv2.imshow("capture", frame)

        if on_result is None:
            client.request_async(frame)
        else:
            result = client.request(frame)
            logger.debug('received result %s', result)
            on_result(frame, result)

        if ia > 0:
            # sleep for whatever is left of this frame's time budget
            delay = ia - (time.perf_counter() - start)
            if delay > 0:
                time.sleep(delay)

        if show:
            key = cv2.waitKey(1)
            if key == ord('q'):
                break

        elapsed = time.perf_counter() - start
        # guard against a zero-length measurement instead of raising ZeroDivisionError
        if elapsed > 0:
            logger.info('fps: %.2f', 1 / elapsed)