Skip to content

Commit e3ea003

Browse files
committed
Add client implementation
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 72bf6ff commit e3ea003

File tree

6 files changed

+1313
-1
lines changed

6 files changed

+1313
-1
lines changed

src/frequenz/client/dispatch/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,7 @@
22
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
33

44
"""Dispatch API client for Python."""
5+
6+
from ._client import Client
7+
8+
__all__ = ["Client"]
Lines changed: 257 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,257 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Dispatch API client for Python."""
5+
from datetime import datetime, timedelta
6+
from typing import Any, AsyncIterator, Iterator
7+
8+
import grpc
9+
from frequenz.api.dispatch.v1 import dispatch_pb2_grpc
10+
11+
# pylint: disable=no-name-in-module
12+
from frequenz.api.dispatch.v1.dispatch_pb2 import (
13+
DispatchDeleteRequest,
14+
DispatchFilter,
15+
DispatchGetRequest,
16+
DispatchListRequest,
17+
DispatchUpdateRequest,
18+
)
19+
from frequenz.api.dispatch.v1.dispatch_pb2 import (
20+
TimeIntervalFilter as PBTimeIntervalFilter,
21+
)
22+
from google.protobuf.timestamp_pb2 import Timestamp
23+
24+
from ._internal_types import DispatchCreateRequest
25+
from .types import (
26+
ComponentSelector,
27+
Dispatch,
28+
RecurrenceRule,
29+
component_selector_to_protobuf,
30+
)
31+
32+
# pylint: enable=no-name-in-module
33+
34+
35+
class Client:
36+
"""Dispatch API client."""
37+
38+
def __init__(self, grpc_channel: grpc.aio.Channel, svc_addr: str) -> None:
39+
"""Initialize the client.
40+
41+
Args:
42+
grpc_channel: gRPC channel to use for communication with the API.
43+
svc_addr: Address of the service to connect to.
44+
"""
45+
self._svc_addr = svc_addr
46+
self._stub = dispatch_pb2_grpc.MicrogridDispatchServiceStub(grpc_channel)
47+
48+
# pylint: disable=too-many-arguments, too-many-locals
49+
async def list(
50+
self,
51+
microgrid_id: int,
52+
component_selectors: Iterator[ComponentSelector] = iter(()),
53+
start_from: datetime | None = None,
54+
start_to: datetime | None = None,
55+
end_from: datetime | None = None,
56+
end_to: datetime | None = None,
57+
active: bool | None = None,
58+
dry_run: bool | None = None,
59+
) -> AsyncIterator[Dispatch]:
60+
"""List dispatches.
61+
62+
Example usage:
63+
64+
```python
65+
grpc_channel = grpc.aio.insecure_channel("example")
66+
client = Client(grpc_channel, "localhost:50051")
67+
async for dispatch in client.list(microgrid_id=1):
68+
print(dispatch)
69+
```
70+
71+
Yields:
72+
Dispatch: The dispatches.
73+
74+
Args:
75+
microgrid_id: The microgrid_id to list dispatches for.
76+
component_selectors: optional, list of component ids or categories to filter by.
77+
start_from: optional, filter by start_time >= start_from.
78+
start_to: optional, filter by start_time < start_to.
79+
end_from: optional, filter by end_time >= end_from.
80+
end_to: optional, filter by end_time < end_to.
81+
active: optional, filter by active status.
82+
dry_run: optional, filter by dry_run status.
83+
84+
Returns:
85+
An async iterator of dispatches.
86+
"""
87+
time_interval = None
88+
89+
def to_timestamp(dt: datetime | None) -> Timestamp | None:
90+
if dt is None:
91+
return None
92+
93+
ts = Timestamp()
94+
ts.FromDatetime(dt)
95+
return ts
96+
97+
if start_from or start_to or end_from or end_to:
98+
time_interval = PBTimeIntervalFilter(
99+
start_from=to_timestamp(start_from),
100+
start_to=to_timestamp(start_to),
101+
end_from=to_timestamp(end_from),
102+
end_to=to_timestamp(end_to),
103+
)
104+
105+
selectors = []
106+
107+
for selector in component_selectors:
108+
selectors.append(component_selector_to_protobuf(selector))
109+
110+
filters = DispatchFilter(
111+
selectors=selectors,
112+
time_interval=time_interval,
113+
is_active=active,
114+
is_dry_run=dry_run,
115+
)
116+
request = DispatchListRequest(microgrid_id=microgrid_id, filter=filters)
117+
118+
response = await self._stub.ListMicrogridDispatches(request) # type: ignore
119+
for dispatch in response.dispatches:
120+
yield Dispatch.from_protobuf(dispatch)
121+
122+
async def create(
123+
self,
124+
microgrid_id: int,
125+
_type: str,
126+
start_time: datetime,
127+
duration: timedelta,
128+
selector: ComponentSelector,
129+
active: bool = True,
130+
dry_run: bool = False,
131+
payload: dict[str, Any] | None = None,
132+
recurrence: RecurrenceRule | None = None,
133+
) -> None:
134+
"""Create a dispatch.
135+
136+
Args:
137+
microgrid_id: The microgrid_id to create the dispatch for.
138+
_type: User defined string to identify the dispatch type.
139+
start_time: The start time of the dispatch.
140+
duration: The duration of the dispatch.
141+
selector: The component selector for the dispatch.
142+
active: The active status of the dispatch.
143+
dry_run: The dry_run status of the dispatch.
144+
payload: The payload of the dispatch.
145+
recurrence: The recurrence rule of the dispatch.
146+
147+
Raises:
148+
ValueError: If start_time is in the past.
149+
"""
150+
if start_time <= datetime.now().astimezone(start_time.tzinfo):
151+
raise ValueError("start_time must not be in the past")
152+
153+
request = DispatchCreateRequest(
154+
microgrid_id=microgrid_id,
155+
type=_type,
156+
start_time=start_time,
157+
duration=duration,
158+
selector=selector,
159+
is_active=active,
160+
is_dry_run=dry_run,
161+
payload=payload or {},
162+
recurrence=recurrence or RecurrenceRule(),
163+
).to_protobuf()
164+
165+
await self._stub.CreateMicrogridDispatch(request) # type: ignore
166+
167+
async def update(
168+
self,
169+
dispatch_id: int,
170+
new_fields: dict[str, Any],
171+
) -> None:
172+
"""Update a dispatch.
173+
174+
The `new_fields` argument is a dictionary of fields to update. The keys are
175+
the field names, and the values are the new values for the fields.
176+
177+
For recurrence fields, the keys are preceeded by "recurrence.".
178+
179+
Args:
180+
dispatch_id: The dispatch_id to update.
181+
new_fields: The fields to update.
182+
"""
183+
msg = DispatchUpdateRequest(id=dispatch_id)
184+
185+
for key, val in new_fields.items():
186+
path = key.split(".")
187+
188+
match path[0]:
189+
case "type":
190+
msg.update.type = val
191+
case "start_time":
192+
msg.update.start_time.FromDatetime(val)
193+
case "duration":
194+
msg.update.duration = int(val.total_seconds())
195+
case "selector":
196+
msg.update.selector.CopyFrom(component_selector_to_protobuf(val))
197+
case "is_active":
198+
msg.update.is_active = val
199+
case "active":
200+
msg.update.is_active = val
201+
key = "is_active"
202+
case "is_dry_run":
203+
msg.update.is_dry_run = val
204+
case "dry_run":
205+
msg.update.is_dry_run = val
206+
key = "is_dry_run"
207+
case "recurrence":
208+
match path[1]:
209+
case "freq":
210+
msg.update.recurrence.freq = val
211+
# Proto uses "freq" instead of "frequency"
212+
case "frequency":
213+
msg.update.recurrence.freq = val
214+
# Correct the key to "recurrence.freq"
215+
key = "recurrence.freq"
216+
case "interval":
217+
msg.update.recurrence.interval = val
218+
case "end_criteria":
219+
msg.update.recurrence.end_criteria.CopyFrom(
220+
val.to_protobuf()
221+
)
222+
case "byminutes":
223+
msg.update.recurrence.byminutes.extend(val)
224+
case "byhours":
225+
msg.update.recurrence.byhours.extend(val)
226+
case "byweekdays":
227+
msg.update.recurrence.byweekdays.extend(val)
228+
case "bymonthdays":
229+
msg.update.recurrence.bymonthdays.extend(val)
230+
case "bymonths":
231+
msg.update.recurrence.bymonths.extend(val)
232+
233+
msg.update_mask.paths.append(key)
234+
235+
await self._stub.UpdateMicrogridDispatch(msg) # type: ignore
236+
237+
async def get(self, dispatch_id: int) -> Dispatch:
238+
"""Get a dispatch.
239+
240+
Args:
241+
dispatch_id: The dispatch_id to get.
242+
243+
Returns:
244+
Dispatch: The dispatch.
245+
"""
246+
request = DispatchGetRequest(id=dispatch_id)
247+
response = await self._stub.GetMicrogridDispatch(request) # type: ignore
248+
return Dispatch.from_protobuf(response)
249+
250+
async def delete(self, dispatch_id: int) -> None:
251+
"""Delete a dispatch.
252+
253+
Args:
254+
dispatch_id: The dispatch_id to delete.
255+
"""
256+
request = DispatchDeleteRequest(id=dispatch_id)
257+
await self._stub.DeleteMicrogridDispatch(request) # type: ignore
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Type wrappers for the generated protobuf messages."""
5+
6+
7+
from dataclasses import dataclass
8+
from datetime import datetime, timedelta, timezone
9+
from typing import Any
10+
11+
# pylint: disable=no-name-in-module
12+
from frequenz.api.dispatch.v1.dispatch_pb2 import (
13+
DispatchCreateRequest as PBDispatchCreateRequest,
14+
)
15+
16+
# pylint: enable=no-name-in-module
17+
from google.protobuf.json_format import MessageToDict
18+
19+
from frequenz.client.dispatch.types import (
20+
ComponentSelector,
21+
RecurrenceRule,
22+
component_selector_from_protobuf,
23+
component_selector_to_protobuf,
24+
)
25+
26+
27+
# pylint: disable=too-many-instance-attributes
28+
@dataclass(kw_only=True)
29+
class DispatchCreateRequest:
30+
"""Request to create a new dispatch."""
31+
32+
microgrid_id: int
33+
"""The identifier of the microgrid to which this dispatch belongs."""
34+
35+
type: str
36+
"""User-defined information about the type of dispatch.
37+
38+
This is understood and processed by downstream applications."""
39+
40+
start_time: datetime
41+
"""The start time of the dispatch in UTC."""
42+
43+
duration: timedelta
44+
"""The duration of the dispatch, represented as a timedelta."""
45+
46+
selector: ComponentSelector
47+
"""The component selector specifying which components the dispatch targets."""
48+
49+
is_active: bool
50+
"""Indicates whether the dispatch is active and eligible for processing."""
51+
52+
is_dry_run: bool
53+
"""Indicates if the dispatch is a dry run.
54+
55+
Executed for logging and monitoring without affecting actual component states."""
56+
57+
payload: dict[str, Any]
58+
"""The dispatch payload containing arbitrary data.
59+
60+
It is structured as needed for the dispatch operation."""
61+
62+
recurrence: RecurrenceRule
63+
"""The recurrence rule for the dispatch.
64+
65+
Defining any repeating patterns or schedules."""
66+
67+
@classmethod
68+
def from_protobuf(
69+
cls, pb_object: PBDispatchCreateRequest
70+
) -> "DispatchCreateRequest":
71+
"""Convert a protobuf dispatch create request to a dispatch.
72+
73+
Args:
74+
pb_object: The protobuf dispatch create request to convert.
75+
76+
Returns:
77+
The converted dispatch.
78+
"""
79+
return DispatchCreateRequest(
80+
microgrid_id=pb_object.microgrid_id,
81+
type=pb_object.type,
82+
start_time=pb_object.start_time.ToDatetime().replace(tzinfo=timezone.utc),
83+
duration=timedelta(seconds=pb_object.duration),
84+
selector=component_selector_from_protobuf(pb_object.selector),
85+
is_active=pb_object.is_active,
86+
is_dry_run=pb_object.is_dry_run,
87+
payload=MessageToDict(pb_object.payload),
88+
recurrence=RecurrenceRule.from_protobuf(pb_object.recurrence),
89+
)
90+
91+
def to_protobuf(self) -> PBDispatchCreateRequest:
92+
"""Convert a dispatch to a protobuf dispatch create request.
93+
94+
Returns:
95+
The converted protobuf dispatch create request.
96+
"""
97+
pb_request = PBDispatchCreateRequest()
98+
99+
pb_request.microgrid_id = self.microgrid_id
100+
pb_request.type = self.type
101+
pb_request.start_time.FromDatetime(self.start_time)
102+
pb_request.duration = int(self.duration.total_seconds())
103+
pb_request.selector.CopyFrom(component_selector_to_protobuf(self.selector))
104+
pb_request.is_active = self.is_active
105+
pb_request.is_dry_run = self.is_dry_run
106+
pb_request.payload.update(self.payload)
107+
pb_request.recurrence.CopyFrom(self.recurrence.to_protobuf())
108+
109+
return pb_request

0 commit comments

Comments
 (0)