5
5
6
6
import asyncio
7
7
import logging
8
- from collections .abc import Callable , Iterable , Set
9
- from typing import Any , TypeVar
8
+ from collections .abc import AsyncIterator , Awaitable , Callable , Iterable , Set
9
+ from typing import Any , TypeVar , cast
10
+
11
+ import grpc .aio
12
+
13
+ # pylint: disable=no-name-in-module
14
+ from frequenz .api .common .components_pb2 import ComponentCategory as PbComponentCategory
15
+ from frequenz .api .common .metrics_pb2 import Bounds as PbBounds
16
+ from frequenz .api .microgrid .microgrid_pb2 import ComponentData as PbComponentData
17
+ from frequenz .api .microgrid .microgrid_pb2 import ComponentFilter as PbComponentFilter
18
+ from frequenz .api .microgrid .microgrid_pb2 import ComponentIdParam as PbComponentIdParam
19
+ from frequenz .api .microgrid .microgrid_pb2 import ComponentList as PbComponentList
20
+ from frequenz .api .microgrid .microgrid_pb2 import ConnectionFilter as PbConnectionFilter
21
+ from frequenz .api .microgrid .microgrid_pb2 import ConnectionList as PbConnectionList
22
+ from frequenz .api .microgrid .microgrid_pb2 import (
23
+ MicrogridMetadata as PbMicrogridMetadata ,
24
+ )
25
+ from frequenz .api .microgrid .microgrid_pb2 import SetBoundsParam as PbSetBoundsParam
26
+ from frequenz .api .microgrid .microgrid_pb2 import (
27
+ SetPowerActiveParam as PbSetPowerActiveParam ,
28
+ )
29
+ from frequenz .api .microgrid .microgrid_pb2_grpc import MicrogridStub
10
30
11
- import grpclib
12
- import grpclib .client
13
- from betterproto .lib .google import protobuf as pb_google
31
+ # pylint: enable=no-name-in-module
14
32
from frequenz .channels import Receiver
15
33
from frequenz .client .base import channel , retry , streaming
16
- from frequenz .microgrid .betterproto .frequenz .api import microgrid as pb_microgrid
17
- from frequenz .microgrid .betterproto .frequenz .api .common import (
18
- components as pb_components ,
19
- )
20
- from frequenz .microgrid .betterproto .frequenz .api .common import metrics as pb_metrics
34
+ from google .protobuf .empty_pb2 import Empty # pylint: disable=no-name-in-module
35
+ from google .protobuf .timestamp_pb2 import Timestamp # pylint: disable=no-name-in-module
21
36
22
37
from ._component import (
23
38
Component ,
35
50
)
36
51
from ._connection import Connection
37
52
from ._constants import RECEIVER_MAX_SIZE
38
- from ._exception import ClientError
53
+ from ._exception import ApiClientError
39
54
from ._metadata import Location , Metadata
40
55
41
56
DEFAULT_GRPC_CALL_TIMEOUT = 60.0
@@ -72,7 +87,7 @@ def __init__(
72
87
self ._server_url = server_url
73
88
"""The location of the microgrid API server as a URL."""
74
89
75
- self .api = pb_microgrid . MicrogridStub (channel .parse_grpc_uri (server_url ))
90
+ self .api = MicrogridStub (channel .parse_grpc_uri (server_url ))
76
91
"""The gRPC stub for the microgrid API."""
77
92
78
93
self ._broadcasters : dict [int , streaming .GrpcStreamBroadcaster [Any , Any ]] = {}
@@ -90,25 +105,29 @@ async def components(self) -> Iterable[Component]:
90
105
Iterator whose elements are all the components in the microgrid.
91
106
92
107
Raises:
93
- ClientError : If the are any errors communicating with the Microgrid API,
108
+ ApiClientError : If the are any errors communicating with the Microgrid API,
94
109
most likely a subclass of
95
110
[GrpcError][frequenz.client.microgrid.GrpcError].
96
111
"""
97
112
try :
98
- component_list = await self .api .list_components (
99
- pb_microgrid .ComponentFilter (),
100
- timeout = int (DEFAULT_GRPC_CALL_TIMEOUT ),
113
+ # grpc.aio is missing types and mypy thinks this is not awaitable,
114
+ # but it is
115
+ component_list = await cast (
116
+ Awaitable [PbComponentList ],
117
+ self .api .ListComponents (
118
+ PbComponentFilter (),
119
+ timeout = int (DEFAULT_GRPC_CALL_TIMEOUT ),
120
+ ),
101
121
)
102
- except grpclib . GRPCError as grpc_error :
103
- raise ClientError .from_grpc_error (
122
+ except grpc . aio . AioRpcError as grpc_error :
123
+ raise ApiClientError .from_grpc_error (
104
124
server_url = self ._server_url ,
105
- operation = "list_components " ,
125
+ operation = "ListComponents " ,
106
126
grpc_error = grpc_error ,
107
127
) from grpc_error
108
128
109
129
components_only = filter (
110
- lambda c : c .category
111
- is not pb_components .ComponentCategory .COMPONENT_CATEGORY_SENSOR ,
130
+ lambda c : c .category is not PbComponentCategory .COMPONENT_CATEGORY_SENSOR ,
112
131
component_list .components ,
113
132
)
114
133
result : Iterable [Component ] = map (
@@ -132,13 +151,16 @@ async def metadata(self) -> Metadata:
132
151
Returns:
133
152
the microgrid metadata.
134
153
"""
135
- microgrid_metadata : pb_microgrid . MicrogridMetadata | None = None
154
+ microgrid_metadata : PbMicrogridMetadata | None = None
136
155
try :
137
- microgrid_metadata = await self .api .get_microgrid_metadata (
138
- pb_google .Empty (),
139
- timeout = int (DEFAULT_GRPC_CALL_TIMEOUT ),
156
+ microgrid_metadata = await cast (
157
+ Awaitable [PbMicrogridMetadata ],
158
+ self .api .GetMicrogridMetadata (
159
+ Empty (),
160
+ timeout = int (DEFAULT_GRPC_CALL_TIMEOUT ),
161
+ ),
140
162
)
141
- except grpclib . GRPCError :
163
+ except grpc . aio . AioRpcError :
142
164
_logger .exception ("The microgrid metadata is not available." )
143
165
144
166
if not microgrid_metadata :
@@ -170,25 +192,28 @@ async def connections(
170
192
Microgrid connections matching the provided start and end filters.
171
193
172
194
Raises:
173
- ClientError : If the are any errors communicating with the Microgrid API,
195
+ ApiClientError : If the are any errors communicating with the Microgrid API,
174
196
most likely a subclass of
175
197
[GrpcError][frequenz.client.microgrid.GrpcError].
176
198
"""
177
- connection_filter = pb_microgrid .ConnectionFilter (
178
- starts = list (starts ), ends = list (ends )
179
- )
199
+ connection_filter = PbConnectionFilter (starts = starts , ends = ends )
180
200
try :
181
201
valid_components , all_connections = await asyncio .gather (
182
202
self .components (),
183
- self .api .list_connections (
184
- connection_filter ,
185
- timeout = int (DEFAULT_GRPC_CALL_TIMEOUT ),
203
+ # grpc.aio is missing types and mypy thinks this is not
204
+ # awaitable, but it is
205
+ cast (
206
+ Awaitable [PbConnectionList ],
207
+ self .api .ListConnections (
208
+ connection_filter ,
209
+ timeout = int (DEFAULT_GRPC_CALL_TIMEOUT ),
210
+ ),
186
211
),
187
212
)
188
- except grpclib . GRPCError as grpc_error :
189
- raise ClientError .from_grpc_error (
213
+ except grpc . aio . AioRpcError as grpc_error :
214
+ raise ApiClientError .from_grpc_error (
190
215
server_url = self ._server_url ,
191
- operation = "list_connections " ,
216
+ operation = "ListConnections " ,
192
217
grpc_error = grpc_error ,
193
218
) from grpc_error
194
219
# Filter out the components filtered in `components` method.
@@ -212,7 +237,7 @@ async def _new_component_data_receiver(
212
237
* ,
213
238
component_id : int ,
214
239
expected_category : ComponentCategory ,
215
- transform : Callable [[pb_microgrid . ComponentData ], _ComponentDataT ],
240
+ transform : Callable [[PbComponentData ], _ComponentDataT ],
216
241
maxsize : int ,
217
242
) -> Receiver [_ComponentDataT ]:
218
243
"""Return a new broadcaster receiver for a given `component_id`.
@@ -239,8 +264,13 @@ async def _new_component_data_receiver(
239
264
if broadcaster is None :
240
265
broadcaster = streaming .GrpcStreamBroadcaster (
241
266
f"raw-component-data-{ component_id } " ,
242
- lambda : self .api .stream_component_data (
243
- pb_microgrid .ComponentIdParam (id = component_id )
267
+ # We need to cast here because grpc says StreamComponentData is
268
+ # a grpc.CallIterator[PbComponentData] which is not an AsyncIterator,
269
+ # but it is a grpc.aio.UnaryStreamCall[..., PbComponentData], which it
270
+ # is.
271
+ lambda : cast (
272
+ AsyncIterator [PbComponentData ],
273
+ self .api .StreamComponentData (PbComponentIdParam (id = component_id )),
244
274
),
245
275
transform ,
246
276
retry_strategy = self ._retry_strategy ,
@@ -389,21 +419,22 @@ async def set_power(self, component_id: int, power_w: float) -> None:
389
419
power_w: power to set for the component.
390
420
391
421
Raises:
392
- ClientError : If the are any errors communicating with the Microgrid API,
422
+ ApiClientError : If the are any errors communicating with the Microgrid API,
393
423
most likely a subclass of
394
424
[GrpcError][frequenz.client.microgrid.GrpcError].
395
425
"""
396
426
try :
397
- await self .api .set_power_active (
398
- pb_microgrid .SetPowerActiveParam (
399
- component_id = component_id , power = power_w
427
+ await cast (
428
+ Awaitable [Empty ],
429
+ self .api .SetPowerActive (
430
+ PbSetPowerActiveParam (component_id = component_id , power = power_w ),
431
+ timeout = int (DEFAULT_GRPC_CALL_TIMEOUT ),
400
432
),
401
- timeout = int (DEFAULT_GRPC_CALL_TIMEOUT ),
402
433
)
403
- except grpclib . GRPCError as grpc_error :
404
- raise ClientError .from_grpc_error (
434
+ except grpc . aio . AioRpcError as grpc_error :
435
+ raise ApiClientError .from_grpc_error (
405
436
server_url = self ._server_url ,
406
- operation = "set_power_active " ,
437
+ operation = "SetPowerActive " ,
407
438
grpc_error = grpc_error ,
408
439
) from grpc_error
409
440
@@ -413,7 +444,7 @@ async def set_bounds(
413
444
lower : float ,
414
445
upper : float ,
415
446
) -> None :
416
- """Send `SetBoundsParam `s received from a channel to the Microgrid service.
447
+ """Send `PbSetBoundsParam `s received from a channel to the Microgrid service.
417
448
418
449
Args:
419
450
component_id: ID of the component to set bounds for.
@@ -423,7 +454,7 @@ async def set_bounds(
423
454
Raises:
424
455
ValueError: when upper bound is less than 0, or when lower bound is
425
456
greater than 0.
426
- ClientError : If the are any errors communicating with the Microgrid API,
457
+ ApiClientError : If the are any errors communicating with the Microgrid API,
427
458
most likely a subclass of
428
459
[GrpcError][frequenz.client.microgrid.GrpcError].
429
460
"""
@@ -432,21 +463,22 @@ async def set_bounds(
432
463
if lower > 0 :
433
464
raise ValueError (f"Lower bound { lower } must be less than or equal to 0." )
434
465
435
- target_metric = (
436
- pb_microgrid .SetBoundsParamTargetMetric .TARGET_METRIC_POWER_ACTIVE
437
- )
466
+ target_metric = PbSetBoundsParam .TargetMetric .TARGET_METRIC_POWER_ACTIVE
438
467
try :
439
- await self .api .add_inclusion_bounds (
440
- pb_microgrid .SetBoundsParam (
441
- component_id = component_id ,
442
- target_metric = target_metric ,
443
- bounds = pb_metrics .Bounds (lower = lower , upper = upper ),
468
+ await cast (
469
+ Awaitable [Timestamp ],
470
+ self .api .AddInclusionBounds (
471
+ PbSetBoundsParam (
472
+ component_id = component_id ,
473
+ target_metric = target_metric ,
474
+ bounds = PbBounds (lower = lower , upper = upper ),
475
+ ),
476
+ timeout = int (DEFAULT_GRPC_CALL_TIMEOUT ),
444
477
),
445
- timeout = int (DEFAULT_GRPC_CALL_TIMEOUT ),
446
478
)
447
- except grpclib . GRPCError as grpc_error :
448
- raise ClientError .from_grpc_error (
479
+ except grpc . aio . AioRpcError as grpc_error :
480
+ raise ApiClientError .from_grpc_error (
449
481
server_url = self ._server_url ,
450
- operation = "add_inclusion_bounds " ,
482
+ operation = "AddInclusionBounds " ,
451
483
grpc_error = grpc_error ,
452
484
) from grpc_error
0 commit comments