diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index 0e570d160fba..176e9f6e1e21 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -303,6 +303,7 @@ void AddExecutorPool( TBasicExecutorPoolConfig basic; basic.PoolId = poolId; basic.PoolName = poolConfig.GetName(); + basic.UseRingQueue = systemConfig.HasUseRingQueue() && systemConfig.GetUseRingQueue(); if (poolConfig.HasMaxAvgPingDeviation()) { auto poolGroup = counters->GetSubgroup("execpool", basic.PoolName); auto &poolInfo = cpuManager.PingInfoByPool[poolId]; diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index a44ea1acb36b..33c6c447699e 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -118,6 +118,7 @@ message TActorSystemConfig { optional ENodeType NodeType = 14 [default = COMPUTE]; optional uint32 ForceIOPoolThreads = 17; optional bool UseSharedThreads = 18; + optional bool UseRingQueue = 19; optional bool MonitorStuckActors = 15; optional EActorSystemProfile ActorSystemProfile = 16; diff --git a/ydb/library/actors/core/config.h b/ydb/library/actors/core/config.h index 654e092a088a..038f5673c3c0 100644 --- a/ydb/library/actors/core/config.h +++ b/ydb/library/actors/core/config.h @@ -31,6 +31,7 @@ namespace NActors { i16 SoftProcessingDurationTs = 0; EASProfile ActorSystemProfile = EASProfile::Default; bool HasSharedThread = false; + bool UseRingQueue = false; }; struct TSharedExecutorPoolConfig { @@ -47,6 +48,7 @@ namespace NActors { TString PoolName; ui32 Threads = 1; TCpuMask Affinity; // Executor thread affinity + bool UseRingQueue = false; }; struct TSelfPingInfo { diff --git a/ydb/library/actors/core/executor_pool_base.cpp b/ydb/library/actors/core/executor_pool_base.cpp index 2bc87a17bf89..0c538f37b9e8 100644 --- a/ydb/library/actors/core/executor_pool_base.cpp +++ b/ydb/library/actors/core/executor_pool_base.cpp @@ -65,17 +65,20 @@ namespace NActors { } #endif - TExecutorPoolBase::TExecutorPoolBase(ui32 poolId, ui32 threads, TAffinity* affinity) + TExecutorPoolBase::TExecutorPoolBase(ui32 poolId, ui32 threads, TAffinity* affinity, bool useRingQueue) : TExecutorPoolBaseMailboxed(poolId) , PoolThreads(threads) , ThreadsAffinity(affinity) -#ifdef RING_ACTIVATION_QUEUE - , Activations(threads == 1) -#endif - {} + { + if (useRingQueue) { + Activations.emplace(threads == 1); + } else { + Activations.emplace(); + } + } TExecutorPoolBase::~TExecutorPoolBase() { - while (Activations.Pop(0)) + while (std::visit([](auto &x){return x.Pop(0);}, Activations)) ; } diff --git a/ydb/library/actors/core/executor_pool_base.h b/ydb/library/actors/core/executor_pool_base.h index 7a68d375bb14..323b06595805 100644 --- a/ydb/library/actors/core/executor_pool_base.h +++ b/ydb/library/actors/core/executor_pool_base.h @@ -47,21 +47,16 @@ namespace NActors { class TExecutorPoolBase: public TExecutorPoolBaseMailboxed { protected: - -#ifdef RING_ACTIVATION_QUEUE - using TActivationQueue = TRingActivationQueue; -#else - using TActivationQueue = TUnorderedCache; -#endif + using TUnorderedCacheActivationQueue = TUnorderedCache; const i16 PoolThreads; TIntrusivePtr ThreadsAffinity; TAtomic Semaphore = 0; - TActivationQueue Activations; + std::variant Activations; TAtomic ActivationsRevolvingCounter = 0; std::atomic_bool StopFlag = false; public: - TExecutorPoolBase(ui32 poolId, ui32 threads, TAffinity* affinity); + TExecutorPoolBase(ui32 poolId, ui32 threads, TAffinity* affinity, bool useRingQueue); ~TExecutorPoolBase(); void ScheduleActivation(ui32 activation) override; void SpecificScheduleActivation(ui32 activation) override; diff --git a/ydb/library/actors/core/executor_pool_basic.cpp b/ydb/library/actors/core/executor_pool_basic.cpp index 4a940a9ca37f..284392c36961 100644 --- a/ydb/library/actors/core/executor_pool_basic.cpp +++ b/ydb/library/actors/core/executor_pool_basic.cpp @@ -82,7 +82,7 @@ namespace NActors { } TBasicExecutorPool::TBasicExecutorPool(const TBasicExecutorPoolConfig& cfg, IHarmonizer *harmonizer, TExecutorPoolJail *jail) - : TExecutorPoolBase(cfg.PoolId, cfg.Threads, new TAffinity(cfg.Affinity)) + : TExecutorPoolBase(cfg.PoolId, cfg.Threads, new TAffinity(cfg.Affinity), cfg.UseRingQueue) , DefaultSpinThresholdCycles(cfg.SpinThreshold * NHPTimer::GetCyclesPerSecond() * 0.000001) // convert microseconds to cycles , SpinThresholdCycles(DefaultSpinThresholdCycles) , SpinThresholdCyclesPerThread(new NThreading::TPadded>[cfg.Threads]) @@ -235,7 +235,7 @@ namespace NActors { } } else { TInternalActorTypeGuard activityGuard; - if (const ui32 activation = Activations.Pop(++revolvingCounter)) { + if (const ui32 activation = std::visit([&revolvingCounter](auto &x) {return x.Pop(++revolvingCounter);}, Activations)) { if (workerId >= 0) { Threads[workerId].SetWork(); } else { @@ -308,8 +308,9 @@ namespace NActors { void TBasicExecutorPool::ScheduleActivationExCommon(ui32 activation, ui64 revolvingCounter, TAtomic x) { TSemaphore semaphore = TSemaphore::GetSemaphore(x); - - Activations.Push(activation, revolvingCounter); + std::visit([activation, revolvingCounter](auto &x) { + x.Push(activation, revolvingCounter); + }, Activations); bool needToWakeUp = false; bool needToChangeOldSemaphore = true; diff --git a/ydb/library/actors/core/executor_pool_io.cpp b/ydb/library/actors/core/executor_pool_io.cpp index d7b01339671d..31592f06a6ed 100644 --- a/ydb/library/actors/core/executor_pool_io.cpp +++ b/ydb/library/actors/core/executor_pool_io.cpp @@ -6,8 +6,8 @@ #include namespace NActors { - TIOExecutorPool::TIOExecutorPool(ui32 poolId, ui32 threads, const TString& poolName, TAffinity* affinity) - : TExecutorPoolBase(poolId, threads, affinity) + TIOExecutorPool::TIOExecutorPool(ui32 poolId, ui32 threads, const TString& poolName, TAffinity* affinity, bool useRingQueue) + : TExecutorPoolBase(poolId, threads, affinity, useRingQueue) , Threads(new TExecutorThreadCtx[threads]) , PoolName(poolName) {} @@ -17,7 +17,8 @@ namespace NActors { cfg.PoolId, cfg.Threads, cfg.PoolName, - new TAffinity(cfg.Affinity) + new TAffinity(cfg.Affinity), + cfg.UseRingQueue ) { Harmonizer = harmonizer; @@ -53,7 +54,7 @@ namespace NActors { } while (!StopFlag.load(std::memory_order_acquire)) { - if (const ui32 activation = Activations.Pop(++revolvingCounter)) { + if (const ui32 activation = std::visit([&revolvingCounter](auto &x){return x.Pop(++revolvingCounter);}, Activations)) { return activation; } SpinLockPause(); @@ -86,7 +87,9 @@ namespace NActors { } void TIOExecutorPool::ScheduleActivationEx(ui32 activation, ui64 revolvingWriteCounter) { - Activations.Push(activation, revolvingWriteCounter); + std::visit([activation, revolvingWriteCounter](auto &x) { + x.Push(activation, revolvingWriteCounter); + }, Activations); const TAtomic x = AtomicIncrement(Semaphore); if (x <= 0) { for (;; ++revolvingWriteCounter) { diff --git a/ydb/library/actors/core/executor_pool_io.h b/ydb/library/actors/core/executor_pool_io.h index c4fd95b4890b..67bbd66c32a1 100644 --- a/ydb/library/actors/core/executor_pool_io.h +++ b/ydb/library/actors/core/executor_pool_io.h @@ -26,7 +26,7 @@ namespace NActors { const TString PoolName; const ui32 ActorSystemIndex = NActors::TActorTypeOperator::GetActorSystemIndex(); public: - TIOExecutorPool(ui32 poolId, ui32 threads, const TString& poolName = "", TAffinity* affinity = nullptr); + TIOExecutorPool(ui32 poolId, ui32 threads, const TString& poolName = "", TAffinity* affinity = nullptr, bool useRingQueue = false); explicit TIOExecutorPool(const TIOExecutorPoolConfig& cfg, IHarmonizer *harmonizer = nullptr); ~TIOExecutorPool();