Skip to content

Commit cf8dbc3

Browse files
authored
Remove deprecated field of TReleasePartition (#7689)
1 parent d82059e commit cf8dbc3

File tree

5 files changed

+70
-141
lines changed

5 files changed

+70
-141
lines changed

ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp

Lines changed: 28 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -580,81 +580,47 @@ void TKafkaReadSessionActor::HandleReleasePartition(TEvPersQueue::TEvReleasePart
580580
auto newPartitionsToLockCount = newPartitionsToLockIt == NewPartitionsToLockOnTime.end() ? 0 : newPartitionsToLockIt->second.size();
581581

582582
auto topicPartitionsIt = TopicPartitions.find(pathIt->second->GetInternalName());
583-
Y_ABORT_UNLESS(record.GetCount() <= (topicPartitionsIt.IsEnd() ? 0 : topicPartitionsIt->second.ToLock.size() + topicPartitionsIt->second.ReadingNow.size()) + newPartitionsToLockCount);
584-
585-
if (!group) {
586-
for (ui32 c = 0; c < record.GetCount(); ++c) {
587-
// if some partition not locked yet, then release it without rebalance
588-
if (newPartitionsToLockCount > 0) {
589-
newPartitionsToLockCount--;
590-
InformBalancerAboutPartitionRelease(topicInfoIt->first, newPartitionsToLockIt->second.back().PartitionId, ctx);
591-
newPartitionsToLockIt->second.pop_back();
592-
continue;
593-
}
594-
595-
if (!topicPartitionsIt->second.ToLock.empty()) {
596-
auto partitionToReleaseIt = topicPartitionsIt->second.ToLock.begin();
597-
topicPartitionsIt->second.ToLock.erase(partitionToReleaseIt);
598-
InformBalancerAboutPartitionRelease(topicInfoIt->first, *partitionToReleaseIt, ctx);
599-
continue;
600-
}
601-
602-
NeedRebalance = true;
603-
ui32 partitionToRelease = 0;
604-
ui32 i = 0;
605-
606-
for (auto curPartition : topicPartitionsIt->second.ReadingNow) {
607-
if (!topicPartitionsIt->second.ToRelease.contains(curPartition)) {
608-
++i;
609-
if (rand() % i == 0) {
610-
partitionToRelease = curPartition;
611-
}
612-
}
613-
}
583+
Y_ABORT_UNLESS(1 <= (topicPartitionsIt.IsEnd() ? 0 : topicPartitionsIt->second.ToLock.size() + topicPartitionsIt->second.ReadingNow.size()) + newPartitionsToLockCount);
614584

615-
topicPartitionsIt->second.ToRelease.emplace(partitionToRelease);
616-
}
617-
} else {
618-
auto partitionToRelease = record.GetGroup() - 1;
585+
auto partitionToRelease = record.GetGroup() - 1;
619586

620-
if (newPartitionsToLockIt != NewPartitionsToLockOnTime.end()) {
621-
auto& newPartitions = newPartitionsToLockIt->second;
622-
for (auto& newPartition : newPartitions) {
623-
if (newPartition.PartitionId == partitionToRelease) {
624-
InformBalancerAboutPartitionRelease(topicInfoIt->first, partitionToRelease, ctx);
587+
if (newPartitionsToLockIt != NewPartitionsToLockOnTime.end()) {
588+
auto& newPartitions = newPartitionsToLockIt->second;
589+
for (auto& newPartition : newPartitions) {
590+
if (newPartition.PartitionId == partitionToRelease) {
591+
InformBalancerAboutPartitionRelease(topicInfoIt->first, partitionToRelease, ctx);
625592

626-
auto tmp = std::move(newPartitions);
627-
newPartitions.reserve(tmp.size() - 1);
593+
auto tmp = std::move(newPartitions);
594+
newPartitions.reserve(tmp.size() - 1);
628595

629-
for (auto& t : tmp) {
630-
if (t.PartitionId != partitionToRelease) {
631-
newPartitions.push_back(t);
632-
}
596+
for (auto& t : tmp) {
597+
if (t.PartitionId != partitionToRelease) {
598+
newPartitions.push_back(t);
633599
}
634-
635-
return;
636600
}
637-
}
638-
}
639601

640-
if (topicPartitionsIt != TopicPartitions.end()) {
641-
if (topicPartitionsIt->second.ToLock.contains(partitionToRelease)) {
642-
InformBalancerAboutPartitionRelease(topicInfoIt->first, partitionToRelease, ctx);
643-
topicPartitionsIt->second.ToLock.erase(partitionToRelease);
644602
return;
645603
}
604+
}
605+
}
646606

647-
if (topicPartitionsIt->second.ReadingNow.contains(partitionToRelease) && !topicPartitionsIt->second.ToRelease.contains(partitionToRelease)) {
648-
InformBalancerAboutPartitionRelease(topicInfoIt->first, partitionToRelease, ctx);
649-
NeedRebalance = true;
650-
topicPartitionsIt->second.ReadingNow.erase(partitionToRelease);
651-
return;
652-
}
607+
if (topicPartitionsIt != TopicPartitions.end()) {
608+
if (topicPartitionsIt->second.ToLock.contains(partitionToRelease)) {
609+
InformBalancerAboutPartitionRelease(topicInfoIt->first, partitionToRelease, ctx);
610+
topicPartitionsIt->second.ToLock.erase(partitionToRelease);
611+
return;
653612
}
654613

655-
KAFKA_LOG_I("ignored ev release topic# " << record.GetTopic()
656-
<< ", reason# partition " << partitionToRelease << " isn`t locked");
614+
if (topicPartitionsIt->second.ReadingNow.contains(partitionToRelease) && !topicPartitionsIt->second.ToRelease.contains(partitionToRelease)) {
615+
InformBalancerAboutPartitionRelease(topicInfoIt->first, partitionToRelease, ctx);
616+
NeedRebalance = true;
617+
topicPartitionsIt->second.ReadingNow.erase(partitionToRelease);
618+
return;
619+
}
657620
}
621+
622+
KAFKA_LOG_I("ignored ev release topic# " << record.GetTopic()
623+
<< ", reason# partition " << partitionToRelease << " isn`t locked");
658624
}
659625

660626
void TKafkaReadSessionActor::InformBalancerAboutPartitionRelease(const TString& topic, ui64 partition, const TActorContext& ctx) {

ydb/core/persqueue/read_balancer__balancing.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -536,7 +536,6 @@ std::unique_ptr<TEvPersQueue::TEvReleasePartition> TPartitionFamily::MakeEvRelea
536536
r.SetPath(TopicPath());
537537
r.SetGeneration(TabletGeneration());
538538
r.SetClientId(Session->ClientId);
539-
r.SetCount(1);
540539
r.SetGroup(partitionId + 1);
541540
ActorIdToProto(Session->Pipe, r.MutablePipeClient());
542541

ydb/core/protos/pqconfig.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -649,7 +649,7 @@ message TReleasePartition {
649649
optional uint64 Generation = 2;
650650
optional string Session = 3;
651651
optional string ClientId = 4;
652-
optional uint32 Count = 5 [deprecated = true]; // Remove at 2025-1
652+
reserved 5; // optional uint32 Count = 5;
653653
optional NActorsProto.TActorId PipeClient = 6;
654654
optional uint32 Group = 7;
655655
optional string Path = 8;

ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp

Lines changed: 24 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1164,37 +1164,35 @@ void TReadSessionActor::Handle(TEvPersQueue::TEvReleasePartition::TPtr& ev, cons
11641164
return;
11651165
}
11661166

1167-
for (ui32 c = 0; c < record.GetCount(); ++c) {
1168-
Y_ABORT_UNLESS(!Partitions.empty());
1169-
1170-
TActorId actorId = TActorId{};
1171-
auto jt = Partitions.begin();
1172-
ui32 i = 0;
1173-
for (auto it = Partitions.begin(); it != Partitions.end(); ++it) {
1174-
if (it->first.first == clientName && !it->second.Releasing && (group == 0 || it->first.second + 1 == group)) {
1175-
++i;
1176-
if (rand() % i == 0) { //will lead to 1/n probability for each of n partitions
1177-
actorId = it->second.Actor;
1178-
jt = it;
1179-
}
1167+
Y_ABORT_UNLESS(!Partitions.empty());
1168+
1169+
TActorId actorId = TActorId{};
1170+
auto jt = Partitions.begin();
1171+
ui32 i = 0;
1172+
for (auto it = Partitions.begin(); it != Partitions.end(); ++it) {
1173+
if (it->first.first == clientName && !it->second.Releasing && (group == 0 || it->first.second + 1 == group)) {
1174+
++i;
1175+
if (rand() % i == 0) { //will lead to 1/n probability for each of n partitions
1176+
actorId = it->second.Actor;
1177+
jt = it;
11801178
}
11811179
}
1182-
Y_ABORT_UNLESS(actorId);
1180+
}
1181+
Y_ABORT_UNLESS(actorId);
11831182

1184-
{
1185-
auto it = TopicCounters.find(name);
1186-
Y_ABORT_UNLESS(it != TopicCounters.end());
1187-
it->second.PartitionsToBeReleased.Inc();
1188-
}
1183+
{
1184+
auto it = TopicCounters.find(name);
1185+
Y_ABORT_UNLESS(it != TopicCounters.end());
1186+
it->second.PartitionsToBeReleased.Inc();
1187+
}
11891188

1190-
LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " releasing " << jt->first.first << ":" << jt->first.second);
1191-
jt->second.Releasing = true;
1189+
LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " releasing " << jt->first.first << ":" << jt->first.second);
1190+
jt->second.Releasing = true;
11921191

1193-
ctx.Send(actorId, new TEvPQProxy::TEvReleasePartition());
1194-
if (ClientsideLocksAllowed && jt->second.LockSent && !jt->second.Reading) { //locked and no active reads
1195-
if (!ProcessReleasePartition(jt, BalanceRightNow, false, ctx)) { // returns false if actor died
1196-
return;
1197-
}
1192+
ctx.Send(actorId, new TEvPQProxy::TEvReleasePartition());
1193+
if (ClientsideLocksAllowed && jt->second.LockSent && !jt->second.Reading) { //locked and no active reads
1194+
if (!ProcessReleasePartition(jt, BalanceRightNow, false, ctx)) { // returns false if actor died
1195+
return;
11981196
}
11991197
}
12001198
AnswerForCommitsIfCan(ctx); // in case of killing partition

ydb/services/persqueue_v1/actors/read_session_actor.cpp

Lines changed: 17 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1410,8 +1410,6 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPersQueue::TEvReleasePar
14101410
Y_ABORT_UNLESS(record.GetSession() == Session);
14111411
Y_ABORT_UNLESS(record.GetClientId() == ClientId);
14121412

1413-
const ui32 group = record.HasGroup() ? record.GetGroup() : 0;
1414-
14151413
auto pathIter = FullPathToConverter.find(NPersQueue::NormalizeFullPath(record.GetPath()));
14161414
Y_ABORT_UNLESS(pathIter != FullPathToConverter.end());
14171415

@@ -1445,61 +1443,29 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPersQueue::TEvReleasePar
14451443
}
14461444
};
14471445

1448-
if (!group) {
1449-
// Release partitions by count
1450-
for (ui32 c = 0; c < record.GetCount(); ++c) {
1451-
if (Partitions.empty()) {
1452-
return CloseSession(PersQueue::ErrorCode::ErrorCode::ERROR,
1453-
TStringBuilder() << "internal error: can`t release partition #01",
1454-
ctx);
1455-
}
1456-
1457-
auto jt = Partitions.end();
1458-
ui32 i = 0;
1446+
ui32 partitionId = record.GetGroup() - 1;
1447+
bool found = false;
14591448

1460-
for (auto it = Partitions.begin(); it != Partitions.end(); ++it) {
1461-
auto& partitionInfo = it->second;
1462-
if (!partitionInfo.Releasing && partitionInfo.Topic->GetInternalName() == converter->GetInternalName()) {
1463-
++i;
1464-
if (rand() % i == 0) { // will lead to 1/n probability for each of n partitions
1465-
jt = it;
1466-
}
1467-
}
1468-
}
1449+
// Release partitions by partition id
1450+
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " gone release"
1451+
<< ": partition# " << partitionId);
14691452

1470-
if (jt == Partitions.end()) {
1471-
return CloseSession(PersQueue::ErrorCode::ErrorCode::ERROR,
1472-
TStringBuilder() << "internal error: can`t release partition #02",
1473-
ctx);
1453+
for (auto it = Partitions.begin(); it != Partitions.end(); ++it) {
1454+
auto& partitionInfo = it->second;
1455+
if (partitionInfo.Topic->GetInternalName() == converter->GetInternalName() && partitionId == partitionInfo.Partition.Partition) {
1456+
if (!partitionInfo.Releasing) {
1457+
doRelease(it);
14741458
}
14751459

1476-
doRelease(jt);
1477-
}
1478-
} else {
1479-
ui32 partitionId = group - 1;
1480-
bool found = false;
1481-
1482-
// Release partitions by partition id
1483-
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " gone release"
1484-
<< ": partition# " << partitionId);
1485-
1486-
for (auto it = Partitions.begin(); it != Partitions.end(); ++it) {
1487-
auto& partitionInfo = it->second;
1488-
if (partitionInfo.Topic->GetInternalName() == converter->GetInternalName() && partitionId == partitionInfo.Partition.Partition) {
1489-
if (!partitionInfo.Releasing) {
1490-
doRelease(it);
1491-
}
1492-
1493-
found = true;
1494-
break;
1495-
}
1460+
found = true;
1461+
break;
14961462
}
1463+
}
14971464

1498-
if (!found) {
1499-
return CloseSession(PersQueue::ErrorCode::ErrorCode::ERROR,
1500-
TStringBuilder() << "internal error: releasing unknown partition " << partitionId,
1501-
ctx);
1502-
}
1465+
if (!found) {
1466+
return CloseSession(PersQueue::ErrorCode::ErrorCode::ERROR,
1467+
TStringBuilder() << "internal error: releasing unknown partition " << partitionId,
1468+
ctx);
15031469
}
15041470
}
15051471

0 commit comments

Comments
 (0)