Skip to content

Commit f828cc9

Browse files
authored
Fix duplicated responses in collect/propose query (#8249)
1 parent 4042963 commit f828cc9

File tree

2 files changed

+91
-49
lines changed

2 files changed

+91
-49
lines changed

ydb/core/blobstorage/nodewarden/distconf.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,8 @@ namespace NKikimr::NStorage {
323323
// Root node operation
324324

325325
void CheckRootNodeStatus();
326+
void BecomeRoot();
327+
void UnbecomeRoot();
326328
void HandleErrorTimeout();
327329
void ProcessGather(TEvGather *res);
328330
bool HasQuorum() const;
@@ -549,7 +551,11 @@ namespace NKikimr::NStorage {
549551
}
550552

551553
// process responses
554+
std::set<TNodeIdentifier> seen;
552555
generateSuccessful([&](const TNodeIdentifier& node) {
556+
const auto& [_, inserted] = seen.insert(node);
557+
Y_ABORT_UNLESS(inserted);
558+
553559
const auto it = nodeMap.find(node.NodeId());
554560
if (it == nodeMap.end() || TNodeIdentifier(*it->second) != node) { // unexpected node answers
555561
return;

ydb/core/blobstorage/nodewarden/distconf_fsm.cpp

Lines changed: 85 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -17,30 +17,39 @@ namespace NKikimr::NStorage {
1717
if (RootState == ERootState::INITIAL && hasQuorum) { // becoming root node
1818
Y_ABORT_UNLESS(!Scepter);
1919
Scepter = std::make_shared<TScepter>();
20-
21-
auto makeAllBoundNodes = [&] {
22-
TStringStream s;
23-
const char *sep = "{";
24-
for (const auto& [nodeId, _] : AllBoundNodes) {
25-
s << std::exchange(sep, " ") << nodeId;
26-
}
27-
s << '}';
28-
return s.Str();
29-
};
30-
STLOG(PRI_DEBUG, BS_NODE, NWDC19, "Starting config collection", (Scepter, Scepter->Id),
31-
(AllBoundNodes, makeAllBoundNodes()));
32-
RootState = ERootState::IN_PROGRESS;
33-
TEvScatter task;
34-
task.SetTaskId(RandomNumber<ui64>());
35-
task.MutableCollectConfigs();
36-
IssueScatterTask(TActorId(), std::move(task));
20+
BecomeRoot();
3721
} else if (Scepter && !hasQuorum) { // unbecoming root node -- lost quorum
3822
SwitchToError("quorum lost");
3923
}
4024
}
4125

26+
void TDistributedConfigKeeper::BecomeRoot() {
27+
auto makeAllBoundNodes = [&] {
28+
TStringStream s;
29+
const char *sep = "{";
30+
for (const auto& [nodeId, _] : AllBoundNodes) {
31+
s << std::exchange(sep, " ") << nodeId;
32+
}
33+
s << '}';
34+
return s.Str();
35+
};
36+
STLOG(PRI_DEBUG, BS_NODE, NWDC19, "Starting config collection", (Scepter, Scepter->Id),
37+
(AllBoundNodes, makeAllBoundNodes()));
38+
RootState = ERootState::IN_PROGRESS;
39+
TEvScatter task;
40+
task.SetTaskId(RandomNumber<ui64>());
41+
task.MutableCollectConfigs();
42+
IssueScatterTask(TActorId(), std::move(task));
43+
}
44+
45+
void TDistributedConfigKeeper::UnbecomeRoot() {
46+
}
47+
4248
void TDistributedConfigKeeper::SwitchToError(const TString& reason) {
4349
STLOG(PRI_ERROR, BS_NODE, NWDC38, "SwitchToError", (RootState, RootState), (Reason, reason));
50+
if (Scepter) {
51+
UnbecomeRoot();
52+
}
4453
Scepter.reset();
4554
RootState = ERootState::ERROR_TIMEOUT;
4655
ErrorReason = reason;
@@ -498,66 +507,93 @@ namespace NKikimr::NStorage {
498507
void TDistributedConfigKeeper::Perform(TEvGather::TCollectConfigs *response,
499508
const TEvScatter::TCollectConfigs& /*request*/, TScatterTask& task) {
500509
THashMap<TStorageConfigMeta, TEvGather::TCollectConfigs::TNode*> baseConfigs;
510+
THashSet<TNodeIdentifier> nodesAlreadyReplied{SelfNode};
501511

502-
auto addBaseConfig = [&](const TEvGather::TCollectConfigs::TNode& item) {
503-
const auto& config = item.GetBaseConfig();
504-
auto& ptr = baseConfigs[config];
505-
if (!ptr) {
506-
ptr = response->AddNodes();
507-
ptr->MutableBaseConfig()->CopyFrom(config);
508-
}
509-
ptr->MutableNodeIds()->MergeFrom(item.GetNodeIds());
510-
};
511-
512-
auto addPerDiskConfig = [&](const TEvGather::TCollectConfigs::TPersistentConfig& item, auto addFunc, auto& set) {
513-
const auto& config = item.GetConfig();
514-
auto& ptr = set[config];
515-
if (!ptr) {
516-
ptr = (response->*addFunc)();
517-
ptr->MutableConfig()->CopyFrom(config);
518-
}
519-
ptr->MutableDisks()->MergeFrom(item.GetDisks());
520-
};
521-
522-
TEvGather::TCollectConfigs::TNode s;
523-
SelfNode.Serialize(s.AddNodeIds());
524-
auto *cfg = s.MutableBaseConfig();
525-
cfg->CopyFrom(BaseConfig);
526-
addBaseConfig(s);
512+
auto *ptr = response->AddNodes();
513+
ptr->MutableBaseConfig()->CopyFrom(BaseConfig);
514+
SelfNode.Serialize(ptr->AddNodeIds());
515+
baseConfigs.emplace(BaseConfig, ptr);
527516

528517
THashMap<TStorageConfigMeta, TEvGather::TCollectConfigs::TPersistentConfig*> committedConfigs;
529518
THashMap<TStorageConfigMeta, TEvGather::TCollectConfigs::TPersistentConfig*> proposedConfigs;
530-
531519
for (auto& item : *response->MutableCommittedConfigs()) {
532520
committedConfigs[item.GetConfig()] = &item;
533521
}
534522
for (auto& item : *response->MutableProposedConfigs()) {
535523
proposedConfigs[item.GetConfig()] = &item;
536524
}
537525

526+
auto addBaseConfig = [&](const TEvGather::TCollectConfigs::TNode& item, auto *nodesToIgnore) {
527+
const auto& config = item.GetBaseConfig();
528+
auto& ptr = baseConfigs[config];
529+
for (const auto& nodeId : item.GetNodeIds()) {
530+
TNodeIdentifier n(nodeId);
531+
if (const auto [_, inserted] = nodesAlreadyReplied.emplace(n); inserted) {
532+
if (!ptr) {
533+
ptr = response->AddNodes();
534+
ptr->MutableBaseConfig()->CopyFrom(config);
535+
}
536+
ptr->AddNodeIds()->CopyFrom(nodeId);
537+
} else {
538+
nodesToIgnore->insert(std::move(n));
539+
}
540+
}
541+
};
542+
543+
auto addPerDiskConfig = [&](const TEvGather::TCollectConfigs::TPersistentConfig& item, auto addFunc, auto& set,
544+
const auto& nodesToIgnore) {
545+
const auto& config = item.GetConfig();
546+
auto& ptr = set[config];
547+
for (const auto& disk : item.GetDisks()) {
548+
if (!nodesToIgnore.contains(TNodeIdentifier(disk.GetNodeId()))) {
549+
if (!ptr) {
550+
ptr = (response->*addFunc)();
551+
ptr->MutableConfig()->CopyFrom(config);
552+
}
553+
ptr->AddDisks()->CopyFrom(disk);
554+
}
555+
}
556+
};
557+
538558
for (const auto& reply : task.CollectedResponses) {
539559
if (reply.HasCollectConfigs()) {
540560
const auto& cc = reply.GetCollectConfigs();
561+
562+
THashSet<TNodeIdentifier> nodesToIgnore;
541563
for (const auto& item : cc.GetNodes()) {
542-
addBaseConfig(item);
564+
addBaseConfig(item, &nodesToIgnore);
543565
}
544566
for (const auto& item : cc.GetCommittedConfigs()) {
545-
addPerDiskConfig(item, &TEvGather::TCollectConfigs::AddCommittedConfigs, committedConfigs);
567+
addPerDiskConfig(item, &TEvGather::TCollectConfigs::AddCommittedConfigs, committedConfigs, nodesToIgnore);
546568
}
547569
for (const auto& item : cc.GetProposedConfigs()) {
548-
addPerDiskConfig(item, &TEvGather::TCollectConfigs::AddProposedConfigs, proposedConfigs);
570+
addPerDiskConfig(item, &TEvGather::TCollectConfigs::AddProposedConfigs, proposedConfigs, nodesToIgnore);
571+
}
572+
for (const auto& item : cc.GetNoMetadata()) {
573+
if (!nodesToIgnore.contains(TNodeIdentifier(item.GetNodeId()))) {
574+
response->AddNoMetadata()->CopyFrom(item);
575+
}
576+
}
577+
for (const auto& item : cc.GetErrors()) {
578+
if (!nodesToIgnore.contains(TNodeIdentifier(item.GetNodeId()))) {
579+
response->AddErrors()->CopyFrom(item);
580+
}
549581
}
550-
response->MutableNoMetadata()->MergeFrom(cc.GetNoMetadata());
551-
response->MutableErrors()->MergeFrom(cc.GetErrors());
552582
}
553583
}
554584
}
555585

556586
void TDistributedConfigKeeper::Perform(TEvGather::TProposeStorageConfig *response,
557587
const TEvScatter::TProposeStorageConfig& /*request*/, TScatterTask& task) {
588+
THashSet<TNodeIdentifier> nodesAlreadyReplied;
558589
for (const auto& reply : task.CollectedResponses) {
559590
if (reply.HasProposeStorageConfig()) {
560-
response->MutableStatus()->MergeFrom(reply.GetProposeStorageConfig().GetStatus());
591+
const auto& config = reply.GetProposeStorageConfig();
592+
for (const auto& status : config.GetStatus()) {
593+
if (const auto [_, inserted] = nodesAlreadyReplied.insert(status.GetNodeId()); inserted) {
594+
response->AddStatus()->CopyFrom(status);
595+
}
596+
}
561597
}
562598
}
563599
}

0 commit comments

Comments
 (0)