Skip to content

Commit d683209

Browse files
Merge 6251288 into d2ecf39
2 parents d2ecf39 + 6251288 commit d683209

File tree

8 files changed

+620
-1
lines changed

8 files changed

+620
-1
lines changed
Lines changed: 333 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,333 @@
1+
#include "solomon_accessor_client.h"
2+
3+
#include <library/cpp/http/simple/http_client.h>
4+
#include <library/cpp/protobuf/interop/cast.h>
5+
#include <library/cpp/threading/future/core/future.h>
6+
#include <ydb/public/sdk/cpp/src/library/grpc/client/grpc_client_low.h>
7+
#include <yql/essentials/utils/url_builder.h>
8+
#include <yql/essentials/utils/yql_panic.h>
9+
10+
#include <ydb/library/yql/providers/common/solomon_accessor/grpc/solomon_accessor_pb.pb.h>
11+
#include <ydb/library/yql/providers/common/solomon_accessor/grpc/solomon_accessor_pb.grpc.pb.h>
12+
13+
namespace NYql::NSo {
14+
15+
using namespace yandex::monitoring::api::v3;
16+
17+
namespace {
18+
19+
Downsampling::GapFilling ParseGapFilling(const TString &fill)
20+
{
21+
if (fill.equal("NULL")) {
22+
return Downsampling::GAP_FILLING_NULL;
23+
}
24+
if (fill.equal("NONE")) {
25+
return Downsampling::GAP_FILLING_NONE;
26+
}
27+
if (fill.equal("PREVIOUS")) {
28+
return Downsampling::GAP_FILLING_PREVIOUS;
29+
}
30+
return Downsampling::GAP_FILLING_UNSPECIFIED;
31+
}
32+
33+
Downsampling::GridAggregation ParseGridAggregation(const TString &aggregation)
34+
{
35+
if (aggregation.equal("MAX")) {
36+
return Downsampling::GRID_AGGREGATION_MAX;
37+
}
38+
if (aggregation.equal("MIN")) {
39+
return Downsampling::GRID_AGGREGATION_MIN;
40+
}
41+
if (aggregation.equal("SUM")) {
42+
return Downsampling::GRID_AGGREGATION_SUM;
43+
}
44+
if (aggregation.equal("AVG")) {
45+
return Downsampling::GRID_AGGREGATION_AVG;
46+
}
47+
if (aggregation.equal("LAST")) {
48+
return Downsampling::GRID_AGGREGATION_LAST;
49+
}
50+
if (aggregation.equal("COUNT")) {
51+
return Downsampling::GRID_AGGREGATION_COUNT;
52+
}
53+
return Downsampling::GRID_AGGREGATION_UNSPECIFIED;
54+
}
55+
56+
MetricType ParseMetricType(const TString &type)
57+
{
58+
if (type.equal("DGAUGE")) {
59+
return MetricType::DGAUGE;
60+
}
61+
if (type.equal("IGAUGE")) {
62+
return MetricType::IGAUGE;
63+
}
64+
if (type.equal("COUNTER")) {
65+
return MetricType::COUNTER;
66+
}
67+
if (type.equal("RATE")) {
68+
return MetricType::RATE;
69+
}
70+
return MetricType::METRIC_TYPE_UNSPECIFIED;
71+
}
72+
73+
class TSolomonAccessorClient : public ISolomonAccessorClient, public std::enable_shared_from_this<TSolomonAccessorClient>
74+
{
75+
public:
76+
using THeaders = TKeepAliveHttpClient::THeaders;
77+
using THttpCode = TKeepAliveHttpClient::THttpCode;
78+
79+
TSolomonAccessorClient(
80+
NYql::NSo::NProto::TDqSolomonSource&& settings,
81+
std::shared_ptr<NYdb::ICredentialsProvider> credentialsProvider
82+
)
83+
: DefaultReplica("sas")
84+
, DefaultPort(443)
85+
, Settings(std::move(settings))
86+
, CredentialsProvider(credentialsProvider)
87+
{}
88+
89+
public:
90+
TMaybe<TString> ListMetrics(
91+
const TString &selectors,
92+
int pageSize,
93+
int page,
94+
TVector<Metric> &result) override final
95+
{
96+
const auto request = BuildListMetricsRequest(selectors, pageSize, page);
97+
98+
THeaders headers;
99+
headers["Authorization"] = GetAuthInfo();
100+
101+
TStringStream str;
102+
const auto httpClient = std::make_unique<TKeepAliveHttpClient>(GetHttpSolomonEndpoint(), DefaultPort);
103+
const auto retCode = httpClient->DoGet(request, &str, headers);
104+
105+
return ProcessHttpResponse(retCode, str.Str(), result);
106+
}
107+
108+
TMaybe<TString> GetData(
109+
const std::vector<TString> &selectors,
110+
std::vector<Timeseries> &results) override final
111+
{
112+
const auto request = BuildGetDataRequest(selectors);
113+
114+
NYdbGrpc::TCallMeta callMeta;
115+
callMeta.Aux.emplace_back("authorization", GetAuthInfo());
116+
117+
auto resultPromise = NThreading::NewPromise<TMaybe<TString>>();
118+
119+
NYdbGrpc::TGRpcClientConfig grpcConf;
120+
grpcConf.Locator = GetGrpcSolomonEndpoint();
121+
grpcConf.EnableSsl = Settings.GetUseSsl();
122+
const auto grpcClient = std::make_unique<NYdbGrpc::TGRpcClientLow>();
123+
const auto connection = grpcClient->CreateGRpcServiceConnection<DataService>(grpcConf);
124+
125+
auto context = grpcClient->CreateContext();
126+
if (!context) {
127+
throw yexception() << "Client is being shutted down";
128+
}
129+
std::weak_ptr<const TSolomonAccessorClient> weakSelf = shared_from_this();
130+
// hold context until reply
131+
auto cb = [weakSelf, resultPromise, context, &results](
132+
NYdbGrpc::TGrpcStatus&& status,
133+
ReadResponse&& result) mutable
134+
{
135+
if (auto self = weakSelf.lock()) {
136+
resultPromise.SetValue(self->ProcessGrpcResponse(std::move(status), std::move(result), results));
137+
}
138+
resultPromise.SetValue({});
139+
};
140+
141+
connection->DoRequest<ReadRequest, ReadResponse>(
142+
std::move(request),
143+
std::move(cb),
144+
&DataService::Stub::AsyncRead,
145+
callMeta,
146+
context.get()
147+
);
148+
149+
resultPromise.GetFuture().Wait(TDuration::Seconds(10));
150+
return resultPromise.GetValue();
151+
}
152+
153+
private:
154+
TString GetAuthInfo() const
155+
{
156+
const TString authToken = CredentialsProvider->GetAuthInfo();
157+
158+
switch (Settings.GetClusterType()) {
159+
case NSo::NProto::ESolomonClusterType::CT_SOLOMON:
160+
return "OAuth " + authToken;
161+
case NSo::NProto::ESolomonClusterType::CT_MONITORING:
162+
return "Bearer " + authToken;
163+
default:
164+
Y_ENSURE(false, "Invalid cluster type " << ToString<ui32>(Settings.GetClusterType()));
165+
}
166+
}
167+
168+
TString GetHttpSolomonEndpoint() const
169+
{
170+
return (Settings.GetUseSsl() ? "https://" : "http://") + Settings.GetEndpoint();
171+
}
172+
173+
TString GetGrpcSolomonEndpoint() const
174+
{
175+
return Settings.GetEndpoint() + std::to_string(DefaultPort);
176+
}
177+
178+
TString BuildListMetricsRequest(
179+
const TString &selectors,
180+
int pageSize,
181+
int page) const
182+
{
183+
TUrlBuilder builder("");
184+
185+
builder.AddPathComponent("api");
186+
builder.AddPathComponent("v2");
187+
builder.AddPathComponent("projects");
188+
builder.AddPathComponent(Settings.GetProject());
189+
builder.AddPathComponent("sensors");
190+
191+
builder.AddUrlParam("selectors", selectors);
192+
builder.AddUrlParam("forceCluster", DefaultReplica);
193+
builder.AddUrlParam("pageSize", std::to_string(pageSize));
194+
builder.AddUrlParam("page", std::to_string(page));
195+
196+
return builder.Build();
197+
}
198+
199+
ReadRequest BuildGetDataRequest(
200+
const std::vector<TString> &selectors) const
201+
{
202+
ReadRequest request;
203+
204+
request.mutable_container()->set_project_id(Settings.GetProject());
205+
*request.mutable_from_time() = NProtoInterop::CastToProto(TInstant::FromValue(Settings.GetFrom()));
206+
*request.mutable_to_time() = NProtoInterop::CastToProto(TInstant::FromValue(Settings.GetTo()));
207+
*request.mutable_force_replica() = DefaultReplica;
208+
209+
if (Settings.GetDownsampling().GetDisabled()) {
210+
request.mutable_downsampling()->set_disabled(true);
211+
}
212+
else {
213+
const auto downsampling = Settings.GetDownsampling();
214+
request.mutable_downsampling()->set_grid_interval(downsampling.GetGridMs());
215+
request.mutable_downsampling()->set_grid_aggregation(ParseGridAggregation(downsampling.GetAggregation()));
216+
request.mutable_downsampling()->set_gap_filling(ParseGapFilling(downsampling.GetFill()));
217+
}
218+
219+
for (const auto &metric : selectors) {
220+
auto query = request.mutable_queries()->Add();
221+
*query->mutable_value() = TStringBuilder{} << "{" << metric << "}";
222+
query->set_hidden(false);
223+
}
224+
225+
return request;
226+
}
227+
228+
TMaybe<TString> ProcessHttpResponse(
229+
THttpCode retCode,
230+
TString response,
231+
TVector<Metric> &result) const
232+
{
233+
if (retCode < 200 || retCode >= 300) {
234+
return TStringBuilder{} << "Error while sending request to monitoring api: " << response;
235+
}
236+
237+
NJson::TJsonValue json;
238+
try {
239+
NJson::ReadJsonTree(response, &json, /*throwOnError*/ true);
240+
} catch (const std::exception& e) {
241+
return TStringBuilder{} << "Failed to parse response from monitoring api: " << e.what();
242+
}
243+
244+
if (!json.IsMap() || !json.Has("result")) {
245+
return "Invalid result from monitoring api";
246+
}
247+
248+
for (const auto &metricObj : json["result"].GetArray()) {
249+
try {
250+
result.emplace_back(metricObj);
251+
} catch (const std::exception& e) {
252+
return TStringBuilder{} << "Failed to parse response from monitoring: " << e.what();
253+
}
254+
}
255+
256+
return {};
257+
}
258+
259+
TMaybe<TString> ProcessGrpcResponse(
260+
NYdbGrpc::TGrpcStatus&& status,
261+
ReadResponse &&response,
262+
std::vector<Timeseries> &result) const
263+
{
264+
if (!status.Ok()) {
265+
return TStringBuilder{} << "Error while sending request to monitoring api: " << status.Msg;
266+
}
267+
268+
for (const auto &responseValue : response.response_per_query()) {
269+
YQL_ENSURE(responseValue.has_timeseries_vector());
270+
YQL_ENSURE(responseValue.timeseries_vector().values_size() == 1); // one response per one set of selectors
271+
272+
const auto &queryResponse = responseValue.timeseries_vector().values()[0];
273+
274+
std::vector<int64_t> timestampValues;
275+
std::vector<double> timeseriesValues;
276+
277+
for (int64_t value : queryResponse.timestamp_values().values()) {
278+
timestampValues.push_back(value);
279+
}
280+
for (double value : queryResponse.double_values().values()) {
281+
timeseriesValues.push_back(value);
282+
}
283+
284+
result.emplace_back(queryResponse.name(), queryResponse.type(), std::move(timestampValues), std::move(timeseriesValues));
285+
}
286+
287+
return {};
288+
}
289+
290+
private:
291+
const TString DefaultReplica;
292+
const int DefaultPort;
293+
294+
NYql::NSo::NProto::TDqSolomonSource Settings;
295+
std::shared_ptr<NYdb::ICredentialsProvider> CredentialsProvider;
296+
};
297+
298+
} // namespace
299+
300+
Metric::Metric(const NJson::TJsonValue &value)
301+
{
302+
YQL_ENSURE(value.IsMap());
303+
304+
if (value.Has("labels")) {
305+
auto labels = value["labels"];
306+
YQL_ENSURE(labels.IsMap());
307+
308+
for (const auto &[key, value] : labels.GetMapSafe()) {
309+
YQL_ENSURE(value.IsString());
310+
Labels[key] = value.GetString();
311+
}
312+
}
313+
314+
if (value.Has("type")) {
315+
YQL_ENSURE(value["type"].IsString());
316+
Type = ParseMetricType(value["type"].GetString());
317+
}
318+
319+
if (value.Has("createdAt")) {
320+
YQL_ENSURE(value["createdAt"].IsString());
321+
CreatedAt = value["createdAt"].GetString();
322+
}
323+
}
324+
325+
ISolomonAccessorClient::TPtr
326+
ISolomonAccessorClient::Make(
327+
NYql::NSo::NProto::TDqSolomonSource&& settings,
328+
std::shared_ptr<NYdb::ICredentialsProvider> credentialsProvider)
329+
{
330+
return std::make_shared<TSolomonAccessorClient>(std::move(settings), credentialsProvider);
331+
}
332+
333+
} // namespace NYql::NSo
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
#pragma once
2+
3+
#include <library/cpp/json/json_reader.h>
4+
#include <ydb-cpp-sdk/client/types/credentials/credentials.h>
5+
#include <ydb/library/yql/providers/common/solomon_accessor/grpc/solomon_accessor_pb.pb.h>
6+
#include <ydb/library/yql/providers/solomon/proto/dq_solomon_shard.pb.h>
7+
8+
namespace NYql::NSo {
9+
10+
class Metric {
11+
public:
12+
Metric(const NJson::TJsonValue &value);
13+
14+
public:
15+
std::map<TString, TString> Labels;
16+
yandex::monitoring::api::v3::MetricType Type;
17+
TString CreatedAt;
18+
};
19+
20+
class Timeseries {
21+
public:
22+
TString Name;
23+
yandex::monitoring::api::v3::MetricType Type;
24+
std::vector<int64_t> TimestampValues;
25+
std::vector<double> TimeseriesValues;
26+
};
27+
28+
class ISolomonAccessorClient {
29+
public:
30+
using TPtr = std::shared_ptr<ISolomonAccessorClient>;
31+
using TWeakPtr = std::weak_ptr<ISolomonAccessorClient>;
32+
33+
virtual ~ISolomonAccessorClient() = default;
34+
35+
static TPtr Make(
36+
NYql::NSo::NProto::TDqSolomonSource&& settings,
37+
std::shared_ptr<NYdb::ICredentialsProvider> credentialsProvider);
38+
39+
public:
40+
virtual TMaybe<TString> ListMetrics(
41+
const TString &selectors,
42+
int pageSize,
43+
int page,
44+
TVector<Metric> &result) = 0;
45+
46+
virtual TMaybe<TString> GetData(
47+
const std::vector<TString> &selectors,
48+
std::vector<Timeseries> &results) = 0;
49+
};
50+
51+
} // namespace NYql

0 commit comments

Comments
 (0)