diff --git a/can/interfaces/vector/canlib.py b/can/interfaces/vector/canlib.py index 1313b00b7..4a90ea1bc 100644 --- a/can/interfaces/vector/canlib.py +++ b/can/interfaces/vector/canlib.py @@ -10,22 +10,29 @@ import logging import time import os -from typing import List, NamedTuple, Optional, Tuple, Sequence, Union +from types import ModuleType +from typing import ( + List, + NamedTuple, + Optional, + Tuple, + Sequence, + Union, + Any, + Dict, + Callable, +) +WaitForSingleObject: Optional[Callable[[int, int], int]] +INFINITE: Optional[int] try: # Try builtin Python 3 Windows API from _winapi import WaitForSingleObject, INFINITE HAS_EVENTS = True except ImportError: - try: - # Try pywin32 package - from win32event import WaitForSingleObject, INFINITE - - HAS_EVENTS = True - except ImportError: - # Use polling instead - HAS_EVENTS = False + WaitForSingleObject, INFINITE = None, None + HAS_EVENTS = False # Import Modules # ============== @@ -36,7 +43,7 @@ deprecated_args_alias, time_perfcounter_correlation, ) -from can.typechecking import AutoDetectedConfig, CanFilters +from can.typechecking import AutoDetectedConfig, CanFilters, Channel # Define Module Logger # ==================== @@ -48,7 +55,7 @@ from . import xldefine, xlclass # Import safely Vector API module for Travis tests -xldriver = None +xldriver: Optional[ModuleType] = None try: from . import xldriver except Exception as exc: @@ -74,19 +81,19 @@ def __init__( can_filters: Optional[CanFilters] = None, poll_interval: float = 0.01, receive_own_messages: bool = False, - bitrate: int = None, + bitrate: Optional[int] = None, rx_queue_size: int = 2 ** 14, - app_name: str = "CANalyzer", - serial: int = None, + app_name: Optional[str] = "CANalyzer", + serial: Optional[int] = None, fd: bool = False, - data_bitrate: int = None, + data_bitrate: Optional[int] = None, sjw_abr: int = 2, tseg1_abr: int = 6, tseg2_abr: int = 3, sjw_dbr: int = 2, tseg1_dbr: int = 6, tseg2_dbr: int = 3, - **kwargs, + **kwargs: Any, ) -> None: """ :param channel: @@ -144,16 +151,18 @@ def __init__( if xldriver is None: raise CanInterfaceNotImplementedError("The Vector API has not been loaded") + self.xldriver = xldriver # keep reference so mypy knows it is not None self.poll_interval = poll_interval - if isinstance(channel, str): # must be checked before generic Sequence + self.channels: Sequence[int] + if isinstance(channel, int): + self.channels = [channel] + elif isinstance(channel, str): # must be checked before generic Sequence # Assume comma separated string of channels self.channels = [int(ch.strip()) for ch in channel.split(",")] - elif isinstance(channel, int): - self.channels = [channel] elif isinstance(channel, Sequence): - self.channels = channel + self.channels = [int(ch) for ch in channel] else: raise TypeError( f"Invalid type for channels parameter: {type(channel).__name__}" @@ -185,12 +194,12 @@ def __init__( "None of the configured channels could be found on the specified hardware." ) - xldriver.xlOpenDriver() + self.xldriver.xlOpenDriver() self.port_handle = xlclass.XLportHandle(xldefine.XL_INVALID_PORTHANDLE) self.mask = 0 self.fd = fd # Get channels masks - self.channel_masks = {} + self.channel_masks: Dict[Optional[Channel], int] = {} self.index_to_channel = {} for channel in self.channels: @@ -200,7 +209,7 @@ def __init__( app_name, channel ) LOG.debug("Channel index %d found", channel) - idx = xldriver.xlGetChannelIndex(hw_type, hw_index, hw_channel) + idx = self.xldriver.xlGetChannelIndex(hw_type, hw_index, hw_channel) if idx < 0: # Undocumented behavior! See issue #353. # If hardware is unavailable, this function returns -1. @@ -224,7 +233,7 @@ def __init__( if bitrate or fd: permission_mask.value = self.mask if fd: - xldriver.xlOpenPort( + self.xldriver.xlOpenPort( self.port_handle, self._app_name, self.mask, @@ -234,7 +243,7 @@ def __init__( xldefine.XL_BusTypes.XL_BUS_TYPE_CAN, ) else: - xldriver.xlOpenPort( + self.xldriver.xlOpenPort( self.port_handle, self._app_name, self.mask, @@ -267,7 +276,7 @@ def __init__( self.canFdConf.tseg1Dbr = int(tseg1_dbr) self.canFdConf.tseg2Dbr = int(tseg2_dbr) - xldriver.xlCanFdSetConfiguration( + self.xldriver.xlCanFdSetConfiguration( self.port_handle, self.mask, self.canFdConf ) LOG.info( @@ -289,7 +298,7 @@ def __init__( ) else: if bitrate: - xldriver.xlCanSetChannelBitrate( + self.xldriver.xlCanSetChannelBitrate( self.port_handle, permission_mask, bitrate ) LOG.info("SetChannelBitrate: baudr.=%u", bitrate) @@ -298,16 +307,16 @@ def __init__( # Enable/disable TX receipts tx_receipts = 1 if receive_own_messages else 0 - xldriver.xlCanSetChannelMode(self.port_handle, self.mask, tx_receipts, 0) + self.xldriver.xlCanSetChannelMode(self.port_handle, self.mask, tx_receipts, 0) if HAS_EVENTS: self.event_handle = xlclass.XLhandle() - xldriver.xlSetNotification(self.port_handle, self.event_handle, 1) + self.xldriver.xlSetNotification(self.port_handle, self.event_handle, 1) else: LOG.info("Install pywin32 to avoid polling") try: - xldriver.xlActivateChannel( + self.xldriver.xlActivateChannel( self.port_handle, self.mask, xldefine.XL_BusTypes.XL_BUS_TYPE_CAN, 0 ) except VectorOperationError as error: @@ -320,17 +329,17 @@ def __init__( if time.get_clock_info("time").resolution > 1e-5: ts, perfcounter = time_perfcounter_correlation() try: - xldriver.xlGetSyncTime(self.port_handle, offset) + self.xldriver.xlGetSyncTime(self.port_handle, offset) except VectorInitializationError: - xldriver.xlGetChannelTime(self.port_handle, self.mask, offset) + self.xldriver.xlGetChannelTime(self.port_handle, self.mask, offset) current_perfcounter = time.perf_counter() now = ts + (current_perfcounter - perfcounter) self._time_offset = now - offset.value * 1e-9 else: try: - xldriver.xlGetSyncTime(self.port_handle, offset) + self.xldriver.xlGetSyncTime(self.port_handle, offset) except VectorInitializationError: - xldriver.xlGetChannelTime(self.port_handle, self.mask, offset) + self.xldriver.xlGetChannelTime(self.port_handle, self.mask, offset) self._time_offset = time.time() - offset.value * 1e-9 except VectorInitializationError: @@ -339,7 +348,7 @@ def __init__( self._is_filtered = False super().__init__(channel=channel, can_filters=can_filters, **kwargs) - def _apply_filters(self, filters) -> None: + def _apply_filters(self, filters: Optional[CanFilters]) -> None: if filters: # Only up to one filter per ID type allowed if len(filters) == 1 or ( @@ -348,7 +357,7 @@ def _apply_filters(self, filters) -> None: ): try: for can_filter in filters: - xldriver.xlCanSetChannelAcceptance( + self.xldriver.xlCanSetChannelAcceptance( self.port_handle, self.mask, can_filter["can_id"], @@ -370,14 +379,14 @@ def _apply_filters(self, filters) -> None: # fallback: reset filters self._is_filtered = False try: - xldriver.xlCanSetChannelAcceptance( + self.xldriver.xlCanSetChannelAcceptance( self.port_handle, self.mask, 0x0, 0x0, xldefine.XL_AcceptanceFilter.XL_CAN_EXT, ) - xldriver.xlCanSetChannelAcceptance( + self.xldriver.xlCanSetChannelAcceptance( self.port_handle, self.mask, 0x0, @@ -417,14 +426,14 @@ def _recv_internal( else: time_left = end_time - time.time() time_left_ms = max(0, int(time_left * 1000)) - WaitForSingleObject(self.event_handle.value, time_left_ms) + WaitForSingleObject(self.event_handle.value, time_left_ms) # type: ignore else: # Wait a short time until we try again time.sleep(self.poll_interval) def _recv_canfd(self) -> Optional[Message]: xl_can_rx_event = xlclass.XLcanRxEvent() - xldriver.xlCanReceive(self.port_handle, xl_can_rx_event) + self.xldriver.xlCanReceive(self.port_handle, xl_can_rx_event) if xl_can_rx_event.tag == xldefine.XL_CANFD_RX_EventTags.XL_CAN_EV_TAG_RX_OK: is_rx = True @@ -470,7 +479,7 @@ def _recv_canfd(self) -> Optional[Message]: def _recv_can(self) -> Optional[Message]: xl_event = xlclass.XLevent() event_count = ctypes.c_uint(1) - xldriver.xlReceive(self.port_handle, event_count, xl_event) + self.xldriver.xlReceive(self.port_handle, event_count, xl_event) if xl_event.tag != xldefine.XL_EventTags.XL_RECEIVE_MSG: self.handle_can_event(xl_event) @@ -523,7 +532,7 @@ def handle_canfd_event(self, event: xlclass.XLcanRxEvent) -> None: `XL_CAN_EV_TAG_TX_ERROR`, `XL_TIMER` or `XL_CAN_EV_TAG_CHIP_STATE` tag. """ - def send(self, msg: Message, timeout: Optional[float] = None): + def send(self, msg: Message, timeout: Optional[float] = None) -> None: self._send_sequence([msg]) def _send_sequence(self, msgs: Sequence[Message]) -> int: @@ -548,7 +557,9 @@ def _send_can_msg_sequence(self, msgs: Sequence[Message]) -> int: *map(self._build_xl_event, msgs) ) - xldriver.xlCanTransmit(self.port_handle, mask, message_count, xl_event_array) + self.xldriver.xlCanTransmit( + self.port_handle, mask, message_count, xl_event_array + ) return message_count.value @staticmethod @@ -580,7 +591,7 @@ def _send_can_fd_msg_sequence(self, msgs: Sequence[Message]) -> int: ) msg_count_sent = ctypes.c_uint(0) - xldriver.xlCanTransmitEx( + self.xldriver.xlCanTransmitEx( self.port_handle, mask, message_count, msg_count_sent, xl_can_tx_event_array ) return msg_count_sent.value @@ -611,17 +622,17 @@ def _build_xl_can_tx_event(msg: Message) -> xlclass.XLcanTxEvent: return xl_can_tx_event def flush_tx_buffer(self) -> None: - xldriver.xlCanFlushTransmitQueue(self.port_handle, self.mask) + self.xldriver.xlCanFlushTransmitQueue(self.port_handle, self.mask) def shutdown(self) -> None: super().shutdown() - xldriver.xlDeactivateChannel(self.port_handle, self.mask) - xldriver.xlClosePort(self.port_handle) - xldriver.xlCloseDriver() + self.xldriver.xlDeactivateChannel(self.port_handle, self.mask) + self.xldriver.xlClosePort(self.port_handle) + self.xldriver.xlCloseDriver() def reset(self) -> None: - xldriver.xlDeactivateChannel(self.port_handle, self.mask) - xldriver.xlActivateChannel( + self.xldriver.xlDeactivateChannel(self.port_handle, self.mask) + self.xldriver.xlActivateChannel( self.port_handle, self.mask, xldefine.XL_BusTypes.XL_BUS_TYPE_CAN, 0 ) @@ -657,7 +668,7 @@ def _detect_available_configs() -> List[AutoDetectedConfig]: "vector_channel_config": channel_config, } ) - return configs + return configs # type: ignore @staticmethod def popup_vector_hw_configuration(wait_for_finish: int = 0) -> None: @@ -666,6 +677,9 @@ def popup_vector_hw_configuration(wait_for_finish: int = 0) -> None: :param wait_for_finish: Time to wait for user input in milliseconds. """ + if xldriver is None: + raise CanInterfaceNotImplementedError("The Vector API has not been loaded") + xldriver.xlPopupHwConfig(ctypes.c_char_p(), ctypes.c_uint(wait_for_finish)) @staticmethod @@ -685,14 +699,17 @@ def get_application_config( :raises can.interfaces.vector.VectorInitializationError: If the application name does not exist in the Vector hardware configuration. """ + if xldriver is None: + raise CanInterfaceNotImplementedError("The Vector API has not been loaded") + hw_type = ctypes.c_uint() hw_index = ctypes.c_uint() hw_channel = ctypes.c_uint() - app_channel = ctypes.c_uint(app_channel) + _app_channel = ctypes.c_uint(app_channel) xldriver.xlGetApplConfig( app_name.encode(), - app_channel, + _app_channel, hw_type, hw_index, hw_channel, @@ -707,7 +724,7 @@ def set_application_config( hw_type: xldefine.XL_HardwareType, hw_index: int, hw_channel: int, - **kwargs, + **kwargs: Any, ) -> None: """Modify the application settings in Vector Hardware Configuration. @@ -737,6 +754,9 @@ def set_application_config( :raises can.interfaces.vector.VectorInitializationError: If the application name does not exist in the Vector hardware configuration. """ + if xldriver is None: + raise CanInterfaceNotImplementedError("The Vector API has not been loaded") + xldriver.xlSetApplConfig( app_name.encode(), app_channel, @@ -758,7 +778,7 @@ def set_timer_rate(self, timer_rate_ms: int) -> None: the timer events. """ timer_rate_10us = timer_rate_ms * 100 - xldriver.xlSetTimerRate(self.port_handle, timer_rate_10us) + self.xldriver.xlSetTimerRate(self.port_handle, timer_rate_10us) class VectorChannelConfig(NamedTuple): diff --git a/can/interfaces/vector/xldriver.py b/can/interfaces/vector/xldriver.py index db57d5911..3243fa4a0 100644 --- a/can/interfaces/vector/xldriver.py +++ b/can/interfaces/vector/xldriver.py @@ -1,3 +1,4 @@ +# type: ignore """ Ctypes wrapper module for Vector CAN Interface on win32/win64 systems. diff --git a/setup.cfg b/setup.cfg index 068badd4c..b402ee645 100644 --- a/setup.cfg +++ b/setup.cfg @@ -9,4 +9,26 @@ no_implicit_optional = True disallow_incomplete_defs = True warn_redundant_casts = True warn_unused_ignores = True -exclude = (^venv|^test|^can/interfaces|^setup.py$) +exclude = + (?x)( + venv + |^test + |^setup.py$ + |^can/interfaces/__init__.py + |^can/interfaces/etas + |^can/interfaces/gs_usb + |^can/interfaces/ics_neovi + |^can/interfaces/iscan + |^can/interfaces/ixxat + |^can/interfaces/kvaser + |^can/interfaces/nican + |^can/interfaces/neousys + |^can/interfaces/pcan + |^can/interfaces/serial + |^can/interfaces/slcan + |^can/interfaces/socketcan + |^can/interfaces/systec + |^can/interfaces/udp_multicast + |^can/interfaces/usb2can + |^can/interfaces/virtual + ) diff --git a/test/test_vector.py b/test/test_vector.py index b1626b18c..338783136 100644 --- a/test/test_vector.py +++ b/test/test_vector.py @@ -22,6 +22,7 @@ VectorOperationError, VectorChannelConfig, ) +from test.config import IS_WINDOWS class TestVectorBus(unittest.TestCase): @@ -309,7 +310,7 @@ def test_vector_error_pickle(self) -> None: with pytest.raises(error_type): raise exc_unpickled - def test_vector_subteype_error_from_generic(self) -> None: + def test_vector_subtype_error_from_generic(self) -> None: for error_type in [VectorInitializationError, VectorOperationError]: with self.subTest(f"error_type = {error_type.__name__}"): @@ -320,13 +321,18 @@ def test_vector_subteype_error_from_generic(self) -> None: generic = VectorError(error_code, error_string, function) # pickle and unpickle - specififc: VectorError = error_type.from_generic(generic) + specific: VectorError = error_type.from_generic(generic) - self.assertEqual(str(generic), str(specififc)) - self.assertEqual(error_code, specififc.error_code) + self.assertEqual(str(generic), str(specific)) + self.assertEqual(error_code, specific.error_code) with pytest.raises(error_type): - raise specififc + raise specific + + @unittest.skipUnless(IS_WINDOWS, "Windows specific test") + def test_winapi_availability(self) -> None: + self.assertIsNotNone(canlib.WaitForSingleObject) + self.assertIsNotNone(canlib.INFINITE) class TestVectorChannelConfig: