From 3147be2c853e9a6373936d01d9f4eb47d6ccc03c Mon Sep 17 00:00:00 2001 From: Aleksandr Kriukov Date: Tue, 9 Jan 2024 17:06:19 +0000 Subject: [PATCH] refactor thread ctx --- .../actors/core/executor_pool_basic.cpp | 52 ++---- ydb/library/actors/core/executor_thread.cpp | 2 +- ydb/library/actors/core/executor_thread_ctx.h | 152 +++++++++++------- 3 files changed, 112 insertions(+), 94 deletions(-) diff --git a/ydb/library/actors/core/executor_pool_basic.cpp b/ydb/library/actors/core/executor_pool_basic.cpp index f316201ef769..8f9892d96c6f 100644 --- a/ydb/library/actors/core/executor_pool_basic.cpp +++ b/ydb/library/actors/core/executor_pool_basic.cpp @@ -1,6 +1,7 @@ #include "executor_pool_basic.h" #include "executor_pool_basic_feature_flags.h" #include "actor.h" +#include "executor_thread_ctx.h" #include "probes.h" #include "mailbox.h" #include @@ -167,7 +168,7 @@ namespace NActors { } if (workerId >= 0) { - Threads[workerId].ExchangeState(TExecutorThreadCtx::WS_NONE); + Threads[workerId].ExchangeState(EThreadState::None); } TAtomic x = AtomicGet(Semaphore); @@ -191,7 +192,7 @@ namespace NActors { } else { if (const ui32 activation = Activations.Pop(++revolvingCounter)) { if (workerId >= 0) { - Threads[workerId].ExchangeState(TExecutorThreadCtx::WS_RUNNING); + Threads[workerId].ExchangeState(EThreadState::Work); } AtomicDecrement(Semaphore); TlsThreadContext->Timers.HPNow = GetCycleCountFast(); @@ -244,18 +245,18 @@ namespace NActors { inline void TBasicExecutorPool::WakeUpLoop(i16 currentThreadCount) { for (i16 i = 0;;) { TExecutorThreadCtx& threadCtx = Threads[i]; - TExecutorThreadCtx::TWaitState state = threadCtx.GetState(); - switch (state.Flag) { - case TExecutorThreadCtx::WS_NONE: - case TExecutorThreadCtx::WS_RUNNING: + EThreadState state = threadCtx.GetState(); + switch (state) { + case EThreadState::None: + case EThreadState::Work: if (++i >= MaxThreadCount - SharedExecutorsCount) { i = 0; } break; - case TExecutorThreadCtx::WS_ACTIVE: - case TExecutorThreadCtx::WS_BLOCKED: - if (threadCtx.ReplaceState(state, TExecutorThreadCtx::WS_NONE)) { - if (state.Flag == TExecutorThreadCtx::WS_BLOCKED) { + case EThreadState::Spin: + case EThreadState::Sleep: + if (threadCtx.ReplaceState(state, EThreadState::None)) { + if (state == EThreadState::Sleep) { ui64 beforeUnpark = GetCycleCountFast(); threadCtx.StartWakingTs = beforeUnpark; if (TlsThreadContext && TlsThreadContext->WaitingStats) { @@ -601,38 +602,13 @@ namespace NActors { } bool TExecutorThreadCtx::Wait(ui64 spinThresholdCycles, std::atomic *stopFlag) { - TWaitState state = ExchangeState(WS_ACTIVE); - Y_ABORT_UNLESS(state.Flag == WS_NONE, "WaitingFlag# %d", int(state.Flag)); - if (OwnerExecutorPool) { - // if (!OwnerExecutorPool->SetSleepOwnSharedThread()) { - // return false; - // } - // if (TBasicExecutorPool *pool = OtherExecutorPool; pool) { - // if (!pool->SetSleepBorrowedSharedThread()) { - // return false; - // } - //} - } + EThreadState state = ExchangeState(EThreadState::Spin); + Y_ABORT_UNLESS(state == EThreadState::None, "WaitingFlag# %d", int(state)); if (spinThresholdCycles > 0) { // spin configured period Spin(spinThresholdCycles, stopFlag); - // then - sleep - state = GetState(); - if (state.Flag == WS_ACTIVE) { - if (ReplaceState(state, WS_BLOCKED)) { - if (Sleep(stopFlag)) { // interrupted - return true; - } - } else { - NextPool = state.NextPool; - } - } - } else { - Block(stopFlag); } - - Y_DEBUG_ABORT_UNLESS(stopFlag->load() || GetState().Flag == WS_NONE); - return false; + return Sleep(stopFlag); } } diff --git a/ydb/library/actors/core/executor_thread.cpp b/ydb/library/actors/core/executor_thread.cpp index aa4441590d95..477a98ba8887 100644 --- a/ydb/library/actors/core/executor_thread.cpp +++ b/ydb/library/actors/core/executor_thread.cpp @@ -523,7 +523,7 @@ namespace NActors { std::vector pools; do { if (NeedToReloadPools.load() == EState::NeedToReloadPools) { - otherPool = dynamic_cast(ThreadCtx->OtherExecutorPool.load()); + // otherPool = dynamic_cast(ThreadCtx->OtherExecutorPool.load()); NeedToReloadPools = EState::Running; } bool wasWorking = true; diff --git a/ydb/library/actors/core/executor_thread_ctx.h b/ydb/library/actors/core/executor_thread_ctx.h index 2674fd61ca07..67554885b273 100644 --- a/ydb/library/actors/core/executor_thread_ctx.h +++ b/ydb/library/actors/core/executor_thread_ctx.h @@ -11,66 +11,43 @@ namespace NActors { class TExecutorThread; class TBasicExecutorPool; - struct TExecutorThreadCtx { - enum EWaitState : ui64 { - WS_NONE, - WS_ACTIVE, - WS_BLOCKED, - WS_RUNNING - }; - - struct TWaitState { - EWaitState Flag = WS_NONE; - ui32 NextPool = Max(); - - TWaitState() = default; - - explicit TWaitState(ui64 state) - : Flag(static_cast(state & 0x7)) - , NextPool(state >> 3) - {} - - explicit TWaitState(EWaitState flag, ui32 nextPool = Max()) - : Flag(flag) - , NextPool(nextPool) - {} - - explicit operator ui64() { - return Flag | ui64(NextPool << 3); - } - }; + enum class EThreadState : ui64 { + None, + Spin, + Sleep, + Work + }; + struct TGenericExecutorThreadCtx { TAutoPtr Thread; TThreadParkPad WaitingPad; private: - std::atomic WaitingFlag = WS_NONE; + std::atomic WaitingFlag = static_cast(EThreadState::None); public: - TBasicExecutorPool *OwnerExecutorPool = nullptr; - std::atomic OtherExecutorPool = nullptr; ui64 StartWakingTs = 0; - ui32 NextPool = 0; - bool IsShared; - - // different threads must spin/block on different cache-lines. - // we add some padding bytes to enforce this rule; + template TWaitState GetState() { return TWaitState(WaitingFlag.load()); } - TWaitState ExchangeState(EWaitState flag, ui32 nextPool = Max()) { - return TWaitState(WaitingFlag.exchange(static_cast(TWaitState(flag, nextPool)))); + template + TWaitState ExchangeState(TWaitState state) { + return TWaitState(WaitingFlag.exchange(static_cast(state))); } - bool ReplaceState(TWaitState &expected, EWaitState flag, ui32 nextPool = Max()) { + template + bool ReplaceState(TWaitState &expected, TWaitState state) { ui64 expectedInt = static_cast(expected); - bool result = WaitingFlag.compare_exchange_strong(expectedInt, static_cast(TWaitState(flag, nextPool))); + bool result = WaitingFlag.compare_exchange_strong(expectedInt, static_cast(state)); expected = TWaitState(expectedInt); return result; } + protected: + template void Spin(ui64 spinThresholdCycles, std::atomic *stopFlag) { ui64 start = GetCycleCountFast(); bool doSpin = true; @@ -81,11 +58,11 @@ namespace NActors { break; } for (ui32 i = 0; i < 12; ++i) { - TWaitState state = GetState(); - if (state.Flag == WS_ACTIVE) { + TWaitState state = GetState(); + if (static_cast(state) == EThreadState::Spin) { SpinLockPause(); } else { - NextPool = state.NextPool; + static_cast(this)->AfterWakeUp(state); doSpin = false; break; } @@ -100,10 +77,16 @@ namespace NActors { } } + template bool Sleep(std::atomic *stopFlag) { Y_DEBUG_ABORT_UNLESS(TlsThreadContext); - TWaitState state; + TWaitState state = TWaitState{EThreadState::Spin}; + if (!ReplaceState(state, TWaitState{EThreadState::Sleep})) { + static_cast(this)->AfterWakeUp(state); + return false; + } + do { TlsThreadContext->Timers.HPNow = GetCycleCountFast(); TlsThreadContext->Timers.Elapsed += TlsThreadContext->Timers.HPNow - TlsThreadContext->Timers.HPStart; @@ -111,25 +94,84 @@ namespace NActors { return true; TlsThreadContext->Timers.HPStart = GetCycleCountFast(); TlsThreadContext->Timers.Parked += TlsThreadContext->Timers.HPStart - TlsThreadContext->Timers.HPNow; - state = GetState(); - } while (state.Flag == WS_BLOCKED && !stopFlag->load(std::memory_order_relaxed)); - NextPool = state.NextPool; + state = GetState(); + } while (static_cast(state) == EThreadState::Sleep && !stopFlag->load(std::memory_order_relaxed)); + + static_cast(this)->AfterWakeUp(state); return false; } + }; + + struct TExecutorThreadCtx : public TGenericExecutorThreadCtx { + using TBase = TGenericExecutorThreadCtx; + + TBasicExecutorPool *OwnerExecutorPool = nullptr; + + void Spin(ui64 spinThresholdCycles, std::atomic *stopFlag) { + this->TBase::Spin(spinThresholdCycles, stopFlag); + } + + bool Sleep(std::atomic *stopFlag) { + return this->TBase::Sleep(stopFlag); + } bool Wait(ui64 spinThresholdCycles, std::atomic *stopFlag); // in executor_pool_basic.cpp - bool Block(std::atomic *stopFlag) { - TWaitState state{WS_ACTIVE}; - if (ReplaceState(state, WS_BLOCKED)) { - Y_ABORT_UNLESS(state.Flag == WS_ACTIVE, "WaitingFlag# %d", int(state.Flag)); - return Sleep(stopFlag); - } else { - return false; - } + void AfterWakeUp(EThreadState /*state*/) { } TExecutorThreadCtx() = default; }; + + constexpr ui32 MaxPoolsForSharedThreads = 4; + + struct TSharedExecutorThreadCtx : public TGenericExecutorThreadCtx { + using TBase = TGenericExecutorThreadCtx; + + struct TWaitState { + EThreadState Flag = EThreadState::None; + ui32 NextPool = Max(); + + TWaitState() = default; + + TWaitState(ui64 state) + : Flag(static_cast(state & 0x7)) + , NextPool(state >> 3) + {} + + TWaitState(EThreadState flag, ui32 nextPool = Max()) + : Flag(flag) + , NextPool(nextPool) + {} + + explicit operator ui64() { + return static_cast(Flag) | ui64(NextPool << 3); + } + + explicit operator EThreadState() { + return Flag; + } + }; + + std::atomic ExecutorPools[MaxPoolsForSharedThreads]; + ui32 NextPool = 0; + + void AfterWakeUp(TWaitState state) { + NextPool = state.NextPool; + } + + void Spin(ui64 spinThresholdCycles, std::atomic *stopFlag) { + this->TBase::Spin(spinThresholdCycles, stopFlag); + } + + bool Sleep(std::atomic *stopFlag) { + return this->TBase::Sleep(stopFlag); + } + + bool Wait(ui64 spinThresholdCycles, std::atomic *stopFlag); // in executor_pool_basic.cpp + + TSharedExecutorThreadCtx() = default; + }; + }