Skip to content

Commit 4a18abf

Browse files
authored
KIKIMR-20597 Implemented tvm authentication for wilson uploader (#768)
* KIKIMR-20597 Implemented tvm authentication for wilson uploader * Enchancements * Renaming * Config issues fixed * Comments resolved
1 parent cfe1520 commit 4a18abf

File tree

9 files changed

+112
-22
lines changed

9 files changed

+112
-22
lines changed

ydb/core/driver_lib/run/factories.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include <ydb/library/yql/providers/pq/cm_client/client.h>
1919

2020
#include <ydb/library/actors/core/actorsystem.h>
21+
#include <ydb/library/actors/wilson/wilson_uploader.h>
2122

2223
#include <functional>
2324
#include <unordered_map>
@@ -55,6 +56,8 @@ struct TModuleFactories {
5556
std::shared_ptr<NHttpProxy::IAuthFactory> DataStreamsAuthFactory;
5657
std::vector<NKikimr::NMiniKQL::TComputationNodeFactory> AdditionalComputationNodeFactories;
5758

59+
std::unique_ptr<NWilson::IGrpcSigner>(*WilsonGrpcSignerFactory)(const NKikimrConfig::TTracingConfig::TAuthConfig&);
60+
5861
~TModuleFactories();
5962
};
6063

ydb/core/driver_lib/run/kikimr_services_initializers.cpp

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -370,8 +370,9 @@ static bool IsServiceInitialized(NActors::TActorSystemSetup* setup, TActorId ser
370370
return false;
371371
}
372372

373-
TBasicServicesInitializer::TBasicServicesInitializer(const TKikimrRunConfig& runConfig)
373+
TBasicServicesInitializer::TBasicServicesInitializer(const TKikimrRunConfig& runConfig, std::shared_ptr<TModuleFactories> factories)
374374
: IKikimrServicesInitializer(runConfig)
375+
, Factories(std::move(factories))
375376
{
376377
}
377378

@@ -827,10 +828,20 @@ void TBasicServicesInitializer::InitializeServices(NActors::TActorSystemSetup* s
827828

828829
if (Config.HasTracingConfig()) {
829830
const auto& tracing = Config.GetTracingConfig();
831+
std::unique_ptr<NWilson::IGrpcSigner> grpcSigner;
832+
if (tracing.HasAuthConfig() && Factories && Factories->WilsonGrpcSignerFactory) {
833+
grpcSigner = Factories->WilsonGrpcSignerFactory(tracing.GetAuthConfig());
834+
}
835+
auto wilsonUploader = NWilson::WilsonUploaderParams {
836+
.Host = tracing.GetHost(),
837+
.Port = static_cast<ui16>(tracing.GetPort()),
838+
.RootCA = tracing.GetRootCA(),
839+
.ServiceName = tracing.GetServiceName(),
840+
.GrpcSigner = std::move(grpcSigner),
841+
}.CreateUploader();
830842
setup->LocalServices.emplace_back(
831843
NWilson::MakeWilsonUploaderId(),
832-
TActorSetupCmd(NWilson::CreateWilsonUploader(tracing.GetHost(), tracing.GetPort(), tracing.GetRootCA(), tracing.GetServiceName()),
833-
TMailboxType::ReadAsFilled, appData->BatchPoolId));
844+
TActorSetupCmd(wilsonUploader, TMailboxType::ReadAsFilled, appData->BatchPoolId));
834845
}
835846
}
836847

ydb/core/driver_lib/run/kikimr_services_initializers.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,10 @@ class TBasicServicesInitializer : public IKikimrServicesInitializer {
4949

5050
static ISchedulerThread* CreateScheduler(const NKikimrConfig::TActorSystemConfig::TScheduler &config);
5151

52+
std::shared_ptr<TModuleFactories> Factories;
53+
5254
public:
53-
TBasicServicesInitializer(const TKikimrRunConfig& runConfig);
55+
TBasicServicesInitializer(const TKikimrRunConfig& runConfig, std::shared_ptr<TModuleFactories> factories);
5456

5557
void InitializeServices(NActors::TActorSystemSetup *setup, const NKikimr::TAppData *appData) override;
5658
};

ydb/core/driver_lib/run/run.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1375,7 +1375,7 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers
13751375
}
13761376

13771377
if (serviceMask.EnableBasicServices) {
1378-
sil->AddServiceInitializer(new TBasicServicesInitializer(runConfig));
1378+
sil->AddServiceInitializer(new TBasicServicesInitializer(runConfig, ModuleFactories));
13791379
}
13801380
if (serviceMask.EnableIcbService) {
13811381
sil->AddServiceInitializer(new TImmediateControlBoardInitializer(runConfig));

ydb/core/protos/config.proto

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1517,10 +1517,33 @@ message TCompactionConfig {
15171517
}
15181518

15191519
message TTracingConfig {
1520+
message TAuthConfig {
1521+
message TTvm {
1522+
optional string Host = 1;
1523+
optional uint32 Port = 2;
1524+
1525+
required uint32 SelfTvmId = 3;
1526+
required uint32 TracingTvmId = 4;
1527+
1528+
optional string DiskCacheDir = 5;
1529+
1530+
oneof Secret {
1531+
string PlainTextSecret = 6;
1532+
string SecretFile = 7;
1533+
string SecretEnvironmentVariable = 8;
1534+
}
1535+
}
1536+
1537+
oneof Method {
1538+
TTvm Tvm = 1;
1539+
}
1540+
}
1541+
15201542
optional string Host = 1;
15211543
optional uint32 Port = 2;
15221544
optional string RootCA = 3;
15231545
optional string ServiceName = 4;
1546+
optional TAuthConfig AuthConfig = 5;
15241547
}
15251548

15261549
message TFailureInjectionConfig {

ydb/library/actors/wilson/wilson_uploader.cpp

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
#include <opentelemetry/proto/collector/trace/v1/trace_service.grpc.pb.h>
77
#include <util/stream/file.h>
88
#include <util/string/hex.h>
9-
#include <grpc++/grpc++.h>
109
#include <chrono>
1110

1211
namespace NWilson {
@@ -32,6 +31,7 @@ namespace NWilson {
3231
std::unique_ptr<NServiceProto::TraceService::Stub> Stub;
3332
grpc::CompletionQueue CQ;
3433

34+
std::unique_ptr<IGrpcSigner> GrpcSigner;
3535
std::unique_ptr<grpc::ClientContext> Context;
3636
std::unique_ptr<grpc::ClientAsyncResponseReader<NServiceProto::ExportTraceServiceResponse>> Reader;
3737
NServiceProto::ExportTraceServiceResponse Response;
@@ -53,11 +53,12 @@ namespace NWilson {
5353
bool WakeupScheduled = false;
5454

5555
public:
56-
TWilsonUploader(TString host, ui16 port, TString rootCA, TString serviceName)
57-
: Host(std::move(host))
58-
, Port(std::move(port))
59-
, RootCA(std::move(rootCA))
60-
, ServiceName(std::move(serviceName))
56+
TWilsonUploader(WilsonUploaderParams params)
57+
: Host(std::move(params.Host))
58+
, Port(std::move(params.Port))
59+
, RootCA(std::move(params.RootCA))
60+
, ServiceName(std::move(params.ServiceName))
61+
, GrpcSigner(std::move(params.GrpcSigner))
6162
{}
6263

6364
~TWilsonUploader() {
@@ -142,6 +143,9 @@ namespace NWilson {
142143

143144
ScheduleWakeup(NextSendTimestamp);
144145
Context = std::make_unique<grpc::ClientContext>();
146+
if (GrpcSigner) {
147+
GrpcSigner->SignClientContext(*Context);
148+
}
145149
Reader = Stub->AsyncExport(Context.get(), std::move(request), &CQ);
146150
Reader->Finish(&Response, &Status, nullptr);
147151
}
@@ -192,8 +196,12 @@ namespace NWilson {
192196

193197
} // anonymous
194198

195-
IActor *CreateWilsonUploader(TString host, ui16 port, TString rootCA, TString serviceName) {
196-
return new TWilsonUploader(std::move(host), port, std::move(rootCA), std::move(serviceName));
199+
IActor* CreateWilsonUploader(WilsonUploaderParams params) {
200+
return new TWilsonUploader(std::move(params));
201+
}
202+
203+
IActor* WilsonUploaderParams::CreateUploader() && {
204+
return CreateWilsonUploader(std::move(*this));
197205
}
198206

199207
} // NWilson

ydb/library/actors/wilson/wilson_uploader.h

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,14 @@
44
#include <ydb/library/actors/core/event_local.h>
55
#include <ydb/library/actors/core/events.h>
66
#include <opentelemetry/proto/trace/v1/trace.pb.h>
7+
#include <grpc++/grpc++.h>
78

89
namespace NWilson {
10+
struct IGrpcSigner {
11+
virtual void SignClientContext(grpc::ClientContext& context) = 0;
12+
13+
virtual ~IGrpcSigner() = default;
14+
};
915

1016
struct TEvWilson : NActors::TEventLocal<TEvWilson, NActors::TEvents::TSystem::Wilson> {
1117
opentelemetry::proto::trace::v1::Span Span;
@@ -19,6 +25,16 @@ namespace NWilson {
1925
return NActors::TActorId(0, TStringBuf("WilsonUpload", 12));
2026
}
2127

22-
NActors::IActor *CreateWilsonUploader(TString host, ui16 port, TString rootCA, TString serviceName);
28+
struct WilsonUploaderParams {
29+
TString Host;
30+
ui16 Port;
31+
TString RootCA;
32+
TString ServiceName;
33+
std::unique_ptr<IGrpcSigner> GrpcSigner;
34+
35+
NActors::IActor* CreateUploader() &&;
36+
};
37+
38+
NActors::IActor* CreateWilsonUploader(WilsonUploaderParams params);
2339

2440
} // NWilson

ydb/tools/cfg/static.py

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -114,11 +114,13 @@ def __init__(
114114
)
115115
self._enable_cms_config_cache = template.get("enable_cms_config_cache", enable_cms_config_cache)
116116
if "tracing" in template:
117+
tracing = template["tracing"]
117118
self.__tracing = (
118-
template["tracing"]["host"],
119-
template["tracing"]["port"],
120-
template["tracing"]["root_ca"],
121-
template["tracing"]["service_name"],
119+
tracing["host"],
120+
tracing["port"],
121+
tracing["root_ca"],
122+
tracing["service_name"],
123+
tracing.get("auth_config")
122124
)
123125
else:
124126
self.__tracing = None
@@ -1121,12 +1123,36 @@ def __generate_sys_txt(self):
11211123
def __generate_tracing_txt(self):
11221124
pb = config_pb2.TAppConfig()
11231125
if self.__tracing:
1126+
tracing_pb = pb.TracingConfig
11241127
(
1125-
pb.TracingConfig.Host,
1126-
pb.TracingConfig.Port,
1127-
pb.TracingConfig.RootCA,
1128-
pb.TracingConfig.ServiceName,
1128+
tracing_pb.Host,
1129+
tracing_pb.Port,
1130+
tracing_pb.RootCA,
1131+
tracing_pb.ServiceName,
1132+
auth_config
11291133
) = self.__tracing
1134+
1135+
if auth_config:
1136+
auth_pb = tracing_pb.AuthConfig
1137+
if "tvm" in auth_config:
1138+
tvm = auth_config.get("tvm")
1139+
tvm_pb = auth_pb.Tvm
1140+
1141+
if "host" in tvm:
1142+
tvm_pb.Host = tvm["host"]
1143+
if "port" in tvm:
1144+
tvm_pb.Port = tvm["port"]
1145+
tvm_pb.SelfTvmId = tvm["self_tvm_id"]
1146+
tvm_pb.TracingTvmId = tvm["tracing_tvm_id"]
1147+
tvm_pb.DiskCacheDir = tvm["disk_cache_dir"]
1148+
1149+
if "plain_text_secret" in tvm:
1150+
tvm_pb.PlainTextSecret = tvm["plain_text_secret"]
1151+
elif "secret_file" in tvm:
1152+
tvm_pb.SecretFile = tvm["secret_file"]
1153+
elif "secret_environment_variable" in tvm:
1154+
tvm_pb.SecretEnvironmentVariable = tvm["secret_environment_variable"]
1155+
11301156
self.__proto_configs["tracing.txt"] = pb
11311157

11321158
def __generate_sys_txt_advanced(self):

ydb/tools/cfg/validation.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@
133133
port=dict(type="integer"),
134134
root_ca=dict(type="string"),
135135
service_name=dict(type="string"),
136+
auth_config=dict(type="object"),
136137
),
137138
required=[
138139
"host",

0 commit comments

Comments
 (0)