@@ -15,7 +15,7 @@ using namespace yandex::monitoring::api::v3;
15
15
16
16
namespace {
17
17
18
- Downsampling::GapFilling ParseGapFilling (const TString & fill)
18
+ Downsampling::GapFilling ParseGapFilling (const TString& fill)
19
19
{
20
20
if (fill.equal (" NULL" )) {
21
21
return Downsampling::GAP_FILLING_NULL;
@@ -29,7 +29,7 @@ Downsampling::GapFilling ParseGapFilling(const TString &fill)
29
29
return Downsampling::GAP_FILLING_UNSPECIFIED;
30
30
}
31
31
32
- Downsampling::GridAggregation ParseGridAggregation (const TString & aggregation)
32
+ Downsampling::GridAggregation ParseGridAggregation (const TString& aggregation)
33
33
{
34
34
if (aggregation.equal (" MAX" )) {
35
35
return Downsampling::GRID_AGGREGATION_MAX;
@@ -52,7 +52,7 @@ Downsampling::GridAggregation ParseGridAggregation(const TString &aggregation)
52
52
return Downsampling::GRID_AGGREGATION_UNSPECIFIED;
53
53
}
54
54
55
- MetricType ParseMetricType (const TString & type)
55
+ MetricType ParseMetricType (const TString& type)
56
56
{
57
57
if (type.equal (" DGAUGE" )) {
58
58
return MetricType::DGAUGE;
@@ -77,13 +77,14 @@ class TSolomonAccessorClient : public ISolomonAccessorClient, public std::enable
77
77
std::shared_ptr<NYdb::ICredentialsProvider> credentialsProvider
78
78
)
79
79
: DefaultReplica(" sas" )
80
- , DefaultPort(443 )
81
80
, Settings(std::move(settings))
82
81
, CredentialsProvider(credentialsProvider)
82
+ , GrpcClient(std::make_shared<NYdbGrpc::TGRpcClientLow>())
83
+ , HttpGateway(IHTTPGateway::Make())
83
84
{}
84
85
85
86
public:
86
- NThreading::TFuture<TListMetricsResult> ListMetrics (const TString & selectors, int pageSize, int page) override final
87
+ NThreading::TFuture<TListMetricsResult> ListMetrics (const TString& selectors, int pageSize, int page) override final
87
88
{
88
89
const auto request = BuildListMetricsRequest (selectors, pageSize, page);
89
90
@@ -92,30 +93,29 @@ class TSolomonAccessorClient : public ISolomonAccessorClient, public std::enable
92
93
93
94
auto resultPromise = NThreading::NewPromise<TListMetricsResult>();
94
95
95
- const auto httpGateway = IHTTPGateway::Make ();
96
-
97
96
std::weak_ptr<const TSolomonAccessorClient> weakSelf = shared_from_this ();
98
97
// hold context until reply
99
98
auto cb = [weakSelf, resultPromise](NYql::IHTTPGateway::TResult&& result) mutable
100
99
{
101
100
if (auto self = weakSelf.lock ()) {
102
101
resultPromise.SetValue (self->ProcessHttpResponse (std::move (result)));
102
+ } else {
103
+ resultPromise.SetValue (TListMetricsResult (" Client is being shutted down" ));
103
104
}
104
- resultPromise.SetValue (TListMetricsResult (" Client is being shutted down" ));
105
105
};
106
106
107
- httpGateway ->Download (
107
+ HttpGateway ->Download (
108
108
request,
109
109
headers,
110
110
0 ,
111
- 1ull << 30 ,
111
+ ListSizeLimit ,
112
112
std::move (cb)
113
113
);
114
114
115
115
return resultPromise.GetFuture ();
116
116
}
117
117
118
- NThreading::TFuture<TGetDataResult> GetData (const std::vector<TString> & selectors) override final
118
+ NThreading::TFuture<TGetDataResult> GetData (const std::vector<TString>& selectors) override final
119
119
{
120
120
const auto request = BuildGetDataRequest (selectors);
121
121
@@ -127,10 +127,9 @@ class TSolomonAccessorClient : public ISolomonAccessorClient, public std::enable
127
127
NYdbGrpc::TGRpcClientConfig grpcConf;
128
128
grpcConf.Locator = GetGrpcSolomonEndpoint ();
129
129
grpcConf.EnableSsl = Settings.GetUseSsl ();
130
- const auto grpcClient = std::make_unique<NYdbGrpc::TGRpcClientLow>();
131
- const auto connection = grpcClient->CreateGRpcServiceConnection <DataService>(grpcConf);
130
+ const auto connection = GrpcClient->CreateGRpcServiceConnection <DataService>(grpcConf);
132
131
133
- auto context = grpcClient ->CreateContext ();
132
+ auto context = GrpcClient ->CreateContext ();
134
133
if (!context) {
135
134
throw yexception () << " Client is being shutted down" ;
136
135
}
@@ -142,8 +141,9 @@ class TSolomonAccessorClient : public ISolomonAccessorClient, public std::enable
142
141
{
143
142
if (auto self = weakSelf.lock ()) {
144
143
resultPromise.SetValue (self->ProcessGrpcResponse (std::move (status), std::move (result)));
144
+ } else {
145
+ resultPromise.SetValue (TGetDataResult (" Client is being shutted down" ));
145
146
}
146
- resultPromise.SetValue (TGetDataResult (" Client is being shutted down" ));
147
147
};
148
148
149
149
connection->DoRequest <ReadRequest, ReadResponse>(
@@ -179,13 +179,10 @@ class TSolomonAccessorClient : public ISolomonAccessorClient, public std::enable
179
179
180
180
TString GetGrpcSolomonEndpoint () const
181
181
{
182
- return Settings.GetEndpoint () + std::to_string (DefaultPort) ;
182
+ return Settings.GetEndpoint () + " 443 " ;
183
183
}
184
184
185
- TString BuildListMetricsRequest (
186
- const TString &selectors,
187
- int pageSize,
188
- int page) const
185
+ TString BuildListMetricsRequest (const TString& selectors, int pageSize, int page) const
189
186
{
190
187
TUrlBuilder builder (GetHttpSolomonEndpoint ());
191
188
@@ -203,7 +200,7 @@ class TSolomonAccessorClient : public ISolomonAccessorClient, public std::enable
203
200
return builder.Build ();
204
201
}
205
202
206
- ReadRequest BuildGetDataRequest (const std::vector<TString> & selectors) const
203
+ ReadRequest BuildGetDataRequest (const std::vector<TString>& selectors) const
207
204
{
208
205
ReadRequest request;
209
206
@@ -214,15 +211,14 @@ class TSolomonAccessorClient : public ISolomonAccessorClient, public std::enable
214
211
215
212
if (Settings.GetDownsampling ().GetDisabled ()) {
216
213
request.mutable_downsampling ()->set_disabled (true );
217
- }
218
- else {
214
+ } else {
219
215
const auto downsampling = Settings.GetDownsampling ();
220
216
request.mutable_downsampling ()->set_grid_interval (downsampling.GetGridMs ());
221
217
request.mutable_downsampling ()->set_grid_aggregation (ParseGridAggregation (downsampling.GetAggregation ()));
222
218
request.mutable_downsampling ()->set_gap_filling (ParseGapFilling (downsampling.GetFill ()));
223
219
}
224
220
225
- for (const auto & metric : selectors) {
221
+ for (const auto & metric : selectors) {
226
222
auto query = request.mutable_queries ()->Add ();
227
223
*query->mutable_value () = TStringBuilder{} << " {" << metric << " }" ;
228
224
query->set_hidden (false );
@@ -233,7 +229,7 @@ class TSolomonAccessorClient : public ISolomonAccessorClient, public std::enable
233
229
234
230
TListMetricsResult ProcessHttpResponse (NYql::IHTTPGateway::TResult&& response) const
235
231
{
236
- std::vector<Metric > result;
232
+ std::vector<TMetric > result;
237
233
238
234
if (response.Content .HttpResponseCode < 200 || response.Content .HttpResponseCode >= 300 ) {
239
235
return TListMetricsResult (TStringBuilder{} << " Error while sending request to monitoring api: " << response.Content .data ());
@@ -250,11 +246,11 @@ class TSolomonAccessorClient : public ISolomonAccessorClient, public std::enable
250
246
return TListMetricsResult{" Invalid result from monitoring api" };
251
247
}
252
248
253
- for (const auto & metricObj : json[" result" ].GetArray ()) {
249
+ for (const auto & metricObj : json[" result" ].GetArray ()) {
254
250
try {
255
251
result.emplace_back (metricObj);
256
252
} catch (const std::exception & e) {
257
- return TStringBuilder{} << " Failed to parse response from monitoring: " << e.what ();
253
+ return TStringBuilder{} << " Failed to parse result response from monitoring: " << e.what ();
258
254
}
259
255
}
260
256
@@ -263,53 +259,59 @@ class TSolomonAccessorClient : public ISolomonAccessorClient, public std::enable
263
259
264
260
TGetDataResult ProcessGrpcResponse (NYdbGrpc::TGrpcStatus&& status, ReadResponse &&response) const
265
261
{
266
- std::vector<Timeseries > result;
262
+ std::vector<TTimeseries > result;
267
263
268
264
if (!status.Ok ()) {
269
265
return TStringBuilder{} << " Error while sending request to monitoring api: " << status.Msg ;
270
266
}
271
267
272
- for (const auto & responseValue : response.response_per_query ()) {
268
+ for (const auto & responseValue : response.response_per_query ()) {
273
269
YQL_ENSURE (responseValue.has_timeseries_vector ());
274
270
YQL_ENSURE (responseValue.timeseries_vector ().values_size () == 1 ); // one response per one set of selectors
275
271
276
- const auto & queryResponse = responseValue.timeseries_vector ().values ()[0 ];
272
+ const auto & queryResponse = responseValue.timeseries_vector ().values ()[0 ];
277
273
278
- std::vector<int64_t > timestampValues;
279
- std::vector<double > timeseriesValues;
274
+ std::vector<int64_t > timestamps;
275
+ std::vector<double > values;
276
+
277
+ timestamps.reserve (queryResponse.timestamp_values ().values_size ());
278
+ values.reserve (queryResponse.double_values ().values_size ());
280
279
281
280
for (int64_t value : queryResponse.timestamp_values ().values ()) {
282
- timestampValues .push_back (value);
281
+ timestamps .push_back (value);
283
282
}
284
283
for (double value : queryResponse.double_values ().values ()) {
285
- timeseriesValues .push_back (value);
284
+ values .push_back (value);
286
285
}
287
286
288
- result.emplace_back (queryResponse.name (), queryResponse.type (), std::move (timestampValues ), std::move (timeseriesValues ));
287
+ result.emplace_back (queryResponse.name (), queryResponse.type (), std::move (timestamps ), std::move (values ));
289
288
}
290
289
291
290
return std::move (result);
292
291
}
293
292
294
293
private:
295
294
const TString DefaultReplica;
296
- const int DefaultPort;
295
+ const size_t ListSizeLimit = 1ull << 30 ;
296
+
297
+ const NYql::NSo::NProto::TDqSolomonSource Settings;
298
+ const std::shared_ptr<NYdb::ICredentialsProvider> CredentialsProvider;
297
299
298
- NYql::NSo::NProto::TDqSolomonSource Settings ;
299
- std::shared_ptr<NYdb::ICredentialsProvider> CredentialsProvider ;
300
+ const std::shared_ptr<NYdbGrpc::TGRpcClientLow> GrpcClient ;
301
+ const std::shared_ptr<IHTTPGateway> HttpGateway ;
300
302
};
301
303
302
304
} // namespace
303
305
304
- Metric::Metric (const NJson::TJsonValue & value)
306
+ TMetric::TMetric (const NJson::TJsonValue& value)
305
307
{
306
308
YQL_ENSURE (value.IsMap ());
307
309
308
310
if (value.Has (" labels" )) {
309
311
auto labels = value[" labels" ];
310
312
YQL_ENSURE (labels.IsMap ());
311
313
312
- for (const auto & [key, value] : labels.GetMapSafe ()) {
314
+ for (const auto & [key, value] : labels.GetMapSafe ()) {
313
315
YQL_ENSURE (value.IsString ());
314
316
Labels[key] = value.GetString ();
315
317
}
@@ -326,22 +328,22 @@ Metric::Metric(const NJson::TJsonValue &value)
326
328
}
327
329
}
328
330
329
- ISolomonAccessorClient::TListMetricsResult::TListMetricsResult (const TString & error)
331
+ ISolomonAccessorClient::TListMetricsResult::TListMetricsResult (const TString& error)
330
332
: Success(false )
331
333
, ErrorMsg(error)
332
334
{}
333
335
334
- ISolomonAccessorClient::TListMetricsResult::TListMetricsResult (std::vector<Metric> && result)
336
+ ISolomonAccessorClient::TListMetricsResult::TListMetricsResult (std::vector<TMetric>&& result)
335
337
: Success(true )
336
338
, Result(std::move(result))
337
339
{}
338
340
339
- ISolomonAccessorClient::TGetDataResult::TGetDataResult (const TString & error)
341
+ ISolomonAccessorClient::TGetDataResult::TGetDataResult (const TString& error)
340
342
: Success(false )
341
343
, ErrorMsg(error)
342
344
{}
343
345
344
- ISolomonAccessorClient::TGetDataResult::TGetDataResult (std::vector<Timeseries> && result)
346
+ ISolomonAccessorClient::TGetDataResult::TGetDataResult (std::vector<TTimeseries>&& result)
345
347
: Success(true )
346
348
, Result(std::move(result))
347
349
{}
0 commit comments