Skip to content

Commit 1810b3a

Browse files
authored
Revert "Refactor harmonizer in actorsystem (#12084)"
This reverts commit 97c9f43.
1 parent 14a54ab commit 1810b3a

32 files changed

+1003
-2234
lines changed

ydb/library/actors/core/cpu_manager.cpp

+6-4
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ namespace NActors {
3737
poolsWithSharedThreads.push_back(cfg.PoolId);
3838
}
3939
}
40-
Shared.reset(CreateSharedExecutorPool(Config.Shared, ExecutorPoolCount, poolsWithSharedThreads));
41-
auto sharedPool = static_cast<ISharedExecutorPool*>(Shared.get());
40+
Shared.reset(new TSharedExecutorPool(Config.Shared, ExecutorPoolCount, poolsWithSharedThreads));
41+
auto sharedPool = static_cast<TSharedExecutorPool*>(Shared.get());
4242

4343
ui64 ts = GetCycleCountFast();
4444
Harmonizer.reset(MakeHarmonizer(ts));
@@ -135,9 +135,11 @@ namespace NActors {
135135
for (TBasicExecutorPoolConfig& cfg : Config.Basic) {
136136
if (cfg.PoolId == poolId) {
137137
if (cfg.HasSharedThread) {
138-
auto *sharedPool = Shared.get();
138+
auto *sharedPool = static_cast<TSharedExecutorPool*>(Shared.get());
139139
auto *pool = new TBasicExecutorPool(cfg, Harmonizer.get(), Jail.get());
140-
pool->AddSharedThread(sharedPool->GetSharedThread(poolId));
140+
if (pool) {
141+
pool->AddSharedThread(sharedPool->GetSharedThread(poolId));
142+
}
141143
return pool;
142144
} else {
143145
return new TBasicExecutorPool(cfg, Harmonizer.get(), Jail.get());

ydb/library/actors/core/cpu_manager.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@
22

33
#include "config.h"
44
#include "executor_pool_jail.h"
5+
#include "harmonizer.h"
56
#include "executor_pool.h"
67
#include "executor_pool_shared.h"
78
#include "mon_stats.h"
8-
#include <ydb/library/actors/core/harmonizer/harmonizer.h>
99
#include <memory>
1010

1111
namespace NActors {
@@ -16,7 +16,7 @@ namespace NActors {
1616
const ui32 ExecutorPoolCount;
1717
TArrayHolder<TAutoPtr<IExecutorPool>> Executors;
1818
std::unique_ptr<IHarmonizer> Harmonizer;
19-
std::unique_ptr<ISharedExecutorPool> Shared;
19+
std::unique_ptr<TSharedExecutorPool> Shared;
2020
std::unique_ptr<TExecutorPoolJail> Jail;
2121
TCpuManagerConfig Config;
2222

ydb/library/actors/core/executor_pool.h

+4-15
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,15 @@ namespace NActors {
1414
struct TExecutorThreadStats;
1515
class TExecutorPoolJail;
1616
class ISchedulerCookie;
17-
struct TSharedExecutorThreadCtx;
1817

1918
struct TCpuConsumption {
20-
double CpuUs = 0;
21-
double ElapsedUs = 0;
19+
double ConsumedUs = 0;
20+
double BookedUs = 0;
2221
ui64 NotEnoughCpuExecutions = 0;
2322

2423
void Add(const TCpuConsumption& other) {
25-
CpuUs += other.CpuUs;
26-
ElapsedUs += other.ElapsedUs;
24+
ConsumedUs += other.ConsumedUs;
25+
BookedUs += other.BookedUs;
2726
NotEnoughCpuExecutions += other.NotEnoughCpuExecutions;
2827
}
2928
};
@@ -177,16 +176,6 @@ namespace NActors {
177176
return 1;
178177
}
179178

180-
virtual TSharedExecutorThreadCtx* ReleaseSharedThread() {
181-
return nullptr;
182-
}
183-
virtual void AddSharedThread(TSharedExecutorThreadCtx*) {
184-
}
185-
186-
virtual bool IsSharedThreadEnabled() const {
187-
return false;
188-
}
189-
190179
virtual TCpuConsumption GetThreadCpuConsumption(i16 threadIdx) {
191180
Y_UNUSED(threadIdx);
192181
return TCpuConsumption{0.0, 0.0};

ydb/library/actors/core/executor_pool_base.cpp

-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
#include "actorsystem.h"
2-
#include "activity_guard.h"
32
#include "actor.h"
43
#include "executor_pool_base.h"
54
#include "executor_pool_basic_feature_flags.h"

ydb/library/actors/core/executor_pool_basic.cpp

+6-6
Original file line numberDiff line numberDiff line change
@@ -407,10 +407,10 @@ namespace NActors {
407407
poolStats.DecreasingThreadsByHoggishState = stats.DecreasingThreadsByHoggishState;
408408
poolStats.DecreasingThreadsByExchange = stats.DecreasingThreadsByExchange;
409409
poolStats.PotentialMaxThreadCount = stats.PotentialMaxThreadCount;
410-
poolStats.MaxCpuUs = stats.MaxCpuUs;
411-
poolStats.MinCpuUs = stats.MinCpuUs;
412-
poolStats.MaxElapsedUs = stats.MaxElapsedUs;
413-
poolStats.MinElapsedUs = stats.MinElapsedUs;
410+
poolStats.MaxConsumedCpuUs = stats.MaxConsumedCpu;
411+
poolStats.MinConsumedCpuUs = stats.MinConsumedCpu;
412+
poolStats.MaxBookedCpuUs = stats.MaxBookedCpu;
413+
poolStats.MinBookedCpuUs = stats.MinBookedCpu;
414414
}
415415

416416
statsCopy.resize(MaxFullThreadCount + 1);
@@ -429,7 +429,7 @@ namespace NActors {
429429
void TBasicExecutorPool::GetExecutorPoolState(TExecutorPoolState &poolState) const {
430430
if (Harmonizer) {
431431
TPoolHarmonizerStats stats = Harmonizer->GetPoolStats(PoolId);
432-
poolState.UsedCpu = stats.AvgElapsedUs;
432+
poolState.UsedCpu = stats.AvgConsumedCpu;
433433
poolState.PossibleMaxLimit = stats.PotentialMaxThreadCount;
434434
} else {
435435
poolState.PossibleMaxLimit = poolState.MaxLimit;
@@ -625,7 +625,7 @@ namespace NActors {
625625
TExecutorThreadCtx& threadCtx = Threads[threadIdx];
626626
TExecutorThreadStats stats;
627627
threadCtx.Thread->GetCurrentStatsForHarmonizer(stats);
628-
return {static_cast<double>(stats.CpuUs), Ts2Us(stats.SafeElapsedTicks), stats.NotEnoughCpuExecutions};
628+
return {Ts2Us(stats.SafeElapsedTicks), static_cast<double>(stats.CpuUs), stats.NotEnoughCpuExecutions};
629629
}
630630

631631
i16 TBasicExecutorPool::GetBlockingThreadCount() const {

ydb/library/actors/core/executor_pool_basic.h

+3-6
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
#include "executor_pool_basic_feature_flags.h"
88
#include "scheduler_queue.h"
99
#include "executor_pool_base.h"
10+
#include "harmonizer.h"
1011
#include <memory>
11-
#include <ydb/library/actors/core/harmonizer/harmonizer.h>
1212
#include <ydb/library/actors/actor_type/indexes.h>
1313
#include <ydb/library/actors/util/unordered_cache.h>
1414
#include <ydb/library/actors/util/threadparkpad.h>
@@ -278,11 +278,8 @@ namespace NActors {
278278
void CalcSpinPerThread(ui64 wakingUpConsumption);
279279
void ClearWaitingStats() const;
280280

281-
TSharedExecutorThreadCtx* ReleaseSharedThread() override;
282-
void AddSharedThread(TSharedExecutorThreadCtx* thread) override;
283-
bool IsSharedThreadEnabled() const override {
284-
return true;
285-
}
281+
TSharedExecutorThreadCtx* ReleaseSharedThread();
282+
void AddSharedThread(TSharedExecutorThreadCtx* thread);
286283

287284
private:
288285
void AskToGoToSleep(bool *needToWait, bool *needToBlock);

ydb/library/actors/core/executor_pool_io.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ namespace NActors {
156156
void TIOExecutorPool::GetExecutorPoolState(TExecutorPoolState &poolState) const {
157157
if (Harmonizer) {
158158
TPoolHarmonizerStats stats = Harmonizer->GetPoolStats(PoolId);
159-
poolState.UsedCpu = stats.AvgElapsedUs;
159+
poolState.UsedCpu = stats.AvgConsumedCpu;
160160
}
161161
poolState.CurrentLimit = PoolThreads;
162162
poolState.MaxLimit = PoolThreads;

ydb/library/actors/core/executor_pool_io.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33
#include "actorsystem.h"
44
#include "executor_thread.h"
55
#include "executor_thread_ctx.h"
6+
#include "harmonizer.h"
67
#include "scheduler_queue.h"
78
#include "executor_pool_base.h"
8-
#include <ydb/library/actors/core/harmonizer/harmonizer.h>
99
#include <ydb/library/actors/actor_type/indexes.h>
1010
#include <ydb/library/actors/util/ticket_lock.h>
1111
#include <ydb/library/actors/util/unordered_cache.h>

ydb/library/actors/core/executor_pool_shared.cpp

+37-121
Original file line numberDiff line numberDiff line change
@@ -12,50 +12,6 @@
1212

1313
namespace NActors {
1414

15-
class TSharedExecutorPool: public ISharedExecutorPool {
16-
public:
17-
TSharedExecutorPool(const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector<i16> poolsWithThreads);
18-
19-
// IThreadPool
20-
void Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) override;
21-
void Start() override;
22-
void PrepareStop() override;
23-
void Shutdown() override;
24-
bool Cleanup() override;
25-
26-
TSharedExecutorThreadCtx *GetSharedThread(i16 poolId) override;
27-
void GetSharedStats(i16 pool, std::vector<TExecutorThreadStats>& statsCopy) override;
28-
void GetSharedStatsForHarmonizer(i16 pool, std::vector<TExecutorThreadStats>& statsCopy) override;
29-
TCpuConsumption GetThreadCpuConsumption(i16 poolId, i16 threadIdx) override;
30-
std::vector<TCpuConsumption> GetThreadsCpuConsumption(i16 poolId) override;
31-
32-
i16 ReturnOwnHalfThread(i16 pool) override;
33-
i16 ReturnBorrowedHalfThread(i16 pool) override;
34-
void GiveHalfThread(i16 from, i16 to) override;
35-
36-
i16 GetSharedThreadCount() const override;
37-
38-
TSharedPoolState GetState() const override;
39-
40-
void Init(const std::vector<IExecutorPool*>& pools, bool withThreads) override;
41-
42-
private:
43-
TSharedPoolState State;
44-
45-
std::vector<IExecutorPool*> Pools;
46-
47-
i16 PoolCount;
48-
i16 SharedThreadCount;
49-
std::unique_ptr<TSharedExecutorThreadCtx[]> Threads;
50-
51-
std::unique_ptr<NSchedulerQueue::TReader[]> ScheduleReaders;
52-
std::unique_ptr<NSchedulerQueue::TWriter[]> ScheduleWriters;
53-
54-
TDuration TimePerMailbox;
55-
ui32 EventsPerMailbox;
56-
ui64 SoftProcessingDurationTs;
57-
}; // class TSharedExecutorPool
58-
5915
TSharedExecutorPool::TSharedExecutorPool(const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector<i16> poolsWithThreads)
6016
: State(poolCount, poolsWithThreads.size())
6117
, Pools(poolCount)
@@ -73,47 +29,40 @@ TSharedExecutorPool::TSharedExecutorPool(const TSharedExecutorPoolConfig &config
7329
}
7430
}
7531

76-
void TSharedExecutorPool::Init(const std::vector<IExecutorPool*>& pools, bool withThreads) {
77-
std::vector<IExecutorPool*> poolByThread(SharedThreadCount);
78-
for (IExecutorPool* pool : pools) {
79-
Pools[pool->PoolId] = pool;
80-
i16 threadIdx = State.ThreadByPool[pool->PoolId];
81-
if (threadIdx >= 0) {
82-
poolByThread[threadIdx] = pool;
32+
void TSharedExecutorPool::Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) {
33+
// ActorSystem = actorSystem;
34+
35+
ScheduleReaders.reset(new NSchedulerQueue::TReader[SharedThreadCount]);
36+
ScheduleWriters.reset(new NSchedulerQueue::TWriter[SharedThreadCount]);
37+
38+
std::vector<IExecutorPool*> poolsBasic = actorSystem->GetBasicExecutorPools();
39+
std::vector<IExecutorPool*> poolByThread(SharedThreadCount);
40+
for (IExecutorPool* pool : poolsBasic) {
41+
Pools[pool->PoolId] = dynamic_cast<TBasicExecutorPool*>(pool);
42+
i16 threadIdx = State.ThreadByPool[pool->PoolId];
43+
if (threadIdx >= 0) {
44+
poolByThread[threadIdx] = pool;
45+
}
8346
}
84-
}
8547

86-
for (i16 i = 0; i != SharedThreadCount; ++i) {
87-
// !TODO
88-
Threads[i].ExecutorPools[0].store(dynamic_cast<TBasicExecutorPool*>(poolByThread[i]), std::memory_order_release);
89-
if (withThreads) {
48+
for (i16 i = 0; i != SharedThreadCount; ++i) {
49+
// !TODO
50+
Threads[i].ExecutorPools[0].store(dynamic_cast<TBasicExecutorPool*>(poolByThread[i]), std::memory_order_release);
9051
Threads[i].Thread.reset(
9152
new TSharedExecutorThread(
9253
-1,
93-
nullptr,
94-
&Threads[i],
95-
PoolCount,
96-
"SharedThread",
97-
SoftProcessingDurationTs,
98-
TimePerMailbox,
54+
actorSystem,
55+
&Threads[i],
56+
PoolCount,
57+
"SharedThread",
58+
SoftProcessingDurationTs,
59+
TimePerMailbox,
9960
EventsPerMailbox));
61+
ScheduleWriters[i].Init(ScheduleReaders[i]);
10062
}
101-
}
102-
}
103-
104-
void TSharedExecutorPool::Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) {
105-
ScheduleReaders.reset(new NSchedulerQueue::TReader[SharedThreadCount]);
106-
ScheduleWriters.reset(new NSchedulerQueue::TWriter[SharedThreadCount]);
107-
108-
std::vector<IExecutorPool*> poolsBasic = actorSystem->GetBasicExecutorPools();
109-
Init(poolsBasic, true);
11063

111-
for (i16 i = 0; i != SharedThreadCount; ++i) {
112-
ScheduleWriters[i].Init(ScheduleReaders[i]);
113-
}
114-
115-
*scheduleReaders = ScheduleReaders.get();
116-
*scheduleSz = SharedThreadCount;
64+
*scheduleReaders = ScheduleReaders.get();
65+
*scheduleSz = SharedThreadCount;
11766
}
11867

11968
void TSharedExecutorPool::Start() {
@@ -150,27 +99,24 @@ TSharedExecutorThreadCtx* TSharedExecutorPool::GetSharedThread(i16 pool) {
15099
return &Threads[threadIdx];
151100
}
152101

153-
i16 TSharedExecutorPool::ReturnOwnHalfThread(i16 pool) {
102+
void TSharedExecutorPool::ReturnOwnHalfThread(i16 pool) {
154103
i16 threadIdx = State.ThreadByPool[pool];
155-
IExecutorPool* borrowingPool = Threads[threadIdx].ExecutorPools[1].exchange(nullptr, std::memory_order_acq_rel);
104+
TBasicExecutorPool* borrowingPool = Threads[threadIdx].ExecutorPools[1].exchange(nullptr, std::memory_order_acq_rel);
156105
Y_ABORT_UNLESS(borrowingPool);
157-
i16 borrowedPool = State.PoolByBorrowedThread[threadIdx];
158-
State.BorrowedThreadByPool[borrowedPool] = -1;
106+
State.BorrowedThreadByPool[State.PoolByBorrowedThread[threadIdx]] = -1;
159107
State.PoolByBorrowedThread[threadIdx] = -1;
160108
// TODO(kruall): Check on race
161109
borrowingPool->ReleaseSharedThread();
162-
return borrowedPool;
163110
}
164111

165-
i16 TSharedExecutorPool::ReturnBorrowedHalfThread(i16 pool) {
112+
void TSharedExecutorPool::ReturnBorrowedHalfThread(i16 pool) {
166113
i16 threadIdx = State.BorrowedThreadByPool[pool];
167-
IExecutorPool* borrowingPool = Threads[threadIdx].ExecutorPools[1].exchange(nullptr, std::memory_order_acq_rel);
114+
TBasicExecutorPool* borrowingPool = Threads[threadIdx].ExecutorPools[1].exchange(nullptr, std::memory_order_acq_rel);
168115
Y_ABORT_UNLESS(borrowingPool);
169116
State.BorrowedThreadByPool[State.PoolByBorrowedThread[threadIdx]] = -1;
170117
State.PoolByBorrowedThread[threadIdx] = -1;
171118
// TODO(kruall): Check on race
172119
borrowingPool->ReleaseSharedThread();
173-
return State.PoolByThread[threadIdx];
174120
}
175121

176122
void TSharedExecutorPool::GiveHalfThread(i16 from, i16 to) {
@@ -181,14 +127,14 @@ void TSharedExecutorPool::GiveHalfThread(i16 from, i16 to) {
181127
if (borrowedThreadIdx != -1) {
182128
i16 originalPool = State.PoolByThread[borrowedThreadIdx];
183129
if (originalPool == to) {
184-
ReturnOwnHalfThread(to);
130+
return ReturnOwnHalfThread(to);
185131
} else {
186132
ReturnOwnHalfThread(originalPool);
187133
}
188134
from = originalPool;
189135
}
190136
i16 threadIdx = State.ThreadByPool[from];
191-
IExecutorPool* borrowingPool = Pools[to];
137+
TBasicExecutorPool* borrowingPool = Pools[to];
192138
Threads[threadIdx].ExecutorPools[1].store(borrowingPool, std::memory_order_release);
193139
State.BorrowedThreadByPool[to] = threadIdx;
194140
State.PoolByBorrowedThread[threadIdx] = to;
@@ -197,16 +143,16 @@ void TSharedExecutorPool::GiveHalfThread(i16 from, i16 to) {
197143
}
198144

199145
void TSharedExecutorPool::GetSharedStats(i16 poolId, std::vector<TExecutorThreadStats>& statsCopy) {
200-
statsCopy.resize(SharedThreadCount);
146+
statsCopy.resize(SharedThreadCount + 1);
201147
for (i16 i = 0; i < SharedThreadCount; ++i) {
202-
Threads[i].Thread->GetSharedStats(poolId, statsCopy[i]);
148+
Threads[i].Thread->GetSharedStats(poolId, statsCopy[i + 1]);
203149
}
204150
}
205151

206152
void TSharedExecutorPool::GetSharedStatsForHarmonizer(i16 poolId, std::vector<TExecutorThreadStats>& statsCopy) {
207-
statsCopy.resize(SharedThreadCount);
153+
statsCopy.resize(SharedThreadCount + 1);
208154
for (i16 i = 0; i < SharedThreadCount; ++i) {
209-
Threads[i].Thread->GetSharedStatsForHarmonizer(poolId, statsCopy[i]);
155+
Threads[i].Thread->GetSharedStatsForHarmonizer(poolId, statsCopy[i + 1]);
210156
}
211157
}
212158

@@ -235,34 +181,4 @@ TSharedPoolState TSharedExecutorPool::GetState() const {
235181
return State;
236182
}
237183

238-
ISharedExecutorPool *CreateSharedExecutorPool(const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector<i16> poolsWithThreads) {
239-
return new TSharedExecutorPool(config, poolCount, poolsWithThreads);
240-
}
241-
242-
TString TSharedPoolState::ToString() const {
243-
TStringBuilder builder;
244-
builder << '{';
245-
builder << "ThreadByPool: [";
246-
for (ui32 i = 0; i < ThreadByPool.size(); ++i) {
247-
builder << ThreadByPool[i] << (i == ThreadByPool.size() - 1 ? "" : ", ");
248-
}
249-
builder << "], ";
250-
builder << "PoolByThread: [";
251-
for (ui32 i = 0; i < PoolByThread.size(); ++i) {
252-
builder << PoolByThread[i] << (i == PoolByThread.size() - 1 ? "" : ", ");
253-
}
254-
builder << "], ";
255-
builder << "BorrowedThreadByPool: [";
256-
for (ui32 i = 0; i < BorrowedThreadByPool.size(); ++i) {
257-
builder << BorrowedThreadByPool[i] << (i == BorrowedThreadByPool.size() - 1 ? "" : ", ");
258-
}
259-
builder << "], ";
260-
builder << "PoolByBorrowedThread: [";
261-
for (ui32 i = 0; i < PoolByBorrowedThread.size(); ++i) {
262-
builder << PoolByBorrowedThread[i] << (i == PoolByBorrowedThread.size() - 1 ? "" : ", ");
263-
}
264-
builder << ']';
265-
return builder << '}';
266-
}
267-
268184
}

0 commit comments

Comments
 (0)