Skip to content

Commit 1a6ecfc

Browse files
Adds CANtact interface (#853)
1 parent 587325c commit 1a6ecfc

File tree

6 files changed

+233
-0
lines changed

6 files changed

+233
-0
lines changed

can/interfaces/__init__.py

+1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
"canalystii": ("can.interfaces.canalystii", "CANalystIIBus"),
2525
"systec": ("can.interfaces.systec", "UcanBus"),
2626
"seeedstudio": ("can.interfaces.seeedstudio", "SeeedBus"),
27+
"cantact": ("can.interfaces.cantact", "CantactBus"),
2728
}
2829

2930
BACKENDS.update(

can/interfaces/cantact.py

+152
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
"""
2+
Interface for CANtact devices from Linklayer Labs
3+
"""
4+
5+
import time
6+
import logging
7+
from unittest.mock import Mock
8+
9+
from can import BusABC, Message
10+
11+
logger = logging.getLogger(__name__)
12+
13+
try:
14+
import cantact
15+
except ImportError:
16+
logger.warning(
17+
"The CANtact module is not installed. Install it using `python3 -m pip install cantact`"
18+
)
19+
20+
21+
class CantactBus(BusABC):
22+
"""CANtact interface"""
23+
24+
@staticmethod
25+
def _detect_available_configs():
26+
try:
27+
interface = cantact.Interface()
28+
except NameError:
29+
# couldn't import cantact, so no configurations are available
30+
return []
31+
32+
channels = []
33+
for i in range(0, interface.channel_count()):
34+
channels.append({"interface": "cantact", "channel": "ch:%d" % i})
35+
return channels
36+
37+
def __init__(
38+
self,
39+
channel,
40+
bitrate=500000,
41+
poll_interval=0.01,
42+
monitor=False,
43+
bit_timing=None,
44+
_testing=False,
45+
**kwargs
46+
):
47+
"""
48+
:param int channel:
49+
Channel number (zero indexed, labeled on multi-channel devices)
50+
:param int bitrate:
51+
Bitrate in bits/s
52+
:param bool monitor:
53+
If true, operate in listen-only monitoring mode
54+
:param BitTiming bit_timing
55+
Optional BitTiming to use for custom bit timing setting. Overrides bitrate if not None.
56+
"""
57+
58+
if _testing:
59+
self.interface = MockInterface()
60+
else:
61+
self.interface = cantact.Interface()
62+
63+
self.channel = int(channel)
64+
self.channel_info = "CANtact: ch:%s" % channel
65+
66+
# configure the interface
67+
if bit_timing is None:
68+
# use bitrate
69+
self.interface.set_bitrate(int(channel), int(bitrate))
70+
else:
71+
# use custom bit timing
72+
self.interface.set_bit_timing(
73+
int(channel),
74+
int(bit_timing.brp),
75+
int(bit_timing.tseg1),
76+
int(bit_timing.tseg2),
77+
int(bit_timing.sjw),
78+
)
79+
self.interface.set_enabled(int(channel), True)
80+
self.interface.set_monitor(int(channel), monitor)
81+
self.interface.start()
82+
83+
super().__init__(
84+
channel=channel, bitrate=bitrate, poll_interval=poll_interval, **kwargs
85+
)
86+
87+
def _recv_internal(self, timeout):
88+
frame = self.interface.recv(int(timeout * 1000))
89+
if frame is None:
90+
# timeout occured
91+
return None, False
92+
93+
msg = Message(
94+
arbitration_id=frame["id"],
95+
is_extended_id=frame["extended"],
96+
timestamp=frame["timestamp"],
97+
is_remote_frame=frame["rtr"],
98+
dlc=frame["dlc"],
99+
data=frame["data"][: frame["dlc"]],
100+
channel=frame["channel"],
101+
is_rx=(not frame["loopback"]), # received if not loopback frame
102+
)
103+
return msg, False
104+
105+
def send(self, msg, timeout=None):
106+
self.interface.send(
107+
self.channel,
108+
msg.arbitration_id,
109+
bool(msg.is_extended_id),
110+
bool(msg.is_remote_frame),
111+
msg.dlc,
112+
msg.data,
113+
)
114+
115+
def shutdown(self):
116+
self.interface.stop()
117+
118+
119+
def mock_recv(timeout):
120+
if timeout > 0:
121+
frame = {}
122+
frame["id"] = 0x123
123+
frame["extended"] = False
124+
frame["timestamp"] = time.time()
125+
frame["loopback"] = False
126+
frame["rtr"] = False
127+
frame["dlc"] = 8
128+
frame["data"] = [1, 2, 3, 4, 5, 6, 7, 8]
129+
frame["channel"] = 0
130+
return frame
131+
else:
132+
# simulate timeout when timeout = 0
133+
return None
134+
135+
136+
class MockInterface:
137+
"""
138+
Mock interface to replace real interface when testing.
139+
This allows for tests to run without actual hardware.
140+
"""
141+
142+
start = Mock()
143+
set_bitrate = Mock()
144+
set_bit_timing = Mock()
145+
set_enabled = Mock()
146+
set_monitor = Mock()
147+
start = Mock()
148+
stop = Mock()
149+
send = Mock()
150+
channel_count = Mock(return_value=1)
151+
152+
recv = Mock(side_effect=mock_recv)

doc/installation.rst

+14
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,20 @@ To install ``python-can`` using the XL Driver Library as the backend:
9090

9191
3. Use Vector Hardware Configuration to assign a channel to your application.
9292

93+
CANtact
94+
~~~~~~~
95+
96+
CANtact is supported on Linux, Windows, and macOS.
97+
To install ``python-can`` using the CANtact driver backend:
98+
99+
``python3 -m pip install "python-can[cantact]"``
100+
101+
If ``python-can`` is already installed, the CANtact backend can be installed seperately:
102+
103+
``python3 -m pip install cantact``
104+
105+
Additional CANtact documentation is available at https://cantact.io.
106+
93107
Installing python-can in development mode
94108
-----------------------------------------
95109

setup.py

+1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
"seeedstudio": ["pyserial>=3.0"],
3030
"serial": ["pyserial~=3.0"],
3131
"neovi": ["python-ics>=2.12"],
32+
"cantact": ["cantact>=0.0.7"],
3233
}
3334

3435
setup(

test/simplecyclic_test.py

+1
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ def test_stopping_perodic_tasks(self):
152152

153153
bus.shutdown()
154154

155+
@unittest.skipIf(IS_CI, "fails randomly when run on CI server")
155156
def test_thread_based_cyclic_send_task(self):
156157
bus = can.ThreadSafeBus(bustype="virtual")
157158
msg = can.Message(

test/test_cantact.py

+64
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
#!/usr/bin/env python
2+
# coding: utf-8
3+
4+
"""
5+
Tests for CANtact interfaces
6+
"""
7+
8+
import time
9+
import logging
10+
import unittest
11+
from unittest.mock import Mock, patch
12+
13+
import pytest
14+
15+
import can
16+
from can.interfaces import cantact
17+
18+
19+
class CantactTest(unittest.TestCase):
20+
def test_bus_creation(self):
21+
bus = can.Bus(channel=0, bustype="cantact", _testing=True)
22+
self.assertIsInstance(bus, cantact.CantactBus)
23+
cantact.MockInterface.set_bitrate.assert_called()
24+
cantact.MockInterface.set_bit_timing.assert_not_called()
25+
cantact.MockInterface.set_enabled.assert_called()
26+
cantact.MockInterface.set_monitor.assert_called()
27+
cantact.MockInterface.start.assert_called()
28+
29+
def test_bus_creation_bittiming(self):
30+
cantact.MockInterface.set_bitrate.reset_mock()
31+
32+
bt = can.BitTiming(tseg1=13, tseg2=2, brp=6, sjw=1)
33+
bus = can.Bus(channel=0, bustype="cantact", bit_timing=bt, _testing=True)
34+
self.assertIsInstance(bus, cantact.CantactBus)
35+
cantact.MockInterface.set_bitrate.assert_not_called()
36+
cantact.MockInterface.set_bit_timing.assert_called()
37+
cantact.MockInterface.set_enabled.assert_called()
38+
cantact.MockInterface.set_monitor.assert_called()
39+
cantact.MockInterface.start.assert_called()
40+
41+
def test_transmit(self):
42+
bus = can.Bus(channel=0, bustype="cantact", _testing=True)
43+
msg = can.Message(
44+
arbitration_id=0xC0FFEF, data=[1, 2, 3, 4, 5, 6, 7, 8], is_extended_id=True
45+
)
46+
bus.send(msg)
47+
cantact.MockInterface.send.assert_called()
48+
49+
def test_recv(self):
50+
bus = can.Bus(channel=0, bustype="cantact", _testing=True)
51+
frame = bus.recv(timeout=0.5)
52+
cantact.MockInterface.recv.assert_called()
53+
self.assertIsInstance(frame, can.Message)
54+
55+
def test_recv_timeout(self):
56+
bus = can.Bus(channel=0, bustype="cantact", _testing=True)
57+
frame = bus.recv(timeout=0.0)
58+
cantact.MockInterface.recv.assert_called()
59+
self.assertIsNone(frame)
60+
61+
def test_shutdown(self):
62+
bus = can.Bus(channel=0, bustype="cantact", _testing=True)
63+
bus.shutdown()
64+
cantact.MockInterface.stop.assert_called()

0 commit comments

Comments
 (0)