Skip to content

Commit 7e72635

Browse files
evanevanevanevannnnblinkov
authored andcommitted
Solomon client library (#14911)
1 parent 5fa1021 commit 7e72635

File tree

8 files changed

+660
-1
lines changed

8 files changed

+660
-1
lines changed

ydb/library/yql/providers/solomon/async_io/dq_solomon_read_actor.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
namespace NYql::NDq {
1717

1818
std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqSolomonReadActor(
19-
NYql::NSo::NProto::TDqSolomonShard&& settings,
19+
NYql::NSo::NProto::TDqSolomonSource&& settings,
2020
ui64 inputIndex,
2121
TCollectStatsLevel statsLevel,
2222
const TTxId& txId,
Lines changed: 359 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,359 @@
1+
#include "solomon_accessor_client.h"
2+
3+
#include <library/cpp/protobuf/interop/cast.h>
4+
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
5+
#include <ydb/public/sdk/cpp/src/library/grpc/client/grpc_client_low.h>
6+
#include <yql/essentials/utils/url_builder.h>
7+
#include <yql/essentials/utils/yql_panic.h>
8+
9+
#include <ydb/library/yql/providers/solomon/solomon_accessor/grpc/solomon_accessor_pb.pb.h>
10+
#include <ydb/library/yql/providers/solomon/solomon_accessor/grpc/solomon_accessor_pb.grpc.pb.h>
11+
12+
namespace NYql::NSo {
13+
14+
using namespace yandex::monitoring::api::v3;
15+
16+
namespace {
17+
18+
Downsampling::GapFilling ParseGapFilling(const TString& fill)
19+
{
20+
if (fill == "NULL"sv) {
21+
return Downsampling::GAP_FILLING_NULL;
22+
}
23+
if (fill == "NONE"sv) {
24+
return Downsampling::GAP_FILLING_NONE;
25+
}
26+
if (fill == "PREVIOUS"sv) {
27+
return Downsampling::GAP_FILLING_PREVIOUS;
28+
}
29+
return Downsampling::GAP_FILLING_UNSPECIFIED;
30+
}
31+
32+
Downsampling::GridAggregation ParseGridAggregation(const TString& aggregation)
33+
{
34+
if (aggregation == "MAX"sv) {
35+
return Downsampling::GRID_AGGREGATION_MAX;
36+
}
37+
if (aggregation == "MIN"sv) {
38+
return Downsampling::GRID_AGGREGATION_MIN;
39+
}
40+
if (aggregation == "SUM"sv) {
41+
return Downsampling::GRID_AGGREGATION_SUM;
42+
}
43+
if (aggregation == "AVG"sv) {
44+
return Downsampling::GRID_AGGREGATION_AVG;
45+
}
46+
if (aggregation == "LAST"sv) {
47+
return Downsampling::GRID_AGGREGATION_LAST;
48+
}
49+
if (aggregation == "COUNT"sv) {
50+
return Downsampling::GRID_AGGREGATION_COUNT;
51+
}
52+
return Downsampling::GRID_AGGREGATION_UNSPECIFIED;
53+
}
54+
55+
MetricType ParseMetricType(const TString& type)
56+
{
57+
if (type == "DGAUGE"sv) {
58+
return MetricType::DGAUGE;
59+
}
60+
if (type == "IGAUGE"sv) {
61+
return MetricType::IGAUGE;
62+
}
63+
if (type == "COUNTER"sv) {
64+
return MetricType::COUNTER;
65+
}
66+
if (type == "RATE"sv) {
67+
return MetricType::RATE;
68+
}
69+
return MetricType::METRIC_TYPE_UNSPECIFIED;
70+
}
71+
72+
class TSolomonAccessorClient : public ISolomonAccessorClient, public std::enable_shared_from_this<TSolomonAccessorClient>
73+
{
74+
public:
75+
TSolomonAccessorClient(
76+
NYql::NSo::NProto::TDqSolomonSource&& settings,
77+
std::shared_ptr<NYdb::ICredentialsProvider> credentialsProvider
78+
)
79+
: DefaultReplica("sas")
80+
, Settings(std::move(settings))
81+
, CredentialsProvider(credentialsProvider)
82+
, GrpcClient(std::make_shared<NYdbGrpc::TGRpcClientLow>())
83+
, HttpGateway(IHTTPGateway::Make())
84+
{}
85+
86+
public:
87+
NThreading::TFuture<TListMetricsResult> ListMetrics(const TString& selectors, int pageSize, int page) override final
88+
{
89+
const auto request = BuildListMetricsRequest(selectors, pageSize, page);
90+
91+
IHTTPGateway::THeaders headers;
92+
headers.Fields.emplace_back(TStringBuilder{} << "Authorization: " << GetAuthInfo());
93+
94+
auto resultPromise = NThreading::NewPromise<TListMetricsResult>();
95+
96+
std::weak_ptr<const TSolomonAccessorClient> weakSelf = shared_from_this();
97+
// hold context until reply
98+
auto cb = [weakSelf, resultPromise](NYql::IHTTPGateway::TResult&& result) mutable
99+
{
100+
if (auto self = weakSelf.lock()) {
101+
resultPromise.SetValue(self->ProcessHttpResponse(std::move(result)));
102+
} else {
103+
resultPromise.SetValue(TListMetricsResult("Client has been shut down"));
104+
}
105+
};
106+
107+
HttpGateway->Download(
108+
request,
109+
headers,
110+
0,
111+
ListSizeLimit,
112+
std::move(cb)
113+
);
114+
115+
return resultPromise.GetFuture();
116+
}
117+
118+
NThreading::TFuture<TGetDataResult> GetData(const std::vector<TString>& selectors) override final
119+
{
120+
const auto request = BuildGetDataRequest(selectors);
121+
122+
NYdbGrpc::TCallMeta callMeta;
123+
callMeta.Aux.emplace_back("authorization", GetAuthInfo());
124+
125+
auto resultPromise = NThreading::NewPromise<TGetDataResult>();
126+
127+
NYdbGrpc::TGRpcClientConfig grpcConf;
128+
grpcConf.Locator = GetGrpcSolomonEndpoint();
129+
grpcConf.EnableSsl = Settings.GetUseSsl();
130+
const auto connection = GrpcClient->CreateGRpcServiceConnection<DataService>(grpcConf);
131+
132+
auto context = GrpcClient->CreateContext();
133+
if (!context) {
134+
throw yexception() << "Client is being shutted down";
135+
}
136+
std::weak_ptr<const TSolomonAccessorClient> weakSelf = shared_from_this();
137+
// hold context until reply
138+
auto cb = [weakSelf, resultPromise, context](
139+
NYdbGrpc::TGrpcStatus&& status,
140+
ReadResponse&& result) mutable
141+
{
142+
if (auto self = weakSelf.lock()) {
143+
resultPromise.SetValue(self->ProcessGrpcResponse(std::move(status), std::move(result)));
144+
} else {
145+
resultPromise.SetValue(TGetDataResult("Client has been shut down"));
146+
}
147+
};
148+
149+
connection->DoRequest<ReadRequest, ReadResponse>(
150+
std::move(request),
151+
std::move(cb),
152+
&DataService::Stub::AsyncRead,
153+
callMeta,
154+
context.get()
155+
);
156+
157+
return resultPromise.GetFuture();
158+
}
159+
160+
private:
161+
TString GetAuthInfo() const
162+
{
163+
const TString authToken = CredentialsProvider->GetAuthInfo();
164+
165+
switch (Settings.GetClusterType()) {
166+
case NSo::NProto::ESolomonClusterType::CT_SOLOMON:
167+
return "OAuth " + authToken;
168+
case NSo::NProto::ESolomonClusterType::CT_MONITORING:
169+
return "Bearer " + authToken;
170+
default:
171+
Y_ENSURE(false, "Invalid cluster type " << ToString<ui32>(Settings.GetClusterType()));
172+
}
173+
}
174+
175+
TString GetHttpSolomonEndpoint() const
176+
{
177+
return (Settings.GetUseSsl() ? "https://" : "http://") + Settings.GetEndpoint();
178+
}
179+
180+
TString GetGrpcSolomonEndpoint() const
181+
{
182+
return Settings.GetEndpoint() + ":443";
183+
}
184+
185+
TString BuildListMetricsRequest(const TString& selectors, int pageSize, int page) const
186+
{
187+
TUrlBuilder builder(GetHttpSolomonEndpoint());
188+
189+
builder.AddPathComponent("api");
190+
builder.AddPathComponent("v2");
191+
builder.AddPathComponent("projects");
192+
builder.AddPathComponent(Settings.GetProject());
193+
builder.AddPathComponent("sensors");
194+
195+
builder.AddUrlParam("selectors", selectors);
196+
builder.AddUrlParam("forceCluster", DefaultReplica);
197+
builder.AddUrlParam("pageSize", std::to_string(pageSize));
198+
builder.AddUrlParam("page", std::to_string(page));
199+
200+
return builder.Build();
201+
}
202+
203+
ReadRequest BuildGetDataRequest(const std::vector<TString>& selectors) const
204+
{
205+
ReadRequest request;
206+
207+
request.mutable_container()->set_project_id(Settings.GetProject());
208+
*request.mutable_from_time() = NProtoInterop::CastToProto(TInstant::FromValue(Settings.GetFrom()));
209+
*request.mutable_to_time() = NProtoInterop::CastToProto(TInstant::FromValue(Settings.GetTo()));
210+
*request.mutable_force_replica() = DefaultReplica;
211+
212+
if (Settings.GetDownsampling().GetDisabled()) {
213+
request.mutable_downsampling()->set_disabled(true);
214+
} else {
215+
const auto downsampling = Settings.GetDownsampling();
216+
request.mutable_downsampling()->set_grid_interval(downsampling.GetGridMs());
217+
request.mutable_downsampling()->set_grid_aggregation(ParseGridAggregation(downsampling.GetAggregation()));
218+
request.mutable_downsampling()->set_gap_filling(ParseGapFilling(downsampling.GetFill()));
219+
}
220+
221+
for (const auto& metric : selectors) {
222+
auto query = request.mutable_queries()->Add();
223+
*query->mutable_value() = TStringBuilder{} << "{" << metric << "}";
224+
query->set_hidden(false);
225+
}
226+
227+
return request;
228+
}
229+
230+
TListMetricsResult ProcessHttpResponse(NYql::IHTTPGateway::TResult&& response) const
231+
{
232+
std::vector<TMetric> result;
233+
234+
if (response.Content.HttpResponseCode < 200 || response.Content.HttpResponseCode >= 300) {
235+
return TListMetricsResult(TStringBuilder{} << "Error while sending request to monitoring api: " << response.Content.data());
236+
}
237+
238+
NJson::TJsonValue json;
239+
try {
240+
NJson::ReadJsonTree(response.Content.data(), &json, /*throwOnError*/ true);
241+
} catch (const std::exception& e) {
242+
return TStringBuilder{} << "Failed to parse response from monitoring api: " << e.what();
243+
}
244+
245+
if (!json.IsMap() || !json.Has("result")) {
246+
return TListMetricsResult{"Invalid result from monitoring api"};
247+
}
248+
249+
for (const auto& metricObj : json["result"].GetArray()) {
250+
try {
251+
result.emplace_back(metricObj);
252+
} catch (const std::exception& e) {
253+
return TStringBuilder{} << "Failed to parse result response from monitoring: " << e.what();
254+
}
255+
}
256+
257+
return std::move(result);
258+
}
259+
260+
TGetDataResult ProcessGrpcResponse(NYdbGrpc::TGrpcStatus&& status, ReadResponse&& response) const
261+
{
262+
std::vector<TTimeseries> result;
263+
264+
if (!status.Ok()) {
265+
return TStringBuilder{} << "Error while sending request to monitoring api: " << status.Msg;
266+
}
267+
268+
for (const auto& responseValue : response.response_per_query()) {
269+
YQL_ENSURE(responseValue.has_timeseries_vector());
270+
YQL_ENSURE(responseValue.timeseries_vector().values_size() == 1); // one response per one set of selectors
271+
272+
const auto& queryResponse = responseValue.timeseries_vector().values()[0];
273+
274+
std::vector<int64_t> timestamps;
275+
std::vector<double> values;
276+
277+
timestamps.reserve(queryResponse.timestamp_values().values_size());
278+
values.reserve(queryResponse.double_values().values_size());
279+
280+
for (int64_t value : queryResponse.timestamp_values().values()) {
281+
timestamps.push_back(value);
282+
}
283+
for (double value : queryResponse.double_values().values()) {
284+
values.push_back(value);
285+
}
286+
287+
result.emplace_back(queryResponse.name(), queryResponse.type(), std::move(timestamps), std::move(values));
288+
}
289+
290+
return std::move(result);
291+
}
292+
293+
private:
294+
const TString DefaultReplica;
295+
const size_t ListSizeLimit = 1ull << 30;
296+
297+
const NYql::NSo::NProto::TDqSolomonSource Settings;
298+
const std::shared_ptr<NYdb::ICredentialsProvider> CredentialsProvider;
299+
300+
const std::shared_ptr<NYdbGrpc::TGRpcClientLow> GrpcClient;
301+
const std::shared_ptr<IHTTPGateway> HttpGateway;
302+
};
303+
304+
} // namespace
305+
306+
TMetric::TMetric(const NJson::TJsonValue& value)
307+
{
308+
YQL_ENSURE(value.IsMap());
309+
310+
if (value.Has("labels")) {
311+
auto labels = value["labels"];
312+
YQL_ENSURE(labels.IsMap());
313+
314+
for (const auto& [key, value] : labels.GetMapSafe()) {
315+
YQL_ENSURE(value.IsString());
316+
Labels[key] = value.GetString();
317+
}
318+
}
319+
320+
if (value.Has("type")) {
321+
YQL_ENSURE(value["type"].IsString());
322+
Type = ParseMetricType(value["type"].GetString());
323+
}
324+
325+
if (value.Has("createdAt")) {
326+
YQL_ENSURE(value["createdAt"].IsString());
327+
CreatedAt = value["createdAt"].GetString();
328+
}
329+
}
330+
331+
ISolomonAccessorClient::TListMetricsResult::TListMetricsResult(const TString& error)
332+
: Success(false)
333+
, ErrorMsg(error)
334+
{}
335+
336+
ISolomonAccessorClient::TListMetricsResult::TListMetricsResult(std::vector<TMetric>&& result)
337+
: Success(true)
338+
, Result(std::move(result))
339+
{}
340+
341+
ISolomonAccessorClient::TGetDataResult::TGetDataResult(const TString& error)
342+
: Success(false)
343+
, ErrorMsg(error)
344+
{}
345+
346+
ISolomonAccessorClient::TGetDataResult::TGetDataResult(std::vector<TTimeseries>&& result)
347+
: Success(true)
348+
, Result(std::move(result))
349+
{}
350+
351+
ISolomonAccessorClient::TPtr
352+
ISolomonAccessorClient::Make(
353+
NYql::NSo::NProto::TDqSolomonSource&& settings,
354+
std::shared_ptr<NYdb::ICredentialsProvider> credentialsProvider)
355+
{
356+
return std::make_shared<TSolomonAccessorClient>(std::move(settings), credentialsProvider);
357+
}
358+
359+
} // namespace NYql::NSo

0 commit comments

Comments
 (0)