Skip to content

Commit 9341525

Browse files
Implement a pipelining API
This is so that we can send multiple commands to a board while keeping to only one request-response cycle. Co-Authored-By: Jake Howard <[email protected]>
1 parent 062f944 commit 9341525

File tree

1 file changed

+115
-42
lines changed

1 file changed

+115
-42
lines changed

sbot/serial_wrapper.py

Lines changed: 115 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
"""
77
from __future__ import annotations
88

9+
import itertools
910
import logging
1011
import sys
1112
import threading
@@ -16,7 +17,6 @@
1617
import serial
1718

1819
from .exceptions import BoardDisconnectionError
19-
from .logging import TRACE
2020
from .utils import IN_SIMULATOR, BoardIdentity
2121

2222
logger = logging.getLogger(__name__)
@@ -122,51 +122,88 @@ def stop(self) -> None:
122122
"""
123123
self._disconnect()
124124

125+
def _connect_if_needed(self) -> None:
126+
if not self.serial.is_open:
127+
if not self._connect():
128+
# If the serial port cannot be opened raise an error,
129+
# this will be caught by the retry decorator
130+
raise BoardDisconnectionError((
131+
f'Connection to board {self.identity.board_type}:'
132+
f'{self.identity.asset_tag} could not be established',
133+
))
134+
125135
@retry(times=3, exceptions=(BoardDisconnectionError, UnicodeDecodeError))
126-
def query(self, data: str) -> str:
136+
def query_multi(self, commands: list[str]) -> list[str]:
127137
"""
128138
Send a command to the board and return the response.
129139
130-
This method will automatically reconnect to the board and retry the command
140+
This method will automatically reconnect to the board and retry the commands
131141
up to 3 times on serial errors.
132142
133-
:param data: The data to write to the board.
143+
:param commands: The commands to write to the board.
134144
:raises BoardDisconnectionError: If the serial connection fails during the transaction,
135145
including failing to respond to the command.
136-
:return: The response from the board with the trailing newline removed.
146+
:return: The responses from the board with the trailing newlines removed.
137147
"""
148+
# Verify no command has a newline in it, and build a command `bytes` from the
149+
# list of commands
150+
encoded_commands: list[bytes] = []
151+
invalid_commands: list[tuple[str, str]] = []
152+
153+
for command in commands:
154+
if '\n' in command:
155+
invalid_commands.append(("contains newline", command))
156+
else:
157+
try:
158+
byte_form = command.encode(encoding='utf-8')
159+
except UnicodeEncodeError as e:
160+
invalid_commands.append((str(e), command))
161+
else:
162+
encoded_commands.append(byte_form)
163+
encoded_commands.append(b'\n')
164+
165+
if invalid_commands:
166+
invalid_commands.sort()
167+
168+
invalid_command_groups = dict(itertools.groupby(
169+
invalid_commands,
170+
key=lambda x: x[0],
171+
))
172+
173+
error_message = "\n".join(
174+
["Invalid commands:"] +
175+
[
176+
f" {reason}: " + ", ".join(
177+
repr(command)
178+
for _, command in grouped_commands
179+
)
180+
for reason, grouped_commands in invalid_command_groups.items()
181+
],
182+
)
183+
raise ValueError(error_message)
184+
185+
full_commands = b''.join(encoded_commands)
186+
138187
with self._lock:
139-
if not self.serial.is_open:
140-
if not self._connect():
141-
# If the serial port cannot be opened raise an error,
142-
# this will be caught by the retry decorator
143-
raise BoardDisconnectionError((
144-
f'Connection to board {self.identity.board_type}:'
145-
f'{self.identity.asset_tag} could not be established',
146-
))
188+
# If the serial port is not open, try to connect
189+
self._connect_if_needed() # TODO: Write me
147190

191+
# Contain all the serial IO in a try-catch; on error, disconnect and raise an error
148192
try:
149-
logger.log(TRACE, f'Serial write - {data!r}')
150-
cmd = data + '\n'
151-
self.serial.write(cmd.encode())
152-
153-
response = self.serial.readline()
154-
try:
155-
response_str = response.decode().rstrip('\n')
156-
except UnicodeDecodeError as e:
157-
logger.warning(
158-
f"Board {self.identity.board_type}:{self.identity.asset_tag} "
159-
f"returned invalid characters: {response!r}")
160-
raise e
161-
logger.log(
162-
TRACE, f'Serial read - {response_str!r}')
163-
164-
if b'\n' not in response:
165-
# If readline times out no error is raised, it returns an incomplete string
166-
logger.warning((
167-
f'Connection to board {self.identity.board_type}:'
168-
f'{self.identity.asset_tag} timed out waiting for response'
169-
))
193+
# Send the commands to the board
194+
self.serial.write(full_commands)
195+
196+
# Read as many lines as there are commands
197+
responses_binary = [
198+
self.serial.readline()
199+
for _ in range(len(commands))
200+
]
201+
202+
# Check all responses have a trailing newline (an incomplete
203+
# response will not).
204+
# This is within the lock and try-catch to ensure the serial port
205+
# is closed on error.
206+
if not all(response.endswith(b'\n') for response in responses_binary):
170207
raise serial.SerialException('Timeout on readline')
171208
except serial.SerialException:
172209
# Serial connection failed, close the port and raise an error
@@ -176,15 +213,51 @@ def query(self, data: str) -> str:
176213
'disconnected during transaction'
177214
))
178215

179-
if response_str.startswith('NACK'):
180-
_, error_msg = response_str.split(':', maxsplit=1)
181-
logger.error((
182-
f'Board {self.identity.board_type}:{self.identity.asset_tag} '
183-
f'returned NACK on write command: {error_msg}'
184-
))
185-
raise RuntimeError(error_msg)
216+
# Decode all the responses as UTF-8
217+
try:
218+
responses_decoded = [
219+
response.decode("utf-8").rstrip('\n')
220+
for response in responses_binary
221+
]
222+
except UnicodeDecodeError as e:
223+
logger.warning(
224+
f"Board {self.identity.board_type}:{self.identity.asset_tag} "
225+
f"returned invalid characters: {responses_binary!r}")
226+
raise e
227+
228+
# Collect any NACK responses; if any, raise an error
229+
nack_prefix = 'NACK:'
230+
nack_responses = [
231+
response
232+
for response in responses_decoded
233+
if response.startswith(nack_prefix)
234+
]
235+
236+
if nack_responses:
237+
errors = [response[len(nack_prefix):] for response in nack_responses]
238+
# We can't use exception groups due to needing to support Python 3.8
239+
raise (
240+
RuntimeError(errors[0])
241+
if len(errors) == 1
242+
else RuntimeError("Multiple errors: " + ", ".join(errors))
243+
)
244+
245+
# Return the list of responses
246+
return responses_decoded
247+
248+
def query(self, data: str) -> str:
249+
"""
250+
Send a command to the board and return the response.
251+
252+
This method will automatically reconnect to the board and retry the command
253+
up to 3 times on serial errors.
186254
187-
return response_str
255+
:param data: The data to write to the board.
256+
:raises BoardDisconnectionError: If the serial connection fails during the transaction,
257+
including failing to respond to the command.
258+
:return: The response from the board with the trailing newline removed.
259+
"""
260+
return self.query_multi([data])[0]
188261

189262
def write(self, data: str) -> None:
190263
"""

0 commit comments

Comments
 (0)