Skip to content

Commit 46cea82

Browse files
authored
Merge 72353ad into 5bb9671
2 parents 5bb9671 + 72353ad commit 46cea82

File tree

1 file changed

+3
-5
lines changed

1 file changed

+3
-5
lines changed

ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp

+3-5
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,10 @@ void TKafkaReadSessionActor::HandleWakeup(TEvKafka::TEvWakeup::TPtr, const TActo
3636
return;
3737
}
3838

39-
for (auto& topicToPartitions: NewPartitionsToLockOnTime) {
40-
auto& partitions = topicToPartitions.second;
39+
for (auto& [topicName, partitions]: NewPartitionsToLockOnTime) {
4140
for (auto partitionsIt = partitions.begin(); partitionsIt != partitions.end(); ) {
4241
if (partitionsIt->LockOn <= ctx.Now()) {
43-
TopicPartitions[topicToPartitions.first].ToLock.emplace(partitionsIt->PartitionId);
42+
TopicPartitions[topicName].ToLock.emplace(partitionsIt->PartitionId);
4443
NeedRebalance = true;
4544
partitionsIt = partitions.erase(partitionsIt);
4645
} else {
@@ -579,8 +578,7 @@ void TKafkaReadSessionActor::HandleReleasePartition(TEvPersQueue::TEvReleasePart
579578
auto newPartitionsToLockCount = newPartitionsToLockIt == NewPartitionsToLockOnTime.end() ? 0 : newPartitionsToLockIt->second.size();
580579

581580
auto topicPartitionsIt = TopicPartitions.find(pathIt->second->GetInternalName());
582-
Y_ABORT_UNLESS(topicPartitionsIt != TopicPartitions.end());
583-
Y_ABORT_UNLESS(record.GetCount() <= topicPartitionsIt->second.ToLock.size() + topicPartitionsIt->second.ReadingNow.size() + newPartitionsToLockCount);
581+
Y_ABORT_UNLESS(record.GetCount() <= (topicPartitionsIt.IsEnd() ? 0 : topicPartitionsIt->second.ToLock.size() + topicPartitionsIt->second.ReadingNow.size()) + newPartitionsToLockCount);
584582

585583
for (ui32 c = 0; c < record.GetCount(); ++c) {
586584
// if some partition not locked yet, then release it without rebalance

0 commit comments

Comments
 (0)