Skip to content

Commit 407562e

Browse files
committed
fix walking back StartOfElapsingTime
1 parent ed038e2 commit 407562e

File tree

4 files changed

+61
-27
lines changed

4 files changed

+61
-27
lines changed

ydb/library/actors/core/executor_pool_io.cpp

+8-4
Original file line numberDiff line numberDiff line change
@@ -37,17 +37,21 @@ namespace NActors {
3737
ThreadQueue.Push(workerId + 1, revolvingCounter);
3838

3939
NHPTimer::STime hpnow = GetCycleCountFast();
40-
NHPTimer::STime hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
40+
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
4141
TlsThreadContext->ElapsingActorActivity.store(Max<ui64>(), std::memory_order_release);
42-
wctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
42+
if (Y_LIKELY(hpprev < hpnow)) {
43+
wctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
44+
}
4345

4446
if (threadCtx.WaitingPad.Park())
4547
return 0;
4648

4749
hpnow = GetCycleCountFast();
48-
hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
50+
hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
4951
TlsThreadContext->ElapsingActorActivity.store(ActorSystemIndex, std::memory_order_release);
50-
wctx.AddParkedCycles(hpnow - hpprev);
52+
if (Y_LIKELY(hpprev < hpnow)) {
53+
wctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
54+
}
5155
}
5256

5357
while (!StopFlag.load(std::memory_order_acquire)) {

ydb/library/actors/core/executor_thread.cpp

+31-19
Original file line numberDiff line numberDiff line change
@@ -199,16 +199,18 @@ namespace NActors {
199199
bool firstEvent = true;
200200
bool preempted = false;
201201
bool wasWorking = false;
202-
NHPTimer::STime hpnow = Ctx.HPStart;
203-
NHPTimer::STime hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
204-
Ctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
205-
hpprev = Ctx.HPStart;
202+
NHPTimer::STime hpnow = Ctx.HPStart;
203+
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
204+
if (Y_LIKELY(hpprev < hpnow)) {
205+
Ctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
206+
}
207+
NHPTimer::STime eventStart = Ctx.HPStart;
206208

207209
for (; Ctx.ExecutedEvents < Ctx.EventsPerMailbox; ++Ctx.ExecutedEvents) {
208210
if (TAutoPtr<IEventHandle> evExt = mailbox->Pop()) {
209211
mailbox->ProcessEvents(mailbox);
210212
recipient = evExt->GetRecipientRewrite();
211-
TActorContext ctx(*mailbox, *this, hpprev, recipient);
213+
TActorContext ctx(*mailbox, *this, eventStart, recipient);
212214
TlsActivationContext = &ctx; // ensure dtor (if any) is called within actor system
213215
// move for destruct before ctx;
214216
auto ev = std::move(evExt);
@@ -250,7 +252,7 @@ namespace NActors {
250252
actor->Receive(ev);
251253

252254
hpnow = GetCycleCountFast();
253-
hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
255+
hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
254256

255257
mailbox->ProcessEvents(mailbox);
256258
actor->OnDequeueEvent();
@@ -266,7 +268,10 @@ namespace NActors {
266268
if (mailbox->IsEmpty()) // was not-free and become free, we must reclaim mailbox
267269
reclaimAsFree = true;
268270

269-
NHPTimer::STime elapsed = Ctx.AddEventProcessingStats(hpprev, hpnow, activityType, CurrentActorScheduledEventsCounter);
271+
if (Y_LIKELY(hpprev < hpnow)) {
272+
Ctx.AddEventProcessingStats(hpprev, hpnow, activityType, CurrentActorScheduledEventsCounter);
273+
}
274+
NHPTimer::STime elapsed = hpnow - eventStart;
270275
if (elapsed > 1000000) {
271276
LwTraceSlowEvent(ev.Get(), evTypeForTracing, actorType, Ctx.PoolId, CurrentRecipient, NHPTimer::GetSeconds(elapsed) * 1000.0);
272277
}
@@ -286,9 +291,12 @@ namespace NActors {
286291
Ctx.IncrementNonDeliveredEvents();
287292
}
288293
hpnow = GetCycleCountFast();
289-
hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
290-
Ctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
294+
hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
295+
if (Y_LIKELY(hpprev < hpnow)) {
296+
Ctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
297+
}
291298
}
299+
eventStart = hpnow;
292300

293301
if (TlsThreadContext->CapturedType == ESendingType::Tail) {
294302
AtomicStore(&mailbox->ScheduleMoment, hpnow);
@@ -777,23 +785,27 @@ namespace NActors {
777785
void TGenericExecutorThread::GetCurrentStats(TExecutorThreadStats& statsCopy) const {
778786
NHPTimer::STime hpnow = GetCycleCountFast();
779787
ui64 activityType = TlsThreadCtx.ElapsingActorActivity.load(std::memory_order_acquire);
780-
NHPTimer::STime hpprev = TlsThreadCtx.StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
781-
if (activityType == Max<ui64>()) {
782-
Ctx.AddParkedCycles(hpnow - hpprev);
783-
} else {
784-
Ctx.AddElapsedCycles(activityType, hpnow - hpprev);
788+
NHPTimer::STime hpprev = TlsThreadCtx.UpdateStartOfElapsingTime(hpnow);
789+
if (Y_LIKELY(hpprev < hpnow)) {
790+
if (activityType == Max<ui64>()) {
791+
Ctx.AddParkedCycles(hpnow - hpprev);
792+
} else {
793+
Ctx.AddElapsedCycles(activityType, hpnow - hpprev);
794+
}
785795
}
786796
Ctx.GetCurrentStats(statsCopy);
787797
}
788798

789799
void TGenericExecutorThread::GetSharedStats(i16 poolId, TExecutorThreadStats &statsCopy) const {
790800
NHPTimer::STime hpnow = GetCycleCountFast();
791801
ui64 activityType = TlsThreadCtx.ElapsingActorActivity.load(std::memory_order_acquire);
792-
NHPTimer::STime hpprev = TlsThreadCtx.StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
793-
if (activityType == Max<ui64>()) {
794-
Ctx.AddParkedCycles(hpnow - hpprev);
795-
} else {
796-
Ctx.AddElapsedCycles(activityType, hpnow - hpprev);
802+
NHPTimer::STime hpprev = TlsThreadCtx.UpdateStartOfElapsingTime(hpnow);
803+
if (Y_LIKELY(hpprev < hpnow)) {
804+
if (activityType == Max<ui64>()) {
805+
Ctx.AddParkedCycles(hpnow - hpprev);
806+
} else {
807+
Ctx.AddElapsedCycles(activityType, hpnow - hpprev);
808+
}
797809
}
798810
statsCopy = TExecutorThreadStats();
799811
statsCopy.Aggregate(SharedStats[poolId]);

ydb/library/actors/core/executor_thread_ctx.h

+8-4
Original file line numberDiff line numberDiff line change
@@ -97,15 +97,19 @@ namespace NActors {
9797
}
9898

9999
NHPTimer::STime hpnow = GetCycleCountFast();
100-
NHPTimer::STime hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
100+
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
101101
TlsThreadContext->ElapsingActorActivity.store(Max<ui64>(), std::memory_order_release);
102-
TlsThreadContext->WorkerCtx->AddElapsedCycles(TlsThreadContext->ActorSystemIndex, hpnow - hpprev);
102+
if (Y_LIKELY(hpprev < hpnow)) {
103+
TlsThreadContext->WorkerCtx->AddElapsedCycles(TlsThreadContext->ActorSystemIndex, hpnow - hpprev);
104+
}
103105
do {
104106
if (WaitingPad.Park()) // interrupted
105107
return true;
106108
hpnow = GetCycleCountFast();
107-
hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
108-
TlsThreadContext->WorkerCtx->AddParkedCycles(hpnow - hpprev);
109+
hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
110+
if (Y_LIKELY(hpprev < hpnow)) {
111+
TlsThreadContext->WorkerCtx->AddParkedCycles(hpnow - hpprev);
112+
}
109113
state = GetState<TWaitState>();
110114
} while (static_cast<EThreadState>(state) == EThreadState::Sleep && !stopFlag->load(std::memory_order_relaxed));
111115
TlsThreadContext->ElapsingActorActivity.store(TlsThreadContext->ActorSystemIndex, std::memory_order_release);

ydb/library/actors/core/thread_context.h

+14
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include "defs.h"
44

5+
#include <atomic>
56
#include <ydb/library/actors/util/datetime.h>
67
#include <ydb/library/actors/queues/mpmc_ring_queue.h>
78

@@ -33,6 +34,19 @@ namespace NActors {
3334
std::atomic<ui64> ElapsingActorActivity = 0;
3435
TWorkerContext *WorkerCtx = nullptr;
3536
ui32 ActorSystemIndex = 0;
37+
38+
ui64 UpdateStartOfElapsingTime(ui64 newValue) {
39+
ui64 oldValue = StartOfElapsingTime.load(std::memory_order_acquire);
40+
for (;;) {
41+
if (newValue <= oldValue) {
42+
break;
43+
}
44+
if (StartOfElapsingTime.compare_exchange_strong(oldValue, newValue, std::memory_order_acq_rel)) {
45+
break;
46+
}
47+
}
48+
return oldValue;
49+
}
3650
};
3751

3852
extern Y_POD_THREAD(TThreadContext*) TlsThreadContext; // in actor.cpp

0 commit comments

Comments
 (0)