Skip to content

Commit b6a279d

Browse files
authored
Kafka balance fixes to 24 1 (#3537)
1 parent 9acd579 commit b6a279d

File tree

1 file changed

+13
-12
lines changed

1 file changed

+13
-12
lines changed

ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp

+13-12
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,10 @@ void TKafkaReadSessionActor::HandleWakeup(TEvKafka::TEvWakeup::TPtr, const TActo
3636
return;
3737
}
3838

39-
for (auto& topicToPartitions: NewPartitionsToLockOnTime) {
40-
auto& partitions = topicToPartitions.second;
39+
for (auto& [topicName, partitions]: NewPartitionsToLockOnTime) {
4140
for (auto partitionsIt = partitions.begin(); partitionsIt != partitions.end(); ) {
4241
if (partitionsIt->LockOn <= ctx.Now()) {
43-
TopicPartitions[topicToPartitions.first].ToLock.emplace(partitionsIt->PartitionId);
42+
TopicPartitions[topicName].ToLock.emplace(partitionsIt->PartitionId);
4443
NeedRebalance = true;
4544
partitionsIt = partitions.erase(partitionsIt);
4645
} else {
@@ -408,6 +407,8 @@ void TKafkaReadSessionActor::HandlePipeDestroyed(TEvTabletPipe::TEvClientDestroy
408407
}
409408

410409
void TKafkaReadSessionActor::ProcessBalancerDead(ui64 tabletId, const TActorContext& ctx) {
410+
NewPartitionsToLockOnTime.clear();
411+
411412
for (auto& [topicName, topicInfo] : TopicsInfo) {
412413
if (topicInfo.TabletID == tabletId) {
413414
auto partitionsIt = TopicPartitions.find(topicName);
@@ -579,8 +580,7 @@ void TKafkaReadSessionActor::HandleReleasePartition(TEvPersQueue::TEvReleasePart
579580
auto newPartitionsToLockCount = newPartitionsToLockIt == NewPartitionsToLockOnTime.end() ? 0 : newPartitionsToLockIt->second.size();
580581

581582
auto topicPartitionsIt = TopicPartitions.find(pathIt->second->GetInternalName());
582-
Y_ABORT_UNLESS(topicPartitionsIt != TopicPartitions.end());
583-
Y_ABORT_UNLESS(record.GetCount() <= topicPartitionsIt->second.ToLock.size() + topicPartitionsIt->second.ReadingNow.size() + newPartitionsToLockCount);
583+
Y_ABORT_UNLESS(record.GetCount() <= (topicPartitionsIt.IsEnd() ? 0 : topicPartitionsIt->second.ToLock.size() + topicPartitionsIt->second.ReadingNow.size()) + newPartitionsToLockCount);
584584

585585
for (ui32 c = 0; c < record.GetCount(); ++c) {
586586
// if some partition not locked yet, then release it without rebalance
@@ -599,18 +599,19 @@ void TKafkaReadSessionActor::HandleReleasePartition(TEvPersQueue::TEvReleasePart
599599
}
600600

601601
NeedRebalance = true;
602-
size_t partitionToReleaseIndex = 0;
603-
size_t i = 0;
602+
ui32 partitionToRelease = 0;
603+
ui32 i = 0;
604604

605-
for (size_t partIndex = 0; partIndex < topicPartitionsIt->second.ReadingNow.size(); partIndex++) {
606-
if (!topicPartitionsIt->second.ToRelease.contains(partIndex) && (group == 0 || partIndex + 1 == group)) {
605+
for (auto curPartition : topicPartitionsIt->second.ReadingNow) {
606+
if (!topicPartitionsIt->second.ToRelease.contains(curPartition) && (group == 0 || curPartition + 1 == group)) {
607607
++i;
608-
if (rand() % i == 0) { // will lead to 1/n probability for each of n partitions
609-
partitionToReleaseIndex = partIndex;
608+
if (rand() % i == 0) {
609+
partitionToRelease = curPartition;
610610
}
611611
}
612612
}
613-
topicPartitionsIt->second.ToRelease.emplace(partitionToReleaseIndex);
613+
614+
topicPartitionsIt->second.ToRelease.emplace(partitionToRelease);
614615
}
615616
}
616617

0 commit comments

Comments
 (0)