diff --git a/ydb/library/actors/core/executor_pool_io.cpp b/ydb/library/actors/core/executor_pool_io.cpp index 7eef8f372139..2c535032db57 100644 --- a/ydb/library/actors/core/executor_pool_io.cpp +++ b/ydb/library/actors/core/executor_pool_io.cpp @@ -37,7 +37,7 @@ namespace NActors { ThreadQueue.Push(workerId + 1, revolvingCounter); NHPTimer::STime hpnow = GetCycleCountFast(); - NHPTimer::STime hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel); + NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow); TlsThreadContext->ElapsingActorActivity.store(Max(), std::memory_order_release); wctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev); @@ -45,7 +45,7 @@ namespace NActors { return 0; hpnow = GetCycleCountFast(); - hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel); + hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow); TlsThreadContext->ElapsingActorActivity.store(ActorSystemIndex, std::memory_order_release); wctx.AddParkedCycles(hpnow - hpprev); } diff --git a/ydb/library/actors/core/executor_thread.cpp b/ydb/library/actors/core/executor_thread.cpp index d386b90d32a9..ce99a87bf923 100644 --- a/ydb/library/actors/core/executor_thread.cpp +++ b/ydb/library/actors/core/executor_thread.cpp @@ -199,16 +199,16 @@ namespace NActors { bool firstEvent = true; bool preempted = false; bool wasWorking = false; - NHPTimer::STime hpnow = Ctx.HPStart; - NHPTimer::STime hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel); + NHPTimer::STime hpnow = Ctx.HPStart; + NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow); Ctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev); - hpprev = Ctx.HPStart; + NHPTimer::STime eventStart = Ctx.HPStart; for (; Ctx.ExecutedEvents < Ctx.EventsPerMailbox; ++Ctx.ExecutedEvents) { if (TAutoPtr evExt = mailbox->Pop()) { mailbox->ProcessEvents(mailbox); recipient = evExt->GetRecipientRewrite(); - TActorContext ctx(*mailbox, *this, hpprev, recipient); + TActorContext ctx(*mailbox, *this, eventStart, recipient); TlsActivationContext = &ctx; // ensure dtor (if any) is called within actor system // move for destruct before ctx; auto ev = std::move(evExt); @@ -250,7 +250,7 @@ namespace NActors { actor->Receive(ev); hpnow = GetCycleCountFast(); - hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel); + hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow); mailbox->ProcessEvents(mailbox); actor->OnDequeueEvent(); @@ -265,8 +265,9 @@ namespace NActors { if (mailbox->IsEmpty()) // was not-free and become free, we must reclaim mailbox reclaimAsFree = true; - - NHPTimer::STime elapsed = Ctx.AddEventProcessingStats(hpprev, hpnow, activityType, CurrentActorScheduledEventsCounter); + + Ctx.AddElapsedCycles(activityType, hpnow - hpprev); + NHPTimer::STime elapsed = Ctx.AddEventProcessingStats(eventStart, hpnow, activityType, CurrentActorScheduledEventsCounter); if (elapsed > 1000000) { LwTraceSlowEvent(ev.Get(), evTypeForTracing, actorType, Ctx.PoolId, CurrentRecipient, NHPTimer::GetSeconds(elapsed) * 1000.0); } @@ -286,9 +287,10 @@ namespace NActors { Ctx.IncrementNonDeliveredEvents(); } hpnow = GetCycleCountFast(); - hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel); + hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow); Ctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev); } + eventStart = hpnow; if (TlsThreadContext->CapturedType == ESendingType::Tail) { AtomicStore(&mailbox->ScheduleMoment, hpnow); @@ -777,7 +779,7 @@ namespace NActors { void TGenericExecutorThread::GetCurrentStats(TExecutorThreadStats& statsCopy) const { NHPTimer::STime hpnow = GetCycleCountFast(); ui64 activityType = TlsThreadCtx.ElapsingActorActivity.load(std::memory_order_acquire); - NHPTimer::STime hpprev = TlsThreadCtx.StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel); + NHPTimer::STime hpprev = TlsThreadCtx.UpdateStartOfElapsingTime(hpnow); if (activityType == Max()) { Ctx.AddParkedCycles(hpnow - hpprev); } else { @@ -789,7 +791,7 @@ namespace NActors { void TGenericExecutorThread::GetSharedStats(i16 poolId, TExecutorThreadStats &statsCopy) const { NHPTimer::STime hpnow = GetCycleCountFast(); ui64 activityType = TlsThreadCtx.ElapsingActorActivity.load(std::memory_order_acquire); - NHPTimer::STime hpprev = TlsThreadCtx.StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel); + NHPTimer::STime hpprev = TlsThreadCtx.UpdateStartOfElapsingTime(hpnow); if (activityType == Max()) { Ctx.AddParkedCycles(hpnow - hpprev); } else { diff --git a/ydb/library/actors/core/executor_thread_ctx.h b/ydb/library/actors/core/executor_thread_ctx.h index d88b2de00d35..ebf1d9d80632 100644 --- a/ydb/library/actors/core/executor_thread_ctx.h +++ b/ydb/library/actors/core/executor_thread_ctx.h @@ -97,14 +97,14 @@ namespace NActors { } NHPTimer::STime hpnow = GetCycleCountFast(); - NHPTimer::STime hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel); + NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow); TlsThreadContext->ElapsingActorActivity.store(Max(), std::memory_order_release); TlsThreadContext->WorkerCtx->AddElapsedCycles(TlsThreadContext->ActorSystemIndex, hpnow - hpprev); do { if (WaitingPad.Park()) // interrupted return true; hpnow = GetCycleCountFast(); - hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel); + hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow); TlsThreadContext->WorkerCtx->AddParkedCycles(hpnow - hpprev); state = GetState(); } while (static_cast(state) == EThreadState::Sleep && !stopFlag->load(std::memory_order_relaxed)); diff --git a/ydb/library/actors/core/thread_context.h b/ydb/library/actors/core/thread_context.h index 9bfa8dd45284..3e03d454ce60 100644 --- a/ydb/library/actors/core/thread_context.h +++ b/ydb/library/actors/core/thread_context.h @@ -2,6 +2,7 @@ #include "defs.h" +#include #include #include @@ -29,10 +30,23 @@ namespace NActors { bool IsCurrentRecipientAService = false; TMPMCRingQueue<20>::EPopMode ActivationPopMode = TMPMCRingQueue<20>::EPopMode::ReallySlow; - std::atomic StartOfElapsingTime = 0; + std::atomic StartOfElapsingTime = 0; std::atomic ElapsingActorActivity = 0; TWorkerContext *WorkerCtx = nullptr; ui32 ActorSystemIndex = 0; + + ui64 UpdateStartOfElapsingTime(i64 newValue) { + i64 oldValue = StartOfElapsingTime.load(std::memory_order_acquire); + for (;;) { + if (newValue - oldValue <= 0) { + break; + } + if (StartOfElapsingTime.compare_exchange_strong(oldValue, newValue, std::memory_order_acq_rel)) { + break; + } + } + return oldValue; + } }; extern Y_POD_THREAD(TThreadContext*) TlsThreadContext; // in actor.cpp diff --git a/ydb/library/actors/core/worker_context.h b/ydb/library/actors/core/worker_context.h index f9ddc90bbb1a..9e045b23c799 100644 --- a/ydb/library/actors/core/worker_context.h +++ b/ydb/library/actors/core/worker_context.h @@ -61,13 +61,17 @@ namespace NActors { } void AddElapsedCycles(ui32 activityType, i64 elapsed) { - Y_DEBUG_ABORT_UNLESS(activityType < Stats->MaxActivityType()); - RelaxedStore(&Stats->ElapsedTicks, RelaxedLoad(&Stats->ElapsedTicks) + elapsed); - RelaxedStore(&Stats->ElapsedTicksByActivity[activityType], RelaxedLoad(&Stats->ElapsedTicksByActivity[activityType]) + elapsed); + if (Y_LIKELY(elapsed > 0)) { + Y_DEBUG_ABORT_UNLESS(activityType < Stats->MaxActivityType()); + RelaxedStore(&Stats->ElapsedTicks, RelaxedLoad(&Stats->ElapsedTicks) + elapsed); + RelaxedStore(&Stats->ElapsedTicksByActivity[activityType], RelaxedLoad(&Stats->ElapsedTicksByActivity[activityType]) + elapsed); + } } void AddParkedCycles(i64 elapsed) { - RelaxedStore(&Stats->ParkedTicks, RelaxedLoad(&Stats->ParkedTicks) + elapsed); + if (Y_LIKELY(elapsed > 0)) { + RelaxedStore(&Stats->ParkedTicks, RelaxedLoad(&Stats->ParkedTicks) + elapsed); + } } void AddBlockedCycles(i64 elapsed) { @@ -136,7 +140,6 @@ namespace NActors { RelaxedStore(&Stats->ReceivedEvents, RelaxedLoad(&Stats->ReceivedEvents) + 1); RelaxedStore(&Stats->ReceivedEventsByActivity[activityType], RelaxedLoad(&Stats->ReceivedEventsByActivity[activityType]) + 1); RelaxedStore(&Stats->ScheduledEventsByActivity[activityType], RelaxedLoad(&Stats->ScheduledEventsByActivity[activityType]) + scheduled); - AddElapsedCycles(activityType, elapsed); return elapsed; }