Skip to content

Enable mypy for vector interface #1229

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 75 additions & 55 deletions can/interfaces/vector/canlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,29 @@
import logging
import time
import os
from typing import List, NamedTuple, Optional, Tuple, Sequence, Union
from types import ModuleType
from typing import (
List,
NamedTuple,
Optional,
Tuple,
Sequence,
Union,
Any,
Dict,
Callable,
)

WaitForSingleObject: Optional[Callable[[int, int], int]]
INFINITE: Optional[int]
try:
# Try builtin Python 3 Windows API
from _winapi import WaitForSingleObject, INFINITE

HAS_EVENTS = True
except ImportError:
try:
# Try pywin32 package
from win32event import WaitForSingleObject, INFINITE

HAS_EVENTS = True
except ImportError:
# Use polling instead
HAS_EVENTS = False
WaitForSingleObject, INFINITE = None, None
HAS_EVENTS = False

# Import Modules
# ==============
Expand All @@ -36,7 +43,7 @@
deprecated_args_alias,
time_perfcounter_correlation,
)
from can.typechecking import AutoDetectedConfig, CanFilters
from can.typechecking import AutoDetectedConfig, CanFilters, Channel

# Define Module Logger
# ====================
Expand All @@ -48,7 +55,7 @@
from . import xldefine, xlclass

# Import safely Vector API module for Travis tests
xldriver = None
xldriver: Optional[ModuleType] = None
try:
from . import xldriver
except Exception as exc:
Expand All @@ -74,19 +81,19 @@ def __init__(
can_filters: Optional[CanFilters] = None,
poll_interval: float = 0.01,
receive_own_messages: bool = False,
bitrate: int = None,
bitrate: Optional[int] = None,
rx_queue_size: int = 2 ** 14,
app_name: str = "CANalyzer",
serial: int = None,
app_name: Optional[str] = "CANalyzer",
serial: Optional[int] = None,
fd: bool = False,
data_bitrate: int = None,
data_bitrate: Optional[int] = None,
sjw_abr: int = 2,
tseg1_abr: int = 6,
tseg2_abr: int = 3,
sjw_dbr: int = 2,
tseg1_dbr: int = 6,
tseg2_dbr: int = 3,
**kwargs,
**kwargs: Any,
) -> None:
"""
:param channel:
Expand Down Expand Up @@ -144,16 +151,18 @@ def __init__(

if xldriver is None:
raise CanInterfaceNotImplementedError("The Vector API has not been loaded")
self.xldriver = xldriver # keep reference so mypy knows it is not None

self.poll_interval = poll_interval

if isinstance(channel, str): # must be checked before generic Sequence
self.channels: Sequence[int]
if isinstance(channel, int):
self.channels = [channel]
elif isinstance(channel, str): # must be checked before generic Sequence
# Assume comma separated string of channels
self.channels = [int(ch.strip()) for ch in channel.split(",")]
elif isinstance(channel, int):
self.channels = [channel]
elif isinstance(channel, Sequence):
self.channels = channel
self.channels = [int(ch) for ch in channel]
else:
raise TypeError(
f"Invalid type for channels parameter: {type(channel).__name__}"
Expand Down Expand Up @@ -185,12 +194,12 @@ def __init__(
"None of the configured channels could be found on the specified hardware."
)

xldriver.xlOpenDriver()
self.xldriver.xlOpenDriver()
self.port_handle = xlclass.XLportHandle(xldefine.XL_INVALID_PORTHANDLE)
self.mask = 0
self.fd = fd
# Get channels masks
self.channel_masks = {}
self.channel_masks: Dict[Optional[Channel], int] = {}
self.index_to_channel = {}

for channel in self.channels:
Expand All @@ -200,7 +209,7 @@ def __init__(
app_name, channel
)
LOG.debug("Channel index %d found", channel)
idx = xldriver.xlGetChannelIndex(hw_type, hw_index, hw_channel)
idx = self.xldriver.xlGetChannelIndex(hw_type, hw_index, hw_channel)
if idx < 0:
# Undocumented behavior! See issue #353.
# If hardware is unavailable, this function returns -1.
Expand All @@ -224,7 +233,7 @@ def __init__(
if bitrate or fd:
permission_mask.value = self.mask
if fd:
xldriver.xlOpenPort(
self.xldriver.xlOpenPort(
self.port_handle,
self._app_name,
self.mask,
Expand All @@ -234,7 +243,7 @@ def __init__(
xldefine.XL_BusTypes.XL_BUS_TYPE_CAN,
)
else:
xldriver.xlOpenPort(
self.xldriver.xlOpenPort(
self.port_handle,
self._app_name,
self.mask,
Expand Down Expand Up @@ -267,7 +276,7 @@ def __init__(
self.canFdConf.tseg1Dbr = int(tseg1_dbr)
self.canFdConf.tseg2Dbr = int(tseg2_dbr)

xldriver.xlCanFdSetConfiguration(
self.xldriver.xlCanFdSetConfiguration(
self.port_handle, self.mask, self.canFdConf
)
LOG.info(
Expand All @@ -289,7 +298,7 @@ def __init__(
)
else:
if bitrate:
xldriver.xlCanSetChannelBitrate(
self.xldriver.xlCanSetChannelBitrate(
self.port_handle, permission_mask, bitrate
)
LOG.info("SetChannelBitrate: baudr.=%u", bitrate)
Expand All @@ -298,16 +307,16 @@ def __init__(

# Enable/disable TX receipts
tx_receipts = 1 if receive_own_messages else 0
xldriver.xlCanSetChannelMode(self.port_handle, self.mask, tx_receipts, 0)
self.xldriver.xlCanSetChannelMode(self.port_handle, self.mask, tx_receipts, 0)

if HAS_EVENTS:
self.event_handle = xlclass.XLhandle()
xldriver.xlSetNotification(self.port_handle, self.event_handle, 1)
self.xldriver.xlSetNotification(self.port_handle, self.event_handle, 1)
else:
LOG.info("Install pywin32 to avoid polling")

try:
xldriver.xlActivateChannel(
self.xldriver.xlActivateChannel(
self.port_handle, self.mask, xldefine.XL_BusTypes.XL_BUS_TYPE_CAN, 0
)
except VectorOperationError as error:
Expand All @@ -320,17 +329,17 @@ def __init__(
if time.get_clock_info("time").resolution > 1e-5:
ts, perfcounter = time_perfcounter_correlation()
try:
xldriver.xlGetSyncTime(self.port_handle, offset)
self.xldriver.xlGetSyncTime(self.port_handle, offset)
except VectorInitializationError:
xldriver.xlGetChannelTime(self.port_handle, self.mask, offset)
self.xldriver.xlGetChannelTime(self.port_handle, self.mask, offset)
current_perfcounter = time.perf_counter()
now = ts + (current_perfcounter - perfcounter)
self._time_offset = now - offset.value * 1e-9
else:
try:
xldriver.xlGetSyncTime(self.port_handle, offset)
self.xldriver.xlGetSyncTime(self.port_handle, offset)
except VectorInitializationError:
xldriver.xlGetChannelTime(self.port_handle, self.mask, offset)
self.xldriver.xlGetChannelTime(self.port_handle, self.mask, offset)
self._time_offset = time.time() - offset.value * 1e-9

except VectorInitializationError:
Expand All @@ -339,7 +348,7 @@ def __init__(
self._is_filtered = False
super().__init__(channel=channel, can_filters=can_filters, **kwargs)

def _apply_filters(self, filters) -> None:
def _apply_filters(self, filters: Optional[CanFilters]) -> None:
if filters:
# Only up to one filter per ID type allowed
if len(filters) == 1 or (
Expand All @@ -348,7 +357,7 @@ def _apply_filters(self, filters) -> None:
):
try:
for can_filter in filters:
xldriver.xlCanSetChannelAcceptance(
self.xldriver.xlCanSetChannelAcceptance(
self.port_handle,
self.mask,
can_filter["can_id"],
Expand All @@ -370,14 +379,14 @@ def _apply_filters(self, filters) -> None:
# fallback: reset filters
self._is_filtered = False
try:
xldriver.xlCanSetChannelAcceptance(
self.xldriver.xlCanSetChannelAcceptance(
self.port_handle,
self.mask,
0x0,
0x0,
xldefine.XL_AcceptanceFilter.XL_CAN_EXT,
)
xldriver.xlCanSetChannelAcceptance(
self.xldriver.xlCanSetChannelAcceptance(
self.port_handle,
self.mask,
0x0,
Expand Down Expand Up @@ -417,14 +426,14 @@ def _recv_internal(
else:
time_left = end_time - time.time()
time_left_ms = max(0, int(time_left * 1000))
WaitForSingleObject(self.event_handle.value, time_left_ms)
WaitForSingleObject(self.event_handle.value, time_left_ms) # type: ignore
else:
# Wait a short time until we try again
time.sleep(self.poll_interval)

def _recv_canfd(self) -> Optional[Message]:
xl_can_rx_event = xlclass.XLcanRxEvent()
xldriver.xlCanReceive(self.port_handle, xl_can_rx_event)
self.xldriver.xlCanReceive(self.port_handle, xl_can_rx_event)

if xl_can_rx_event.tag == xldefine.XL_CANFD_RX_EventTags.XL_CAN_EV_TAG_RX_OK:
is_rx = True
Expand Down Expand Up @@ -470,7 +479,7 @@ def _recv_canfd(self) -> Optional[Message]:
def _recv_can(self) -> Optional[Message]:
xl_event = xlclass.XLevent()
event_count = ctypes.c_uint(1)
xldriver.xlReceive(self.port_handle, event_count, xl_event)
self.xldriver.xlReceive(self.port_handle, event_count, xl_event)

if xl_event.tag != xldefine.XL_EventTags.XL_RECEIVE_MSG:
self.handle_can_event(xl_event)
Expand Down Expand Up @@ -523,7 +532,7 @@ def handle_canfd_event(self, event: xlclass.XLcanRxEvent) -> None:
`XL_CAN_EV_TAG_TX_ERROR`, `XL_TIMER` or `XL_CAN_EV_TAG_CHIP_STATE` tag.
"""

def send(self, msg: Message, timeout: Optional[float] = None):
def send(self, msg: Message, timeout: Optional[float] = None) -> None:
self._send_sequence([msg])

def _send_sequence(self, msgs: Sequence[Message]) -> int:
Expand All @@ -548,7 +557,9 @@ def _send_can_msg_sequence(self, msgs: Sequence[Message]) -> int:
*map(self._build_xl_event, msgs)
)

xldriver.xlCanTransmit(self.port_handle, mask, message_count, xl_event_array)
self.xldriver.xlCanTransmit(
self.port_handle, mask, message_count, xl_event_array
)
return message_count.value

@staticmethod
Expand Down Expand Up @@ -580,7 +591,7 @@ def _send_can_fd_msg_sequence(self, msgs: Sequence[Message]) -> int:
)

msg_count_sent = ctypes.c_uint(0)
xldriver.xlCanTransmitEx(
self.xldriver.xlCanTransmitEx(
self.port_handle, mask, message_count, msg_count_sent, xl_can_tx_event_array
)
return msg_count_sent.value
Expand Down Expand Up @@ -611,17 +622,17 @@ def _build_xl_can_tx_event(msg: Message) -> xlclass.XLcanTxEvent:
return xl_can_tx_event

def flush_tx_buffer(self) -> None:
xldriver.xlCanFlushTransmitQueue(self.port_handle, self.mask)
self.xldriver.xlCanFlushTransmitQueue(self.port_handle, self.mask)

def shutdown(self) -> None:
super().shutdown()
xldriver.xlDeactivateChannel(self.port_handle, self.mask)
xldriver.xlClosePort(self.port_handle)
xldriver.xlCloseDriver()
self.xldriver.xlDeactivateChannel(self.port_handle, self.mask)
self.xldriver.xlClosePort(self.port_handle)
self.xldriver.xlCloseDriver()

def reset(self) -> None:
xldriver.xlDeactivateChannel(self.port_handle, self.mask)
xldriver.xlActivateChannel(
self.xldriver.xlDeactivateChannel(self.port_handle, self.mask)
self.xldriver.xlActivateChannel(
self.port_handle, self.mask, xldefine.XL_BusTypes.XL_BUS_TYPE_CAN, 0
)

Expand Down Expand Up @@ -657,7 +668,7 @@ def _detect_available_configs() -> List[AutoDetectedConfig]:
"vector_channel_config": channel_config,
}
)
return configs
return configs # type: ignore

@staticmethod
def popup_vector_hw_configuration(wait_for_finish: int = 0) -> None:
Expand All @@ -666,6 +677,9 @@ def popup_vector_hw_configuration(wait_for_finish: int = 0) -> None:
:param wait_for_finish:
Time to wait for user input in milliseconds.
"""
if xldriver is None:
raise CanInterfaceNotImplementedError("The Vector API has not been loaded")

xldriver.xlPopupHwConfig(ctypes.c_char_p(), ctypes.c_uint(wait_for_finish))

@staticmethod
Expand All @@ -685,14 +699,17 @@ def get_application_config(
:raises can.interfaces.vector.VectorInitializationError:
If the application name does not exist in the Vector hardware configuration.
"""
if xldriver is None:
raise CanInterfaceNotImplementedError("The Vector API has not been loaded")

hw_type = ctypes.c_uint()
hw_index = ctypes.c_uint()
hw_channel = ctypes.c_uint()
app_channel = ctypes.c_uint(app_channel)
_app_channel = ctypes.c_uint(app_channel)

xldriver.xlGetApplConfig(
app_name.encode(),
app_channel,
_app_channel,
hw_type,
hw_index,
hw_channel,
Expand All @@ -707,7 +724,7 @@ def set_application_config(
hw_type: xldefine.XL_HardwareType,
hw_index: int,
hw_channel: int,
**kwargs,
**kwargs: Any,
) -> None:
"""Modify the application settings in Vector Hardware Configuration.

Expand Down Expand Up @@ -737,6 +754,9 @@ def set_application_config(
:raises can.interfaces.vector.VectorInitializationError:
If the application name does not exist in the Vector hardware configuration.
"""
if xldriver is None:
raise CanInterfaceNotImplementedError("The Vector API has not been loaded")

xldriver.xlSetApplConfig(
app_name.encode(),
app_channel,
Expand All @@ -758,7 +778,7 @@ def set_timer_rate(self, timer_rate_ms: int) -> None:
the timer events.
"""
timer_rate_10us = timer_rate_ms * 100
xldriver.xlSetTimerRate(self.port_handle, timer_rate_10us)
self.xldriver.xlSetTimerRate(self.port_handle, timer_rate_10us)


class VectorChannelConfig(NamedTuple):
Expand Down
1 change: 1 addition & 0 deletions can/interfaces/vector/xldriver.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# type: ignore
"""
Ctypes wrapper module for Vector CAN Interface on win32/win64 systems.

Expand Down
Loading