Skip to content

Commit f4a9ab2

Browse files
committed
Fix internal PQRB state (ydb-platform#7792)
1 parent 01cca87 commit f4a9ab2

File tree

2 files changed

+25
-14
lines changed

2 files changed

+25
-14
lines changed

ydb/core/persqueue/read_balancer__balancing.cpp

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ void TPartitionFamily::AttachePartitions(const std::vector<ui32>& partitions, co
341341
}
342342

343343
auto [activePartitionCount, inactivePartitionCount] = ClassifyPartitions(newPartitions);
344-
ChangePartitionCounters(activePartitionCount, activePartitionCount);
344+
ChangePartitionCounters(activePartitionCount, inactivePartitionCount);
345345

346346
if (IsActive()) {
347347
if (!Session->AllPartitionsReadable(newPartitions)) {
@@ -391,7 +391,7 @@ void TPartitionFamily::InactivatePartition(ui32 partitionId) {
391391
ActivePartitionCount += active;
392392
InactivePartitionCount += inactive;
393393

394-
if (IsActive()) {
394+
if (IsActive() && Session) {
395395
Session->ActivePartitionCount += active;
396396
Session->InactivePartitionCount += inactive;
397397
}
@@ -709,30 +709,39 @@ bool TConsumer::BreakUpFamily(TPartitionFamily* family, ui32 partitionId, bool d
709709
}
710710

711711
std::vector<ui32> members;
712-
713712
GetPartitionGraph().Travers(id, [&](auto childId) {
714713
if (partitions.contains(childId)) {
715-
members.push_back(childId);
716714
auto [_, i] = processedPartitions.insert(childId);
717715
if (!i) {
718716
familiesIntersect = true;
717+
} else {
718+
members.push_back(childId);
719719
}
720720

721721
return true;
722722
}
723723
return false;
724724
});
725725

726-
auto* f = CreateFamily({id}, family->Status, ctx);
727-
f->Partitions.insert(f->Partitions.end(), members.begin(), members.end());
726+
bool locked = family->Session && (family->LockedPartitions.contains(id) ||
727+
std::any_of(members.begin(), members.end(), [family](auto id) { return family->LockedPartitions.contains(id); }));
728+
auto* f = CreateFamily({id}, locked ? family->Status : TPartitionFamily::EStatus::Free, ctx);
728729
f->TargetStatus = family->TargetStatus;
729-
f->Session = family->Session;
730-
f->LockedPartitions = Intercept(family->LockedPartitions, f->Partitions);
730+
f->Partitions.insert(f->Partitions.end(), members.begin(), members.end());
731731
f->LastPipe = family->LastPipe;
732-
if (f->Session) {
732+
f->UpdatePartitionMapping(f->Partitions);
733+
f->ClassifyPartitions();
734+
if (locked) {
735+
f->LockedPartitions = Intercept(family->LockedPartitions, f->Partitions);
736+
737+
f->Session = family->Session;
733738
f->Session->Families.try_emplace(f->Id, f);
739+
f->Session->ActivePartitionCount += f->ActivePartitionCount;
740+
f->Session->InactivePartitionCount += f->InactivePartitionCount;
734741
if (f->IsActive()) {
735742
++f->Session->ActiveFamilyCount;
743+
} else if (f->IsRelesing()) {
744+
++f->Session->ReleasingFamilyCount;
736745
}
737746
}
738747

@@ -1299,7 +1308,7 @@ void TConsumer::Balance(const TActorContext& ctx) {
12991308
}
13001309
}
13011310

1302-
if (session->ActiveFamilyCount > desiredFamilyCount) {
1311+
if (allowPlusOne && session->ActiveFamilyCount > desiredFamilyCount) {
13031312
--allowPlusOne;
13041313
}
13051314
}

ydb/core/persqueue/read_balancer__balancing_app.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -171,8 +171,8 @@ void TBalancer::RenderApp(TStringStream& str) const {
171171
TABLEH() { }
172172
TABLEH() { str << "Id"; }
173173
TABLEH() { str << "Partitions"; }
174-
TABLEH() { str << "<span alt=\"All / Active / Releasing\">Families</span>"; }
175-
TABLEH() { str << "<span alt=\"Active / Inactive / Releasing\">Statistics</span>"; };
174+
TABLEH() { str << "<span title=\"All families / Active / Releasing\">Families</span>"; }
175+
TABLEH() { str << "<span title=\"All partitions / Active / Inactive / Releasing\">Statistics</span>"; };
176176
TABLEH() { str << "Client node"; }
177177
TABLEH() { str << "Proxy node"; }
178178
}
@@ -203,7 +203,8 @@ void TBalancer::RenderApp(TStringStream& str) const {
203203
TABLED() { str << session->SessionName; }
204204
TABLED() { str << (session->Partitions.empty() ? "" : JoinRange(", ", session->Partitions.begin(), session->Partitions.end())); }
205205
TABLED() { str << session->Families.size() << " / " << session->ActiveFamilyCount << " / " << session->ReleasingFamilyCount; }
206-
TABLED() { str << session->ActivePartitionCount << " / " << session->InactivePartitionCount << " / " << session->ReleasingPartitionCount; }
206+
TABLED() { str << (session->ActivePartitionCount + session->InactivePartitionCount + session->ReleasingPartitionCount)
207+
<< " / " << session->ActivePartitionCount << " / " << session->InactivePartitionCount << " / " << session->ReleasingPartitionCount; }
207208
TABLED() { str << session->ClientNode; }
208209
TABLED() { str << session->ProxyNodeId; }
209210
}
@@ -213,7 +214,8 @@ void TBalancer::RenderApp(TStringStream& str) const {
213214
TABLED() { str << "<strong>Total:</strong>"; }
214215
TABLED() { }
215216
TABLED() { str << familyAllCount << " / " << activeFamilyCount << " / " << releasingFamilyCount; }
216-
TABLED() { str << activePartitionCount << " / " << inactivePartitionCount << " / " << releasingPartitionCount; }
217+
TABLED() { str << (activePartitionCount + inactivePartitionCount + releasingPartitionCount) << " / " << activePartitionCount << " / "
218+
<< inactivePartitionCount << " / " << releasingPartitionCount; }
217219
TABLED() { }
218220
TABLED() { }
219221
}

0 commit comments

Comments
 (0)