Skip to content

Remove using msgbus register node call #6679

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 16 additions & 123 deletions ydb/core/config/init/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,50 +181,18 @@ class TDefaultNodeBrokerClient
}
}

static void ProcessRegistrationDynamicNodeResult(
const THolder<NClient::TRegistrationResult>& result,
NKikimrConfig::TAppConfig& appConfig,
ui32& nodeId,
TKikimrScopeId& outScopeId)
{
nodeId = result->GetNodeId();
outScopeId = TKikimrScopeId(result->GetScopeId());

auto &nsConfig = *appConfig.MutableNameserviceConfig();
nsConfig.ClearNode();

auto &dnConfig = *appConfig.MutableDynamicNodeConfig();
for (auto &node : result->Record().GetNodes()) {
if (node.GetNodeId() == result->GetNodeId()) {
dnConfig.MutableNodeInfo()->CopyFrom(node);
} else {
auto &info = *nsConfig.AddNode();
info.SetNodeId(node.GetNodeId());
info.SetAddress(node.GetAddress());
info.SetPort(node.GetPort());
info.SetHost(node.GetHost());
info.SetInterconnectHost(node.GetResolveHost());
info.MutableLocation()->CopyFrom(node.GetLocation());
}
}
}

std::variant<
NYdb::NDiscovery::TNodeRegistrationResult,
THolder<NClient::TRegistrationResult>> Result;
NYdb::NDiscovery::TNodeRegistrationResult Result;
public:
TResult(std::variant<
NYdb::NDiscovery::TNodeRegistrationResult,
THolder<NClient::TRegistrationResult>> result)
TResult(NYdb::NDiscovery::TNodeRegistrationResult result)
: Result(std::move(result))
{}

void Apply(NKikimrConfig::TAppConfig& appConfig, ui32& nodeId, TKikimrScopeId& scopeId) const override {
std::visit([&appConfig, &nodeId, &scopeId](const auto& res) mutable { ProcessRegistrationDynamicNodeResult(res, appConfig, nodeId, scopeId); }, Result);
ProcessRegistrationDynamicNodeResult(Result, appConfig, nodeId, scopeId);
}
};

static NYdb::NDiscovery::TNodeRegistrationResult TryToRegisterDynamicNodeViaDiscoveryService(
static NYdb::NDiscovery::TNodeRegistrationResult TryToRegisterDynamicNode(
const TGrpcSslSettings& grpcSettings,
const TString addr,
const NYdb::NDiscovery::TNodeRegistrationSettings& settings,
Expand Down Expand Up @@ -252,83 +220,23 @@ class TDefaultNodeBrokerClient
return result;
}

static THolder<NClient::TRegistrationResult> TryToRegisterDynamicNodeViaLegacyService(
const TGrpcSslSettings& grpcSettings,
const TString& addr,
const TNodeRegistrationSettings& settings,
const IEnv& env)
{
NClient::TKikimr kikimr(GetKikimr(grpcSettings, addr, env));
auto registrant = kikimr.GetNodeRegistrant();

return MakeHolder<NClient::TRegistrationResult>(
registrant.SyncRegisterNode(
ToString(settings.DomainName),
settings.NodeHost,
settings.InterconnectPort,
settings.NodeAddress,
settings.NodeResolveHost,
settings.Location,
settings.FixedNodeID,
settings.Path));
}

static THolder<NClient::TRegistrationResult> RegisterDynamicNodeViaLegacyService(
const TGrpcSslSettings& grpcSettings,
const TVector<TString>& addrs,
const TNodeRegistrationSettings& settings,
const IEnv& env,
IInitLogger& logger)
{
THolder<NClient::TRegistrationResult> result;
while (!result || !result->IsSuccess()) {
for (const auto& addr : addrs) {
result = TryToRegisterDynamicNodeViaLegacyService(
grpcSettings,
addr,
settings,
env);
if (result->IsSuccess()) {
logger.Out() << "Success. Registered via legacy service as " << result->GetNodeId() << Endl;
break;
}
logger.Err() << "Registration error: " << result->GetErrorMessage() << Endl;
}
if (!result || !result->IsSuccess()) {
env.Sleep(TDuration::Seconds(1));
}
}
if (!result) {
ythrow yexception() << "Invalid result";
}

if (!result->IsSuccess()) {
ythrow yexception() << "Cannot register dynamic node: " << result->GetErrorMessage();
}

return result;
}

static NYdb::NDiscovery::TNodeRegistrationResult RegisterDynamicNodeViaDiscoveryService(
static NYdb::NDiscovery::TNodeRegistrationResult RegisterDynamicNodeImpl(
const TGrpcSslSettings& grpcSettings,
const TVector<TString>& addrs,
const NYdb::NDiscovery::TNodeRegistrationSettings& settings,
const IEnv& env,
IInitLogger& logger)
{
NYdb::NDiscovery::TNodeRegistrationResult result;
const size_t maxNumberReceivedCallUnimplemented = 5;
size_t currentNumberReceivedCallUnimplemented = 0;
while (!result.IsSuccess() && currentNumberReceivedCallUnimplemented < maxNumberReceivedCallUnimplemented) {
while (!result.IsSuccess()) {
for (const auto& addr : addrs) {
logger.Out() << "Trying to register dynamic node to " << addr << Endl;
result = TryToRegisterDynamicNodeViaDiscoveryService(
grpcSettings,
addr,
settings,
env);
result = TryToRegisterDynamicNode(grpcSettings,
addr,
settings,
env);
if (result.IsSuccess()) {
logger.Out() << "Success. Registered via discovery service as " << result.GetNodeId() << Endl;
logger.Out() << "Success. Registered as " << result.GetNodeId() << Endl;
logger.Out() << "Node name: ";
if (result.HasNodeName()) {
logger.Out() << result.GetNodeName();
Expand All @@ -340,9 +248,6 @@ class TDefaultNodeBrokerClient
}
if (!result.IsSuccess()) {
env.Sleep(TDuration::Seconds(1));
if (result.GetStatus() == NYdb::EStatus::CLIENT_CALL_UNIMPLEMENTED) {
currentNumberReceivedCallUnimplemented++;
}
}
}
return result;
Expand Down Expand Up @@ -381,23 +286,11 @@ class TDefaultNodeBrokerClient
{
auto newRegSettings = GetNodeRegistrationSettings(regSettings);

std::variant<
NYdb::NDiscovery::TNodeRegistrationResult,
THolder<NClient::TRegistrationResult>> result = RegisterDynamicNodeViaDiscoveryService(
grpcSettings,
addrs,
newRegSettings,
env,
logger);

if (!std::get<NYdb::NDiscovery::TNodeRegistrationResult>(result).IsSuccess()) {
result = RegisterDynamicNodeViaLegacyService(
grpcSettings,
addrs,
regSettings,
env,
logger);
}
NYdb::NDiscovery::TNodeRegistrationResult result = RegisterDynamicNodeImpl(grpcSettings,
addrs,
newRegSettings,
env,
logger);

return std::make_shared<TResult>(std::move(result));
}
Expand Down
Loading