Skip to content

Commit de6c67b

Browse files
authored
Merge branch 'develop' into reader-writer-case
2 parents cfe752a + ac6e3ef commit de6c67b

11 files changed

+328
-100
lines changed

can/bus.py

+5-2
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import logging
1212
import threading
1313
from time import time
14-
from aenum import Enum, auto
14+
from enum import Enum, auto
1515

1616
from can.broadcastmanager import ThreadBasedCyclicSendTask
1717
from can.message import Message
@@ -199,7 +199,7 @@ def send_periodic(
199199
Disable to instead manage tasks manually.
200200
:return:
201201
A started task instance. Note the task can be stopped (and depending on
202-
the backend modified) by calling the :meth:`stop` method.
202+
the backend modified) by calling the task's :meth:`stop` method.
203203
204204
.. note::
205205
@@ -430,3 +430,6 @@ def _detect_available_configs() -> List[can.typechecking.AutoDetectedConfig]:
430430
for usage in the interface's bus constructor.
431431
"""
432432
raise NotImplementedError()
433+
434+
def fileno(self) -> int:
435+
raise NotImplementedError("fileno is not implemented using current CAN bus")

can/interfaces/__init__.py

+1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
"canalystii": ("can.interfaces.canalystii", "CANalystIIBus"),
2525
"systec": ("can.interfaces.systec", "UcanBus"),
2626
"seeedstudio": ("can.interfaces.seeedstudio", "SeeedBus"),
27+
"cantact": ("can.interfaces.cantact", "CantactBus"),
2728
}
2829

2930
BACKENDS.update(

can/interfaces/cantact.py

+152
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
"""
2+
Interface for CANtact devices from Linklayer Labs
3+
"""
4+
5+
import time
6+
import logging
7+
from unittest.mock import Mock
8+
9+
from can import BusABC, Message
10+
11+
logger = logging.getLogger(__name__)
12+
13+
try:
14+
import cantact
15+
except ImportError:
16+
logger.warning(
17+
"The CANtact module is not installed. Install it using `python3 -m pip install cantact`"
18+
)
19+
20+
21+
class CantactBus(BusABC):
22+
"""CANtact interface"""
23+
24+
@staticmethod
25+
def _detect_available_configs():
26+
try:
27+
interface = cantact.Interface()
28+
except NameError:
29+
# couldn't import cantact, so no configurations are available
30+
return []
31+
32+
channels = []
33+
for i in range(0, interface.channel_count()):
34+
channels.append({"interface": "cantact", "channel": "ch:%d" % i})
35+
return channels
36+
37+
def __init__(
38+
self,
39+
channel,
40+
bitrate=500000,
41+
poll_interval=0.01,
42+
monitor=False,
43+
bit_timing=None,
44+
_testing=False,
45+
**kwargs
46+
):
47+
"""
48+
:param int channel:
49+
Channel number (zero indexed, labeled on multi-channel devices)
50+
:param int bitrate:
51+
Bitrate in bits/s
52+
:param bool monitor:
53+
If true, operate in listen-only monitoring mode
54+
:param BitTiming bit_timing
55+
Optional BitTiming to use for custom bit timing setting. Overrides bitrate if not None.
56+
"""
57+
58+
if _testing:
59+
self.interface = MockInterface()
60+
else:
61+
self.interface = cantact.Interface()
62+
63+
self.channel = int(channel)
64+
self.channel_info = "CANtact: ch:%s" % channel
65+
66+
# configure the interface
67+
if bit_timing is None:
68+
# use bitrate
69+
self.interface.set_bitrate(int(channel), int(bitrate))
70+
else:
71+
# use custom bit timing
72+
self.interface.set_bit_timing(
73+
int(channel),
74+
int(bit_timing.brp),
75+
int(bit_timing.tseg1),
76+
int(bit_timing.tseg2),
77+
int(bit_timing.sjw),
78+
)
79+
self.interface.set_enabled(int(channel), True)
80+
self.interface.set_monitor(int(channel), monitor)
81+
self.interface.start()
82+
83+
super().__init__(
84+
channel=channel, bitrate=bitrate, poll_interval=poll_interval, **kwargs
85+
)
86+
87+
def _recv_internal(self, timeout):
88+
frame = self.interface.recv(int(timeout * 1000))
89+
if frame is None:
90+
# timeout occured
91+
return None, False
92+
93+
msg = Message(
94+
arbitration_id=frame["id"],
95+
is_extended_id=frame["extended"],
96+
timestamp=frame["timestamp"],
97+
is_remote_frame=frame["rtr"],
98+
dlc=frame["dlc"],
99+
data=frame["data"][: frame["dlc"]],
100+
channel=frame["channel"],
101+
is_rx=(not frame["loopback"]), # received if not loopback frame
102+
)
103+
return msg, False
104+
105+
def send(self, msg, timeout=None):
106+
self.interface.send(
107+
self.channel,
108+
msg.arbitration_id,
109+
bool(msg.is_extended_id),
110+
bool(msg.is_remote_frame),
111+
msg.dlc,
112+
msg.data,
113+
)
114+
115+
def shutdown(self):
116+
self.interface.stop()
117+
118+
119+
def mock_recv(timeout):
120+
if timeout > 0:
121+
frame = {}
122+
frame["id"] = 0x123
123+
frame["extended"] = False
124+
frame["timestamp"] = time.time()
125+
frame["loopback"] = False
126+
frame["rtr"] = False
127+
frame["dlc"] = 8
128+
frame["data"] = [1, 2, 3, 4, 5, 6, 7, 8]
129+
frame["channel"] = 0
130+
return frame
131+
else:
132+
# simulate timeout when timeout = 0
133+
return None
134+
135+
136+
class MockInterface:
137+
"""
138+
Mock interface to replace real interface when testing.
139+
This allows for tests to run without actual hardware.
140+
"""
141+
142+
start = Mock()
143+
set_bitrate = Mock()
144+
set_bit_timing = Mock()
145+
set_enabled = Mock()
146+
set_monitor = Mock()
147+
start = Mock()
148+
stop = Mock()
149+
send = Mock()
150+
channel_count = Mock(return_value=1)
151+
152+
recv = Mock(side_effect=mock_recv)

0 commit comments

Comments
 (0)