Skip to content

Commit 37b6f88

Browse files
authored
Merge 1990fba into 4c79b47
2 parents 4c79b47 + 1990fba commit 37b6f88

File tree

2 files changed

+25
-14
lines changed

2 files changed

+25
-14
lines changed

ydb/core/persqueue/read_balancer__balancing.cpp

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ void TPartitionFamily::AttachePartitions(const std::vector<ui32>& partitions, co
341341
}
342342

343343
auto [activePartitionCount, inactivePartitionCount] = ClassifyPartitions(newPartitions);
344-
ChangePartitionCounters(activePartitionCount, activePartitionCount);
344+
ChangePartitionCounters(activePartitionCount, inactivePartitionCount);
345345

346346
if (IsActive()) {
347347
if (!Session->AllPartitionsReadable(newPartitions)) {
@@ -391,7 +391,7 @@ void TPartitionFamily::InactivatePartition(ui32 partitionId) {
391391
ActivePartitionCount += active;
392392
InactivePartitionCount += inactive;
393393

394-
if (IsActive()) {
394+
if (IsActive() && Session) {
395395
Session->ActivePartitionCount += active;
396396
Session->InactivePartitionCount += inactive;
397397
}
@@ -710,30 +710,39 @@ bool TConsumer::BreakUpFamily(TPartitionFamily* family, ui32 partitionId, bool d
710710
}
711711

712712
std::vector<ui32> members;
713-
714713
GetPartitionGraph().Travers(id, [&](auto childId) {
715714
if (partitions.contains(childId)) {
716-
members.push_back(childId);
717715
auto [_, i] = processedPartitions.insert(childId);
718716
if (!i) {
719717
familiesIntersect = true;
718+
} else {
719+
members.push_back(childId);
720720
}
721721

722722
return true;
723723
}
724724
return false;
725725
});
726726

727-
auto* f = CreateFamily({id}, family->Status, ctx);
728-
f->Partitions.insert(f->Partitions.end(), members.begin(), members.end());
727+
bool locked = family->Session && (family->LockedPartitions.contains(id) ||
728+
std::any_of(members.begin(), members.end(), [family](auto id) { return family->LockedPartitions.contains(id); }));
729+
auto* f = CreateFamily({id}, locked ? family->Status : TPartitionFamily::EStatus::Free, ctx);
729730
f->TargetStatus = family->TargetStatus;
730-
f->Session = family->Session;
731-
f->LockedPartitions = Intercept(family->LockedPartitions, f->Partitions);
731+
f->Partitions.insert(f->Partitions.end(), members.begin(), members.end());
732732
f->LastPipe = family->LastPipe;
733-
if (f->Session) {
733+
f->UpdatePartitionMapping(f->Partitions);
734+
f->ClassifyPartitions();
735+
if (locked) {
736+
f->LockedPartitions = Intercept(family->LockedPartitions, f->Partitions);
737+
738+
f->Session = family->Session;
734739
f->Session->Families.try_emplace(f->Id, f);
740+
f->Session->ActivePartitionCount += f->ActivePartitionCount;
741+
f->Session->InactivePartitionCount += f->InactivePartitionCount;
735742
if (f->IsActive()) {
736743
++f->Session->ActiveFamilyCount;
744+
} else if (f->IsRelesing()) {
745+
++f->Session->ReleasingFamilyCount;
737746
}
738747
}
739748

@@ -1300,7 +1309,7 @@ void TConsumer::Balance(const TActorContext& ctx) {
13001309
}
13011310
}
13021311

1303-
if (session->ActiveFamilyCount > desiredFamilyCount) {
1312+
if (allowPlusOne && session->ActiveFamilyCount > desiredFamilyCount) {
13041313
--allowPlusOne;
13051314
}
13061315
}

ydb/core/persqueue/read_balancer__balancing_app.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -171,8 +171,8 @@ void TBalancer::RenderApp(TStringStream& str) const {
171171
TABLEH() { }
172172
TABLEH() { str << "Id"; }
173173
TABLEH() { str << "Partitions"; }
174-
TABLEH() { str << "<span alt=\"All / Active / Releasing\">Families</span>"; }
175-
TABLEH() { str << "<span alt=\"Active / Inactive / Releasing\">Statistics</span>"; };
174+
TABLEH() { str << "<span title=\"All families / Active / Releasing\">Families</span>"; }
175+
TABLEH() { str << "<span title=\"All partitions / Active / Inactive / Releasing\">Statistics</span>"; };
176176
TABLEH() { str << "Client node"; }
177177
TABLEH() { str << "Proxy node"; }
178178
}
@@ -203,7 +203,8 @@ void TBalancer::RenderApp(TStringStream& str) const {
203203
TABLED() { str << session->SessionName; }
204204
TABLED() { str << (session->Partitions.empty() ? "" : JoinRange(", ", session->Partitions.begin(), session->Partitions.end())); }
205205
TABLED() { str << session->Families.size() << " / " << session->ActiveFamilyCount << " / " << session->ReleasingFamilyCount; }
206-
TABLED() { str << session->ActivePartitionCount << " / " << session->InactivePartitionCount << " / " << session->ReleasingPartitionCount; }
206+
TABLED() { str << (session->ActivePartitionCount + session->InactivePartitionCount) << " / " << session->ActivePartitionCount
207+
<< " / " << session->InactivePartitionCount << " / " << session->ReleasingPartitionCount; }
207208
TABLED() { str << session->ClientNode; }
208209
TABLED() { str << session->ProxyNodeId; }
209210
}
@@ -213,7 +214,8 @@ void TBalancer::RenderApp(TStringStream& str) const {
213214
TABLED() { str << "<strong>Total:</strong>"; }
214215
TABLED() { }
215216
TABLED() { str << familyAllCount << " / " << activeFamilyCount << " / " << releasingFamilyCount; }
216-
TABLED() { str << activePartitionCount << " / " << inactivePartitionCount << " / " << releasingPartitionCount; }
217+
TABLED() { str << (activePartitionCount + inactivePartitionCount) << " / " << activePartitionCount << " / " << inactivePartitionCount
218+
<< " / " << releasingPartitionCount; }
217219
TABLED() { }
218220
TABLED() { }
219221
}

0 commit comments

Comments
 (0)