Skip to content

Commit b6fd10e

Browse files
committed
Refactor TR stats + CurrentWaitInput/OutputTime
1 parent 83500a1 commit b6fd10e

10 files changed

+124
-58
lines changed

Diff for: ydb/core/kqp/executer_actor/kqp_executer_stats.cpp

+36
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,19 @@ ui64 NonZeroMin(ui64 a, ui64 b) {
1212
return (b == 0) ? a : ((a == 0 || a > b) ? b : a);
1313
}
1414

15+
ui64 ExportMaxStats(std::vector<ui64>& data);
16+
17+
void TMaxStats::Resize(ui32 count) {
18+
Values.resize(count);
19+
}
20+
21+
void TMaxStats::Set(ui32 index, ui64 value) {
22+
Y_ASSERT(index < Values.size());
23+
auto isMonotonic = value >= Values[index];
24+
Values[index] = value;
25+
MaxValue = isMonotonic ? (value > MaxValue ? value : MaxValue) : ExportMaxStats(Values);
26+
}
27+
1528
void TTimeSeriesStats::ExportAggStats(NYql::NDqProto::TDqStatsAggr& stats) {
1629
NKikimr::NKqp::ExportAggStats(Values, stats);
1730
}
@@ -272,6 +285,8 @@ void TStageExecutionStats::Resize(ui32 taskCount) {
272285

273286
WaitInputTimeUs.Resize(taskCount);
274287
WaitOutputTimeUs.Resize(taskCount);
288+
CurrentWaitInputTimeUs.Resize(taskCount);
289+
CurrentWaitOutputTimeUs.Resize(taskCount);
275290

276291
SpillingComputeBytes.Resize(taskCount);
277292
SpillingChannelBytes.Resize(taskCount);
@@ -456,6 +471,8 @@ ui64 TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS
456471
SetNonZero(DurationUs, index, durationUs);
457472
WaitInputTimeUs.SetNonZero(index, taskStats.GetWaitInputTimeUs());
458473
WaitOutputTimeUs.SetNonZero(index, taskStats.GetWaitOutputTimeUs());
474+
CurrentWaitInputTimeUs.Set(index, taskStats.GetCurrentWaitInputTimeUs());
475+
CurrentWaitOutputTimeUs.Set(index, taskStats.GetCurrentWaitOutputTimeUs());
459476

460477
SpillingComputeBytes.SetNonZero(index, taskStats.GetSpillingComputeWriteBytes());
461478
SpillingChannelBytes.SetNonZero(index, taskStats.GetSpillingChannelWriteBytes());
@@ -1170,6 +1187,25 @@ void TQueryExecutionStats::UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TD
11701187

11711188
// SIMD-friendly aggregations are below. Compiler is able to vectorize sum/count, but needs help with min/max
11721189

1190+
ui64 ExportMaxStats(std::vector<ui64>& data) {
1191+
1192+
Y_DEBUG_ABORT_UNLESS((data.size() & 3) == 0);
1193+
1194+
ui64 max4[4] = {0, 0, 0, 0};
1195+
1196+
for (auto it = data.begin(); it < data.end(); it += 4) {
1197+
max4[0] = max4[0] > it[0] ? max4[0] : it[0];
1198+
max4[1] = max4[1] > it[1] ? max4[1] : it[1];
1199+
max4[2] = max4[2] > it[2] ? max4[2] : it[2];
1200+
max4[3] = max4[3] > it[3] ? max4[3] : it[3];
1201+
}
1202+
1203+
ui64 max01 = max4[0] > max4[1] ? max4[0] : max4[1];
1204+
ui64 max23 = max4[2] > max4[3] ? max4[2] : max4[3];
1205+
1206+
return max01 > max23 ? max01 : max23;
1207+
}
1208+
11731209
void ExportAggStats(std::vector<ui64>& data, NYql::NDqProto::TDqStatsMinMax& stats) {
11741210

11751211
Y_DEBUG_ABORT_UNLESS((data.size() & 3) == 0);

Diff for: ydb/core/kqp/executer_actor/kqp_executer_stats.h

+10
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,14 @@ NYql::NDqProto::EDqStatsMode GetDqStatsModeShard(Ydb::Table::QueryStatsCollectio
1616
bool CollectFullStats(Ydb::Table::QueryStatsCollection::Mode statsMode);
1717
bool CollectProfileStats(Ydb::Table::QueryStatsCollection::Mode statsMode);
1818

19+
struct TMaxStats {
20+
std::vector<ui64> Values;
21+
ui64 MaxValue = 0;
22+
23+
void Resize(ui32 count);
24+
void Set(ui32 index, ui64 value);
25+
};
26+
1927
struct TTimeSeriesStats {
2028
std::vector<ui64> Values;
2129
ui32 HistorySampleCount = 0;
@@ -200,6 +208,8 @@ struct TStageExecutionStats {
200208
std::vector<ui64> DurationUs;
201209
TTimeSeriesStats WaitInputTimeUs;
202210
TTimeSeriesStats WaitOutputTimeUs;
211+
TMaxStats CurrentWaitInputTimeUs;
212+
TMaxStats CurrentWaitOutputTimeUs;
203213

204214
TTimeSeriesStats SpillingComputeBytes;
205215
TTimeSeriesStats SpillingChannelBytes;

Diff for: ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -1057,7 +1057,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
10571057
return &ProfileStats;
10581058
}
10591059

1060-
const NYql::NDq::TTaskRunnerStatsBase* GetTaskRunnerStats() override {
1060+
const NYql::NDq::TDqTaskRunnerStats* GetTaskRunnerStats() override {
10611061
return TaskRunnerStats.Get();
10621062
}
10631063

Diff for: ydb/library/yql/dq/actors/compute/dq_compute_actor.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,7 @@ using TTaskRunnerFactory = std::function<
387387

388388
void FillAsyncStats(NDqProto::TDqAsyncBufferStats& proto, TDqAsyncStats stats);
389389

390-
void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TTaskRunnerStatsBase& taskStats,
390+
void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TDqTaskRunnerStats& taskStats,
391391
NDqProto::TDqTaskStats* protoTask, TCollectStatsLevel level);
392392

393393
NActors::IActor* CreateDqComputeActor(const NActors::TActorId& executerId, const TTxId& txId, NDqProto::TDqTask* task,

Diff for: ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -1699,7 +1699,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
16991699
}
17001700
}
17011701

1702-
virtual const NYql::NDq::TTaskRunnerStatsBase* GetTaskRunnerStats() = 0;
1702+
virtual const NYql::NDq::TDqTaskRunnerStats* GetTaskRunnerStats() = 0;
17031703
virtual const NYql::NDq::TDqMeteringStats* GetMeteringStats() = 0;
17041704

17051705
virtual const IDqAsyncOutputBuffer* GetSink(ui64 outputIdx, const TAsyncOutputInfoBase& sinkInfo) const = 0;

Diff for: ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp

+45-41
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ void MergeMaxTs(TInstant& current, const TInstant value) {
4242
}
4343
}
4444

45-
void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TTaskRunnerStatsBase& taskStats,
45+
void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TDqTaskRunnerStats& taskStats,
4646
NDqProto::TDqTaskStats* protoTask, TCollectStatsLevel level)
4747
{
4848
if (StatsLevelCollectNone(level)) {
@@ -65,53 +65,57 @@ void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TTaskRunnerStatsBase&
6565

6666
protoTask->SetWaitInputTimeUs(taskStats.WaitInputTime.MicroSeconds());
6767
protoTask->SetWaitOutputTimeUs(taskStats.WaitOutputTime.MicroSeconds());
68+
protoTask->SetCurrentWaitInputTimeUs(taskStats.CurrentWaitInputTime.MicroSeconds());
69+
protoTask->SetCurrentWaitOutputTimeUs(taskStats.CurrentWaitOutputTime.MicroSeconds());
6870

69-
protoTask->SetSpillingComputeWriteBytes(taskStats.SpillingComputeWriteBytes);
70-
protoTask->SetSpillingChannelWriteBytes(taskStats.SpillingChannelWriteBytes);
71+
if (StatsLevelCollectFull(level)) {
72+
protoTask->SetSpillingComputeWriteBytes(taskStats.SpillingComputeWriteBytes);
73+
protoTask->SetSpillingChannelWriteBytes(taskStats.SpillingChannelWriteBytes);
7174

72-
protoTask->SetSpillingComputeReadTimeUs(taskStats.SpillingComputeReadTime.MicroSeconds());
73-
protoTask->SetSpillingComputeWriteTimeUs(taskStats.SpillingComputeWriteTime.MicroSeconds());
74-
protoTask->SetSpillingChannelReadTimeUs(taskStats.SpillingChannelReadTime.MicroSeconds());
75-
protoTask->SetSpillingChannelWriteTimeUs(taskStats.SpillingChannelWriteTime.MicroSeconds());
75+
protoTask->SetSpillingComputeReadTimeUs(taskStats.SpillingComputeReadTime.MicroSeconds());
76+
protoTask->SetSpillingComputeWriteTimeUs(taskStats.SpillingComputeWriteTime.MicroSeconds());
77+
protoTask->SetSpillingChannelReadTimeUs(taskStats.SpillingChannelReadTime.MicroSeconds());
78+
protoTask->SetSpillingChannelWriteTimeUs(taskStats.SpillingChannelWriteTime.MicroSeconds());
7679

77-
if (StatsLevelCollectProfile(level)) {
78-
if (taskStats.ComputeCpuTimeByRun) {
79-
auto snapshot = taskStats.ComputeCpuTimeByRun->Snapshot();
80-
for (ui32 i = 0; i < snapshot->Count(); i++) {
81-
auto* protoBucket = protoTask->AddComputeCpuTimeByRun();
82-
protoBucket->SetBound(snapshot->UpperBound(i));
83-
protoBucket->SetValue(snapshot->Value(i));
80+
if (StatsLevelCollectProfile(level)) {
81+
if (taskStats.ComputeCpuTimeByRun) {
82+
auto snapshot = taskStats.ComputeCpuTimeByRun->Snapshot();
83+
for (ui32 i = 0; i < snapshot->Count(); i++) {
84+
auto* protoBucket = protoTask->AddComputeCpuTimeByRun();
85+
protoBucket->SetBound(snapshot->UpperBound(i));
86+
protoBucket->SetValue(snapshot->Value(i));
87+
}
8488
}
85-
}
8689

87-
for (const auto& stat : taskStats.MkqlStats) {
88-
auto* s = protoTask->MutableMkqlStats()->Add();
89-
s->SetName(TString(stat.Key.GetName()));
90-
s->SetValue(stat.Value);
91-
s->SetDeriv(stat.Key.IsDeriv());
90+
for (const auto& stat : taskStats.MkqlStats) {
91+
auto* s = protoTask->MutableMkqlStats()->Add();
92+
s->SetName(TString(stat.Key.GetName()));
93+
s->SetValue(stat.Value);
94+
s->SetDeriv(stat.Key.IsDeriv());
95+
}
9296
}
93-
}
9497

95-
for (const auto& opStat : taskStats.OperatorStat) {
96-
auto& op = *protoTask->MutableOperators()->Add();
97-
op.SetOperatorId(opStat.OperatorId);
98-
op.SetBytes(std::max<i64>(0, opStat.Bytes));
99-
op.SetRows(std::max<i64>(0, opStat.Rows));
100-
switch (opStat.OperatorType) {
101-
case TOperatorType::Join: {
102-
op.MutableJoin();
103-
}
104-
break;
105-
case TOperatorType::Filter: {
106-
op.MutableFilter();
107-
}
108-
break;
109-
case TOperatorType::Aggregation: {
110-
op.MutableAggregation();
111-
}
112-
break;
113-
default:
114-
break;
98+
for (const auto& opStat : taskStats.OperatorStat) {
99+
auto& op = *protoTask->MutableOperators()->Add();
100+
op.SetOperatorId(opStat.OperatorId);
101+
op.SetBytes(std::max<i64>(0, opStat.Bytes));
102+
op.SetRows(std::max<i64>(0, opStat.Rows));
103+
switch (opStat.OperatorType) {
104+
case TOperatorType::Join: {
105+
op.MutableJoin();
106+
}
107+
break;
108+
case TOperatorType::Filter: {
109+
op.MutableFilter();
110+
}
111+
break;
112+
case TOperatorType::Aggregation: {
113+
op.MutableAggregation();
114+
}
115+
break;
116+
default:
117+
break;
118+
}
115119
}
116120
}
117121

Diff for: ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
260260
);
261261
}
262262

263-
const NYql::NDq::TTaskRunnerStatsBase* GetTaskRunnerStats() override {
263+
const NYql::NDq::TDqTaskRunnerStats* GetTaskRunnerStats() override {
264264
return TaskRunner ? TaskRunner->GetStats() : nullptr;
265265
}
266266

Diff for: ydb/library/yql/dq/actors/protos/dq_stats.proto

+2
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,8 @@ message TDqTaskStats {
291291
reserved 104;
292292
uint64 WaitInputTimeUs = 111; // wait input wall time (any input: channels, source, ...)
293293
uint64 WaitOutputTimeUs = 105; // wait output wall time (any output: channels, sinks, ...)
294+
uint64 CurrentWaitInputTimeUs = 167; // last period iff it is waiting now (mutually exclusive with CurrentWaitOutputTimeUs)
295+
uint64 CurrentWaitOutputTimeUs = 168; // last period iff it is waiting now (mutually exclusive with CurrentWaitInputTimeUs)
294296
reserved 107;
295297
reserved 108;
296298
reserved 109;

Diff for: ydb/library/yql/dq/runtime/dq_tasks_runner.cpp

+16-1
Original file line numberDiff line numberDiff line change
@@ -776,14 +776,24 @@ class TDqTaskRunner : public IDqTaskRunner {
776776
if (Y_LIKELY(CollectBasic())) {
777777
switch (runStatus) {
778778
case ERunStatus::Finished:
779+
// finished => waiting for nothing
780+
Stats->CurrentWaitInputTime = TDuration::Zero();
781+
Stats->CurrentWaitOutputTime = TDuration::Zero();
779782
Stats->FinishTs = TInstant::Now();
780783
break;
781784
case ERunStatus::PendingInput:
782-
if (!InputConsumed) {
785+
// output is checked first => not waiting for output
786+
Stats->CurrentWaitOutputTime = TDuration::Zero();
787+
if (Y_LIKELY(InputConsumed)) {
788+
// did something => waiting for nothing
789+
Stats->CurrentWaitInputTime = TDuration::Zero();
790+
} else {
783791
StartWaitingInput();
784792
}
785793
break;
786794
case ERunStatus::PendingOutput:
795+
// waiting for output => not waiting for input
796+
Stats->CurrentWaitInputTime = TDuration::Zero();
787797
StartWaitingOutput();
788798
break;
789799
}
@@ -1054,6 +1064,7 @@ class TDqTaskRunner : public IDqTaskRunner {
10541064
} else {
10551065
Stats->WaitStartTime += delta;
10561066
}
1067+
Stats->CurrentWaitInputTime += delta;
10571068
}
10581069
StartWaitInputTime = now;
10591070
}
@@ -1063,6 +1074,7 @@ class TDqTaskRunner : public IDqTaskRunner {
10631074
if (Y_LIKELY(StartWaitOutputTime)) {
10641075
auto delta = now - *StartWaitOutputTime;
10651076
Stats->WaitOutputTime += delta;
1077+
Stats->CurrentWaitOutputTime += delta;
10661078
}
10671079
StartWaitOutputTime = now;
10681080
}
@@ -1076,11 +1088,14 @@ class TDqTaskRunner : public IDqTaskRunner {
10761088
} else {
10771089
Stats->WaitStartTime += delta;
10781090
}
1091+
Stats->CurrentWaitInputTime += delta;
10791092
StartWaitInputTime.reset();
1093+
TDuration::Zero();
10801094
}
10811095
if (StartWaitOutputTime) {
10821096
auto delta = now - *StartWaitOutputTime;
10831097
Stats->WaitOutputTime += delta;
1098+
Stats->CurrentWaitOutputTime += delta;
10841099
StartWaitOutputTime.reset();
10851100
}
10861101
}

Diff for: ydb/library/yql/dq/runtime/dq_tasks_runner.h

+11-12
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ struct TMkqlStat {
4646
i64 Value = 0;
4747
};
4848

49-
struct TTaskRunnerStatsBase {
49+
struct TDqTaskRunnerStats {
5050
// basic stats
5151
TDuration BuildCpuTime;
5252
TInstant CreateTs;
@@ -57,6 +57,8 @@ struct TTaskRunnerStatsBase {
5757
TDuration WaitStartTime;
5858
TDuration WaitInputTime;
5959
TDuration WaitOutputTime;
60+
TDuration CurrentWaitInputTime;
61+
TDuration CurrentWaitOutputTime;
6062

6163
ui64 SpillingComputeWriteBytes;
6264
ui64 SpillingChannelWriteBytes;
@@ -76,17 +78,14 @@ struct TTaskRunnerStatsBase {
7678
TVector<TMkqlStat> MkqlStats;
7779
TVector<TOperatorStat> OperatorStat;
7880

79-
TTaskRunnerStatsBase() = default;
80-
TTaskRunnerStatsBase(TTaskRunnerStatsBase&&) = default;
81-
TTaskRunnerStatsBase& operator=(TTaskRunnerStatsBase&&) = default;
81+
TDqTaskRunnerStats() = default;
82+
TDqTaskRunnerStats(TDqTaskRunnerStats&&) = default;
83+
TDqTaskRunnerStats& operator=(TDqTaskRunnerStats&&) = default;
8284

83-
virtual ~TTaskRunnerStatsBase() = default;
85+
virtual ~TDqTaskRunnerStats() = default;
8486
};
8587

86-
struct TDqTaskRunnerStats : public TTaskRunnerStatsBase {
87-
};
88-
89-
// Provides read access to TTaskRunnerStatsBase
88+
// Provides read access to TDqTaskRunnerStats
9089
// May or may not own the underlying object
9190
class TDqTaskRunnerStatsView {
9291
public:
@@ -106,7 +105,7 @@ class TDqTaskRunnerStatsView {
106105
, ActorElapsedTicks(actorElapsedTicks) {
107106
}
108107

109-
const TTaskRunnerStatsBase* Get() {
108+
const TDqTaskRunnerStats* Get() {
110109
if (!IsDefined) {
111110
return nullptr;
112111
}
@@ -465,8 +464,8 @@ TIntrusivePtr<IDqTaskRunner> MakeDqTaskRunner(
465464
} // namespace NYql::NDq
466465

467466
template <>
468-
inline void Out<NYql::NDq::TTaskRunnerStatsBase>(IOutputStream& os, TTypeTraits<NYql::NDq::TTaskRunnerStatsBase>::TFuncParam stats) {
469-
os << "TTaskRunnerStatsBase:" << Endl
467+
inline void Out<NYql::NDq::TDqTaskRunnerStats>(IOutputStream& os, TTypeTraits<NYql::NDq::TDqTaskRunnerStats>::TFuncParam stats) {
468+
os << "TDqTaskRunnerStats:" << Endl
470469
<< "\tBuildCpuTime: " << stats.BuildCpuTime << Endl
471470
<< "\tStartTs: " << stats.StartTs << Endl
472471
<< "\tFinishTs: " << stats.FinishTs << Endl

0 commit comments

Comments
 (0)