Skip to content

Commit 5ed8c5f

Browse files
direct read fix (#15479)
1 parent b2df7c8 commit 5ed8c5f

File tree

4 files changed

+44
-5
lines changed

4 files changed

+44
-5
lines changed

ydb/core/persqueue/dread_cache_service/caching_service.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
5656
hFunc(TEvPQ::TEvGetFullDirectReadData, HandleGetData)
5757
hFunc(TEvPQProxy::TEvDirectReadDataSessionConnected, HandleCreateClientSession)
5858
hFunc(TEvPQProxy::TEvDirectReadDataSessionDead, HandleDestroyClientSession)
59+
hFunc(TEvPQProxy::TEvDirectReadDestroyPartitionSession, HandlePartitionSessionReleased)
5960
)
6061

6162
private:
@@ -112,6 +113,17 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
112113
AssignByProxy.erase(assignIter);
113114
}
114115

116+
void HandlePartitionSessionReleased(TEvPQProxy::TEvDirectReadDestroyPartitionSession::TPtr& ev) {
117+
auto assignIter = AssignByProxy.find(ev->Sender);
118+
if (assignIter.IsEnd())
119+
return;
120+
if (!assignIter->second.contains(ev->Get()->ReadKey.PartitionSessionId))
121+
return;
122+
123+
assignIter->second.erase(ev->Get()->ReadKey.PartitionSessionId);
124+
ServerSessions.erase(ev->Get()->ReadKey);
125+
}
126+
115127
void HandleRegister(TEvPQ::TEvRegisterDirectReadSession::TPtr& ev) {
116128
const auto& key = ev->Get()->Session;
117129
RegisterServerSession(key, ev->Get()->Generation);

ydb/services/persqueue_v1/actors/events.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -585,6 +585,11 @@ struct TEvPQProxy {
585585
};
586586

587587
struct TEvDirectReadDestroyPartitionSession : public TEventLocal<TEvDirectReadDestroyPartitionSession, EvDirectReadDestroyPartitionSession> {
588+
589+
TEvDirectReadDestroyPartitionSession(const TString& sessionId, ui64 partitionSessionId)
590+
: ReadKey(sessionId, partitionSessionId)
591+
{}
592+
588593
TEvDirectReadDestroyPartitionSession(const NKikimr::NPQ::TReadSessionKey& sessionKey,
589594
Ydb::PersQueue::ErrorCode::ErrorCode code, const TString& reason)
590595
: ReadKey(sessionKey)

ydb/services/persqueue_v1/actors/partition_actor.cpp

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -271,10 +271,14 @@ void TPartitionActor::Handle(TEvPQProxy::TEvDirectReadAck::TPtr& ev, const TActo
271271
if (DirectReadRestoreStage != EDirectReadRestoreStage::None) {
272272
if (RestoredDirectReadId == ev->Get()->DirectReadId) {
273273
// This direct read is already being restored. Have to forget it later.
274+
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " Got ack for direct read " << ev->Get()->DirectReadId
275+
<< " while restoring, store it to forget further");
274276
DirectReadsToForget.insert(ev->Get()->DirectReadId);
275277
return;
276278
}
277279
if (DirectReadsToRestore.contains(ev->Get()->DirectReadId)) {
280+
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " Got ack for direct read " << ev->Get()->DirectReadId
281+
<< " while restoring, remove it from restore list");
278282
// This direct read is pending for restore. No need to foreget - not yet prepared, just erase it;
279283
DirectReadsToRestore.erase(ev->Get()->DirectReadId);
280284
DirectReadsToPublish.erase(ev->Get()->DirectReadId);
@@ -316,6 +320,7 @@ void TPartitionActor::Handle(const TEvPQProxy::TEvRestartPipe::TPtr&, const TAct
316320
DirectReadsToRestore = DirectReadResults;
317321
DirectReadsToPublish = PublishedDirectReads;
318322
Y_ABORT_UNLESS(!DirectReadsToPublish.contains(DirectReadId));
323+
RestoredDirectReadId = 0;
319324
RestartDirectReadSession();
320325
return;
321326
}
@@ -634,6 +639,7 @@ void TPartitionActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorCo
634639
break;
635640
case EDirectReadRestoreStage::Session:
636641
Y_ABORT_UNLESS(result.HasCmdRestoreDirectReadResult());
642+
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " Direct read - session restarted for partition " << Partition);
637643
if (!SendNextRestorePrepareOrForget()) {
638644
OnDirectReadsRestored();
639645
}
@@ -654,6 +660,7 @@ void TPartitionActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorCo
654660
return;
655661
case EDirectReadRestoreStage::Publish:
656662
Y_ABORT_UNLESS(RestoredDirectReadId != 0);
663+
657664
Y_ABORT_UNLESS(result.HasCmdPublishReadResult());
658665
Y_ABORT_UNLESS(*DirectReadsToPublish.begin() == result.GetCmdPublishReadResult().GetDirectReadId());
659666
DirectReadsToPublish.erase(DirectReadsToPublish.begin());
@@ -730,6 +737,8 @@ void TPartitionActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorCo
730737

731738
Y_ABORT_UNLESS(DirectRead);
732739
Y_ABORT_UNLESS(res.GetDirectReadId() == DirectReadId);
740+
if (!PipeClient)
741+
return; // Pipe was already destroyed, direct read session is being restored. Will resend this request afterwards;
733742

734743
EndOffset = res.GetEndOffset();
735744
SizeLag = res.GetSizeLag();
@@ -1092,13 +1101,18 @@ bool TPartitionActor::SendNextRestorePrepareOrForget() {
10921101
if (shouldForget) {
10931102
// We have something to forget from what was already restored; Do NOT change RestoredDirectReadId
10941103
DirectReadRestoreStage = EDirectReadRestoreStage::Forget;
1104+
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " Restore direct read, forget id "
1105+
<< *DirectReadsToForget.begin() << " for partition " << Partition);
10951106
SendForgetDirectRead(*DirectReadsToForget.begin(), ctx);
10961107
return true;
10971108
} else {
1098-
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " Resend prepare direct read id " << prepareId << " for partition " << Partition);
1109+
auto& dr = DirectReadsToRestore.begin()->second;
1110+
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " Resend prepare direct read id " << prepareId
1111+
<< " (internal id: " << dr.GetDirectReadId() << ") for partition " << Partition);
10991112
Y_ABORT_UNLESS(prepareId != 0);
1113+
11001114
//Restore;
1101-
auto& dr = DirectReadsToRestore.begin()->second;
1115+
Y_ABORT_UNLESS(prepareId == dr.GetDirectReadId());
11021116

11031117
Y_ABORT_UNLESS(RestoredDirectReadId < dr.GetDirectReadId());
11041118
RestoredDirectReadId = dr.GetDirectReadId();
@@ -1125,8 +1139,8 @@ bool TPartitionActor::SendNextRestorePublishRequest() {
11251139
return false;
11261140
}
11271141
auto id = *DirectReadsToPublish.begin();
1128-
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " " << Partition
1129-
<< "Resend publish direct read on restore, id: " << id);
1142+
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << "Resend publish direct read on restore, id: "
1143+
<< id << " for partition " << Partition);
11301144

11311145
Y_ABORT_UNLESS(RestoredDirectReadId == id);
11321146
DirectReadRestoreStage = EDirectReadRestoreStage::Publish;
@@ -1298,7 +1312,6 @@ void TPartitionActor::Handle(TEvPQProxy::TEvRead::TPtr& ev, const TActorContext&
12981312

12991313
Y_ABORT_UNLESS(ReadGuid.empty());
13001314
Y_ABORT_UNLESS(!RequestInfly);
1301-
Y_ABORT_UNLESS(DirectReadRestoreStage == EDirectReadRestoreStage::None);
13021315

13031316
ReadGuid = ev->Get()->Guid;
13041317

@@ -1311,6 +1324,12 @@ void TPartitionActor::Handle(TEvPQProxy::TEvRead::TPtr& ev, const TActorContext&
13111324
if (!PipeClient) //Pipe will be recreated soon
13121325
return;
13131326

1327+
if (DirectReadRestoreStage != EDirectReadRestoreStage::None) {
1328+
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " READ FROM " << Partition
1329+
<< " store this request utill direct read is restored");
1330+
return;
1331+
}
1332+
13141333
TAutoPtr<TEvPersQueue::TEvRequest> event(new TEvPersQueue::TEvRequest);
13151334
event->Record.Swap(&request);
13161335

ydb/services/persqueue_v1/actors/read_session_actor.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1421,6 +1421,9 @@ void TReadSessionActor<UseMigrationProtocol>::SendReleaseSignal(TPartitionActorI
14211421
result.mutable_stop_partition_session_request()->set_committed_offset(partition.Offset);
14221422
if (DirectRead) {
14231423
result.mutable_stop_partition_session_request()->set_last_direct_read_id(partition.LastDirectReadId);
1424+
ctx.Send(NPQ::MakePQDReadCacheServiceActorId(),
1425+
new TEvPQProxy::TEvDirectReadDestroyPartitionSession(Session, partition.Partition.AssignId));
1426+
14241427
}
14251428
}
14261429

0 commit comments

Comments
 (0)