Skip to content

Commit fbd8f5a

Browse files
authored
Merge 9c8092d into b392a83
2 parents b392a83 + 9c8092d commit fbd8f5a

15 files changed

+312
-111
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

+6-2
Original file line numberDiff line numberDiff line change
@@ -1841,7 +1841,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
18411841
flags));
18421842
}
18431843

1844-
NDataIntegrity::LogIntegrityTrails("DatashardTx", dataTransaction.GetKqpTransaction().GetLocks().ShortDebugString(),
1844+
NDataIntegrity::LogIntegrityTrails("DatashardTx", dataTransaction.GetKqpTransaction().GetLocks().ShortDebugString(),
18451845
Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext());
18461846

18471847
ResponseEv->Orbit.Fork(evData->Orbit);
@@ -1878,7 +1878,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
18781878

18791879
auto traceId = ExecuterSpan.GetTraceId();
18801880

1881-
NDataIntegrity::LogIntegrityTrails("EvWriteTx", evWriteTransaction->Record.GetLocks().ShortDebugString(),
1881+
NDataIntegrity::LogIntegrityTrails("EvWriteTx", evWriteTransaction->Record.GetLocks().ShortDebugString(),
18821882
Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext());
18831883

18841884
auto shardsToString = [](const auto& shards) {
@@ -2101,6 +2101,10 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
21012101
return;
21022102
}
21032103

2104+
if (Stats) {
2105+
Stats->Prepare();
2106+
}
2107+
21042108
THashMap<ui64, TVector<NDqProto::TDqTask*>> datashardTasks; // shardId -> [task]
21052109
THashMap<ui64, TVector<ui64>> remoteComputeTasks; // shardId -> [task]
21062110
TVector<ui64> computeTasks;

ydb/core/kqp/executer_actor/kqp_executer_impl.h

+8
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,14 @@ class TKqpExecuterBase : public TActor<TDerived> {
430430
ui64 cycleCount = GetCycleCountFast();
431431

432432
Stats->UpdateTaskStats(taskId, state.GetStats(), (NYql::NDqProto::EComputeState) state.GetState());
433+
434+
if (Stats->DeadlockedStageId) {
435+
NYql::TIssues issues;
436+
issues.AddIssue(TStringBuilder() << "Deadlock detected: stage " << *Stats->DeadlockedStageId << " waits for input while peer(s) wait for output");
437+
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::CANCELLED, issues);
438+
this->Send(this->SelfId(), abortEv.Release());
439+
}
440+
433441
if (Request.ProgressStatsPeriod) {
434442
auto now = TInstant::Now();
435443
if (LastProgressStats + Request.ProgressStatsPeriod <= now) {

ydb/core/kqp/executer_actor/kqp_executer_stats.cpp

+132-3
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,33 @@ ui64 NonZeroMin(ui64 a, ui64 b) {
1212
return (b == 0) ? a : ((a == 0 || a > b) ? b : a);
1313
}
1414

15+
ui64 ExportMinStats(std::vector<ui64>& data);
16+
ui64 ExportMaxStats(std::vector<ui64>& data);
17+
18+
// Resizes the per-slot storage to `count` entries. Slots added by the resize
// are value-initialized to 0 by std::vector::resize. The cached MinValue is
// not touched here.
void TMinStats::Resize(ui32 count) {
    this->Values.resize(count);
}
21+
22+
// Stores `value` in slot `index` and keeps the cached MinValue equal to
// ExportMinStats(Values), i.e. the smallest NON-ZERO entry (0 when every
// entry is zero).
//
// index - slot to overwrite; must be < Values.size() (checked by Y_ASSERT).
// value - new value for the slot; 0 means "no value" and is ignored by the
//         minimum.
//
// A full rescan is required only when the overwritten slot was the one that
// (possibly) defined the minimum. In every other case the minimum can only
// change if the new value is a non-zero value smaller than the cached one;
// that case used to be missed, leaving MinValue stale and too large.
void TMinStats::Set(ui32 index, ui64 value) {
    Y_ASSERT(index < Values.size());

    const ui64 previous = Values[index];
    Values[index] = value;

    if (previous == MinValue) {
        // The old minimum may have just been overwritten: rescan all slots.
        MinValue = ExportMinStats(Values);
    } else if (value != 0 && (MinValue == 0 || value < MinValue)) {
        // Another slot dropped below the cached minimum: adopt it directly.
        MinValue = value;
    }
}
30+
31+
// Resizes the per-slot storage to `count` entries. Slots added by the resize
// are value-initialized to 0 by std::vector::resize. The cached MaxValue is
// not touched here.
void TMaxStats::Resize(ui32 count) {
    this->Values.resize(count);
}
34+
35+
// Stores `value` in slot `index` and refreshes the cached MaxValue.
//
// index - slot to overwrite; must be < Values.size() (checked by Y_ASSERT).
// value - new value for the slot.
//
// If the slot did not decrease, the maximum can only grow, so comparing the
// new value with the cached maximum is enough. If the slot decreased, the
// cached maximum may have come from it, so all slots are rescanned.
void TMaxStats::Set(ui32 index, ui64 value) {
    Y_ASSERT(index < Values.size());

    const ui64 previous = Values[index];
    Values[index] = value;

    if (value < previous) {
        MaxValue = ExportMaxStats(Values);
    } else if (value > MaxValue) {
        MaxValue = value;
    }
}
41+
1542
void TTimeSeriesStats::ExportAggStats(NYql::NDqProto::TDqStatsAggr& stats) {
1643
NKikimr::NKqp::ExportAggStats(Values, stats);
1744
}
@@ -272,6 +299,8 @@ void TStageExecutionStats::Resize(ui32 taskCount) {
272299

273300
WaitInputTimeUs.Resize(taskCount);
274301
WaitOutputTimeUs.Resize(taskCount);
302+
CurrentWaitInputTimeUs.Resize(taskCount);
303+
CurrentWaitOutputTimeUs.Resize(taskCount);
275304

276305
SpillingComputeBytes.Resize(taskCount);
277306
SpillingChannelBytes.Resize(taskCount);
@@ -456,6 +485,8 @@ ui64 TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS
456485
SetNonZero(DurationUs, index, durationUs);
457486
WaitInputTimeUs.SetNonZero(index, taskStats.GetWaitInputTimeUs());
458487
WaitOutputTimeUs.SetNonZero(index, taskStats.GetWaitOutputTimeUs());
488+
CurrentWaitInputTimeUs.Set(index, taskStats.GetCurrentWaitInputTimeUs());
489+
CurrentWaitOutputTimeUs.Set(index, taskStats.GetCurrentWaitOutputTimeUs());
459490

460491
SpillingComputeBytes.SetNonZero(index, taskStats.GetSpillingComputeWriteBytes());
461492
SpillingChannelBytes.SetNonZero(index, taskStats.GetSpillingChannelWriteBytes());
@@ -572,6 +603,23 @@ ui64 TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS
572603
return baseTimeMs;
573604
}
574605

606+
// Reports whether this stage looks deadlocked against its input stages.
//
// deadline - threshold compared against the cached *TimeUs minimums
//            (microseconds; the visible caller passes 60'000'000).
//
// Returns true only when all of the following hold:
//   * the stage has at least one input stage;
//   * its cached minimum CurrentWaitInputTimeUs is >= deadline;
//   * every input stage is either finished or has a cached minimum
//     CurrentWaitOutputTimeUs >= deadline.
bool TStageExecutionStats::IsDeadlocked(ui64 deadline) {
    if (InputStages.empty()) {
        return false;
    }
    if (CurrentWaitInputTimeUs.MinValue < deadline) {
        return false;
    }

    for (auto* producer : InputStages) {
        const bool blockedOnOutput = producer->CurrentWaitOutputTimeUs.MinValue >= deadline;
        if (!blockedOnOutput && !producer->IsFinished()) {
            // This producer is still running and not stuck on output,
            // so data may yet arrive.
            return false;
        }
    }

    return true;
}
618+
619+
bool TStageExecutionStats::IsFinished() {
620+
return FinishedCount == Task2Index.size();
621+
}
622+
575623
namespace {
576624

577625
TTableStat operator - (const TTableStat& l, const TTableStat& r) {
@@ -739,6 +787,37 @@ bool CollectProfileStats(Ydb::Table::QueryStatsCollection::Mode statsMode) {
739787
return statsMode >= Ydb::Table::QueryStatsCollection::STATS_COLLECTION_PROFILE;
740788
}
741789

790+
void TQueryExecutionStats::Prepare() {
791+
if (CollectFullStats(StatsMode)) {
792+
// stages
793+
for (auto& [stageId, info] : TasksGraph->GetStagesInfo()) {
794+
auto [it, inserted] = StageStats.try_emplace(stageId);
795+
Y_ENSURE(inserted);
796+
it->second.StageId = stageId;
797+
}
798+
// connections
799+
for (auto& [_, stageStats] : StageStats) {
800+
auto& info = TasksGraph->GetStageInfo(stageStats.StageId);
801+
auto& stage = info.Meta.GetStage(info.Id);
802+
for (const auto& input : stage.GetInputs()) {
803+
auto& peerStageStats = StageStats[NYql::NDq::TStageId(stageStats.StageId.TxId, input.GetStageIndex())];
804+
stageStats.InputStages.push_back(&peerStageStats);
805+
peerStageStats.OutputStages.push_back(&stageStats);
806+
}
807+
}
808+
// tasks
809+
for (auto& task : TasksGraph->GetTasks()) {
810+
auto& stageStats = StageStats[task.StageId];
811+
stageStats.Task2Index.emplace(task.Id, stageStats.Task2Index.size());
812+
}
813+
for (auto& [_, stageStats] : StageStats) {
814+
stageStats.TaskCount = (stageStats.Task2Index.size() + 3) & ~3;
815+
stageStats.Resize(stageStats.TaskCount);
816+
}
817+
}
818+
}
819+
820+
742821
void TQueryExecutionStats::FillStageDurationUs(NYql::NDqProto::TDqStageStats& stats) {
743822
if (stats.HasStartTimeMs() && stats.HasFinishTimeMs()) {
744823
auto startTimeMs = stats.GetStartTimeMs().GetMin();
@@ -1165,17 +1244,67 @@ void TQueryExecutionStats::UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TD
11651244
Y_ASSERT(stats.GetTasks().size() == 1);
11661245
const NYql::NDqProto::TDqTaskStats& taskStats = stats.GetTasks(0);
11671246
Y_ASSERT(taskStats.GetTaskId() == taskId);
1168-
auto stageId = taskStats.GetStageId();
1247+
auto stageId = TasksGraph->GetTask(taskId).StageId;
11691248
auto [it, inserted] = StageStats.try_emplace(stageId);
11701249
if (inserted) {
1171-
it->second.StageId = TasksGraph->GetTask(taskStats.GetTaskId()).StageId;
1250+
it->second.StageId = stageId;
11721251
it->second.SetHistorySampleCount(HistorySampleCount);
11731252
}
11741253
BaseTimeMs = NonZeroMin(BaseTimeMs, it->second.UpdateStats(taskStats, state, stats.GetMaxMemoryUsage(), stats.GetDurationUs()));
1254+
1255+
constexpr ui64 deadline = 60'000'000; // 60s
1256+
if (it->second.CurrentWaitOutputTimeUs.MinValue > deadline) {
1257+
for (auto stat : it->second.OutputStages) {
1258+
if (stat->IsDeadlocked(deadline)) {
1259+
DeadlockedStageId = stat->StageId.StageId;
1260+
break;
1261+
}
1262+
}
1263+
} else if (it->second.IsDeadlocked(deadline)) {
1264+
DeadlockedStageId = it->second.StageId.StageId;
1265+
}
11751266
}
11761267

11771268
// SIMD-friendly aggregations are below. Compiler is able to vectorize sum/count, but needs help with min/max
11781269

1270+
// Returns the smallest non-zero element of `data`, or 0 when every element
// is zero (or `data` is empty).
//
// data - values to scan; its size must be a multiple of 4 (checked in debug
//        builds only). The scan runs as four independent lanes that are
//        merged at the end.
ui64 ExportMinStats(std::vector<ui64>& data) {

    Y_DEBUG_ABORT_UNLESS((data.size() & 3) == 0);

    // Minimum of two values where 0 means "no value".
    const auto nonZeroMin = [](ui64 lhs, ui64 rhs) -> ui64 {
        return lhs == 0 ? rhs : ((rhs == 0 || lhs < rhs) ? lhs : rhs);
    };

    ui64 lanes[4] = {0, 0, 0, 0};

    for (auto chunk = data.begin(); chunk < data.end(); chunk += 4) {
        lanes[0] = nonZeroMin(lanes[0], chunk[0]);
        lanes[1] = nonZeroMin(lanes[1], chunk[1]);
        lanes[2] = nonZeroMin(lanes[2], chunk[2]);
        lanes[3] = nonZeroMin(lanes[3], chunk[3]);
    }

    const ui64 lowPair = nonZeroMin(lanes[0], lanes[1]);
    const ui64 highPair = nonZeroMin(lanes[2], lanes[3]);

    return nonZeroMin(lowPair, highPair);
}
1288+
1289+
// Returns the largest element of `data`, or 0 when `data` is empty.
//
// data - values to scan; its size must be a multiple of 4 (checked in debug
//        builds only). The scan runs as four independent lanes that are
//        merged at the end.
ui64 ExportMaxStats(std::vector<ui64>& data) {

    Y_DEBUG_ABORT_UNLESS((data.size() & 3) == 0);

    const auto larger = [](ui64 lhs, ui64 rhs) -> ui64 {
        return lhs > rhs ? lhs : rhs;
    };

    ui64 lanes[4] = {0, 0, 0, 0};

    for (auto chunk = data.begin(); chunk < data.end(); chunk += 4) {
        lanes[0] = larger(lanes[0], chunk[0]);
        lanes[1] = larger(lanes[1], chunk[1]);
        lanes[2] = larger(lanes[2], chunk[2]);
        lanes[3] = larger(lanes[3], chunk[3]);
    }

    const ui64 lowPair = larger(lanes[0], lanes[1]);
    const ui64 highPair = larger(lanes[2], lanes[3]);

    return larger(lowPair, highPair);
}
1307+
11791308
void ExportAggStats(std::vector<ui64>& data, NYql::NDqProto::TDqStatsMinMax& stats) {
11801309

11811310
Y_DEBUG_ABORT_UNLESS((data.size() & 3) == 0);
@@ -1562,7 +1691,7 @@ void TQueryExecutionStats::Finish() {
15621691
}
15631692

15641693
AdjustBaseTime(stageStats);
1565-
auto it = StageStats.find(stageId.StageId);
1694+
auto it = StageStats.find(stageId);
15661695
if (it != StageStats.end()) {
15671696
it->second.ExportHistory(BaseTimeMs, *stageStats);
15681697
}

ydb/core/kqp/executer_actor/kqp_executer_stats.h

+31-6
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,22 @@ NYql::NDqProto::EDqStatsMode GetDqStatsModeShard(Ydb::Table::QueryStatsCollectio
1616
bool CollectFullStats(Ydb::Table::QueryStatsCollection::Mode statsMode);
1717
bool CollectProfileStats(Ydb::Table::QueryStatsCollection::Mode statsMode);
1818

19+
struct TMinStats {
20+
std::vector<ui64> Values;
21+
ui64 MinValue = 0;
22+
23+
void Resize(ui32 count);
24+
void Set(ui32 index, ui64 value);
25+
};
26+
27+
struct TMaxStats {
28+
std::vector<ui64> Values;
29+
ui64 MaxValue = 0;
30+
31+
void Resize(ui32 count);
32+
void Set(ui32 index, ui64 value);
33+
};
34+
1935
struct TTimeSeriesStats {
2036
std::vector<ui64> Values;
2137
ui32 HistorySampleCount = 0;
@@ -200,6 +216,8 @@ struct TStageExecutionStats {
200216
std::vector<ui64> DurationUs;
201217
TTimeSeriesStats WaitInputTimeUs;
202218
TTimeSeriesStats WaitOutputTimeUs;
219+
TMinStats CurrentWaitInputTimeUs;
220+
TMinStats CurrentWaitOutputTimeUs;
203221

204222
TTimeSeriesStats SpillingComputeBytes;
205223
TTimeSeriesStats SpillingChannelBytes;
@@ -219,9 +237,11 @@ struct TStageExecutionStats {
219237
TTimeSeriesStats MaxMemoryUsage;
220238

221239
ui32 HistorySampleCount = 0;
222-
ui32 TaskCount = 0;
240+
ui32 TaskCount = 0; // rounded to 4 value of Task2Index.size(), which is actual
223241
std::vector<bool> Finished;
224242
ui32 FinishedCount = 0;
243+
std::vector<TStageExecutionStats*> InputStages;
244+
std::vector<TStageExecutionStats*> OutputStages;
225245

226246
void Resize(ui32 taskCount);
227247
ui32 EstimateMem() {
@@ -235,6 +255,8 @@ struct TStageExecutionStats {
235255
void ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqStageStats& stageStats);
236256
ui64 UpdateAsyncStats(ui32 index, TAsyncStats& aggrAsyncStats, const NYql::NDqProto::TDqAsyncBufferStats& asyncStats);
237257
ui64 UpdateStats(const NYql::NDqProto::TDqTaskStats& taskStats, NYql::NDqProto::EComputeState state, ui64 maxMemoryUsage, ui64 durationUs);
258+
bool IsDeadlocked(ui64 deadline);
259+
bool IsFinished();
238260
};
239261

240262
struct TExternalPartitionStat {
@@ -257,12 +279,12 @@ struct TIngressExternalPartitionStat {
257279

258280
struct TQueryExecutionStats {
259281
private:
260-
std::map<ui32, std::map<ui32, ui32>> ShardsCountByNode;
261-
std::map<ui32, bool> UseLlvmByStageId;
262-
std::map<ui32, TStageExecutionStats> StageStats;
263-
std::map<ui32, TIngressExternalPartitionStat> ExternalPartitionStats; // FIXME: several ingresses
282+
std::unordered_map<ui32, std::map<ui32, ui32>> ShardsCountByNode;
283+
std::unordered_map<ui32, bool> UseLlvmByStageId;
284+
THashMap<NYql::NDq::TStageId, TStageExecutionStats> StageStats;
285+
std::unordered_map<ui32, TIngressExternalPartitionStat> ExternalPartitionStats; // FIXME: several ingresses
264286
ui64 BaseTimeMs = 0;
265-
std::map<ui32, TDuration> LongestTaskDurations;
287+
std::unordered_map<ui32, TDuration> LongestTaskDurations;
266288
void ExportAggAsyncStats(TAsyncStats& data, NYql::NDqProto::TDqAsyncStatsAggr& stats);
267289
void ExportAggAsyncBufferStats(TAsyncBufferStats& data, NYql::NDqProto::TDqAsyncBufferStatsAggr& stats);
268290
void AdjustExternalAggr(NYql::NDqProto::TDqExternalAggrStats& stats);
@@ -274,6 +296,7 @@ struct TQueryExecutionStats {
274296
const Ydb::Table::QueryStatsCollection::Mode StatsMode;
275297
const TKqpTasksGraph* const TasksGraph = nullptr;
276298
NYql::NDqProto::TDqExecutionStats* const Result;
299+
std::optional<ui32> DeadlockedStageId;
277300

278301
// basic stats
279302
std::unordered_set<ui64> AffectedShards;
@@ -307,6 +330,8 @@ struct TQueryExecutionStats {
307330
HistorySampleCount = 32;
308331
}
309332

333+
void Prepare();
334+
310335
void AddComputeActorStats(
311336
ui32 nodeId,
312337
NYql::NDqProto::TDqComputeActorStats&& stats,

ydb/core/kqp/node_service/kqp_node_service.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ namespace {
4343
// Min interval between stats send from scan/compute actor to executor
4444
constexpr TDuration MinStatInterval = TDuration::MilliSeconds(20);
4545
// Max interval in case of no activity
46-
constexpr TDuration MaxStatInterval = TDuration::MilliSeconds(100);
46+
constexpr TDuration MaxStatInterval = TDuration::Seconds(1);
4747

4848
template <class TTasksCollection>
4949
TString TasksIdsStr(const TTasksCollection& tasks) {

ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
134134
TComputeActorAsyncInputHelperAsync CreateInputHelper(const TString& logPrefix,
135135
ui64 index,
136136
NDqProto::EWatermarksMode watermarksMode
137-
)
137+
)
138138
{
139139
return TComputeActorAsyncInputHelperAsync(logPrefix, index, watermarksMode, Cookie, ProcessSourcesState.Inflight);
140140
}
@@ -808,7 +808,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
808808
}
809809
ProcessOutputsImpl(status);
810810
if (status == ERunStatus::Finished) {
811-
ReportStats(TInstant::Now(), ESendStats::IfPossible);
811+
ReportStats();
812812
}
813813

814814
if (UseCpuQuota()) {
@@ -1057,7 +1057,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
10571057
return &ProfileStats;
10581058
}
10591059

1060-
const NYql::NDq::TTaskRunnerStatsBase* GetTaskRunnerStats() override {
1060+
const NYql::NDq::TDqTaskRunnerStats* GetTaskRunnerStats() override {
10611061
return TaskRunnerStats.Get();
10621062
}
10631063

ydb/library/yql/dq/actors/compute/dq_compute_actor.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ using TTaskRunnerFactory = std::function<
390390

391391
void FillAsyncStats(NDqProto::TDqAsyncBufferStats& proto, TDqAsyncStats stats);
392392

393-
void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TTaskRunnerStatsBase& taskStats,
393+
void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TDqTaskRunnerStats& taskStats,
394394
NDqProto::TDqTaskStats* protoTask, TCollectStatsLevel level);
395395

396396
NActors::IActor* CreateDqComputeActor(const NActors::TActorId& executerId, const TTxId& txId, NDqProto::TDqTask* task,

0 commit comments

Comments
 (0)