-
-
Notifications
You must be signed in to change notification settings - Fork 33.4k
/
Copy pathreproduce_state.py
135 lines (118 loc) · 3.94 KB
/
reproduce_state.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
"""Reproduce a Cover state."""
from __future__ import annotations
import asyncio
from collections.abc import Iterable
import logging
from typing import Any
from homeassistant.const import (
ATTR_ENTITY_ID,
SERVICE_CLOSE_COVER,
SERVICE_CLOSE_COVER_TILT,
SERVICE_OPEN_COVER,
SERVICE_OPEN_COVER_TILT,
SERVICE_SET_COVER_POSITION,
SERVICE_SET_COVER_TILT_POSITION,
)
from homeassistant.core import Context, HomeAssistant, State
from . import (
ATTR_CURRENT_POSITION,
ATTR_CURRENT_TILT_POSITION,
ATTR_POSITION,
ATTR_TILT_POSITION,
DOMAIN,
CoverState,
)
# Logger for this module, named after the module itself.
_LOGGER = logging.getLogger(__name__)

# Target states that can be reproduced; a request for any other state is
# rejected with a warning before any service is called.
VALID_STATES = {
    # Open side.
    CoverState.OPEN,
    CoverState.OPENING,
    # Closed side.
    CoverState.CLOSED,
    CoverState.CLOSING,
}
async def _async_reproduce_state(
    hass: HomeAssistant,
    state: State,
    *,
    context: Context | None = None,
    reproduce_options: dict[str, Any] | None = None,
) -> None:
    """Drive one cover entity towards the state and attributes in ``state``.

    Nothing is called when the entity is unknown to ``hass``, when the
    requested state is not in ``VALID_STATES`` (both cases log a warning), or
    when state, position and tilt position already match.  Otherwise up to
    two blocking service calls are issued: first one for open/close/position,
    then one for tilt.

    ``reproduce_options`` is accepted but not read by this function.
    """
    entity_id = state.entity_id

    current = hass.states.get(entity_id)
    if current is None:
        _LOGGER.warning("Unable to find entity %s", entity_id)
        return

    if state.state not in VALID_STATES:
        _LOGGER.warning(
            "Invalid state specified for %s: %s", entity_id, state.state
        )
        return

    wanted_attrs = state.attributes
    current_attrs = current.attributes

    # "Position matches" means both the state string and the reported
    # position agree; a missing position on both sides counts as equal.
    position_matches = current.state == state.state and current_attrs.get(
        ATTR_CURRENT_POSITION
    ) == wanted_attrs.get(ATTR_CURRENT_POSITION)
    tilt_matches = current_attrs.get(
        ATTR_CURRENT_TILT_POSITION
    ) == wanted_attrs.get(ATTR_CURRENT_TILT_POSITION)

    # Already where we want to be: no service call needed.
    if position_matches and tilt_matches:
        return

    if not position_matches:
        # Open/close, or move to an explicit position.
        move_payload = {ATTR_ENTITY_ID: entity_id}
        if state.state in (CoverState.CLOSED, CoverState.CLOSING):
            service = SERVICE_CLOSE_COVER
        elif state.state in (CoverState.OPEN, CoverState.OPENING):
            # An explicit position is only sent when both the live entity
            # and the requested state report one.
            both_report_position = (
                ATTR_CURRENT_POSITION in current_attrs
                and ATTR_CURRENT_POSITION in wanted_attrs
            )
            if both_report_position:
                service = SERVICE_SET_COVER_POSITION
                move_payload[ATTR_POSITION] = wanted_attrs[ATTR_CURRENT_POSITION]
            else:
                service = SERVICE_OPEN_COVER

        await hass.services.async_call(
            DOMAIN, service, move_payload, context=context, blocking=True
        )

    # Tilt is only touched when both sides report a tilt position and the
    # two values differ.
    both_report_tilt = (
        ATTR_CURRENT_TILT_POSITION in wanted_attrs
        and ATTR_CURRENT_TILT_POSITION in current_attrs
    )
    if not both_report_tilt or tilt_matches:
        return

    target_tilt = wanted_attrs[ATTR_CURRENT_TILT_POSITION]
    tilt_payload = {ATTR_ENTITY_ID: entity_id}
    if target_tilt == 100:
        # The extreme value 100 maps to the dedicated open-tilt service.
        tilt_service = SERVICE_OPEN_COVER_TILT
    elif target_tilt == 0:
        # The extreme value 0 maps to the dedicated close-tilt service.
        tilt_service = SERVICE_CLOSE_COVER_TILT
    else:
        tilt_service = SERVICE_SET_COVER_TILT_POSITION
        tilt_payload[ATTR_TILT_POSITION] = target_tilt

    await hass.services.async_call(
        DOMAIN,
        tilt_service,
        tilt_payload,
        context=context,
        blocking=True,
    )
async def async_reproduce_states(
    hass: HomeAssistant,
    states: Iterable[State],
    *,
    context: Context | None = None,
    reproduce_options: dict[str, Any] | None = None,
) -> None:
    """Reproduce every given Cover state concurrently.

    One ``_async_reproduce_state`` coroutine is created per entry in
    ``states`` (with ``context`` and ``reproduce_options`` forwarded
    unchanged) and all of them are awaited together.
    """
    # Build all coroutines first, then hand them to gather so they run in
    # parallel rather than one after another.
    pending = [
        _async_reproduce_state(
            hass, single_state, context=context, reproduce_options=reproduce_options
        )
        for single_state in states
    ]
    await asyncio.gather(*pending)