Skip to content

Commit abde5f0

Browse files
authored
Merge 9470dac into 9d83402
2 parents 9d83402 + 9470dac commit abde5f0

File tree

2 files changed

+10
-6
lines changed

2 files changed

+10
-6
lines changed

ydb/core/persqueue/read_balancer__balancing.cpp

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ void TPartitionFamily::Release(const TActorContext& ctx, ETargetStatus targetSta
165165

166166
LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER,
167167
GetPrefix() << " release partitions [" << JoinRange(", ", LockedPartitions.begin(), LockedPartitions.end())
168-
<< "]. Target status " << targetStatus);
168+
<< "] pipe " << Session->Pipe << ". Target status " << targetStatus);
169169

170170
Status = EStatus::Releasing;
171171
TargetStatus = targetStatus;
@@ -533,6 +533,7 @@ std::unique_ptr<TEvPersQueue::TEvReleasePartition> TPartitionFamily::MakeEvRelea
533533
r.SetPath(TopicPath());
534534
r.SetGeneration(TabletGeneration());
535535
r.SetClientId(Session->ClientId);
536+
r.SetCount(1);
536537
r.SetGroup(partitionId + 1);
537538
ActorIdToProto(Session->Pipe, r.MutablePipeClient());
538539

@@ -1661,20 +1662,20 @@ void TBalancer::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr& ev, const TAc
16611662

16621663
LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER,
16631664
GetPrefix() << "pipe " << ev->Get()->ClientId << " disconnected; active server actors: "
1664-
<< (it != Sessions.end() ? it->second->ServerActors : -1));
1665+
<< it->second->ServerActors);
16651666

16661667
auto& session = it->second;
16671668
if (--(session->ServerActors) > 0) {
16681669
return;
16691670
}
16701671

16711672
if (!session->SessionName.empty()) {
1672-
LOG_NOTICE_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER,
1673-
GetPrefix() << "pipe " << ev->Get()->ClientId << " client "
1674-
<< session->ClientId << " disconnected session " << session->SessionName);
1675-
16761673
auto* consumer = GetConsumer(session->ClientId);
16771674
if (consumer) {
1675+
LOG_NOTICE_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER,
1676+
GetPrefix() << "pipe " << ev->Get()->ClientId << " client "
1677+
<< session->ClientId << " disconnected session " << session->SessionName);
1678+
16781679
consumer->UnregisterReadingSession(session.get(), ctx);
16791680

16801681
if (consumer->Sessions.empty()) {

ydb/services/persqueue_v1/actors/read_session_actor.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1419,6 +1419,9 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPersQueue::TEvReleasePar
14191419
Y_ABORT_UNLESS(it != Topics.end());
14201420

14211421
if (it->second.PipeClient != ActorIdFromProto(record.GetPipeClient())) {
1422+
LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " try releasing but pipe is bad"
1423+
<< ": group# " << group << " count# " << record.GetCount() << " expected: " << it->second.PipeClient
1424+
<< " actual: " << ActorIdFromProto(record.GetPipeClient()));
14221425
return;
14231426
}
14241427

0 commit comments

Comments
 (0)