Skip to content

Commit e3f4b87

Browse files
authored
add traces and database redirects to local rpc calls (#9745)
1 parent 7d337c0 commit e3f4b87

11 files changed

+130
-281
lines changed

ydb/core/viewer/json_local_rpc.h

Lines changed: 59 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
11
#pragma once
22
#include "json_pipe_req.h"
3-
#include "viewer.h"
4-
#include <library/cpp/json/json_writer.h>
53
#include <ydb/core/grpc_services/local_rpc/local_rpc.h>
64

7-
namespace NKikimr {
8-
namespace NViewer {
5+
namespace NKikimr::NViewer {
96

107
struct TEvLocalRpcPrivate {
118
enum EEv {
@@ -26,41 +23,27 @@ struct TEvLocalRpcPrivate {
2623
};
2724
};
2825

29-
using namespace NActors;
30-
using NSchemeShard::TEvSchemeShard;
31-
3226
template <class TProtoRequest, class TProtoResponse, class TProtoResult, class TProtoService, class TRpcEv>
33-
class TJsonLocalRpc : public TActorBootstrapped<TJsonLocalRpc<TProtoRequest, TProtoResponse, TProtoResult, TProtoService, TRpcEv>> {
27+
class TJsonLocalRpc : public TViewerPipeClient {
3428
using TThis = TJsonLocalRpc<TProtoRequest, TProtoResponse, TProtoResult, TProtoService, TRpcEv>;
35-
using TBase = TActorBootstrapped<TThis>;
36-
37-
using TBase::Send;
38-
using TBase::PassAway;
39-
using TBase::Become;
29+
using TBase = TViewerPipeClient;
4030

4131
protected:
42-
IViewer* Viewer;
43-
NMon::TEvHttpInfo::TPtr Event;
44-
TProtoRequest Request;
32+
using TBase::ReplyAndPassAway;
33+
std::vector<HTTP_METHOD> AllowedMethods = {};
4534
TAutoPtr<TEvLocalRpcPrivate::TEvGrpcRequestResult<TProtoResult>> Result;
46-
47-
TJsonSettings JsonSettings;
48-
ui32 Timeout = 0;
49-
TString Database;
5035
NThreading::TFuture<TProtoResponse> RpcFuture;
5136

5237
public:
5338
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
5439
return NKikimrServices::TActivity::VIEWER_HANDLER;
5540
}
5641

57-
TJsonLocalRpc(IViewer* viewer, NMon::TEvHttpInfo::TPtr &ev)
58-
: Viewer(viewer)
59-
, Event(ev)
42+
TJsonLocalRpc(IViewer* viewer, NMon::TEvHttpInfo::TPtr& ev)
43+
: TBase(viewer, ev, TProtoRequest::descriptor()->name())
6044
{}
6145

62-
TProtoRequest Params2Proto(const TCgiParameters& params) {
63-
TProtoRequest request;
46+
void Params2Proto(const TCgiParameters& params, TProtoRequest& request) {
6447
using google::protobuf::Descriptor;
6548
using google::protobuf::Reflection;
6649
using google::protobuf::FieldDescriptor;
@@ -110,44 +93,52 @@ class TJsonLocalRpc : public TActorBootstrapped<TJsonLocalRpc<TProtoRequest, TPr
11093
}
11194
}
11295
}
113-
return request;
11496
}
11597

116-
TProtoRequest Params2Proto() {
117-
TProtoRequest request;
118-
NProtobufJson::TJson2ProtoConfig json2ProtoConfig;
119-
auto postData = Event->Get()->Request.GetPostContent();
120-
if (!postData.empty()) {
121-
try {
122-
NProtobufJson::Json2Proto(postData, request, json2ProtoConfig);
123-
}
124-
catch (const yexception& e) {
125-
ReplyAndPassAway(Viewer->GetHTTPBADREQUEST(Event->Get(), "text/plain", e.what()));
98+
bool ValidateProto(TProtoRequest& request) {
99+
using google::protobuf::Descriptor;
100+
using google::protobuf::Reflection;
101+
using google::protobuf::FieldDescriptor;
102+
const Descriptor& descriptor = *TProtoRequest::GetDescriptor();
103+
const Reflection& reflection = *TProtoRequest::GetReflection();
104+
for (int idx = 0; idx < descriptor.field_count(); ++idx) {
105+
const FieldDescriptor* field = descriptor.field(idx);
106+
const auto& options(field->options());
107+
if (options.HasExtension(Ydb::required)) {
108+
if (options.GetExtension(Ydb::required)) {
109+
if (!reflection.HasField(request, field)) {
110+
ReplyAndPassAway(GetHTTPBADREQUEST("text/plain", TStringBuilder() << "field '" << field->name() << "' is required"));
111+
return false;
112+
}
113+
}
126114
}
127-
} else {
128-
const auto& params(Event->Get()->Request.GetParams());
129-
return Params2Proto(params);
130115
}
131-
return request;
116+
return true;
132117
}
133118

134-
bool PostToRequest() {
119+
bool Params2Proto(TProtoRequest& request) {
135120
auto postData = Event->Get()->Request.GetPostContent();
136121
if (!postData.empty()) {
137122
try {
138-
NProtobufJson::Json2Proto(postData, Request, {});
139-
return true;
123+
NProtobufJson::Json2Proto(postData, request);
140124
}
141125
catch (const yexception& e) {
142-
ReplyAndPassAway(Viewer->GetHTTPBADREQUEST(Event->Get(), "text/plain", e.what()));
126+
ReplyAndPassAway(GetHTTPBADREQUEST("text/plain", e.what()));
143127
return false;
144128
}
129+
} else {
130+
const auto& params(Event->Get()->Request.GetParams());
131+
Params2Proto(params, request);
132+
}
133+
if (!ValidateProto(request)) {
134+
return false;
145135
}
146136
return true;
147137
}
148138

149-
void SendGrpcRequest() {
150-
RpcFuture = NRpcService::DoLocalRpc<TRpcEv>(std::move(Request), Database, Event->Get()->UserToken, TlsActivationContext->ActorSystem());
139+
void SendGrpcRequest(TProtoRequest&& request) {
140+
// TODO(xenoxeno): pass trace id
141+
RpcFuture = NRpcService::DoLocalRpc<TRpcEv>(std::move(request), Database, Event->Get()->UserToken, TlsActivationContext->ActorSystem());
151142
RpcFuture.Subscribe([actorId = TBase::SelfId(), actorSystem = TlsActivationContext->ActorSystem()]
152143
(const NThreading::TFuture<TProtoResponse>& future) {
153144
auto& response = future.GetValueSync();
@@ -173,14 +164,21 @@ class TJsonLocalRpc : public TActorBootstrapped<TJsonLocalRpc<TProtoRequest, TPr
173164
}
174165

175166
virtual void Bootstrap() {
176-
const auto& params(Event->Get()->Request.GetParams());
177-
JsonSettings.EnumAsNumbers = !FromStringWithDefault<bool>(params.Get("enums"), true);
178-
JsonSettings.UI64AsString = !FromStringWithDefault<bool>(params.Get("ui64"), true);
179-
Timeout = FromStringWithDefault<ui32>(params.Get("timeout"), 10000);
180-
181-
SendGrpcRequest();
182-
183-
Become(&TThis::StateRequested, TDuration::MilliSeconds(Timeout), new TEvents::TEvWakeup());
167+
if (!AllowedMethods.empty() && std::find(AllowedMethods.begin(), AllowedMethods.end(), Event->Get()->Request.GetMethod()) == AllowedMethods.end()) {
168+
return ReplyAndPassAway(GetHTTPBADREQUEST("text/plain", "Method is not allowed"));
169+
}
170+
if (Database.empty()) {
171+
return ReplyAndPassAway(GetHTTPBADREQUEST("text/plain", "field 'database' is required"));
172+
}
173+
if (TBase::NeedToRedirect()) {
174+
return;
175+
}
176+
TProtoRequest request;
177+
if (!Params2Proto(request)) {
178+
return;
179+
}
180+
SendGrpcRequest(std::move(request));
181+
Become(&TThis::StateRequested, Timeout, new TEvents::TEvWakeup());
184182
}
185183

186184
void Handle(typename TEvLocalRpcPrivate::TEvGrpcRequestResult<TProtoResult>::TPtr& ev) {
@@ -197,38 +195,24 @@ class TJsonLocalRpc : public TActorBootstrapped<TJsonLocalRpc<TProtoRequest, TPr
197195

198196
void ReplyAndPassAway() {
199197
if (Result && Result->Status) {
200-
if (!Result->Status->IsSuccess()) {
198+
if (Result->Status->IsSuccess()) {
199+
return ReplyAndPassAway(GetHTTPOKJSON(Result->Message));
200+
} else {
201201
NJson::TJsonValue json;
202202
TString message;
203203
MakeJsonErrorReply(json, message, Result->Status.value());
204204
TStringStream stream;
205205
NJson::WriteJson(&stream, &json);
206206
if (Result->Status->GetStatus() == NYdb::EStatus::UNAUTHORIZED) {
207-
return ReplyAndPassAway(Viewer->GetHTTPFORBIDDEN(Event->Get(), "application/json", stream.Str()));
207+
return ReplyAndPassAway(GetHTTPFORBIDDEN("application/json", stream.Str()), message);
208208
} else {
209-
return ReplyAndPassAway(Viewer->GetHTTPBADREQUEST(Event->Get(), "application/json", stream.Str()));
209+
return ReplyAndPassAway(GetHTTPBADREQUEST("application/json", stream.Str()), message);
210210
}
211-
} else {
212-
TStringStream json;
213-
TProtoToJson::ProtoToJson(json, Result->Message, JsonSettings);
214-
return ReplyAndPassAway(Viewer->GetHTTPOKJSON(Event->Get(), json.Str()));
215211
}
216212
} else {
217-
return ReplyAndPassAway(Viewer->GetHTTPINTERNALERROR(Event->Get()));
213+
return ReplyAndPassAway(GetHTTPINTERNALERROR("text/plain", "no Result or Status"), "internal error");
218214
}
219215
}
220-
221-
222-
void HandleTimeout() {
223-
ReplyAndPassAway(Viewer->GetHTTPGATEWAYTIMEOUT(Event->Get()));
224-
}
225-
226-
void ReplyAndPassAway(TString data) {
227-
Send(Event->Sender, new NMon::TEvHttpInfoRes(data, 0, NMon::IEvHttpInfoRes::EContentType::Custom));
228-
PassAway();
229-
}
230216
};
231217

232-
233-
}
234-
}
218+
} // namespace NKikimr::NViewer

ydb/core/viewer/json_pipe_req.cpp

Lines changed: 40 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -599,6 +599,9 @@ void TViewerPipeClient::InitConfig(const TCgiParameters& params) {
599599
Database = params.Get("tenant");
600600
}
601601
Direct = FromStringWithDefault<bool>(params.Get("direct"), Direct);
602+
JsonSettings.EnumAsNumbers = !FromStringWithDefault<bool>(params.Get("enums"), true);
603+
JsonSettings.UI64AsString = !FromStringWithDefault<bool>(params.Get("ui64"), true);
604+
Timeout = TDuration::MilliSeconds(FromStringWithDefault<ui32>(params.Get("timeout"), Timeout.MilliSeconds()));
602605
}
603606

604607
void TViewerPipeClient::InitConfig(const TRequestSettings& settings) {
@@ -655,6 +658,12 @@ TString TViewerPipeClient::GetHTTPOKJSON(const NJson::TJsonValue& response, TIns
655658
return GetHTTPOKJSON(NJson::WriteJson(response, false), lastModified);
656659
}
657660

661+
TString TViewerPipeClient::GetHTTPOKJSON(const google::protobuf::Message& response, TInstant lastModified) {
662+
TStringStream json;
663+
TProtoToJson::ProtoToJson(json, response, JsonSettings);
664+
return GetHTTPOKJSON(json.Str(), lastModified);
665+
}
666+
658667
TString TViewerPipeClient::GetHTTPGATEWAYTIMEOUT(TString contentType, TString response) {
659668
return Viewer->GetHTTPGATEWAYTIMEOUT(GetRequest(), std::move(contentType), std::move(response));
660669
}
@@ -696,41 +705,47 @@ void TViewerPipeClient::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) {
696705
}
697706

698707
void TViewerPipeClient::HandleResolveResource(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
699-
ResourceNavigateResponse->Set(std::move(ev));
700-
if (ResourceNavigateResponse->IsOk()) {
701-
TSchemeCacheNavigate::TEntry& entry(ResourceNavigateResponse->Get()->Request->ResultSet.front());
702-
SharedDatabase = CanonizePath(entry.Path);
703-
if (SharedDatabase == AppData()->TenantName) {
704-
Direct = true;
705-
return Bootstrap(); // retry bootstrap without redirect this time
708+
if (ResourceNavigateResponse) {
709+
ResourceNavigateResponse->Set(std::move(ev));
710+
if (ResourceNavigateResponse->IsOk()) {
711+
TSchemeCacheNavigate::TEntry& entry(ResourceNavigateResponse->Get()->Request->ResultSet.front());
712+
SharedDatabase = CanonizePath(entry.Path);
713+
if (SharedDatabase == AppData()->TenantName) {
714+
Direct = true;
715+
return Bootstrap(); // retry bootstrap without redirect this time
716+
}
717+
DatabaseBoardInfoResponse = MakeRequestStateStorageEndpointsLookup(SharedDatabase);
718+
} else {
719+
ReplyAndPassAway(GetHTTPBADREQUEST("text/plain", "Failed to resolve database - shared database not found"));
706720
}
707-
DatabaseBoardInfoResponse = MakeRequestStateStorageEndpointsLookup(SharedDatabase);
708-
} else {
709-
ReplyAndPassAway(GetHTTPBADREQUEST("text/plain", "Failed to resolve database - shared database not found"));
710721
}
711722
}
712723

713724
void TViewerPipeClient::HandleResolveDatabase(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
714-
DatabaseNavigateResponse->Set(std::move(ev));
715-
if (DatabaseNavigateResponse->IsOk()) {
716-
TSchemeCacheNavigate::TEntry& entry(DatabaseNavigateResponse->Get()->Request->ResultSet.front());
717-
if (entry.DomainInfo && entry.DomainInfo->ResourcesDomainKey && entry.DomainInfo->DomainKey != entry.DomainInfo->ResourcesDomainKey) {
718-
ResourceNavigateResponse = MakeRequestSchemeCacheNavigate(TPathId(entry.DomainInfo->ResourcesDomainKey));
719-
Become(&TViewerPipeClient::StateResolveResource);
720-
return;
725+
if (DatabaseNavigateResponse) {
726+
DatabaseNavigateResponse->Set(std::move(ev));
727+
if (DatabaseNavigateResponse->IsOk()) {
728+
TSchemeCacheNavigate::TEntry& entry(DatabaseNavigateResponse->Get()->Request->ResultSet.front());
729+
if (entry.DomainInfo && entry.DomainInfo->ResourcesDomainKey && entry.DomainInfo->DomainKey != entry.DomainInfo->ResourcesDomainKey) {
730+
ResourceNavigateResponse = MakeRequestSchemeCacheNavigate(TPathId(entry.DomainInfo->ResourcesDomainKey));
731+
Become(&TViewerPipeClient::StateResolveResource);
732+
return;
733+
}
734+
DatabaseBoardInfoResponse = MakeRequestStateStorageEndpointsLookup(CanonizePath(entry.Path));
735+
} else {
736+
ReplyAndPassAway(GetHTTPBADREQUEST("text/plain", "Failed to resolve database - not found"));
721737
}
722-
DatabaseBoardInfoResponse = MakeRequestStateStorageEndpointsLookup(CanonizePath(entry.Path));
723-
} else {
724-
ReplyAndPassAway(GetHTTPBADREQUEST("text/plain", "Failed to resolve database - not found"));
725738
}
726739
}
727740

728741
void TViewerPipeClient::HandleResolve(TEvStateStorage::TEvBoardInfo::TPtr& ev) {
729-
DatabaseBoardInfoResponse->Set(std::move(ev));
730-
if (DatabaseBoardInfoResponse->IsOk()) {
731-
ReplyAndPassAway(MakeForward(GetNodesFromBoardReply(DatabaseBoardInfoResponse->GetRef())));
732-
} else {
733-
ReplyAndPassAway(GetHTTPBADREQUEST("text/plain", "Failed to resolve database - no nodes found"));
742+
if (DatabaseBoardInfoResponse) {
743+
DatabaseBoardInfoResponse->Set(std::move(ev));
744+
if (DatabaseBoardInfoResponse->IsOk()) {
745+
ReplyAndPassAway(MakeForward(GetNodesFromBoardReply(DatabaseBoardInfoResponse->GetRef())));
746+
} else {
747+
ReplyAndPassAway(GetHTTPBADREQUEST("text/plain", "Failed to resolve database - no nodes found"));
748+
}
734749
}
735750
}
736751

ydb/core/viewer/json_pipe_req.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ class TViewerPipeClient : public TActorBootstrapped<TViewerPipeClient> {
4545
NWilson::TSpan Span;
4646
IViewer* Viewer = nullptr;
4747
NMon::TEvHttpInfo::TPtr Event;
48+
TJsonSettings JsonSettings;
49+
TDuration Timeout = TDuration::Seconds(10);
4850

4951
struct TPipeInfo {
5052
TActorId PipeClient;
@@ -289,6 +291,7 @@ class TViewerPipeClient : public TActorBootstrapped<TViewerPipeClient> {
289291
TString GetHTTPOK(TString contentType = {}, TString response = {}, TInstant lastModified = {});
290292
TString GetHTTPOKJSON(TString response = {}, TInstant lastModified = {});
291293
TString GetHTTPOKJSON(const NJson::TJsonValue& response, TInstant lastModified = {});
294+
TString GetHTTPOKJSON(const google::protobuf::Message& response, TInstant lastModified = {});
292295
TString GetHTTPGATEWAYTIMEOUT(TString contentType = {}, TString response = {});
293296
TString GetHTTPBADREQUEST(TString contentType = {}, TString response = {});
294297
TString GetHTTPINTERNALERROR(TString contentType = {}, TString response = {});

ydb/core/viewer/operation_cancel.h

Lines changed: 2 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -18,35 +18,8 @@ class TOperationCancel : public TOperationCancelRpc {
1818

1919
TOperationCancel(IViewer* viewer, NMon::TEvHttpInfo::TPtr& ev)
2020
: TBase(viewer, ev)
21-
{}
22-
23-
void Bootstrap() override {
24-
if (Event->Get()->Request.GetMethod() != HTTP_METHOD_POST) {
25-
return ReplyAndPassAway(Viewer->GetHTTPBADREQUEST(Event->Get(), "text/plain", "Only POST method is allowed"));
26-
}
27-
28-
if (!PostToRequest()) {
29-
return;
30-
}
31-
32-
const auto& params(Event->Get()->Request.GetParams());
33-
if (params.Has("database")) {
34-
Database = params.Get("database");
35-
}
36-
37-
if (Database.empty()) {
38-
return ReplyAndPassAway(Viewer->GetHTTPBADREQUEST(Event->Get(), "text/plain", "field 'database' is required"));
39-
}
40-
41-
if (params.Has("id")) {
42-
Request.set_id(params.Get("id"));
43-
}
44-
45-
if (Request.id().empty()) {
46-
return ReplyAndPassAway(Viewer->GetHTTPBADREQUEST(Event->Get(), "text/plain", "field 'id' is required"));
47-
}
48-
49-
TBase::Bootstrap();
21+
{
22+
AllowedMethods = {HTTP_METHOD_POST};
5023
}
5124

5225
static YAML::Node GetSwagger() {

0 commit comments

Comments
 (0)