diff --git a/can/interface.py b/can/interface.py index e217f2fb6..f1a0087e3 100644 --- a/can/interface.py +++ b/can/interface.py @@ -103,7 +103,7 @@ def __new__( # type: ignore # pylint: disable=keyword-arg-before-vararg # resolve the bus class to use for that interface cls = _get_class_for_interface(kwargs["interface"]) - # remove the 'interface' key so it doesn't get passed to the backend + # remove the "interface" key, so it doesn't get passed to the backend del kwargs["interface"] # make sure the bus can handle this config format diff --git a/can/notifier.py b/can/notifier.py index 2554fb4fc..f7c004c4e 100644 --- a/can/notifier.py +++ b/can/notifier.py @@ -2,29 +2,30 @@ This module contains the implementation of :class:`~can.Notifier`. """ -from typing import Any, cast, Iterable, List, Optional, Union, Awaitable +import asyncio +import logging +import threading +import time +from typing import Any, Callable, cast, Iterable, List, Optional, Union, Awaitable from can.bus import BusABC from can.listener import Listener from can.message import Message -import threading -import logging -import time -import asyncio - logger = logging.getLogger("can.Notifier") +MessageRecipient = Union[Listener, Callable[[Message], None]] + class Notifier: def __init__( self, bus: Union[BusABC, List[BusABC]], - listeners: Iterable[Listener], + listeners: Iterable[MessageRecipient], timeout: float = 1.0, loop: Optional[asyncio.AbstractEventLoop] = None, ) -> None: - """Manages the distribution of :class:`can.Message` instances to listeners. + """Manages the distribution of :class:`~can.Message` instances to listeners. Supports multiple buses and listeners. @@ -35,11 +36,13 @@ def __init__( :param bus: A :ref:`bus` or a list of buses to listen to. - :param listeners: An iterable of :class:`~can.Listener` - :param timeout: An optional maximum number of seconds to wait for any message. - :param loop: An :mod:`asyncio` event loop to schedule listeners in. + :param listeners: + An iterable of :class:`~can.Listener` or callables that receive a :class:`~can.Message` + and return nothing. + :param timeout: An optional maximum number of seconds to wait for any :class:`~can.Message`. + :param loop: An :mod:`asyncio` event loop to schedule the ``listeners`` in. """ - self.listeners: List[Listener] = list(listeners) + self.listeners: List[MessageRecipient] = list(listeners) self.bus = bus self.timeout = timeout self._loop = loop @@ -101,8 +104,8 @@ def stop(self, timeout: float = 5) -> None: # reader is a file descriptor self._loop.remove_reader(reader) for listener in self.listeners: - if hasattr(listener, "stop"): - listener.stop() + # Mypy prefers this over a hasattr(...) check + getattr(listener, "stop", lambda: None)() def _rx_thread(self, bus: BusABC) -> None: msg = None @@ -150,9 +153,12 @@ def _on_error(self, exc: Exception) -> bool: was_handled = False for listener in self.listeners: - if hasattr(listener, "on_error"): + on_error = getattr( + listener, "on_error", None + ) # Mypy prefers this over hasattr(...) + if on_error is not None: try: - listener.on_error(exc) + on_error(exc) except NotImplementedError: pass else: diff --git a/examples/asyncio_demo.py b/examples/asyncio_demo.py index d501d1aaf..0f37d6573 100755 --- a/examples/asyncio_demo.py +++ b/examples/asyncio_demo.py @@ -5,57 +5,53 @@ """ import asyncio +from typing import List + import can +from can.notifier import MessageRecipient -def print_message(msg): +def print_message(msg: can.Message) -> None: """Regular callback function. Can also be a coroutine.""" print(msg) -async def main(): +async def main() -> None: """The main function that runs in the loop.""" - bus = can.Bus("vcan0", bustype="virtual", receive_own_messages=True) - reader = can.AsyncBufferedReader() - logger = can.Logger("logfile.asc") - - listeners = [ - print_message, # Callback function - reader, # AsyncBufferedReader() listener - logger, # Regular Listener object - ] - # Create Notifier with an explicit loop to use for scheduling of callbacks - loop = asyncio.get_event_loop() - notifier = can.Notifier(bus, listeners, loop=loop) - # Start sending first message - bus.send(can.Message(arbitration_id=0)) - - print("Bouncing 10 messages...") - for _ in range(10): - # Wait for next message from AsyncBufferedReader - msg = await reader.get_message() - # Delay response - await asyncio.sleep(0.5) - msg.arbitration_id += 1 - bus.send(msg) - # Wait for last message to arrive - await reader.get_message() - print("Done!") - - # Clean-up - notifier.stop() - bus.shutdown() + with can.Bus( # type: ignore + interface="virtual", channel="my_channel_0", receive_own_messages=True + ) as bus: + reader = can.AsyncBufferedReader() + logger = can.Logger("logfile.asc") + + listeners: List[MessageRecipient] = [ + print_message, # Callback function + reader, # AsyncBufferedReader() listener + logger, # Regular Listener object + ] + # Create Notifier with an explicit loop to use for scheduling of callbacks + loop = asyncio.get_running_loop() + notifier = can.Notifier(bus, listeners, loop=loop) + # Start sending first message + bus.send(can.Message(arbitration_id=0)) + + print("Bouncing 10 messages...") + for _ in range(10): + # Wait for next message from AsyncBufferedReader + msg = await reader.get_message() + # Delay response + await asyncio.sleep(0.5) + msg.arbitration_id += 1 + bus.send(msg) + + # Wait for last message to arrive + await reader.get_message() + print("Done!") + + # Clean-up + notifier.stop() if __name__ == "__main__": - try: - # Get the default event loop - LOOP = asyncio.get_event_loop() - # Run until main coroutine finishes - LOOP.run_until_complete(main()) - finally: - LOOP.close() - - # or on Python 3.7+ simply - # asyncio.run(main()) + asyncio.run(main()) diff --git a/examples/serial_com.py b/examples/serial_com.py index 1fbc997b2..c57207a77 100755 --- a/examples/serial_com.py +++ b/examples/serial_com.py @@ -47,9 +47,9 @@ def receive(bus, stop_event): def main(): - """Controles the sender and receiver.""" - with can.interface.Bus(bustype="serial", channel="/dev/ttyS10") as server: - with can.interface.Bus(bustype="serial", channel="/dev/ttyS11") as client: + """Controls the sender and receiver.""" + with can.interface.Bus(interface="serial", channel="/dev/ttyS10") as server: + with can.interface.Bus(interface="serial", channel="/dev/ttyS11") as client: tx_msg = can.Message( arbitration_id=0x01, diff --git a/test/notifier_test.py b/test/notifier_test.py index c9d8f4a27..ca2093f55 100644 --- a/test/notifier_test.py +++ b/test/notifier_test.py @@ -9,48 +9,46 @@ class NotifierTest(unittest.TestCase): def test_single_bus(self): - bus = can.Bus("test", bustype="virtual", receive_own_messages=True) - reader = can.BufferedReader() - notifier = can.Notifier(bus, [reader], 0.1) - msg = can.Message() - bus.send(msg) - self.assertIsNotNone(reader.get_message(1)) - notifier.stop() - bus.shutdown() + with can.Bus("test", interface="virtual", receive_own_messages=True) as bus: + reader = can.BufferedReader() + notifier = can.Notifier(bus, [reader], 0.1) + msg = can.Message() + bus.send(msg) + self.assertIsNotNone(reader.get_message(1)) + notifier.stop() def test_multiple_bus(self): - bus1 = can.Bus(0, bustype="virtual", receive_own_messages=True) - bus2 = can.Bus(1, bustype="virtual", receive_own_messages=True) - reader = can.BufferedReader() - notifier = can.Notifier([bus1, bus2], [reader], 0.1) - msg = can.Message() - bus1.send(msg) - time.sleep(0.1) - bus2.send(msg) - recv_msg = reader.get_message(1) - self.assertIsNotNone(recv_msg) - self.assertEqual(recv_msg.channel, 0) - recv_msg = reader.get_message(1) - self.assertIsNotNone(recv_msg) - self.assertEqual(recv_msg.channel, 1) - notifier.stop() - bus1.shutdown() - bus2.shutdown() + with can.Bus(0, interface="virtual", receive_own_messages=True) as bus1: + with can.Bus(1, interface="virtual", receive_own_messages=True) as bus2: + reader = can.BufferedReader() + notifier = can.Notifier([bus1, bus2], [reader], 0.1) + msg = can.Message() + bus1.send(msg) + time.sleep(0.1) + bus2.send(msg) + recv_msg = reader.get_message(1) + self.assertIsNotNone(recv_msg) + self.assertEqual(recv_msg.channel, 0) + recv_msg = reader.get_message(1) + self.assertIsNotNone(recv_msg) + self.assertEqual(recv_msg.channel, 1) + notifier.stop() class AsyncNotifierTest(unittest.TestCase): def test_asyncio_notifier(self): - loop = asyncio.get_event_loop() - bus = can.Bus("test", bustype="virtual", receive_own_messages=True) - reader = can.AsyncBufferedReader() - notifier = can.Notifier(bus, [reader], 0.1, loop=loop) - msg = can.Message() - bus.send(msg) - future = asyncio.wait_for(reader.get_message(), 1.0) - recv_msg = loop.run_until_complete(future) - self.assertIsNotNone(recv_msg) - notifier.stop() - bus.shutdown() + async def run_it(): + with can.Bus("test", interface="virtual", receive_own_messages=True) as bus: + reader = can.AsyncBufferedReader() + notifier = can.Notifier( + bus, [reader], 0.1, loop=asyncio.get_running_loop() + ) + bus.send(can.Message()) + recv_msg = await asyncio.wait_for(reader.get_message(), 0.5) + self.assertIsNotNone(recv_msg) + notifier.stop() + + asyncio.run(run_it()) if __name__ == "__main__":