Skip to content

Commit 6fa7d3c

Browse files
authored
Merge f0e4111 into eae74a3
2 parents eae74a3 + f0e4111 commit 6fa7d3c

8 files changed

+72
-64
lines changed

ydb/library/actors/core/executor_pool_basic.cpp

+5-16
Original file line numberDiff line numberDiff line change
@@ -162,27 +162,23 @@ namespace NActors {
162162
TWorkerId workerId = wctx.WorkerId;
163163
Y_DEBUG_ABORT_UNLESS(workerId < PoolThreads);
164164

165-
TlsThreadContext->Timers.Reset();
166-
167-
if (Harmonizer) {
168-
LWPROBE(TryToHarmonize, PoolId, PoolName);
169-
Harmonizer->Harmonize(TlsThreadContext->Timers.HPStart);
170-
}
171-
172165
if (workerId >= 0) {
173166
Threads[workerId].UnsetWork();
174167
} else {
175168
Y_ABORT_UNLESS(wctx.SharedThread);
176169
wctx.SharedThread->UnsetWork();
177170
}
178171

172+
if (Harmonizer) {
173+
LWPROBE(TryToHarmonize, PoolId, PoolName);
174+
Harmonizer->Harmonize(TlsThreadContext->StartOfElapsingTime.load(std::memory_order_relaxed));
175+
}
176+
179177
TAtomic x = AtomicGet(Semaphore);
180178
TSemaphore semaphore = TSemaphore::GetSemaphore(x);
181179
while (!StopFlag.load(std::memory_order_acquire)) {
182180
if (!semaphore.OldSemaphore || workerId >= 0 && semaphore.CurrentSleepThreadCount < 0) {
183181
if (workerId < 0 || !wctx.IsNeededToWaitNextActivation) {
184-
TlsThreadContext->Timers.HPNow = GetCycleCountFast();
185-
wctx.AddElapsedCycles(ActorSystemIndex, TlsThreadContext->Timers.HPNow - TlsThreadContext->Timers.HPStart);
186182
return 0;
187183
}
188184

@@ -203,13 +199,6 @@ namespace NActors {
203199
wctx.SharedThread->SetWork();
204200
}
205201
AtomicDecrement(Semaphore);
206-
TlsThreadContext->Timers.HPNow = GetCycleCountFast();
207-
TlsThreadContext->Timers.Elapsed += TlsThreadContext->Timers.HPNow - TlsThreadContext->Timers.HPStart;
208-
wctx.AddElapsedCycles(ActorSystemIndex, TlsThreadContext->Timers.Elapsed);
209-
if (TlsThreadContext->Timers.Parked > 0) {
210-
wctx.AddParkedCycles(TlsThreadContext->Timers.Parked);
211-
}
212-
213202
return activation;
214203
}
215204
semaphore.CurrentSleepThreadCount++;

ydb/library/actors/core/executor_pool_io.cpp

+11-15
Original file line numberDiff line numberDiff line change
@@ -30,31 +30,27 @@ namespace NActors {
3030
i16 workerId = wctx.WorkerId;
3131
Y_DEBUG_ABORT_UNLESS(workerId < PoolThreads);
3232

33-
NHPTimer::STime elapsed = 0;
34-
NHPTimer::STime parked = 0;
35-
NHPTimer::STime hpstart = GetCycleCountFast();
36-
NHPTimer::STime hpnow;
37-
3833
const TAtomic x = AtomicDecrement(Semaphore);
3934
if (x < 0) {
4035
TExecutorThreadCtx& threadCtx = Threads[workerId];
4136
ThreadQueue.Push(workerId + 1, revolvingCounter);
42-
hpnow = GetCycleCountFast();
43-
elapsed += hpnow - hpstart;
37+
38+
NHPTimer::STime hpnow = GetCycleCountFast();
39+
NHPTimer::STime hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
40+
TlsThreadContext->ElapsingActorActivity.store(Max<ui64>(), std::memory_order_release);
41+
wctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
42+
4443
if (threadCtx.WaitingPad.Park())
4544
return 0;
46-
hpstart = GetCycleCountFast();
47-
parked += hpstart - hpnow;
45+
46+
hpnow = GetCycleCountFast();
47+
hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
48+
TlsThreadContext->ElapsingActorActivity.store(ActorSystemIndex, std::memory_order_release);
49+
wctx.AddParkedCycles(hpnow - hpprev);
4850
}
4951

5052
while (!StopFlag.load(std::memory_order_acquire)) {
5153
if (const ui32 activation = Activations.Pop(++revolvingCounter)) {
52-
hpnow = GetCycleCountFast();
53-
elapsed += hpnow - hpstart;
54-
wctx.AddElapsedCycles(ActorSystemIndex, elapsed);
55-
if (parked > 0) {
56-
wctx.AddParkedCycles(parked);
57-
}
5854
return activation;
5955
}
6056
SpinLockPause();

ydb/library/actors/core/executor_pool_shared.cpp

-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ TSharedExecutorPool::TSharedExecutorPool(const TSharedExecutorPoolConfig &config
2020
, PoolCount(poolCount)
2121
, SharedThreadCount(poolsWithThreads.size())
2222
, Threads(new TSharedExecutorThreadCtx[SharedThreadCount])
23-
, Timers(new TTimers[SharedThreadCount])
2423
, TimePerMailbox(config.TimePerMailbox)
2524
, EventsPerMailbox(config.EventsPerMailbox)
2625
, SoftProcessingDurationTs(config.SoftProcessingDurationTs)

ydb/library/actors/core/executor_pool_shared.h

-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ namespace NActors {
4343
i16 PoolCount;
4444
i16 SharedThreadCount;
4545
std::unique_ptr<TSharedExecutorThreadCtx[]> Threads;
46-
std::unique_ptr<TTimers[]> Timers;
4746

4847
std::unique_ptr<NSchedulerQueue::TReader[]> ScheduleReaders;
4948
std::unique_ptr<NSchedulerQueue::TWriter[]> ScheduleWriters;

ydb/library/actors/core/executor_thread.cpp

+37-8
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ namespace NActors {
4848
, ThreadName(threadName)
4949
, TimePerMailbox(timePerMailbox)
5050
, EventsPerMailbox(eventsPerMailbox)
51+
, ActorSystemIndex(TActorTypeOperator::GetActorSystemIndex())
5152
{
5253
Ctx.Switch(
5354
ExecutorPool,
@@ -75,6 +76,7 @@ namespace NActors {
7576
, EventsPerMailbox(eventsPerMailbox)
7677
, SoftProcessingDurationTs(softProcessingDurationTs)
7778
, SharedStats(poolCount)
79+
, ActorSystemIndex(TActorTypeOperator::GetActorSystemIndex())
7880
{
7981
Ctx.Switch(
8082
ExecutorPool,
@@ -239,9 +241,14 @@ namespace NActors {
239241
if (activityType != prevActivityType) {
240242
prevActivityType = activityType;
241243
NProfiling::TMemoryTagScope::Reset(activityType);
244+
TlsThreadContext->ElapsingActorActivity.store(activityType, std::memory_order_release);
242245
}
243246

244247
actor->Receive(ev);
248+
249+
hpnow = GetCycleCountFast();
250+
hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
251+
245252
mailbox->ProcessEvents(mailbox);
246253
actor->OnDequeueEvent();
247254

@@ -256,7 +263,6 @@ namespace NActors {
256263
if (mailbox->IsEmpty()) // was not-free and become free, we must reclaim mailbox
257264
reclaimAsFree = true;
258265

259-
hpnow = GetCycleCountFast();
260266
NHPTimer::STime elapsed = Ctx.AddEventProcessingStats(hpprev, hpnow, activityType, CurrentActorScheduledEventsCounter);
261267
if (elapsed > 1000000) {
262268
LwTraceSlowEvent(ev.Get(), evTypeForTracing, actorType, Ctx.PoolId, CurrentRecipient, NHPTimer::GetSeconds(elapsed) * 1000.0);
@@ -277,10 +283,10 @@ namespace NActors {
277283
Ctx.IncrementNonDeliveredEvents();
278284
}
279285
hpnow = GetCycleCountFast();
286+
hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
287+
Ctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
280288
}
281289

282-
hpprev = hpnow;
283-
284290
if (TlsThreadContext->CapturedType == ESendingType::Tail) {
285291
AtomicStore(&mailbox->ScheduleMoment, hpnow);
286292
Ctx.IncrementMailboxPushedOutByTailSending();
@@ -360,6 +366,7 @@ namespace NActors {
360366
break; // empty queue, leave
361367
}
362368
}
369+
TlsThreadContext->ElapsingActorActivity.store(ActorSystemIndex, std::memory_order_release);
363370

364371
NProfiling::TMemoryTagScope::Reset(0);
365372
TlsActivationContext = nullptr;
@@ -495,8 +502,11 @@ namespace NActors {
495502
ThreadDisableBalloc();
496503
#endif
497504

498-
TThreadContext threadCtx;
499-
TlsThreadContext = &threadCtx;
505+
TlsThreadCtx.WorkerCtx = &Ctx;
506+
TlsThreadCtx.ActorSystemIndex = ActorSystemIndex;
507+
TlsThreadCtx.ElapsingActorActivity = ActorSystemIndex;
508+
TlsThreadCtx.StartOfElapsingTime = GetCycleCountFast();
509+
TlsThreadContext = &TlsThreadCtx;
500510
if (ThreadName) {
501511
::SetCurrentThreadName(ThreadName);
502512
}
@@ -529,8 +539,11 @@ namespace NActors {
529539
ThreadDisableBalloc();
530540
#endif
531541

532-
TThreadContext threadCtx;
533-
TlsThreadContext = &threadCtx;
542+
TlsThreadCtx.WorkerCtx = &Ctx;
543+
TlsThreadCtx.ActorSystemIndex = ActorSystemIndex;
544+
TlsThreadCtx.ElapsingActorActivity = ActorSystemIndex;
545+
TlsThreadCtx.StartOfElapsingTime = GetCycleCountFast();
546+
TlsThreadContext = &TlsThreadCtx;
534547
if (ThreadName) {
535548
::SetCurrentThreadName(ThreadName);
536549
}
@@ -551,7 +564,7 @@ namespace NActors {
551564
}
552565

553566
if (!wasWorking && !StopFlag.load(std::memory_order_relaxed)) {
554-
TlsThreadContext->Timers.Reset();
567+
ThreadCtx->UnsetWork();
555568
ThreadCtx->Wait(0, &StopFlag);
556569
}
557570

@@ -760,10 +773,26 @@ namespace NActors {
760773
}
761774

762775
void TGenericExecutorThread::GetCurrentStats(TExecutorThreadStats& statsCopy) const {
776+
NHPTimer::STime hpnow = GetCycleCountFast();
777+
ui64 activityType = TlsThreadCtx.ElapsingActorActivity.load(std::memory_order_acquire);
778+
NHPTimer::STime hpprev = TlsThreadCtx.StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
779+
if (activityType == Max<ui64>()) {
780+
Ctx.AddParkedCycles(hpnow - hpprev);
781+
} else {
782+
Ctx.AddElapsedCycles(activityType, hpnow - hpprev);
783+
}
763784
Ctx.GetCurrentStats(statsCopy);
764785
}
765786

766787
void TGenericExecutorThread::GetSharedStats(i16 poolId, TExecutorThreadStats &statsCopy) const {
788+
NHPTimer::STime hpnow = GetCycleCountFast();
789+
ui64 activityType = TlsThreadCtx.ElapsingActorActivity.load(std::memory_order_acquire);
790+
NHPTimer::STime hpprev = TlsThreadCtx.StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
791+
if (activityType == Max<ui64>()) {
792+
Ctx.AddParkedCycles(hpnow - hpprev);
793+
} else {
794+
Ctx.AddElapsedCycles(activityType, hpnow - hpprev);
795+
}
767796
statsCopy = TExecutorThreadStats();
768797
statsCopy.Aggregate(SharedStats[poolId]);
769798
}

ydb/library/actors/core/executor_thread.h

+4-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include "event.h"
55
#include "callstack.h"
66
#include "probes.h"
7+
#include "thread_context.h"
78
#include "worker_context.h"
89
#include "log_settings.h"
910

@@ -92,7 +93,8 @@ namespace NActors {
9293
ui64 CurrentActorScheduledEventsCounter = 0;
9394

9495
// Thread-specific
95-
TWorkerContext Ctx;
96+
mutable TThreadContext TlsThreadCtx;
97+
mutable TWorkerContext Ctx;
9698
ui64 RevolvingReadCounter = 0;
9799
ui64 RevolvingWriteCounter = 0;
98100
const TString ThreadName;
@@ -104,6 +106,7 @@ namespace NActors {
104106
ui64 SoftProcessingDurationTs;
105107

106108
std::vector<TExecutorThreadStats> SharedStats;
109+
const ui32 ActorSystemIndex;
107110
};
108111

109112
class TExecutorThread: public TGenericExecutorThread {

ydb/library/actors/core/executor_thread_ctx.h

+9-5
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include "defs.h"
44
#include "thread_context.h"
5+
#include "worker_context.h"
56

67
#include <ydb/library/actors/util/datetime.h>
78
#include <ydb/library/actors/util/threadparkpad.h>
@@ -95,16 +96,19 @@ namespace NActors {
9596
return false;
9697
}
9798

99+
NHPTimer::STime hpnow = GetCycleCountFast();
100+
NHPTimer::STime hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
101+
TlsThreadContext->ElapsingActorActivity.store(Max<ui64>(), std::memory_order_release);
102+
TlsThreadContext->WorkerCtx->AddElapsedCycles(TlsThreadContext->ActorSystemIndex, hpnow - hpprev);
98103
do {
99-
TlsThreadContext->Timers.HPNow = GetCycleCountFast();
100-
TlsThreadContext->Timers.Elapsed += TlsThreadContext->Timers.HPNow - TlsThreadContext->Timers.HPStart;
101104
if (WaitingPad.Park()) // interrupted
102105
return true;
103-
TlsThreadContext->Timers.HPStart = GetCycleCountFast();
104-
TlsThreadContext->Timers.Parked += TlsThreadContext->Timers.HPStart - TlsThreadContext->Timers.HPNow;
106+
hpnow = GetCycleCountFast();
107+
hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
108+
TlsThreadContext->WorkerCtx->AddParkedCycles(hpnow - hpprev);
105109
state = GetState<TWaitState>();
106110
} while (static_cast<EThreadState>(state) == EThreadState::Sleep && !stopFlag->load(std::memory_order_relaxed));
107-
111+
TlsThreadContext->ElapsingActorActivity.store(TlsThreadContext->ActorSystemIndex, std::memory_order_release);
108112
static_cast<TDerived*>(this)->AfterWakeUp(state);
109113
return false;
110114
}

ydb/library/actors/core/thread_context.h

+6-17
Original file line numberDiff line numberDiff line change
@@ -11,26 +11,11 @@
1111
namespace NActors {
1212

1313
class IExecutorPool;
14+
struct TWorkerContext;
1415

1516
template <typename T>
1617
struct TWaitingStats;
1718

18-
struct TTimers {
19-
NHPTimer::STime Elapsed = 0;
20-
NHPTimer::STime Parked = 0;
21-
NHPTimer::STime Blocked = 0;
22-
NHPTimer::STime HPStart = GetCycleCountFast();
23-
NHPTimer::STime HPNow;
24-
25-
void Reset() {
26-
Elapsed = 0;
27-
Parked = 0;
28-
Blocked = 0;
29-
HPStart = GetCycleCountFast();
30-
HPNow = HPStart;
31-
}
32-
};
33-
3419
struct TThreadContext {
3520
IExecutorPool *Pool = nullptr;
3621
ui32 CapturedActivation = 0;
@@ -42,8 +27,12 @@ namespace NActors {
4227
ui16 LocalQueueSize = 0;
4328
TWaitingStats<ui64> *WaitingStats = nullptr;
4429
bool IsCurrentRecipientAService = false;
45-
TTimers Timers;
4630
TMPMCRingQueue<20>::EPopMode ActivationPopMode = TMPMCRingQueue<20>::EPopMode::ReallySlow;
31+
32+
std::atomic<ui64> StartOfElapsingTime = 0;
33+
std::atomic<ui64> ElapsingActorActivity = 0;
34+
TWorkerContext *WorkerCtx = nullptr;
35+
ui32 ActorSystemIndex = 0;
4736
};
4837

4938
extern Y_POD_THREAD(TThreadContext*) TlsThreadContext; // in actor.cpp

0 commit comments

Comments
 (0)