Skip to content

Commit 47aedc2

Browse files
authored
Fix kafka read session partitions releases (#3528)
1 parent aee5571 commit 47aedc2

File tree

1 file changed

+8
-7
lines changed

1 file changed

+8
-7
lines changed

ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -599,18 +599,19 @@ void TKafkaReadSessionActor::HandleReleasePartition(TEvPersQueue::TEvReleasePart
599599
}
600600

601601
NeedRebalance = true;
602-
size_t partitionToReleaseIndex = 0;
603-
size_t i = 0;
602+
ui32 partitionToRelease = 0;
603+
ui32 i = 0;
604604

605-
for (size_t partIndex = 0; partIndex < topicPartitionsIt->second.ReadingNow.size(); partIndex++) {
606-
if (!topicPartitionsIt->second.ToRelease.contains(partIndex) && (group == 0 || partIndex + 1 == group)) {
605+
for (auto curPartition : topicPartitionsIt->second.ReadingNow) {
606+
if (!topicPartitionsIt->second.ToRelease.contains(curPartition) && (group == 0 || curPartition + 1 == group)) {
607607
++i;
608-
if (rand() % i == 0) { // will lead to 1/n probability for each of n partitions
609-
partitionToReleaseIndex = partIndex;
608+
if (rand() % i == 0) {
609+
partitionToRelease = curPartition;
610610
}
611611
}
612612
}
613-
topicPartitionsIt->second.ToRelease.emplace(partitionToReleaseIndex);
613+
614+
topicPartitionsIt->second.ToRelease.emplace(partitionToRelease);
614615
}
615616
}
616617

0 commit comments

Comments
 (0)