4
4
#include < ydb/core/graph/api/service.h>
5
5
#include < ydb/core/graph/api/events.h>
6
6
#include < library/cpp/json/json_writer.h>
7
+ #include " json_pipe_req.h"
8
+ #include " viewer_request.h"
7
9
#include " viewer.h"
8
10
#include " log.h"
9
11
10
12
namespace NKikimr {
11
13
namespace NViewer {
12
14
13
15
using namespace NActors ;
16
+ using namespace NMonitoring ;
14
17
15
- class TJsonRender : public TActorBootstrapped <TJsonRender> {
18
+ class TJsonRender : public TViewerPipeClient <TJsonRender> {
19
+ using TThis = TJsonRender;
20
+ using TBase = TViewerPipeClient<TJsonRender>;
16
21
IViewer* Viewer;
17
22
NMon::TEvHttpInfo::TPtr Event;
23
+ TEvViewer::TEvViewerRequest::TPtr ViewerRequest;
24
+ ui32 Timeout = 0 ;
18
25
std::vector<TString> Metrics;
26
+ TString Database;
27
+ TCgiParameters Params;
19
28
29
+ std::optional<TNodeId> SubscribedNodeId;
30
+ std::vector<TNodeId> TenantDynamicNodes;
31
+ bool Direct = false ;
32
+ bool MadeProxyRequest = false ;
20
33
public:
21
34
static constexpr NKikimrServices::TActivity::EType ActorActivityType () {
22
35
return NKikimrServices::TActivity::VIEWER_HANDLER;
@@ -25,116 +38,242 @@ class TJsonRender : public TActorBootstrapped<TJsonRender> {
25
38
TJsonRender (IViewer* viewer, NMon::TEvHttpInfo::TPtr &ev)
26
39
: Viewer(viewer)
27
40
, Event(ev)
28
- {}
41
+ {
42
+ const auto & params (Event->Get ()->Request .GetParams ());
43
+
44
+ InitConfig (params);
45
+ Database = params.Get (" database" );
46
+ Direct = FromStringWithDefault<bool >(params.Get (" direct" ), Direct);
47
+ Timeout = FromStringWithDefault<ui32>(params.Get (" timeout" ), 30000 );
48
+ }
49
+
50
+ TJsonRender (TEvViewer::TEvViewerRequest::TPtr& ev)
51
+ : ViewerRequest(ev)
52
+ {
53
+ auto & request = ViewerRequest->Get ()->Record .GetRenderRequest ();
54
+
55
+ TCgiParameters params (request.GetUri ());
56
+ InitConfig (params);
57
+ Direct = true ;
58
+ Timeout = ViewerRequest->Get ()->Record .GetTimeout ();
59
+ }
29
60
30
61
void Bootstrap () {
31
- auto postData = Event->Get ()->Request .GetPostContent ();
62
+ auto postData = Event
63
+ ? Event->Get ()->Request .GetPostContent ()
64
+ : ViewerRequest->Get ()->Record .GetRenderRequest ().GetContent ();
32
65
BLOG_D (" PostData=" << postData);
33
66
NKikimrGraph::TEvGetMetrics getRequest;
34
67
if (postData) {
35
- TCgiParameters params (postData);
36
- if (params .Has (" target" )) {
68
+ Params = TCgiParameters (postData);
69
+ if (Params .Has (" target" )) {
37
70
TString metric;
38
71
size_t num = 0 ;
39
72
for (;;) {
40
- metric = params .Get (" target" , num);
73
+ metric = Params .Get (" target" , num);
41
74
if (metric.empty ()) {
42
75
break ;
43
76
}
44
77
Metrics.push_back (metric);
45
78
++num;
46
79
}
47
- // StringSplitter(params.Get("target")).Split(',').SkipEmpty().Collect(&Metrics);
48
- for (const auto & metric : Metrics) {
49
- getRequest.AddMetrics (metric);
50
- }
51
- } else {
52
- static const TString png1x1 = " \x89\x50\x4e\x47\x0d\x0a\x1a\x0a\x00\x00\x00\x0d\x49\x48\x44\x52\x00\x00\x00\x01\x00\x00\x00\x01\x01 "
53
- " \x03\x00\x00\x00\x25\xdb\x56\xca\x00\x00\x00\x03\x50\x4c\x54\x45\x00\x00\x00\xa7\x7a\x3d\xda\x00\x00 "
54
- " \x00\x01\x74\x52\x4e\x53\x00\x40\xe6\xd8\x66\x00\x00\x00\x0a\x49\x44\x41\x54\x08\xd7\x63\x60\x00\x00 "
55
- " \x00\x02\x00\x01\xe2\x21\xbc\x33\x00\x00\x00\x00\x49\x45\x4e\x44\xae\x42\x60\x82 " ;
56
- Send (Event->Sender , new NMon::TEvHttpInfoRes (Viewer->GetHTTPOK (Event->Get (), " image/png" , png1x1), 0 , NMon::IEvHttpInfoRes::EContentType::Custom));
57
- return PassAway ();
58
80
}
59
- if (params.Has (" from" )) {
60
- getRequest.SetTimeFrom (FromStringWithDefault<ui32>(params.Get (" from" )));
61
- }
62
- if (params.Has (" until" )) {
63
- getRequest.SetTimeTo (FromStringWithDefault<ui32>(params.Get (" until" )));
81
+ // StringSplitter(Params.Get("target")).Split(',').SkipEmpty().Collect(&Metrics);
82
+
83
+ if (Database && !Direct) {
84
+ RequestStateStorageEndpointsLookup (Database); // to find some dynamic node and redirect there
64
85
}
65
- if (params. Has ( " maxDataPoints " ) ) {
66
- getRequest. SetMaxPoints (FromStringWithDefault<ui32>(params. Get ( " maxDataPoints " ), 1000 ) );
86
+ if (Requests == 0 ) {
87
+ SendGraphRequest ( );
67
88
}
68
89
} else {
69
- Send (Event-> Sender , new NMon::TEvHttpInfoRes ( Viewer->GetHTTPBADREQUEST (Event->Get (), {}, " Bad Request" ), 0 , NMon::IEvHttpInfoRes::EContentType::Custom ));
70
- return PassAway () ;
90
+ ReplyAndPassAway ( Viewer->GetHTTPBADREQUEST (Event->Get (), {}, " Bad Request" ));
91
+ return ;
71
92
}
72
- Send (NGraph::MakeGraphServiceId (), new NGraph::TEvGraph::TEvGetMetrics (std::move (getRequest)));
73
- Schedule (TDuration::Seconds (30 ), new TEvents::TEvWakeup ());
74
- Become (&TThis::StateWork);
93
+
94
+ Become (&TThis::StateWork, TDuration::MilliSeconds (Timeout), new TEvents::TEvWakeup ());
95
+ }
96
+
97
+ void PassAway () override {
98
+ if (SubscribedNodeId.has_value ()) {
99
+ Send (TActivationContext::InterconnectProxy (SubscribedNodeId.value ()), new TEvents::TEvUnsubscribe ());
100
+ }
101
+ TBase::PassAway ();
102
+ BLOG_TRACE (" PassAway()" );
75
103
}
76
104
77
105
STATEFN (StateWork) {
78
106
switch (ev->GetTypeRewrite ()) {
107
+ hFunc (TEvStateStorage::TEvBoardInfo, Handle );
108
+ hFunc (TEvents::TEvUndelivered, Undelivered);
109
+ hFunc (TEvInterconnect::TEvNodeConnected, Connected);
110
+ hFunc (TEvInterconnect::TEvNodeDisconnected, Disconnected);
111
+ hFunc (TEvViewer::TEvViewerResponse, Handle );
79
112
hFunc (NGraph::TEvGraph::TEvMetricsResult, Handle );
80
- cFunc (TEvents::TSystem::Wakeup, Timeout);
113
+
114
+ cFunc (TEvents::TSystem::Wakeup, HandleTimeout);
81
115
}
82
116
}
83
117
84
- void Handle (NGraph::TEvGraph::TEvMetricsResult::TPtr& ev) {
85
- const auto & response (ev->Get ()->Record );
86
- NJson::TJsonValue json;
118
+ void Connected (TEvInterconnect::TEvNodeConnected::TPtr &) {}
87
119
88
- if (response.GetError ()) {
89
- json[" status" ] = " error" ;
90
- json[" error" ] = response.GetError ();
91
- Send (Event->Sender , new NMon::TEvHttpInfoRes (Viewer->GetHTTPOKJSON (Event->Get ()) + NJson::WriteJson (json, false ), 0 , NMon::IEvHttpInfoRes::EContentType::Custom));
92
- return PassAway ();
120
+ void Undelivered (TEvents::TEvUndelivered::TPtr &ev) {
121
+ if (ev->Get ()->SourceType == NViewer::TEvViewer::EvViewerRequest) {
122
+ SendGraphRequest ();
123
+ }
124
+ }
125
+
126
+ void Disconnected (TEvInterconnect::TEvNodeDisconnected::TPtr &) {
127
+ SendGraphRequest ();
128
+ }
129
+
130
+ void SendDynamicNodeRenderRequest () {
131
+ ui64 hash = std::hash<TString>()(Event->Get ()->Request .GetRemoteAddr ());
132
+
133
+ auto itPos = std::next (TenantDynamicNodes.begin (), hash % TenantDynamicNodes.size ());
134
+ std::nth_element (TenantDynamicNodes.begin (), itPos, TenantDynamicNodes.end ());
135
+
136
+ TNodeId nodeId = *itPos;
137
+ SubscribedNodeId = nodeId;
138
+ TActorId viewerServiceId = MakeViewerID (nodeId);
139
+
140
+ THolder<TEvViewer::TEvViewerRequest> request = MakeHolder<TEvViewer::TEvViewerRequest>();
141
+ request->Record .SetTimeout (Timeout);
142
+ auto renderRequest = request->Record .MutableRenderRequest ();
143
+ renderRequest->SetUri (TString (Event->Get ()->Request .GetUri ()));
144
+
145
+ TStringBuf content = Event->Get ()->Request .GetPostContent ();
146
+ renderRequest->SetContent (TString (content));
147
+
148
+ ViewerWhiteboardCookie cookie (NKikimrViewer::TEvViewerRequest::kRenderRequest , nodeId);
149
+ SendRequest (viewerServiceId, request.Release (), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, cookie.ToUi64 ());
150
+ }
151
+
152
+ void Handle (TEvStateStorage::TEvBoardInfo::TPtr& ev) {
153
+ BLOG_TRACE (" Received TEvBoardInfo" );
154
+ if (ev->Get ()->Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok) {
155
+ for (const auto & [actorId, infoEntry] : ev->Get ()->InfoEntries ) {
156
+ TenantDynamicNodes.emplace_back (actorId.NodeId ());
157
+ }
158
+ }
159
+ if (TenantDynamicNodes.empty ()) {
160
+ SendGraphRequest ();
161
+ } else {
162
+ SendDynamicNodeRenderRequest ();
163
+ }
164
+ }
165
+
166
+ void SendGraphRequest () {
167
+ if (MadeProxyRequest) {
168
+ return ;
93
169
}
94
- if (response.DataSize () != Metrics.size ()) {
95
- json[" status" ] = " error" ;
96
- json[" error" ] = " Invalid data size received" ;
97
- Send (Event->Sender , new NMon::TEvHttpInfoRes (Viewer->GetHTTPOKJSON (Event->Get ()) + NJson::WriteJson (json, false ), 0 , NMon::IEvHttpInfoRes::EContentType::Custom));
170
+ MadeProxyRequest = true ;
171
+ NKikimrGraph::TEvGetMetrics getRequest;
172
+ if (Metrics.size () > 0 ) {
173
+ for (const auto & metric : Metrics) {
174
+ getRequest.AddMetrics (metric);
175
+ }
176
+ } else {
177
+ static const TString png1x1 = " \x89\x50\x4e\x47\x0d\x0a\x1a\x0a\x00\x00\x00\x0d\x49\x48\x44\x52\x00\x00\x00\x01\x00\x00\x00\x01\x01 "
178
+ " \x03\x00\x00\x00\x25\xdb\x56\xca\x00\x00\x00\x03\x50\x4c\x54\x45\x00\x00\x00\xa7\x7a\x3d\xda\x00\x00 "
179
+ " \x00\x01\x74\x52\x4e\x53\x00\x40\xe6\xd8\x66\x00\x00\x00\x0a\x49\x44\x41\x54\x08\xd7\x63\x60\x00\x00 "
180
+ " \x00\x02\x00\x01\xe2\x21\xbc\x33\x00\x00\x00\x00\x49\x45\x4e\x44\xae\x42\x60\x82 " ;
181
+ Send (Event->Sender , new NMon::TEvHttpInfoRes (Viewer->GetHTTPOK (Event->Get (), " image/png" , png1x1), 0 , NMon::IEvHttpInfoRes::EContentType::Custom));
98
182
return PassAway ();
99
183
}
100
- for (size_t nMetric = 0 ; nMetric < response.DataSize (); ++nMetric) {
101
- const auto & protoMetric (response.GetData (nMetric));
102
- if (response.TimeSize () != protoMetric.ValuesSize ()) {
184
+ if (Params.Has (" from" )) {
185
+ getRequest.SetTimeFrom (FromStringWithDefault<ui32>(Params.Get (" from" )));
186
+ }
187
+ if (Params.Has (" until" )) {
188
+ getRequest.SetTimeTo (FromStringWithDefault<ui32>(Params.Get (" until" )));
189
+ }
190
+ if (Params.Has (" maxDataPoints" )) {
191
+ getRequest.SetMaxPoints (FromStringWithDefault<ui32>(Params.Get (" maxDataPoints" ), 1000 ));
192
+ }
193
+ Send (NGraph::MakeGraphServiceId (), new NGraph::TEvGraph::TEvGetMetrics (std::move (getRequest)));
194
+ }
195
+
196
+ void HandleRenderResponse (NKikimrGraph::TEvMetricsResult& response) {
197
+ if (Event) {
198
+ NJson::TJsonValue json;
199
+
200
+ if (response.GetError ()) {
103
201
json[" status" ] = " error" ;
104
- json[" error" ] = " Invalid value size received" ;
105
- Send (Event->Sender , new NMon::TEvHttpInfoRes (Viewer->GetHTTPOKJSON (Event->Get ()) + NJson::WriteJson (json, false ), 0 , NMon::IEvHttpInfoRes::EContentType::Custom));
106
- return PassAway ();
202
+ json[" error" ] = response.GetError ();
203
+ ReplyAndPassAway (Viewer->GetHTTPOKJSON (Event->Get ()) + NJson::WriteJson (json, false ));
204
+ return ;
205
+ }
206
+ if (response.DataSize () != Metrics.size ()) {
207
+ json[" status" ] = " error" ;
208
+ json[" error" ] = " Invalid data size received" ;
209
+ ReplyAndPassAway (Viewer->GetHTTPOKJSON (Event->Get ()) + NJson::WriteJson (json, false ));
210
+ return ;
107
211
}
108
- }
109
- { // graphite
110
- json.SetType (NJson::JSON_ARRAY);
111
212
for (size_t nMetric = 0 ; nMetric < response.DataSize (); ++nMetric) {
112
213
const auto & protoMetric (response.GetData (nMetric));
113
- NJson::TJsonValue& jsonMetric (json.AppendValue ({}));
114
- jsonMetric[" target" ] = Metrics[nMetric];
115
- jsonMetric[" title" ] = Metrics[nMetric];
116
- jsonMetric[" tags" ][" name" ] = Metrics[nMetric];
117
- NJson::TJsonValue& jsonDataPoints (jsonMetric[" datapoints" ]);
118
- jsonDataPoints.SetType (NJson::JSON_ARRAY);
119
- for (size_t nTime = 0 ; nTime < response.TimeSize (); ++nTime) {
120
- NJson::TJsonValue& jsonDataPoint (jsonDataPoints.AppendValue ({}));
121
- double value = protoMetric.GetValues (nTime);
122
- if (isnan (value)) {
123
- jsonDataPoint.AppendValue (NJson::TJsonValue (NJson::JSON_NULL));
124
- } else {
125
- jsonDataPoint.AppendValue (value);
214
+ if (response.TimeSize () != protoMetric.ValuesSize ()) {
215
+ json[" status" ] = " error" ;
216
+ json[" error" ] = " Invalid value size received" ;
217
+ ReplyAndPassAway (Viewer->GetHTTPOKJSON (Event->Get ()) + NJson::WriteJson (json, false ));
218
+ return ;
219
+ }
220
+ }
221
+ { // graphite
222
+ json.SetType (NJson::JSON_ARRAY);
223
+ for (size_t nMetric = 0 ; nMetric < response.DataSize (); ++nMetric) {
224
+ const auto & protoMetric (response.GetData (nMetric));
225
+ NJson::TJsonValue& jsonMetric (json.AppendValue ({}));
226
+ jsonMetric[" target" ] = Metrics[nMetric];
227
+ jsonMetric[" title" ] = Metrics[nMetric];
228
+ jsonMetric[" tags" ][" name" ] = Metrics[nMetric];
229
+ NJson::TJsonValue& jsonDataPoints (jsonMetric[" datapoints" ]);
230
+ jsonDataPoints.SetType (NJson::JSON_ARRAY);
231
+ for (size_t nTime = 0 ; nTime < response.TimeSize (); ++nTime) {
232
+ NJson::TJsonValue& jsonDataPoint (jsonDataPoints.AppendValue ({}));
233
+ double value = protoMetric.GetValues (nTime);
234
+ if (isnan (value)) {
235
+ jsonDataPoint.AppendValue (NJson::TJsonValue (NJson::JSON_NULL));
236
+ } else {
237
+ jsonDataPoint.AppendValue (value);
238
+ }
239
+ jsonDataPoint.AppendValue (response.GetTime (nTime));
126
240
}
127
- jsonDataPoint.AppendValue (response.GetTime (nTime));
128
241
}
129
242
}
243
+
244
+ ReplyAndPassAway (Viewer->GetHTTPOKJSON (Event->Get ()) + NJson::WriteJson (json, false ));
245
+ } else {
246
+ TEvViewer::TEvViewerResponse* viewerResponse = new TEvViewer::TEvViewerResponse ();
247
+ viewerResponse->Record .MutableRenderResponse ()->CopyFrom (response);
248
+ ReplyAndPassAway (viewerResponse);
130
249
}
250
+ }
251
+
252
+ void Handle (NGraph::TEvGraph::TEvMetricsResult::TPtr& ev) {
253
+ HandleRenderResponse (ev->Get ()->Record );
254
+ }
255
+
256
+ void Handle (TEvViewer::TEvViewerResponse::TPtr& ev) {
257
+ HandleRenderResponse (*(ev.Get ()->Get ()->Record .MutableRenderResponse ()));
258
+ }
259
+
260
+ void HandleTimeout () {
261
+ if (Event) {
262
+ ReplyAndPassAway (Viewer->GetHTTPGATEWAYTIMEOUT (Event->Get ()));
263
+ } else {
264
+ auto * response = new TEvViewer::TEvViewerResponse ();
265
+ response->Record .MutableRenderResponse ()->SetError (" Request timed out" );
266
+ ReplyAndPassAway (response);
267
+ }
268
+ }
131
269
132
- Send (Event->Sender , new NMon::TEvHttpInfoRes (Viewer->GetHTTPOKJSON (Event->Get ()) + NJson::WriteJson (json, false ), 0 , NMon::IEvHttpInfoRes::EContentType::Custom));
270
+ void ReplyAndPassAway (TEvViewer::TEvViewerResponse* response) {
271
+ Send (ViewerRequest->Sender , response);
133
272
PassAway ();
134
273
}
135
274
136
- void Timeout ( ) {
137
- Send (Event->Sender , new NMon::TEvHttpInfoRes (Viewer-> GetHTTPGATEWAYTIMEOUT (Event-> Get () ), 0 , NMon::IEvHttpInfoRes::EContentType::Custom));
275
+ void ReplyAndPassAway (TString data ) {
276
+ Send (Event->Sender , new NMon::TEvHttpInfoRes (std::move (data ), 0 , NMon::IEvHttpInfoRes::EContentType::Custom));
138
277
PassAway ();
139
278
}
140
279
};
@@ -144,6 +283,8 @@ struct TJsonRequestParameters<TJsonRender> {
144
283
static TString GetParameters () {
145
284
return R"___( [{"name":"target","in":"query","description":"metrics comma delimited","required":true,"type":"string"},
146
285
{"name":"from","in":"query","description":"time in seconds","required":false,"type":"integer"},
286
+ {"name":"database","in":"query","description":"database name","required":false,"type":"string"},
287
+ {"name":"direct","in":"query","description":"force processing query on current node","required":false,"type":"boolean"},
147
288
{"name":"until","in":"query","description":"time in seconds","required":false,"type":"integer"},
148
289
{"name":"maxDataPoints","in":"query","description":"maximum number of data points","required":false,"type":"integer"},
149
290
{"name":"format","in":"query","description":"response format","required":false,"type":"string"}])___" ;
0 commit comments