diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index adc16ed6f952..3d71402f1a68 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -1086,6 +1086,8 @@ struct TEvPQ { } ui32 Cookie; // InternalPartitionId + TActorId SupportivePartition; + NPQ::TSourceIdMap SrcIdInfo; std::deque BodyKeys; TVector BlobsFromHead; @@ -1102,6 +1104,7 @@ struct TEvPQ { struct TEvGetWriteInfoError : public TEventLocal { ui32 Cookie; // InternalPartitionId TString Message; + TActorId SupportivePartition; TEvGetWriteInfoError(ui32 cookie, TString message) : Cookie(cookie), diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 68dc875982a8..a46a185b49fa 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -581,7 +581,7 @@ void TPartition::InitComplete(const TActorContext& ctx) { TabletCounters.Percentile()[COUNTER_LATENCY_PQ_INIT].IncrementFor(InitDuration.MilliSeconds()); FillReadFromTimestamps(ctx); - ResendPendingEvents(ctx); + ProcessPendingEvents(ctx); ProcessTxsAndUserActs(ctx); ctx.Send(ctx.SelfID, new TEvents::TEvWakeup()); @@ -969,37 +969,69 @@ void TPartition::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TAc ProcessTxsAndUserActs(ctx); } +template +void TPartition::ProcessPendingEvent(TAutoPtr>& ev, const TActorContext& ctx) +{ + if (PendingEvents.empty()) { + // Optimization: if the queue is empty, you can process the message immediately + ProcessPendingEvent(std::unique_ptr(ev->Release().Release()), ctx); + } else { + // We need to keep the order in which the messages arrived + AddPendingEvent(ev); + ProcessPendingEvents(ctx); + } +} + +template <> +void TPartition::ProcessPendingEvent(std::unique_ptr ev, const TActorContext& ctx) +{ + PushBackDistrTx(ev.release()); + + ProcessTxsAndUserActs(ctx); +} + void TPartition::Handle(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActorContext& ctx) { PQ_LOG_D("Handle TEvPQ::TEvProposePartitionConfig" << " Step " << ev->Get()->Step << ", TxId " << ev->Get()->TxId); - PushBackDistrTx(ev->Release()); + ProcessPendingEvent(ev, ctx); +} - ProcessTxsAndUserActs(ctx); +template +void TPartition::AddPendingEvent(TAutoPtr>& ev) +{ + std::unique_ptr p(ev->Release().Release()); + PendingEvents.emplace_back(std::move(p)); } void TPartition::HandleOnInit(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext&) { PQ_LOG_D("HandleOnInit TEvPQ::TEvTxCalcPredicate"); - PendingEvents.emplace_back(ev->ReleaseBase().Release()); + AddPendingEvent(ev); } void TPartition::HandleOnInit(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext&) { - PendingEvents.emplace_back(ev->ReleaseBase().Release()); + PQ_LOG_D("HandleOnInit TEvPQ::TEvTxCommit"); + + AddPendingEvent(ev); } void TPartition::HandleOnInit(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext&) { - PendingEvents.emplace_back(ev->ReleaseBase().Release()); + PQ_LOG_D("HandleOnInit TEvPQ::TEvTxRollback"); + + AddPendingEvent(ev); } void TPartition::HandleOnInit(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActorContext&) { - PendingEvents.emplace_back(ev->ReleaseBase().Release()); + PQ_LOG_D("HandleOnInit TEvPQ::TEvProposePartitionConfig"); + + AddPendingEvent(ev); } void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorContext& /* ctx */) @@ -1009,7 +1041,7 @@ void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TAc Y_ABORT_UNLESS(IsSupportive()); ev->Get()->OriginalPartition = ev->Sender; - PendingEvents.emplace_back(ev->ReleaseBase().Release()); + AddPendingEvent(ev); } void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorContext& /* ctx */) @@ -1018,7 +1050,7 @@ void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TA Y_ABORT_UNLESS(!IsSupportive()); - PendingEvents.emplace_back(ev->ReleaseBase().Release()); + AddPendingEvent(ev); } void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorContext& /* ctx */) @@ -1027,43 +1059,46 @@ void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActo Y_ABORT_UNLESS(!IsSupportive()); - PendingEvents.emplace_back(ev->ReleaseBase().Release()); + AddPendingEvent(ev); } -void TPartition::Handle(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext& ctx) +template <> +void TPartition::ProcessPendingEvent(std::unique_ptr ev, const TActorContext& ctx) { - PQ_LOG_D("Handle TEvPQ::TEvTxCalcPredicate" << - " Step " << ev->Get()->Step << - ", TxId " << ev->Get()->TxId); - if (PlanStep.Defined() && TxId.Defined()) { - if (GetStepAndTxId(*ev->Get()) < GetStepAndTxId(*PlanStep, *TxId)) { + if (GetStepAndTxId(*ev) < GetStepAndTxId(*PlanStep, *TxId)) { Send(Tablet, - MakeHolder(ev->Get()->Step, - ev->Get()->TxId, + MakeHolder(ev->Step, + ev->TxId, Partition, Nothing()).Release()); return; } } - PushBackDistrTx(ev->Release()); + PushBackDistrTx(ev.release()); ProcessTxsAndUserActs(ctx); } -void TPartition::Handle(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx) +void TPartition::Handle(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext& ctx) { - PQ_LOG_D("Handle TEvPQ::TEvTxCommit" << + PQ_LOG_D("Handle TEvPQ::TEvTxCalcPredicate" << " Step " << ev->Get()->Step << ", TxId " << ev->Get()->TxId); + ProcessPendingEvent(ev, ctx); +} + +template <> +void TPartition::ProcessPendingEvent(std::unique_ptr ev, const TActorContext& ctx) +{ if (PlanStep.Defined() && TxId.Defined()) { - if (GetStepAndTxId(*ev->Get()) < GetStepAndTxId(*PlanStep, *TxId)) { + if (GetStepAndTxId(*ev) < GetStepAndTxId(*PlanStep, *TxId)) { PQ_LOG_D("Send TEvTxCommitDone" << - " Step " << ev->Get()->Step << - ", TxId " << ev->Get()->TxId); - ctx.Send(Tablet, MakeCommitDone(ev->Get()->Step, ev->Get()->TxId).Release()); + " Step " << ev->Step << + ", TxId " << ev->TxId); + ctx.Send(Tablet, MakeCommitDone(ev->Step, ev->TxId).Release()); return; } } @@ -1073,18 +1108,18 @@ void TPartition::Handle(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx) Y_ABORT_UNLESS(TransactionsInflight.size() == 1, "PQ: %" PRIu64 ", Partition: %" PRIu32 ", Step: %" PRIu64 ", TxId: %" PRIu64, TabletID, Partition.OriginalPartitionId, - ev->Get()->Step, ev->Get()->TxId); - PendingExplicitMessageGroups = ev->Get()->ExplicitMessageGroups; + ev->Step, ev->TxId); + PendingExplicitMessageGroups = ev->ExplicitMessageGroups; } else { Y_ABORT_UNLESS(!TransactionsInflight.empty(), "PQ: %" PRIu64 ", Partition: %" PRIu32 ", Step: %" PRIu64 ", TxId: %" PRIu64, TabletID, Partition.OriginalPartitionId, - ev->Get()->Step, ev->Get()->TxId); - txIter = TransactionsInflight.find(ev->Get()->TxId); + ev->Step, ev->TxId); + txIter = TransactionsInflight.find(ev->TxId); Y_ABORT_UNLESS(!txIter.IsEnd(), "PQ: %" PRIu64 ", Partition: %" PRIu32 ", Step: %" PRIu64 ", TxId: %" PRIu64, TabletID, Partition.OriginalPartitionId, - ev->Get()->Step, ev->Get()->TxId); + ev->Step, ev->TxId); } Y_ABORT_UNLESS(txIter->second->State == ECommitState::Pending); @@ -1092,14 +1127,23 @@ void TPartition::Handle(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx) ProcessTxsAndUserActs(ctx); } -void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx) +void TPartition::Handle(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx) +{ + PQ_LOG_D("Handle TEvPQ::TEvTxCommit" << + " Step " << ev->Get()->Step << + ", TxId " << ev->Get()->TxId); + + ProcessPendingEvent(ev, ctx); +} + +template <> +void TPartition::ProcessPendingEvent(std::unique_ptr ev, const TActorContext& ctx) { - auto* event = ev->Get(); if (PlanStep.Defined() && TxId.Defined()) { - if (GetStepAndTxId(*event) < GetStepAndTxId(*PlanStep, *TxId)) { + if (GetStepAndTxId(*ev) < GetStepAndTxId(*PlanStep, *TxId)) { PQ_LOG_D("Rollback for" << - " Step " << ev->Get()->Step << - ", TxId " << ev->Get()->TxId); + " Step " << ev->Step << + ", TxId " << ev->TxId); return; } } @@ -1113,7 +1157,7 @@ void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx Y_ABORT_UNLESS(!TransactionsInflight.empty(), "PQ: %" PRIu64 ", Partition: %" PRIu32, TabletID, Partition.OriginalPartitionId); - txIter = TransactionsInflight.find(ev->Get()->TxId); + txIter = TransactionsInflight.find(ev->TxId); Y_ABORT_UNLESS(!txIter.IsEnd(), "PQ: %" PRIu64 ", Partition: %" PRIu32, TabletID, Partition.OriginalPartitionId); @@ -1124,13 +1168,17 @@ void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx ProcessTxsAndUserActs(ctx); } -void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorContext& ctx) { - PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoRequest"); - TActorId originalPartition = ev->Get()->OriginalPartition; - if (!originalPartition) { - // original message - originalPartition = ev->Sender; - } +void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx) +{ + ProcessPendingEvent(ev, ctx); +} + +template <> +void TPartition::ProcessPendingEvent(std::unique_ptr ev, const TActorContext& ctx) +{ + TActorId originalPartition = ev->OriginalPartition; + Y_ABORT_UNLESS(originalPartition != TActorId()); + if (ClosedInternalPartition || WaitingForPreviousBlobQuota() || (CurrentStateFunc() != &TThis::StateIdle)) { PQ_LOG_D("Send TEvPQ::TEvGetWriteInfoError"); auto* response = new TEvPQ::TEvGetWriteInfoError(Partition.InternalPartitionId, @@ -1162,6 +1210,14 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorCon ctx.Send(originalPartition, response); } +void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorContext& ctx) { + PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoRequest"); + + ev->Get()->OriginalPartition = ev->Sender; + + ProcessPendingEvent(ev, ctx); +} + void TPartition::WriteInfoResponseHandler( const TActorId& sender, TGetWriteInfoResp&& ev, @@ -1250,17 +1306,36 @@ TPartition::EProcessResult TPartition::ApplyWriteInfoResponse(TTransaction& tx) return ret; } +template <> +void TPartition::ProcessPendingEvent(std::unique_ptr ev, const TActorContext& ctx) +{ + const auto sender = ev->SupportivePartition; + WriteInfoResponseHandler(sender, ev.release(), ctx); +} + void TPartition::Handle(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorContext& ctx) { PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoResponse"); - WriteInfoResponseHandler(ev->Sender, ev->Release(), ctx); + + ev->Get()->SupportivePartition = ev->Sender; + + ProcessPendingEvent(ev, ctx); } +template <> +void TPartition::ProcessPendingEvent(std::unique_ptr ev, const TActorContext& ctx) +{ + const auto sender = ev->SupportivePartition; + WriteInfoResponseHandler(sender, ev.release(), ctx); +} void TPartition::Handle(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorContext& ctx) { PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoError " << "Cookie " << ev->Get()->Cookie << ", Message " << ev->Get()->Message); - WriteInfoResponseHandler(ev->Sender, ev->Release(), ctx); + + ev->Get()->SupportivePartition = ev->Sender; + + ProcessPendingEvent(ev, ctx); } void TPartition::ReplyToProposeOrPredicate(TSimpleSharedPtr& tx, bool isPredicate) { @@ -2698,16 +2773,6 @@ void TPartition::ChangePlanStepAndTxId(ui64 step, ui64 txId) TxIdHasChanged = true; } -void TPartition::ResendPendingEvents(const TActorContext& ctx) -{ - PQ_LOG_D("Resend pending events. Count " << PendingEvents.size()); - - while (!PendingEvents.empty()) { - ctx.Schedule(TDuration::Zero(), PendingEvents.front().release()); - PendingEvents.pop_front(); - } -} - TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TEvProposeTransaction& tx) { if (AffectedUsers.size() >= MAX_USERS) { @@ -3560,14 +3625,17 @@ void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const T void TPartition::HandleOnInit(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext&) { + PQ_LOG_D("HandleOnInit TEvPQ::TEvDeletePartition"); + Y_ABORT_UNLESS(IsSupportive()); - PendingEvents.emplace_back(ev->ReleaseBase().Release()); + AddPendingEvent(ev); } -void TPartition::Handle(TEvPQ::TEvDeletePartition::TPtr&, const TActorContext& ctx) +template <> +void TPartition::ProcessPendingEvent(std::unique_ptr ev, const TActorContext& ctx) { - PQ_LOG_D("Handle TEvPQ::TEvDeletePartition"); + Y_UNUSED(ev); Y_ABORT_UNLESS(IsSupportive()); Y_ABORT_UNLESS(DeletePartitionState == DELETION_NOT_INITED); @@ -3577,6 +3645,13 @@ void TPartition::Handle(TEvPQ::TEvDeletePartition::TPtr&, const TActorContext& c ProcessTxsAndUserActs(ctx); } +void TPartition::Handle(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext& ctx) +{ + PQ_LOG_D("Handle TEvPQ::TEvDeletePartition"); + + ProcessPendingEvent(ev, ctx); +} + void TPartition::ScheduleNegativeReplies() { auto processQueue = [&](std::deque& queue) { @@ -3647,6 +3722,23 @@ void TPartition::ScheduleTransactionCompleted(const NKikimrPQ::TEvProposeTransac MakeHolder(writeId).Release()); } +void TPartition::ProcessPendingEvents(const TActorContext& ctx) +{ + PQ_LOG_D("Process pending events. Count " << PendingEvents.size()); + + while (!PendingEvents.empty()) { + auto ev = std::move(PendingEvents.front()); + PendingEvents.pop_front(); + + auto visitor = [this, &ctx](auto&& v) { + using T = std::decay_t; + ProcessPendingEvent(std::forward(v), ctx); + }; + + std::visit(visitor, std::move(ev)); + } +} + const NKikimrPQ::TPQTabletConfig::TPartition* TPartition::GetPartitionConfig(const NKikimrPQ::TPQTabletConfig& config) { return NPQ::GetPartitionConfig(config, Partition.OriginalPartitionId); diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index 197ef34f6181..5fba23d76e18 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -427,7 +427,6 @@ class TPartition : public TActorBootstrapped { void ChangePlanStepAndTxId(ui64 step, ui64 txId); - void ResendPendingEvents(const TActorContext& ctx); void SendReadPreparedProxyResponse(const TReadAnswer& answer, const TReadInfo& readInfo, TUserInfo& user); void CheckIfSessionExists(TUserInfoBase& userInfo, const TActorId& newPipe); @@ -938,7 +937,24 @@ class TPartition : public TActorBootstrapped { TInstant LastUsedStorageMeterTimestamp; - TDeque> PendingEvents; + using TPendingEvent = std::variant< + std::unique_ptr, + std::unique_ptr, + std::unique_ptr, + std::unique_ptr, + std::unique_ptr, + std::unique_ptr, + std::unique_ptr, + std::unique_ptr + >; + + TDeque PendingEvents; + + template void AddPendingEvent(TAutoPtr>& ev); + template void ProcessPendingEvent(std::unique_ptr ev, const TActorContext& ctx); + template void ProcessPendingEvent(TAutoPtr>& ev, const TActorContext& ctx); + void ProcessPendingEvents(const TActorContext& ctx); + TRowVersion LastEmittedHeartbeat; TLastCounter SourceIdCounter;