From bd0e1874b2e62ca3a01b97864523be6b2a67bb31 Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Sun, 7 Apr 2024 14:30:45 +0000 Subject: [PATCH 1/2] Fix partitions release --- .../kafka_proxy/actors/kafka_read_session_actor.cpp | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp index cd3f6dd7d674..045ae73c5e9a 100644 --- a/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp @@ -599,18 +599,19 @@ void TKafkaReadSessionActor::HandleReleasePartition(TEvPersQueue::TEvReleasePart } NeedRebalance = true; - size_t partitionToReleaseIndex = 0; + ui32 partitionToRelease = 0; size_t i = 0; - for (size_t partIndex = 0; partIndex < topicPartitionsIt->second.ReadingNow.size(); partIndex++) { - if (!topicPartitionsIt->second.ToRelease.contains(partIndex) && (group == 0 || partIndex + 1 == group)) { + for (auto curPartition : topicPartitionsIt->second.ReadingNow) { + if (!topicPartitionsIt->second.ToRelease.contains(curPartition) && (group == 0 || curPartition + 1 == group)) { ++i; - if (rand() % i == 0) { // will lead to 1/n probability for each of n partitions - partitionToReleaseIndex = partIndex; + if (rand() % i == 0) { + partitionToRelease = curPartition; } } } - topicPartitionsIt->second.ToRelease.emplace(partitionToReleaseIndex); + + topicPartitionsIt->second.ToRelease.emplace(partitionToRelease); } } From 0fd6a86994e126a23e11de541de3f5ac4ac59681 Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Sun, 7 Apr 2024 14:49:59 +0000 Subject: [PATCH 2/2] fix --- ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp index 045ae73c5e9a..37b7096badd7 100644 --- a/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp @@ -600,7 +600,7 @@ void TKafkaReadSessionActor::HandleReleasePartition(TEvPersQueue::TEvReleasePart NeedRebalance = true; ui32 partitionToRelease = 0; - size_t i = 0; + ui32 i = 0; for (auto curPartition : topicPartitionsIt->second.ReadingNow) { if (!topicPartitionsIt->second.ToRelease.contains(curPartition) && (group == 0 || curPartition + 1 == group)) {