Skip to content

Fix deprecation of asyncio.get_event_loop() #1235

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Jan 27, 2022
Merged
2 changes: 1 addition & 1 deletion can/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def __new__( # type: ignore # pylint: disable=keyword-arg-before-vararg
# resolve the bus class to use for that interface
cls = _get_class_for_interface(kwargs["interface"])

# remove the 'interface' key so it doesn't get passed to the backend
# remove the "interface" key, so it doesn't get passed to the backend
del kwargs["interface"]

# make sure the bus can handle this config format
Expand Down
38 changes: 22 additions & 16 deletions can/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,30 @@
This module contains the implementation of :class:`~can.Notifier`.
"""

from typing import Any, cast, Iterable, List, Optional, Union, Awaitable
import asyncio
import logging
import threading
import time
from typing import Any, Callable, cast, Iterable, List, Optional, Union, Awaitable

from can.bus import BusABC
from can.listener import Listener
from can.message import Message

import threading
import logging
import time
import asyncio

logger = logging.getLogger("can.Notifier")

Listenable = Union[Listener, Callable[[Message], None]]


class Notifier:
def __init__(
self,
bus: Union[BusABC, List[BusABC]],
listeners: Iterable[Listener],
listeners: Iterable[Listenable],
timeout: float = 1.0,
loop: Optional[asyncio.AbstractEventLoop] = None,
) -> None:
"""Manages the distribution of :class:`can.Message` instances to listeners.
"""Manages the distribution of :class:`~can.Message` instances to listeners.

Supports multiple buses and listeners.

Expand All @@ -35,11 +36,13 @@ def __init__(


:param bus: A :ref:`bus` or a list of buses to listen to.
:param listeners: An iterable of :class:`~can.Listener`
:param timeout: An optional maximum number of seconds to wait for any message.
:param loop: An :mod:`asyncio` event loop to schedule listeners in.
:param listeners:
An iterable of :class:`~can.Listener` or callables that receive a :class:`~can.Message`
and return nothing.
:param timeout: An optional maximum number of seconds to wait for any :class:`~can.Message`.
:param loop: An :mod:`asyncio` event loop to schedule the ``listeners`` in.
"""
self.listeners: List[Listener] = list(listeners)
self.listeners: List[Listenable] = list(listeners)
self.bus = bus
self.timeout = timeout
self._loop = loop
Expand Down Expand Up @@ -101,8 +104,8 @@ def stop(self, timeout: float = 5) -> None:
# reader is a file descriptor
self._loop.remove_reader(reader)
for listener in self.listeners:
if hasattr(listener, "stop"):
listener.stop()
# Mypy prefers this over a hasattr(...) check
getattr(listener, "stop", lambda: None)()

def _rx_thread(self, bus: BusABC) -> None:
msg = None
Expand Down Expand Up @@ -150,9 +153,12 @@ def _on_error(self, exc: Exception) -> bool:
was_handled = False

for listener in self.listeners:
if hasattr(listener, "on_error"):
on_error = getattr(
listener, "on_error", None
) # Mypy prefers this over hasattr(...)
if on_error is not None:
try:
listener.on_error(exc)
on_error(exc)
except NotImplementedError:
pass
else:
Expand Down
80 changes: 38 additions & 42 deletions examples/asyncio_demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,57 +5,53 @@
"""

import asyncio
from typing import List

import can
from can.notifier import Listenable


def print_message(msg):
def print_message(msg: can.Message) -> None:
"""Regular callback function. Can also be a coroutine."""
print(msg)


async def main():
async def main() -> None:
"""The main function that runs in the loop."""

bus = can.Bus("vcan0", bustype="virtual", receive_own_messages=True)
reader = can.AsyncBufferedReader()
logger = can.Logger("logfile.asc")

listeners = [
print_message, # Callback function
reader, # AsyncBufferedReader() listener
logger, # Regular Listener object
]
# Create Notifier with an explicit loop to use for scheduling of callbacks
loop = asyncio.get_event_loop()
notifier = can.Notifier(bus, listeners, loop=loop)
# Start sending first message
bus.send(can.Message(arbitration_id=0))

print("Bouncing 10 messages...")
for _ in range(10):
# Wait for next message from AsyncBufferedReader
msg = await reader.get_message()
# Delay response
await asyncio.sleep(0.5)
msg.arbitration_id += 1
bus.send(msg)
# Wait for last message to arrive
await reader.get_message()
print("Done!")

# Clean-up
notifier.stop()
bus.shutdown()
with can.Bus( # type: ignore
interface="virtual", channel="my_channel_0", receive_own_messages=True
) as bus:
reader = can.AsyncBufferedReader()
logger = can.Logger("logfile.asc")

listeners: List[Listenable] = [
print_message, # Callback function
reader, # AsyncBufferedReader() listener
logger, # Regular Listener object
]
# Create Notifier with an explicit loop to use for scheduling of callbacks
loop = asyncio.get_running_loop()
notifier = can.Notifier(bus, listeners, loop=loop)
# Start sending first message
bus.send(can.Message(arbitration_id=0))

print("Bouncing 10 messages...")
for _ in range(10):
# Wait for next message from AsyncBufferedReader
msg = await reader.get_message()
# Delay response
await asyncio.sleep(0.5)
msg.arbitration_id += 1
bus.send(msg)

# Wait for last message to arrive
await reader.get_message()
print("Done!")

# Clean-up
notifier.stop()


if __name__ == "__main__":
try:
# Get the default event loop
LOOP = asyncio.get_event_loop()
# Run until main coroutine finishes
LOOP.run_until_complete(main())
finally:
LOOP.close()

# or on Python 3.7+ simply
# asyncio.run(main())
asyncio.run(main())
6 changes: 3 additions & 3 deletions examples/serial_com.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ def receive(bus, stop_event):


def main():
"""Controles the sender and receiver."""
with can.interface.Bus(bustype="serial", channel="/dev/ttyS10") as server:
with can.interface.Bus(bustype="serial", channel="/dev/ttyS11") as client:
"""Controls the sender and receiver."""
with can.interface.Bus(interface="serial", channel="/dev/ttyS10") as server:
with can.interface.Bus(interface="serial", channel="/dev/ttyS11") as client:

tx_msg = can.Message(
arbitration_id=0x01,
Expand Down
70 changes: 34 additions & 36 deletions test/notifier_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,48 +9,46 @@

class NotifierTest(unittest.TestCase):
def test_single_bus(self):
bus = can.Bus("test", bustype="virtual", receive_own_messages=True)
reader = can.BufferedReader()
notifier = can.Notifier(bus, [reader], 0.1)
msg = can.Message()
bus.send(msg)
self.assertIsNotNone(reader.get_message(1))
notifier.stop()
bus.shutdown()
with can.Bus("test", interface="virtual", receive_own_messages=True) as bus:
reader = can.BufferedReader()
notifier = can.Notifier(bus, [reader], 0.1)
msg = can.Message()
bus.send(msg)
self.assertIsNotNone(reader.get_message(1))
notifier.stop()

def test_multiple_bus(self):
bus1 = can.Bus(0, bustype="virtual", receive_own_messages=True)
bus2 = can.Bus(1, bustype="virtual", receive_own_messages=True)
reader = can.BufferedReader()
notifier = can.Notifier([bus1, bus2], [reader], 0.1)
msg = can.Message()
bus1.send(msg)
time.sleep(0.1)
bus2.send(msg)
recv_msg = reader.get_message(1)
self.assertIsNotNone(recv_msg)
self.assertEqual(recv_msg.channel, 0)
recv_msg = reader.get_message(1)
self.assertIsNotNone(recv_msg)
self.assertEqual(recv_msg.channel, 1)
notifier.stop()
bus1.shutdown()
bus2.shutdown()
with can.Bus(0, interface="virtual", receive_own_messages=True) as bus1:
with can.Bus(1, interface="virtual", receive_own_messages=True) as bus2:
reader = can.BufferedReader()
notifier = can.Notifier([bus1, bus2], [reader], 0.1)
msg = can.Message()
bus1.send(msg)
time.sleep(0.1)
bus2.send(msg)
recv_msg = reader.get_message(1)
self.assertIsNotNone(recv_msg)
self.assertEqual(recv_msg.channel, 0)
recv_msg = reader.get_message(1)
self.assertIsNotNone(recv_msg)
self.assertEqual(recv_msg.channel, 1)
notifier.stop()


class AsyncNotifierTest(unittest.TestCase):
def test_asyncio_notifier(self):
loop = asyncio.get_event_loop()
bus = can.Bus("test", bustype="virtual", receive_own_messages=True)
reader = can.AsyncBufferedReader()
notifier = can.Notifier(bus, [reader], 0.1, loop=loop)
msg = can.Message()
bus.send(msg)
future = asyncio.wait_for(reader.get_message(), 1.0)
recv_msg = loop.run_until_complete(future)
self.assertIsNotNone(recv_msg)
notifier.stop()
bus.shutdown()
async def run_it():
with can.Bus("test", interface="virtual", receive_own_messages=True) as bus:
reader = can.AsyncBufferedReader()
notifier = can.Notifier(
bus, [reader], 0.1, loop=asyncio.get_running_loop()
)
bus.send(can.Message())
recv_msg = await asyncio.wait_for(reader.get_message(), 0.5)
self.assertIsNotNone(recv_msg)
notifier.stop()

asyncio.run(run_it())


if __name__ == "__main__":
Expand Down