From dddf08257888d550ebc2004492f445b75fcb7f9c Mon Sep 17 00:00:00 2001 From: Ivan Lezhankin Date: Tue, 27 Aug 2024 20:09:48 +0300 Subject: [PATCH 1/6] Apply the last stats received from terminated CAs --- ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 1d18d034ac9d..0812b291aa6d 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -2607,6 +2607,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseBecome(&TThis::WaitShutdownState); LOG_I("Waiting for shutdown of " << Planner->GetPendingComputeTasks().size() << " tasks and " << Planner->GetPendingComputeActors().size() << " compute actors"); + // TODO(ilezhankin): the CA awaiting timeout should be configurable. TActivationContext::Schedule(TDuration::Seconds(10), new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPoison)); } } else { @@ -2646,7 +2647,16 @@ class TKqpDataExecuter : public TKqpExecuterBaseSender; - ui64 taskId = ev->Get()->Record.GetTaskId(); + auto& state = ev->Get()->Record; + ui64 taskId = state.GetTaskId(); + + if (Stats) { + Stats->AddComputeActorStats( + actor.NodeId(), + std::move(*state.MutableStats()), + TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs()) + ); + } Planner->CompletedCA(taskId, actor); From d3449f08fd3fdf2067b7bf604257b8d344312279 Mon Sep 17 00:00:00 2001 From: Ivan Lezhankin Date: Tue, 27 Aug 2024 20:16:39 +0300 Subject: [PATCH 2/6] Apply stats to response --- ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 0812b291aa6d..09669f24b4ce 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -2629,6 +2629,10 @@ class TKqpDataExecuter : public TKqpExecuterBaseRecord.GetResponse().GetStatus()); + TBase::PassAway(); } From 3e9e20c3eac85d67e651e61655c21ba826f27dca Mon Sep 17 00:00:00 2001 From: Ivan Lezhankin Date: Wed, 28 Aug 2024 13:29:52 +0300 Subject: [PATCH 3/6] Properly handle all CA states and stats --- .../kqp/executer_actor/kqp_data_executer.cpp | 22 ++------ .../kqp/executer_actor/kqp_executer_impl.h | 54 ++++++++++++------- 2 files changed, 39 insertions(+), 37 deletions(-) diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 09669f24b4ce..038283425cdf 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -2647,26 +2647,10 @@ class TKqpDataExecuter : public TKqpExecuterBaseGet()->Record.GetState() == NDqProto::COMPUTE_STATE_FAILURE) { - YQL_ENSURE(Planner); - - TActorId actor = ev->Sender; - auto& state = ev->Get()->Record; - ui64 taskId = state.GetTaskId(); - - if (Stats) { - Stats->AddComputeActorStats( - actor.NodeId(), - std::move(*state.MutableStats()), - TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs()) - ); - } - - Planner->CompletedCA(taskId, actor); + HandleComputeState(ev); - if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) { - PassAway(); - } + if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) { + PassAway(); } } diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 2915775ea3fc..50e226384f55 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -380,7 +380,7 @@ class TKqpExecuterBase : public TActorBootstrapped { this->Send(channelComputeActorId, ackEv.Release(), /* TODO: undelivery */ 0, /* cookie */ channelId); } - void HandleComputeStats(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) { + bool HandleComputeState(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) { TActorId computeActor = ev->Sender; auto& state = ev->Get()->Record; ui64 taskId = state.GetTaskId(); @@ -409,27 +409,13 @@ class TKqpExecuterBase : public TActorBootstrapped { } YQL_ENSURE(Planner); - bool populateChannels = Planner->AcknowledgeCA(taskId, computeActor, &state); + bool ack = Planner->AcknowledgeCA(taskId, computeActor, &state); switch (state.GetState()) { - case NYql::NDqProto::COMPUTE_STATE_UNKNOWN: { - YQL_ENSURE(false, "unexpected state from " << computeActor << ", task: " << taskId); - return; - } - - case NYql::NDqProto::COMPUTE_STATE_EXECUTING: { - if (populateChannels) { - auto& task = TasksGraph.GetTask(taskId); - THashMap> updates; - CollectTaskChannelsUpdates(task, updates); - PropagateChannelsUpdates(updates); - } - break; - } - case NYql::NDqProto::COMPUTE_STATE_FAILURE: case NYql::NDqProto::COMPUTE_STATE_FINISHED: { ExtraData[computeActor].Swap(state.MutableExtraData()); + if (Stats) { Stats->AddComputeActorStats( computeActor.NodeId(), @@ -440,9 +426,41 @@ class TKqpExecuterBase : public TActorBootstrapped { LastTaskId = taskId; LastComputeActorId = computeActor.ToString(); - YQL_ENSURE(Planner); + Planner->CompletedCA(taskId, computeActor); } + default: + ; // ignore all other states. + } + + return ack; + } + + void HandleComputeStats(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) { + TActorId computeActor = ev->Sender; + auto& state = ev->Get()->Record; + ui64 taskId = state.GetTaskId(); + + bool populateChannels = HandleComputeState(ev); + + switch (state.GetState()) { + case NYql::NDqProto::COMPUTE_STATE_UNKNOWN: { + YQL_ENSURE(false, "unexpected state from " << computeActor << ", task: " << taskId); + return; + } + + case NYql::NDqProto::COMPUTE_STATE_EXECUTING: { + if (populateChannels) { + auto& task = TasksGraph.GetTask(taskId); + THashMap> updates; + CollectTaskChannelsUpdates(task, updates); + PropagateChannelsUpdates(updates); + } + break; + } + + default: + ; // ignore all other states. } if (state.GetState() == NYql::NDqProto::COMPUTE_STATE_FAILURE) { From b8d7260aa3cac05efc16b8d9d50790ef57cc2c94 Mon Sep 17 00:00:00 2001 From: Ivan Lezhankin Date: Wed, 28 Aug 2024 13:33:27 +0300 Subject: [PATCH 4/6] Refactor a little bit --- ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 4 ---- ydb/core/kqp/executer_actor/kqp_executer_impl.h | 3 +++ 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 038283425cdf..76a6c4ddd928 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -2629,10 +2629,6 @@ class TKqpDataExecuter : public TKqpExecuterBaseRecord.GetResponse().GetStatus()); - TBase::PassAway(); } diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 50e226384f55..ca78d14984ba 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -1872,6 +1872,9 @@ class TKqpExecuterBase : public TActorBootstrapped { void PassAway() override { YQL_ENSURE(AlreadyReplied && ResponseEv); + + // Actualize stats with the last stats from terminated CAs, but keep the status. + FillResponseStats(ResponseEv->Record.GetResponse().GetStatus()); this->Send(Target, ResponseEv.release()); for (auto channelPair: ResultChannelProxies) { From d5afc6b12c16258ac833e81dadb195ae3c514a23 Mon Sep 17 00:00:00 2001 From: Ivan Lezhankin Date: Wed, 28 Aug 2024 18:21:52 +0300 Subject: [PATCH 5/6] Don't finalize stats twice --- .../kqp/executer_actor/kqp_data_executer.cpp | 6 +-- .../kqp/executer_actor/kqp_executer_impl.h | 43 ++++++++++--------- ydb/core/kqp/executer_actor/kqp_planner.cpp | 6 ++- ydb/core/kqp/executer_actor/kqp_planner.h | 2 +- .../kqp/executer_actor/kqp_scan_executer.cpp | 2 +- 5 files changed, 31 insertions(+), 28 deletions(-) diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 76a6c4ddd928..62f840864966 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -529,7 +529,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseGet()->Record.GetState() == NDqProto::COMPUTE_STATE_FAILURE) { CancelProposal(0); } - HandleComputeStats(ev); + HandleComputeState(ev); } void HandlePrepare(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) { @@ -1008,7 +1008,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseGetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) { PassAway(); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index ca78d14984ba..f2c1b7ad4ff3 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -380,7 +380,7 @@ class TKqpExecuterBase : public TActorBootstrapped { this->Send(channelComputeActorId, ackEv.Release(), /* TODO: undelivery */ 0, /* cookie */ channelId); } - bool HandleComputeState(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) { + bool HandleComputeStats(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) { TActorId computeActor = ev->Sender; auto& state = ev->Get()->Record; ui64 taskId = state.GetTaskId(); @@ -411,37 +411,38 @@ class TKqpExecuterBase : public TActorBootstrapped { YQL_ENSURE(Planner); bool ack = Planner->AcknowledgeCA(taskId, computeActor, &state); - switch (state.GetState()) { - case NYql::NDqProto::COMPUTE_STATE_FAILURE: - case NYql::NDqProto::COMPUTE_STATE_FINISHED: { - ExtraData[computeActor].Swap(state.MutableExtraData()); - - if (Stats) { - Stats->AddComputeActorStats( - computeActor.NodeId(), - std::move(*state.MutableStats()), - TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs()) - ); - } - - LastTaskId = taskId; - LastComputeActorId = computeActor.ToString(); + // Don't finalize stats twice. + if (Planner->CompletedCA(taskId, computeActor)) { + switch (state.GetState()) { + case NYql::NDqProto::COMPUTE_STATE_FAILURE: + case NYql::NDqProto::COMPUTE_STATE_FINISHED: { + ExtraData[computeActor].Swap(state.MutableExtraData()); + + if (Stats) { + Stats->AddComputeActorStats( + computeActor.NodeId(), + std::move(*state.MutableStats()), + TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs()) + ); + } - Planner->CompletedCA(taskId, computeActor); + LastTaskId = taskId; + LastComputeActorId = computeActor.ToString(); + } + default: + ; // ignore all other states. } - default: - ; // ignore all other states. } return ack; } - void HandleComputeStats(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) { + void HandleComputeState(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) { TActorId computeActor = ev->Sender; auto& state = ev->Get()->Record; ui64 taskId = state.GetTaskId(); - bool populateChannels = HandleComputeState(ev); + bool populateChannels = HandleComputeStats(ev); switch (state.GetState()) { case NYql::NDqProto::COMPUTE_STATE_UNKNOWN: { diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index 926dda2700ef..b8e55cc11a55 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -542,11 +542,11 @@ bool TKqpPlanner::AcknowledgeCA(ui64 taskId, TActorId computeActor, const NYql:: return false; } -void TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) { +bool TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) { auto& task = TasksGraph.GetTask(taskId); if (task.Meta.Completed) { YQL_ENSURE(!PendingComputeActors.contains(computeActor)); - return; + return false; } task.Meta.Completed = true; @@ -556,6 +556,8 @@ void TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) { PendingComputeActors.erase(it); LOG_I("Compute actor has finished execution: " << computeActor.ToString()); + + return true; } void TKqpPlanner::TaskNotStarted(ui64 taskId) { diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h index eed887d6e9bc..ab05969f7fd1 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.h +++ b/ydb/core/kqp/executer_actor/kqp_planner.h @@ -73,7 +73,7 @@ class TKqpPlanner { std::unique_ptr PlanExecution(); std::unique_ptr AssignTasksToNodes(); bool AcknowledgeCA(ui64 taskId, TActorId computeActor, const NYql::NDqProto::TEvComputeActorState* state); - void CompletedCA(ui64 taskId, TActorId computeActor); + bool CompletedCA(ui64 taskId, TActorId computeActor); void TaskNotStarted(ui64 taskId); TProgressStat::TEntry CalculateConsumptionUpdate(); void ShiftConsumption(); diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index c04d5e7573f4..f7357744e14a 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -110,7 +110,7 @@ class TKqpScanExecuter : public TKqpExecuterBaseGetTypeRewrite()) { - hFunc(TEvDqCompute::TEvState, HandleComputeStats); + hFunc(TEvDqCompute::TEvState, HandleComputeState); hFunc(TEvDqCompute::TEvChannelData, HandleChannelData); // from CA hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck); hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution); From 9185b23a5a0e014b41658444930b377af6e2e2c8 Mon Sep 17 00:00:00 2001 From: Ivan Lezhankin Date: Thu, 29 Aug 2024 14:39:27 +0300 Subject: [PATCH 6/6] Fix bug --- ydb/core/kqp/executer_actor/kqp_executer_impl.h | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index f2c1b7ad4ff3..297e247e147c 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -411,11 +411,11 @@ class TKqpExecuterBase : public TActorBootstrapped { YQL_ENSURE(Planner); bool ack = Planner->AcknowledgeCA(taskId, computeActor, &state); - // Don't finalize stats twice. - if (Planner->CompletedCA(taskId, computeActor)) { - switch (state.GetState()) { - case NYql::NDqProto::COMPUTE_STATE_FAILURE: - case NYql::NDqProto::COMPUTE_STATE_FINISHED: { + switch (state.GetState()) { + case NYql::NDqProto::COMPUTE_STATE_FAILURE: + case NYql::NDqProto::COMPUTE_STATE_FINISHED: + // Don't finalize stats twice. + if (Planner->CompletedCA(taskId, computeActor)) { ExtraData[computeActor].Swap(state.MutableExtraData()); if (Stats) { @@ -429,9 +429,8 @@ class TKqpExecuterBase : public TActorBootstrapped { LastTaskId = taskId; LastComputeActorId = computeActor.ToString(); } - default: - ; // ignore all other states. - } + default: + ; // ignore all other states. } return ack;