From 5451705d3c05709df7028d01c103e05daa90e267 Mon Sep 17 00:00:00 2001 From: Alexander Kotov Date: Thu, 27 Feb 2025 15:11:52 +0300 Subject: [PATCH 1/3] [*] refactoring --- ydb/core/persqueue/events/internal.h | 3 + ydb/core/persqueue/partition.cpp | 192 +++++++++++++++++++-------- ydb/core/persqueue/partition.h | 19 ++- 3 files changed, 154 insertions(+), 60 deletions(-) diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index adc16ed6f952..3d71402f1a68 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -1086,6 +1086,8 @@ struct TEvPQ { } ui32 Cookie; // InternalPartitionId + TActorId SupportivePartition; + NPQ::TSourceIdMap SrcIdInfo; std::deque BodyKeys; TVector BlobsFromHead; @@ -1102,6 +1104,7 @@ struct TEvPQ { struct TEvGetWriteInfoError : public TEventLocal { ui32 Cookie; // InternalPartitionId TString Message; + TActorId SupportivePartition; TEvGetWriteInfoError(ui32 cookie, TString message) : Cookie(cookie), diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 68dc875982a8..7c55c14d6bde 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -581,7 +581,7 @@ void TPartition::InitComplete(const TActorContext& ctx) { TabletCounters.Percentile()[COUNTER_LATENCY_PQ_INIT].IncrementFor(InitDuration.MilliSeconds()); FillReadFromTimestamps(ctx); - ResendPendingEvents(ctx); + ProcessPendingEvents(ctx); ProcessTxsAndUserActs(ctx); ctx.Send(ctx.SelfID, new TEvents::TEvWakeup()); @@ -969,37 +969,56 @@ void TPartition::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TAc ProcessTxsAndUserActs(ctx); } +template <> +void TPartition::ProcessPendingEvent(std::unique_ptr ev, const TActorContext& ctx) +{ + PushBackDistrTx(ev.release()); + + ProcessTxsAndUserActs(ctx); +} + void TPartition::Handle(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActorContext& ctx) { PQ_LOG_D("Handle TEvPQ::TEvProposePartitionConfig" << " Step " << ev->Get()->Step << ", TxId " << ev->Get()->TxId); - PushBackDistrTx(ev->Release()); + ProcessPendingEvent(std::unique_ptr(ev->Release().Release()), ctx); +} - ProcessTxsAndUserActs(ctx); +template +void TPartition::AddPendingEvent(TAutoPtr>& ev) +{ + std::unique_ptr p(ev->Release().Release()); + PendingEvents.emplace_back(std::move(p)); } void TPartition::HandleOnInit(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext&) { PQ_LOG_D("HandleOnInit TEvPQ::TEvTxCalcPredicate"); - PendingEvents.emplace_back(ev->ReleaseBase().Release()); + AddPendingEvent(ev); } void TPartition::HandleOnInit(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext&) { - PendingEvents.emplace_back(ev->ReleaseBase().Release()); + PQ_LOG_D("HandleOnInit TEvPQ::TEvTxCommit"); + + AddPendingEvent(ev); } void TPartition::HandleOnInit(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext&) { - PendingEvents.emplace_back(ev->ReleaseBase().Release()); + PQ_LOG_D("HandleOnInit TEvPQ::TEvTxRollback"); + + AddPendingEvent(ev); } void TPartition::HandleOnInit(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActorContext&) { - PendingEvents.emplace_back(ev->ReleaseBase().Release()); + PQ_LOG_D("HandleOnInit TEvPQ::TEvProposePartitionConfig"); + + AddPendingEvent(ev); } void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorContext& /* ctx */) @@ -1009,7 +1028,7 @@ void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TAc Y_ABORT_UNLESS(IsSupportive()); ev->Get()->OriginalPartition = ev->Sender; - PendingEvents.emplace_back(ev->ReleaseBase().Release()); + AddPendingEvent(ev); } void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorContext& /* ctx */) @@ -1018,7 +1037,7 @@ void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TA Y_ABORT_UNLESS(!IsSupportive()); - PendingEvents.emplace_back(ev->ReleaseBase().Release()); + AddPendingEvent(ev); } void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorContext& /* ctx */) @@ -1027,43 +1046,46 @@ void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActo Y_ABORT_UNLESS(!IsSupportive()); - PendingEvents.emplace_back(ev->ReleaseBase().Release()); + AddPendingEvent(ev); } -void TPartition::Handle(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext& ctx) +template <> +void TPartition::ProcessPendingEvent(std::unique_ptr ev, const TActorContext& ctx) { - PQ_LOG_D("Handle TEvPQ::TEvTxCalcPredicate" << - " Step " << ev->Get()->Step << - ", TxId " << ev->Get()->TxId); - if (PlanStep.Defined() && TxId.Defined()) { - if (GetStepAndTxId(*ev->Get()) < GetStepAndTxId(*PlanStep, *TxId)) { + if (GetStepAndTxId(*ev) < GetStepAndTxId(*PlanStep, *TxId)) { Send(Tablet, - MakeHolder(ev->Get()->Step, - ev->Get()->TxId, + MakeHolder(ev->Step, + ev->TxId, Partition, Nothing()).Release()); return; } } - PushBackDistrTx(ev->Release()); + PushBackDistrTx(ev.release()); ProcessTxsAndUserActs(ctx); } -void TPartition::Handle(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx) +void TPartition::Handle(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext& ctx) { - PQ_LOG_D("Handle TEvPQ::TEvTxCommit" << + PQ_LOG_D("Handle TEvPQ::TEvTxCalcPredicate" << " Step " << ev->Get()->Step << ", TxId " << ev->Get()->TxId); + ProcessPendingEvent(std::unique_ptr(ev->Release().Release()), ctx); +} + +template <> +void TPartition::ProcessPendingEvent(std::unique_ptr ev, const TActorContext& ctx) +{ if (PlanStep.Defined() && TxId.Defined()) { - if (GetStepAndTxId(*ev->Get()) < GetStepAndTxId(*PlanStep, *TxId)) { + if (GetStepAndTxId(*ev) < GetStepAndTxId(*PlanStep, *TxId)) { PQ_LOG_D("Send TEvTxCommitDone" << - " Step " << ev->Get()->Step << - ", TxId " << ev->Get()->TxId); - ctx.Send(Tablet, MakeCommitDone(ev->Get()->Step, ev->Get()->TxId).Release()); + " Step " << ev->Step << + ", TxId " << ev->TxId); + ctx.Send(Tablet, MakeCommitDone(ev->Step, ev->TxId).Release()); return; } } @@ -1073,18 +1095,18 @@ void TPartition::Handle(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx) Y_ABORT_UNLESS(TransactionsInflight.size() == 1, "PQ: %" PRIu64 ", Partition: %" PRIu32 ", Step: %" PRIu64 ", TxId: %" PRIu64, TabletID, Partition.OriginalPartitionId, - ev->Get()->Step, ev->Get()->TxId); - PendingExplicitMessageGroups = ev->Get()->ExplicitMessageGroups; + ev->Step, ev->TxId); + PendingExplicitMessageGroups = ev->ExplicitMessageGroups; } else { Y_ABORT_UNLESS(!TransactionsInflight.empty(), "PQ: %" PRIu64 ", Partition: %" PRIu32 ", Step: %" PRIu64 ", TxId: %" PRIu64, TabletID, Partition.OriginalPartitionId, - ev->Get()->Step, ev->Get()->TxId); - txIter = TransactionsInflight.find(ev->Get()->TxId); + ev->Step, ev->TxId); + txIter = TransactionsInflight.find(ev->TxId); Y_ABORT_UNLESS(!txIter.IsEnd(), "PQ: %" PRIu64 ", Partition: %" PRIu32 ", Step: %" PRIu64 ", TxId: %" PRIu64, TabletID, Partition.OriginalPartitionId, - ev->Get()->Step, ev->Get()->TxId); + ev->Step, ev->TxId); } Y_ABORT_UNLESS(txIter->second->State == ECommitState::Pending); @@ -1092,14 +1114,23 @@ void TPartition::Handle(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx) ProcessTxsAndUserActs(ctx); } -void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx) +void TPartition::Handle(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx) +{ + PQ_LOG_D("Handle TEvPQ::TEvTxCommit" << + " Step " << ev->Get()->Step << + ", TxId " << ev->Get()->TxId); + + ProcessPendingEvent(std::unique_ptr(ev->Release().Release()), ctx); +} + +template <> +void TPartition::ProcessPendingEvent(std::unique_ptr ev, const TActorContext& ctx) { - auto* event = ev->Get(); if (PlanStep.Defined() && TxId.Defined()) { - if (GetStepAndTxId(*event) < GetStepAndTxId(*PlanStep, *TxId)) { + if (GetStepAndTxId(*ev) < GetStepAndTxId(*PlanStep, *TxId)) { PQ_LOG_D("Rollback for" << - " Step " << ev->Get()->Step << - ", TxId " << ev->Get()->TxId); + " Step " << ev->Step << + ", TxId " << ev->TxId); return; } } @@ -1113,7 +1144,7 @@ void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx Y_ABORT_UNLESS(!TransactionsInflight.empty(), "PQ: %" PRIu64 ", Partition: %" PRIu32, TabletID, Partition.OriginalPartitionId); - txIter = TransactionsInflight.find(ev->Get()->TxId); + txIter = TransactionsInflight.find(ev->TxId); Y_ABORT_UNLESS(!txIter.IsEnd(), "PQ: %" PRIu64 ", Partition: %" PRIu32, TabletID, Partition.OriginalPartitionId); @@ -1124,13 +1155,17 @@ void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx ProcessTxsAndUserActs(ctx); } -void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorContext& ctx) { - PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoRequest"); - TActorId originalPartition = ev->Get()->OriginalPartition; - if (!originalPartition) { - // original message - originalPartition = ev->Sender; - } +void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx) +{ + ProcessPendingEvent(std::unique_ptr(ev->Release().Release()), ctx); +} + +template <> +void TPartition::ProcessPendingEvent(std::unique_ptr ev, const TActorContext& ctx) +{ + TActorId originalPartition = ev->OriginalPartition; + Y_ABORT_UNLESS(originalPartition != TActorId()); + if (ClosedInternalPartition || WaitingForPreviousBlobQuota() || (CurrentStateFunc() != &TThis::StateIdle)) { PQ_LOG_D("Send TEvPQ::TEvGetWriteInfoError"); auto* response = new TEvPQ::TEvGetWriteInfoError(Partition.InternalPartitionId, @@ -1162,6 +1197,13 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorCon ctx.Send(originalPartition, response); } +void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorContext& ctx) { + PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoRequest"); + + ev->Get()->OriginalPartition = ev->Sender; + ProcessPendingEvent(std::unique_ptr(ev->Release().Release()), ctx); +} + void TPartition::WriteInfoResponseHandler( const TActorId& sender, TGetWriteInfoResp&& ev, @@ -1250,17 +1292,34 @@ TPartition::EProcessResult TPartition::ApplyWriteInfoResponse(TTransaction& tx) return ret; } +template <> +void TPartition::ProcessPendingEvent(std::unique_ptr ev, const TActorContext& ctx) +{ + const auto sender = ev->SupportivePartition; + WriteInfoResponseHandler(sender, ev.release(), ctx); +} + void TPartition::Handle(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorContext& ctx) { PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoResponse"); - WriteInfoResponseHandler(ev->Sender, ev->Release(), ctx); + + ev->Get()->SupportivePartition = ev->Sender; + ProcessPendingEvent(std::unique_ptr(ev->Release().Release()), ctx); } +template <> +void TPartition::ProcessPendingEvent(std::unique_ptr ev, const TActorContext& ctx) +{ + const auto sender = ev->SupportivePartition; + WriteInfoResponseHandler(sender, ev.release(), ctx); +} void TPartition::Handle(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorContext& ctx) { PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoError " << "Cookie " << ev->Get()->Cookie << ", Message " << ev->Get()->Message); - WriteInfoResponseHandler(ev->Sender, ev->Release(), ctx); + + ev->Get()->SupportivePartition = ev->Sender; + ProcessPendingEvent(std::unique_ptr(ev->Release().Release()), ctx); } void TPartition::ReplyToProposeOrPredicate(TSimpleSharedPtr& tx, bool isPredicate) { @@ -2698,16 +2757,6 @@ void TPartition::ChangePlanStepAndTxId(ui64 step, ui64 txId) TxIdHasChanged = true; } -void TPartition::ResendPendingEvents(const TActorContext& ctx) -{ - PQ_LOG_D("Resend pending events. Count " << PendingEvents.size()); - - while (!PendingEvents.empty()) { - ctx.Schedule(TDuration::Zero(), PendingEvents.front().release()); - PendingEvents.pop_front(); - } -} - TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TEvProposeTransaction& tx) { if (AffectedUsers.size() >= MAX_USERS) { @@ -3560,14 +3609,17 @@ void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const T void TPartition::HandleOnInit(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext&) { + PQ_LOG_D("HandleOnInit TEvPQ::TEvDeletePartition"); + Y_ABORT_UNLESS(IsSupportive()); - PendingEvents.emplace_back(ev->ReleaseBase().Release()); + AddPendingEvent(ev); } -void TPartition::Handle(TEvPQ::TEvDeletePartition::TPtr&, const TActorContext& ctx) +template <> +void TPartition::ProcessPendingEvent(std::unique_ptr ev, const TActorContext& ctx) { - PQ_LOG_D("Handle TEvPQ::TEvDeletePartition"); + Y_UNUSED(ev); Y_ABORT_UNLESS(IsSupportive()); Y_ABORT_UNLESS(DeletePartitionState == DELETION_NOT_INITED); @@ -3577,6 +3629,13 @@ void TPartition::Handle(TEvPQ::TEvDeletePartition::TPtr&, const TActorContext& c ProcessTxsAndUserActs(ctx); } +void TPartition::Handle(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext& ctx) +{ + PQ_LOG_D("Handle TEvPQ::TEvDeletePartition"); + + ProcessPendingEvent(std::unique_ptr(ev->Release().Release()), ctx); +} + void TPartition::ScheduleNegativeReplies() { auto processQueue = [&](std::deque& queue) { @@ -3647,6 +3706,23 @@ void TPartition::ScheduleTransactionCompleted(const NKikimrPQ::TEvProposeTransac MakeHolder(writeId).Release()); } +void TPartition::ProcessPendingEvents(const TActorContext& ctx) +{ + PQ_LOG_D("Process pending events. Count " << PendingEvents.size()); + + while (!PendingEvents.empty()) { + auto ev = std::move(PendingEvents.front()); + PendingEvents.pop_front(); + + auto visitor = [this, &ctx](auto&& v) { + using T = std::decay_t; + ProcessPendingEvent(std::forward(v), ctx); + }; + + std::visit(visitor, std::move(ev)); + } +} + const NKikimrPQ::TPQTabletConfig::TPartition* TPartition::GetPartitionConfig(const NKikimrPQ::TPQTabletConfig& config) { return NPQ::GetPartitionConfig(config, Partition.OriginalPartitionId); diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index 197ef34f6181..33a0725ba8f7 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -427,7 +427,6 @@ class TPartition : public TActorBootstrapped { void ChangePlanStepAndTxId(ui64 step, ui64 txId); - void ResendPendingEvents(const TActorContext& ctx); void SendReadPreparedProxyResponse(const TReadAnswer& answer, const TReadInfo& readInfo, TUserInfo& user); void CheckIfSessionExists(TUserInfoBase& userInfo, const TActorId& newPipe); @@ -938,7 +937,23 @@ class TPartition : public TActorBootstrapped { TInstant LastUsedStorageMeterTimestamp; - TDeque> PendingEvents; + using TPendingEvent = std::variant< + std::unique_ptr, + std::unique_ptr, + std::unique_ptr, + std::unique_ptr, + std::unique_ptr, + std::unique_ptr, + std::unique_ptr, + std::unique_ptr + >; + + TDeque PendingEvents; + + template void AddPendingEvent(TAutoPtr>& ev); + template void ProcessPendingEvent(std::unique_ptr ev, const TActorContext& ctx); + void ProcessPendingEvents(const TActorContext& ctx); + TRowVersion LastEmittedHeartbeat; TLastCounter SourceIdCounter; From 6c0e5a85968dfa6ab6869d8325bce5159d8977ac Mon Sep 17 00:00:00 2001 From: Alexander Kotov Date: Thu, 27 Feb 2025 22:24:07 +0300 Subject: [PATCH 2/3] [*] deferred message queue --- ydb/core/persqueue/partition.cpp | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 7c55c14d6bde..fa666ad0648b 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -983,7 +983,8 @@ void TPartition::Handle(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActor " Step " << ev->Get()->Step << ", TxId " << ev->Get()->TxId); - ProcessPendingEvent(std::unique_ptr(ev->Release().Release()), ctx); + AddPendingEvent(ev); + ProcessPendingEvents(ctx); } template @@ -1074,7 +1075,8 @@ void TPartition::Handle(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext " Step " << ev->Get()->Step << ", TxId " << ev->Get()->TxId); - ProcessPendingEvent(std::unique_ptr(ev->Release().Release()), ctx); + AddPendingEvent(ev); + ProcessPendingEvents(ctx); } template <> @@ -1120,7 +1122,8 @@ void TPartition::Handle(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx) " Step " << ev->Get()->Step << ", TxId " << ev->Get()->TxId); - ProcessPendingEvent(std::unique_ptr(ev->Release().Release()), ctx); + AddPendingEvent(ev); + ProcessPendingEvents(ctx); } template <> @@ -1157,7 +1160,8 @@ void TPartition::ProcessPendingEvent(std::unique_ptr ev, c void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx) { - ProcessPendingEvent(std::unique_ptr(ev->Release().Release()), ctx); + AddPendingEvent(ev); + ProcessPendingEvents(ctx); } template <> @@ -1201,7 +1205,9 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorCon PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoRequest"); ev->Get()->OriginalPartition = ev->Sender; - ProcessPendingEvent(std::unique_ptr(ev->Release().Release()), ctx); + + AddPendingEvent(ev); + ProcessPendingEvents(ctx); } void TPartition::WriteInfoResponseHandler( @@ -1303,7 +1309,9 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorCo PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoResponse"); ev->Get()->SupportivePartition = ev->Sender; - ProcessPendingEvent(std::unique_ptr(ev->Release().Release()), ctx); + + AddPendingEvent(ev); + ProcessPendingEvents(ctx); } template <> @@ -1319,7 +1327,9 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorConte ", Message " << ev->Get()->Message); ev->Get()->SupportivePartition = ev->Sender; - ProcessPendingEvent(std::unique_ptr(ev->Release().Release()), ctx); + + AddPendingEvent(ev); + ProcessPendingEvents(ctx); } void TPartition::ReplyToProposeOrPredicate(TSimpleSharedPtr& tx, bool isPredicate) { @@ -3633,7 +3643,8 @@ void TPartition::Handle(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext { PQ_LOG_D("Handle TEvPQ::TEvDeletePartition"); - ProcessPendingEvent(std::unique_ptr(ev->Release().Release()), ctx); + AddPendingEvent(ev); + ProcessPendingEvents(ctx); } void TPartition::ScheduleNegativeReplies() From 018a84f508a0b71ea69f8524febf9290816422f0 Mon Sep 17 00:00:00 2001 From: Alexander Kotov Date: Fri, 28 Feb 2025 09:09:56 +0300 Subject: [PATCH 3/3] [*] optimization --- ydb/core/persqueue/partition.cpp | 37 ++++++++++++++++++-------------- ydb/core/persqueue/partition.h | 1 + 2 files changed, 22 insertions(+), 16 deletions(-) diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index fa666ad0648b..a46a185b49fa 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -969,6 +969,19 @@ void TPartition::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TAc ProcessTxsAndUserActs(ctx); } +template +void TPartition::ProcessPendingEvent(TAutoPtr>& ev, const TActorContext& ctx) +{ + if (PendingEvents.empty()) { + // Optimization: if the queue is empty, you can process the message immediately + ProcessPendingEvent(std::unique_ptr(ev->Release().Release()), ctx); + } else { + // We need to keep the order in which the messages arrived + AddPendingEvent(ev); + ProcessPendingEvents(ctx); + } +} + template <> void TPartition::ProcessPendingEvent(std::unique_ptr ev, const TActorContext& ctx) { @@ -983,8 +996,7 @@ void TPartition::Handle(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActor " Step " << ev->Get()->Step << ", TxId " << ev->Get()->TxId); - AddPendingEvent(ev); - ProcessPendingEvents(ctx); + ProcessPendingEvent(ev, ctx); } template @@ -1075,8 +1087,7 @@ void TPartition::Handle(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext " Step " << ev->Get()->Step << ", TxId " << ev->Get()->TxId); - AddPendingEvent(ev); - ProcessPendingEvents(ctx); + ProcessPendingEvent(ev, ctx); } template <> @@ -1122,8 +1133,7 @@ void TPartition::Handle(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx) " Step " << ev->Get()->Step << ", TxId " << ev->Get()->TxId); - AddPendingEvent(ev); - ProcessPendingEvents(ctx); + ProcessPendingEvent(ev, ctx); } template <> @@ -1160,8 +1170,7 @@ void TPartition::ProcessPendingEvent(std::unique_ptr ev, c void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx) { - AddPendingEvent(ev); - ProcessPendingEvents(ctx); + ProcessPendingEvent(ev, ctx); } template <> @@ -1206,8 +1215,7 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorCon ev->Get()->OriginalPartition = ev->Sender; - AddPendingEvent(ev); - ProcessPendingEvents(ctx); + ProcessPendingEvent(ev, ctx); } void TPartition::WriteInfoResponseHandler( @@ -1310,8 +1318,7 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorCo ev->Get()->SupportivePartition = ev->Sender; - AddPendingEvent(ev); - ProcessPendingEvents(ctx); + ProcessPendingEvent(ev, ctx); } template <> @@ -1328,8 +1335,7 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorConte ev->Get()->SupportivePartition = ev->Sender; - AddPendingEvent(ev); - ProcessPendingEvents(ctx); + ProcessPendingEvent(ev, ctx); } void TPartition::ReplyToProposeOrPredicate(TSimpleSharedPtr& tx, bool isPredicate) { @@ -3643,8 +3649,7 @@ void TPartition::Handle(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext { PQ_LOG_D("Handle TEvPQ::TEvDeletePartition"); - AddPendingEvent(ev); - ProcessPendingEvents(ctx); + ProcessPendingEvent(ev, ctx); } void TPartition::ScheduleNegativeReplies() diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index 33a0725ba8f7..5fba23d76e18 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -952,6 +952,7 @@ class TPartition : public TActorBootstrapped { template void AddPendingEvent(TAutoPtr>& ev); template void ProcessPendingEvent(std::unique_ptr ev, const TActorContext& ctx); + template void ProcessPendingEvent(TAutoPtr>& ev, const TActorContext& ctx); void ProcessPendingEvents(const TActorContext& ctx); TRowVersion LastEmittedHeartbeat;