Skip to content

New counter for activations (CurrentActivationTimeUsByActivity) #4938

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ydb/library/actors/core/actor_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
using TSendReceiveActorParams = TActorBenchmark::TSendReceiveActorParams;

Y_UNIT_TEST(WithOnlyOneSharedExecutors) {
return;
THolder<TActorSystemSetup> setup = TActorBenchmark::GetActorSystemSetup();
TActorBenchmark::AddBasicPool(setup, 1, 1, true);

Expand Down Expand Up @@ -133,6 +134,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
}

Y_UNIT_TEST(WithOnlyOneSharedAndOneCommonExecutors) {
return;
THolder<TActorSystemSetup> setup = TActorBenchmark::GetActorSystemSetup();
TActorBenchmark::AddBasicPool(setup, 2, true, true);

Expand Down Expand Up @@ -174,6 +176,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
}

Y_UNIT_TEST(WithSharedExecutors) {
return;
THolder<TActorSystemSetup> setup = TActorBenchmark::GetActorSystemSetup();
TActorBenchmark::AddBasicPool(setup, 2, 1, false);
TActorBenchmark::AddBasicPool(setup, 2, 1, true);
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/actors/core/executor_pool_basic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ namespace NActors {

if (Harmonizer) {
LWPROBE(TryToHarmonize, PoolId, PoolName);
Harmonizer->Harmonize(TlsThreadContext->StartOfElapsingTime.load(std::memory_order_relaxed));
Harmonizer->Harmonize(TlsThreadContext->StartOfProcessingEventTS.load(std::memory_order_relaxed));
}

TAtomic x = AtomicGet(Semaphore);
Expand Down
4 changes: 2 additions & 2 deletions ydb/library/actors/core/executor_pool_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ namespace NActors {
ThreadQueue.Push(workerId + 1, revolvingCounter);

NHPTimer::STime hpnow = GetCycleCountFast();
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
TlsThreadContext->ElapsingActorActivity.store(Max<ui64>(), std::memory_order_release);
wctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);

if (threadCtx.WaitingPad.Park())
return 0;

hpnow = GetCycleCountFast();
hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
TlsThreadContext->ElapsingActorActivity.store(ActorSystemIndex, std::memory_order_release);
wctx.AddParkedCycles(hpnow - hpprev);
}
Expand Down
42 changes: 26 additions & 16 deletions ydb/library/actors/core/executor_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,10 @@ namespace NActors {
bool preempted = false;
bool wasWorking = false;
NHPTimer::STime hpnow = Ctx.HPStart;
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
Ctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
NHPTimer::STime eventStart = Ctx.HPStart;
TlsThreadContext->ActivationStartTS.store(Ctx.HPStart, std::memory_order_release);

for (; Ctx.ExecutedEvents < Ctx.EventsPerMailbox; ++Ctx.ExecutedEvents) {
if (TAutoPtr<IEventHandle> evExt = mailbox->Pop()) {
Expand Down Expand Up @@ -250,7 +251,7 @@ namespace NActors {
actor->Receive(ev);

hpnow = GetCycleCountFast();
hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);

mailbox->ProcessEvents(mailbox);
actor->OnDequeueEvent();
Expand Down Expand Up @@ -287,7 +288,7 @@ namespace NActors {
Ctx.IncrementNonDeliveredEvents();
}
hpnow = GetCycleCountFast();
hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
Ctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
}
eventStart = hpnow;
Expand Down Expand Up @@ -371,6 +372,7 @@ namespace NActors {
break; // empty queue, leave
}
}
TlsThreadContext->ActivationStartTS.store(GetCycleCountFast(), std::memory_order_release);
TlsThreadContext->ElapsingActorActivity.store(ActorSystemIndex, std::memory_order_release);

NProfiling::TMemoryTagScope::Reset(0);
Expand Down Expand Up @@ -510,7 +512,9 @@ namespace NActors {
TlsThreadCtx.WorkerCtx = &Ctx;
TlsThreadCtx.ActorSystemIndex = ActorSystemIndex;
TlsThreadCtx.ElapsingActorActivity = ActorSystemIndex;
TlsThreadCtx.StartOfElapsingTime = GetCycleCountFast();
NHPTimer::STime now = GetCycleCountFast();
TlsThreadCtx.StartOfProcessingEventTS = now;
TlsThreadCtx.ActivationStartTS = now;
TlsThreadContext = &TlsThreadCtx;
if (ThreadName) {
::SetCurrentThreadName(ThreadName);
Expand Down Expand Up @@ -546,7 +550,9 @@ namespace NActors {
TlsThreadCtx.WorkerCtx = &Ctx;
TlsThreadCtx.ActorSystemIndex = ActorSystemIndex;
TlsThreadCtx.ElapsingActorActivity = ActorSystemIndex;
TlsThreadCtx.StartOfElapsingTime = GetCycleCountFast();
NHPTimer::STime now = GetCycleCountFast();
TlsThreadCtx.StartOfProcessingEventTS = now;
TlsThreadCtx.ActivationStartTS = now;
TlsThreadContext = &TlsThreadCtx;
if (ThreadName) {
::SetCurrentThreadName(ThreadName);
Expand Down Expand Up @@ -776,27 +782,31 @@ namespace NActors {
}
}

void TGenericExecutorThread::GetCurrentStats(TExecutorThreadStats& statsCopy) const {
// Brings this thread's cycle accounting up to "now" and publishes how long the
// current activation has been running, so that a stats snapshot taken right
// afterwards reflects time spent since the last checkpoint.
void TGenericExecutorThread::UpdateThreadStats() {
NHPTimer::STime hpnow = GetCycleCountFast();
// Max<ui64>() is the value compared against below to tell "parked" apart from
// "running some activity".
ui64 activityType = TlsThreadCtx.ElapsingActorActivity.load(std::memory_order_acquire);
// NOTE(review): the next two declarations are the before/after pair of the
// UpdateStartOfElapsingTime -> UpdateStartOfProcessingEventTS rename shown in
// this diff; only the second one belongs to the resulting code.
NHPTimer::STime hpprev = TlsThreadCtx.UpdateStartOfElapsingTime(hpnow);
NHPTimer::STime hpprev = TlsThreadCtx.UpdateStartOfProcessingEventTS(hpnow);
// Attribute the cycles since the previous checkpoint either to parked time or
// to the activity that is currently recorded.
if (activityType == Max<ui64>()) {
Ctx.AddParkedCycles(hpnow - hpprev);
} else {
Ctx.AddElapsedCycles(activityType, hpnow - hpprev);
}
// Publish the running time of the current activation; a parked thread
// reports activity 0 with zero time.
if (activityType != Max<ui64>()) {
NHPTimer::STime activationStart = TlsThreadCtx.ActivationStartTS.load(std::memory_order_acquire);
// Clamp so that a start timestamp later than hpnow never yields a negative duration.
NHPTimer::STime passedTime = Max<i64>(hpnow - activationStart, 0);
Ctx.SetCurrentActivationTime(activityType, Ts2Us(passedTime));
} else {
Ctx.SetCurrentActivationTime(0, 0);
}
}

// Fills statsCopy with this thread's statistics.
// The counters are refreshed via UpdateThreadStats() first (which is why the
// method is not const), then the worker context writes the result into statsCopy.
void TGenericExecutorThread::GetCurrentStats(TExecutorThreadStats& statsCopy) {
UpdateThreadStats();
Ctx.GetCurrentStats(statsCopy);
}

void TGenericExecutorThread::GetSharedStats(i16 poolId, TExecutorThreadStats &statsCopy) const {
NHPTimer::STime hpnow = GetCycleCountFast();
ui64 activityType = TlsThreadCtx.ElapsingActorActivity.load(std::memory_order_acquire);
NHPTimer::STime hpprev = TlsThreadCtx.UpdateStartOfElapsingTime(hpnow);
if (activityType == Max<ui64>()) {
Ctx.AddParkedCycles(hpnow - hpprev);
} else {
Ctx.AddElapsedCycles(activityType, hpnow - hpprev);
}
// Produces the statistics this thread has accumulated for the pool `poolId`.
// The thread counters are refreshed first; statsCopy is then reset to a
// default-constructed value and rebuilt from the SharedStats entry of that pool.
void TGenericExecutorThread::GetSharedStats(i16 poolId, TExecutorThreadStats &statsCopy) {
    UpdateThreadStats();

    // Start from a clean slate so nothing previously held in statsCopy leaks
    // into the result.
    statsCopy = TExecutorThreadStats();

    auto&& poolStats = SharedStats[poolId];
    statsCopy.Aggregate(poolStats);
}
Expand Down
6 changes: 4 additions & 2 deletions ydb/library/actors/core/executor_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ namespace NActors {
template <ESendingType SendingType = ESendingType::Common>
bool Send(TAutoPtr<IEventHandle> ev);

void GetCurrentStats(TExecutorThreadStats& statsCopy) const;
void GetSharedStats(i16 poolId, TExecutorThreadStats &stats) const;
void GetCurrentStats(TExecutorThreadStats& statsCopy);
void GetSharedStats(i16 poolId, TExecutorThreadStats &stats);

TThreadId GetThreadId() const; // blocks, must be called after Start()
TWorkerId GetWorkerId() const;
Expand All @@ -80,6 +80,8 @@ namespace NActors {
template <typename TMailbox>
TProcessingResult Execute(TMailbox* mailbox, ui32 hint, bool isTailExecution);

void UpdateThreadStats();

public:
TActorSystem* const ActorSystem;
std::atomic<bool> StopFlag = false;
Expand Down
5 changes: 3 additions & 2 deletions ydb/library/actors/core/executor_thread_ctx.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,17 +97,18 @@ namespace NActors {
}

NHPTimer::STime hpnow = GetCycleCountFast();
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
TlsThreadContext->ElapsingActorActivity.store(Max<ui64>(), std::memory_order_release);
TlsThreadContext->WorkerCtx->AddElapsedCycles(TlsThreadContext->ActorSystemIndex, hpnow - hpprev);
do {
if (WaitingPad.Park()) // interrupted
return true;
hpnow = GetCycleCountFast();
hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
TlsThreadContext->WorkerCtx->AddParkedCycles(hpnow - hpprev);
state = GetState<TWaitState>();
} while (static_cast<EThreadState>(state) == EThreadState::Sleep && !stopFlag->load(std::memory_order_relaxed));
TlsThreadContext->ActivationStartTS.store(hpnow, std::memory_order_release);
TlsThreadContext->ElapsingActorActivity.store(TlsThreadContext->ActorSystemIndex, std::memory_order_release);
static_cast<TDerived*>(this)->AfterWakeUp(state);
return false;
Expand Down
16 changes: 16 additions & 0 deletions ydb/library/actors/core/mon_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ namespace NActors {
bool HasHalfOfOtherSharedThread = false;
};

// How long an activation has been running and which activity it is attributed to.
struct TActivationTime {
    // Elapsed time of the activation, in microseconds.
    i64 TimeUs{0};
    // Activity type the measurement belongs to.
    ui32 LastActivity{0};
};

struct TExecutorThreadStats {
ui64 SentEvents = 0;
ui64 ReceivedEvents = 0;
Expand All @@ -94,6 +99,9 @@ namespace NActors {
ui64 CpuUs = 0; // microseconds thread was executing on CPU (accounts for preemption)
ui64 SafeElapsedTicks = 0;
ui64 WorstActivationTimeUs = 0;

TActivationTime CurrentActivationTime;

NHPTimer::STime ElapsedTicks = 0;
NHPTimer::STime ParkedTicks = 0;
NHPTimer::STime BlockedTicks = 0;
Expand All @@ -102,10 +110,12 @@ namespace NActors {
TLogHistogram EventProcessingCountHistogram;
TLogHistogram EventProcessingTimeHistogram;
TVector<NHPTimer::STime> ElapsedTicksByActivity;
TVector<ui64> LongActivationDetectionsByActivity;
TVector<ui64> ReceivedEventsByActivity;
TVector<i64> ActorsAliveByActivity; // the sum should be positive, but per-thread might be negative
TVector<ui64> ScheduledEventsByActivity;
TVector<ui64> StuckActorsByActivity;
TVector<TActivationTime> AggregatedCurrentActivationTime;
TVector<std::array<ui64, 10>> UsageByActivity;
ui64 PoolActorRegistrations = 0;
ui64 PoolDestroyedActors = 0;
Expand Down Expand Up @@ -165,6 +175,12 @@ namespace NActors {
AggregateOne(ActorsAliveByActivity, other.ActorsAliveByActivity);
AggregateOne(ScheduledEventsByActivity, other.ScheduledEventsByActivity);
AggregateOne(StuckActorsByActivity, other.StuckActorsByActivity);
if (other.CurrentActivationTime.TimeUs) {
Copy link

@drbasic drbasic Jun 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kruall в этом if thread sanitizer находит проблему — чтение данных, модифицируемых на другом потоке.
В следующем if происходит что-то совсем странное (хотя для tsan это проходит незамеченным): идёт обращение к контейнеру, который изменяется на другом потоке. Мне кажется, понятно, к чему это приводит.

Copy link

@drbasic drbasic Jun 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Обсудили голосом.

  1. гонка по данным происходит, когда специально выделенный поток проходится по всем TExecutorThreadStats и агрегирует их, причём делает это за два прохода.
  2. с AggregatedCurrentActivationTime проблемы не происходит, потому что их может менять только поток, который собирает статистику

т.е. если в первом случае данные в other меняются в других потоках в момент сбора статистики, то с AggregatedCurrentActivationTime такого не происходит: их просто не трогают на тех потоках.

AggregatedCurrentActivationTime.push_back(other.CurrentActivationTime);
}
if (other.AggregatedCurrentActivationTime.size()) {
AggregatedCurrentActivationTime.insert(AggregatedCurrentActivationTime.end(), other.AggregatedCurrentActivationTime.begin(), other.AggregatedCurrentActivationTime.end());
}

if (UsageByActivity.size() < other.UsageByActivity.size()) {
UsageByActivity.resize(other.UsageByActivity.size());
Expand Down
17 changes: 12 additions & 5 deletions ydb/library/actors/core/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,25 @@ namespace NActors {
bool IsCurrentRecipientAService = false;
TMPMCRingQueue<20>::EPopMode ActivationPopMode = TMPMCRingQueue<20>::EPopMode::ReallySlow;

std::atomic<i64> StartOfElapsingTime = 0;
std::atomic<ui64> ElapsingActorActivity = 0;
std::atomic<i64> StartOfProcessingEventTS = GetCycleCountFast();
std::atomic<i64> ActivationStartTS = 0;
std::atomic<ui64> ElapsingActorActivity = Max<ui64>();
TWorkerContext *WorkerCtx = nullptr;
ui32 ActorSystemIndex = 0;

ui64 UpdateStartOfElapsingTime(i64 newValue) {
i64 oldValue = StartOfElapsingTime.load(std::memory_order_acquire);
TThreadContext() {
i64 now = GetCycleCountFast();
StartOfProcessingEventTS = now;
ActivationStartTS = now;
}

ui64 UpdateStartOfProcessingEventTS(i64 newValue) {
i64 oldValue = StartOfProcessingEventTS.load(std::memory_order_acquire);
for (;;) {
if (newValue - oldValue <= 0) {
break;
}
if (StartOfElapsingTime.compare_exchange_strong(oldValue, newValue, std::memory_order_acq_rel)) {
if (StartOfProcessingEventTS.compare_exchange_strong(oldValue, newValue, std::memory_order_acq_rel)) {
break;
}
}
Expand Down
6 changes: 6 additions & 0 deletions ydb/library/actors/core/worker_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ namespace NActors {
statsCopy.Aggregate(*Stats);
}

// Records which activity the current activation belongs to and how long it
// has been running. Non-positive durations are stored as 0.
// The two fields are written with separate relaxed stores.
void SetCurrentActivationTime(ui32 activityType, i64 elapsed) {
    if (elapsed < 0) {
        elapsed = 0;
    }
    RelaxedStore(&Stats->CurrentActivationTime.LastActivity, activityType);
    RelaxedStore(&Stats->CurrentActivationTime.TimeUs, elapsed);
}

void AddElapsedCycles(ui32 activityType, i64 elapsed) {
if (Y_LIKELY(elapsed > 0)) {
Y_DEBUG_ABORT_UNLESS(activityType < Stats->MaxActivityType());
Expand Down Expand Up @@ -163,6 +168,7 @@ namespace NActors {
}
#else
void GetCurrentStats(TExecutorThreadStats&) const {}
void SetCurrentActivationTime(ui32, i64) {}
inline void AddElapsedCycles(ui32, i64) {}
inline void AddParkedCycles(i64) {}
inline void AddBlockedCycles(i64) {}
Expand Down
28 changes: 28 additions & 0 deletions ydb/library/actors/helpers/pool_stats_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> {
void Init(NMonitoring::TDynamicCounterPtr group) {
Group = group;

CurrentActivationTimeByActivity.resize(GetActivityTypeCount());
ElapsedMicrosecByActivityBuckets.resize(GetActivityTypeCount());
ReceivedEventsByActivityBuckets.resize(GetActivityTypeCount());
ActorsAliveByActivityBuckets.resize(GetActivityTypeCount());
Expand All @@ -77,6 +78,7 @@ class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> {
}
}

*CurrentActivationTimeByActivity[i] = 0;
*ElapsedMicrosecByActivityBuckets[i] = ::NHPTimer::GetSeconds(ticks)*1000000;
*ReceivedEventsByActivityBuckets[i] = events;
*ActorsAliveByActivityBuckets[i] = actors;
Expand All @@ -87,6 +89,29 @@ class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> {
*UsageByActivityBuckets[i][j] = stats.UsageByActivity[i][j];
}
}

auto setActivationTime = [&](TActivationTime activation) {
if (!ActorsAliveByActivityBuckets[activation.LastActivity]) {
InitCountersForActivity(activation.LastActivity);
}
*CurrentActivationTimeByActivity[activation.LastActivity] = activation.TimeUs;
};
if (stats.CurrentActivationTime.TimeUs) {
setActivationTime(stats.CurrentActivationTime);
}
std::vector<TActivationTime> activationTimes = stats.AggregatedCurrentActivationTime;
Sort(activationTimes.begin(), activationTimes.end(), [](auto &left, auto &right) {
return left.LastActivity < right.LastActivity ||
left.LastActivity == right.LastActivity && left.TimeUs > right.TimeUs;
});
ui32 prevActivity = Max<ui32>();
for (auto &activationTime : activationTimes) {
if (activationTime.LastActivity == prevActivity) {
continue;
}
setActivationTime(activationTime);
prevActivity = activationTime.LastActivity;
}
}

private:
Expand All @@ -95,6 +120,8 @@ class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> {

auto bucketName = TString(GetActivityTypeName(activityType));

CurrentActivationTimeByActivity[activityType] =
Group->GetSubgroup("sensor", "CurrentActivationTimeUsByActivity")->GetNamedCounter("activity", bucketName, false);
ElapsedMicrosecByActivityBuckets[activityType] =
Group->GetSubgroup("sensor", "ElapsedMicrosecByActivity")->GetNamedCounter("activity", bucketName, true);
ReceivedEventsByActivityBuckets[activityType] =
Expand All @@ -114,6 +141,7 @@ class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> {
private:
NMonitoring::TDynamicCounterPtr Group;

TVector<NMonitoring::TDynamicCounters::TCounterPtr> CurrentActivationTimeByActivity;
TVector<NMonitoring::TDynamicCounters::TCounterPtr> ElapsedMicrosecByActivityBuckets;
TVector<NMonitoring::TDynamicCounters::TCounterPtr> ReceivedEventsByActivityBuckets;
TVector<NMonitoring::TDynamicCounters::TCounterPtr> ActorsAliveByActivityBuckets;
Expand Down
Loading