9
9
import logging
10
10
11
11
from can import BusABC, Message
12
- from ..exceptions import CanInterfaceNotImplementedError, CanOperationError
12
+ from ..exceptions import (
13
+ CanInterfaceNotImplementedError,
14
+ CanInitializationError,
15
+ CanOperationError,
16
+ error_check,
17
+ )
13
18
from can import typechecking
14
19
15
20
@@ -62,8 +67,6 @@ def __init__(
62
67
**kwargs: Any,
63
68
) -> None:
64
69
"""
65
- :raise ValueError: if both *bitrate* and *btr* are set
66
-
67
70
:param str channel:
68
71
port of underlying serial or usb device (e.g. ``/dev/ttyUSB0``, ``COM8``, ...)
69
72
Must not be empty. Can also end with ``@115200`` (or similarly) to specify the baudrate.
@@ -79,48 +82,57 @@ def __init__(
79
82
Time to wait in seconds after opening serial connection
80
83
:param rtscts:
81
84
turn hardware handshake (RTS/CTS) on and off
85
+
86
+ :raise ValueError: if both ``bitrate`` and ``btr`` are set or the channel is invalid
87
+ :raise CanInterfaceNotImplementedError: if the serial module is missing
88
+ :raise CanInitializationError: if the underlying serial connection could not be established
82
89
"""
83
90
if serial is None:
84
91
raise CanInterfaceNotImplementedError("The serial module is not installed")
85
92
86
93
if not channel: # if None or empty
87
- raise TypeError ("Must specify a serial port.")
94
+ raise ValueError ("Must specify a serial port.")
88
95
if "@" in channel:
89
96
(channel, baudrate) = channel.split("@")
90
97
ttyBaudrate = int(baudrate)
91
- self.serialPortOrig = serial.serial_for_url(
92
- channel, baudrate=ttyBaudrate, rtscts=rtscts
93
- )
98
+
99
+ with error_check(exception_type=CanInitializationError):
100
+ self.serialPortOrig = serial.serial_for_url(
101
+ channel, baudrate=ttyBaudrate, rtscts=rtscts
102
+ )
94
103
95
104
self._buffer = bytearray()
96
105
97
106
time.sleep(sleep_after_open)
98
107
99
- if bitrate is not None and btr is not None:
100
- raise ValueError("Bitrate and btr mutually exclusive.")
101
- if bitrate is not None:
102
- self.set_bitrate(bitrate)
103
- if btr is not None:
104
- self.set_bitrate_reg(btr)
105
- self.open()
108
+ with error_check(exception_type=CanInitializationError):
109
+ if bitrate is not None and btr is not None:
110
+ raise ValueError("Bitrate and btr mutually exclusive.")
111
+ if bitrate is not None:
112
+ self.set_bitrate(bitrate)
113
+ if btr is not None:
114
+ self.set_bitrate_reg(btr)
115
+ self.open()
106
116
107
117
super().__init__(
108
118
channel, ttyBaudrate=115200, bitrate=None, rtscts=False, **kwargs
109
119
)
110
120
111
121
def set_bitrate(self, bitrate: int) -> None:
112
122
"""
113
- :raise ValueError: if both *bitrate* is not among the possible values
114
-
115
123
:param bitrate:
116
124
Bitrate in bit/s
125
+
126
+ :raise ValueError: if ``bitrate`` is not among the possible values
117
127
"""
118
- self.close()
119
128
if bitrate in self._BITRATES:
120
- self._write(self. _BITRATES[bitrate])
129
+ bitrate_code = self._BITRATES[bitrate]
121
130
else:
122
131
bitrates = ", ".join(str(k) for k in self._BITRATES.keys())
123
132
raise ValueError(f"Invalid bitrate, choose one of {bitrates}.")
133
+
134
+ self.close()
135
+ self._write(bitrate_code)
124
136
self.open()
125
137
126
138
def set_bitrate_reg(self, btr: str) -> None:
@@ -133,33 +145,38 @@ def set_bitrate_reg(self, btr: str) -> None:
133
145
self.open()
134
146
135
147
def _write(self, string: str) -> None:
136
- self.serialPortOrig.write(string.encode() + self.LINE_TERMINATOR)
137
- self.serialPortOrig.flush()
148
+ with error_check("Could not write to serial device"):
149
+ self.serialPortOrig.write(string.encode() + self.LINE_TERMINATOR)
150
+ self.serialPortOrig.flush()
138
151
139
152
def _read(self, timeout: Optional[float]) -> Optional[str]:
140
153
141
- # first read what is already in receive buffer
142
- while self.serialPortOrig.in_waiting:
143
- self._buffer += self.serialPortOrig.read()
144
- # if we still don't have a complete message, do a blocking read
145
- start = time.time()
146
- time_left = timeout
147
- while not (ord(self._OK) in self._buffer or ord(self._ERROR) in self._buffer):
148
- self.serialPortOrig.timeout = time_left
149
- byte = self.serialPortOrig.read()
150
- if byte:
151
- self._buffer += byte
152
- # if timeout is None, try indefinitely
153
- if timeout is None:
154
- continue
155
- # try next one only if there still is time, and with
156
- # reduced timeout
157
- else:
158
- time_left = timeout - (time.time() - start)
159
- if time_left > 0:
154
+ with error_check("Could not read from serial device"):
155
+ # first read what is already in receive buffer
156
+ while self.serialPortOrig.in_waiting:
157
+ self._buffer += self.serialPortOrig.read()
158
+ # if we still don't have a complete message, do a blocking read
159
+ start = time.time()
160
+ time_left = timeout
161
+ while not (
162
+ ord(self._OK) in self._buffer or ord(self._ERROR) in self._buffer
163
+ ):
164
+ self.serialPortOrig.timeout = time_left
165
+ byte = self.serialPortOrig.read()
166
+ if byte:
167
+ self._buffer += byte
168
+ # if timeout is None, try indefinitely
169
+ if timeout is None:
160
170
continue
171
+ # try next one only if there still is time, and with
172
+ # reduced timeout
161
173
else:
162
- return None
174
+ time_left = timeout - (time.time() - start)
175
+ if time_left > 0:
176
+ continue
177
+ else:
178
+ return None
179
+
163
180
# return first message
164
181
for i in range(len(self._buffer)):
165
182
if self._buffer[i] == ord(self._OK) or self._buffer[i] == ord(self._ERROR):
@@ -170,8 +187,9 @@ def _read(self, timeout: Optional[float]) -> Optional[str]:
170
187
171
188
def flush(self) -> None:
172
189
del self._buffer[:]
173
- while self.serialPortOrig.in_waiting:
174
- self.serialPortOrig.read()
190
+ with error_check("Could not flush"):
191
+ while self.serialPortOrig.in_waiting:
192
+ self.serialPortOrig.read()
175
193
176
194
def open(self) -> None:
177
195
self._write("O")
@@ -247,7 +265,8 @@ def send(self, msg: Message, timeout: Optional[float] = None) -> None:
247
265
248
266
def shutdown(self) -> None:
249
267
self.close()
250
- self.serialPortOrig.close()
268
+ with error_check("Could not close serial socket"):
269
+ self.serialPortOrig.close()
251
270
252
271
def fileno(self) -> int:
253
272
try:
0 commit comments