Skip to content

Commit 66b5733

Browse files
authored
Merge d8776f2 into 7156a4e
2 parents 7156a4e + d8776f2 commit 66b5733

File tree

5 files changed

+42
-21
lines changed

5 files changed

+42
-21
lines changed

ydb/library/actors/core/executor_pool_io.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -37,17 +37,17 @@ namespace NActors {
3737
ThreadQueue.Push(workerId + 1, revolvingCounter);
3838

3939
NHPTimer::STime hpnow = GetCycleCountFast();
40-
NHPTimer::STime hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
40+
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
4141
TlsThreadContext->ElapsingActorActivity.store(Max<ui64>(), std::memory_order_release);
4242
wctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
4343

4444
if (threadCtx.WaitingPad.Park())
4545
return 0;
4646

4747
hpnow = GetCycleCountFast();
48-
hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
48+
hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
4949
TlsThreadContext->ElapsingActorActivity.store(ActorSystemIndex, std::memory_order_release);
50-
wctx.AddParkedCycles(hpnow - hpprev);
50+
wctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
5151
}
5252

5353
while (!StopFlag.load(std::memory_order_acquire)) {

ydb/library/actors/core/executor_thread.cpp

+13-9
Original file line numberDiff line numberDiff line change
@@ -199,16 +199,16 @@ namespace NActors {
199199
bool firstEvent = true;
200200
bool preempted = false;
201201
bool wasWorking = false;
202-
NHPTimer::STime hpnow = Ctx.HPStart;
203-
NHPTimer::STime hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
202+
NHPTimer::STime hpnow = Ctx.HPStart;
203+
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
204204
Ctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
205-
hpprev = Ctx.HPStart;
205+
NHPTimer::STime eventStart = Ctx.HPStart;
206206

207207
for (; Ctx.ExecutedEvents < Ctx.EventsPerMailbox; ++Ctx.ExecutedEvents) {
208208
if (TAutoPtr<IEventHandle> evExt = mailbox->Pop()) {
209209
mailbox->ProcessEvents(mailbox);
210210
recipient = evExt->GetRecipientRewrite();
211-
TActorContext ctx(*mailbox, *this, hpprev, recipient);
211+
TActorContext ctx(*mailbox, *this, eventStart, recipient);
212212
TlsActivationContext = &ctx; // ensure dtor (if any) is called within actor system
213213
// move for destruct before ctx;
214214
auto ev = std::move(evExt);
@@ -250,7 +250,7 @@ namespace NActors {
250250
actor->Receive(ev);
251251

252252
hpnow = GetCycleCountFast();
253-
hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
253+
hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
254254

255255
mailbox->ProcessEvents(mailbox);
256256
actor->OnDequeueEvent();
@@ -266,7 +266,10 @@ namespace NActors {
266266
if (mailbox->IsEmpty()) // was not-free and become free, we must reclaim mailbox
267267
reclaimAsFree = true;
268268

269-
NHPTimer::STime elapsed = Ctx.AddEventProcessingStats(hpprev, hpnow, activityType, CurrentActorScheduledEventsCounter);
269+
270+
Ctx.AddElapsedCycles(activityType, hpnow - hpprev);
271+
Ctx.AddEventProcessingStats(eventStart, hpnow, activityType, CurrentActorScheduledEventsCounter);
272+
NHPTimer::STime elapsed = hpnow - eventStart;
270273
if (elapsed > 1000000) {
271274
LwTraceSlowEvent(ev.Get(), evTypeForTracing, actorType, Ctx.PoolId, CurrentRecipient, NHPTimer::GetSeconds(elapsed) * 1000.0);
272275
}
@@ -286,9 +289,10 @@ namespace NActors {
286289
Ctx.IncrementNonDeliveredEvents();
287290
}
288291
hpnow = GetCycleCountFast();
289-
hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
292+
hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
290293
Ctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
291294
}
295+
eventStart = hpnow;
292296

293297
if (TlsThreadContext->CapturedType == ESendingType::Tail) {
294298
AtomicStore(&mailbox->ScheduleMoment, hpnow);
@@ -777,7 +781,7 @@ namespace NActors {
777781
void TGenericExecutorThread::GetCurrentStats(TExecutorThreadStats& statsCopy) const {
778782
NHPTimer::STime hpnow = GetCycleCountFast();
779783
ui64 activityType = TlsThreadCtx.ElapsingActorActivity.load(std::memory_order_acquire);
780-
NHPTimer::STime hpprev = TlsThreadCtx.StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
784+
NHPTimer::STime hpprev = TlsThreadCtx.UpdateStartOfElapsingTime(hpnow);
781785
if (activityType == Max<ui64>()) {
782786
Ctx.AddParkedCycles(hpnow - hpprev);
783787
} else {
@@ -789,7 +793,7 @@ namespace NActors {
789793
void TGenericExecutorThread::GetSharedStats(i16 poolId, TExecutorThreadStats &statsCopy) const {
790794
NHPTimer::STime hpnow = GetCycleCountFast();
791795
ui64 activityType = TlsThreadCtx.ElapsingActorActivity.load(std::memory_order_acquire);
792-
NHPTimer::STime hpprev = TlsThreadCtx.StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
796+
NHPTimer::STime hpprev = TlsThreadCtx.UpdateStartOfElapsingTime(hpnow);
793797
if (activityType == Max<ui64>()) {
794798
Ctx.AddParkedCycles(hpnow - hpprev);
795799
} else {

ydb/library/actors/core/executor_thread_ctx.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -97,14 +97,14 @@ namespace NActors {
9797
}
9898

9999
NHPTimer::STime hpnow = GetCycleCountFast();
100-
NHPTimer::STime hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
100+
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
101101
TlsThreadContext->ElapsingActorActivity.store(Max<ui64>(), std::memory_order_release);
102102
TlsThreadContext->WorkerCtx->AddElapsedCycles(TlsThreadContext->ActorSystemIndex, hpnow - hpprev);
103103
do {
104104
if (WaitingPad.Park()) // interrupted
105105
return true;
106106
hpnow = GetCycleCountFast();
107-
hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
107+
hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
108108
TlsThreadContext->WorkerCtx->AddParkedCycles(hpnow - hpprev);
109109
state = GetState<TWaitState>();
110110
} while (static_cast<EThreadState>(state) == EThreadState::Sleep && !stopFlag->load(std::memory_order_relaxed));

ydb/library/actors/core/thread_context.h

+14
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include "defs.h"
44

5+
#include <atomic>
56
#include <ydb/library/actors/util/datetime.h>
67
#include <ydb/library/actors/queues/mpmc_ring_queue.h>
78

@@ -33,6 +34,19 @@ namespace NActors {
3334
std::atomic<ui64> ElapsingActorActivity = 0;
3435
TWorkerContext *WorkerCtx = nullptr;
3536
ui32 ActorSystemIndex = 0;
37+
38+
ui64 UpdateStartOfElapsingTime(ui64 newValue) {
39+
ui64 oldValue = StartOfElapsingTime.load(std::memory_order_acquire);
40+
for (;;) {
41+
if (newValue <= oldValue) {
42+
break;
43+
}
44+
if (StartOfElapsingTime.compare_exchange_strong(oldValue, newValue, std::memory_order_acq_rel)) {
45+
break;
46+
}
47+
}
48+
return oldValue;
49+
}
3650
};
3751

3852
extern Y_POD_THREAD(TThreadContext*) TlsThreadContext; // in actor.cpp

ydb/library/actors/core/worker_context.h

+10-7
Original file line numberDiff line numberDiff line change
@@ -61,13 +61,17 @@ namespace NActors {
6161
}
6262

6363
void AddElapsedCycles(ui32 activityType, i64 elapsed) {
64-
Y_DEBUG_ABORT_UNLESS(activityType < Stats->MaxActivityType());
65-
RelaxedStore(&Stats->ElapsedTicks, RelaxedLoad(&Stats->ElapsedTicks) + elapsed);
66-
RelaxedStore(&Stats->ElapsedTicksByActivity[activityType], RelaxedLoad(&Stats->ElapsedTicksByActivity[activityType]) + elapsed);
64+
if (Y_LIKELY(elapsed > 0)) {
65+
Y_DEBUG_ABORT_UNLESS(activityType < Stats->MaxActivityType());
66+
RelaxedStore(&Stats->ElapsedTicks, RelaxedLoad(&Stats->ElapsedTicks) + elapsed);
67+
RelaxedStore(&Stats->ElapsedTicksByActivity[activityType], RelaxedLoad(&Stats->ElapsedTicksByActivity[activityType]) + elapsed);
68+
}
6769
}
6870

6971
void AddParkedCycles(i64 elapsed) {
70-
RelaxedStore(&Stats->ParkedTicks, RelaxedLoad(&Stats->ParkedTicks) + elapsed);
72+
if (Y_LIKELY(elapsed > 0)) {
73+
RelaxedStore(&Stats->ParkedTicks, RelaxedLoad(&Stats->ParkedTicks) + elapsed);
74+
}
7175
}
7276

7377
void AddBlockedCycles(i64 elapsed) {
@@ -127,7 +131,7 @@ namespace NActors {
127131
return usecDeliv;
128132
}
129133

130-
i64 AddEventProcessingStats(i64 deliveredTs, i64 processedTs, ui32 activityType, ui64 scheduled) {
134+
i64 AddEventProcessingStats(i64 deliveredTs, i64 processedTs, ui32 activityType) {
131135
i64 elapsed = processedTs - deliveredTs;
132136
ui64 usecElapsed = NHPTimer::GetSeconds(elapsed) * 1000000;
133137
activityType = (activityType >= Stats->MaxActivityType()) ? 0 : activityType;
@@ -136,8 +140,7 @@ namespace NActors {
136140
RelaxedStore(&Stats->ReceivedEvents, RelaxedLoad(&Stats->ReceivedEvents) + 1);
137141
RelaxedStore(&Stats->ReceivedEventsByActivity[activityType], RelaxedLoad(&Stats->ReceivedEventsByActivity[activityType]) + 1);
138142
RelaxedStore(&Stats->ScheduledEventsByActivity[activityType], RelaxedLoad(&Stats->ScheduledEventsByActivity[activityType]) + scheduled);
139-
AddElapsedCycles(activityType, elapsed);
140-
return elapsed;
143+
return std::max<i64>(0, elapsed);
141144
}
142145

143146
void UpdateActorsStats(size_t dyingActorsCnt) {

0 commit comments

Comments
 (0)