Skip to content

Commit e884754

Browse files
authored
fix walking back StartOfElapsingTime (#4555)
1 parent 9bce709 commit e884754

File tree

5 files changed

+39
-20
lines changed

5 files changed

+39
-20
lines changed

ydb/library/actors/core/executor_pool_io.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,15 @@ namespace NActors {
3737
ThreadQueue.Push(workerId + 1, revolvingCounter);
3838

3939
NHPTimer::STime hpnow = GetCycleCountFast();
40-
NHPTimer::STime hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
40+
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
4141
TlsThreadContext->ElapsingActorActivity.store(Max<ui64>(), std::memory_order_release);
4242
wctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
4343

4444
if (threadCtx.WaitingPad.Park())
4545
return 0;
4646

4747
hpnow = GetCycleCountFast();
48-
hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
48+
hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
4949
TlsThreadContext->ElapsingActorActivity.store(ActorSystemIndex, std::memory_order_release);
5050
wctx.AddParkedCycles(hpnow - hpprev);
5151
}

ydb/library/actors/core/executor_thread.cpp

+12-10
Original file line numberDiff line numberDiff line change
@@ -199,16 +199,16 @@ namespace NActors {
199199
bool firstEvent = true;
200200
bool preempted = false;
201201
bool wasWorking = false;
202-
NHPTimer::STime hpnow = Ctx.HPStart;
203-
NHPTimer::STime hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
202+
NHPTimer::STime hpnow = Ctx.HPStart;
203+
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
204204
Ctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
205-
hpprev = Ctx.HPStart;
205+
NHPTimer::STime eventStart = Ctx.HPStart;
206206

207207
for (; Ctx.ExecutedEvents < Ctx.EventsPerMailbox; ++Ctx.ExecutedEvents) {
208208
if (TAutoPtr<IEventHandle> evExt = mailbox->Pop()) {
209209
mailbox->ProcessEvents(mailbox);
210210
recipient = evExt->GetRecipientRewrite();
211-
TActorContext ctx(*mailbox, *this, hpprev, recipient);
211+
TActorContext ctx(*mailbox, *this, eventStart, recipient);
212212
TlsActivationContext = &ctx; // ensure dtor (if any) is called within actor system
213213
// move for destruct before ctx;
214214
auto ev = std::move(evExt);
@@ -250,7 +250,7 @@ namespace NActors {
250250
actor->Receive(ev);
251251

252252
hpnow = GetCycleCountFast();
253-
hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
253+
hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
254254

255255
mailbox->ProcessEvents(mailbox);
256256
actor->OnDequeueEvent();
@@ -265,8 +265,9 @@ namespace NActors {
265265

266266
if (mailbox->IsEmpty()) // was not-free and become free, we must reclaim mailbox
267267
reclaimAsFree = true;
268-
269-
NHPTimer::STime elapsed = Ctx.AddEventProcessingStats(hpprev, hpnow, activityType, CurrentActorScheduledEventsCounter);
268+
269+
Ctx.AddElapsedCycles(activityType, hpnow - hpprev);
270+
NHPTimer::STime elapsed = Ctx.AddEventProcessingStats(eventStart, hpnow, activityType, CurrentActorScheduledEventsCounter);
270271
if (elapsed > 1000000) {
271272
LwTraceSlowEvent(ev.Get(), evTypeForTracing, actorType, Ctx.PoolId, CurrentRecipient, NHPTimer::GetSeconds(elapsed) * 1000.0);
272273
}
@@ -286,9 +287,10 @@ namespace NActors {
286287
Ctx.IncrementNonDeliveredEvents();
287288
}
288289
hpnow = GetCycleCountFast();
289-
hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
290+
hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
290291
Ctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
291292
}
293+
eventStart = hpnow;
292294

293295
if (TlsThreadContext->CapturedType == ESendingType::Tail) {
294296
AtomicStore(&mailbox->ScheduleMoment, hpnow);
@@ -777,7 +779,7 @@ namespace NActors {
777779
void TGenericExecutorThread::GetCurrentStats(TExecutorThreadStats& statsCopy) const {
778780
NHPTimer::STime hpnow = GetCycleCountFast();
779781
ui64 activityType = TlsThreadCtx.ElapsingActorActivity.load(std::memory_order_acquire);
780-
NHPTimer::STime hpprev = TlsThreadCtx.StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
782+
NHPTimer::STime hpprev = TlsThreadCtx.UpdateStartOfElapsingTime(hpnow);
781783
if (activityType == Max<ui64>()) {
782784
Ctx.AddParkedCycles(hpnow - hpprev);
783785
} else {
@@ -789,7 +791,7 @@ namespace NActors {
789791
void TGenericExecutorThread::GetSharedStats(i16 poolId, TExecutorThreadStats &statsCopy) const {
790792
NHPTimer::STime hpnow = GetCycleCountFast();
791793
ui64 activityType = TlsThreadCtx.ElapsingActorActivity.load(std::memory_order_acquire);
792-
NHPTimer::STime hpprev = TlsThreadCtx.StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
794+
NHPTimer::STime hpprev = TlsThreadCtx.UpdateStartOfElapsingTime(hpnow);
793795
if (activityType == Max<ui64>()) {
794796
Ctx.AddParkedCycles(hpnow - hpprev);
795797
} else {

ydb/library/actors/core/executor_thread_ctx.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -97,14 +97,14 @@ namespace NActors {
9797
}
9898

9999
NHPTimer::STime hpnow = GetCycleCountFast();
100-
NHPTimer::STime hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
100+
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
101101
TlsThreadContext->ElapsingActorActivity.store(Max<ui64>(), std::memory_order_release);
102102
TlsThreadContext->WorkerCtx->AddElapsedCycles(TlsThreadContext->ActorSystemIndex, hpnow - hpprev);
103103
do {
104104
if (WaitingPad.Park()) // interrupted
105105
return true;
106106
hpnow = GetCycleCountFast();
107-
hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
107+
hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
108108
TlsThreadContext->WorkerCtx->AddParkedCycles(hpnow - hpprev);
109109
state = GetState<TWaitState>();
110110
} while (static_cast<EThreadState>(state) == EThreadState::Sleep && !stopFlag->load(std::memory_order_relaxed));

ydb/library/actors/core/thread_context.h

+15-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include "defs.h"
44

5+
#include <atomic>
56
#include <ydb/library/actors/util/datetime.h>
67
#include <ydb/library/actors/queues/mpmc_ring_queue.h>
78

@@ -29,10 +30,23 @@ namespace NActors {
2930
bool IsCurrentRecipientAService = false;
3031
TMPMCRingQueue<20>::EPopMode ActivationPopMode = TMPMCRingQueue<20>::EPopMode::ReallySlow;
3132

32-
std::atomic<ui64> StartOfElapsingTime = 0;
33+
std::atomic<i64> StartOfElapsingTime = 0;
3334
std::atomic<ui64> ElapsingActorActivity = 0;
3435
TWorkerContext *WorkerCtx = nullptr;
3536
ui32 ActorSystemIndex = 0;
37+
38+
ui64 UpdateStartOfElapsingTime(i64 newValue) {
39+
i64 oldValue = StartOfElapsingTime.load(std::memory_order_acquire);
40+
for (;;) {
41+
if (newValue - oldValue <= 0) {
42+
break;
43+
}
44+
if (StartOfElapsingTime.compare_exchange_strong(oldValue, newValue, std::memory_order_acq_rel)) {
45+
break;
46+
}
47+
}
48+
return oldValue;
49+
}
3650
};
3751

3852
extern Y_POD_THREAD(TThreadContext*) TlsThreadContext; // in actor.cpp

ydb/library/actors/core/worker_context.h

+8-5
Original file line numberDiff line numberDiff line change
@@ -61,13 +61,17 @@ namespace NActors {
6161
}
6262

6363
void AddElapsedCycles(ui32 activityType, i64 elapsed) {
64-
Y_DEBUG_ABORT_UNLESS(activityType < Stats->MaxActivityType());
65-
RelaxedStore(&Stats->ElapsedTicks, RelaxedLoad(&Stats->ElapsedTicks) + elapsed);
66-
RelaxedStore(&Stats->ElapsedTicksByActivity[activityType], RelaxedLoad(&Stats->ElapsedTicksByActivity[activityType]) + elapsed);
64+
if (Y_LIKELY(elapsed > 0)) {
65+
Y_DEBUG_ABORT_UNLESS(activityType < Stats->MaxActivityType());
66+
RelaxedStore(&Stats->ElapsedTicks, RelaxedLoad(&Stats->ElapsedTicks) + elapsed);
67+
RelaxedStore(&Stats->ElapsedTicksByActivity[activityType], RelaxedLoad(&Stats->ElapsedTicksByActivity[activityType]) + elapsed);
68+
}
6769
}
6870

6971
void AddParkedCycles(i64 elapsed) {
70-
RelaxedStore(&Stats->ParkedTicks, RelaxedLoad(&Stats->ParkedTicks) + elapsed);
72+
if (Y_LIKELY(elapsed > 0)) {
73+
RelaxedStore(&Stats->ParkedTicks, RelaxedLoad(&Stats->ParkedTicks) + elapsed);
74+
}
7175
}
7276

7377
void AddBlockedCycles(i64 elapsed) {
@@ -136,7 +140,6 @@ namespace NActors {
136140
RelaxedStore(&Stats->ReceivedEvents, RelaxedLoad(&Stats->ReceivedEvents) + 1);
137141
RelaxedStore(&Stats->ReceivedEventsByActivity[activityType], RelaxedLoad(&Stats->ReceivedEventsByActivity[activityType]) + 1);
138142
RelaxedStore(&Stats->ScheduledEventsByActivity[activityType], RelaxedLoad(&Stats->ScheduledEventsByActivity[activityType]) + scheduled);
139-
AddElapsedCycles(activityType, elapsed);
140143
return elapsed;
141144
}
142145

0 commit comments

Comments
 (0)