-
-
Notifications
You must be signed in to change notification settings - Fork 33.4k
/
Copy pathactive_update_processor.py
166 lines (135 loc) · 5.85 KB
/
active_update_processor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
"""A Bluetooth passive processor coordinator.
Collects data from advertisements but can also poll.
"""
from __future__ import annotations
from collections.abc import Callable, Coroutine
import logging
from typing import Any
from bleak import BleakError
from bluetooth_data_tools import monotonic_time_coarse
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.debounce import Debouncer
from . import BluetoothChange, BluetoothScanningMode, BluetoothServiceInfoBleak
from .passive_update_processor import PassiveBluetoothProcessorCoordinator
# Value passed as `cooldown` to the Debouncer that is created when the caller
# does not supply a `poll_debouncer` (presumably seconds — confirm against
# the Debouncer helper).
POLL_DEFAULT_COOLDOWN = 10
# Value passed as `immediate` to that same default Debouncer.
POLL_DEFAULT_IMMEDIATE = True
class ActiveBluetoothProcessorCoordinator[_DataT](
PassiveBluetoothProcessorCoordinator[_DataT]
):
"""A processor coordinator that parses passive data.
Parses passive data from advertisements but can also poll.
Every time an advertisement is received, needs_poll_method is called to work
out if a poll is needed. This should return True if it is and False if it is
not needed.
def needs_poll_method(
svc_info: BluetoothServiceInfoBleak,
last_poll: float | None
) -> bool:
return True
If there has been no poll since HA started, `last_poll` will be None.
Otherwise it is the number of seconds since one was last attempted.
If a poll is needed, the coordinator will call poll_method. This is a coroutine.
It should return the same type of data as your update_method. The expectation is
that data from advertisements and from polling are being parsed and fed into a
shared object that represents the current state of the device.
async def poll_method(svc_info: BluetoothServiceInfoBleak) -> YourDataType:
return YourDataType(....)
BluetoothServiceInfoBleak.device contains a BLEDevice. You should use this in
your poll function, as it is the most efficient way to get a BleakClient.
"""
def __init__(
self,
hass: HomeAssistant,
logger: logging.Logger,
*,
address: str,
mode: BluetoothScanningMode,
update_method: Callable[[BluetoothServiceInfoBleak], _DataT],
needs_poll_method: Callable[[BluetoothServiceInfoBleak, float | None], bool],
poll_method: Callable[
[BluetoothServiceInfoBleak],
Coroutine[Any, Any, _DataT],
]
| None = None,
poll_debouncer: Debouncer[Coroutine[Any, Any, None]] | None = None,
connectable: bool = True,
) -> None:
"""Initialize the processor."""
super().__init__(hass, logger, address, mode, update_method, connectable)
self._needs_poll_method = needs_poll_method
self._poll_method = poll_method
self._last_poll: float | None = None
self.last_poll_successful = True
# We keep the last service info in case the poller needs to refer to
# e.g. its BLEDevice
self._last_service_info: BluetoothServiceInfoBleak | None = None
if poll_debouncer is None:
poll_debouncer = Debouncer(
hass,
logger,
cooldown=POLL_DEFAULT_COOLDOWN,
immediate=POLL_DEFAULT_IMMEDIATE,
function=self._async_poll,
background=True,
)
else:
poll_debouncer.function = self._async_poll
self._debounced_poll = poll_debouncer
def needs_poll(self, service_info: BluetoothServiceInfoBleak) -> bool:
"""Return true if time to try and poll."""
if self.hass.is_stopping:
return False
poll_age: float | None = None
if self._last_poll:
poll_age = service_info.time - self._last_poll
return self._needs_poll_method(service_info, poll_age)
async def _async_poll_data(
self, last_service_info: BluetoothServiceInfoBleak
) -> _DataT:
"""Fetch the latest data from the source."""
if self._poll_method is None:
raise NotImplementedError("Poll method not implemented")
return await self._poll_method(last_service_info)
async def _async_poll(self) -> None:
"""Poll the device to retrieve any extra data."""
assert self._last_service_info
try:
update = await self._async_poll_data(self._last_service_info)
except BleakError as exc:
if self.last_poll_successful:
self.logger.error(
"%s: Bluetooth error whilst polling: %s", self.address, str(exc)
)
self.last_poll_successful = False
return
except Exception: # noqa: BLE001
if self.last_poll_successful:
self.logger.exception("%s: Failure while polling", self.address)
self.last_poll_successful = False
return
finally:
self._last_poll = monotonic_time_coarse()
if not self.last_poll_successful:
self.logger.debug("%s: Polling recovered", self.address)
self.last_poll_successful = True
for processor in self._processors:
processor.async_handle_update(update)
@callback
def _async_handle_bluetooth_event(
self,
service_info: BluetoothServiceInfoBleak,
change: BluetoothChange,
) -> None:
"""Handle a Bluetooth event."""
super()._async_handle_bluetooth_event(service_info, change)
self._last_service_info = service_info
# See if its time to poll
# We use bluetooth events to trigger the poll so that we scan as soon as
# possible after a device comes online or back in range, if a poll is due
if self.needs_poll(service_info):
self._debounced_poll.async_schedule_call()
@callback
def _async_stop(self) -> None:
"""Cancel debouncer and stop the callbacks."""
self._debounced_poll.async_cancel()
super()._async_stop()