Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deadlock detection #16782

Merged
merged 6 commits into from
Apr 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1841,7 +1841,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
flags));
}

NDataIntegrity::LogIntegrityTrails("DatashardTx", dataTransaction.GetKqpTransaction().GetLocks().ShortDebugString(),
NDataIntegrity::LogIntegrityTrails("DatashardTx", dataTransaction.GetKqpTransaction().GetLocks().ShortDebugString(),
Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext());

ResponseEv->Orbit.Fork(evData->Orbit);
Expand Down Expand Up @@ -1878,7 +1878,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

auto traceId = ExecuterSpan.GetTraceId();

NDataIntegrity::LogIntegrityTrails("EvWriteTx", evWriteTransaction->Record.GetLocks().ShortDebugString(),
NDataIntegrity::LogIntegrityTrails("EvWriteTx", evWriteTransaction->Record.GetLocks().ShortDebugString(),
Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext());

auto shardsToString = [](const auto& shards) {
Expand Down Expand Up @@ -2101,6 +2101,10 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
return;
}

if (Stats) {
Stats->Prepare();
}

THashMap<ui64, TVector<NDqProto::TDqTask*>> datashardTasks; // shardId -> [task]
THashMap<ui64, TVector<ui64>> remoteComputeTasks; // shardId -> [task]
TVector<ui64> computeTasks;
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,14 @@ class TKqpExecuterBase : public TActor<TDerived> {
ui64 cycleCount = GetCycleCountFast();

Stats->UpdateTaskStats(taskId, state.GetStats(), (NYql::NDqProto::EComputeState) state.GetState());

if (Stats->DeadlockedStageId) {
NYql::TIssues issues;
issues.AddIssue(TStringBuilder() << "Deadlock detected: stage " << *Stats->DeadlockedStageId << " waits for input while peer(s) wait for output");
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::CANCELLED, issues);
this->Send(this->SelfId(), abortEv.Release());
}

if (Request.ProgressStatsPeriod) {
auto now = TInstant::Now();
if (LastProgressStats + Request.ProgressStatsPeriod <= now) {
Expand Down
161 changes: 145 additions & 16 deletions ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,33 @@ ui64 NonZeroMin(ui64 a, ui64 b) {
return (b == 0) ? a : ((a == 0 || a > b) ? b : a);
}

ui64 ExportMinStats(std::vector<ui64>& data);
ui64 ExportMaxStats(std::vector<ui64>& data);

// Sizes the per-slot storage so that it holds exactly `count` entries.
void TMinStats::Resize(ui32 count) {
    const auto slotCount = count;
    Values.resize(slotCount);
}

// Stores `value` in slot `index` and keeps MinValue in sync with the smallest
// non-zero entry of Values (ExportMinStats ignores zero slots).
//
// A full rescan is only required when the overwritten slot held the current
// minimum. In every other case the minimum can change only if the new value is
// a non-zero value below it, which is handled without a rescan.
void TMinStats::Set(ui32 index, ui64 value) {
    AFL_ENSURE(index < Values.size());
    const ui64 previous = Values[index];
    Values[index] = value;
    if (previous == MinValue) {
        // The overwritten slot may have been the one defining the minimum.
        MinValue = ExportMinStats(Values);
    } else if (value != 0 && value < MinValue) {
        // Another slot dropped below the tracked minimum; without this branch
        // MinValue would stay stale (too large).
        MinValue = value;
    }
}

// Sizes the per-slot storage so that it holds exactly `count` entries.
void TMaxStats::Resize(ui32 count) {
    const auto slotCount = count;
    Values.resize(slotCount);
}

// Stores `value` in slot `index` and keeps MaxValue equal to the largest entry.
// When the slot does not decrease, the maximum can only grow, so a comparison
// is enough; when it decreases, the maximum is recomputed over all slots.
void TMaxStats::Set(ui32 index, ui64 value) {
    AFL_ENSURE(index < Values.size());
    const bool decreased = value < Values[index];
    Values[index] = value;
    if (decreased) {
        MaxValue = ExportMaxStats(Values);
    } else if (value > MaxValue) {
        MaxValue = value;
    }
}

void TTimeSeriesStats::ExportAggStats(NYql::NDqProto::TDqStatsAggr& stats) {
NKikimr::NKqp::ExportAggStats(Values, stats);
}
Expand All @@ -38,7 +65,7 @@ void TTimeSeriesStats::Resize(ui32 count) {

void TTimeSeriesStats::SetNonZero(ui32 index, ui64 value) {
if (value) {
Y_ASSERT(index < Values.size());
AFL_ENSURE(index < Values.size());
Sum += value;
Sum -= Values[index];
Values[index] = value;
Expand Down Expand Up @@ -123,12 +150,12 @@ void TPartitionedStats::ResizeByParts(ui32 partCount, ui32 taskCount) {

void TPartitionedStats::SetNonZero(ui32 taskIndex, ui32 partIndex, ui64 value, bool recordTimeSeries) {
if (value) {
Y_ASSERT(partIndex < Parts.size());
AFL_ENSURE(partIndex < Parts.size());
auto& part = Parts[partIndex];
auto delta = value - part[taskIndex];
Y_ASSERT(taskIndex < part.size());
AFL_ENSURE(taskIndex < part.size());
part[taskIndex] = value;
Y_ASSERT(partIndex < Values.size());
AFL_ENSURE(partIndex < Values.size());
Values[partIndex] += delta;
Sum += delta;
if (recordTimeSeries) {
Expand Down Expand Up @@ -249,7 +276,7 @@ void TOperatorStats::Resize(ui32 taskCount) {

void TStageExecutionStats::Resize(ui32 taskCount) {

Y_DEBUG_ABORT_UNLESS((taskCount & 3) == 0);
AFL_ENSURE((taskCount & 3) == 0);

CpuTimeUs.Resize(taskCount);
SourceCpuTimeUs.resize(taskCount);
Expand All @@ -272,6 +299,8 @@ void TStageExecutionStats::Resize(ui32 taskCount) {

WaitInputTimeUs.Resize(taskCount);
WaitOutputTimeUs.Resize(taskCount);
CurrentWaitInputTimeUs.Resize(taskCount);
CurrentWaitOutputTimeUs.Resize(taskCount);

SpillingComputeBytes.Resize(taskCount);
SpillingChannelBytes.Resize(taskCount);
Expand Down Expand Up @@ -365,7 +394,7 @@ inline void SetNonZero(ui64& target, ui64 source) {
}

// Checks that `index` is within `vector`, then forwards to the scalar
// SetNonZero overload for that slot.
// NOTE(review): both a Y_ASSERT and an AFL_ENSURE bounds check appear here; this
// looks like the old and new lines of a diff shown together — confirm which one
// is current.
inline void SetNonZero(std::vector<ui64>& vector, ui32 index, ui64 value) {
Y_ASSERT(index < vector.size());
AFL_ENSURE(index < vector.size());
SetNonZero(vector[index], value);
}

Expand Down Expand Up @@ -397,7 +426,7 @@ ui64 TStageExecutionStats::UpdateAsyncStats(ui32 index, TAsyncStats& aggrAsyncSt
aggrAsyncStats.WaitTimeUs.SetNonZero(index, asyncStats.GetWaitTimeUs());
SetNonZero(aggrAsyncStats.WaitPeriods, index, asyncStats.GetWaitPeriods());
if (firstMessageMs && lastMessageMs > firstMessageMs) {
Y_ASSERT(index < aggrAsyncStats.ActiveTimeUs.size());
AFL_ENSURE(index < aggrAsyncStats.ActiveTimeUs.size());
aggrAsyncStats.ActiveTimeUs[index] = lastMessageMs - firstMessageMs;
}

Expand All @@ -409,7 +438,7 @@ ui64 TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS
auto it = Task2Index.find(taskId);
ui64 baseTimeMs = 0;

Y_DEBUG_ABORT_UNLESS(TaskCount >= Task2Index.size());
AFL_ENSURE(TaskCount >= Task2Index.size());

ui32 index;
if (it == Task2Index.end()) {
Expand Down Expand Up @@ -456,6 +485,8 @@ ui64 TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS
SetNonZero(DurationUs, index, durationUs);
WaitInputTimeUs.SetNonZero(index, taskStats.GetWaitInputTimeUs());
WaitOutputTimeUs.SetNonZero(index, taskStats.GetWaitOutputTimeUs());
CurrentWaitInputTimeUs.Set(index, taskStats.GetCurrentWaitInputTimeUs());
CurrentWaitOutputTimeUs.Set(index, taskStats.GetCurrentWaitOutputTimeUs());

SpillingComputeBytes.SetNonZero(index, taskStats.GetSpillingComputeWriteBytes());
SpillingChannelBytes.SetNonZero(index, taskStats.GetSpillingChannelWriteBytes());
Expand Down Expand Up @@ -572,6 +603,23 @@ ui64 TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS
return baseTimeMs;
}

// Reports whether this stage looks deadlocked with respect to `deadline`
// (compared directly against the tracked *TimeUs minimums):
//   * the stage has at least one input stage,
//   * its tracked minimum current input wait has reached `deadline`, and
//   * every input stage is either finished or has a tracked minimum current
//     output wait that has reached `deadline`.
bool TStageExecutionStats::IsDeadlocked(ui64 deadline) {
    const bool hasInputs = !InputStages.empty();
    const bool waitsLongEnough = CurrentWaitInputTimeUs.MinValue >= deadline;
    if (!hasInputs || !waitsLongEnough) {
        return false;
    }

    for (auto inputStage : InputStages) {
        if (inputStage->CurrentWaitOutputTimeUs.MinValue >= deadline) {
            continue; // this producer is blocked on output as well
        }
        if (inputStage->IsFinished()) {
            continue; // a finished producer cannot unblock anything
        }
        // A still-running producer that is not stuck on output: no deadlock.
        return false;
    }
    return true;
}

// True when the finished-task counter equals the number of tasks registered in
// Task2Index.
bool TStageExecutionStats::IsFinished() {
    const auto knownTasks = Task2Index.size();
    return knownTasks == FinishedCount;
}

namespace {

TTableStat operator - (const TTableStat& l, const TTableStat& r) {
Expand Down Expand Up @@ -739,6 +787,37 @@ bool CollectProfileStats(Ydb::Table::QueryStatsCollection::Mode statsMode) {
return statsMode >= Ydb::Table::QueryStatsCollection::STATS_COLLECTION_PROFILE;
}

void TQueryExecutionStats::Prepare() {
if (CollectFullStats(StatsMode)) {
// stages
for (auto& [stageId, info] : TasksGraph->GetStagesInfo()) {
auto [it, inserted] = StageStats.try_emplace(stageId);
Y_ENSURE(inserted);
it->second.StageId = stageId;
}
// connections
for (auto& [_, stageStats] : StageStats) {
auto& info = TasksGraph->GetStageInfo(stageStats.StageId);
auto& stage = info.Meta.GetStage(info.Id);
for (const auto& input : stage.GetInputs()) {
auto& peerStageStats = StageStats[NYql::NDq::TStageId(stageStats.StageId.TxId, input.GetStageIndex())];
stageStats.InputStages.push_back(&peerStageStats);
peerStageStats.OutputStages.push_back(&stageStats);
}
}
// tasks
for (auto& task : TasksGraph->GetTasks()) {
auto& stageStats = StageStats[task.StageId];
stageStats.Task2Index.emplace(task.Id, stageStats.Task2Index.size());
}
for (auto& [_, stageStats] : StageStats) {
stageStats.TaskCount = (stageStats.Task2Index.size() + 3) & ~3;
stageStats.Resize(stageStats.TaskCount);
}
}
}


void TQueryExecutionStats::FillStageDurationUs(NYql::NDqProto::TDqStageStats& stats) {
if (stats.HasStartTimeMs() && stats.HasFinishTimeMs()) {
auto startTimeMs = stats.GetStartTimeMs().GetMin();
Expand Down Expand Up @@ -1156,23 +1235,73 @@ void TQueryExecutionStats::AddBufferStats(NYql::NDqProto::TDqTaskStats&& taskSta
}

// Folds one compute actor stats report for task `taskId` into the statistics of
// the task's stage, updates BaseTimeMs, and then probes for a deadlock around
// that stage, recording the offending stage in DeadlockedStageId.
// NOTE(review): this span appears to show old and new diff lines together
// (paired Y_ASSERT/AFL_ENSURE checks, two `stageId` declarations, two `StageId`
// assignments) — confirm which variant is current before compiling.
void TQueryExecutionStats::UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TDqComputeActorStats& stats, NYql::NDqProto::EComputeState state) {
// The report is expected to carry exactly one task entry, matching `taskId`.
Y_ASSERT(stats.GetTasks().size() == 1);
AFL_ENSURE(stats.GetTasks().size() == 1);
const NYql::NDqProto::TDqTaskStats& taskStats = stats.GetTasks(0);
Y_ASSERT(taskStats.GetTaskId() == taskId);
auto stageId = taskStats.GetStageId();
AFL_ENSURE(taskStats.GetTaskId() == taskId);
auto stageId = TasksGraph->GetTask(taskId).StageId;
// Create the stage entry on first sight and initialise it.
auto [it, inserted] = StageStats.try_emplace(stageId);
if (inserted) {
it->second.StageId = TasksGraph->GetTask(taskStats.GetTaskId()).StageId;
it->second.StageId = stageId;
it->second.SetHistorySampleCount(HistorySampleCount);
}
BaseTimeMs = NonZeroMin(BaseTimeMs, it->second.UpdateStats(taskStats, state, stats.GetMaxMemoryUsage(), stats.GetDurationUs()));

// Deadlock probe: if this stage's tracked minimum current output wait exceeds
// the deadline, check each of its output stages; otherwise check this stage itself.
constexpr ui64 deadline = 600'000'000; // 10m
if (it->second.CurrentWaitOutputTimeUs.MinValue > deadline) {
for (auto stat : it->second.OutputStages) {
if (stat->IsDeadlocked(deadline)) {
DeadlockedStageId = stat->StageId.StageId;
break;
}
}
} else if (it->second.IsDeadlocked(deadline)) {
DeadlockedStageId = it->second.StageId.StageId;
}
}

// SIMD-friendly aggregations are below. Compiler is able to vectorize sum/count, but needs help with min/max

// Returns the smallest non-zero element of `data`, or 0 when every element is
// zero (or `data` is empty). Zero entries are ignored. The size of `data` must
// be a multiple of 4: elements are folded into four independent lanes which are
// combined at the end.
ui64 ExportMinStats(std::vector<ui64>& data) {

    AFL_ENSURE((data.size() & 3) == 0);

    // Minimum of two values where 0 means "no value".
    const auto minIgnoringZero = [](ui64 lhs, ui64 rhs) -> ui64 {
        return lhs ? (rhs ? (lhs < rhs ? lhs : rhs) : lhs) : rhs;
    };

    ui64 lanes[4] = {0, 0, 0, 0};

    for (size_t base = 0; base < data.size(); base += 4) {
        for (size_t lane = 0; lane < 4; ++lane) {
            lanes[lane] = minIgnoringZero(lanes[lane], data[base + lane]);
        }
    }

    const ui64 lowPair = minIgnoringZero(lanes[0], lanes[1]);
    const ui64 highPair = minIgnoringZero(lanes[2], lanes[3]);

    return minIgnoringZero(lowPair, highPair);
}

// Returns the largest element of `data`, or 0 when `data` is empty. The size of
// `data` must be a multiple of 4: elements are folded into four independent
// lanes which are combined at the end.
ui64 ExportMaxStats(std::vector<ui64>& data) {

    AFL_ENSURE((data.size() & 3) == 0);

    ui64 lanes[4] = {0, 0, 0, 0};

    for (size_t base = 0; base < data.size(); base += 4) {
        for (size_t lane = 0; lane < 4; ++lane) {
            const ui64 candidate = data[base + lane];
            lanes[lane] = lanes[lane] > candidate ? lanes[lane] : candidate;
        }
    }

    const ui64 lowPair = lanes[0] > lanes[1] ? lanes[0] : lanes[1];
    const ui64 highPair = lanes[2] > lanes[3] ? lanes[2] : lanes[3];

    return lowPair > highPair ? lowPair : highPair;
}

void ExportAggStats(std::vector<ui64>& data, NYql::NDqProto::TDqStatsMinMax& stats) {

Y_DEBUG_ABORT_UNLESS((data.size() & 3) == 0);
AFL_ENSURE((data.size() & 3) == 0);

ui64 count = 0;
ui64 min4[4] = {0, 0, 0, 0};
Expand Down Expand Up @@ -1205,7 +1334,7 @@ void ExportAggStats(std::vector<ui64>& data, NYql::NDqProto::TDqStatsMinMax& sta

void ExportOffsetAggStats(std::vector<ui64>& data, NYql::NDqProto::TDqStatsAggr& stats, ui64 offset) {

Y_DEBUG_ABORT_UNLESS((data.size() & 3) == 0);
AFL_ENSURE((data.size() & 3) == 0);

ui64 count = 0;
ui64 sum = 0;
Expand Down Expand Up @@ -1250,7 +1379,7 @@ void ExportOffsetAggStats(std::vector<ui64>& data, NYql::NDqProto::TDqStatsAggr&

void ExportAggStats(std::vector<ui64>& data, NYql::NDqProto::TDqStatsAggr& stats) {

Y_DEBUG_ABORT_UNLESS((data.size() & 3) == 0);
AFL_ENSURE((data.size() & 3) == 0);

ui64 count = 0;
ui64 sum = 0;
Expand Down Expand Up @@ -1531,7 +1660,7 @@ void TQueryExecutionStats::Finish() {
}

AdjustBaseTime(stageStats);
auto it = StageStats.find(stageId.StageId);
auto it = StageStats.find(stageId);
if (it != StageStats.end()) {
it->second.ExportHistory(BaseTimeMs, *stageStats);
}
Expand Down
Loading
Loading