Skip to content

Improve IO typing #1238

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions can/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,6 @@
CanTimeoutError,
)

from .io import Logger, SizedRotatingLogger, Printer, LogReader, MessageSync
from .io import ASCWriter, ASCReader
from .io import BLFReader, BLFWriter
from .io import CanutilsLogReader, CanutilsLogWriter
from .io import CSVWriter, CSVReader
from .io import SqliteWriter, SqliteReader

from .util import set_logging_level

from .message import Message
Expand All @@ -42,6 +35,13 @@
from .interface import Bus, detect_available_configs
from .bit_timing import BitTiming

from .io import Logger, SizedRotatingLogger, Printer, LogReader, MessageSync
from .io import ASCWriter, ASCReader
from .io import BLFReader, BLFWriter
from .io import CanutilsLogReader, CanutilsLogWriter
from .io import CSVWriter, CSVReader
from .io import SqliteWriter, SqliteReader

from .broadcastmanager import (
CyclicSendTaskABC,
LimitedDurationCyclicSendTaskABC,
Expand Down
14 changes: 6 additions & 8 deletions can/io/asc.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,10 @@
import time
import logging

from .. import typechecking
from ..message import Message
from ..listener import Listener
from ..util import channel2int
from .generic import BaseIOHandler, FileIOMessageWriter
from ..typechecking import AcceptedIOType
from .generic import FileIOMessageWriter, MessageReader
from ..typechecking import StringPathLike


CAN_MSG_EXT = 0x80000000
Expand All @@ -28,7 +26,7 @@
logger = logging.getLogger("can.io.asc")


class ASCReader(BaseIOHandler):
class ASCReader(MessageReader):
"""
Iterator of CAN messages from a ASC logging file. Meta data (comments,
bus statistics, J1939 Transport Protocol messages) is ignored.
Expand All @@ -40,7 +38,7 @@ class ASCReader(BaseIOHandler):

def __init__(
self,
file: AcceptedIOType,
file: Union[StringPathLike, TextIO],
base: str = "hex",
relative_timestamp: bool = True,
) -> None:
Expand Down Expand Up @@ -248,7 +246,7 @@ def __iter__(self) -> Generator[Message, None, None]:
self.stop()


class ASCWriter(FileIOMessageWriter, Listener):
class ASCWriter(FileIOMessageWriter):
"""Logs CAN data to an ASCII log file (.asc).

The measurement starts with the timestamp of the first registered message.
Expand Down Expand Up @@ -287,7 +285,7 @@ class ASCWriter(FileIOMessageWriter, Listener):

def __init__(
self,
file: AcceptedIOType,
file: Union[StringPathLike, TextIO],
channel: int = 1,
) -> None:
"""
Expand Down
17 changes: 8 additions & 9 deletions can/io/blf.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@
import datetime
import time
import logging
from typing import List, BinaryIO
from typing import List, BinaryIO, Generator, Union

from ..message import Message
from ..listener import Listener
from ..util import len2dlc, dlc2len, channel2int
from ..typechecking import AcceptedIOType
from .generic import BaseIOHandler, FileIOMessageWriter
from ..typechecking import StringPathLike
from .generic import FileIOMessageWriter, MessageReader


class BLFParseError(Exception):
Expand Down Expand Up @@ -131,7 +130,7 @@ def systemtime_to_timestamp(systemtime):
return 0


class BLFReader(BaseIOHandler):
class BLFReader(MessageReader):
"""
Iterator of CAN messages from a Binary Logging File.

Expand All @@ -141,7 +140,7 @@ class BLFReader(BaseIOHandler):

file: BinaryIO

def __init__(self, file: AcceptedIOType) -> None:
def __init__(self, file: Union[StringPathLike, BinaryIO]) -> None:
"""
:param file: a path-like object or as file-like object to read from
If this is a file-like object, is has to opened in binary
Expand All @@ -162,7 +161,7 @@ def __init__(self, file: AcceptedIOType) -> None:
self._tail = b""
self._pos = 0

def __iter__(self):
def __iter__(self) -> Generator[Message, None, None]:
while True:
data = self.file.read(OBJ_HEADER_BASE_STRUCT.size)
if not data:
Expand Down Expand Up @@ -349,7 +348,7 @@ def _parse_data(self, data):
pos = next_pos


class BLFWriter(FileIOMessageWriter, Listener):
class BLFWriter(FileIOMessageWriter):
"""
Logs CAN data to a Binary Logging File compatible with Vector's tools.
"""
Expand All @@ -364,7 +363,7 @@ class BLFWriter(FileIOMessageWriter, Listener):

def __init__(
self,
file: AcceptedIOType,
file: Union[StringPathLike, BinaryIO],
append: bool = False,
channel: int = 1,
compression_level: int = -1,
Expand Down
60 changes: 35 additions & 25 deletions can/io/canutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
"""

import logging
from typing import Generator, TextIO, Union

from can.message import Message
from can.listener import Listener
from .generic import BaseIOHandler, FileIOMessageWriter
from ..typechecking import AcceptedIOType
from .generic import FileIOMessageWriter, MessageReader
from ..typechecking import AcceptedIOType, StringPathLike

log = logging.getLogger("can.io.canutils")

Expand All @@ -22,7 +22,7 @@
CANFD_ESI = 0x02


class CanutilsLogReader(BaseIOHandler):
class CanutilsLogReader(MessageReader):
"""
Iterator over CAN messages from a .log Logging File (candump -L).

Expand All @@ -32,30 +32,37 @@ class CanutilsLogReader(BaseIOHandler):
``(0.0) vcan0 001#8d00100100820100``
"""

def __init__(self, file: AcceptedIOType) -> None:
file: TextIO

def __init__(self, file: Union[StringPathLike, TextIO]) -> None:
"""
:param file: a path-like object or as file-like object to read from
If this is a file-like object, is has to opened in text
read mode, not binary read mode.
"""
super().__init__(file, mode="r")

def __iter__(self):
def __iter__(self) -> Generator[Message, None, None]:
for line in self.file:

# skip empty lines
temp = line.strip()
if not temp:
continue

timestamp, channel, frame = temp.split()
timestamp = float(timestamp[1:-1])
canId, data = frame.split("#", maxsplit=1)
if channel.isdigit():
channel = int(channel)
channel_string: str
timestamp_string, channel_string, frame = temp.split()
timestamp = float(timestamp_string[1:-1])
can_id_string, data = frame.split("#", maxsplit=1)

channel: Union[int, str]
if channel_string.isdigit():
channel = int(channel_string)
else:
channel = channel_string

isExtended = len(canId) > 3
canId = int(canId, 16)
is_extended = len(can_id_string) > 3
can_id = int(can_id_string, 16)

is_fd = False
brs = False
Expand All @@ -69,43 +76,43 @@ def __iter__(self):
data = data[2:]

if data and data[0].lower() == "r":
isRemoteFrame = True
is_remote_frame = True

if len(data) > 1:
dlc = int(data[1:])
else:
dlc = 0

dataBin = None
data_bin = None
else:
isRemoteFrame = False
is_remote_frame = False

dlc = len(data) // 2
dataBin = bytearray()
data_bin = bytearray()
for i in range(0, len(data), 2):
dataBin.append(int(data[i : (i + 2)], 16))
data_bin.append(int(data[i : (i + 2)], 16))

if canId & CAN_ERR_FLAG and canId & CAN_ERR_BUSERROR:
if can_id & CAN_ERR_FLAG and can_id & CAN_ERR_BUSERROR:
msg = Message(timestamp=timestamp, is_error_frame=True)
else:
msg = Message(
timestamp=timestamp,
arbitration_id=canId & 0x1FFFFFFF,
is_extended_id=isExtended,
is_remote_frame=isRemoteFrame,
arbitration_id=can_id & 0x1FFFFFFF,
is_extended_id=is_extended,
is_remote_frame=is_remote_frame,
is_fd=is_fd,
bitrate_switch=brs,
error_state_indicator=esi,
dlc=dlc,
data=dataBin,
data=data_bin,
channel=channel,
)
yield msg

self.stop()


class CanutilsLogWriter(FileIOMessageWriter, Listener):
class CanutilsLogWriter(FileIOMessageWriter):
"""Logs CAN data to an ASCII log file (.log).
This class is is compatible with "candump -L".

Expand All @@ -115,7 +122,10 @@ class CanutilsLogWriter(FileIOMessageWriter, Listener):
"""

def __init__(
self, file: AcceptedIOType, channel: str = "vcan0", append: bool = False
self,
file: Union[StringPathLike, TextIO],
channel: str = "vcan0",
append: bool = False,
):
"""
:param file: a path-like object or as file-like object to write to
Expand Down
21 changes: 12 additions & 9 deletions can/io/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,14 @@
"""

from base64 import b64encode, b64decode
from typing import TextIO
from typing import TextIO, Generator, Union

from can.message import Message
from can.listener import Listener
from .generic import BaseIOHandler, FileIOMessageWriter
from ..typechecking import AcceptedIOType
from .generic import FileIOMessageWriter, MessageReader
from ..typechecking import StringPathLike


class CSVReader(BaseIOHandler):
class CSVReader(MessageReader):
"""Iterator over CAN messages from a .csv file that was
generated by :class:`~can.CSVWriter` or that uses the same
format as described there. Assumes that there is a header
Expand All @@ -27,15 +26,17 @@ class CSVReader(BaseIOHandler):
Any line separator is accepted.
"""

def __init__(self, file: AcceptedIOType) -> None:
file: TextIO

def __init__(self, file: Union[StringPathLike, TextIO]) -> None:
"""
:param file: a path-like object or as file-like object to read from
If this is a file-like object, is has to opened in text
read mode, not binary read mode.
"""
super().__init__(file, mode="r")

def __iter__(self):
def __iter__(self) -> Generator[Message, None, None]:
# skip the header line
try:
next(self.file)
Expand All @@ -62,7 +63,7 @@ def __iter__(self):
self.stop()


class CSVWriter(FileIOMessageWriter, Listener):
class CSVWriter(FileIOMessageWriter):
"""Writes a comma separated text file with a line for
each message. Includes a header line.

Expand All @@ -85,7 +86,9 @@ class CSVWriter(FileIOMessageWriter, Listener):

file: TextIO

def __init__(self, file: AcceptedIOType, append: bool = False) -> None:
def __init__(
self, file: Union[StringPathLike, TextIO], append: bool = False
) -> None:
"""
:param file: a path-like object or a file-like object to write to.
If this is a file-like object, is has to open in text
Expand Down
11 changes: 5 additions & 6 deletions can/io/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@
Optional,
cast,
Iterable,
Union,
TextIO,
BinaryIO,
Type,
ContextManager,
)
Expand Down Expand Up @@ -76,21 +73,23 @@ def stop(self) -> None:
class MessageWriter(BaseIOHandler, can.Listener, metaclass=ABCMeta):
"""The base class for all writers."""

file: Optional[can.typechecking.FileLike]


# pylint: disable=abstract-method,too-few-public-methods
class FileIOMessageWriter(MessageWriter, metaclass=ABCMeta):
"""A specialized base class for all writers with file descriptors."""

file: Union[TextIO, BinaryIO]
file: can.typechecking.FileLike

def __init__(self, file: can.typechecking.AcceptedIOType, mode: str = "rt") -> None:
# Not possible with the type signature, but be verbose for user friendliness
# Not possible with the type signature, but be verbose for user-friendliness
if file is None:
raise ValueError("The given file cannot be None")

super().__init__(file, mode)


# pylint: disable=too-few-public-methods
class MessageReader(BaseIOHandler, Iterable, metaclass=ABCMeta):
class MessageReader(BaseIOHandler, Iterable[can.Message], metaclass=ABCMeta):
"""The base class for all readers."""
Loading