diff --git a/ydb/core/tablet_flat/flat_exec_seat.cpp b/ydb/core/tablet_flat/flat_exec_seat.cpp index 116661d2c1dd..a5715f19d4ce 100644 --- a/ydb/core/tablet_flat/flat_exec_seat.cpp +++ b/ydb/core/tablet_flat/flat_exec_seat.cpp @@ -4,6 +4,13 @@ namespace NKikimr { namespace NTabletFlatExecutor { void TSeat::Complete(const TActorContext& ctx, bool isRW) noexcept { + if (Y_UNLIKELY(IsTerminated())) { + Y_ABORT_UNLESS(!isRW, "Terminating a read-write transaction"); + Self->Terminate(TerminationReason, ctx); + Self->TxSpan.EndError("Terminated"); + return; + } + NWilson::TSpan span(TWilsonTablet::TabletDetailed, Self->TxSpan.GetTraceId(), "Tablet.Transaction.Complete"); for (auto& callback : OnPersistent) { callback(); @@ -15,11 +22,5 @@ namespace NTabletFlatExecutor { Self->TxSpan.EndOk(); } - void TSeat::Terminate(ETerminationReason reason, const TActorContext& ctx) noexcept { - Self->Terminate(reason, ctx); - - Self->TxSpan.EndError("Terminated"); - } - } // namespace NTabletFlatExecutor } // namespace NKikimr diff --git a/ydb/core/tablet_flat/flat_exec_seat.h b/ydb/core/tablet_flat/flat_exec_seat.h index 2eef8f0c59f6..e6473153e426 100644 --- a/ydb/core/tablet_flat/flat_exec_seat.h +++ b/ydb/core/tablet_flat/flat_exec_seat.h @@ -4,6 +4,7 @@ #include "tablet_flat_executor.h" #include "flat_sausagecache.h" +#include #include #include #include @@ -12,7 +13,17 @@ namespace NKikimr { namespace NTabletFlatExecutor { - struct TSeat { + enum class ESeatState { + None, + Pending, + Active, + ActiveLow, + Postponed, + Waiting, + Done, + }; + + struct TSeat : public TIntrusiveListItem { using TPinned = THashMap>>; TSeat(const TSeat&) = delete; @@ -30,9 +41,11 @@ namespace NTabletFlatExecutor { out << "}"; } - void Complete(const TActorContext& ctx, bool isRW) noexcept; + bool IsTerminated() const { + return TerminationReason != ETerminationReason::None; + } - void Terminate(ETerminationReason reason, const TActorContext& ctx) noexcept; + void Complete(const TActorContext& ctx, 
bool isRW) noexcept; void StartEnqueuedSpan() noexcept { WaitingSpan = NWilson::TSpan(TWilsonTablet::TabletDetailed, Self->TxSpan.GetTraceId(), "Tablet.Transaction.Enqueued"); @@ -54,7 +67,7 @@ namespace NTabletFlatExecutor { return Self->TxSpan.GetTraceId(); } - const ui64 UniqID = Max(); + const ui64 UniqID; const TAutoPtr Self; NWilson::TSpan WaitingSpan; ui64 Retries = 0; @@ -74,6 +87,10 @@ namespace NTabletFlatExecutor { ui32 NotEnoughMemoryCount = 0; ui64 TaskId = 0; + ESeatState State = ESeatState::Done; + bool LowPriority = false; + bool Cancelled = false; + TAutoPtr AttachedMemory; TIntrusivePtr CapturedMemory; TVector> OnPersistent; diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp index 0dcd0c59a242..3518176d4a7b 100644 --- a/ydb/core/tablet_flat/flat_executor.cpp +++ b/ydb/core/tablet_flat/flat_executor.cpp @@ -128,8 +128,6 @@ TExecutor::TExecutor( , Time(TAppData::TimeProvider) , Owner(owner) , OwnerActorId(ownerActorId) - , ActivationQueue(new TActivationQueue()) - , PendingQueue(new TActivationQueue()) , Emitter(new TIdEmitter) , CounterEventsInFlight(new TEvTabletCounters::TInFlightCookie) , Stats(new TExecutorStatsImpl()) @@ -146,6 +144,10 @@ ui64 TExecutor::Stamp() const noexcept return CommitManager ? 
CommitManager->Stamp() : TTxStamp{ Generation0, Step0 }.Raw; } +TActorContext TExecutor::SelfCtx() const { + return TActivationContext::ActorContextFor(SelfId()); +} + TActorContext TExecutor::OwnerCtx() const { return TActivationContext::ActorContextFor(OwnerActorId); } @@ -222,12 +224,16 @@ void TExecutor::RecreatePageCollectionsCache() noexcept if (TransactionWaitPads) { for (auto &xpair : TransactionWaitPads) { - auto &seat = xpair.second->Seat; + TSeat* seat = xpair.second->Seat; + Y_ABORT_UNLESS(seat->State == ESeatState::Waiting); + seat->State = ESeatState::None; xpair.second->WaitingSpan.EndOk(); - LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID); - seat->StartEnqueuedSpan(); - ActivationQueue->Push(seat.Release()); - ActivateTransactionWaiting++; + + if (seat->Cancelled) { + FinishCancellation(seat, false); + } else { + EnqueueActivation(seat, false); + } } TransactionWaitPads.clear(); } @@ -547,46 +553,98 @@ void TExecutor::TranscriptFollowerBootOpResult(ui32 res, const TActorContext &ct } } +std::unique_ptr TExecutor::RemoveTransaction(ui64 id) { + auto it = Transactions.find(id); + Y_ABORT_UNLESS(it != Transactions.end(), "Cannot find transaction %" PRIu64, id); + auto res = std::move(it->second); + res->State = ESeatState::Done; + Transactions.erase(it); + return res; +} + +void TExecutor::FinishCancellation(TSeat* seat, bool activateMore) { + UnpinTransactionPages(*seat); + Memory->ReleaseMemory(*seat); + --Stats->TxInFly; + Counters->Simple()[TExecutorCounters::DB_TX_IN_FLY] = Stats->TxInFly; + RemoveTransaction(seat->UniqID); + if (activateMore) { + PlanTransactionActivation(); + MaybeRelaxRejectProbability(); + } +} + +void TExecutor::EnqueueActivation(TSeat* seat, bool activate) { + LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID); + seat->StartEnqueuedSpan(); + if (!seat->LowPriority) { + seat->State = ESeatState::Active; + ActivationQueue.PushBack(seat); + ActivateTransactionWaiting++; + if (activate && 
ActivateTransactionInFlight < ActivateTransactionWaiting) { + Send(SelfId(), new TEvPrivate::TEvActivateExecution()); + ++ActivateTransactionInFlight; + } + } else { + seat->State = ESeatState::ActiveLow; + ActivationLowQueue.PushBack(seat); + ActivateLowTransactionWaiting++; + if (activate && ActivateLowTransactionInFlight < 1) { + Send(SelfId(), new TEvPrivate::TEvActivateLowExecution()); + ++ActivateLowTransactionInFlight; + } + } +} + void TExecutor::PlanTransactionActivation() { if (!CanExecuteTransaction()) return; const ui64 limitTxInFly = Scheme().Executor.LimitInFlyTx; - while (PendingQueue->Head() && (!limitTxInFly || (Stats->TxInFly - Stats->TxPending < limitTxInFly))) { - TAutoPtr seat = PendingQueue->Pop(); + while (PendingQueue && (!limitTxInFly || (Stats->TxInFly - Stats->TxPending < limitTxInFly))) { + TSeat* seat = PendingQueue.PopFront(); + Y_ABORT_UNLESS(seat->State == ESeatState::Pending); + seat->State = ESeatState::None; seat->FinishPendingSpan(); - LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID); - seat->StartEnqueuedSpan(); - ActivationQueue->Push(seat.Release()); - ActivateTransactionWaiting++; --Stats->TxPending; + EnqueueActivation(seat, false); } while (ActivateTransactionInFlight < ActivateTransactionWaiting) { Send(SelfId(), new TEvPrivate::TEvActivateExecution()); - ActivateTransactionInFlight++; + ++ActivateTransactionInFlight; + } + + while (ActivateLowTransactionWaiting > 0 && ActivateLowTransactionInFlight < 1) { + Send(SelfId(), new TEvPrivate::TEvActivateLowExecution()); + ++ActivateLowTransactionInFlight; } } void TExecutor::ActivateWaitingTransactions(TPrivatePageCache::TPage::TWaitQueuePtr waitPadsQueue) { if (waitPadsQueue) { - bool haveTransactions = false; + bool activate = CanExecuteTransaction(); + bool cancelled = false; while (TPrivatePageCacheWaitPad *waitPad = waitPadsQueue->Pop()) { if (auto it = TransactionWaitPads.find(waitPad); it != TransactionWaitPads.end()) { - it->second->WaitingSpan.EndOk(); - 
auto &seat = it->second->Seat; - LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID); - seat->StartEnqueuedSpan(); - ActivationQueue->Push(seat.Release()); - ActivateTransactionWaiting++; + TSeat* seat = it->second->Seat; + Y_ABORT_UNLESS(seat->State == ESeatState::Waiting); + seat->State = ESeatState::None; + seat->WaitingSpan.EndOk(); TransactionWaitPads.erase(waitPad); - haveTransactions = true; + if (seat->Cancelled) { + FinishCancellation(seat, false); + cancelled = true; + } else { + EnqueueActivation(seat, activate); + } } else { Y_Fail("Unexpected wait pad triggered"); } } - if (haveTransactions) { + if (cancelled && activate) { PlanTransactionActivation(); + MaybeRelaxRejectProbability(); } } } @@ -1678,10 +1736,10 @@ bool TExecutor::CanExecuteTransaction() const { return Stats->IsActive && (Stats->IsFollower() || PendingPartSwitches.empty()) && !BrokenTransaction; } -void TExecutor::DoExecute(TAutoPtr self, bool allowImmediate, const TActorContext &ctx) { - Y_ABORT_UNLESS(ActivationQueue, "attempt to execute transaction before activation"); - - TAutoPtr seat = new TSeat(++TransactionUniqCounter, self); +ui64 TExecutor::DoExecute(TAutoPtr self, ETxMode mode) { + ui64 uniqId = ++TransactionUniqCounter; + TSeat* seat = (Transactions[uniqId] = std::make_unique(uniqId, self)).get(); + seat->LowPriority = mode == ETxMode::LowPriority; seat->Self->SetupTxSpanName(); LWTRACK(TransactionBegin, seat->Self->Orbit, seat->UniqID, Owner->TabletID(), TypeName(*seat->Self)); @@ -1705,11 +1763,9 @@ void TExecutor::DoExecute(TAutoPtr self, bool allowImmediate, cons if (staticRemain < seat->CurrentTxDataLimit) { LWTRACK(TransactionNeedMemory, seat->Self->Orbit, seat->UniqID); Memory->RequestLimit(*seat, seat->CurrentTxDataLimit); - auto *transptr = seat.Release(); - auto pairIt = PostponedTransactions.emplace(transptr, transptr); - Y_ABORT_UNLESS(pairIt.second); - - return; + seat->State = ESeatState::Postponed; + PostponedTransactions.PushBack(seat); + return 
uniqId; } Memory->AllocStatic(*seat, Memory->Profile->GetInitialTxMemory()); @@ -1720,32 +1776,98 @@ void TExecutor::DoExecute(TAutoPtr self, bool allowImmediate, cons LWTRACK(TransactionPending, seat->Self->Orbit, seat->UniqID, CanExecuteTransaction() ? "tx limit reached" : "transactions paused"); seat->CreatePendingSpan(); - PendingQueue->Push(seat.Release()); + seat->State = ESeatState::Pending; + PendingQueue.PushBack(seat); ++Stats->TxPending; - return; + return uniqId; } - if (ActiveTransaction || ActivateTransactionWaiting || !allowImmediate) { - LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID); - seat->StartEnqueuedSpan(); - ActivationQueue->Push(seat.Release()); - ActivateTransactionWaiting++; - PlanTransactionActivation(); - return; + if (mode == ETxMode::Execute && (ActiveTransaction || ActivateTransactionWaiting)) { + mode = ETxMode::Enqueue; } - ExecuteTransaction(seat, ctx); + switch (mode) { + case ETxMode::Execute: + ExecuteTransaction(seat); + return uniqId; + + case ETxMode::Enqueue: + case ETxMode::LowPriority: + EnqueueActivation(seat, true); + return uniqId; + } + + Y_FAIL("Unimplemented transaction mode"); } void TExecutor::Execute(TAutoPtr self, const TActorContext &ctx) { - DoExecute(self, true, ctx); + Y_UNUSED(ctx); + DoExecute(self, ETxMode::Execute); +} + +ui64 TExecutor::Enqueue(TAutoPtr self) { + return DoExecute(self, ETxMode::Enqueue); } -void TExecutor::Enqueue(TAutoPtr self, const TActorContext &ctx) { - DoExecute(self, false, ctx); +ui64 TExecutor::EnqueueLowPriority(TAutoPtr self) { + return DoExecute(self, ETxMode::LowPriority); } -void TExecutor::ExecuteTransaction(TAutoPtr seat, const TActorContext &ctx) { +bool TExecutor::CancelTransaction(ui64 id) { /* Cancels a transaction that is not currently running. Seats queued as Active, ActiveLow or Pending are unlinked from their queue and finished immediately; Postponed and Waiting seats are only flagged Cancelled and are finished when they are next resumed. Returns false for an unknown id, for a seat already flagged Cancelled, and (after a debug-build abort) for seats in the None state or any other state. */ + auto it = Transactions.find(id); + if (it == Transactions.end()) { + return false; + } + + TSeat* seat = it->second.get(); + switch (seat->State) { + case ESeatState::None: + // Transaction is not paused in any way + Y_DEBUG_ABORT_UNLESS(false, + "Tablet %" PRIu64 
" CancelTransaction(%" PRIu64 ") from inside transaction?", + TabletId(), id); + return false; + + case ESeatState::Active: + ActivationQueue.Remove(seat); + Y_ABORT_UNLESS(ActivateTransactionWaiting > 0); + --ActivateTransactionWaiting; + break; + + case ESeatState::ActiveLow: + ActivationLowQueue.Remove(seat); + Y_ABORT_UNLESS(ActivateLowTransactionWaiting > 0); + --ActivateLowTransactionWaiting; + break; + + case ESeatState::Pending: + PendingQueue.Remove(seat); + Y_ABORT_UNLESS(Stats->TxPending > 0); + --Stats->TxPending; + break; + + case ESeatState::Postponed: + case ESeatState::Waiting: + if (seat->Cancelled) { + return false; + } + // Cannot safely remove now, wait until later + seat->Cancelled = true; + return true; + + default: + Y_DEBUG_ABORT_UNLESS(false, + "Tablet %" PRIu64 " CancelTransaction(%" PRIu64 ") for a finished transaction", + TabletId(), id); + return false; + } + + seat->State = ESeatState::None; + FinishCancellation(seat); + return true; +} + +void TExecutor::ExecuteTransaction(TSeat* seat) { TActiveTransactionZone activeTransaction(this); ++seat->Retries; @@ -1762,7 +1884,7 @@ void TExecutor::ExecuteTransaction(TAutoPtr seat, const TActorContext &ct LWTRACK(TransactionExecuteBegin, seat->Self->Orbit, seat->UniqID); txc.StartExecutionSpan(); - const bool done = seat->Self->Execute(txc, ctx.MakeFor(OwnerActorId)); + const bool done = seat->Self->Execute(txc, OwnerCtx()); txc.FinishExecutionSpan(); LWTRACK(TransactionExecuteEnd, seat->Self->Orbit, seat->UniqID, done); @@ -1826,18 +1948,21 @@ void TExecutor::ExecuteTransaction(TAutoPtr seat, const TActorContext &ct // It may not be safe to call Broken right now, call it later Send(SelfId(), new TEvPrivate::TEvBrokenTransaction()); + + // Make sure transaction is properly destroyed + RemoveTransaction(seat->UniqID); } else if (done) { Y_ABORT_UNLESS(!txc.IsRescheduled()); Y_ABORT_UNLESS(!seat->RequestedMemory); seat->OnPersistent = std::move(prod.OnPersistent); - CommitTransactionLog(seat, 
env, prod.Change, cpuTimer, ctx); + CommitTransactionLog(RemoveTransaction(seat->UniqID), env, prod.Change, cpuTimer); } else { Y_ABORT_UNLESS(!seat->CapturedMemory); if (!PrivatePageCache->GetStats().CurrentCacheMisses && !seat->RequestedMemory && !txc.IsRescheduled()) { Y_Fail(NFmt::Do(*this) << " " << NFmt::Do(*seat) << " type " << NFmt::Do(*seat->Self) << " postponed w/o demands"); } - PostponeTransaction(seat, env, prod.Change, cpuTimer, ctx); + PostponeTransaction(seat, env, prod.Change, cpuTimer); } PrivatePageCache->ResetTouchesAndToLoad(false); @@ -1859,14 +1984,14 @@ void TExecutor::UnpinTransactionPages(TSeat &seat) { Counters->Simple()[TExecutorCounters::CACHE_TOTAL_USED] = TransactionPagesMemory; } -void TExecutor::ReleaseTxData(TSeat &seat, ui64 requested, const TActorContext &ctx) +void TExecutor::ReleaseTxData(TSeat &seat, ui64 requested) { if (auto logl = Logger->Log(ELnLev::Debug)) logl << NFmt::Do(*this) << " " << NFmt::Do(seat) << " release tx data"; TTxMemoryProvider provider(seat.CurrentTxDataLimit - requested, seat.TaskId); static_cast(provider).RequestMemory(requested); - seat.Self->ReleaseTxData(provider, ctx.MakeFor(OwnerActorId)); + seat.Self->ReleaseTxData(provider, OwnerCtx()); Counters->Cumulative()[TExecutorCounters::TX_DATA_RELEASES].Increment(1); @@ -1876,9 +2001,9 @@ void TExecutor::ReleaseTxData(TSeat &seat, ui64 requested, const TActorContext & Memory->ReleaseTxData(seat); } -void TExecutor::PostponeTransaction(TAutoPtr seat, TPageCollectionTxEnv &env, +void TExecutor::PostponeTransaction(TSeat* seat, TPageCollectionTxEnv &env, TAutoPtr change, - THPTimer &bookkeepingTimer, const TActorContext &ctx) + THPTimer &bookkeepingTimer) { TTxType txType = seat->Self->GetTxType(); @@ -1926,7 +2051,7 @@ void TExecutor::PostponeTransaction(TAutoPtr seat, TPageCollectionTxEnv & } seat->TerminationReason = ETerminationReason::MemoryLimitExceeded; - CommitTransactionLog(seat, env, change, bookkeepingTimer, ctx); + 
CommitTransactionLog(RemoveTransaction(seat->UniqID), env, change, bookkeepingTimer); return; } else if (totalMemory > seat->CurrentMemoryLimit) { @@ -1950,13 +2075,11 @@ void TExecutor::PostponeTransaction(TAutoPtr seat, TPageCollectionTxEnv & LWTRACK(TransactionNeedMemory, seat->Self->Orbit, seat->UniqID); Memory->FreeStatic(*seat, 0); UnpinTransactionPages(*seat); - ReleaseTxData(*seat, requestedMemory, ctx); + ReleaseTxData(*seat, requestedMemory); Memory->RequestLimit(*seat, desired); - - auto *transptr = seat.Release(); - auto pairIt = PostponedTransactions.emplace(transptr, transptr); - Y_ABORT_UNLESS(pairIt.second); + seat->State = ESeatState::Postponed; + PostponedTransactions.PushBack(seat); // todo: counters return; @@ -1966,16 +2089,13 @@ void TExecutor::PostponeTransaction(TAutoPtr seat, TPageCollectionTxEnv & // If memory was allocated and there is nothing to load // then tx may be re-activated. if (!PrivatePageCache->GetStats().CurrentCacheMisses) { - LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID); - seat->StartEnqueuedSpan(); - ActivationQueue->Push(seat.Release()); - ActivateTransactionWaiting++; - PlanTransactionActivation(); + EnqueueActivation(seat, CanExecuteTransaction()); return; } LWTRACK(TransactionPageFault, seat->Self->Orbit, seat->UniqID); - auto padHolder = MakeHolder(std::move(seat)); + seat->State = ESeatState::Waiting; + auto padHolder = MakeHolder(seat); auto *const pad = padHolder.Get(); TransactionWaitPads[pad] = std::move(padHolder); @@ -2041,10 +2161,9 @@ void TExecutor::PostponeTransaction(TAutoPtr seat, TPageCollectionTxEnv & Counters->Simple()[TExecutorCounters::CACHE_TOTAL_USED] = TransactionPagesMemory; } -void TExecutor::CommitTransactionLog(TAutoPtr seat, TPageCollectionTxEnv &env, - TAutoPtr change, THPTimer &bookkeepingTimer, const TActorContext &ctx) { +void TExecutor::CommitTransactionLog(std::unique_ptr seat, TPageCollectionTxEnv &env, + TAutoPtr change, THPTimer &bookkeepingTimer) { const bool 
isReadOnly = !(change->HasAny() || env.HasChanges()); - const bool isTerminated = seat->TerminationReason != ETerminationReason::None; const TTxType txType = seat->Self->GetTxType(); size_t touchedBlocks = PrivatePageCache->GetStats().CurrentCacheHits; @@ -2072,27 +2191,19 @@ void TExecutor::CommitTransactionLog(TAutoPtr seat, TPageCollectionTxEnv const double currentBookkeepingTime = seat->CPUBookkeepingTime; const double currentExecTime = seat->CPUExecTime; - if (isTerminated) { - if (Stats->IsFollower()) { - --Stats->TxInFly; - Counters->Simple()[TExecutorCounters::DB_TX_IN_FLY] = Stats->TxInFly; - seat->Terminate(seat->TerminationReason, OwnerCtx()); - } else if (LogicRedo->TerminateTransaction(seat, ctx, OwnerActorId)) { - --Stats->TxInFly; - Counters->Simple()[TExecutorCounters::DB_TX_IN_FLY] = Stats->TxInFly; - } - } else if (isReadOnly) { + if (isReadOnly) { if (Stats->IsFollower()) { // todo: extract completion counters from txloglogic --Stats->TxInFly; Counters->Simple()[TExecutorCounters::DB_TX_IN_FLY] = Stats->TxInFly; - CompleteRoTransaction(seat, OwnerCtx(), Counters.Get(), AppTxCounters); - } else if (LogicRedo->CommitROTransaction(seat, OwnerCtx())) { + CompleteRoTransaction(std::move(seat), OwnerCtx(), Counters.Get(), AppTxCounters); + } else if (LogicRedo->CommitROTransaction(std::move(seat), OwnerCtx())) { --Stats->TxInFly; Counters->Simple()[TExecutorCounters::DB_TX_IN_FLY] = Stats->TxInFly; } } else { Y_ABORT_UNLESS(!Stats->IsFollower()); + Y_ABORT_UNLESS(!seat->IsTerminated(), "Read-write transactions cannot be terminated"); const bool allowBatching = Scheme().Executor.AllowLogBatching; const bool force = !allowBatching @@ -2106,7 +2217,7 @@ void TExecutor::CommitTransactionLog(TAutoPtr seat, TPageCollectionTxEnv || env.LoanConfirmation || env.BorrowUpdates; - auto commitResult = LogicRedo->CommitRWTransaction(seat, *change, force); + auto commitResult = LogicRedo->CommitRWTransaction(std::move(seat), *change, force); Y_ABORT_UNLESS(!force || 
commitResult.Commit); auto *commit = commitResult.Commit.Get(); // could be nullptr @@ -2534,7 +2645,7 @@ void TExecutor::CommitTransactionLog(TAutoPtr seat, TPageCollectionTxEnv delay = TDuration::MicroSeconds(LogFlushDelayOverrideUsec); } if (delay.MicroSeconds() == 0) { - ctx.Send(ctx.SelfID, new TEvents::TEvFlushLog()); + Send(SelfId(), new TEvents::TEvFlushLog()); } else { Y_DEBUG_ABORT_UNLESS(delay < TDuration::Minutes(1)); delay = Min(delay, TDuration::Seconds(59)); @@ -2562,7 +2673,7 @@ void TExecutor::CommitTransactionLog(TAutoPtr seat, TPageCollectionTxEnv if (ResourceMetrics) { ResourceMetrics->CPU.Increment(bookkeepingTimeuS + execTimeuS, Time->Now()); - ResourceMetrics->TryUpdate(ctx); + ResourceMetrics->TryUpdate(SelfCtx()); } MaybeRelaxRejectProbability(); @@ -2683,17 +2794,21 @@ void TExecutor::MakeLogSnapshot() { void TExecutor::Handle(TEvPrivate::TEvActivateExecution::TPtr &ev, const TActorContext &ctx) { Y_UNUSED(ev); + Y_UNUSED(ctx); Y_ABORT_UNLESS(ActivateTransactionInFlight > 0); ActivateTransactionInFlight--; if (!CanExecuteTransaction()) return; - if (TAutoPtr seat = ActivationQueue->Pop()) { + if (ActivationQueue) { + TSeat* seat = ActivationQueue.PopFront(); + Y_ABORT_UNLESS(seat->State == ESeatState::Active); + seat->State = ESeatState::None; Y_ABORT_UNLESS(ActivateTransactionWaiting > 0); ActivateTransactionWaiting--; seat->FinishEnqueuedSpan(); - ExecuteTransaction(seat, ctx); + ExecuteTransaction(seat); } else { // N.B. 
it should actually never happen, since ActivationQueue size // is always exactly equal to ActivateTransactionWaiting and we never @@ -2704,6 +2819,35 @@ void TExecutor::Handle(TEvPrivate::TEvActivateExecution::TPtr &ev, const TActorC } } +void TExecutor::Handle(TEvPrivate::TEvActivateLowExecution::TPtr &ev, const TActorContext &ctx) { + Y_UNUSED(ev); + Y_UNUSED(ctx); + Y_ABORT_UNLESS(ActivateLowTransactionInFlight > 0); + ActivateLowTransactionInFlight--; + + if (!CanExecuteTransaction()) + return; + + if (ActivationLowQueue) { + TSeat* seat = ActivationLowQueue.PopFront(); + Y_ABORT_UNLESS(seat->State == ESeatState::ActiveLow); + seat->State = ESeatState::None; + Y_ABORT_UNLESS(ActivateLowTransactionWaiting > 0); + ActivateLowTransactionWaiting--; + seat->FinishEnqueuedSpan(); + + // Activate the next transaction + if (ActivateLowTransactionWaiting > 0 && ActivateLowTransactionInFlight < 1) { + Send(SelfId(), new TEvPrivate::TEvActivateLowExecution()); + ++ActivateLowTransactionInFlight; + } + + ExecuteTransaction(seat); + } else { + Y_ABORT_UNLESS(ActivateLowTransactionWaiting == 0); + } +} + void TExecutor::Handle(TEvPrivate::TEvBrokenTransaction::TPtr &ev, const TActorContext &ctx) { Y_UNUSED(ev); Y_UNUSED(ctx); @@ -3028,16 +3172,18 @@ void TExecutor::Handle(TEvResourceBroker::TEvResourceAllocated::TPtr &ev) { void TExecutor::StartSeat(ui64 task, TResource *cookie_) noexcept { auto *cookie = CheckedCast(cookie_); - auto it = PostponedTransactions.find(cookie->Seat); - Y_ABORT_UNLESS(it != PostponedTransactions.end()); - TAutoPtr seat = std::move(it->second); - PostponedTransactions.erase(it); + TSeat* seat = cookie->Seat; + Y_ABORT_UNLESS(seat->State == ESeatState::Postponed); + PostponedTransactions.Remove(seat); + seat->State = ESeatState::None; Memory->AcquiredMemory(*seat, task); - LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID); - seat->StartEnqueuedSpan(); - ActivationQueue->Push(seat.Release()); - ActivateTransactionWaiting++; - 
PlanTransactionActivation(); + + if (seat->Cancelled) { + FinishCancellation(seat); + return; + } + + EnqueueActivation(seat, CanExecuteTransaction()); } THolder TExecutor::PrepareScanSnapshot(ui32 table, const NTable::TCompactionParams *params, TRowVersion snapshot) @@ -3957,6 +4103,7 @@ STFUNC(TExecutor::StateBoot) { switch (ev->GetTypeRewrite()) { // N.B. must work during follower promotion to leader HFunc(TEvPrivate::TEvActivateExecution, Handle); + HFunc(TEvPrivate::TEvActivateLowExecution, Handle); HFunc(TEvPrivate::TEvBrokenTransaction, Handle); HFunc(TEvents::TEvWakeup, Wakeup); hFunc(TEvResourceBroker::TEvResourceAllocated, Handle); @@ -3968,6 +4115,7 @@ STFUNC(TExecutor::StateBoot) { STFUNC(TExecutor::StateWork) { switch (ev->GetTypeRewrite()) { HFunc(TEvPrivate::TEvActivateExecution, Handle); + HFunc(TEvPrivate::TEvActivateLowExecution, Handle); HFunc(TEvPrivate::TEvBrokenTransaction, Handle); HFunc(TEvPrivate::TEvActivateCompactionChanges, Handle); CFunc(TEvPrivate::EvUpdateCounters, UpdateCounters); @@ -4000,6 +4148,7 @@ STFUNC(TExecutor::StateWork) { STFUNC(TExecutor::StateFollower) { switch (ev->GetTypeRewrite()) { HFunc(TEvPrivate::TEvActivateExecution, Handle); + HFunc(TEvPrivate::TEvActivateLowExecution, Handle); HFunc(TEvPrivate::TEvBrokenTransaction, Handle); CFunc(TEvPrivate::EvUpdateCounters, UpdateCounters); HFunc(TEvents::TEvWakeup, Wakeup); @@ -4022,6 +4171,7 @@ STFUNC(TExecutor::StateFollowerBoot) { switch (ev->GetTypeRewrite()) { // N.B. 
must handle activities started before resync HFunc(TEvPrivate::TEvActivateExecution, Handle); + HFunc(TEvPrivate::TEvActivateLowExecution, Handle); HFunc(TEvPrivate::TEvBrokenTransaction, Handle); HFunc(TEvents::TEvWakeup, Wakeup); hFunc(TEvResourceBroker::TEvResourceAllocated, Handle); @@ -4301,8 +4451,8 @@ TString TExecutor::CheckBorrowConsistency() { return BorrowLogic->DebugCheckBorrowConsistency(knownBundles); } -TTransactionWaitPad::TTransactionWaitPad(THolder seat) - : Seat(std::move(seat)) +TTransactionWaitPad::TTransactionWaitPad(TSeat* seat) + : Seat(seat) , WaitingSpan(NWilson::TSpan(TWilsonTablet::TabletDetailed, Seat->GetTxTraceId(), "Tablet.Transaction.Wait")) {} diff --git a/ydb/core/tablet_flat/flat_executor.h b/ydb/core/tablet_flat/flat_executor.h index 598af4e31933..06ec9afebe05 100644 --- a/ydb/core/tablet_flat/flat_executor.h +++ b/ydb/core/tablet_flat/flat_executor.h @@ -293,10 +293,10 @@ struct TExecutorStatsImpl : public TExecutorStats { }; struct TTransactionWaitPad : public TPrivatePageCacheWaitPad { - THolder Seat; + TSeat* Seat; NWilson::TSpan WaitingSpan; - TTransactionWaitPad(THolder seat); + TTransactionWaitPad(TSeat* seat); ~TTransactionWaitPad(); NWilson::TTraceId GetWaitingTraceId() const noexcept; @@ -331,6 +331,7 @@ class TExecutor EvActivateCompactionChanges, EvBrokenTransaction, EvLeaseExtend, + EvActivateLowExecution, EvEnd }; @@ -338,6 +339,7 @@ class TExecutor static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_PRIVATE), "enum range overrun"); struct TEvActivateExecution : public TEventLocal {}; + struct TEvActivateLowExecution : public TEventLocal {}; struct TEvUpdateCounters : public TEventLocal {}; struct TEvCheckYellow : public TEventLocal {}; struct TEvUpdateCompactions : public TEventLocal {}; @@ -346,6 +348,12 @@ class TExecutor struct TEvLeaseExtend : public TEventLocal {}; }; + enum class ETxMode { + Execute, + Enqueue, + LowPriority, + }; + const TIntrusivePtr Time = nullptr; NFlatExecutorSetup::ITablet * Owner; 
const TActorId OwnerActorId; @@ -391,13 +399,17 @@ class TExecutor TList LeaseCommits; std::multimap LeaseCommitsByEnd; - using TActivationQueue = TOneOneQueueInplace; - THolder ActivationQueue; - THolder PendingQueue; + // TSeat's UniqID to an owned pointer + absl::flat_hash_map> Transactions; + + using TSeatList = TIntrusiveList; + TSeatList ActivationQueue; + TSeatList ActivationLowQueue; + TSeatList PendingQueue; bool CompactionChangesActivating = false; - TMap> PostponedTransactions; + TSeatList PostponedTransactions; THashMap> ScanSnapshots; ui64 ScanSnapshotId = 1; @@ -407,6 +419,8 @@ class TExecutor bool BrokenTransaction = false; ui32 ActivateTransactionWaiting = 0; ui32 ActivateTransactionInFlight = 0; + ui32 ActivateLowTransactionWaiting = 0; + ui32 ActivateLowTransactionInFlight = 0; using TWaitingSnaps = THashMap>; @@ -470,6 +484,7 @@ class TExecutor ui64 StickyPagesMemory = 0; ui64 TransactionPagesMemory = 0; + TActorContext SelfCtx() const; TActorContext OwnerCtx() const; TControlWrapper LogFlushDelayOverrideUsec; @@ -502,12 +517,15 @@ class TExecutor void TranscriptBootOpResult(ui32 res, const TActorContext &ctx); void TranscriptFollowerBootOpResult(ui32 res, const TActorContext &ctx); - void ExecuteTransaction(TAutoPtr seat, const TActorContext &ctx); - void CommitTransactionLog(TAutoPtr, TPageCollectionTxEnv&, TAutoPtr, - THPTimer &bookkeepingTimer, const TActorContext &ctx); + std::unique_ptr RemoveTransaction(ui64 id); + void FinishCancellation(TSeat* seat, bool activateMore = true); + void ExecuteTransaction(TSeat* seat); + void CommitTransactionLog(std::unique_ptr, TPageCollectionTxEnv&, TAutoPtr, + THPTimer &bookkeepingTimer); void UnpinTransactionPages(TSeat &seat); - void ReleaseTxData(TSeat &seat, ui64 requested, const TActorContext &ctx); - void PostponeTransaction(TAutoPtr, TPageCollectionTxEnv&, TAutoPtr, THPTimer &bookkeepingTimer, const TActorContext &ctx); + void ReleaseTxData(TSeat &seat, ui64 requested); + void 
PostponeTransaction(TSeat*, TPageCollectionTxEnv&, TAutoPtr, THPTimer &bookkeepingTimer); + void EnqueueActivation(TSeat* seat, bool activate); void PlanTransactionActivation(); void MakeLogSnapshot(); void ActivateWaitingTransactions(TPrivatePageCache::TPage::TWaitQueuePtr waitPadsQueue); @@ -543,6 +561,7 @@ class TExecutor void Handle(TEvPrivate::TEvLeaseExtend::TPtr &ev, const TActorContext &ctx); void Handle(TEvTablet::TEvCommitResult::TPtr &ev, const TActorContext &ctx); void Handle(TEvPrivate::TEvActivateExecution::TPtr &ev, const TActorContext &ctx); + void Handle(TEvPrivate::TEvActivateLowExecution::TPtr &ev, const TActorContext &ctx); void Handle(TEvPrivate::TEvBrokenTransaction::TPtr &ev, const TActorContext &ctx); void Handle(TEvents::TEvFlushLog::TPtr &ev); void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr&); @@ -615,9 +634,11 @@ class TExecutor void Boot(TEvTablet::TEvBoot::TPtr &ev, const TActorContext &ctx) override; void Restored(TEvTablet::TEvRestored::TPtr &ev, const TActorContext &ctx) override; void DetachTablet(const TActorContext &ctx) override; - void DoExecute(TAutoPtr transaction, bool allowImmediate, const TActorContext &ctx); + ui64 DoExecute(TAutoPtr transaction, ETxMode mode); void Execute(TAutoPtr transaction, const TActorContext &ctx) override; - void Enqueue(TAutoPtr transaction, const TActorContext &ctx) override; + ui64 Enqueue(TAutoPtr transaction) override; + ui64 EnqueueLowPriority(TAutoPtr transaction) override; + bool CancelTransaction(ui64 id) override; TLeaseCommit* AttachLeaseCommit(TLogCommit* commit, bool force = false); TLeaseCommit* EnsureReadOnlyLease(TMonotonic at); diff --git a/ydb/core/tablet_flat/flat_executor_txloglogic.cpp b/ydb/core/tablet_flat/flat_executor_txloglogic.cpp index d6ab2f5c44c0..5fb15b6318ad 100644 --- a/ydb/core/tablet_flat/flat_executor_txloglogic.cpp +++ b/ydb/core/tablet_flat/flat_executor_txloglogic.cpp @@ -20,10 +20,11 @@ const static ui64 MaxSizeToEmbedInLog = 2048; const static ui64 
MaxBytesToBatch = 2 * 1024 * 1024; const static ui64 MaxItemsToBatch = 64; -TLogicRedo::TCompletionEntry::TCompletionEntry(TAutoPtr seat, ui32 step) +TLogicRedo::TCompletionEntry::TCompletionEntry(std::unique_ptr seat, ui32 step) : Step(step) - , InFlyRWTransaction(seat) -{} +{ + Transactions.PushBack(seat.release()); +} TLogicRedo::TLogicRedo(TAutoPtr cookies, TCommitManager *commitManager, TAutoPtr queue) : CommitManager(commitManager) @@ -56,22 +57,7 @@ TArrayRef TLogicRedo::GrabLogUsage() const noexcept return Queue->GrabUsage(); } -bool TLogicRedo::TerminateTransaction(TAutoPtr seat, const TActorContext &ctx, const TActorId &ownerID) { - if (CompletionQueue.empty()) { - const TTxType txType = seat->Self->GetTxType(); - - seat->Terminate(seat->TerminationReason, ctx.MakeFor(ownerID)); - Counters->Cumulative()[TExecutorCounters::TX_TERMINATED].Increment(1); - if (AppTxCounters && txType != UnknownTxType) - AppTxCounters->TxCumulative(txType, COUNTER_TT_TERMINATED).Increment(1); - return true; - } else { - CompletionQueue.back().WaitingTerminatedTransactions.push_back(seat); - return false; - } -} - -void CompleteRoTransaction(TAutoPtr seat, const TActorContext &ownerCtx, TExecutorCounters *counters, TTabletCountersWithTxTypes *appTxCounters ) { +void CompleteRoTransaction(std::unique_ptr seat, const TActorContext &ownerCtx, TExecutorCounters *counters, TTabletCountersWithTxTypes *appTxCounters) { const TTxType txType = seat->Self->GetTxType(); const ui64 latencyus = ui64(1000000. * seat->LatencyTimer.Passed()); @@ -84,22 +70,31 @@ void CompleteRoTransaction(TAutoPtr seat, const TActorContext &ownerCtx, const ui64 completeTimeus = ui64(1000000. 
* completeTimer.Passed()); - counters->Cumulative()[TExecutorCounters::TX_RO_COMPLETED].Increment(1); - if (appTxCounters && txType != UnknownTxType) - appTxCounters->TxCumulative(txType, COUNTER_TT_RO_COMPLETED).Increment(1); + if (Y_UNLIKELY(seat->IsTerminated())) { + counters->Cumulative()[TExecutorCounters::TX_TERMINATED].Increment(1); + if (appTxCounters && txType != UnknownTxType) { + appTxCounters->TxCumulative(txType, COUNTER_TT_TERMINATED).Increment(1); + } + } else { + counters->Cumulative()[TExecutorCounters::TX_RO_COMPLETED].Increment(1); + if (appTxCounters && txType != UnknownTxType) { + appTxCounters->TxCumulative(txType, COUNTER_TT_RO_COMPLETED).Increment(1); + } + } + counters->Percentile()[TExecutorCounters::TX_PERCENTILE_COMMITED_CPUTIME].IncrementFor(completeTimeus); counters->Cumulative()[TExecutorCounters::CONSUMED_CPU].Increment(completeTimeus); if (appTxCounters && txType != UnknownTxType) appTxCounters->TxCumulative(txType, COUNTER_TT_COMMITED_CPUTIME).Increment(completeTimeus); } -bool TLogicRedo::CommitROTransaction(TAutoPtr seat, const TActorContext &ownerCtx) { +bool TLogicRedo::CommitROTransaction(std::unique_ptr seat, const TActorContext &ownerCtx) { if (CompletionQueue.empty()) { - CompleteRoTransaction(seat, ownerCtx, Counters, AppTxCounters); + CompleteRoTransaction(std::move(seat), ownerCtx, Counters, AppTxCounters); return true; } else { LWTRACK(TransactionReadOnlyWait, seat->Self->Orbit, seat->UniqID, CompletionQueue.back().Step); - CompletionQueue.back().WaitingROTransactions.push_back(seat); + CompletionQueue.back().Transactions.PushBack(seat.release()); return false; } } @@ -128,7 +123,7 @@ void TLogicRedo::FlushBatchedLog() } TLogicRedo::TCommitRWTransactionResult TLogicRedo::CommitRWTransaction( - TAutoPtr seat, NTable::TChange &change, bool force) + std::unique_ptr seat, NTable::TChange &change, bool force) { seat->CommitTimer.Reset(); @@ -159,8 +154,8 @@ TLogicRedo::TCommitRWTransactionResult 
TLogicRedo::CommitRWTransaction( auto commit = CommitManager->Begin(true, ECommit::Redo, seat->GetTxTraceId()); - commit->PushTx(seat.Get()); - CompletionQueue.push_back({ seat, commit->Step }); + commit->PushTx(seat.get()); + CompletionQueue.push_back({ std::move(seat), commit->Step }); MakeLogEntry(*commit, std::move(change.Redo), change.Affects, !force); const auto was = commit->GcDelta.Created.size(); @@ -208,9 +203,9 @@ TLogicRedo::TCommitRWTransactionResult TLogicRedo::CommitRWTransaction( } } - Batch->Commit->PushTx(seat.Get()); + Batch->Commit->PushTx(seat.get()); - CompletionQueue.push_back({ seat, Batch->Commit->Step }); + CompletionQueue.push_back({ std::move(seat), Batch->Commit->Step }); Batch->Add(std::move(change.Redo), change.Affects); @@ -252,20 +247,24 @@ ui64 TLogicRedo::Confirm(ui32 step, const TActorContext &ctx, const TActorId &ow " non-expected confirmation %" PRIu32 ", prev %" PRIu32, Cookies->Tablet, step, PrevConfirmedStep); - Y_ABORT_UNLESS(CompletionQueue[0].Step == step, "t: %" PRIu64 + Y_ABORT_UNLESS(CompletionQueue.front().Step == step, "t: %" PRIu64 " inconsistent confirmation head: %" PRIu32 ", step: %" PRIu32 ", queue size: %" PRISZT ", prev confimed: %" PRIu32 - , Cookies->Tablet, CompletionQueue[0].Step, step, CompletionQueue.size(), PrevConfirmedStep); + , Cookies->Tablet, CompletionQueue.front().Step, step, CompletionQueue.size(), PrevConfirmedStep); PrevConfirmedStep = step; const TActorContext ownerCtx = ctx.MakeFor(ownerId); ui64 confirmedTransactions = 0; do { - TCompletionEntry &entry = CompletionQueue[0]; - auto &seat = entry.InFlyRWTransaction; + TCompletionEntry &entry = CompletionQueue.front(); + + Y_ABORT_UNLESS(entry.Transactions, + "tablet: %" PRIu64 " entry without transactions, step: %" PRIu32, Cookies->Tablet, step); + + std::unique_ptr seat{entry.Transactions.PopFront()}; const TTxType txType = seat->Self->GetTxType(); const ui64 commitLatencyus = ui64(1000000. 
* seat->CommitTimer.Passed()); @@ -276,7 +275,7 @@ ui64 TLogicRedo::Confirm(ui32 step, const TActorContext &ctx, const TActorId &ow ++confirmedTransactions; THPTimer completeTimer; LWTRACK(TransactionCompleteBegin, seat->Self->Orbit, seat->UniqID); - entry.InFlyRWTransaction->Complete(ownerCtx, true); + seat->Complete(ownerCtx, true); LWTRACK(TransactionCompleteEnd, seat->Self->Orbit, seat->UniqID); const ui64 completeTimeus = ui64(1000000. * completeTimer.Passed()); @@ -289,24 +288,16 @@ ui64 TLogicRedo::Confirm(ui32 step, const TActorContext &ctx, const TActorId &ow if (AppTxCounters && txType != UnknownTxType) AppTxCounters->TxCumulative(txType, COUNTER_TT_COMMITED_CPUTIME).Increment(completeTimeus); - for (auto &x : entry.WaitingROTransactions) { - ++confirmedTransactions; - CompleteRoTransaction(x, ownerCtx, Counters, AppTxCounters); - } - - for (auto &x : entry.WaitingTerminatedTransactions) { - const TTxType roTxType = x->Self->GetTxType(); - x->Terminate(x->TerminationReason, ownerCtx); - - Counters->Cumulative()[TExecutorCounters::TX_TERMINATED].Increment(1); - if (AppTxCounters && roTxType != UnknownTxType) - AppTxCounters->TxCumulative(roTxType, COUNTER_TT_TERMINATED).Increment(1); + seat.reset(); + while (entry.Transactions) { + std::unique_ptr seat{entry.Transactions.PopFront()}; ++confirmedTransactions; + CompleteRoTransaction(std::move(seat), ownerCtx, Counters, AppTxCounters); } CompletionQueue.pop_front(); - } while (!CompletionQueue.empty() && CompletionQueue[0].Step == step); + } while (!CompletionQueue.empty() && CompletionQueue.front().Step == step); return confirmedTransactions; } diff --git a/ydb/core/tablet_flat/flat_executor_txloglogic.h b/ydb/core/tablet_flat/flat_executor_txloglogic.h index 1272c4f947bc..ff8ffba5ff97 100644 --- a/ydb/core/tablet_flat/flat_executor_txloglogic.h +++ b/ydb/core/tablet_flat/flat_executor_txloglogic.h @@ -37,12 +37,10 @@ class TLogicRedo { struct TCompletionEntry { ui32 Step; - /* vvvv argh.... 
*/ - TAutoPtr InFlyRWTransaction; - TVector> WaitingROTransactions; - TVector> WaitingTerminatedTransactions; + // The first owned tx is rw + TIntrusiveListWithAutoDelete Transactions; - TCompletionEntry(TAutoPtr seat, ui32 step); + TCompletionEntry(std::unique_ptr seat, ui32 step); }; TDeque CompletionQueue; // would be graph once data-dependencies implemented @@ -59,9 +57,8 @@ class TLogicRedo { void Describe(IOutputStream &out) const noexcept; void InstallCounters(TExecutorCounters *counters, TTabletCountersWithTxTypes* appTxCounters); - bool TerminateTransaction(TAutoPtr, const TActorContext &ctx, const TActorId &ownerId); - bool CommitROTransaction(TAutoPtr seat, const TActorContext &ownerCtx); - TCommitRWTransactionResult CommitRWTransaction(TAutoPtr seat, NTable::TChange &change, bool force); + bool CommitROTransaction(std::unique_ptr seat, const TActorContext &ownerCtx); + TCommitRWTransactionResult CommitRWTransaction(std::unique_ptr seat, NTable::TChange &change, bool force); void MakeLogEntry(TLogCommit&, TString redo, TArrayRef affects, bool embed); void FlushBatchedLog(); @@ -73,6 +70,6 @@ class TLogicRedo { TArrayRef GrabLogUsage() const noexcept; }; -void CompleteRoTransaction(TAutoPtr, const TActorContext &ownerCtx, TExecutorCounters *counters, TTabletCountersWithTxTypes *appTxCounters); +void CompleteRoTransaction(std::unique_ptr, const TActorContext &ownerCtx, TExecutorCounters *counters, TTabletCountersWithTxTypes *appTxCounters); }} diff --git a/ydb/core/tablet_flat/flat_executor_ut.cpp b/ydb/core/tablet_flat/flat_executor_ut.cpp index 5b728620c4da..9cd0e6a135d9 100644 --- a/ydb/core/tablet_flat/flat_executor_ut.cpp +++ b/ydb/core/tablet_flat/flat_executor_ut.cpp @@ -497,11 +497,14 @@ class TTestFlatTablet : public TActor, public TTabletExecutedFl } void Handle(NFake::TEvExecute::TPtr &ev, const TActorContext &ctx) { - for (auto& f : ev->Get()->Funcs) { - if (auto* tx = dynamic_cast(f.Get())) { - tx->Executor = Executor(); + for (auto& tx : 
ev->Get()->Txs) { + if (auto* e = dynamic_cast(tx.Get())) { + e->Executor = Executor(); } - Execute(f.Release(), ctx); + Execute(tx.Release(), ctx); + } + for (auto& lambda : ev->Get()->Lambdas) { + std::move(lambda)(Executor(), ctx); } } @@ -7132,5 +7135,215 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutor_Gc) { } } +Y_UNIT_TEST_SUITE(TFlatTableExecutor_LowPriorityTxs) { + struct TTxState { + int Executed = 0; + int Completed = 0; + int Destroyed = 0; + }; + + struct TSimpleTx : public ITransaction { + TTxState& State; + + TSimpleTx(TTxState& state) + : State(state) + {} + + bool Execute(TTransactionContext&, const TActorContext&) override { + ++State.Executed; + return true; + } + + void Complete(const TActorContext&) override { + ++State.Completed; + } + + ~TSimpleTx() { + ++State.Destroyed; + } + }; + + struct TAllocatingTx : public ITransaction { + TTxState& State; + + TAllocatingTx(TTxState& state) + : State(state) + {} + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + ++State.Executed; + // Keep requesting 2GB more... + txc.RequestMemory(2ULL * 1024 * 1024 * 1024); + return false; + } + + void Complete(const TActorContext&) override { + ++State.Completed; + } + + ~TAllocatingTx() { + ++State.Destroyed; + } + }; + + Y_UNIT_TEST(TestEnqueueCancel) { + TMyEnvBase env; + + env.FireDummyTablet(); + + TTxState tx1, tx2, tx3; + ui64 id1 = 0; + ui64 id2 = 0; + ui64 id3 = 0; + env.SendSync(new NFake::TEvExecute{[&](auto*, const auto& ctx) { + ctx.Send(ctx.SelfID, new NFake::TEvExecute{[&](auto* x, const auto& ctx) { + id1 = x->EnqueueLowPriority(new TSimpleTx(tx1)); + Y_ABORT_UNLESS(tx1.Executed == 0); + id2 = x->Enqueue(new TSimpleTx(tx2)); + Y_ABORT_UNLESS(tx2.Executed == 0); + id3 = x->Enqueue(new TSimpleTx(tx3)); + Y_ABORT_UNLESS(tx3.Executed == 0); + // Validate this new mailbox event is handled after tx3 is executed and destroyed, i.e. 
it's high priority + ctx.Send(ctx.SelfID, new NFake::TEvExecute{[&](auto*, const auto&) { + Y_ABORT_UNLESS(tx3.Executed == 1); + Y_ABORT_UNLESS(tx3.Destroyed == 1); + }}); + }}); + ctx.Send(ctx.SelfID, new NFake::TEvExecute{[&](auto* x, const auto&) { + Y_ABORT_UNLESS(id1 != 0); + Y_ABORT_UNLESS(tx1.Executed == 0); + Y_ABORT_UNLESS(tx1.Destroyed == 0); + bool ok = x->CancelTransaction(id1); + Y_ABORT_UNLESS(ok); + Y_ABORT_UNLESS(tx1.Destroyed == 1); + }}); + ctx.Send(ctx.SelfID, new NFake::TEvExecute{[&](auto* x, const auto&) { + Y_ABORT_UNLESS(id2 != 0); + Y_ABORT_UNLESS(tx2.Executed == 0); + Y_ABORT_UNLESS(tx2.Destroyed == 0); + bool ok = x->CancelTransaction(id2); + Y_ABORT_UNLESS(ok); + Y_ABORT_UNLESS(tx2.Destroyed == 1); + }}); + ctx.Send(ctx.SelfID, new NFake::TEvExecute{[&](auto*, const auto& ctx) { + ctx.Send(ctx.SelfID, new NFake::TEvExecute{[&](auto*, const auto& ctx) { + ctx.Send(ctx.SelfID, new NFake::TEvReturn); + }}); + }}); + }}); + } + + Y_UNIT_TEST(TestLowPriority) { + TMyEnvBase env; + + env.FireDummyTablet(); + + TTxState tx1, tx2; + ui64 id1 = 0; + ui64 id2 = 0; + env.SendSync(new NFake::TEvExecute{[&](auto*, const auto& ctx) { + ctx.Send(ctx.SelfID, new NFake::TEvExecute{[&](auto* x, const auto& ctx) { + id1 = x->EnqueueLowPriority(new TSimpleTx(tx1)); + Y_ABORT_UNLESS(tx1.Executed == 0); + id2 = x->EnqueueLowPriority(new TSimpleTx(tx2)); + Y_ABORT_UNLESS(tx2.Executed == 0); + ctx.Send(ctx.SelfID, new NFake::TEvExecute{[&](auto*, const auto& ctx) { + Y_ABORT_UNLESS(tx1.Executed == 1); + Y_ABORT_UNLESS(tx1.Completed == 1); + Y_ABORT_UNLESS(tx1.Destroyed == 1); + Y_ABORT_UNLESS(tx2.Executed == 0); + Y_ABORT_UNLESS(tx2.Destroyed == 0); + ctx.Send(ctx.SelfID, new NFake::TEvExecute{[&](auto*, const auto& ctx) { + Y_ABORT_UNLESS(tx2.Executed == 1); + Y_ABORT_UNLESS(tx2.Completed == 1); + Y_ABORT_UNLESS(tx2.Destroyed == 1); + ctx.Send(ctx.SelfID, new NFake::TEvReturn); + }}); + }}); + }}); + }}); + } + + Y_UNIT_TEST(TestLowPriorityCancel) { + 
TMyEnvBase env; + + env.FireDummyTablet(); + + TTxState tx1, tx2; + ui64 id1 = 0; + ui64 id2 = 0; + env.SendSync(new NFake::TEvExecute{[&](auto*, const auto& ctx) { + ctx.Send(ctx.SelfID, new NFake::TEvExecute{[&](auto* x, const auto& ctx) { + id1 = x->EnqueueLowPriority(new TSimpleTx(tx1)); + Y_ABORT_UNLESS(tx1.Executed == 0); + id2 = x->EnqueueLowPriority(new TSimpleTx(tx2)); + Y_ABORT_UNLESS(tx2.Executed == 0); + // The next new mailbox event is supposed to execute after tx1 is executed, but before tx2 + ctx.Send(ctx.SelfID, new NFake::TEvExecute{[&](auto* x, const auto& ctx) { + Y_ABORT_UNLESS(tx1.Executed == 1); + Y_ABORT_UNLESS(tx1.Destroyed == 1); + bool ok1 = x->CancelTransaction(id1); + Y_ABORT_UNLESS(!ok1); // cannot cancel executed transaction + Y_ABORT_UNLESS(tx2.Executed == 0); + Y_ABORT_UNLESS(tx2.Destroyed == 0); + bool ok2 = x->CancelTransaction(id2); + Y_ABORT_UNLESS(ok2); + Y_ABORT_UNLESS(tx2.Executed == 0); + Y_ABORT_UNLESS(tx2.Destroyed == 1); + ctx.Send(ctx.SelfID, new NFake::TEvExecute{[&](auto*, const auto& ctx) { + ctx.Send(ctx.SelfID, new NFake::TEvReturn); + }}); + }}); + }}); + ctx.Send(ctx.SelfID, new NFake::TEvExecute{[&](auto*, const auto&) { + Y_ABORT_UNLESS(id1 != 0); + Y_ABORT_UNLESS(tx1.Executed == 0); + Y_ABORT_UNLESS(tx1.Destroyed == 0); + }}); + ctx.Send(ctx.SelfID, new NFake::TEvExecute{[&](auto*, const auto&) { + Y_ABORT_UNLESS(id2 != 0); + Y_ABORT_UNLESS(tx2.Executed == 0); + Y_ABORT_UNLESS(tx2.Destroyed == 0); + }}); + }}); + } + + Y_UNIT_TEST(TestLowPriorityAllocatingCancel) { + TMyEnvBase env; + + env.FireDummyTablet(); + + TTxState tx1, tx2; + ui64 id1 = 0; + ui64 id2 = 0; + env.SendSync(new NFake::TEvExecute{[&](auto*, const auto& ctx) { + ctx.Send(ctx.SelfID, new NFake::TEvExecute{[&](auto* x, const auto& ctx) { + id1 = x->EnqueueLowPriority(new TAllocatingTx(tx1)); + id2 = x->EnqueueLowPriority(new TSimpleTx(tx2)); + ctx.Send(ctx.SelfID, new NFake::TEvExecute{[&](auto* x, const auto& ctx) { + 
Y_ABORT_UNLESS(tx1.Executed == 1); + Y_ABORT_UNLESS(tx1.Destroyed == 0); + bool ok = x->CancelTransaction(id1); + Y_ABORT_UNLESS(ok); + // Transaction is requesting more memory and cannot be destroyed + Y_ABORT_UNLESS(tx1.Destroyed == 0); + ctx.Send(ctx.SelfID, new NFake::TEvExecute{[&](auto*, const auto&) { + Y_ABORT_UNLESS(tx2.Executed == 1); + Y_ABORT_UNLESS(tx2.Completed == 1); + Y_ABORT_UNLESS(tx2.Destroyed == 1); + }}); + ctx.Schedule(TDuration::MilliSeconds(1), new NFake::TEvExecute{[&](auto*, const auto& ctx) { + // We should observe tx1 destroyed eventually, not executing again + Y_ABORT_UNLESS(tx1.Executed == 1); + Y_ABORT_UNLESS(tx1.Completed == 0); + Y_ABORT_UNLESS(tx1.Destroyed == 1); + ctx.Send(ctx.SelfID, new NFake::TEvReturn); + }}); + }}); + }}); + }}); + } +} + } } diff --git a/ydb/core/tablet_flat/tablet_flat_executed.cpp b/ydb/core/tablet_flat/tablet_flat_executed.cpp index e2ca8c1a0fb8..53783ebba71c 100644 --- a/ydb/core/tablet_flat/tablet_flat_executed.cpp +++ b/ydb/core/tablet_flat/tablet_flat_executed.cpp @@ -39,9 +39,24 @@ void TTabletExecutedFlat::Execute(TAutoPtr transaction) { static_cast(Executor())->Execute(transaction, ExecutorCtx(*TlsActivationContext)); } -void TTabletExecutedFlat::EnqueueExecute(TAutoPtr transaction) { - if (transaction) - static_cast(Executor())->Enqueue(transaction, ExecutorCtx(*TlsActivationContext)); +ui64 TTabletExecutedFlat::Enqueue(TAutoPtr transaction) { + if (transaction) { + return static_cast(Executor())->Enqueue(transaction); + } else { + return 0; + } +} + +ui64 TTabletExecutedFlat::EnqueueExecute(TAutoPtr transaction) { + return Enqueue(transaction); +} + +ui64 TTabletExecutedFlat::EnqueueLowPriority(TAutoPtr transaction) { + if (transaction) { + return static_cast(Executor())->EnqueueLowPriority(transaction); + } else { + return 0; + } } const NTable::TScheme& TTabletExecutedFlat::Scheme() const noexcept { diff --git a/ydb/core/tablet_flat/tablet_flat_executed.h 
b/ydb/core/tablet_flat/tablet_flat_executed.h index 48386e26efe0..b490055dc539 100644 --- a/ydb/core/tablet_flat/tablet_flat_executed.h +++ b/ydb/core/tablet_flat/tablet_flat_executed.h @@ -26,7 +26,9 @@ class TTabletExecutedFlat : public NFlatExecutorSetup::ITablet { void Execute(TAutoPtr transaction, const TActorContext &ctx); void Execute(TAutoPtr transaction); - void EnqueueExecute(TAutoPtr transaction); + ui64 Enqueue(TAutoPtr transaction); + ui64 EnqueueExecute(TAutoPtr transaction); + ui64 EnqueueLowPriority(TAutoPtr transaction); const NTable::TScheme& Scheme() const noexcept; diff --git a/ydb/core/tablet_flat/tablet_flat_executor.h b/ydb/core/tablet_flat/tablet_flat_executor.h index c5dbbf81216b..53e719cdf5e2 100644 --- a/ydb/core/tablet_flat/tablet_flat_executor.h +++ b/ydb/core/tablet_flat/tablet_flat_executor.h @@ -565,7 +565,26 @@ namespace NFlatExecutorSetup { virtual void FollowerGcApplied(ui32 step, TDuration followerSyncDelay) = 0; virtual void Execute(TAutoPtr transaction, const TActorContext &ctx) = 0; - virtual void Enqueue(TAutoPtr transaction, const TActorContext &ctx) = 0; + + /** + * Enqueue a transaction for execution + * Returns the unique id that may be used for cancellation. + */ + virtual ui64 Enqueue(TAutoPtr transaction) = 0; + + /** + * Enqueue a transaction that is low priority with respect to other + * mailbox events. Every transaction in turn is scheduled at the end of + * the current mailbox, which allows actors to handle possible cancellation + * before each transaction's execution. + * Returns the unique id that may be used for cancellation. + */ + virtual ui64 EnqueueLowPriority(TAutoPtr transaction) = 0; + + /** + * Cancel a previously enqueued transaction with the specified id. 
+ */ + virtual bool CancelTransaction(ui64 id) = 0; virtual void ConfirmReadOnlyLease(TMonotonic at) = 0; virtual void ConfirmReadOnlyLease(TMonotonic at, std::function callback) = 0; diff --git a/ydb/core/tablet_flat/test/libs/exec/dummy.h b/ydb/core/tablet_flat/test/libs/exec/dummy.h index 0798fe045c30..8df6036401de 100644 --- a/ydb/core/tablet_flat/test/libs/exec/dummy.h +++ b/ydb/core/tablet_flat/test/libs/exec/dummy.h @@ -48,8 +48,11 @@ namespace NFake { if (auto *ev = eh->CastAsLocal()) { Y_ABORT_UNLESS(State == EState::Work, "Cannot handle TX now"); - for (auto& f : ev->Funcs) { - Execute(f.Release(), this->ActorContext()); + for (auto& tx : ev->Txs) { + Execute(tx.Release(), this->ActorContext()); + } + for (auto& lambda : ev->Lambdas) { + std::move(lambda)(Executor(), this->ActorContext()); } } else if (auto *ev = eh->CastAsLocal()) { Y_ABORT_UNLESS(State == EState::Work, "Cannot handle compaction now"); diff --git a/ydb/core/tablet_flat/test/libs/exec/events.h b/ydb/core/tablet_flat/test/libs/exec/events.h index 44a33e89a66e..fe6d6d3c2368 100644 --- a/ydb/core/tablet_flat/test/libs/exec/events.h +++ b/ydb/core/tablet_flat/test/libs/exec/events.h @@ -54,17 +54,27 @@ namespace NFake { struct TEvExecute : public TEventLocal { using ITransaction = NTabletFlatExecutor::ITransaction; + using IExecutor = NTabletFlatExecutor::NFlatExecutorSetup::IExecutor; + using TLambda = std::function; + + TEvExecute(ITransaction* tx) { + Txs.emplace_back(tx); + } - TEvExecute(TAutoPtr func) { - THolder h(func.Release()); - Funcs.push_back(std::move(h)); + TEvExecute(THolder tx) { + Txs.push_back(std::move(tx)); } - TEvExecute(TVector> funcs) - : Funcs(std::move(funcs)) + TEvExecute(TVector> txs) + : Txs(std::move(txs)) { } - TVector> Funcs; + TEvExecute(TLambda&& lambda) { + Lambdas.push_back(std::move(lambda)); + } + + TVector> Txs; + TVector Lambdas; }; struct TEvResult : public TEventLocal { diff --git a/ydb/core/tx/datashard/conflicts_cache.cpp 
b/ydb/core/tx/datashard/conflicts_cache.cpp index 2a97923c15be..507c95f248e5 100644 --- a/ydb/core/tx/datashard/conflicts_cache.cpp +++ b/ydb/core/tx/datashard/conflicts_cache.cpp @@ -352,7 +352,7 @@ void TConflictsCache::RegisterDistributedWrites(ui64 txId, TPendingWrites&& writ if (!writes.empty()) { PendingWrites[txId] = std::move(writes); - Self->EnqueueExecute(new TTxFindWriteConflicts(Self, txId)); + Self->Enqueue(new TTxFindWriteConflicts(Self, txId)); } } diff --git a/ydb/core/tx/datashard/datashard_change_receiving.cpp b/ydb/core/tx/datashard/datashard_change_receiving.cpp index e1035a3e70ed..a1937880cf21 100644 --- a/ydb/core/tx/datashard/datashard_change_receiving.cpp +++ b/ydb/core/tx/datashard/datashard_change_receiving.cpp @@ -450,7 +450,7 @@ void TDataShard::Handle(TEvPrivate::TEvChangeExchangeExecuteHandshakes::TPtr&, c void TDataShard::RunChangeExchangeHandshakeTx() { if (!ChangeExchangeHandshakeTxScheduled && !PendingChangeExchangeHandshakes.empty()) { ChangeExchangeHandshakeTxScheduled = true; - EnqueueExecute(new TTxChangeExchangeHandshake(this)); + Enqueue(new TTxChangeExchangeHandshake(this)); } } diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index c36c6331bb1f..d64e14291c8f 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -3087,6 +3087,8 @@ class TDataShard StateInitImpl(ev, SelfId()); } + using TTabletExecutedFlat::Enqueue; + void Enqueue(STFUNC_SIG) override { ALOG_WARN(NKikimrServices::TX_DATASHARD, "TDataShard::StateInit unhandled event type: " << ev->GetTypeRewrite() << " event: " << ev->ToString()); diff --git a/ydb/core/tx/datashard/volatile_tx.cpp b/ydb/core/tx/datashard/volatile_tx.cpp index 12a2cf887ca7..d3b75840fa7c 100644 --- a/ydb/core/tx/datashard/volatile_tx.cpp +++ b/ydb/core/tx/datashard/volatile_tx.cpp @@ -1059,12 +1059,12 @@ namespace NKikimr::NDataShard { void TVolatileTxManager::ScheduleCommitTx(TVolatileTxInfo* info) { 
Y_DEBUG_ABORT_UNLESS(info && info->State == EVolatileTxState::Committed); - Self->EnqueueExecute(new TDataShard::TTxVolatileTxCommit(Self, info)); + Self->Enqueue(new TDataShard::TTxVolatileTxCommit(Self, info)); } void TVolatileTxManager::ScheduleAbortTx(TVolatileTxInfo* info) { Y_DEBUG_ABORT_UNLESS(info && info->State == EVolatileTxState::Aborting); - Self->EnqueueExecute(new TDataShard::TTxVolatileTxAbort(Self, info)); + Self->Enqueue(new TDataShard::TTxVolatileTxAbort(Self, info)); } bool TVolatileTxManager::RemoveFromCommitOrder(TVolatileTxInfo* info) {