-
-
Notifications
You must be signed in to change notification settings - Fork 33.4k
/
Copy pathsensor.py
246 lines (209 loc) · 8.37 KB
/
sensor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
"""Allows to configure custom shell commands to turn a value for a sensor."""
from __future__ import annotations
import asyncio
from collections.abc import Mapping
from datetime import datetime, timedelta
import json
from typing import Any, cast
from jsonpath import jsonpath
from homeassistant.components.sensor import SensorDeviceClass
from homeassistant.components.sensor.helpers import async_parse_date_datetime
from homeassistant.const import (
CONF_COMMAND,
CONF_NAME,
CONF_SCAN_INTERVAL,
CONF_VALUE_TEMPLATE,
)
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import TemplateError
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.helpers.template import Template
from homeassistant.helpers.trigger_template_entity import ManualTriggerSensorEntity
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from homeassistant.util import dt as dt_util
from .const import (
CONF_COMMAND_TIMEOUT,
CONF_JSON_ATTRIBUTES,
CONF_JSON_ATTRIBUTES_PATH,
LOGGER,
TRIGGER_ENTITY_OPTIONS,
)
from .utils import async_check_output_or_log
# NOTE(review): presumably the fallback display name for a command sensor; it
# is not referenced in the visible part of this module - confirm it is still
# needed elsewhere.
DEFAULT_NAME = "Command Sensor"

# Refresh period used by async_setup_platform when the sensor configuration
# does not carry its own scan interval.
SCAN_INTERVAL = timedelta(minutes=1)
async def async_setup_platform(
    hass: HomeAssistant,
    config: ConfigType,
    async_add_entities: AddEntitiesCallback,
    discovery_info: DiscoveryInfoType | None = None,
) -> None:
    """Create the command line sensor described by ``discovery_info``.

    Nothing is set up when ``discovery_info`` is missing or empty; the
    ``config`` argument is not consulted.

    Args:
        hass: Home Assistant instance handed to the data fetcher and to the
            name template.
        config: Platform configuration (unused here).
        async_add_entities: Callback that receives the created sensor.
        discovery_info: Sensor options. Must contain the command, the command
            timeout and the name; the remaining options are optional.
    """
    if not discovery_info:
        return

    options = discovery_info

    # Object that runs the shell command and keeps its latest output.
    fetcher = CommandSensorData(
        hass, options[CONF_COMMAND], options[CONF_COMMAND_TIMEOUT]
    )

    # The name is always wrapped in a template; every option listed in
    # TRIGGER_ENTITY_OPTIONS is then copied over verbatim (and wins if it
    # happens to use the same key).
    entity_options = {CONF_NAME: Template(options[CONF_NAME], hass)}
    entity_options.update(
        (key, val) for key, val in options.items() if key in TRIGGER_ENTITY_OPTIONS
    )

    sensor = CommandSensor(
        fetcher,
        entity_options,
        options.get(CONF_VALUE_TEMPLATE),
        options.get(CONF_JSON_ATTRIBUTES),
        options.get(CONF_JSON_ATTRIBUTES_PATH),
        options.get(CONF_SCAN_INTERVAL, SCAN_INTERVAL),
    )
    async_add_entities([sensor])
class CommandSensor(ManualTriggerSensorEntity):
    """Representation of a sensor that is using shell commands.

    The entity does not rely on polling: it registers its own interval timer
    in ``async_added_to_hass`` and writes its state explicitly after every
    refresh.
    """

    # Refreshes are scheduled by the timer set up in async_added_to_hass.
    _attr_should_poll = False

    def __init__(
        self,
        data: CommandSensorData,
        config: ConfigType,
        value_template: Template | None,
        json_attributes: list[str] | None,
        json_attributes_path: str | None,
        scan_interval: timedelta,
    ) -> None:
        """Initialize the sensor.

        Args:
            data: Object that runs the command and exposes its output as
                ``data.value``.
            config: Options forwarded to the base class constructor.
            value_template: Optional template applied to the command output.
            json_attributes: Keys to copy from JSON command output into the
                extra state attributes; falsy disables JSON parsing.
            json_attributes_path: Optional JSONPath expression applied to the
                parsed output before the keys are looked up.
            scan_interval: Period between scheduled refreshes.
        """
        # NOTE(review): self.hass is read before this constructor assigns
        # anything, so it is whatever the base class provides by default -
        # confirm that the base class tolerates this.
        super().__init__(self.hass, config)
        self.data = data
        self._attr_extra_state_attributes: dict[str, Any] = {}
        self._json_attributes = json_attributes
        self._json_attributes_path = json_attributes_path
        self._attr_native_value = None
        self._value_template = value_template
        self._scan_interval = scan_interval
        # Created lazily on the first update (see _update_entity_state).
        self._process_updates: asyncio.Lock | None = None

    @property
    def extra_state_attributes(self) -> dict[str, Any]:
        """Return extra state attributes."""
        return self._attr_extra_state_attributes

    async def async_added_to_hass(self) -> None:
        """Run an initial update and schedule the periodic ones."""
        await super().async_added_to_hass()
        await self._update_entity_state()
        # Register the timer's cancel callback so it is invoked on removal.
        self.async_on_remove(
            async_track_time_interval(
                self.hass,
                self._update_entity_state,
                self._scan_interval,
                name=f"Command Line Sensor - {self.name}",
                cancel_on_shutdown=True,
            ),
        )

    async def _update_entity_state(self, now: datetime | None = None) -> None:
        """Refresh the entity unless a previous refresh is still running.

        Args:
            now: Timestamp supplied by the interval timer; not used.
        """
        if self._process_updates is None:
            self._process_updates = asyncio.Lock()
        if self._process_updates.locked():
            # Skip this tick instead of queueing behind the running command.
            LOGGER.warning(
                "Updating Command Line Sensor %s took longer than the scheduled update interval %s",
                self.name,
                self._scan_interval,
            )
            return

        async with self._process_updates:
            await self._async_update()

    def _parse_json_attributes(self, value: str | None) -> dict[str, Any]:
        """Return the configured attributes found in JSON command output.

        An empty dict is returned (and a warning logged) when the output is
        empty, is not valid JSON, or does not resolve to a mapping.
        """
        if not value:
            LOGGER.warning("Empty reply found when expecting JSON data")
            return {}

        try:
            json_dict = json.loads(value)
            if self._json_attributes_path is not None:
                json_dict = jsonpath(json_dict, self._json_attributes_path)
        except ValueError:
            LOGGER.warning("Unable to parse output as JSON: %s", value)
            return {}

        # A list (for instance a jsonpath result) is unwrapped to its first
        # element. An empty list is left alone so that it is reported below
        # instead of raising IndexError.
        if isinstance(json_dict, list) and json_dict:
            json_dict = json_dict[0]

        if not isinstance(json_dict, Mapping):
            LOGGER.warning("JSON result was not a dictionary")
            return {}

        return {k: json_dict[k] for k in self._json_attributes if k in json_dict}

    async def _async_update(self) -> None:
        """Get the latest data and update the state.

        Runs the command, refreshes the JSON attributes when configured,
        derives the native value and writes the resulting state.
        """
        await self.data.async_update()
        value = self.data.value

        if self._json_attributes:
            self._attr_extra_state_attributes = self._parse_json_attributes(value)
            if self._value_template is None:
                # Attribute-only sensor: the raw JSON is not used as state.
                self._attr_native_value = None
                self._process_manual_data(value)
                # The entity is not polled, so the refreshed attributes must
                # be published explicitly on this path as well.
                self.async_write_ha_state()
                return

        self._attr_native_value = None
        if self._value_template is not None and value is not None:
            value = self._value_template.async_render_with_possible_json_value(
                value,
                None,
            )

        if self.device_class not in {
            SensorDeviceClass.DATE,
            SensorDeviceClass.TIMESTAMP,
        }:
            self._attr_native_value = value
        elif value is not None:
            # Date and timestamp sensors need a parsed value, not a string.
            self._attr_native_value = async_parse_date_datetime(
                value, self.entity_id, self.device_class
            )
        # NOTE(review): inherited helper; presumably feeds the trigger-entity
        # templates with the new value - confirm against the base class.
        self._process_manual_data(value)
        self.async_write_ha_state()

    async def async_update(self) -> None:
        """Update the entity.

        Only used by the generic entity update service.
        """
        await self._update_entity_state(dt_util.now())
class CommandSensorData:
    """Run a shell command and keep its most recent output."""

    def __init__(self, hass: HomeAssistant, command: str, command_timeout: int) -> None:
        """Store the command settings; no output is available yet.

        Args:
            hass: Home Assistant instance used to build the argument template.
            command: Command line; everything after the first space is
                treated as a template.
            command_timeout: Timeout forwarded to the command runner.
        """
        self.hass = hass
        self.command = command
        self.timeout = command_timeout
        # Output of the last command run; None until the first run.
        self.value: str | None = None

    async def async_update(self) -> None:
        """Render the command arguments, run the command and store its output.

        If rendering the argument template fails, the error is logged and the
        previously stored value is left untouched.
        """
        shell_command = self.command

        # Split off the program name; only the remainder may be a template.
        program, separator, remainder = shell_command.partition(" ")
        raw_args = remainder if separator else None
        arg_template = Template(raw_args, self.hass) if separator else None

        rendered = None
        if arg_template:
            try:
                rendered = arg_template.async_render({"arguments": raw_args})
            except TemplateError as ex:
                LOGGER.exception("Error rendering command template: %s", ex)
                return

        # Rebuild the command line only when rendering changed the arguments;
        # otherwise the configured command is used verbatim.
        if rendered != raw_args:
            shell_command = f"{program} {rendered}"

        LOGGER.debug("Running command: %s", shell_command)
        self.value = await async_check_output_or_log(shell_command, self.timeout)