Skip to content

Commit 234a510

Browse files
committed
Neousys: Add minimal unittest case(s) for creating, receiving, sending and shutdown
1 parent 86d13ed commit 234a510

File tree

2 files changed

+141
-21
lines changed

2 files changed

+141
-21
lines changed

can/interfaces/neousys/neousys.py

+24-21
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,18 @@ class NeousysCanBitClk(Structure):
120120
0x00000007 # this is the mask for the can last error code (lec).
121121
)
122122

123+
NEOUSYS_CANLIB = None
124+
125+
try:
126+
if platform.system() == "Windows":
127+
NEOUSYS_CANLIB = WinDLL("./WDT_DIO.dll")
128+
else:
129+
NEOUSYS_CANLIB = CDLL("libwdt_dio.so")
130+
logger.info("Loaded Neousys WDT_DIO Can driver")
131+
except OSError as error:
132+
logger.info("Cannot Neousys CAN bus dll or share object: %d", format(error))
133+
# NEOUSYS_CANLIB = None
134+
123135

124136
class NeousysBus(BusABC):
125137
""" Neousys CAN bus Class"""
@@ -132,16 +144,7 @@ def __init__(self, channel, device=0, bitrate=500000, **kwargs):
132144
"""
133145
super(NeousysBus, self).__init__(channel, **kwargs)
134146

135-
self.canlib = None
136-
137-
try:
138-
if platform.system() == "Windows":
139-
self.canlib = WinDLL("./WDT_DIO.dll")
140-
else:
141-
self.canlib = CDLL("libwdt_dio.so")
142-
143-
logger.info("Loaded Neousys WDT_DIO Can driver")
144-
147+
if NEOUSYS_CANLIB is not None:
145148
self.channel = channel
146149

147150
self.device = device
@@ -162,41 +165,41 @@ def __init__(self, channel, device=0, bitrate=500000, **kwargs):
162165
self._neousys_status_cb
163166
)
164167

165-
if self.canlib.CAN_RegisterReceived(0, self._neousys_recv_cb) == 0:
168+
if NEOUSYS_CANLIB.CAN_RegisterReceived(0, self._neousys_recv_cb) == 0:
166169
logger.error("Neousys CAN bus Setup receive callback")
167170

168-
if self.canlib.CAN_RegisterStatus(0, self._neousys_status_cb) == 0:
171+
if NEOUSYS_CANLIB.CAN_RegisterStatus(0, self._neousys_status_cb) == 0:
169172
logger.error("Neousys CAN bus Setup status callback")
170173

171174
if (
172-
self.canlib.CAN_Setup(
175+
NEOUSYS_CANLIB.CAN_Setup(
173176
channel, byref(self.init_config), sizeof(self.init_config)
174177
)
175178
== 0
176179
):
177180
logger.error("Neousys CAN bus Setup Error")
178181

179-
if self.canlib.CAN_Start(channel) == 0:
182+
if NEOUSYS_CANLIB.CAN_Start(channel) == 0:
180183
logger.error("Neousys CAN bus Start Error")
181184

182-
except OSError as error:
183-
logger.info("Cannot Neousys CAN bus dll or share object: %d", format(error))
184-
185185
def send(self, msg, timeout=None):
186186
"""
187187
:param msg: message to send
188188
:param timeout: timeout is not used here
189189
:return:
190190
"""
191191

192-
if self.canlib is None:
192+
if NEOUSYS_CANLIB is None:
193193
logger.error("Can't send msg as Neousys DLL/SO is not loaded")
194194
else:
195195
tx_msg = NeousysCanMsg(
196196
msg.arbitration_id, 0, 0, msg.dlc, (c_ubyte * 8)(*msg.data)
197197
)
198198

199-
if self.canlib.CAN_Send(self.channel, byref(tx_msg), sizeof(tx_msg)) == 0:
199+
if (
200+
NEOUSYS_CANLIB.CAN_Send(self.channel, byref(tx_msg), sizeof(tx_msg))
201+
== 0
202+
):
200203
logger.error("Neousys Can can't send message")
201204

202205
def _recv_internal(self, timeout):
@@ -253,8 +256,8 @@ def _neousys_status_cb(self, status):
253256
logger.info("%s _neousys_status_cb: %d", self.init_config, status)
254257

255258
def shutdown(self):
256-
if self.canlib is not None:
257-
self.canlib.CAN_Stop(self.channel)
259+
if NEOUSYS_CANLIB is not None:
260+
NEOUSYS_CANLIB.CAN_Stop(self.channel)
258261

259262
def fileno(self):
260263
# Return an invalid file descriptor as not used

test/test_neousys.py

+117
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
#!/usr/bin/env python
2+
# coding: utf-8
3+
4+
"""
5+
Test for Neousys Interface
6+
"""
7+
8+
import ctypes
9+
import os
10+
import pickle
11+
import unittest
12+
from unittest.mock import Mock
13+
14+
from ctypes import (
15+
byref,
16+
cast,
17+
POINTER,
18+
sizeof,
19+
c_ubyte,
20+
)
21+
22+
import pytest
23+
24+
import can
25+
from can.interfaces.neousys import neousys
26+
27+
28+
class TestNeousysBus(unittest.TestCase):
29+
def setUp(self) -> None:
30+
can.interfaces.neousys.neousys.NEOUSYS_CANLIB = Mock()
31+
can.interfaces.neousys.neousys.NEOUSYS_CANLIB.CAN_RegisterReceived = Mock(
32+
return_value=1
33+
)
34+
can.interfaces.neousys.neousys.NEOUSYS_CANLIB.CAN_RegisterStatus = Mock(
35+
return_value=1
36+
)
37+
can.interfaces.neousys.neousys.NEOUSYS_CANLIB.CAN_Setup = Mock(return_value=1)
38+
can.interfaces.neousys.neousys.NEOUSYS_CANLIB.CAN_Start = Mock(return_value=1)
39+
can.interfaces.neousys.neousys.NEOUSYS_CANLIB.CAN_Send = Mock(return_value=1)
40+
can.interfaces.neousys.neousys.NEOUSYS_CANLIB.CAN_Stop = Mock(return_value=1)
41+
self.bus = can.Bus(channel=0, bustype="neousys", _testing=True)
42+
43+
def tearDown(self) -> None:
44+
if self.bus:
45+
self.bus.shutdown()
46+
self.bus = None
47+
48+
def test_bus_creation(self) -> None:
49+
self.assertIsInstance(self.bus, neousys.NeousysBus)
50+
self.assertTrue(neousys.NEOUSYS_CANLIB.CAN_Setup.called)
51+
self.assertTrue(neousys.NEOUSYS_CANLIB.CAN_Start.called)
52+
self.assertTrue(neousys.NEOUSYS_CANLIB.CAN_RegisterReceived.called)
53+
self.assertTrue(neousys.NEOUSYS_CANLIB.CAN_RegisterStatus.called)
54+
self.assertTrue(neousys.NEOUSYS_CANLIB.CAN_Send.not_called)
55+
self.assertTrue(neousys.NEOUSYS_CANLIB.CAN_Stop.not_called)
56+
57+
CAN_Start_args = (
58+
can.interfaces.neousys.neousys.NEOUSYS_CANLIB.CAN_Setup.call_args[0]
59+
)
60+
61+
# sizeof struct should be 16
62+
self.assertEqual(CAN_Start_args[0], 0)
63+
self.assertEqual(CAN_Start_args[2], 16)
64+
NeousysCanSetup_struct = cast(
65+
CAN_Start_args[1], POINTER(neousys.NeousysCanSetup)
66+
)
67+
self.assertEqual(NeousysCanSetup_struct.contents.bitRate, 500000)
68+
self.assertEqual(
69+
NeousysCanSetup_struct.contents.recvConfig,
70+
neousys.NEOUSYS_CAN_MSG_USE_ID_FILTER,
71+
)
72+
73+
def test_bus_creation_bitrate(self) -> None:
74+
self.bus = can.Bus(channel=0, bustype="neousys", bitrate=200000, _testing=True)
75+
self.assertIsInstance(self.bus, neousys.NeousysBus)
76+
CAN_Start_args = (
77+
can.interfaces.neousys.neousys.NEOUSYS_CANLIB.CAN_Setup.call_args[0]
78+
)
79+
80+
# sizeof struct should be 16
81+
self.assertEqual(CAN_Start_args[0], 0)
82+
self.assertEqual(CAN_Start_args[2], 16)
83+
NeousysCanSetup_struct = cast(
84+
CAN_Start_args[1], POINTER(neousys.NeousysCanSetup)
85+
)
86+
self.assertEqual(NeousysCanSetup_struct.contents.bitRate, 200000)
87+
self.assertEqual(
88+
NeousysCanSetup_struct.contents.recvConfig,
89+
neousys.NEOUSYS_CAN_MSG_USE_ID_FILTER,
90+
)
91+
92+
def test_receive(self) -> None:
93+
recv_msg = self.bus.recv(timeout=0.05)
94+
self.assertEqual(recv_msg, None)
95+
msg_data = [0x01, 0x02, 0x03, 0x04, 0x05]
96+
NeousysCanMsg_msg = neousys.NeousysCanMsg(
97+
0x01, 0x00, 0x00, 0x05, (c_ubyte * 8)(*msg_data)
98+
)
99+
self.bus._neousys_recv_cb(byref(NeousysCanMsg_msg), sizeof(NeousysCanMsg_msg))
100+
recv_msg = self.bus.recv(timeout=0.05)
101+
self.assertEqual(recv_msg.dlc, 5)
102+
self.assertSequenceEqual(recv_msg.data, msg_data)
103+
104+
def test_send(self) -> None:
105+
msg = can.Message(
106+
arbitration_id=0x01, data=[1, 2, 3, 4, 5, 6, 7, 8], is_extended_id=False
107+
)
108+
self.bus.send(msg)
109+
self.assertTrue(neousys.NEOUSYS_CANLIB.CAN_Send.called)
110+
111+
def test_shutdown(self) -> None:
112+
self.bus.shutdown()
113+
self.assertTrue(neousys.NEOUSYS_CANLIB.CAN_Stop.called)
114+
115+
116+
if __name__ == "__main__":
117+
unittest.main()

0 commit comments

Comments
 (0)