Skip to content

Refactor thread ctx, KIKIMR-18440 #897

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 14 additions & 38 deletions ydb/library/actors/core/executor_pool_basic.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "executor_pool_basic.h"
#include "executor_pool_basic_feature_flags.h"
#include "actor.h"
#include "executor_thread_ctx.h"
#include "probes.h"
#include "mailbox.h"
#include <ydb/library/actors/util/affinity.h>
Expand Down Expand Up @@ -167,7 +168,7 @@ namespace NActors {
}

if (workerId >= 0) {
Threads[workerId].ExchangeState(TExecutorThreadCtx::WS_NONE);
Threads[workerId].ExchangeState(EThreadState::None);
}

TAtomic x = AtomicGet(Semaphore);
Expand All @@ -191,7 +192,7 @@ namespace NActors {
} else {
if (const ui32 activation = Activations.Pop(++revolvingCounter)) {
if (workerId >= 0) {
Threads[workerId].ExchangeState(TExecutorThreadCtx::WS_RUNNING);
Threads[workerId].ExchangeState(EThreadState::Work);
}
AtomicDecrement(Semaphore);
TlsThreadContext->Timers.HPNow = GetCycleCountFast();
Expand Down Expand Up @@ -244,18 +245,18 @@ namespace NActors {
inline void TBasicExecutorPool::WakeUpLoop(i16 currentThreadCount) {
for (i16 i = 0;;) {
TExecutorThreadCtx& threadCtx = Threads[i];
TExecutorThreadCtx::TWaitState state = threadCtx.GetState();
switch (state.Flag) {
case TExecutorThreadCtx::WS_NONE:
case TExecutorThreadCtx::WS_RUNNING:
EThreadState state = threadCtx.GetState<EThreadState>();
switch (state) {
case EThreadState::None:
case EThreadState::Work:
if (++i >= MaxThreadCount - SharedExecutorsCount) {
i = 0;
}
break;
case TExecutorThreadCtx::WS_ACTIVE:
case TExecutorThreadCtx::WS_BLOCKED:
if (threadCtx.ReplaceState(state, TExecutorThreadCtx::WS_NONE)) {
if (state.Flag == TExecutorThreadCtx::WS_BLOCKED) {
case EThreadState::Spin:
case EThreadState::Sleep:
if (threadCtx.ReplaceState<EThreadState>(state, EThreadState::None)) {
if (state == EThreadState::Sleep) {
ui64 beforeUnpark = GetCycleCountFast();
threadCtx.StartWakingTs = beforeUnpark;
if (TlsThreadContext && TlsThreadContext->WaitingStats) {
Expand Down Expand Up @@ -601,38 +602,13 @@ namespace NActors {
}

bool TExecutorThreadCtx::Wait(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag) {
TWaitState state = ExchangeState(WS_ACTIVE);
Y_ABORT_UNLESS(state.Flag == WS_NONE, "WaitingFlag# %d", int(state.Flag));
if (OwnerExecutorPool) {
// if (!OwnerExecutorPool->SetSleepOwnSharedThread()) {
// return false;
// }
// if (TBasicExecutorPool *pool = OtherExecutorPool; pool) {
// if (!pool->SetSleepBorrowedSharedThread()) {
// return false;
// }
//}
}
EThreadState state = ExchangeState<EThreadState>(EThreadState::Spin);
Y_ABORT_UNLESS(state == EThreadState::None, "WaitingFlag# %d", int(state));
if (spinThresholdCycles > 0) {
// spin configured period
Spin(spinThresholdCycles, stopFlag);
// then - sleep
state = GetState();
if (state.Flag == WS_ACTIVE) {
if (ReplaceState(state, WS_BLOCKED)) {
if (Sleep(stopFlag)) { // interrupted
return true;
}
} else {
NextPool = state.NextPool;
}
}
} else {
Block(stopFlag);
}

Y_DEBUG_ABORT_UNLESS(stopFlag->load() || GetState().Flag == WS_NONE);
return false;
return Sleep(stopFlag);
}

}
2 changes: 1 addition & 1 deletion ydb/library/actors/core/executor_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ namespace NActors {
std::vector<TExecutorPoolBaseMailboxed*> pools;
do {
if (NeedToReloadPools.load() == EState::NeedToReloadPools) {
otherPool = dynamic_cast<TExecutorPoolBaseMailboxed*>(ThreadCtx->OtherExecutorPool.load());
// otherPool = dynamic_cast<TExecutorPoolBaseMailboxed*>(ThreadCtx->OtherExecutorPool.load());
NeedToReloadPools = EState::Running;
}
bool wasWorking = true;
Expand Down
152 changes: 97 additions & 55 deletions ydb/library/actors/core/executor_thread_ctx.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,66 +11,43 @@ namespace NActors {
class TExecutorThread;
class TBasicExecutorPool;

struct TExecutorThreadCtx {
enum EWaitState : ui64 {
WS_NONE,
WS_ACTIVE,
WS_BLOCKED,
WS_RUNNING
};

struct TWaitState {
EWaitState Flag = WS_NONE;
ui32 NextPool = Max<ui32>();

TWaitState() = default;

explicit TWaitState(ui64 state)
: Flag(static_cast<EWaitState>(state & 0x7))
, NextPool(state >> 3)
{}

explicit TWaitState(EWaitState flag, ui32 nextPool = Max<ui32>())
: Flag(flag)
, NextPool(nextPool)
{}

explicit operator ui64() {
return Flag | ui64(NextPool << 3);
}
};
enum class EThreadState : ui64 {
None,
Spin,
Sleep,
Work
};

struct TGenericExecutorThreadCtx {
TAutoPtr<TExecutorThread> Thread;
TThreadParkPad WaitingPad;

private:
std::atomic<ui64> WaitingFlag = WS_NONE;
std::atomic<ui64> WaitingFlag = static_cast<ui64>(EThreadState::None);

public:
TBasicExecutorPool *OwnerExecutorPool = nullptr;
std::atomic<TBasicExecutorPool*> OtherExecutorPool = nullptr;
ui64 StartWakingTs = 0;
ui32 NextPool = 0;
bool IsShared;

// different threads must spin/block on different cache-lines.
// we add some padding bytes to enforce this rule;

template <typename TWaitState>
TWaitState GetState() {
return TWaitState(WaitingFlag.load());
}

TWaitState ExchangeState(EWaitState flag, ui32 nextPool = Max<ui32>()) {
return TWaitState(WaitingFlag.exchange(static_cast<ui64>(TWaitState(flag, nextPool))));
template <typename TWaitState>
TWaitState ExchangeState(TWaitState state) {
return TWaitState(WaitingFlag.exchange(static_cast<ui64>(state)));
}

bool ReplaceState(TWaitState &expected, EWaitState flag, ui32 nextPool = Max<ui32>()) {
template <typename TWaitState>
bool ReplaceState(TWaitState &expected, TWaitState state) {
ui64 expectedInt = static_cast<ui64>(expected);
bool result = WaitingFlag.compare_exchange_strong(expectedInt, static_cast<ui64>(TWaitState(flag, nextPool)));
bool result = WaitingFlag.compare_exchange_strong(expectedInt, static_cast<ui64>(state));
expected = TWaitState(expectedInt);
return result;
}

protected:
template <typename TDerived, typename TWaitState>
void Spin(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag) {
ui64 start = GetCycleCountFast();
bool doSpin = true;
Expand All @@ -81,11 +58,11 @@ namespace NActors {
break;
}
for (ui32 i = 0; i < 12; ++i) {
TWaitState state = GetState();
if (state.Flag == WS_ACTIVE) {
TWaitState state = GetState<TWaitState>();
if (static_cast<EThreadState>(state) == EThreadState::Spin) {
SpinLockPause();
} else {
NextPool = state.NextPool;
static_cast<TDerived*>(this)->AfterWakeUp(state);
doSpin = false;
break;
}
Expand All @@ -100,36 +77,101 @@ namespace NActors {
}
}

template <typename TDerived, typename TWaitState>
bool Sleep(std::atomic<bool> *stopFlag) {
Y_DEBUG_ABORT_UNLESS(TlsThreadContext);

TWaitState state;
TWaitState state = TWaitState{EThreadState::Spin};
if (!ReplaceState<TWaitState>(state, TWaitState{EThreadState::Sleep})) {
static_cast<TDerived*>(this)->AfterWakeUp(state);
return false;
}

do {
TlsThreadContext->Timers.HPNow = GetCycleCountFast();
TlsThreadContext->Timers.Elapsed += TlsThreadContext->Timers.HPNow - TlsThreadContext->Timers.HPStart;
if (WaitingPad.Park()) // interrupted
return true;
TlsThreadContext->Timers.HPStart = GetCycleCountFast();
TlsThreadContext->Timers.Parked += TlsThreadContext->Timers.HPStart - TlsThreadContext->Timers.HPNow;
state = GetState();
} while (state.Flag == WS_BLOCKED && !stopFlag->load(std::memory_order_relaxed));
NextPool = state.NextPool;
state = GetState<TWaitState>();
} while (static_cast<EThreadState>(state) == EThreadState::Sleep && !stopFlag->load(std::memory_order_relaxed));

static_cast<TDerived*>(this)->AfterWakeUp(state);
return false;
}
};

struct TExecutorThreadCtx : public TGenericExecutorThreadCtx {
using TBase = TGenericExecutorThreadCtx;

TBasicExecutorPool *OwnerExecutorPool = nullptr;

void Spin(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag) {
this->TBase::Spin<TExecutorThreadCtx, EThreadState>(spinThresholdCycles, stopFlag);
}

bool Sleep(std::atomic<bool> *stopFlag) {
return this->TBase::Sleep<TExecutorThreadCtx, EThreadState>(stopFlag);
}

bool Wait(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag); // in executor_pool_basic.cpp

bool Block(std::atomic<bool> *stopFlag) {
TWaitState state{WS_ACTIVE};
if (ReplaceState(state, WS_BLOCKED)) {
Y_ABORT_UNLESS(state.Flag == WS_ACTIVE, "WaitingFlag# %d", int(state.Flag));
return Sleep(stopFlag);
} else {
return false;
}
void AfterWakeUp(EThreadState /*state*/) {
}

TExecutorThreadCtx() = default;
};


constexpr ui32 MaxPoolsForSharedThreads = 4;

struct TSharedExecutorThreadCtx : public TGenericExecutorThreadCtx {
using TBase = TGenericExecutorThreadCtx;

struct TWaitState {
EThreadState Flag = EThreadState::None;
ui32 NextPool = Max<ui32>();

TWaitState() = default;

TWaitState(ui64 state)
: Flag(static_cast<EThreadState>(state & 0x7))
, NextPool(state >> 3)
{}

TWaitState(EThreadState flag, ui32 nextPool = Max<ui32>())
: Flag(flag)
, NextPool(nextPool)
{}

explicit operator ui64() {
return static_cast<ui64>(Flag) | ui64(NextPool << 3);
}

explicit operator EThreadState() {
return Flag;
}
};

std::atomic<TBasicExecutorPool*> ExecutorPools[MaxPoolsForSharedThreads];
ui32 NextPool = 0;

void AfterWakeUp(TWaitState state) {
NextPool = state.NextPool;
}

void Spin(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag) {
this->TBase::Spin<TSharedExecutorThreadCtx, TWaitState>(spinThresholdCycles, stopFlag);
}

bool Sleep(std::atomic<bool> *stopFlag) {
return this->TBase::Sleep<TSharedExecutorThreadCtx, TWaitState>(stopFlag);
}

bool Wait(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag); // in executor_pool_basic.cpp

TSharedExecutorThreadCtx() = default;
};

}