Skip to content

Commit 8a928ea

Browse files
Merge 74d7882 into 7954aba
2 parents 7954aba + 74d7882 commit 8a928ea

File tree

8 files changed

+658
-1
lines changed

8 files changed

+658
-1
lines changed
Lines changed: 357 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,357 @@
1+
#include "solomon_accessor_client.h"
2+
3+
#include <library/cpp/protobuf/interop/cast.h>
4+
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
5+
#include <ydb/public/sdk/cpp/src/library/grpc/client/grpc_client_low.h>
6+
#include <yql/essentials/utils/url_builder.h>
7+
#include <yql/essentials/utils/yql_panic.h>
8+
9+
#include <ydb/library/yql/providers/common/solomon_accessor/grpc/solomon_accessor_pb.pb.h>
10+
#include <ydb/library/yql/providers/common/solomon_accessor/grpc/solomon_accessor_pb.grpc.pb.h>
11+
12+
namespace NYql::NSo {
13+
14+
using namespace yandex::monitoring::api::v3;
15+
16+
namespace {
17+
18+
Downsampling::GapFilling ParseGapFilling(const TString &fill)
19+
{
20+
if (fill.equal("NULL")) {
21+
return Downsampling::GAP_FILLING_NULL;
22+
}
23+
if (fill.equal("NONE")) {
24+
return Downsampling::GAP_FILLING_NONE;
25+
}
26+
if (fill.equal("PREVIOUS")) {
27+
return Downsampling::GAP_FILLING_PREVIOUS;
28+
}
29+
return Downsampling::GAP_FILLING_UNSPECIFIED;
30+
}
31+
32+
Downsampling::GridAggregation ParseGridAggregation(const TString &aggregation)
33+
{
34+
if (aggregation.equal("MAX")) {
35+
return Downsampling::GRID_AGGREGATION_MAX;
36+
}
37+
if (aggregation.equal("MIN")) {
38+
return Downsampling::GRID_AGGREGATION_MIN;
39+
}
40+
if (aggregation.equal("SUM")) {
41+
return Downsampling::GRID_AGGREGATION_SUM;
42+
}
43+
if (aggregation.equal("AVG")) {
44+
return Downsampling::GRID_AGGREGATION_AVG;
45+
}
46+
if (aggregation.equal("LAST")) {
47+
return Downsampling::GRID_AGGREGATION_LAST;
48+
}
49+
if (aggregation.equal("COUNT")) {
50+
return Downsampling::GRID_AGGREGATION_COUNT;
51+
}
52+
return Downsampling::GRID_AGGREGATION_UNSPECIFIED;
53+
}
54+
55+
MetricType ParseMetricType(const TString &type)
56+
{
57+
if (type.equal("DGAUGE")) {
58+
return MetricType::DGAUGE;
59+
}
60+
if (type.equal("IGAUGE")) {
61+
return MetricType::IGAUGE;
62+
}
63+
if (type.equal("COUNTER")) {
64+
return MetricType::COUNTER;
65+
}
66+
if (type.equal("RATE")) {
67+
return MetricType::RATE;
68+
}
69+
return MetricType::METRIC_TYPE_UNSPECIFIED;
70+
}
71+
72+
class TSolomonAccessorClient : public ISolomonAccessorClient, public std::enable_shared_from_this<TSolomonAccessorClient>
73+
{
74+
public:
75+
TSolomonAccessorClient(
76+
NYql::NSo::NProto::TDqSolomonSource&& settings,
77+
std::shared_ptr<NYdb::ICredentialsProvider> credentialsProvider
78+
)
79+
: DefaultReplica("sas")
80+
, DefaultPort(443)
81+
, Settings(std::move(settings))
82+
, CredentialsProvider(credentialsProvider)
83+
{}
84+
85+
public:
86+
NThreading::TFuture<TListMetricsResult> ListMetrics(const TString &selectors, int pageSize, int page) override final
87+
{
88+
const auto request = BuildListMetricsRequest(selectors, pageSize, page);
89+
90+
IHTTPGateway::THeaders headers;
91+
headers.Fields.emplace_back(TStringBuilder{} << "Authorization: " << GetAuthInfo());
92+
93+
auto resultPromise = NThreading::NewPromise<TListMetricsResult>();
94+
95+
const auto httpGateway = IHTTPGateway::Make();
96+
97+
std::weak_ptr<const TSolomonAccessorClient> weakSelf = shared_from_this();
98+
// hold context until reply
99+
auto cb = [weakSelf, resultPromise](NYql::IHTTPGateway::TResult&& result) mutable
100+
{
101+
if (auto self = weakSelf.lock()) {
102+
resultPromise.SetValue(self->ProcessHttpResponse(std::move(result)));
103+
}
104+
resultPromise.SetValue(TListMetricsResult("Client is being shutted down"));
105+
};
106+
107+
httpGateway->Download(
108+
request,
109+
headers,
110+
0,
111+
1ull << 30,
112+
std::move(cb)
113+
);
114+
115+
return resultPromise.GetFuture();
116+
}
117+
118+
NThreading::TFuture<TGetDataResult> GetData(const std::vector<TString> &selectors) override final
119+
{
120+
const auto request = BuildGetDataRequest(selectors);
121+
122+
NYdbGrpc::TCallMeta callMeta;
123+
callMeta.Aux.emplace_back("authorization", GetAuthInfo());
124+
125+
auto resultPromise = NThreading::NewPromise<TGetDataResult>();
126+
127+
NYdbGrpc::TGRpcClientConfig grpcConf;
128+
grpcConf.Locator = GetGrpcSolomonEndpoint();
129+
grpcConf.EnableSsl = Settings.GetUseSsl();
130+
const auto grpcClient = std::make_unique<NYdbGrpc::TGRpcClientLow>();
131+
const auto connection = grpcClient->CreateGRpcServiceConnection<DataService>(grpcConf);
132+
133+
auto context = grpcClient->CreateContext();
134+
if (!context) {
135+
throw yexception() << "Client is being shutted down";
136+
}
137+
std::weak_ptr<const TSolomonAccessorClient> weakSelf = shared_from_this();
138+
// hold context until reply
139+
auto cb = [weakSelf, resultPromise, context](
140+
NYdbGrpc::TGrpcStatus&& status,
141+
ReadResponse&& result) mutable
142+
{
143+
if (auto self = weakSelf.lock()) {
144+
resultPromise.SetValue(self->ProcessGrpcResponse(std::move(status), std::move(result)));
145+
}
146+
resultPromise.SetValue(TGetDataResult("Client is being shutted down"));
147+
};
148+
149+
connection->DoRequest<ReadRequest, ReadResponse>(
150+
std::move(request),
151+
std::move(cb),
152+
&DataService::Stub::AsyncRead,
153+
callMeta,
154+
context.get()
155+
);
156+
157+
return resultPromise.GetFuture();
158+
}
159+
160+
private:
161+
TString GetAuthInfo() const
162+
{
163+
const TString authToken = CredentialsProvider->GetAuthInfo();
164+
165+
switch (Settings.GetClusterType()) {
166+
case NSo::NProto::ESolomonClusterType::CT_SOLOMON:
167+
return "OAuth " + authToken;
168+
case NSo::NProto::ESolomonClusterType::CT_MONITORING:
169+
return "Bearer " + authToken;
170+
default:
171+
Y_ENSURE(false, "Invalid cluster type " << ToString<ui32>(Settings.GetClusterType()));
172+
}
173+
}
174+
175+
TString GetHttpSolomonEndpoint() const
176+
{
177+
return (Settings.GetUseSsl() ? "https://" : "http://") + Settings.GetEndpoint();
178+
}
179+
180+
TString GetGrpcSolomonEndpoint() const
181+
{
182+
return Settings.GetEndpoint() + std::to_string(DefaultPort);
183+
}
184+
185+
TString BuildListMetricsRequest(
186+
const TString &selectors,
187+
int pageSize,
188+
int page) const
189+
{
190+
TUrlBuilder builder(GetHttpSolomonEndpoint());
191+
192+
builder.AddPathComponent("api");
193+
builder.AddPathComponent("v2");
194+
builder.AddPathComponent("projects");
195+
builder.AddPathComponent(Settings.GetProject());
196+
builder.AddPathComponent("sensors");
197+
198+
builder.AddUrlParam("selectors", selectors);
199+
builder.AddUrlParam("forceCluster", DefaultReplica);
200+
builder.AddUrlParam("pageSize", std::to_string(pageSize));
201+
builder.AddUrlParam("page", std::to_string(page));
202+
203+
return builder.Build();
204+
}
205+
206+
ReadRequest BuildGetDataRequest(const std::vector<TString> &selectors) const
207+
{
208+
ReadRequest request;
209+
210+
request.mutable_container()->set_project_id(Settings.GetProject());
211+
*request.mutable_from_time() = NProtoInterop::CastToProto(TInstant::FromValue(Settings.GetFrom()));
212+
*request.mutable_to_time() = NProtoInterop::CastToProto(TInstant::FromValue(Settings.GetTo()));
213+
*request.mutable_force_replica() = DefaultReplica;
214+
215+
if (Settings.GetDownsampling().GetDisabled()) {
216+
request.mutable_downsampling()->set_disabled(true);
217+
}
218+
else {
219+
const auto downsampling = Settings.GetDownsampling();
220+
request.mutable_downsampling()->set_grid_interval(downsampling.GetGridMs());
221+
request.mutable_downsampling()->set_grid_aggregation(ParseGridAggregation(downsampling.GetAggregation()));
222+
request.mutable_downsampling()->set_gap_filling(ParseGapFilling(downsampling.GetFill()));
223+
}
224+
225+
for (const auto &metric : selectors) {
226+
auto query = request.mutable_queries()->Add();
227+
*query->mutable_value() = TStringBuilder{} << "{" << metric << "}";
228+
query->set_hidden(false);
229+
}
230+
231+
return request;
232+
}
233+
234+
TListMetricsResult ProcessHttpResponse(NYql::IHTTPGateway::TResult&& response) const
235+
{
236+
std::vector<Metric> result;
237+
238+
if (response.Content.HttpResponseCode < 200 || response.Content.HttpResponseCode >= 300) {
239+
return TListMetricsResult(TStringBuilder{} << "Error while sending request to monitoring api: " << response.Content.data());
240+
}
241+
242+
NJson::TJsonValue json;
243+
try {
244+
NJson::ReadJsonTree(response.Content.data(), &json, /*throwOnError*/ true);
245+
} catch (const std::exception& e) {
246+
return TStringBuilder{} << "Failed to parse response from monitoring api: " << e.what();
247+
}
248+
249+
if (!json.IsMap() || !json.Has("result")) {
250+
return TListMetricsResult{"Invalid result from monitoring api"};
251+
}
252+
253+
for (const auto &metricObj : json["result"].GetArray()) {
254+
try {
255+
result.emplace_back(metricObj);
256+
} catch (const std::exception& e) {
257+
return TStringBuilder{} << "Failed to parse response from monitoring: " << e.what();
258+
}
259+
}
260+
261+
return std::move(result);
262+
}
263+
264+
TGetDataResult ProcessGrpcResponse(NYdbGrpc::TGrpcStatus&& status, ReadResponse &&response) const
265+
{
266+
std::vector<Timeseries> result;
267+
268+
if (!status.Ok()) {
269+
return TStringBuilder{} << "Error while sending request to monitoring api: " << status.Msg;
270+
}
271+
272+
for (const auto &responseValue : response.response_per_query()) {
273+
YQL_ENSURE(responseValue.has_timeseries_vector());
274+
YQL_ENSURE(responseValue.timeseries_vector().values_size() == 1); // one response per one set of selectors
275+
276+
const auto &queryResponse = responseValue.timeseries_vector().values()[0];
277+
278+
std::vector<int64_t> timestampValues;
279+
std::vector<double> timeseriesValues;
280+
281+
for (int64_t value : queryResponse.timestamp_values().values()) {
282+
timestampValues.push_back(value);
283+
}
284+
for (double value : queryResponse.double_values().values()) {
285+
timeseriesValues.push_back(value);
286+
}
287+
288+
result.emplace_back(queryResponse.name(), queryResponse.type(), std::move(timestampValues), std::move(timeseriesValues));
289+
}
290+
291+
return std::move(result);
292+
}
293+
294+
private:
295+
const TString DefaultReplica;
296+
const int DefaultPort;
297+
298+
NYql::NSo::NProto::TDqSolomonSource Settings;
299+
std::shared_ptr<NYdb::ICredentialsProvider> CredentialsProvider;
300+
};
301+
302+
} // namespace
303+
304+
Metric::Metric(const NJson::TJsonValue &value)
305+
{
306+
YQL_ENSURE(value.IsMap());
307+
308+
if (value.Has("labels")) {
309+
auto labels = value["labels"];
310+
YQL_ENSURE(labels.IsMap());
311+
312+
for (const auto &[key, value] : labels.GetMapSafe()) {
313+
YQL_ENSURE(value.IsString());
314+
Labels[key] = value.GetString();
315+
}
316+
}
317+
318+
if (value.Has("type")) {
319+
YQL_ENSURE(value["type"].IsString());
320+
Type = ParseMetricType(value["type"].GetString());
321+
}
322+
323+
if (value.Has("createdAt")) {
324+
YQL_ENSURE(value["createdAt"].IsString());
325+
CreatedAt = value["createdAt"].GetString();
326+
}
327+
}
328+
329+
ISolomonAccessorClient::TListMetricsResult::TListMetricsResult(const TString &error)
330+
: Success(false)
331+
, ErrorMsg(error)
332+
{}
333+
334+
ISolomonAccessorClient::TListMetricsResult::TListMetricsResult(std::vector<Metric> &&result)
335+
: Success(true)
336+
, Result(std::move(result))
337+
{}
338+
339+
ISolomonAccessorClient::TGetDataResult::TGetDataResult(const TString &error)
340+
: Success(false)
341+
, ErrorMsg(error)
342+
{}
343+
344+
ISolomonAccessorClient::TGetDataResult::TGetDataResult(std::vector<Timeseries> &&result)
345+
: Success(true)
346+
, Result(std::move(result))
347+
{}
348+
349+
ISolomonAccessorClient::TPtr
350+
ISolomonAccessorClient::Make(
351+
NYql::NSo::NProto::TDqSolomonSource&& settings,
352+
std::shared_ptr<NYdb::ICredentialsProvider> credentialsProvider)
353+
{
354+
return std::make_shared<TSolomonAccessorClient>(std::move(settings), credentialsProvider);
355+
}
356+
357+
} // namespace NYql::NSo

0 commit comments

Comments
 (0)