Skip to content

Commit ba54d6e

Browse files
authored
Merge c4941a5 into eae74a3
2 parents eae74a3 + c4941a5 commit ba54d6e

8 files changed

+77
-66
lines changed

ydb/library/actors/core/executor_pool_basic.cpp

+5-16
Original file line numberDiff line numberDiff line change
@@ -162,27 +162,23 @@ namespace NActors {
162162
TWorkerId workerId = wctx.WorkerId;
163163
Y_DEBUG_ABORT_UNLESS(workerId < PoolThreads);
164164

165-
TlsThreadContext->Timers.Reset();
166-
167-
if (Harmonizer) {
168-
LWPROBE(TryToHarmonize, PoolId, PoolName);
169-
Harmonizer->Harmonize(TlsThreadContext->Timers.HPStart);
170-
}
171-
172165
if (workerId >= 0) {
173166
Threads[workerId].UnsetWork();
174167
} else {
175168
Y_ABORT_UNLESS(wctx.SharedThread);
176169
wctx.SharedThread->UnsetWork();
177170
}
178171

172+
if (Harmonizer) {
173+
LWPROBE(TryToHarmonize, PoolId, PoolName);
174+
Harmonizer->Harmonize(TlsThreadContext->StartOfElapsingTime.load(std::memory_order_relaxed));
175+
}
176+
179177
TAtomic x = AtomicGet(Semaphore);
180178
TSemaphore semaphore = TSemaphore::GetSemaphore(x);
181179
while (!StopFlag.load(std::memory_order_acquire)) {
182180
if (!semaphore.OldSemaphore || workerId >= 0 && semaphore.CurrentSleepThreadCount < 0) {
183181
if (workerId < 0 || !wctx.IsNeededToWaitNextActivation) {
184-
TlsThreadContext->Timers.HPNow = GetCycleCountFast();
185-
wctx.AddElapsedCycles(ActorSystemIndex, TlsThreadContext->Timers.HPNow - TlsThreadContext->Timers.HPStart);
186182
return 0;
187183
}
188184

@@ -203,13 +199,6 @@ namespace NActors {
203199
wctx.SharedThread->SetWork();
204200
}
205201
AtomicDecrement(Semaphore);
206-
TlsThreadContext->Timers.HPNow = GetCycleCountFast();
207-
TlsThreadContext->Timers.Elapsed += TlsThreadContext->Timers.HPNow - TlsThreadContext->Timers.HPStart;
208-
wctx.AddElapsedCycles(ActorSystemIndex, TlsThreadContext->Timers.Elapsed);
209-
if (TlsThreadContext->Timers.Parked > 0) {
210-
wctx.AddParkedCycles(TlsThreadContext->Timers.Parked);
211-
}
212-
213202
return activation;
214203
}
215204
semaphore.CurrentSleepThreadCount++;

ydb/library/actors/core/executor_pool_io.cpp

+11-15
Original file line numberDiff line numberDiff line change
@@ -30,31 +30,27 @@ namespace NActors {
3030
i16 workerId = wctx.WorkerId;
3131
Y_DEBUG_ABORT_UNLESS(workerId < PoolThreads);
3232

33-
NHPTimer::STime elapsed = 0;
34-
NHPTimer::STime parked = 0;
35-
NHPTimer::STime hpstart = GetCycleCountFast();
36-
NHPTimer::STime hpnow;
37-
3833
const TAtomic x = AtomicDecrement(Semaphore);
3934
if (x < 0) {
4035
TExecutorThreadCtx& threadCtx = Threads[workerId];
4136
ThreadQueue.Push(workerId + 1, revolvingCounter);
42-
hpnow = GetCycleCountFast();
43-
elapsed += hpnow - hpstart;
37+
38+
NHPTimer::STime hpnow = GetCycleCountFast();
39+
NHPTimer::STime hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
40+
TlsThreadContext->ElapsingActorActivity.store(Max<ui64>(), std::memory_order_release);
41+
wctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
42+
4443
if (threadCtx.WaitingPad.Park())
4544
return 0;
46-
hpstart = GetCycleCountFast();
47-
parked += hpstart - hpnow;
45+
46+
hpnow = GetCycleCountFast();
47+
hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
48+
TlsThreadContext->ElapsingActorActivity.store(ActorSystemIndex, std::memory_order_release);
49+
wctx.AddParkedCycles(hpnow - hpprev);
4850
}
4951

5052
while (!StopFlag.load(std::memory_order_acquire)) {
5153
if (const ui32 activation = Activations.Pop(++revolvingCounter)) {
52-
hpnow = GetCycleCountFast();
53-
elapsed += hpnow - hpstart;
54-
wctx.AddElapsedCycles(ActorSystemIndex, elapsed);
55-
if (parked > 0) {
56-
wctx.AddParkedCycles(parked);
57-
}
5854
return activation;
5955
}
6056
SpinLockPause();

ydb/library/actors/core/executor_pool_shared.cpp

-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ TSharedExecutorPool::TSharedExecutorPool(const TSharedExecutorPoolConfig &config
2020
, PoolCount(poolCount)
2121
, SharedThreadCount(poolsWithThreads.size())
2222
, Threads(new TSharedExecutorThreadCtx[SharedThreadCount])
23-
, Timers(new TTimers[SharedThreadCount])
2423
, TimePerMailbox(config.TimePerMailbox)
2524
, EventsPerMailbox(config.EventsPerMailbox)
2625
, SoftProcessingDurationTs(config.SoftProcessingDurationTs)

ydb/library/actors/core/executor_pool_shared.h

-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ namespace NActors {
4343
i16 PoolCount;
4444
i16 SharedThreadCount;
4545
std::unique_ptr<TSharedExecutorThreadCtx[]> Threads;
46-
std::unique_ptr<TTimers[]> Timers;
4746

4847
std::unique_ptr<NSchedulerQueue::TReader[]> ScheduleReaders;
4948
std::unique_ptr<NSchedulerQueue::TWriter[]> ScheduleWriters;

ydb/library/actors/core/executor_thread.cpp

+42-10
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ namespace NActors {
4848
, ThreadName(threadName)
4949
, TimePerMailbox(timePerMailbox)
5050
, EventsPerMailbox(eventsPerMailbox)
51+
, ActorSystemIndex(TActorTypeOperator::GetActorSystemIndex())
5152
{
5253
Ctx.Switch(
5354
ExecutorPool,
@@ -75,6 +76,7 @@ namespace NActors {
7576
, EventsPerMailbox(eventsPerMailbox)
7677
, SoftProcessingDurationTs(softProcessingDurationTs)
7778
, SharedStats(poolCount)
79+
, ActorSystemIndex(TActorTypeOperator::GetActorSystemIndex())
7880
{
7981
Ctx.Switch(
8082
ExecutorPool,
@@ -189,7 +191,6 @@ namespace NActors {
189191
Ctx.HPStart = GetCycleCountFast();
190192
Ctx.ExecutedEvents = 0;
191193
}
192-
NHPTimer::STime hpprev = Ctx.HPStart;
193194

194195
IActor* actor = nullptr;
195196
const std::type_info* actorType = nullptr;
@@ -198,10 +199,14 @@ namespace NActors {
198199
bool firstEvent = true;
199200
bool preempted = false;
200201
bool wasWorking = false;
202+
NHPTimer::STime hpnow = Ctx.HPStart;
203+
NHPTimer::STime hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
204+
Ctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
205+
hpprev = Ctx.HPStart;
206+
201207
for (; Ctx.ExecutedEvents < Ctx.EventsPerMailbox; ++Ctx.ExecutedEvents) {
202208
if (TAutoPtr<IEventHandle> evExt = mailbox->Pop()) {
203209
mailbox->ProcessEvents(mailbox);
204-
NHPTimer::STime hpnow;
205210
recipient = evExt->GetRecipientRewrite();
206211
TActorContext ctx(*mailbox, *this, hpprev, recipient);
207212
TlsActivationContext = &ctx; // ensure dtor (if any) is called within actor system
@@ -239,9 +244,14 @@ namespace NActors {
239244
if (activityType != prevActivityType) {
240245
prevActivityType = activityType;
241246
NProfiling::TMemoryTagScope::Reset(activityType);
247+
TlsThreadContext->ElapsingActorActivity.store(activityType, std::memory_order_release);
242248
}
243249

244250
actor->Receive(ev);
251+
252+
hpnow = GetCycleCountFast();
253+
hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
254+
245255
mailbox->ProcessEvents(mailbox);
246256
actor->OnDequeueEvent();
247257

@@ -256,7 +266,6 @@ namespace NActors {
256266
if (mailbox->IsEmpty()) // was not-free and become free, we must reclaim mailbox
257267
reclaimAsFree = true;
258268

259-
hpnow = GetCycleCountFast();
260269
NHPTimer::STime elapsed = Ctx.AddEventProcessingStats(hpprev, hpnow, activityType, CurrentActorScheduledEventsCounter);
261270
if (elapsed > 1000000) {
262271
LwTraceSlowEvent(ev.Get(), evTypeForTracing, actorType, Ctx.PoolId, CurrentRecipient, NHPTimer::GetSeconds(elapsed) * 1000.0);
@@ -277,10 +286,10 @@ namespace NActors {
277286
Ctx.IncrementNonDeliveredEvents();
278287
}
279288
hpnow = GetCycleCountFast();
289+
hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
290+
Ctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
280291
}
281292

282-
hpprev = hpnow;
283-
284293
if (TlsThreadContext->CapturedType == ESendingType::Tail) {
285294
AtomicStore(&mailbox->ScheduleMoment, hpnow);
286295
Ctx.IncrementMailboxPushedOutByTailSending();
@@ -360,6 +369,7 @@ namespace NActors {
360369
break; // empty queue, leave
361370
}
362371
}
372+
TlsThreadContext->ElapsingActorActivity.store(ActorSystemIndex, std::memory_order_release);
363373

364374
NProfiling::TMemoryTagScope::Reset(0);
365375
TlsActivationContext = nullptr;
@@ -495,8 +505,11 @@ namespace NActors {
495505
ThreadDisableBalloc();
496506
#endif
497507

498-
TThreadContext threadCtx;
499-
TlsThreadContext = &threadCtx;
508+
TlsThreadCtx.WorkerCtx = &Ctx;
509+
TlsThreadCtx.ActorSystemIndex = ActorSystemIndex;
510+
TlsThreadCtx.ElapsingActorActivity = ActorSystemIndex;
511+
TlsThreadCtx.StartOfElapsingTime = GetCycleCountFast();
512+
TlsThreadContext = &TlsThreadCtx;
500513
if (ThreadName) {
501514
::SetCurrentThreadName(ThreadName);
502515
}
@@ -529,8 +542,11 @@ namespace NActors {
529542
ThreadDisableBalloc();
530543
#endif
531544

532-
TThreadContext threadCtx;
533-
TlsThreadContext = &threadCtx;
545+
TlsThreadCtx.WorkerCtx = &Ctx;
546+
TlsThreadCtx.ActorSystemIndex = ActorSystemIndex;
547+
TlsThreadCtx.ElapsingActorActivity = ActorSystemIndex;
548+
TlsThreadCtx.StartOfElapsingTime = GetCycleCountFast();
549+
TlsThreadContext = &TlsThreadCtx;
534550
if (ThreadName) {
535551
::SetCurrentThreadName(ThreadName);
536552
}
@@ -551,7 +567,7 @@ namespace NActors {
551567
}
552568

553569
if (!wasWorking && !StopFlag.load(std::memory_order_relaxed)) {
554-
TlsThreadContext->Timers.Reset();
570+
ThreadCtx->UnsetWork();
555571
ThreadCtx->Wait(0, &StopFlag);
556572
}
557573

@@ -760,10 +776,26 @@ namespace NActors {
760776
}
761777

762778
void TGenericExecutorThread::GetCurrentStats(TExecutorThreadStats& statsCopy) const {
779+
NHPTimer::STime hpnow = GetCycleCountFast();
780+
ui64 activityType = TlsThreadCtx.ElapsingActorActivity.load(std::memory_order_acquire);
781+
NHPTimer::STime hpprev = TlsThreadCtx.StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
782+
if (activityType == Max<ui64>()) {
783+
Ctx.AddParkedCycles(hpnow - hpprev);
784+
} else {
785+
Ctx.AddElapsedCycles(activityType, hpnow - hpprev);
786+
}
763787
Ctx.GetCurrentStats(statsCopy);
764788
}
765789

766790
void TGenericExecutorThread::GetSharedStats(i16 poolId, TExecutorThreadStats &statsCopy) const {
791+
NHPTimer::STime hpnow = GetCycleCountFast();
792+
ui64 activityType = TlsThreadCtx.ElapsingActorActivity.load(std::memory_order_acquire);
793+
NHPTimer::STime hpprev = TlsThreadCtx.StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
794+
if (activityType == Max<ui64>()) {
795+
Ctx.AddParkedCycles(hpnow - hpprev);
796+
} else {
797+
Ctx.AddElapsedCycles(activityType, hpnow - hpprev);
798+
}
767799
statsCopy = TExecutorThreadStats();
768800
statsCopy.Aggregate(SharedStats[poolId]);
769801
}

ydb/library/actors/core/executor_thread.h

+4-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include "event.h"
55
#include "callstack.h"
66
#include "probes.h"
7+
#include "thread_context.h"
78
#include "worker_context.h"
89
#include "log_settings.h"
910

@@ -92,7 +93,8 @@ namespace NActors {
9293
ui64 CurrentActorScheduledEventsCounter = 0;
9394

9495
// Thread-specific
95-
TWorkerContext Ctx;
96+
mutable TThreadContext TlsThreadCtx;
97+
mutable TWorkerContext Ctx;
9698
ui64 RevolvingReadCounter = 0;
9799
ui64 RevolvingWriteCounter = 0;
98100
const TString ThreadName;
@@ -104,6 +106,7 @@ namespace NActors {
104106
ui64 SoftProcessingDurationTs;
105107

106108
std::vector<TExecutorThreadStats> SharedStats;
109+
const ui32 ActorSystemIndex;
107110
};
108111

109112
class TExecutorThread: public TGenericExecutorThread {

ydb/library/actors/core/executor_thread_ctx.h

+9-5
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include "defs.h"
44
#include "thread_context.h"
5+
#include "worker_context.h"
56

67
#include <ydb/library/actors/util/datetime.h>
78
#include <ydb/library/actors/util/threadparkpad.h>
@@ -95,16 +96,19 @@ namespace NActors {
9596
return false;
9697
}
9798

99+
NHPTimer::STime hpnow = GetCycleCountFast();
100+
NHPTimer::STime hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
101+
TlsThreadContext->ElapsingActorActivity.store(Max<ui64>(), std::memory_order_release);
102+
TlsThreadContext->WorkerCtx->AddElapsedCycles(TlsThreadContext->ActorSystemIndex, hpnow - hpprev);
98103
do {
99-
TlsThreadContext->Timers.HPNow = GetCycleCountFast();
100-
TlsThreadContext->Timers.Elapsed += TlsThreadContext->Timers.HPNow - TlsThreadContext->Timers.HPStart;
101104
if (WaitingPad.Park()) // interrupted
102105
return true;
103-
TlsThreadContext->Timers.HPStart = GetCycleCountFast();
104-
TlsThreadContext->Timers.Parked += TlsThreadContext->Timers.HPStart - TlsThreadContext->Timers.HPNow;
106+
hpnow = GetCycleCountFast();
107+
hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
108+
TlsThreadContext->WorkerCtx->AddParkedCycles(hpnow - hpprev);
105109
state = GetState<TWaitState>();
106110
} while (static_cast<EThreadState>(state) == EThreadState::Sleep && !stopFlag->load(std::memory_order_relaxed));
107-
111+
TlsThreadContext->ElapsingActorActivity.store(TlsThreadContext->ActorSystemIndex, std::memory_order_release);
108112
static_cast<TDerived*>(this)->AfterWakeUp(state);
109113
return false;
110114
}

ydb/library/actors/core/thread_context.h

+6-17
Original file line numberDiff line numberDiff line change
@@ -11,26 +11,11 @@
1111
namespace NActors {
1212

1313
class IExecutorPool;
14+
struct TWorkerContext;
1415

1516
template <typename T>
1617
struct TWaitingStats;
1718

18-
struct TTimers {
19-
NHPTimer::STime Elapsed = 0;
20-
NHPTimer::STime Parked = 0;
21-
NHPTimer::STime Blocked = 0;
22-
NHPTimer::STime HPStart = GetCycleCountFast();
23-
NHPTimer::STime HPNow;
24-
25-
void Reset() {
26-
Elapsed = 0;
27-
Parked = 0;
28-
Blocked = 0;
29-
HPStart = GetCycleCountFast();
30-
HPNow = HPStart;
31-
}
32-
};
33-
3419
struct TThreadContext {
3520
IExecutorPool *Pool = nullptr;
3621
ui32 CapturedActivation = 0;
@@ -42,8 +27,12 @@ namespace NActors {
4227
ui16 LocalQueueSize = 0;
4328
TWaitingStats<ui64> *WaitingStats = nullptr;
4429
bool IsCurrentRecipientAService = false;
45-
TTimers Timers;
4630
TMPMCRingQueue<20>::EPopMode ActivationPopMode = TMPMCRingQueue<20>::EPopMode::ReallySlow;
31+
32+
std::atomic<ui64> StartOfElapsingTime = 0;
33+
std::atomic<ui64> ElapsingActorActivity = 0;
34+
TWorkerContext *WorkerCtx = nullptr;
35+
ui32 ActorSystemIndex = 0;
4736
};
4837

4938
extern Y_POD_THREAD(TThreadContext*) TlsThreadContext; // in actor.cpp

0 commit comments

Comments
 (0)