Skip to content

Commit f0f1aba

Browse files
committed
Deadlock detection
1 parent b6fd10e commit f0f1aba

File tree

7 files changed

+152
-19
lines changed

7 files changed

+152
-19
lines changed

Diff for: ydb/core/kqp/executer_actor/kqp_data_executer.cpp

+6-2
Original file line numberDiff line numberDiff line change
@@ -1841,7 +1841,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
18411841
flags));
18421842
}
18431843

1844-
NDataIntegrity::LogIntegrityTrails("DatashardTx", dataTransaction.GetKqpTransaction().GetLocks().ShortDebugString(),
1844+
NDataIntegrity::LogIntegrityTrails("DatashardTx", dataTransaction.GetKqpTransaction().GetLocks().ShortDebugString(),
18451845
Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext());
18461846

18471847
ResponseEv->Orbit.Fork(evData->Orbit);
@@ -1878,7 +1878,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
18781878

18791879
auto traceId = ExecuterSpan.GetTraceId();
18801880

1881-
NDataIntegrity::LogIntegrityTrails("EvWriteTx", evWriteTransaction->Record.GetLocks().ShortDebugString(),
1881+
NDataIntegrity::LogIntegrityTrails("EvWriteTx", evWriteTransaction->Record.GetLocks().ShortDebugString(),
18821882
Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext());
18831883

18841884
auto shardsToString = [](const auto& shards) {
@@ -2101,6 +2101,10 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
21012101
return;
21022102
}
21032103

2104+
if (Stats) {
2105+
Stats->Prepare();
2106+
}
2107+
21042108
THashMap<ui64, TVector<NDqProto::TDqTask*>> datashardTasks; // shardId -> [task]
21052109
THashMap<ui64, TVector<ui64>> remoteComputeTasks; // shardId -> [task]
21062110
TVector<ui64> computeTasks;

Diff for: ydb/core/kqp/executer_actor/kqp_executer_impl.h

+8
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,14 @@ class TKqpExecuterBase : public TActor<TDerived> {
430430
ui64 cycleCount = GetCycleCountFast();
431431

432432
Stats->UpdateTaskStats(taskId, state.GetStats(), (NYql::NDqProto::EComputeState) state.GetState());
433+
434+
if (Stats->DeadlockedStageId) {
435+
NYql::TIssues issues;
436+
issues.AddIssue(TStringBuilder() << "Deadlock detected: stage " << *Stats->DeadlockedStageId << " waits for input while peer(s) wait for output");
437+
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::CANCELLED, issues);
438+
this->Send(this->SelfId(), abortEv.Release());
439+
}
440+
433441
if (Request.ProgressStatsPeriod) {
434442
auto now = TInstant::Now();
435443
if (LastProgressStats + Request.ProgressStatsPeriod <= now) {

Diff for: ydb/core/kqp/executer_actor/kqp_executer_stats.cpp

+93
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,22 @@ ui64 NonZeroMin(ui64 a, ui64 b) {
1212
return (b == 0) ? a : ((a == 0 || a > b) ? b : a);
1313
}
1414

15+
ui64 ExportMinStats(std::vector<ui64>& data);
1516
ui64 ExportMaxStats(std::vector<ui64>& data);
1617

18+
void TMinStats::Resize(ui32 count) {
19+
Values.resize(count);
20+
}
21+
22+
void TMinStats::Set(ui32 index, ui64 value) {
23+
Y_ASSERT(index < Values.size());
24+
auto maybeMin = Values[index] == MinValue;
25+
Values[index] = value;
26+
if (maybeMin) {
27+
MinValue = ExportMinStats(Values);
28+
}
29+
}
30+
1731
void TMaxStats::Resize(ui32 count) {
1832
Values.resize(count);
1933
}
@@ -589,6 +603,23 @@ ui64 TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS
589603
return baseTimeMs;
590604
}
591605

606+
bool TStageExecutionStats::IsDeadlocked(ui64 deadline) {
607+
if (CurrentWaitInputTimeUs.MinValue < deadline || InputStages.empty()) {
608+
return false;
609+
}
610+
611+
for (auto stat : InputStages) {
612+
if (stat->CurrentWaitOutputTimeUs.MinValue < deadline && !stat->IsFinished()) {
613+
return false;
614+
}
615+
}
616+
return true;
617+
}
618+
619+
bool TStageExecutionStats::IsFinished() {
620+
return FinishedCount == Task2Index.size();
621+
}
622+
592623
namespace {
593624

594625
TTableStat operator - (const TTableStat& l, const TTableStat& r) {
@@ -756,6 +787,37 @@ bool CollectProfileStats(Ydb::Table::QueryStatsCollection::Mode statsMode) {
756787
return statsMode >= Ydb::Table::QueryStatsCollection::STATS_COLLECTION_PROFILE;
757788
}
758789

790+
void TQueryExecutionStats::Prepare() {
791+
if (CollectFullStats(StatsMode)) {
792+
// stages
793+
for (auto& [stageId, info] : TasksGraph->GetStagesInfo()) {
794+
auto [it, inserted] = StageStats.try_emplace(stageId.StageId);
795+
Y_ENSURE(inserted);
796+
it->second.StageId = stageId;
797+
}
798+
// connections
799+
for (auto& [_, stageStats] : StageStats) {
800+
auto& info = TasksGraph->GetStageInfo(stageStats.StageId);
801+
auto& stage = info.Meta.GetStage(info.Id);
802+
for (const auto& input : stage.GetInputs()) {
803+
auto& peerStageStats = StageStats[input.GetStageIndex()];
804+
stageStats.InputStages.push_back(&peerStageStats);
805+
peerStageStats.OutputStages.push_back(&stageStats);
806+
}
807+
}
808+
// tasks
809+
for (auto& task : TasksGraph->GetTasks()) {
810+
auto& stageStats = StageStats[task.StageId.StageId];
811+
stageStats.Task2Index.emplace(task.Id, stageStats.Task2Index.size());
812+
}
813+
for (auto& [_, stageStats] : StageStats) {
814+
stageStats.TaskCount = (stageStats.Task2Index.size() + 3) & ~3;
815+
stageStats.Resize(stageStats.TaskCount);
816+
}
817+
}
818+
}
819+
820+
759821
void TQueryExecutionStats::FillStageDurationUs(NYql::NDqProto::TDqStageStats& stats) {
760822
if (stats.HasStartTimeMs() && stats.HasFinishTimeMs()) {
761823
auto startTimeMs = stats.GetStartTimeMs().GetMin();
@@ -1183,10 +1245,41 @@ void TQueryExecutionStats::UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TD
11831245
it->second.SetHistorySampleCount(HistorySampleCount);
11841246
}
11851247
BaseTimeMs = NonZeroMin(BaseTimeMs, it->second.UpdateStats(taskStats, state, stats.GetMaxMemoryUsage(), stats.GetDurationUs()));
1248+
1249+
constexpr ui64 deadline = 60'000'000; // 60s
1250+
if (it->second.CurrentWaitOutputTimeUs.MinValue > deadline) {
1251+
for (auto stat : it->second.OutputStages) {
1252+
if (stat->IsDeadlocked(deadline)) {
1253+
DeadlockedStageId = stat->StageId.StageId;
1254+
break;
1255+
}
1256+
}
1257+
} else if (it->second.IsDeadlocked(deadline)) {
1258+
DeadlockedStageId = it->second.StageId.StageId;
1259+
}
11861260
}
11871261

11881262
// SIMD-friendly aggregations are below. Compiler is able to vectorize sum/count, but needs help with min/max
11891263

1264+
ui64 ExportMinStats(std::vector<ui64>& data) {
1265+
1266+
Y_DEBUG_ABORT_UNLESS((data.size() & 3) == 0);
1267+
1268+
ui64 min4[4] = {0, 0, 0, 0};
1269+
1270+
for (auto it = data.begin(); it < data.end(); it += 4) {
1271+
min4[0] = min4[0] ? (it[0] ? (min4[0] < it[0] ? min4[0] : it[0]) : min4[0]) : it[0];
1272+
min4[1] = min4[1] ? (it[1] ? (min4[1] < it[1] ? min4[1] : it[1]) : min4[1]) : it[1];
1273+
min4[2] = min4[2] ? (it[2] ? (min4[2] < it[2] ? min4[2] : it[2]) : min4[2]) : it[2];
1274+
min4[3] = min4[3] ? (it[3] ? (min4[3] < it[3] ? min4[3] : it[3]) : min4[3]) : it[3];
1275+
}
1276+
1277+
ui64 min01 = min4[0] ? (min4[1] ? (min4[0] < min4[1] ? min4[0] : min4[1]) : min4[0]) : min4[1];
1278+
ui64 min23 = min4[2] ? (min4[3] ? (min4[2] < min4[3] ? min4[2] : min4[3]) : min4[2]) : min4[3];
1279+
1280+
return min01 ? (min23 ? (min01 < min23 ? min01 : min23) : min01) : min23;
1281+
}
1282+
11901283
ui64 ExportMaxStats(std::vector<ui64>& data) {
11911284

11921285
Y_DEBUG_ABORT_UNLESS((data.size() & 3) == 0);

Diff for: ydb/core/kqp/executer_actor/kqp_executer_stats.h

+18-3
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,14 @@ NYql::NDqProto::EDqStatsMode GetDqStatsModeShard(Ydb::Table::QueryStatsCollectio
1616
bool CollectFullStats(Ydb::Table::QueryStatsCollection::Mode statsMode);
1717
bool CollectProfileStats(Ydb::Table::QueryStatsCollection::Mode statsMode);
1818

19+
struct TMinStats {
20+
std::vector<ui64> Values;
21+
ui64 MinValue = 0;
22+
23+
void Resize(ui32 count);
24+
void Set(ui32 index, ui64 value);
25+
};
26+
1927
struct TMaxStats {
2028
std::vector<ui64> Values;
2129
ui64 MaxValue = 0;
@@ -208,8 +216,8 @@ struct TStageExecutionStats {
208216
std::vector<ui64> DurationUs;
209217
TTimeSeriesStats WaitInputTimeUs;
210218
TTimeSeriesStats WaitOutputTimeUs;
211-
TMaxStats CurrentWaitInputTimeUs;
212-
TMaxStats CurrentWaitOutputTimeUs;
219+
TMinStats CurrentWaitInputTimeUs;
220+
TMinStats CurrentWaitOutputTimeUs;
213221

214222
TTimeSeriesStats SpillingComputeBytes;
215223
TTimeSeriesStats SpillingChannelBytes;
@@ -229,9 +237,11 @@ struct TStageExecutionStats {
229237
TTimeSeriesStats MaxMemoryUsage;
230238

231239
ui32 HistorySampleCount = 0;
232-
ui32 TaskCount = 0;
240+
ui32 TaskCount = 0; // rounded to 4 value of Task2Index.size(), which is actual
233241
std::vector<bool> Finished;
234242
ui32 FinishedCount = 0;
243+
std::vector<TStageExecutionStats*> InputStages;
244+
std::vector<TStageExecutionStats*> OutputStages;
235245

236246
void Resize(ui32 taskCount);
237247
ui32 EstimateMem() {
@@ -245,6 +255,8 @@ struct TStageExecutionStats {
245255
void ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqStageStats& stageStats);
246256
ui64 UpdateAsyncStats(ui32 index, TAsyncStats& aggrAsyncStats, const NYql::NDqProto::TDqAsyncBufferStats& asyncStats);
247257
ui64 UpdateStats(const NYql::NDqProto::TDqTaskStats& taskStats, NYql::NDqProto::EComputeState state, ui64 maxMemoryUsage, ui64 durationUs);
258+
bool IsDeadlocked(ui64 deadline);
259+
bool IsFinished();
248260
};
249261

250262
struct TExternalPartitionStat {
@@ -284,6 +296,7 @@ struct TQueryExecutionStats {
284296
const Ydb::Table::QueryStatsCollection::Mode StatsMode;
285297
const TKqpTasksGraph* const TasksGraph = nullptr;
286298
NYql::NDqProto::TDqExecutionStats* const Result;
299+
std::optional<ui32> DeadlockedStageId;
287300

288301
// basic stats
289302
std::unordered_set<ui64> AffectedShards;
@@ -317,6 +330,8 @@ struct TQueryExecutionStats {
317330
HistorySampleCount = 32;
318331
}
319332

333+
void Prepare();
334+
320335
void AddComputeActorStats(
321336
ui32 nodeId,
322337
NYql::NDqProto::TDqComputeActorStats&& stats,

Diff for: ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp

+7-9
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ void TDqComputeActorChannels::HandleWork(TEvDqCompute::TEvRetryChannelData::TPtr
240240

241241
outputChannel.RetryState->RetryScheduled = false;
242242

243-
auto now = Now();
243+
auto now = TInstant::Now();
244244

245245
for (auto& inFlight : outputChannel.InFlight) {
246246
ui64 seqNo = inFlight.first;
@@ -270,6 +270,7 @@ void TDqComputeActorChannels::HandleWork(TEvDqCompute::TEvRetryChannelData::TPtr
270270

271271
ui32 flags = CalcMessageFlags(*outputChannel.Peer);
272272
Send(*outputChannel.Peer, retryEv.Release(), flags, /* cookie */ msg->ChannelId);
273+
LastOutputMessageTime = TInstant::Now();
273274
}
274275
}
275276

@@ -354,7 +355,7 @@ void TDqComputeActorChannels::HandleWork(TEvInterconnect::TEvNodeDisconnected::T
354355
return RuntimeError("detected disconnected node");
355356
}
356357

357-
auto now = Now();
358+
auto now = TInstant::Now();
358359

359360
for (auto& inputChannel : InputChannelsMap) {
360361
if (!inputChannel.second.IsFromNode(nodeId)) {
@@ -419,7 +420,7 @@ void TDqComputeActorChannels::HandleUndeliveredEvChannelData(ui64 channelId, NAc
419420
return RuntimeError(message);
420421
}
421422

422-
ScheduleRetryForChannel<TOutputChannelState, TEvDqCompute::TEvRetryChannelData>(outputChannel, Now());
423+
ScheduleRetryForChannel<TOutputChannelState, TEvDqCompute::TEvRetryChannelData>(outputChannel, TInstant::Now());
423424
}
424425

425426
void TDqComputeActorChannels::HandleUndeliveredEvChannelDataAck(ui64 channelId, NActors::TEvents::TEvUndelivered::EReason reason) {
@@ -450,7 +451,7 @@ void TDqComputeActorChannels::HandleUndeliveredEvChannelDataAck(ui64 channelId,
450451
return RuntimeError(message);
451452
}
452453

453-
ScheduleRetryForChannel<TInputChannelState, TEvDqCompute::TEvRetryChannelDataAck>(inputChannel, Now());
454+
ScheduleRetryForChannel<TInputChannelState, TEvDqCompute::TEvRetryChannelDataAck>(inputChannel, TInstant::Now());
454455
}
455456

456457
template <typename TChannelState, typename TRetryEvent>
@@ -494,10 +495,6 @@ void TDqComputeActorChannels::HandlePoison(TEvents::TEvPoison::TPtr&) {
494495
PassAway();
495496
}
496497

497-
TInstant TDqComputeActorChannels::Now() const {
498-
return TInstant::Now();
499-
}
500-
501498
void TDqComputeActorChannels::RuntimeError(const TString& message) {
502499
LOG_E(message);
503500

@@ -570,7 +567,7 @@ void TDqComputeActorChannels::SendChannelData(TChannelDataOOB&& channelData, con
570567

571568
auto dataEv = MakeHolder<TEvDqCompute::TEvChannelData>();
572569
dataEv->Record.SetSeqNo(seqNo);
573-
dataEv->Record.SetSendTime(Now().MilliSeconds());
570+
dataEv->Record.SetSendTime(TInstant::Now().MilliSeconds());
574571
// copying here since we need to save channelData in InFlight
575572
*dataEv->Record.MutableChannelData() = channelData.Proto;
576573
if (channelData.Proto.HasData()) {
@@ -593,6 +590,7 @@ void TDqComputeActorChannels::SendChannelData(TChannelDataOOB&& channelData, con
593590
ui32 flags = CalcMessageFlags(*outputChannel.Peer);
594591
dataEv->Record.SetNoAck(!needAck);
595592
Send(*outputChannel.Peer, dataEv.Release(), flags, /* cookie */ outputChannel.ChannelId);
593+
LastOutputMessageTime = TInstant::Now();
596594

597595
outputChannel.PeerState.AddInFlight(dataBytes, dataChunks);
598596
}

Diff for: ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h

+4-1
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,6 @@ class TDqComputeActorChannels : public NActors::TActor<TDqComputeActorChannels>
103103
void HandlePoison(NActors::TEvents::TEvPoison::TPtr&);
104104

105105
private:
106-
TInstant Now() const;
107106
void RuntimeError(const TString& message);
108107
void InternalError(const TString& message);
109108
void PassAway() override;
@@ -123,6 +122,9 @@ class TDqComputeActorChannels : public NActors::TActor<TDqComputeActorChannels>
123122
const TPeerState& GetOutputChannelInFlightState(ui64 channelId);
124123
const TInputChannelStats* GetInputChannelStats(ui64 channelId);
125124
const TOutputChannelStats* GetOutputChannelStats(ui64 channelId);
125+
TInstant GetLastOutputMessageTime() const {
126+
return LastOutputMessageTime;
127+
}
126128

127129
private:
128130
struct TChannelRetryState {
@@ -229,6 +231,7 @@ class TDqComputeActorChannels : public NActors::TActor<TDqComputeActorChannels>
229231
THashSet<ui32> TrackingNodes;
230232
THashMap<ui64, TInputChannelState> InputChannelsMap;
231233
THashMap<ui64, TOutputChannelState> OutputChannelsMap;
234+
TInstant LastOutputMessageTime;
232235
};
233236

234237
} // namespace NYql::NDq

Diff for: ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h

+16-4
Original file line numberDiff line numberDiff line change
@@ -1131,8 +1131,13 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
11311131
}
11321132
case EEvWakeupTag::PeriodicStatsTag: {
11331133
const auto maxInterval = RuntimeSettings.ReportStatsSettings->MaxInterval;
1134-
if (Running && ProcessOutputsState.LastRunStatus != ERunStatus::Finished) {
1135-
DoExecute();
1134+
if (Running && State == NDqProto::COMPUTE_STATE_EXECUTING) {
1135+
if (ProcessOutputsState.LastRunStatus == ERunStatus::Finished) {
1136+
// wait until all outputs are drained
1137+
ReportStats();
1138+
} else {
1139+
DoExecute();
1140+
}
11361141
this->Schedule(maxInterval, new NActors::TEvents::TEvWakeup(EEvWakeupTag::PeriodicStatsTag));
11371142
}
11381143
break;
@@ -1777,7 +1782,15 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
17771782
source->FillExtraStats(protoTask, last, GetMeteringStats());
17781783
}
17791784
}
1785+
17801786
FillTaskRunnerStats(Task.GetId(), Task.GetStageId(), *taskStats, protoTask, RuntimeSettings.GetCollectStatsLevel());
1787+
// when TR finished, use channels to detect output back pressure
1788+
if (taskStats->FinishTs && State != NDqProto::COMPUTE_STATE_FINISHED) {
1789+
auto lastOutputTime = Channels->GetLastOutputMessageTime();
1790+
if (lastOutputTime) {
1791+
protoTask->SetCurrentWaitOutputTimeUs((TInstant::Now() - lastOutputTime).MicroSeconds());
1792+
}
1793+
}
17811794

17821795
auto cpuTimeUs = taskStats->ComputeCpuTime.MicroSeconds() + taskStats->BuildCpuTime.MicroSeconds();
17831796
if (TDerived::HasAsyncTaskRunner) {
@@ -1988,10 +2001,9 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
19882001
protected:
19892002
void ReportStats() {
19902003
auto now = TInstant::Now();
1991-
if (!RuntimeSettings.ReportStatsSettings || now - LastSendStatsTime < RuntimeSettings.ReportStatsSettings->MinInterval) {
2004+
if (State != NDqProto::COMPUTE_STATE_EXECUTING || !RuntimeSettings.ReportStatsSettings || now - LastSendStatsTime < RuntimeSettings.ReportStatsSettings->MinInterval) {
19922005
return;
19932006
}
1994-
19952007
auto evState = std::make_unique<TEvDqCompute::TEvState>();
19962008
evState->Record.SetState(NDqProto::COMPUTE_STATE_EXECUTING);
19972009
evState->Record.SetTaskId(Task.GetId());

0 commit comments

Comments
 (0)