Skip to content

Commit 3591182

Browse files
authored
YDB-6485 Wilson HTML Page + custom headers (#6571)
Closes #6485
1 parent 0fb1588 commit 3591182

File tree

4 files changed

+124
-2
lines changed

4 files changed

+124
-2
lines changed

ydb/core/driver_lib/run/kikimr_services_initializers.cpp

+15
Original file line numberDiff line numberDiff line change
@@ -874,10 +874,18 @@ void TBasicServicesInitializer::InitializeServices(NActors::TActorSystemSetup* s
874874
break;
875875
}
876876

877+
const auto& headersProto = opentelemetry.GetHeaders();
878+
TMap<TString, TString> headers;
879+
880+
for (const auto& header : headersProto) {
881+
headers.insert({header.first, header.second});
882+
}
883+
877884
NWilson::TWilsonUploaderParams uploaderParams {
878885
.CollectorUrl = opentelemetry.GetCollectorUrl(),
879886
.ServiceName = opentelemetry.GetServiceName(),
880887
.GrpcSigner = std::move(grpcSigner),
888+
.Headers = headers,
881889
};
882890

883891
if (tracingConfig.HasUploader()) {
@@ -901,6 +909,13 @@ void TBasicServicesInitializer::InitializeServices(NActors::TActorSystemSetup* s
901909
#undef GET_FIELD_FROM_CONFIG
902910
}
903911

912+
if (const auto& mon = appData->Mon) {
913+
uploaderParams.RegisterMonPage = [mon](TActorSystem *actorSystem, const TActorId& actorId) {
914+
NMonitoring::TIndexMonPage *actorsMonPage = mon->RegisterIndexPage("actors", "Actors");
915+
mon->RegisterActorPage(actorsMonPage, "wilson_uploader", "Wilson Trace Uploader", false, actorSystem, actorId);
916+
};
917+
}
918+
904919
wilsonUploader.reset(std::move(uploaderParams).CreateUploader());
905920
break;
906921
}

ydb/core/protos/config.proto

+1
Original file line numberDiff line numberDiff line change
@@ -1670,6 +1670,7 @@ message TTracingConfig {
16701670
// Settings for the OpenTelemetry trace export backend.
message TOpentelemetryBackend {
16711671
// Collector endpoint; the uploader accepts only grpc:// and grpcs:// URLs.
optional string CollectorUrl = 1;
16721672
optional string ServiceName = 2;
1673+
// Extra key/value pairs the uploader attaches as gRPC client metadata to its export calls.
map<string, string> Headers = 3;
16731674
}
16741675

16751676

ydb/library/actors/wilson/wilson_uploader.cpp

+103-2
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,9 @@ namespace NWilson {
108108

109109
TString CollectorUrl;
110110
TString ServiceName;
111+
TMap<TString, TString> Headers;
112+
113+
TRegisterMonPageCallback RegisterMonPage;
111114

112115
std::shared_ptr<grpc::Channel> Channel;
113116
std::unique_ptr<NServiceProto::TraceService::Stub> Stub;
@@ -126,6 +129,9 @@ namespace NWilson {
126129
TIntrusiveListWithAutoDelete<TExportRequestData, TDelete> ExportRequests;
127130
size_t ExportRequestsCount = 0;
128131

132+
TString ErrStr;
133+
TString LastCommitTraceErrStr;
134+
129135
public:
130136
TWilsonUploader(TWilsonUploaderParams params)
131137
: MaxSpansPerSecond(params.MaxExportedSpansPerSecond)
@@ -136,6 +142,8 @@ namespace NWilson {
136142
, MaxExportInflight(params.MaxExportRequestsInflight)
137143
, CollectorUrl(std::move(params.CollectorUrl))
138144
, ServiceName(std::move(params.ServiceName))
145+
, Headers(params.Headers)
146+
, RegisterMonPage(params.RegisterMonPage)
139147
, GrpcSigner(std::move(params.GrpcSigner))
140148
, CurrentBatch(MaxSpansInBatch, MaxBytesInBatch, ServiceName)
141149
{}
@@ -166,11 +174,15 @@ namespace NWilson {
166174
TStringBuf host;
167175
ui16 port;
168176
if (!TryGetSchemeHostAndPort(CollectorUrl, scheme, host, port)) {
169-
ALOG_ERROR(WILSON_SERVICE_ID, "Failed to parse collector url (" << CollectorUrl << " was provided). Wilson wouldn't work");
177+
ErrStr = "Failed to parse collector url (" + CollectorUrl + " was provided). Wilson wouldn't work";
178+
ALOG_ERROR(WILSON_SERVICE_ID, ErrStr);
170179
Become(&TThis::StateBroken);
171180
return;
172181
} else if (scheme != "grpc://" && scheme != "grpcs://") {
173-
ALOG_ERROR(WILSON_SERVICE_ID, "Wrong scheme provided: " << scheme << " (only grpc:// and grpcs:// are supported). Wilson wouldn't work");
182+
TStringStream ss;
183+
ss << "Wrong scheme provided: " << scheme << " (only grpc:// and grpcs:// are supported). Wilson wouldn't work";
184+
ErrStr = ss.Str();
185+
ALOG_ERROR(WILSON_SERVICE_ID, ErrStr);
174186
Become(&TThis::StateBroken);
175187
return;
176188
}
@@ -181,6 +193,14 @@ namespace NWilson {
181193
ALOG_INFO(WILSON_SERVICE_ID, "TWilsonUploader::Bootstrap");
182194
}
183195

196+
// Actor registration hook. Runs the base-class registration first and then,
// if a RegisterMonPage callback was supplied in the uploader params, invokes
// it with the actor system and this actor's id so the caller can attach a
// monitoring page to the uploader.
void Registered(TActorSystem* sys, const TActorId& owner) override {
    TActorBootstrapped<TWilsonUploader>::Registered(sys, owner);

    // An empty std::function means no monitoring page was requested.
    if (!RegisterMonPage) {
        return;
    }
    RegisterMonPage(sys, SelfId());
}
203+
184204
void Handle(TEvWilson::TPtr ev) {
185205
if (SpansSizeBytes >= MaxPendingSpanBytes) {
186206
ALOG_ERROR(WILSON_SERVICE_ID, "dropped span due to overflow");
@@ -284,6 +304,9 @@ namespace NWilson {
284304
if (GrpcSigner) {
285305
GrpcSigner->SignClientContext(*context);
286306
}
307+
for (const auto& [key, value] : Headers) {
308+
context->AddMetadata(key, value);
309+
}
287310
auto reader = Stub->AsyncExport(context.get(), std::move(batch.Request), &CQ);
288311
auto uploadData = std::unique_ptr<TExportRequestData>(new TExportRequestData {
289312
.Context = std::move(context),
@@ -305,6 +328,8 @@ namespace NWilson {
305328
auto node = std::unique_ptr<TExportRequestData>(static_cast<TExportRequestData*>(tag));
306329
ALOG_TRACE(WILSON_SERVICE_ID, "finished export request " << (void*)node.get());
307330
if (!node->Status.ok()) {
331+
LastCommitTraceErrStr = node->Status.error_message();
332+
308333
ALOG_ERROR(WILSON_SERVICE_ID,
309334
"failed to commit traces: " << node->Status.error_message());
310335
}
@@ -351,13 +376,89 @@ namespace NWilson {
351376
TryToSend();
352377
}
353378

379+
// Renders the uploader's monitoring page and sends it back to the requester.
//
// The page shows the current actor state (Works/Broken), the bootstrap error
// and the last export error (when set), the current batch and queue sizes,
// the gRPC channel connectivity state, and the effective configuration
// including the custom headers.
//
// This handler is installed in both StateWork and StateBroken, so it must not
// assume that Bootstrap completed successfully.
//
// NOTE(review): ErrStr, CollectorUrl, ServiceName and the header keys/values
// are written into the HTML without escaping; consider HTML-escaping them.
// NOTE(review): header values may carry credentials (e.g. auth tokens) and are
// printed verbatim here; consider masking them on the page.
void HandleHttp(NMon::TEvHttpInfo::TPtr &ev) {
    TStringStream str;
    str.Reserve(64 << 10);

    const bool isBroken = CurrentStateFunc() == &TThis::StateBroken;

    HTML(str) {
        TAG(TH4) {str << "Current state";}
        PARA() {
            str << (isBroken ? "Broken" : "Works");
        }
        if (ErrStr) {
            PARA() {
                str << "Error: " << ErrStr;
            }
        }
        if (LastCommitTraceErrStr) {
            PARA() {
                str << "Last commit traces error: " << LastCommitTraceErrStr;
            }
        }
        PARA() {
            str << "Current batch size: " << CurrentBatch.SizeSpans();
        }
        PARA() {
            str << "Current batch queue size: " << BatchQueue.size();
        }
        PARA() {
            // Bootstrap switches to StateBroken and returns early when the
            // collector URL is rejected; in that case the channel may never
            // have been created, so guard against dereferencing a null pointer.
            std::string state = "CHANNEL_NOT_CREATED";
            if (Channel) {
                // GetState(false): only query the state, do not trigger a connect attempt.
                switch (Channel->GetState(false)) {
                    case GRPC_CHANNEL_IDLE:
                        state = "GRPC_CHANNEL_IDLE";
                        break;
                    case GRPC_CHANNEL_CONNECTING:
                        state = "GRPC_CHANNEL_CONNECTING";
                        break;
                    case GRPC_CHANNEL_READY:
                        state = "GRPC_CHANNEL_READY";
                        break;
                    case GRPC_CHANNEL_TRANSIENT_FAILURE:
                        state = "GRPC_CHANNEL_TRANSIENT_FAILURE";
                        break;
                    case GRPC_CHANNEL_SHUTDOWN:
                        state = "GRPC_CHANNEL_SHUTDOWN";
                        break;
                    default:
                        state = "UNKNOWN_STATE";
                        break;
                }
            }
            str << "Channel state# " << state;
        }
        TAG(TH4) {str << "Config";}
        PRE() {
            str << "MaxPendingSpanBytes# " << MaxPendingSpanBytes << '\n';
            str << "MaxSpansPerSecond# " << MaxSpansPerSecond << '\n';
            str << "MaxSpansInBatch# " << MaxSpansInBatch << '\n';
            str << "MaxBytesInBatch# " << MaxBytesInBatch << '\n';
            str << "MaxBatchAccumulation# " << MaxBatchAccumulation << '\n';
            str << "MaxSpanTimeInQueue# " << MaxSpanTimeInQueue << '\n';
            str << "MaxExportInflight# " << MaxExportInflight << '\n';
            str << "CollectorUrl# " << CollectorUrl << '\n';
            str << "ServiceName# " << ServiceName << '\n';
            str << "Headers# " << '\n';
            for (const auto& [key, value] : Headers) {
                str << '\t' << key << ": " << value << '\n';
            }
        }
    }

    auto* result = new NMon::TEvHttpInfoRes(str.Str(), 0, NMon::IEvHttpInfoRes::EContentType::Html);

    Send(ev->Sender, result);
}
452+
354453
// Event dispatch for the working state: incoming spans, wakeup events and
// monitoring-page requests are routed to their handlers.
STRICT_STFUNC(StateWork,
355454
hFunc(TEvWilson, Handle);
356455
hFunc(TEvents::TEvWakeup, HandleWakeup);
456+
// Serves the uploader's monitoring page.
hFunc(NMon::TEvHttpInfo, HandleHttp);
357457
);
358458

359459
// Event dispatch for the broken state, which Bootstrap enters when the
// collector URL cannot be parsed or has an unsupported scheme.
STRICT_STFUNC(StateBroken,
360460
// Incoming spans are ignored in this state.
IgnoreFunc(TEvWilson);
461+
// The monitoring page stays available so the recorded error can be inspected.
hFunc(NMon::TEvHttpInfo, HandleHttp);
361462
);
362463
};
363464

ydb/library/actors/wilson/wilson_uploader.h

+5
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,13 @@ namespace NWilson {
2525
return NActors::TActorId(0, TStringBuf("WilsonUpload", 12));
2626
}
2727

28+
using TRegisterMonPageCallback = std::function<void(NActors::TActorSystem* actorSystem, const NActors::TActorId& actorId)>;
29+
2830
struct TWilsonUploaderParams {
2931
TString CollectorUrl;
3032
TString ServiceName;
3133
std::unique_ptr<IGrpcSigner> GrpcSigner;
34+
TMap<TString, TString> Headers;
3235

3336
ui64 MaxExportedSpansPerSecond = Max<ui64>();
3437
ui64 MaxSpansInBatch = 150;
@@ -37,6 +40,8 @@ namespace NWilson {
3740
ui32 SpanExportTimeoutSeconds = 60 * 60 * 24 * 365;
3841
ui64 MaxExportRequestsInflight = 1;
3942

43+
TRegisterMonPageCallback RegisterMonPage;
44+
4045
NActors::IActor* CreateUploader() &&;
4146
};
4247

0 commit comments

Comments
 (0)