diff --git a/can/interfaces/ixxat/__init__.py b/can/interfaces/ixxat/__init__.py index 39e67a0b8..91cc1f4be 100644 --- a/can/interfaces/ixxat/__init__.py +++ b/can/interfaces/ixxat/__init__.py @@ -14,7 +14,7 @@ "structures", ] -from can.interfaces.ixxat.canlib import ( # noqa: F401 +from can.interfaces.ixxat.canlib import ( CyclicSendTask, IXXATBus, get_ixxat_hwids, diff --git a/can/interfaces/ixxat/canlib.py b/can/interfaces/ixxat/canlib.py index bb7344b7a..e9a7f1eb6 100644 --- a/can/interfaces/ixxat/canlib.py +++ b/can/interfaces/ixxat/canlib.py @@ -8,7 +8,7 @@ See also the NICAN interface. """ - +import contextlib import ctypes import functools import logging @@ -28,20 +28,22 @@ Message, RestartableCyclicTaskABC, ) -from can.ctypesutil import HANDLE, PHANDLE, CLibrary -from can.ctypesutil import HRESULT as ctypes_HRESULT +from can.ctypesutil import HANDLE, HRESULT, PHANDLE, CLibrary from can.exceptions import CanInitializationError, CanInterfaceNotImplementedError from can.interfaces.ixxat import constants, structures +from can.interfaces.ixxat.constants import CanFeature from can.interfaces.ixxat.exceptions import * from can.typechecking import AutoDetectedConfig, CanFilters from can.util import deprecated_args_alias, dlc2len, len2dlc __all__ = [ + "CyclicSendTask", + "get_ixxat_hwids", + "IXXATBus", "VCITimeout", "VCIError", "VCIBusOffError", "VCIDeviceNotFoundError", - "IXXATBus", "vciFormatError", ] @@ -126,7 +128,9 @@ def __check_status(result: int, function: Callable, args: Tuple): elif result == constants.VCI_E_NO_MORE_ITEMS: raise StopIteration() elif result == constants.VCI_E_ACCESSDENIED: - log.warning(f"VCI_E_ACCESSDENIED error raised when calling VCI Function {function._name}") + log.warning( + f"VCI_E_ACCESSDENIED error raised when calling VCI Function {function._name}" + ) # not a real error, might happen if another program has initialized the bus elif result != constants.VCI_OK: raise VCIError(vciFormatError(function, result)) @@ -143,11 +147,11 @@ def __check_status(result: int, function: Callable, args: Tuple): # void VCIAPI vciFormatError (HRESULT hrError, PCHAR pszText, UINT32 dwsize); try: _canlib.map_symbol( - "vciFormatError", None, (ctypes_HRESULT, ctypes.c_char_p, ctypes.c_uint32) + "vciFormatError", None, (HRESULT, ctypes.c_char_p, ctypes.c_uint32) ) except ImportError: _canlib.map_symbol( - "vciFormatErrorA", None, (ctypes_HRESULT, ctypes.c_char_p, ctypes.c_uint32) + "vciFormatErrorA", None, (HRESULT, ctypes.c_char_p, ctypes.c_uint32) ) _canlib.vciFormatError = _canlib.vciFormatErrorA # Hack to have vciFormatError as a free function @@ -394,9 +398,12 @@ class IXXATBus(BusABC): """ @deprecated_args_alias( - deprecation_start="4.2.2", + deprecation_start="4.3.0", deprecation_end="5.0.0", - sjw_abr=None, # Use BitTiming class instead + deprecation_info="Use the `can.BitTimingFd` class and the `timing` parameter instead.", + fd=None, + data_bitrate=None, + sjw_abr=None, tseg1_abr=None, tseg2_abr=None, sjw_dbr=None, @@ -415,17 +422,15 @@ def __init__( self, channel: int, can_filters: Optional[CanFilters] = None, - receive_own_messages: Optional[int] = False, - unique_hardware_id: Optional[int] = None, - extended: Optional[bool] = True, - fd: Optional[bool] = False, + receive_own_messages: bool = False, + unique_hardware_id: Optional[str] = None, + extended: bool = True, rx_fifo_size: Optional[int] = None, tx_fifo_size: Optional[int] = None, - bitrate: Optional[int] = 500_000, - data_bitrate: Optional[int] = 2_000_000, + bitrate: int = 500_000, timing: Optional[Union[BitTiming, BitTimingFd]] = None, **kwargs, - ): + ) -> None: """ :param channel: The Channel id to create this bus with. @@ -442,10 +447,6 @@ def __init__( :param extended: Default True, enables the capability to use extended IDs. - :param fd: - Default False, enables CAN-FD usage (alternatively a :class:`~can.BitTimingFd` - instance may be passed to the `timing` parameter). - :param rx_fifo_size: Receive fifo size (default 16). If initialised as an FD bus, this value is automatically increased to 1024 unless a value is specified by the user. @@ -459,11 +460,6 @@ def __init__( :class:`~can.BitTiming` or :class:`~can.BitTimingFd` instance is provided in the `timing` parameter. - :param data_bitrate: - Channel bitrate in bit/s (only in CAN-Fd if baudrate switch enabled). Note that - this value will be overriden if a :class:`~can.BitTimingFd` instance is provided - in the `timing` parameter. - :param timing: Optional :class:`~can.BitTiming` or :class:`~can.BitTimingFd` instance to use for custom bit timing setting. The `f_clock` value of the timing @@ -478,32 +474,26 @@ def __init__( ) log.info("CAN Filters: %s", can_filters) + # fetch deprecated timing arguments (if provided) + fd: bool = kwargs.get("fd", False) + data_bitrate: int = kwargs.get("data_bitrate", 2_000_000) + tseg1_abr: Optional[int] = kwargs.get("tseg1_abr") + tseg2_abr: Optional[int] = kwargs.get("tseg2_abr") + sjw_abr: Optional[int] = kwargs.get("sjw_abr") + tseg1_dbr: Optional[int] = kwargs.get("tseg1_dbr") + tseg2_dbr: Optional[int] = kwargs.get("tseg2_dbr") + sjw_dbr: Optional[int] = kwargs.get("sjw_dbr") + ssp_dbr: Optional[int] = kwargs.get("ssp_dbr") + # Configuration options - if isinstance(timing, BitTiming): - fd = False # if a BitTiming instance has been passed, force the bus to initialise as a standard bus. - elif isinstance(timing, BitTiming): - fd = True # if a BitTimingFd instance has been passed, force the bus to initialise as FD capble + is_fd = isinstance(timing, BitTimingFd) if timing else fd + self._can_protocol = CanProtocol.CAN_FD if is_fd else CanProtocol.CAN_20 + channel = int(channel) # Usually comes as a string from the config file if channel < 0: raise ValueError("channel number must be >= 0") - bitrate = int(bitrate) - data_bitrate = int(data_bitrate) - if (bitrate < 0) or (data_bitrate < 0): - raise ValueError("bitrate and data_bitrate must be >= 0") - if (bitrate > 1_000_000) or (data_bitrate > 10_000_000): - raise ValueError( - "bitrate must be <= 1_000_000 data_bitrate must be <= 10_000_000" - ) - self.receive_own_messages = receive_own_messages - # fetch deprecated timing arguments (if provided) - tseg1_abr = kwargs.get("tseg1_abr") - tseg2_abr = kwargs.get("tseg2_abr") - sjw_abr = kwargs.get("sjw_abr") - tseg1_dbr = kwargs.get("tseg1_dbr") - tseg2_dbr = kwargs.get("tseg2_dbr") - sjw_dbr = kwargs.get("sjw_dbr") - ssp_dbr = kwargs.get("ssp_dbr") + self.receive_own_messages = receive_own_messages # setup buffer sizes if rx_fifo_size is not None: # if the user provided an rx fifo size @@ -511,7 +501,7 @@ def __init__( raise ValueError("rx_fifo_size must be > 0") else: # otherwise use the default size (depending upon if FD or not) rx_fifo_size = 16 - if fd: + if is_fd: rx_fifo_size = 1024 if tx_fifo_size is not None: # if the user provided a tx fifo size @@ -519,52 +509,21 @@ def __init__( raise ValueError("tx_fifo_size must be > 0") else: # otherwise use the default size (depending upon if FD or not) tx_fifo_size = 16 - if fd: + if is_fd: tx_fifo_size = 128 self._device_handle = HANDLE() - self._device_info = structures.VCIDEVICEINFO() self._control_handle = HANDLE() self._channel_handle = HANDLE() self._channel_capabilities = structures.CANCAPABILITIES2() self._message = structures.CANMSG2() - if fd: + if is_fd: self._payload = (ctypes.c_byte * 64)() else: self._payload = (ctypes.c_byte * 8)() # Search for supplied device - if unique_hardware_id is None: - log.info("Searching for first available device") - else: - log.info("Searching for unique HW ID %s", unique_hardware_id) - _canlib.vciEnumDeviceOpen(ctypes.byref(self._device_handle)) - while True: - try: - _canlib.vciEnumDeviceNext( - self._device_handle, ctypes.byref(self._device_info) - ) - except StopIteration as exc: - if unique_hardware_id is None: - raise VCIDeviceNotFoundError( - "No IXXAT device(s) connected or device(s) in use by other process(es)." - ) from exc - else: - raise VCIDeviceNotFoundError( - f"Unique HW ID {unique_hardware_id} not connected or not available." - ) from exc - else: - if (unique_hardware_id is None) or ( - self._device_info.UniqueHardwareId.AsChar - == bytes(unique_hardware_id, "ascii") - ): - break - else: - log.debug( - "Ignoring IXXAT with hardware id '%s'.", - self._device_info.UniqueHardwareId.AsChar.decode("ascii"), - ) - _canlib.vciEnumDeviceClose(self._device_handle) + self._device_info = self._find_device(self._device_handle, unique_hardware_id) try: _canlib.vciDeviceOpen( @@ -615,21 +574,19 @@ def __init__( _canlib.canControlGetCaps( self._control_handle, ctypes.byref(self._channel_capabilities) ) + _features = CanFeature(self._channel_capabilities.dwFeatures) # check capabilities bOpMode = constants.CAN_OPMODE_UNDEFINED - if ( - self._channel_capabilities.dwFeatures & constants.CAN_FEATURE_STDANDEXT - ) != 0: + if CanFeature.CAN_FEATURE_STDANDEXT in _features: # controller supports CAN_OPMODE_STANDARD and CAN_OPMODE_EXTENDED at the same time bOpMode |= constants.CAN_OPMODE_STANDARD # enable both 11 bits reception if extended: # parameter from configuration bOpMode |= constants.CAN_OPMODE_EXTENDED # enable 29 bits reception - elif ( - self._channel_capabilities.dwFeatures & constants.CAN_FEATURE_STDANDEXT - ) != 0: + elif CanFeature.CAN_FEATURE_STDOREXT in _features: log.warning( - "Channel %d capabilities allow either basic or extended IDs, but not both. using %s according to parameter [extended=%s]", + "Channel %d capabilities allow either basic or extended IDs, " + "but not both. using %s according to parameter [extended=%s]", channel, "extended" if extended else "basic", "True" if extended else "False", @@ -641,70 +598,53 @@ def __init__( else constants.CAN_OPMODE_STANDARD ) - if ( # controller supports receiving error frames: - self._channel_capabilities.dwFeatures & constants.CAN_FEATURE_ERRFRAME - ) != 0: + if CanFeature.CAN_FEATURE_ERRFRAME in _features: + # controller supports receiving error frames bOpMode |= constants.CAN_OPMODE_ERRFRAME - if ( # controller supports receiving error frames: - self._channel_capabilities.dwFeatures & constants.CAN_FEATURE_BUSLOAD - ) != 0: - self._bus_load_calculation = True - else: - self._bus_load_calculation = False + # controller supports receiving error frames + self._bus_load_calculation = CanFeature.CAN_FEATURE_BUSLOAD in _features - if ( # controller supports hardware scheduling of cyclic messages - self._channel_capabilities.dwFeatures & constants.CAN_FEATURE_SCHEDULER - ) != 0: - self._interface_scheduler_capable = True - else: - self._interface_scheduler_capable = False + # controller supports hardware scheduling of cyclic messages + self._interface_scheduler_capable = ( + CanFeature.CAN_FEATURE_SCHEDULER in _features + ) bExMode = constants.CAN_EXMODE_DISABLED - self._can_protocol = CanProtocol.CAN_20 # default to standard CAN protocol - if fd: + if is_fd: if ( - self._channel_capabilities.dwFeatures & constants.CAN_FEATURE_EXTDATA - ) != 0: - bExMode |= constants.CAN_EXMODE_EXTDATALEN - else: - raise CanInitializationError( - "The interface %s does not support extended data frames (FD)" - % self._device_info.UniqueHardwareId.AsChar.decode("ascii"), - ) - if ( - self._channel_capabilities.dwFeatures & constants.CAN_FEATURE_FASTDATA - ) != 0: - bExMode |= constants.CAN_EXMODE_FASTDATA - else: + CanFeature.CAN_FEATURE_EXTDATA not in _features + or CanFeature.CAN_FEATURE_FASTDATA not in _features + ): raise CanInitializationError( - "The interface %s does not support fast data rates (FD)" + "The interface %s does not support CAN FD" % self._device_info.UniqueHardwareId.AsChar.decode("ascii"), ) - # set bus to CAN FD protocol once FD capability is verified - self._can_protocol = CanProtocol.CAN_FD - if timing and not isinstance(timing, (BitTiming, BitTimingFd)): - raise CanInitializationError( - "The timing parameter to the Ixxat Bus must be None, or an instance of can.BitTiming or can.BitTimingFd" - ) + bExMode |= constants.CAN_EXMODE_EXTDATALEN + bExMode |= constants.CAN_EXMODE_FASTDATA pBtpSDR, pBtpFDR = self._bit_timing_constructor( - timing, - bitrate, - tseg1_abr, # deprecated - tseg2_abr, # deprecated - sjw_abr, # deprecated - data_bitrate, - tseg1_dbr, # deprecated - tseg2_dbr, # deprecated - sjw_dbr, # deprecated - ssp_dbr, # deprecated + timing_obj=timing, + bitrate=bitrate, + data_bitrate=data_bitrate if fd else None, # deprecated + tseg1_abr=tseg1_abr, # deprecated + tseg2_abr=tseg2_abr, # deprecated + sjw_abr=sjw_abr, # deprecated + tseg1_dbr=tseg1_dbr, # deprecated + tseg2_dbr=tseg2_dbr, # deprecated + sjw_dbr=sjw_dbr, # deprecated + ssp_dbr=ssp_dbr, # deprecated ) log.info( - "Initialising Channel %d with the following parameters: \n%s\n%s", + "Initialising Channel %d with the following parameters:\n" + "bOpMode=%d, bExMode=%d\n" + "%s\n" + "%s", channel, + bOpMode, + bExMode, pBtpSDR, pBtpFDR, ) @@ -718,7 +658,7 @@ def __init__( 0, 0, ctypes.byref(pBtpSDR), - ctypes.byref(pBtpFDR), + ctypes.byref(pBtpFDR) if pBtpFDR else None, ) # With receive messages, this field contains the relative reception time of @@ -775,6 +715,43 @@ def __init__( super().__init__(channel=channel, can_filters=None, **kwargs) + def _find_device( + self, device_handle: HANDLE, unique_hardware_id: Optional[str] + ) -> structures.VCIDEVICEINFO: + if unique_hardware_id is None: + log.info("Searching for first available device") + else: + log.info("Searching for unique HW ID %s", unique_hardware_id) + + device_info = structures.VCIDEVICEINFO() + _canlib.vciEnumDeviceOpen(ctypes.byref(device_handle)) + while True: + try: + _canlib.vciEnumDeviceNext(device_handle, ctypes.byref(device_info)) + except StopIteration as exc: + if unique_hardware_id is None: + raise VCIDeviceNotFoundError( + "No IXXAT device(s) connected or device(s) in use by other process(es)." + ) from exc + else: + raise VCIDeviceNotFoundError( + f"Unique HW ID {unique_hardware_id} not connected or not available." + ) from exc + else: + if (unique_hardware_id is None) or ( + device_info.UniqueHardwareId.AsChar + == bytes(unique_hardware_id, "ascii") + ): + break + else: + log.debug( + "Ignoring IXXAT with hardware id '%s'.", + self._device_info.UniqueHardwareId.AsChar.decode("ascii"), + ) + _canlib.vciEnumDeviceClose(device_handle) + + return device_info + def _inWaiting(self): try: _canlib.canChannelWaitRxEvent(self._channel_handle, 0) @@ -990,13 +967,15 @@ def _send_periodic_internal( def shutdown(self): super().shutdown() - if self._scheduler is not None: - _canlib.canSchedulerClose(self._scheduler) - _canlib.canChannelClose(self._channel_handle) - _canlib.canControlStart(self._control_handle, constants.FALSE) - _canlib.canControlReset(self._control_handle) - _canlib.canControlClose(self._control_handle) - _canlib.vciDeviceClose(self._device_handle) + + with contextlib.suppress(VCIError): + if self._scheduler is not None: + _canlib.canSchedulerClose(self._scheduler) + _canlib.canChannelClose(self._channel_handle) + _canlib.canControlStart(self._control_handle, constants.FALSE) + _canlib.canControlReset(self._control_handle) + _canlib.canControlClose(self._control_handle) + _canlib.vciDeviceClose(self._device_handle) @property def clock_frequency(self) -> int: @@ -1019,6 +998,7 @@ def bus_load(self) -> int: warnings.warn("The current adapter does not support bus load measurement") return 0 + @property def _status(self) -> structures.CANLINESTATUS2: status = structures.CANLINESTATUS2() _canlib.canControlGetStatus(self._control_handle, ctypes.byref(status)) @@ -1049,62 +1029,100 @@ def state(self) -> BusState: def _bit_timing_constructor( self, timing_obj: Optional[Union[BitTiming, BitTimingFd]], - bitrate: Optional[int], + bitrate: int, + data_bitrate: Optional[int], tseg1_abr: Optional[int], tseg2_abr: Optional[int], sjw_abr: Optional[int], - data_bitrate: Optional[int], tseg1_dbr: Optional[int], tseg2_dbr: Optional[int], sjw_dbr: Optional[int], ssp_dbr: Optional[int], - ) -> Tuple: - """ - A helper function to convert a can.BitTiming or can.BitTimingFd object into the arguments for + ) -> Tuple[structures.CANBTP, Optional[structures.CANBTP]]: + """A helper function to convert a can.BitTiming or can.BitTimingFd object into the arguments for the VCI driver's CANBTP function. - :param timing_obj: A can.BitTiming or can.BitTimingFd instance. If this argument is specified, - all other arguments are ignored - :type timing_obj: Optional[Union[BitTiming, BitTimingFd]] - :param bitrate: The standard / arbitration bitrate in bit/s. - :type bitrate: Optional[int] - :param tseg1_abr: Time segment 1 for the standard / arbitration speed, that is, the number of quanta from - (but not including) the Sync Segment to the sampling point. - :type tseg1_abr: Optional[int] - :param tseg2_abr: Time segment 2 for the standard / arbitration speed, that is, the number of quanta from the - sampling point to the end of the bit. - :type tseg2_abr: Optional[int] - :param sjw_abr: The Synchronization Jump Width for the standard / arbitration speed. Decides the maximum - number of time quanta that the controller can resynchronize every bit. - :type sjw_abr: Optional[int] - :param data_bitrate: The CAN FD Data bitrate in bit/s. - :type data_bitrate: Optional[int] - :param tseg1_dbr: Time segment 1 for the CAN FD data speed, that is, the number of quanta from - (but not including) the Sync Segment to the sampling point. - :type tseg1_dbr: Optional[int] - :param tseg2_dbr: Time segment 2 for the CAN FD data speed, that is, the number of quanta from the - sampling point to the end of the bit. - :type tseg2_dbr: Optional[int] - :param sjw_dbr: The Synchronization Jump Width for the CAN FD data speed. Decides the maximum - number of time quanta that the controller can resynchronize every bit. - :type sjw_dbr: Optional[int] - :param ssp_dbr: Secondary Sample Point Offset for the CAN FD data speed, that is, the number of quanta from - (but not including) the Sync Segment to the secondary sampling point. If this value is not provided, it - defaults to the same value as `tseg1_dbr`. - :type ssp_dbr: Optional[int] - :param : DESCRIPTION - :type : TYPE - :return: A Tuple containing two CANBTP structures for the VCI driver. - The first is the standard CAN 2.0 CANBTP object (or the CAN FD Arbitration rate CANBTP object), - and the second is the CAN FD data rate CANBTP object (which is null if a standard bus is initialised). - :rtype: Tuple + :param timing_obj: + A can.BitTiming or can.BitTimingFd instance. If this argument is specified, + all other arguments are ignored + :param bitrate: + The standard / arbitration bitrate in bit/s. + :param data_bitrate: + The data bitrate in bit/s for CAN FD. + :param tseg1_abr: + Time segment 1 for the standard / arbitration speed, that is, the number of quanta from + (but not including) the Sync Segment to the sampling point. + :param tseg2_abr: + Time segment 2 for the standard / arbitration speed, that is, the number of quanta from the + sampling point to the end of the bit. + :param sjw_abr: + The Synchronization Jump Width for the standard / arbitration speed. Decides the maximum + number of time quanta that the controller can resynchronize every bit. + :param data_bitrate: + The CAN FD Data bitrate in bit/s. + :param tseg1_dbr: + Time segment 1 for the CAN FD data speed, that is, the number of quanta from + (but not including) the Sync Segment to the sampling point. + :param tseg2_dbr: + Time segment 2 for the CAN FD data speed, that is, the number of quanta from the + sampling point to the end of the bit. + :param sjw_dbr: + The Synchronization Jump Width for the CAN FD data speed. Decides the maximum + number of time quanta that the controller can resynchronize every bit. + :param ssp_dbr: + Secondary Sample Point Offset for the CAN FD data speed, that is, the number of quanta from + (but not including) the Sync Segment to the secondary sampling point. If this value is not provided, it + defaults to the same value as `tseg1_dbr`. + + :return: + A Tuple containing two CANBTP structures for the VCI driver. + The first is the standard CAN 2.0 CANBTP object (or the CAN FD Arbitration rate CANBTP object), + and the second is the CAN FD data rate CANBTP object (which is null if a standard bus is initialised). """ + deprecated_args_provided = any( + [ + data_bitrate, + tseg1_abr, + tseg2_abr, + sjw_abr, + tseg1_dbr, + tseg2_dbr, + sjw_dbr, + ssp_dbr, + ] + ) - # Create a null FD timing structure in case we are only using a standard bus - pBtpFDR = structures.CANBTP(dwMode=0, dwBPS=0, wTS1=0, wTS2=0, wSJW=0, wTDO=0) + if isinstance(timing_obj, BitTiming): + pBtpSDR = structures.CANBTP( + dwMode=0, + dwBPS=timing_obj.bitrate, + wTS1=timing_obj.tseg1, + wTS2=timing_obj.tseg2, + wSJW=timing_obj.sjw, + wTDO=0, + ) + return pBtpSDR, None - # if only a bitrate is supplied - if bitrate and not timing_obj and not (tseg1_abr and tseg2_abr and sjw_abr): - # unless timing segments are specified, try and use a predefined set of timings from constants.py + if isinstance(timing_obj, BitTimingFd): + pBtpSDR = structures.CANBTP( + dwMode=0, + dwBPS=timing_obj.nom_bitrate, + wTS1=timing_obj.nom_tseg1, + wTS2=timing_obj.nom_tseg2, + wSJW=timing_obj.nom_sjw, + wTDO=0, + ) + pBtpFDR = structures.CANBTP( + dwMode=0, + dwBPS=timing_obj.data_bitrate, + wTS1=timing_obj.data_tseg1, + wTS2=timing_obj.data_tseg2, + wSJW=timing_obj.data_sjw, + wTDO=0xFFF, + ) + return pBtpSDR, pBtpFDR + + if not deprecated_args_provided: + # try to use a predefined set of timings from constants.py pBtpSDR = constants.CAN_BITRATE_PRESETS.get(bitrate, None) if not pBtpSDR: # if we have failed to identify a suitable set of timings from the presets @@ -1113,8 +1131,20 @@ def _bit_timing_constructor( bitrate=bitrate, sample_point=80, ) + pBtpSDR = structures.CANBTP( + dwMode=0, + dwBPS=timing_obj.bitrate, + wTS1=timing_obj.tseg1, + wTS2=timing_obj.tseg2, + wSJW=timing_obj.sjw, + wTDO=0, + ) + return pBtpSDR, None + + # TODO: the code below is obsolete once the deprecated bit timing parameters are removed + # if a bitrate and timings are supplied - elif bitrate and not timing_obj and (tseg1_abr and tseg2_abr and sjw_abr): + if tseg1_abr and tseg2_abr and sjw_abr: pBtpSDR = structures.CANBTP( dwMode=0, dwBPS=bitrate, @@ -1123,13 +1153,26 @@ def _bit_timing_constructor( wSJW=sjw_abr, wTDO=0, ) + else: + pBtpSDR = constants.CAN_BITRATE_PRESETS.get(bitrate, None) + if not pBtpSDR: + # if we have failed to identify a suitable set of timings from the presets + timing_obj = BitTiming.from_sample_point( + f_clock=self._channel_capabilities.dwCanClkFreq, + bitrate=bitrate, + sample_point=80, + ) + pBtpSDR = structures.CANBTP( + dwMode=0, + dwBPS=timing_obj.bitrate, + wTS1=timing_obj.tseg1, + wTS2=timing_obj.tseg2, + wSJW=timing_obj.sjw, + wTDO=0, + ) # if a data_bitrate is supplied - if ( - data_bitrate - and not timing_obj - and not (tseg1_dbr and tseg2_dbr and sjw_dbr) - ): + if data_bitrate and not (tseg1_dbr and tseg2_dbr and sjw_dbr): # unless timing segments are specified, try and use a predefined set of timings from constants.py pBtpFDR = constants.CAN_DATABITRATE_PRESETS.get(data_bitrate, None) if not pBtpFDR: @@ -1141,10 +1184,19 @@ def _bit_timing_constructor( data_bitrate=data_bitrate, data_sample_point=80, # 80% ) - # if a data_bitrate and timings are supplied - elif data_bitrate and not timing_obj and (tseg1_dbr and tseg2_dbr and sjw_dbr): + pBtpFDR = structures.CANBTP( + dwMode=0, + dwBPS=timing_obj.data_bitrate, + wTS1=timing_obj.data_tseg1, + wTS2=timing_obj.data_tseg2, + wSJW=timing_obj.data_sjw, + wTDO=ssp_dbr or 0xFFF, + ) + + # if a data_bitrate and no timings are supplied + elif data_bitrate: if not ssp_dbr: - ssp_dbr = tseg2_dbr + ssp_dbr = 0xFFF pBtpFDR = structures.CANBTP( dwMode=0, dwBPS=data_bitrate, @@ -1153,40 +1205,14 @@ def _bit_timing_constructor( wSJW=sjw_dbr, wTDO=ssp_dbr, ) - - # if a timing object is provided - if isinstance(timing_obj, BitTiming): - pBtpSDR = structures.CANBTP( - dwMode=0, - dwBPS=timing_obj.bitrate, - wTS1=timing_obj.tseg1, - wTS2=timing_obj.tseg2, - wSJW=timing_obj.sjw, - wTDO=0, - ) - elif isinstance(timing_obj, BitTimingFd): - pBtpSDR = structures.CANBTP( - dwMode=0, - dwBPS=timing_obj.nom_bitrate, - wTS1=timing_obj.nom_tseg1, - wTS2=timing_obj.nom_tseg2, - wSJW=timing_obj.nom_sjw, - wTDO=0, - ) - pBtpFDR = structures.CANBTP( - dwMode=0, - dwBPS=timing_obj.data_bitrate, - wTS1=timing_obj.data_tseg1, - wTS2=timing_obj.data_tseg2, - wSJW=timing_obj.data_sjw, - wTDO=timing_obj.data_tseg2, - ) + else: + pBtpFDR = None return pBtpSDR, pBtpFDR @staticmethod def _detect_available_configs() -> List[AutoDetectedConfig]: - config_list = [] # list in wich to store the resulting bus kwargs + config_list = [] # used to detect HWID device_handle = HANDLE() @@ -1218,7 +1244,7 @@ def _detect_available_configs() -> List[AutoDetectedConfig]: ctypes.byref(channel_handle), ) except Exception: - # Array outside of bounds error == accessing a channel not in the hardware + # Array outside bounds error == accessing a channel not in the hardware break else: _canlib.canChannelClose(channel_handle) diff --git a/can/interfaces/ixxat/constants.py b/can/interfaces/ixxat/constants.py index 1249dfd53..d317bfb53 100644 --- a/can/interfaces/ixxat/constants.py +++ b/can/interfaces/ixxat/constants.py @@ -3,9 +3,34 @@ Copyright (C) 2016 Giuseppe Corbelli """ +from enum import IntFlag from . import structures + +class CanFeature(IntFlag): + CAN_FEATURE_STDOREXT = 0x00000001 # 11 OR 29 bit (exclusive) + CAN_FEATURE_STDANDEXT = 0x00000002 # 11 AND 29 bit (simultaneous) + CAN_FEATURE_RMTFRAME = 0x00000004 # reception of remote frames + CAN_FEATURE_ERRFRAME = 0x00000008 # reception of error frames + CAN_FEATURE_BUSLOAD = 0x00000010 # bus load measurement + CAN_FEATURE_IDFILTER = 0x00000020 # exact message filter + CAN_FEATURE_LISTONLY = 0x00000040 # listen only mode + CAN_FEATURE_SCHEDULER = 0x00000080 # cyclic message scheduler + CAN_FEATURE_GENERRFRM = 0x00000100 # error frame generation + CAN_FEATURE_DELAYEDTX = 0x00000200 # delayed message transmitter + CAN_FEATURE_SINGLESHOT = 0x00000400 # single shot mode + CAN_FEATURE_HIGHPRIOR = 0x00000800 # high priority message + CAN_FEATURE_AUTOBAUD = 0x00001000 # automatic bit rate detection + CAN_FEATURE_EXTDATA = 0x00002000 # extended data length (CANFD) + CAN_FEATURE_FASTDATA = 0x00004000 # fast data bit rate (CANFD) + CAN_FEATURE_ISOFRAME = 0x00008000 # ISO conform frame (CANFD) + CAN_FEATURE_NONISOFRM = ( + 0x00010000 # non ISO conform frame (CANFD) (different CRC computation) + ) + CAN_FEATURE_64BITTSC = 0x00020000 # 64-bit time stamp counter + + FALSE = 0 TRUE = 1 @@ -197,28 +222,6 @@ CAN_ACCEPT_PASSEXCL = 0x03 # message passes exclusion filter -CAN_FEATURE_STDOREXT = 0x00000001 # 11 OR 29 bit (exclusive) -CAN_FEATURE_STDANDEXT = 0x00000002 # 11 AND 29 bit (simultaneous) -CAN_FEATURE_RMTFRAME = 0x00000004 # reception of remote frames -CAN_FEATURE_ERRFRAME = 0x00000008 # reception of error frames -CAN_FEATURE_BUSLOAD = 0x00000010 # bus load measurement -CAN_FEATURE_IDFILTER = 0x00000020 # exact message filter -CAN_FEATURE_LISTONLY = 0x00000040 # listen only mode -CAN_FEATURE_SCHEDULER = 0x00000080 # cyclic message scheduler -CAN_FEATURE_GENERRFRM = 0x00000100 # error frame generation -CAN_FEATURE_DELAYEDTX = 0x00000200 # delayed message transmitter -CAN_FEATURE_SINGLESHOT = 0x00000400 # single shot mode -CAN_FEATURE_HIGHPRIOR = 0x00000800 # high priority message -CAN_FEATURE_AUTOBAUD = 0x00001000 # automatic bit rate detection -CAN_FEATURE_EXTDATA = 0x00002000 # extended data length (CANFD) -CAN_FEATURE_FASTDATA = 0x00004000 # fast data bit rate (CANFD) -CAN_FEATURE_ISOFRAME = 0x00008000 # ISO conform frame (CANFD) -CAN_FEATURE_NONISOFRM = ( - 0x00010000 # non ISO conform frame (CANFD) (different CRC computation) -) -CAN_FEATURE_64BITTSC = 0x00020000 # 64-bit time stamp counter - - CAN_BITRATE_PRESETS = { 10000: structures.CANBTP( dwMode=0, dwBPS=10_000, wTS1=6400, wTS2=1600, wSJW=1600, wTDO=0 diff --git a/can/util.py b/can/util.py index 59abdd579..ec9283197 100644 --- a/can/util.py +++ b/can/util.py @@ -334,6 +334,7 @@ def channel2int(channel: Optional[typechecking.Channel]) -> Optional[int]: def deprecated_args_alias( deprecation_start: str, deprecation_end: Optional[str] = None, + deprecation_info: Optional[str] = None, **aliases: Optional[str], ) -> Callable[[Callable[P1, T1]], Callable[P1, T1]]: """Allows to rename/deprecate a function kwarg(s) and optionally @@ -358,6 +359,8 @@ def library_function(new_arg): The *python-can* version, that introduced the :class:`DeprecationWarning`. :param deprecation_end: The *python-can* version, that marks the end of the deprecation period. + :param deprecation_info: + Additional information, which will be added to the deprecation warning.. :param aliases: keyword arguments, that map the deprecated argument names to the new argument names or ``None``. @@ -371,6 +374,7 @@ def wrapper(*args: P1.args, **kwargs: P1.kwargs) -> T1: func_name=f.__name__, start=deprecation_start, end=deprecation_end, + info=deprecation_info, kwargs=kwargs, aliases=aliases, ) @@ -385,6 +389,7 @@ def _rename_kwargs( func_name: str, start: str, end: Optional[str], + info: Optional[str], kwargs: P1.kwargs, aliases: Dict[str, Optional[str]], ) -> None: @@ -410,6 +415,9 @@ def _rename_kwargs( ) kwargs[new] = value + if info: + deprecation_notice += " " + info.strip() + warnings.warn(deprecation_notice, DeprecationWarning) diff --git a/test/test_interface_ixxat.py b/test/test_interface_ixxat.py index 34150f2a4..40b50cb05 100644 --- a/test/test_interface_ixxat.py +++ b/test/test_interface_ixxat.py @@ -176,7 +176,7 @@ def test_bus_creation_standard_bitrate(self): def test_bus_creation_arbitrary_bitrate(self): target_bitrate = 444_444 with can.Bus(interface="ixxat", channel=0, bitrate=target_bitrate) as bus: - timing = bus._status().sBtpSdr + timing = bus._status.sBtpSdr bus_timing = can.BitTiming( self.clock_frequency, timing.dwBPS, diff --git a/test/test_util.py b/test/test_util.py index a4aacdf86..3e93e5bf0 100644 --- a/test/test_util.py +++ b/test/test_util.py @@ -2,6 +2,7 @@ import unittest import warnings +from typing import Optional import pytest @@ -20,12 +21,14 @@ class RenameKwargsTest(unittest.TestCase): expected_kwargs = dict(a=1, b=2, c=3, d=4) - def _test(self, start: str, end: str, kwargs, aliases): + def _test( + self, start: str, end: Optional[str], note: Optional[str], kwargs, aliases + ): # Test that we do get the DeprecationWarning when called with deprecated kwargs with self.assertWarnsRegex( DeprecationWarning, "is deprecated.*?" + start + ".*?" + end ): - _rename_kwargs("unit_test", start, end, kwargs, aliases) + _rename_kwargs("unit_test", start, end, note, kwargs, aliases) # Test that the aliases contains the deprecated values and # the obsolete kwargs have been removed @@ -37,30 +40,30 @@ def _test(self, start: str, end: str, kwargs, aliases): # Cause all warnings to always be triggered. warnings.simplefilter("error", DeprecationWarning) try: - _rename_kwargs("unit_test", start, end, kwargs, aliases) + _rename_kwargs("unit_test", start, end, note, kwargs, aliases) finally: warnings.resetwarnings() def test_rename(self): kwargs = dict(old_a=1, old_b=2, c=3, d=4) aliases = {"old_a": "a", "old_b": "b"} - self._test("1.0", "2.0", kwargs, aliases) + self._test("1.0", "2.0", None, kwargs, aliases) def test_obsolete(self): kwargs = dict(a=1, b=2, c=3, d=4, z=10) aliases = {"z": None} - self._test("1.0", "2.0", kwargs, aliases) + self._test("1.0", "2.0", None, kwargs, aliases) def test_rename_and_obsolete(self): kwargs = dict(old_a=1, old_b=2, c=3, d=4, z=10) aliases = {"old_a": "a", "old_b": "b", "z": None} - self._test("1.0", "2.0", kwargs, aliases) + self._test("1.0", "2.0", None, kwargs, aliases) def test_with_new_and_alias_present(self): kwargs = dict(old_a=1, a=1, b=2, c=3, d=4, z=10) aliases = {"old_a": "a", "old_b": "b", "z": None} with self.assertRaises(TypeError): - self._test("1.0", "2.0", kwargs, aliases) + self._test("1.0", "2.0", "Random note.", kwargs, aliases) class DeprecatedArgsAliasTest(unittest.TestCase): @@ -130,6 +133,20 @@ def _test_func3(a): warnings.simplefilter("error") _test_func3(a=1) + @deprecated_args_alias( + "1.0.0", deprecation_info="Because why not?", old_a="a" + ) + def _test_func4(a): + pass + + with pytest.warns(DeprecationWarning) as record: + _test_func4(old_a=1) + assert len(record) == 1 + assert ( + record[0].message.args[0] + == "The 'old_a' argument is deprecated since python-can v1.0.0. Use 'a' instead. Because why not?" + ) + class TestBusConfig(unittest.TestCase): base_config = dict(interface="socketcan", bitrate=500_000)