Skip to content

Commit 8792ae7

Browse files
authored
Merge af87841 into 7156a4e
2 parents 7156a4e + af87841 commit 8792ae7

File tree

5 files changed

+42
-21
lines changed

5 files changed

+42
-21
lines changed

ydb/library/actors/core/executor_pool_io.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,17 +37,17 @@ namespace NActors {
3737
ThreadQueue.Push(workerId + 1, revolvingCounter);
3838

3939
NHPTimer::STime hpnow = GetCycleCountFast();
40-
NHPTimer::STime hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
40+
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
4141
TlsThreadContext->ElapsingActorActivity.store(Max<ui64>(), std::memory_order_release);
4242
wctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
4343

4444
if (threadCtx.WaitingPad.Park())
4545
return 0;
4646

4747
hpnow = GetCycleCountFast();
48-
hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
48+
hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
4949
TlsThreadContext->ElapsingActorActivity.store(ActorSystemIndex, std::memory_order_release);
50-
wctx.AddParkedCycles(hpnow - hpprev);
50+
wctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
5151
}
5252

5353
while (!StopFlag.load(std::memory_order_acquire)) {

ydb/library/actors/core/executor_thread.cpp

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -199,16 +199,16 @@ namespace NActors {
199199
bool firstEvent = true;
200200
bool preempted = false;
201201
bool wasWorking = false;
202-
NHPTimer::STime hpnow = Ctx.HPStart;
203-
NHPTimer::STime hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
202+
NHPTimer::STime hpnow = Ctx.HPStart;
203+
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
204204
Ctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
205-
hpprev = Ctx.HPStart;
205+
NHPTimer::STime eventStart = Ctx.HPStart;
206206

207207
for (; Ctx.ExecutedEvents < Ctx.EventsPerMailbox; ++Ctx.ExecutedEvents) {
208208
if (TAutoPtr<IEventHandle> evExt = mailbox->Pop()) {
209209
mailbox->ProcessEvents(mailbox);
210210
recipient = evExt->GetRecipientRewrite();
211-
TActorContext ctx(*mailbox, *this, hpprev, recipient);
211+
TActorContext ctx(*mailbox, *this, eventStart, recipient);
212212
TlsActivationContext = &ctx; // ensure dtor (if any) is called within actor system
213213
// move for destruct before ctx;
214214
auto ev = std::move(evExt);
@@ -250,7 +250,7 @@ namespace NActors {
250250
actor->Receive(ev);
251251

252252
hpnow = GetCycleCountFast();
253-
hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
253+
hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
254254

255255
mailbox->ProcessEvents(mailbox);
256256
actor->OnDequeueEvent();
@@ -266,7 +266,10 @@ namespace NActors {
266266
if (mailbox->IsEmpty()) // was not-free and become free, we must reclaim mailbox
267267
reclaimAsFree = true;
268268

269-
NHPTimer::STime elapsed = Ctx.AddEventProcessingStats(hpprev, hpnow, activityType, CurrentActorScheduledEventsCounter);
269+
270+
Ctx.AddElapsedCycles(activityType, hpnow - hpprev);
271+
Ctx.AddEventProcessingStats(eventStart, hpnow, activityType, CurrentActorScheduledEventsCounter);
272+
NHPTimer::STime elapsed = hpnow - eventStart;
270273
if (elapsed > 1000000) {
271274
LwTraceSlowEvent(ev.Get(), evTypeForTracing, actorType, Ctx.PoolId, CurrentRecipient, NHPTimer::GetSeconds(elapsed) * 1000.0);
272275
}
@@ -286,9 +289,10 @@ namespace NActors {
286289
Ctx.IncrementNonDeliveredEvents();
287290
}
288291
hpnow = GetCycleCountFast();
289-
hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
292+
hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
290293
Ctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
291294
}
295+
eventStart = hpnow;
292296

293297
if (TlsThreadContext->CapturedType == ESendingType::Tail) {
294298
AtomicStore(&mailbox->ScheduleMoment, hpnow);
@@ -777,7 +781,7 @@ namespace NActors {
777781
void TGenericExecutorThread::GetCurrentStats(TExecutorThreadStats& statsCopy) const {
778782
NHPTimer::STime hpnow = GetCycleCountFast();
779783
ui64 activityType = TlsThreadCtx.ElapsingActorActivity.load(std::memory_order_acquire);
780-
NHPTimer::STime hpprev = TlsThreadCtx.StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
784+
NHPTimer::STime hpprev = TlsThreadCtx.UpdateStartOfElapsingTime(hpnow);
781785
if (activityType == Max<ui64>()) {
782786
Ctx.AddParkedCycles(hpnow - hpprev);
783787
} else {
@@ -789,7 +793,7 @@ namespace NActors {
789793
void TGenericExecutorThread::GetSharedStats(i16 poolId, TExecutorThreadStats &statsCopy) const {
790794
NHPTimer::STime hpnow = GetCycleCountFast();
791795
ui64 activityType = TlsThreadCtx.ElapsingActorActivity.load(std::memory_order_acquire);
792-
NHPTimer::STime hpprev = TlsThreadCtx.StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
796+
NHPTimer::STime hpprev = TlsThreadCtx.UpdateStartOfElapsingTime(hpnow);
793797
if (activityType == Max<ui64>()) {
794798
Ctx.AddParkedCycles(hpnow - hpprev);
795799
} else {

ydb/library/actors/core/executor_thread_ctx.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,14 +97,14 @@ namespace NActors {
9797
}
9898

9999
NHPTimer::STime hpnow = GetCycleCountFast();
100-
NHPTimer::STime hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
100+
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
101101
TlsThreadContext->ElapsingActorActivity.store(Max<ui64>(), std::memory_order_release);
102102
TlsThreadContext->WorkerCtx->AddElapsedCycles(TlsThreadContext->ActorSystemIndex, hpnow - hpprev);
103103
do {
104104
if (WaitingPad.Park()) // interrupted
105105
return true;
106106
hpnow = GetCycleCountFast();
107-
hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
107+
hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
108108
TlsThreadContext->WorkerCtx->AddParkedCycles(hpnow - hpprev);
109109
state = GetState<TWaitState>();
110110
} while (static_cast<EThreadState>(state) == EThreadState::Sleep && !stopFlag->load(std::memory_order_relaxed));

ydb/library/actors/core/thread_context.h

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include "defs.h"
44

5+
#include <atomic>
56
#include <ydb/library/actors/util/datetime.h>
67
#include <ydb/library/actors/queues/mpmc_ring_queue.h>
78

@@ -29,10 +30,23 @@ namespace NActors {
2930
bool IsCurrentRecipientAService = false;
3031
TMPMCRingQueue<20>::EPopMode ActivationPopMode = TMPMCRingQueue<20>::EPopMode::ReallySlow;
3132

32-
std::atomic<ui64> StartOfElapsingTime = 0;
33+
std::atomic<i64> StartOfElapsingTime = 0;
3334
std::atomic<ui64> ElapsingActorActivity = 0;
3435
TWorkerContext *WorkerCtx = nullptr;
3536
ui32 ActorSystemIndex = 0;
37+
38+
ui64 UpdateStartOfElapsingTime(i64 newValue) {
39+
i64 oldValue = StartOfElapsingTime.load(std::memory_order_acquire);
40+
for (;;) {
41+
if (newValue - oldValue <= 0) {
42+
break;
43+
}
44+
if (StartOfElapsingTime.compare_exchange_strong(oldValue, newValue, std::memory_order_acq_rel)) {
45+
break;
46+
}
47+
}
48+
return oldValue;
49+
}
3650
};
3751

3852
extern Y_POD_THREAD(TThreadContext*) TlsThreadContext; // in actor.cpp

ydb/library/actors/core/worker_context.h

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,13 +61,17 @@ namespace NActors {
6161
}
6262

6363
void AddElapsedCycles(ui32 activityType, i64 elapsed) {
64-
Y_DEBUG_ABORT_UNLESS(activityType < Stats->MaxActivityType());
65-
RelaxedStore(&Stats->ElapsedTicks, RelaxedLoad(&Stats->ElapsedTicks) + elapsed);
66-
RelaxedStore(&Stats->ElapsedTicksByActivity[activityType], RelaxedLoad(&Stats->ElapsedTicksByActivity[activityType]) + elapsed);
64+
if (Y_LIKELY(elapsed > 0)) {
65+
Y_DEBUG_ABORT_UNLESS(activityType < Stats->MaxActivityType());
66+
RelaxedStore(&Stats->ElapsedTicks, RelaxedLoad(&Stats->ElapsedTicks) + elapsed);
67+
RelaxedStore(&Stats->ElapsedTicksByActivity[activityType], RelaxedLoad(&Stats->ElapsedTicksByActivity[activityType]) + elapsed);
68+
}
6769
}
6870

6971
void AddParkedCycles(i64 elapsed) {
70-
RelaxedStore(&Stats->ParkedTicks, RelaxedLoad(&Stats->ParkedTicks) + elapsed);
72+
if (Y_LIKELY(elapsed > 0)) {
73+
RelaxedStore(&Stats->ParkedTicks, RelaxedLoad(&Stats->ParkedTicks) + elapsed);
74+
}
7175
}
7276

7377
void AddBlockedCycles(i64 elapsed) {
@@ -136,8 +140,7 @@ namespace NActors {
136140
RelaxedStore(&Stats->ReceivedEvents, RelaxedLoad(&Stats->ReceivedEvents) + 1);
137141
RelaxedStore(&Stats->ReceivedEventsByActivity[activityType], RelaxedLoad(&Stats->ReceivedEventsByActivity[activityType]) + 1);
138142
RelaxedStore(&Stats->ScheduledEventsByActivity[activityType], RelaxedLoad(&Stats->ScheduledEventsByActivity[activityType]) + scheduled);
139-
AddElapsedCycles(activityType, elapsed);
140-
return elapsed;
143+
return std::max<i64>(0, elapsed);
141144
}
142145

143146
void UpdateActorsStats(size_t dyingActorsCnt) {

0 commit comments

Comments
 (0)