Skip to content

Commit 83500a1

Browse files
committed
Stats reporting refactoring
1 parent 4af7b01 commit 83500a1

File tree

4 files changed

+34
-32
lines changed

4 files changed

+34
-32
lines changed

ydb/core/kqp/node_service/kqp_node_service.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ namespace {
4343
// Min interval between stats send from scan/compute actor to executor
4444
constexpr TDuration MinStatInterval = TDuration::MilliSeconds(20);
4545
// Max interval in case of no activety
46-
constexpr TDuration MaxStatInterval = TDuration::MilliSeconds(100);
46+
constexpr TDuration MaxStatInterval = TDuration::Seconds(1);
4747

4848
template <class TTasksCollection>
4949
TString TasksIdsStr(const TTasksCollection& tasks) {

ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
134134
TComputeActorAsyncInputHelperAsync CreateInputHelper(const TString& logPrefix,
135135
ui64 index,
136136
NDqProto::EWatermarksMode watermarksMode
137-
)
137+
)
138138
{
139139
return TComputeActorAsyncInputHelperAsync(logPrefix, index, watermarksMode, Cookie, ProcessSourcesState.Inflight);
140140
}
@@ -808,7 +808,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
808808
}
809809
ProcessOutputsImpl(status);
810810
if (status == ERunStatus::Finished) {
811-
ReportStats(TInstant::Now(), ESendStats::IfPossible);
811+
ReportStats();
812812
}
813813

814814
if (UseCpuQuota()) {

ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h

+9-22
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
359359
MemoryQuota->TryShrinkMemory(alloc);
360360
}
361361

362-
ReportStats(TInstant::Now(), ESendStats::IfPossible);
362+
ReportStats();
363363
}
364364
if (Terminated) {
365365
DoTerminateImpl();
@@ -1131,9 +1131,10 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
11311131
}
11321132
case EEvWakeupTag::PeriodicStatsTag: {
11331133
const auto maxInterval = RuntimeSettings.ReportStatsSettings->MaxInterval;
1134-
this->Schedule(maxInterval, new NActors::TEvents::TEvWakeup(EEvWakeupTag::PeriodicStatsTag));
1135-
1136-
ReportStats(NActors::TActivationContext::Now(), ESendStats::IfRequired);
1134+
if (Running && ProcessOutputsState.LastRunStatus != ERunStatus::Finished) {
1135+
DoExecute();
1136+
this->Schedule(maxInterval, new NActors::TEvents::TEvWakeup(EEvWakeupTag::PeriodicStatsTag));
1137+
}
11371138
break;
11381139
}
11391140
default:
@@ -1985,26 +1986,12 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
19851986
}
19861987

19871988
protected:
1988-
enum class ESendStats {
1989-
IfPossible,
1990-
IfRequired
1991-
};
1992-
void ReportStats(TInstant now, ESendStats condition) {
1993-
if (!RuntimeSettings.ReportStatsSettings) {
1989+
void ReportStats() {
1990+
auto now = TInstant::Now();
1991+
if (!RuntimeSettings.ReportStatsSettings || now - LastSendStatsTime < RuntimeSettings.ReportStatsSettings->MinInterval) {
19941992
return;
19951993
}
1996-
auto dT = now - LastSendStatsTime;
1997-
switch(condition) {
1998-
case ESendStats::IfPossible:
1999-
if (dT < RuntimeSettings.ReportStatsSettings->MinInterval) {
2000-
return;
2001-
}
2002-
break;
2003-
case ESendStats::IfRequired:
2004-
if (dT < RuntimeSettings.ReportStatsSettings->MaxInterval) {
2005-
return;
2006-
}
2007-
}
1994+
20081995
auto evState = std::make_unique<TEvDqCompute::TEvState>();
20091996
evState->Record.SetState(NDqProto::COMPUTE_STATE_EXECUTING);
20101997
evState->Record.SetTaskId(Task.GetId());

ydb/library/yql/dq/runtime/dq_tasks_runner.cpp

+22-7
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,7 @@ class TDqTaskRunner : public IDqTaskRunner {
524524
const IDqTaskRunnerExecutionContext& execCtx) override
525525
{
526526
TaskId = task.GetId();
527+
StageId = task.GetStageId();
527528
auto entry = BuildTask(task);
528529

529530
LOG(TStringBuilder() << "Prepare task: " << TaskId);
@@ -980,6 +981,7 @@ class TDqTaskRunner : public IDqTaskRunner {
980981
TIntrusivePtr<TSpillingTaskCounters> SpillingTaskCounters;
981982

982983
ui64 TaskId = 0;
984+
ui32 StageId = 0;
983985
TDqTaskRunnerContext Context;
984986
TDqTaskRunnerSettings Settings;
985987
TLogFunc LogFunc;
@@ -1044,28 +1046,41 @@ class TDqTaskRunner : public IDqTaskRunner {
10441046
std::unique_ptr<NUdf::IPgBuilder> PgBuilder_ = CreatePgBuilder();
10451047

10461048
void StartWaitingInput() {
1047-
if (!StartWaitInputTime) {
1048-
StartWaitInputTime = TInstant::Now();
1049+
auto now = TInstant::Now();
1050+
if (Y_LIKELY(StartWaitInputTime)) {
1051+
auto delta = now - *StartWaitInputTime;
1052+
if (Y_LIKELY(Stats->StartTs)) {
1053+
Stats->WaitInputTime += delta;
1054+
} else {
1055+
Stats->WaitStartTime += delta;
1056+
}
10491057
}
1058+
StartWaitInputTime = now;
10501059
}
10511060

10521061
void StartWaitingOutput() {
1053-
if (!StartWaitOutputTime) {
1054-
StartWaitOutputTime = TInstant::Now();
1062+
auto now = TInstant::Now();
1063+
if (Y_LIKELY(StartWaitOutputTime)) {
1064+
auto delta = now - *StartWaitOutputTime;
1065+
Stats->WaitOutputTime += delta;
10551066
}
1067+
StartWaitOutputTime = now;
10561068
}
10571069

10581070
void StopWaiting() {
1071+
auto now = TInstant::Now();
10591072
if (StartWaitInputTime) {
1073+
auto delta = now - *StartWaitInputTime;
10601074
if (Y_LIKELY(Stats->StartTs)) {
1061-
Stats->WaitInputTime += (TInstant::Now() - *StartWaitInputTime);
1075+
Stats->WaitInputTime += delta;
10621076
} else {
1063-
Stats->WaitStartTime += (TInstant::Now() - *StartWaitInputTime);
1077+
Stats->WaitStartTime += delta;
10641078
}
10651079
StartWaitInputTime.reset();
10661080
}
10671081
if (StartWaitOutputTime) {
1068-
Stats->WaitOutputTime += (TInstant::Now() - *StartWaitOutputTime);
1082+
auto delta = now - *StartWaitOutputTime;
1083+
Stats->WaitOutputTime += delta;
10691084
StartWaitOutputTime.reset();
10701085
}
10711086
}

0 commit comments

Comments
 (0)