Skip to content

Commit 25c9852

Browse files
committed
Fix balancing of large topics (ydb-platform#7317)
1 parent 421c1c6 commit 25c9852

9 files changed

+179
-95
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,8 @@ auto GetStepAndTxId(const E& event)
5555
return GetStepAndTxId(event.Step, event.TxId);
5656
}
5757

58-
bool TPartition::LastOffsetHasBeenCommited(const TUserInfo& userInfo) const {
59-
return !IsActive() && static_cast<ui64>(std::max<i64>(userInfo.Offset, 0)) == EndOffset;
58+
bool TPartition::LastOffsetHasBeenCommited(const TUserInfoBase& userInfo) const {
59+
return !IsActive() && (static_cast<ui64>(std::max<i64>(userInfo.Offset, 0)) == EndOffset || StartOffset == EndOffset);
6060
}
6161

6262
struct TMirrorerInfo {
@@ -2857,6 +2857,10 @@ void TPartition::EmulatePostProcessUserAct(const TEvPQ::TEvSetClientInfo& act,
28572857

28582858
userInfo.Offset = offset;
28592859

2860+
if (LastOffsetHasBeenCommited(userInfo)) {
2861+
SendReadingFinished(user);
2862+
}
2863+
28602864
auto counter = setSession ? COUNTER_PQ_CREATE_SESSION_OK : (dropSession ? COUNTER_PQ_DELETE_SESSION_OK : COUNTER_PQ_SET_CLIENT_OFFSET_OK);
28612865
TabletCounters.Cumulative()[counter].Increment(1);
28622866
}

ydb/core/persqueue/partition.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
148148
bool CanWrite() const;
149149
bool CanEnqueue() const;
150150

151-
bool LastOffsetHasBeenCommited(const TUserInfo& userInfo) const;
151+
bool LastOffsetHasBeenCommited(const TUserInfoBase& userInfo) const;
152152

153153
void ReplyError(const TActorContext& ctx, const ui64 dst, NPersQueue::NErrorCode::EErrorCode errorCode, const TString& error);
154154
void ReplyPropose(const TActorContext& ctx, const NKikimrPQ::TEvProposeTransaction& event, NKikimrPQ::TEvProposeTransactionResult::EStatus statusCode,

ydb/core/persqueue/partition_init.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,8 +176,6 @@ void TInitConfigStep::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorCon
176176

177177
case NKikimrProto::NODATA:
178178
Partition()->Config = Partition()->TabletConfig;
179-
Partition()->PartitionConfig = GetPartitionConfig(Partition()->Config, Partition()->Partition.OriginalPartitionId);
180-
Partition()->PartitionGraph = MakePartitionGraph(Partition()->Config);
181179
break;
182180

183181
case NKikimrProto::ERROR:
@@ -191,6 +189,9 @@ void TInitConfigStep::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorCon
191189
Y_ABORT("bad status");
192190
};
193191

192+
Partition()->PartitionConfig = GetPartitionConfig(Partition()->Config, Partition()->Partition.OriginalPartitionId);
193+
Partition()->PartitionGraph = MakePartitionGraph(Partition()->Config);
194+
194195
Done(ctx);
195196
}
196197

ydb/core/persqueue/read_balancer.cpp

Lines changed: 71 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,6 @@ static constexpr TDuration ACL_SUCCESS_RETRY_TIMEOUT = TDuration::Seconds(30);
2121
static constexpr TDuration ACL_ERROR_RETRY_TIMEOUT = TDuration::Seconds(5);
2222
static constexpr TDuration ACL_EXPIRATION_TIMEOUT = TDuration::Minutes(5);
2323

24-
NKikimrPQ::EConsumerScalingSupport DefaultScalingSupport() {
25-
// TODO fix me after support of paremeter ConsumerScalingSupport
26-
return AppData()->FeatureFlags.GetEnableTopicSplitMerge() ? NKikimrPQ::EConsumerScalingSupport::FULL_SUPPORT
27-
: NKikimrPQ::EConsumerScalingSupport::NOT_SUPPORT;
28-
}
29-
3024
TString EncodeAnchor(const TString& v) {
3125
auto r = Base64Encode(v);
3226
while (r.EndsWith('=')) {
@@ -197,46 +191,95 @@ TString TPersQueueReadBalancer::GenerateStat() {
197191

198192
TStringStream str;
199193
HTML(str) {
200-
TAG(TH2) {str << "PersQueueReadBalancer Tablet";}
201-
TAG(TH3) {str << "Topic: " << Topic;}
202-
TAG(TH3) {str << "Generation: " << Generation;}
203-
TAG(TH3) {str << "Inited: " << Inited;}
204-
TAG(TH3) {str << "ActivePipes: " << balancerStatistcs.Sessions.size();}
205-
if (Inited) {
206-
TAG(TH3) {str << "Active partitions: " << NumActiveParts;}
207-
TAG(TH3) {str << "[Total/Max/Avg]WriteSpeedSec: " << metrics.TotalAvgWriteSpeedPerSec << "/" << metrics.MaxAvgWriteSpeedPerSec << "/" << metrics.TotalAvgWriteSpeedPerSec / NumActiveParts;}
208-
TAG(TH3) {str << "[Total/Max/Avg]WriteSpeedMin: " << metrics.TotalAvgWriteSpeedPerMin << "/" << metrics.MaxAvgWriteSpeedPerMin << "/" << metrics.TotalAvgWriteSpeedPerMin / NumActiveParts;}
209-
TAG(TH3) {str << "[Total/Max/Avg]WriteSpeedHour: " << metrics.TotalAvgWriteSpeedPerHour << "/" << metrics.MaxAvgWriteSpeedPerHour << "/" << metrics.TotalAvgWriteSpeedPerHour / NumActiveParts;}
210-
TAG(TH3) {str << "[Total/Max/Avg]WriteSpeedDay: " << metrics.TotalAvgWriteSpeedPerDay << "/" << metrics.MaxAvgWriteSpeedPerDay << "/" << metrics.TotalAvgWriteSpeedPerDay / NumActiveParts;}
211-
TAG(TH3) {str << "TotalDataSize: " << AggregatedStats.TotalDataSize;}
212-
TAG(TH3) {str << "ReserveSize: " << PartitionReserveSize();}
213-
TAG(TH3) {str << "TotalUsedReserveSize: " << AggregatedStats.TotalUsedReserveSize;}
214-
}
194+
str << "<style>"
195+
<< " .properties { border-bottom-style: solid; border-top-style: solid; border-width: 1px; border-color: darkgrey; padding-bottom: 10px; } "
196+
<< " .properties>tbody>tr>td { padding-left: 10px; padding-right: 10px; } "
197+
<< " .tgrid { width: 100%; border: 0; }"
198+
<< " .tgrid>tbody>tr>td { vertical-align: top; }"
199+
<< "</style>";
200+
201+
TAG(TH3) {str << "PersQueueReadBalancer " << TabletID() << " (" << Path << ")";}
202+
203+
auto property = [&](const TString& name, const auto value) {
204+
TABLER() {
205+
TABLED() { str << name;}
206+
TABLED() { str << value; }
207+
}
208+
};
215209

216210
UL_CLASS("nav nav-tabs") {
217211
LI_CLASS("active") {
218-
str << "<a href=\"#main\" data-toggle=\"tab\">partitions</a>";
212+
str << "<a href=\"#generic\" data-toggle=\"tab\">Generic Info</a>";
213+
}
214+
LI() {
215+
str << "<a href=\"#partitions\" data-toggle=\"tab\">Partitions</a>";
219216
}
220217
for (auto& consumer : balancerStatistcs.Consumers) {
221218
LI() {
222219
str << "<a href=\"#c_" << EncodeAnchor(consumer.ConsumerName) << "\" data-toggle=\"tab\">" << NPersQueue::ConvertOldConsumerName(consumer.ConsumerName) << "</a>";
223220
}
224221
}
225222
}
223+
226224
DIV_CLASS("tab-content") {
227-
DIV_CLASS_ID("tab-pane fade in active", "main") {
228-
TABLE_SORTABLE_CLASS("table") {
225+
DIV_CLASS_ID("tab-pane fade in active", "generic") {
226+
TABLE_CLASS("tgrid") {
227+
TABLEBODY() {
228+
TABLER() {
229+
TABLED() {
230+
TABLE_CLASS("properties") {
231+
CAPTION() { str << "Tablet info"; }
232+
TABLEBODY() {
233+
property("Topic", Topic);
234+
property("Path", Path);
235+
property("Initialized", Inited ? "yes" : "no");
236+
property("SchemeShard", TStringBuilder() << "<a href=\"?TabletID=" << SchemeShardId << "\">" << SchemeShardId << "</a>");
237+
property("PathId", PathId);
238+
property("Version", Version);
239+
property("Generation", Generation);
240+
}
241+
}
242+
}
243+
TABLED() {
244+
if (Inited) {
245+
TABLE_CLASS("properties") {
246+
CAPTION() { str << "Statistics"; }
247+
TABLEBODY() {
248+
property("Active pipes", balancerStatistcs.Sessions.size());
249+
property("Active partitions", NumActiveParts);
250+
property("Total data size", AggregatedStats.TotalDataSize);
251+
property("Reserve size", PartitionReserveSize());
252+
property("Used reserve size", AggregatedStats.TotalUsedReserveSize);
253+
property("[Total/Max/Avg]WriteSpeedSec", TStringBuilder() << metrics.TotalAvgWriteSpeedPerSec << "/" << metrics.MaxAvgWriteSpeedPerSec << "/" << metrics.TotalAvgWriteSpeedPerSec / NumActiveParts);
254+
property("[Total/Max/Avg]WriteSpeedMin", TStringBuilder() << metrics.TotalAvgWriteSpeedPerMin << "/" << metrics.MaxAvgWriteSpeedPerMin << "/" << metrics.TotalAvgWriteSpeedPerMin / NumActiveParts);
255+
property("[Total/Max/Avg]WriteSpeedHour", TStringBuilder() << metrics.TotalAvgWriteSpeedPerHour << "/" << metrics.MaxAvgWriteSpeedPerHour << "/" << metrics.TotalAvgWriteSpeedPerHour / NumActiveParts);
256+
property("[Total/Max/Avg]WriteSpeedDay", TStringBuilder() << metrics.TotalAvgWriteSpeedPerDay << "/" << metrics.MaxAvgWriteSpeedPerDay << "/" << metrics.TotalAvgWriteSpeedPerDay / NumActiveParts);
257+
}
258+
}
259+
}
260+
}
261+
}
262+
}
263+
}
264+
}
265+
266+
DIV_CLASS_ID("tab-pane fade", "partitions") {
267+
TABLE_CLASS("table") {
229268
TABLEHEAD() {
230269
TABLER() {
231270
TABLEH() {str << "partition";}
232271
TABLEH() { str << "tabletId";}
272+
TABLEH() { str << "Size";}
233273
}
234274
}
235275
TABLEBODY() {
236-
for (auto& p : PartitionsInfo) {
276+
for (auto& [partitionId, partitionInfo] : PartitionsInfo) {
277+
const auto& stats = AggregatedStats.Stats[partitionId];
278+
237279
TABLER() {
238-
TABLED() { str << p.first;}
239-
TABLED() { HREF(TStringBuilder() << "?TabletID=" << p.second.TabletId) { str << p.second.TabletId; } }
280+
TABLED() { str << partitionId;}
281+
TABLED() { HREF(TStringBuilder() << "?TabletID=" << partitionInfo.TabletId) { str << partitionInfo.TabletId; } }
282+
TABLED() { str << stats.DataSize;}
240283
}
241284
}
242285
}
@@ -659,6 +702,7 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvStatusResponse::TPtr& ev, c
659702

660703
if (AggregatedStats.Cookies.empty()) {
661704
CheckStat(ctx);
705+
Balancer->ProcessPendingStats(ctx);
662706
}
663707
}
664708

@@ -762,7 +806,6 @@ void TPersQueueReadBalancer::CheckStat(const TActorContext& ctx) {
762806

763807
NTabletPipe::SendData(ctx, GetPipeClient(SchemeShardId, ctx), ev);
764808

765-
766809
UpdateCounters(ctx);
767810
}
768811

ydb/core/persqueue/read_balancer.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,5 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
355355

356356
TString EncodeAnchor(const TString& value);
357357

358-
NKikimrPQ::EConsumerScalingSupport DefaultScalingSupport();
359-
360358
}
361359
}

ydb/core/persqueue/read_balancer__balancing.cpp

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ bool TPartition::Reset() {
6565
bool result = IsInactive();
6666

6767
ScaleAwareSDK = false;
68+
StartedReadingFromEndOffset = false;
6869
ReadingFinished = false;
6970
Commited = false;
7071
++Cookie;
@@ -1631,6 +1632,12 @@ void TBalancer::Handle(TEvPQ::TEvWakeupReleasePartition::TPtr &ev, const TActorC
16311632
return;
16321633
}
16331634

1635+
if (partition->Commited) {
1636+
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER,
1637+
GetPrefix() << "skip releasing partition " << msg->PartitionId << " of consumer \"" << msg->Consumer << "\" by reading finished timeout because offset is commited");
1638+
return;
1639+
}
1640+
16341641
LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER,
16351642
GetPrefix() << "releasing partition " << msg->PartitionId << " of consumer \"" << msg->Consumer << "\" by reading finished timeout");
16361643

@@ -1821,29 +1828,28 @@ void TBalancer::Handle(TEvPQ::TEvBalanceConsumer::TPtr& ev, const TActorContext&
18211828
}
18221829

18231830
void TBalancer::Handle(TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx) {
1824-
struct TData {
1825-
ui32 Generation;
1826-
ui64 Cookie;
1827-
const TString& Consumer;
1828-
};
1829-
1830-
std::unordered_map<ui32, std::vector<TData>> index;
1831-
18321831
const auto& record = ev->Get()->Record;
18331832
for (const auto& partResult : record.GetPartResult()) {
18341833
for (const auto& consumerResult : partResult.GetConsumerResult()) {
1835-
if (consumerResult.GetReadingFinished()) {
1836-
index[partResult.GetPartition()].push_back(TData{partResult.GetGeneration(), partResult.GetCookie(), consumerResult.GetConsumer()});
1837-
}
1834+
PendingUpdates[partResult.GetPartition()].push_back(TData{partResult.GetGeneration(), partResult.GetCookie(), consumerResult.GetConsumer(), consumerResult.GetReadingFinished()});
18381835
}
18391836
}
1837+
}
1838+
1839+
void TBalancer::ProcessPendingStats(const TActorContext& ctx) {
1840+
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER,
1841+
GetPrefix() << "ProcessPendingStats. PendingUpdates size " << PendingUpdates.size());
18401842

18411843
GetPartitionGraph().Travers([&](ui32 id) {
1842-
for (auto& d : index[id]) {
1843-
SetCommittedState(d.Consumer, id, d.Generation, d.Cookie, ctx);
1844+
for (auto& d : PendingUpdates[id]) {
1845+
if (d.Commited) {
1846+
SetCommittedState(d.Consumer, id, d.Generation, d.Cookie, ctx);
1847+
}
18441848
}
18451849
return true;
18461850
});
1851+
1852+
PendingUpdates.clear();
18471853
}
18481854

18491855
TString TBalancer::GetPrefix() const {

ydb/core/persqueue/read_balancer__balancing.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,9 @@ class TBalancer {
349349
void Handle(TEvPersQueue::TEvGetReadSessionsInfo::TPtr& ev, const TActorContext& ctx);
350350

351351
void Handle(TEvPQ::TEvBalanceConsumer::TPtr& ev, const TActorContext& ctx);
352+
352353
void Handle(TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx);
354+
void ProcessPendingStats(const TActorContext& ctx);
353355

354356
void RenderApp(TStringStream& str) const;
355357

@@ -364,6 +366,14 @@ class TBalancer {
364366
std::unordered_map<TString, std::unique_ptr<TConsumer>> Consumers;
365367

366368
ui32 Step;
369+
370+
struct TData {
371+
ui32 Generation;
372+
ui64 Cookie;
373+
const TString Consumer;
374+
bool Commited;
375+
};
376+
std::unordered_map<ui32, std::vector<TData>> PendingUpdates;
367377
};
368378

369379
}

0 commit comments

Comments
 (0)