10
10
import logging
11
11
import time
12
12
import os
13
- from typing import List , NamedTuple , Optional , Tuple , Sequence , Union
13
+ from types import ModuleType
14
+ from typing import (
15
+ List ,
16
+ NamedTuple ,
17
+ Optional ,
18
+ Tuple ,
19
+ Sequence ,
20
+ Union ,
21
+ Any ,
22
+ Dict ,
23
+ Callable ,
24
+ )
14
25
26
+ WaitForSingleObject : Optional [Callable [[int , int ], int ]]
27
+ INFINITE : Optional [int ]
15
28
try :
16
29
# Try builtin Python 3 Windows API
17
30
from _winapi import WaitForSingleObject , INFINITE
18
31
19
32
HAS_EVENTS = True
20
33
except ImportError :
21
- try :
22
- # Try pywin32 package
23
- from win32event import WaitForSingleObject , INFINITE
24
-
25
- HAS_EVENTS = True
26
- except ImportError :
27
- # Use polling instead
28
- HAS_EVENTS = False
34
+ WaitForSingleObject , INFINITE = None , None
35
+ HAS_EVENTS = False
29
36
30
37
# Import Modules
31
38
# ==============
36
43
deprecated_args_alias ,
37
44
time_perfcounter_correlation ,
38
45
)
39
- from can .typechecking import AutoDetectedConfig , CanFilters
46
+ from can .typechecking import AutoDetectedConfig , CanFilters , Channel
40
47
41
48
# Define Module Logger
42
49
# ====================
48
55
from . import xldefine , xlclass
49
56
50
57
# Import safely Vector API module for Travis tests
51
- xldriver = None
58
+ xldriver : Optional [ ModuleType ] = None
52
59
try :
53
60
from . import xldriver
54
61
except Exception as exc :
@@ -74,19 +81,19 @@ def __init__(
74
81
can_filters : Optional [CanFilters ] = None ,
75
82
poll_interval : float = 0.01 ,
76
83
receive_own_messages : bool = False ,
77
- bitrate : int = None ,
84
+ bitrate : Optional [ int ] = None ,
78
85
rx_queue_size : int = 2 ** 14 ,
79
- app_name : str = "CANalyzer" ,
80
- serial : int = None ,
86
+ app_name : Optional [ str ] = "CANalyzer" ,
87
+ serial : Optional [ int ] = None ,
81
88
fd : bool = False ,
82
- data_bitrate : int = None ,
89
+ data_bitrate : Optional [ int ] = None ,
83
90
sjw_abr : int = 2 ,
84
91
tseg1_abr : int = 6 ,
85
92
tseg2_abr : int = 3 ,
86
93
sjw_dbr : int = 2 ,
87
94
tseg1_dbr : int = 6 ,
88
95
tseg2_dbr : int = 3 ,
89
- ** kwargs ,
96
+ ** kwargs : Any ,
90
97
) -> None :
91
98
"""
92
99
:param channel:
@@ -144,16 +151,18 @@ def __init__(
144
151
145
152
if xldriver is None :
146
153
raise CanInterfaceNotImplementedError ("The Vector API has not been loaded" )
154
+ self .xldriver = xldriver # keep reference so mypy knows it is not None
147
155
148
156
self .poll_interval = poll_interval
149
157
150
- if isinstance (channel , str ): # must be checked before generic Sequence
158
+ self .channels : Sequence [int ]
159
+ if isinstance (channel , int ):
160
+ self .channels = [channel ]
161
+ elif isinstance (channel , str ): # must be checked before generic Sequence
151
162
# Assume comma separated string of channels
152
163
self .channels = [int (ch .strip ()) for ch in channel .split ("," )]
153
- elif isinstance (channel , int ):
154
- self .channels = [channel ]
155
164
elif isinstance (channel , Sequence ):
156
- self .channels = channel
165
+ self .channels = [ int ( ch ) for ch in channel ]
157
166
else :
158
167
raise TypeError (
159
168
f"Invalid type for channels parameter: { type (channel ).__name__ } "
@@ -185,12 +194,12 @@ def __init__(
185
194
"None of the configured channels could be found on the specified hardware."
186
195
)
187
196
188
- xldriver .xlOpenDriver ()
197
+ self . xldriver .xlOpenDriver ()
189
198
self .port_handle = xlclass .XLportHandle (xldefine .XL_INVALID_PORTHANDLE )
190
199
self .mask = 0
191
200
self .fd = fd
192
201
# Get channels masks
193
- self .channel_masks = {}
202
+ self .channel_masks : Dict [ Optional [ Channel ], int ] = {}
194
203
self .index_to_channel = {}
195
204
196
205
for channel in self .channels :
@@ -200,7 +209,7 @@ def __init__(
200
209
app_name , channel
201
210
)
202
211
LOG .debug ("Channel index %d found" , channel )
203
- idx = xldriver .xlGetChannelIndex (hw_type , hw_index , hw_channel )
212
+ idx = self . xldriver .xlGetChannelIndex (hw_type , hw_index , hw_channel )
204
213
if idx < 0 :
205
214
# Undocumented behavior! See issue #353.
206
215
# If hardware is unavailable, this function returns -1.
@@ -224,7 +233,7 @@ def __init__(
224
233
if bitrate or fd :
225
234
permission_mask .value = self .mask
226
235
if fd :
227
- xldriver .xlOpenPort (
236
+ self . xldriver .xlOpenPort (
228
237
self .port_handle ,
229
238
self ._app_name ,
230
239
self .mask ,
@@ -234,7 +243,7 @@ def __init__(
234
243
xldefine .XL_BusTypes .XL_BUS_TYPE_CAN ,
235
244
)
236
245
else :
237
- xldriver .xlOpenPort (
246
+ self . xldriver .xlOpenPort (
238
247
self .port_handle ,
239
248
self ._app_name ,
240
249
self .mask ,
@@ -267,7 +276,7 @@ def __init__(
267
276
self .canFdConf .tseg1Dbr = int (tseg1_dbr )
268
277
self .canFdConf .tseg2Dbr = int (tseg2_dbr )
269
278
270
- xldriver .xlCanFdSetConfiguration (
279
+ self . xldriver .xlCanFdSetConfiguration (
271
280
self .port_handle , self .mask , self .canFdConf
272
281
)
273
282
LOG .info (
@@ -289,7 +298,7 @@ def __init__(
289
298
)
290
299
else :
291
300
if bitrate :
292
- xldriver .xlCanSetChannelBitrate (
301
+ self . xldriver .xlCanSetChannelBitrate (
293
302
self .port_handle , permission_mask , bitrate
294
303
)
295
304
LOG .info ("SetChannelBitrate: baudr.=%u" , bitrate )
@@ -298,16 +307,16 @@ def __init__(
298
307
299
308
# Enable/disable TX receipts
300
309
tx_receipts = 1 if receive_own_messages else 0
301
- xldriver .xlCanSetChannelMode (self .port_handle , self .mask , tx_receipts , 0 )
310
+ self . xldriver .xlCanSetChannelMode (self .port_handle , self .mask , tx_receipts , 0 )
302
311
303
312
if HAS_EVENTS :
304
313
self .event_handle = xlclass .XLhandle ()
305
- xldriver .xlSetNotification (self .port_handle , self .event_handle , 1 )
314
+ self . xldriver .xlSetNotification (self .port_handle , self .event_handle , 1 )
306
315
else :
307
316
LOG .info ("Install pywin32 to avoid polling" )
308
317
309
318
try :
310
- xldriver .xlActivateChannel (
319
+ self . xldriver .xlActivateChannel (
311
320
self .port_handle , self .mask , xldefine .XL_BusTypes .XL_BUS_TYPE_CAN , 0
312
321
)
313
322
except VectorOperationError as error :
@@ -320,17 +329,17 @@ def __init__(
320
329
if time .get_clock_info ("time" ).resolution > 1e-5 :
321
330
ts , perfcounter = time_perfcounter_correlation ()
322
331
try :
323
- xldriver .xlGetSyncTime (self .port_handle , offset )
332
+ self . xldriver .xlGetSyncTime (self .port_handle , offset )
324
333
except VectorInitializationError :
325
- xldriver .xlGetChannelTime (self .port_handle , self .mask , offset )
334
+ self . xldriver .xlGetChannelTime (self .port_handle , self .mask , offset )
326
335
current_perfcounter = time .perf_counter ()
327
336
now = ts + (current_perfcounter - perfcounter )
328
337
self ._time_offset = now - offset .value * 1e-9
329
338
else :
330
339
try :
331
- xldriver .xlGetSyncTime (self .port_handle , offset )
340
+ self . xldriver .xlGetSyncTime (self .port_handle , offset )
332
341
except VectorInitializationError :
333
- xldriver .xlGetChannelTime (self .port_handle , self .mask , offset )
342
+ self . xldriver .xlGetChannelTime (self .port_handle , self .mask , offset )
334
343
self ._time_offset = time .time () - offset .value * 1e-9
335
344
336
345
except VectorInitializationError :
@@ -339,7 +348,7 @@ def __init__(
339
348
self ._is_filtered = False
340
349
super ().__init__ (channel = channel , can_filters = can_filters , ** kwargs )
341
350
342
- def _apply_filters (self , filters ) -> None :
351
+ def _apply_filters (self , filters : Optional [ CanFilters ] ) -> None :
343
352
if filters :
344
353
# Only up to one filter per ID type allowed
345
354
if len (filters ) == 1 or (
@@ -348,7 +357,7 @@ def _apply_filters(self, filters) -> None:
348
357
):
349
358
try :
350
359
for can_filter in filters :
351
- xldriver .xlCanSetChannelAcceptance (
360
+ self . xldriver .xlCanSetChannelAcceptance (
352
361
self .port_handle ,
353
362
self .mask ,
354
363
can_filter ["can_id" ],
@@ -370,14 +379,14 @@ def _apply_filters(self, filters) -> None:
370
379
# fallback: reset filters
371
380
self ._is_filtered = False
372
381
try :
373
- xldriver .xlCanSetChannelAcceptance (
382
+ self . xldriver .xlCanSetChannelAcceptance (
374
383
self .port_handle ,
375
384
self .mask ,
376
385
0x0 ,
377
386
0x0 ,
378
387
xldefine .XL_AcceptanceFilter .XL_CAN_EXT ,
379
388
)
380
- xldriver .xlCanSetChannelAcceptance (
389
+ self . xldriver .xlCanSetChannelAcceptance (
381
390
self .port_handle ,
382
391
self .mask ,
383
392
0x0 ,
@@ -417,14 +426,14 @@ def _recv_internal(
417
426
else :
418
427
time_left = end_time - time .time ()
419
428
time_left_ms = max (0 , int (time_left * 1000 ))
420
- WaitForSingleObject (self .event_handle .value , time_left_ms )
429
+ WaitForSingleObject (self .event_handle .value , time_left_ms ) # type: ignore
421
430
else :
422
431
# Wait a short time until we try again
423
432
time .sleep (self .poll_interval )
424
433
425
434
def _recv_canfd (self ) -> Optional [Message ]:
426
435
xl_can_rx_event = xlclass .XLcanRxEvent ()
427
- xldriver .xlCanReceive (self .port_handle , xl_can_rx_event )
436
+ self . xldriver .xlCanReceive (self .port_handle , xl_can_rx_event )
428
437
429
438
if xl_can_rx_event .tag == xldefine .XL_CANFD_RX_EventTags .XL_CAN_EV_TAG_RX_OK :
430
439
is_rx = True
@@ -470,7 +479,7 @@ def _recv_canfd(self) -> Optional[Message]:
470
479
def _recv_can (self ) -> Optional [Message ]:
471
480
xl_event = xlclass .XLevent ()
472
481
event_count = ctypes .c_uint (1 )
473
- xldriver .xlReceive (self .port_handle , event_count , xl_event )
482
+ self . xldriver .xlReceive (self .port_handle , event_count , xl_event )
474
483
475
484
if xl_event .tag != xldefine .XL_EventTags .XL_RECEIVE_MSG :
476
485
self .handle_can_event (xl_event )
@@ -523,7 +532,7 @@ def handle_canfd_event(self, event: xlclass.XLcanRxEvent) -> None:
523
532
`XL_CAN_EV_TAG_TX_ERROR`, `XL_TIMER` or `XL_CAN_EV_TAG_CHIP_STATE` tag.
524
533
"""
525
534
526
- def send (self , msg : Message , timeout : Optional [float ] = None ):
535
+ def send (self , msg : Message , timeout : Optional [float ] = None ) -> None :
527
536
self ._send_sequence ([msg ])
528
537
529
538
def _send_sequence (self , msgs : Sequence [Message ]) -> int :
@@ -548,7 +557,9 @@ def _send_can_msg_sequence(self, msgs: Sequence[Message]) -> int:
548
557
* map (self ._build_xl_event , msgs )
549
558
)
550
559
551
- xldriver .xlCanTransmit (self .port_handle , mask , message_count , xl_event_array )
560
+ self .xldriver .xlCanTransmit (
561
+ self .port_handle , mask , message_count , xl_event_array
562
+ )
552
563
return message_count .value
553
564
554
565
@staticmethod
@@ -580,7 +591,7 @@ def _send_can_fd_msg_sequence(self, msgs: Sequence[Message]) -> int:
580
591
)
581
592
582
593
msg_count_sent = ctypes .c_uint (0 )
583
- xldriver .xlCanTransmitEx (
594
+ self . xldriver .xlCanTransmitEx (
584
595
self .port_handle , mask , message_count , msg_count_sent , xl_can_tx_event_array
585
596
)
586
597
return msg_count_sent .value
@@ -611,17 +622,17 @@ def _build_xl_can_tx_event(msg: Message) -> xlclass.XLcanTxEvent:
611
622
return xl_can_tx_event
612
623
613
624
def flush_tx_buffer (self ) -> None :
614
- xldriver .xlCanFlushTransmitQueue (self .port_handle , self .mask )
625
+ self . xldriver .xlCanFlushTransmitQueue (self .port_handle , self .mask )
615
626
616
627
def shutdown (self ) -> None :
617
628
super ().shutdown ()
618
- xldriver .xlDeactivateChannel (self .port_handle , self .mask )
619
- xldriver .xlClosePort (self .port_handle )
620
- xldriver .xlCloseDriver ()
629
+ self . xldriver .xlDeactivateChannel (self .port_handle , self .mask )
630
+ self . xldriver .xlClosePort (self .port_handle )
631
+ self . xldriver .xlCloseDriver ()
621
632
622
633
def reset (self ) -> None :
623
- xldriver .xlDeactivateChannel (self .port_handle , self .mask )
624
- xldriver .xlActivateChannel (
634
+ self . xldriver .xlDeactivateChannel (self .port_handle , self .mask )
635
+ self . xldriver .xlActivateChannel (
625
636
self .port_handle , self .mask , xldefine .XL_BusTypes .XL_BUS_TYPE_CAN , 0
626
637
)
627
638
@@ -657,7 +668,7 @@ def _detect_available_configs() -> List[AutoDetectedConfig]:
657
668
"vector_channel_config" : channel_config ,
658
669
}
659
670
)
660
- return configs
671
+ return configs # type: ignore
661
672
662
673
@staticmethod
663
674
def popup_vector_hw_configuration (wait_for_finish : int = 0 ) -> None :
@@ -666,6 +677,9 @@ def popup_vector_hw_configuration(wait_for_finish: int = 0) -> None:
666
677
:param wait_for_finish:
667
678
Time to wait for user input in milliseconds.
668
679
"""
680
+ if xldriver is None :
681
+ raise CanInterfaceNotImplementedError ("The Vector API has not been loaded" )
682
+
669
683
xldriver .xlPopupHwConfig (ctypes .c_char_p (), ctypes .c_uint (wait_for_finish ))
670
684
671
685
@staticmethod
@@ -685,14 +699,17 @@ def get_application_config(
685
699
:raises can.interfaces.vector.VectorInitializationError:
686
700
If the application name does not exist in the Vector hardware configuration.
687
701
"""
702
+ if xldriver is None :
703
+ raise CanInterfaceNotImplementedError ("The Vector API has not been loaded" )
704
+
688
705
hw_type = ctypes .c_uint ()
689
706
hw_index = ctypes .c_uint ()
690
707
hw_channel = ctypes .c_uint ()
691
- app_channel = ctypes .c_uint (app_channel )
708
+ _app_channel = ctypes .c_uint (app_channel )
692
709
693
710
xldriver .xlGetApplConfig (
694
711
app_name .encode (),
695
- app_channel ,
712
+ _app_channel ,
696
713
hw_type ,
697
714
hw_index ,
698
715
hw_channel ,
@@ -707,7 +724,7 @@ def set_application_config(
707
724
hw_type : xldefine .XL_HardwareType ,
708
725
hw_index : int ,
709
726
hw_channel : int ,
710
- ** kwargs ,
727
+ ** kwargs : Any ,
711
728
) -> None :
712
729
"""Modify the application settings in Vector Hardware Configuration.
713
730
@@ -737,6 +754,9 @@ def set_application_config(
737
754
:raises can.interfaces.vector.VectorInitializationError:
738
755
If the application name does not exist in the Vector hardware configuration.
739
756
"""
757
+ if xldriver is None :
758
+ raise CanInterfaceNotImplementedError ("The Vector API has not been loaded" )
759
+
740
760
xldriver .xlSetApplConfig (
741
761
app_name .encode (),
742
762
app_channel ,
@@ -758,7 +778,7 @@ def set_timer_rate(self, timer_rate_ms: int) -> None:
758
778
the timer events.
759
779
"""
760
780
timer_rate_10us = timer_rate_ms * 100
761
- xldriver .xlSetTimerRate (self .port_handle , timer_rate_10us )
781
+ self . xldriver .xlSetTimerRate (self .port_handle , timer_rate_10us )
762
782
763
783
764
784
class VectorChannelConfig (NamedTuple ):
0 commit comments