From 424f3f52acab4cf664b2f5d24e9e5f50dd434794 Mon Sep 17 00:00:00 2001 From: Ivan Lezhankin Date: Thu, 15 Aug 2024 12:35:36 +0300 Subject: [PATCH 01/12] Wait for all CAs inside Executer before shutdown --- .../kqp/executer_actor/kqp_data_executer.cpp | 47 ++++++++++++++++++- .../kqp/executer_actor/kqp_executer_impl.h | 45 ++++++++---------- .../kqp/executer_actor/kqp_scan_executer.cpp | 5 +- 3 files changed, 68 insertions(+), 29 deletions(-) diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index fd16b7b441a6..56e96cb1521e 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -205,6 +205,8 @@ class TKqpDataExecuter : public TKqpExecuterBaseRecord.MutableResponse(); FillResponseStats(Ydb::StatusIds::SUCCESS); @@ -278,8 +282,6 @@ class TKqpDataExecuter : public TKqpExecuterBaseResultsSize()); - Send(Target, ResponseEv.release()); PassAway(); } @@ -319,6 +321,8 @@ class TKqpDataExecuter : public TKqpExecuterBaseGetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) { + DoShutdown(); + } else { + this->Become(&TThis::WaitShutdownState); + } + } else { + DoShutdown(); + } + } + + STATEFN(WaitShutdownState) { + switch(ev->GetTypeRewrite()) { + // TODO: properly handle node disconnect + // TODO: implement wait timeout mechanism + hFunc(TEvDqCompute::TEvState, HandleShutdown); + default: + ; // ignore all other events + } + } + + void HandleShutdown(TEvDqCompute::TEvState::TPtr& ev) { + if (ev->Get()->Record.GetState() == NDqProto::COMPUTE_STATE_FINISHED) { + YQL_ENSURE(Planner); + + TActorId actor = ev->Sender; + ui64 taskId = ev->Get()->Record.GetTaskId(); + + Planner->CompletedCA(taskId, actor); + + if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) { + DoShutdown(); + } + } else { + // TODO: handle another states. + } + } + + void DoShutdown() { auto totalTime = TInstant::Now() - StartTime; Counters->Counters->DataTxTotalTimeHistogram->Collect(totalTime.MilliSeconds()); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 37f8bec82831..98d65e16d182 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -667,7 +667,7 @@ class TKqpExecuterBase : public TActorBootstrapped { if (statusCode == Ydb::StatusIds::INTERNAL_ERROR) { InternalError(issues); } else if (statusCode == Ydb::StatusIds::TIMEOUT) { - AbortExecutionAndDie(ev->Sender, NYql::NDqProto::StatusIds::TIMEOUT, "Request timeout exceeded"); + TimeoutError(ev->Sender); } else { RuntimeError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), issues); } @@ -1624,14 +1624,14 @@ class TKqpExecuterBase : public TActorBootstrapped { protected: void TerminateComputeActors(Ydb::StatusIds::StatusCode code, const NYql::TIssues& issues) { for (const auto& task : this->TasksGraph.GetTasks()) { - if (task.ComputeActorId) { + if (task.ComputeActorId && !task.Meta.Completed) { LOG_I("aborting compute actor execution, message: " << issues.ToOneLineString() << ", compute actor: " << task.ComputeActorId << ", task: " << task.Id); auto ev = MakeHolder(NYql::NDq::YdbStatusToDqStatus(code), issues); this->Send(task.ComputeActorId, ev.Release()); } else { - LOG_I("task: " << task.Id << ", does not have Compute ActorId yet"); + LOG_I("task: " << task.Id << ", does not have the CA id yet or is already complete"); } } } @@ -1649,7 +1649,6 @@ class TKqpExecuterBase : public TActorBootstrapped { void InternalError(const NYql::TIssues& issues) { LOG_E(issues.ToOneLineString()); - TerminateComputeActors(Ydb::StatusIds::INTERNAL_ERROR, issues); auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::UNEXPECTED, "Internal error while executing transaction."); for (const NYql::TIssue& i : issues) { issue.AddSubIssue(MakeIntrusive(i)); @@ -1663,7 +1662,6 @@ class TKqpExecuterBase : public TActorBootstrapped { void ReplyUnavailable(const TString& message) { LOG_E("UNAVAILABLE: " << message); - TerminateComputeActors(Ydb::StatusIds::UNAVAILABLE, message); auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE); issue.AddSubIssue(new NYql::TIssue(message)); ReplyErrorAndDie(Ydb::StatusIds::UNAVAILABLE, issue); @@ -1671,7 +1669,6 @@ class TKqpExecuterBase : public TActorBootstrapped { void RuntimeError(Ydb::StatusIds::StatusCode code, const NYql::TIssues& issues) { LOG_E(Ydb::StatusIds_StatusCode_Name(code) << ": " << issues.ToOneLineString()); - TerminateComputeActors(code, issues); ReplyErrorAndDie(code, issues); } @@ -1687,11 +1684,18 @@ class TKqpExecuterBase : public TActorBootstrapped { ReplyErrorAndDie(status, &issues); } - void AbortExecutionAndDie(TActorId abortSender, NYql::NDqProto::StatusIds::StatusCode status, const TString& message) { + void TimeoutError(TActorId abortSender) { if (AlreadyReplied) { return; } + const auto status = NYql::NDqProto::StatusIds::TIMEOUT; + const TString message = "Request timeout exceeded"; + + TerminateComputeActors(Ydb::StatusIds::TIMEOUT, message); + + AlreadyReplied = true; + LOG_E("Abort execution: " << NYql::NDqProto::StatusIds_StatusCode_Name(status) << "," << message); if (ExecuterSpan) { ExecuterSpan.EndError(TStringBuilder() << NYql::NDqProto::StatusIds_StatusCode_Name(status)); @@ -1701,16 +1705,13 @@ class TKqpExecuterBase : public TActorBootstrapped { // TEvAbortExecution can come from either ComputeActor or SessionActor (== Target). if (abortSender != Target) { - auto abortEv = MakeHolder(status, "Request timeout exceeded"); + auto abortEv = MakeHolder(status, message); this->Send(Target, abortEv.Release()); } - AlreadyReplied = true; LOG_E("Sending timeout response to: " << Target); - this->Send(Target, ResponseEv.release()); Request.Transactions.crop(0); - TerminateComputeActors(Ydb::StatusIds::TIMEOUT, message); this->PassAway(); } @@ -1749,14 +1750,7 @@ class TKqpExecuterBase : public TActorBootstrapped { return; } - if (Planner) { - for (auto computeActor : Planner->GetPendingComputeActors()) { - LOG_D("terminate compute actor " << computeActor.first); - - auto ev = MakeHolder(NYql::NDq::YdbStatusToDqStatus(status), "Terminate execution"); - this->Send(computeActor.first, ev.Release()); - } - } + TerminateComputeActors(status, "Terminate execution"); AlreadyReplied = true; auto& response = *ResponseEv->Record.MutableResponse(); @@ -1782,7 +1776,6 @@ class TKqpExecuterBase : public TActorBootstrapped { ExecuterStateSpan.EndError(response.DebugString()); Request.Transactions.crop(0); - this->Send(Target, ResponseEv.release()); this->PassAway(); } @@ -1852,6 +1845,9 @@ class TKqpExecuterBase : public TActorBootstrapped { protected: void PassAway() override { + YQL_ENSURE(ResponseEv); + this->Send(Target, ResponseEv.release()); + for (auto channelPair: ResultChannelProxies) { LOG_D("terminate result channel " << channelPair.first << " proxy at " << channelPair.second->SelfId()); @@ -1872,12 +1868,11 @@ class TKqpExecuterBase : public TActorBootstrapped { if (KqpTableResolverId) { this->Send(KqpTableResolverId, new TEvents::TEvPoison); - this->Send(this->SelfId(), new TEvents::TEvPoison); - LOG_T("Terminate, become ZombieState"); - this->Become(&TKqpExecuterBase::ZombieState); - } else { - IActor::PassAway(); } + + this->Send(this->SelfId(), new TEvents::TEvPoison); + LOG_T("Terminate, become ZombieState"); + this->Become(&TKqpExecuterBase::ZombieState); } STATEFN(ZombieState) { diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index 277e77da71e0..c04d5e7573f4 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -273,6 +273,9 @@ class TKqpScanExecuter : public TKqpExecuterBaseOrbit, TxId, LastTaskId, LastComputeActorId, ResponseEv->ResultsSize()); @@ -281,8 +284,6 @@ class TKqpScanExecuter : public TKqpExecuterBase Date: Thu, 15 Aug 2024 13:57:53 +0300 Subject: [PATCH 02/12] Handle timeout and node disconnect --- .../kqp/executer_actor/kqp_data_executer.cpp | 33 +++++++++++++++++-- ydb/core/kqp/executer_actor/kqp_planner.cpp | 11 +++++++ ydb/core/kqp/executer_actor/kqp_planner.h | 1 + 3 files changed, 43 insertions(+), 2 deletions(-) diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 56e96cb1521e..19b641b46710 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -2605,6 +2605,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseBecome(&TThis::WaitShutdownState); + TActivationContext::Schedule(TDuration::Seconds(10), new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPoison)); } } else { DoShutdown(); @@ -2613,9 +2614,9 @@ class TKqpDataExecuter : public TKqpExecuterBaseGetTypeRewrite()) { - // TODO: properly handle node disconnect - // TODO: implement wait timeout mechanism hFunc(TEvDqCompute::TEvState, HandleShutdown); + hFunc(TEvInterconnect::TEvNodeDisconnected, HandleShutdown); + hFunc(TEvents::TEvPoison, HandleShutdown); default: ; // ignore all other events } @@ -2638,6 +2639,34 @@ class TKqpDataExecuter : public TKqpExecuterBaseGet()->NodeId; + LOG_N("Node has disconnected while shutdown: " << nodeId); + + YQL_ENSURE(Planner); + + for (const auto& task : TasksGraph.GetTasks()) { + if (task.Meta.NodeId == nodeId && !task.Meta.Completed) { + if (task.ComputeActorId) { + Planner->CompletedCA(task.Id, task.ComputeActorId); + } else { + Planner->TaskNotStarted(task.Id); + } + } + } + + if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) { + DoShutdown(); + } + } + + void HandleShutdown(TEvents::TEvPoison::TPtr& ev) { + // Self-poison means timeout - don't wait anymore. + if (ev->Sender == SelfId()) { + DoShutdown(); + } + } + void DoShutdown() { auto totalTime = TInstant::Now() - StartTime; Counters->Counters->DataTxTotalTimeHistogram->Collect(totalTime.MilliSeconds()); diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index 3a13c9fb7f6e..81ed87cf153e 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -557,6 +557,17 @@ void TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) { return; } +void TKqpPlanner::TaskNotStarted(ui64 taskId) { + // NOTE: should be invoked only while shutting down - when node is disconnected. + + auto& task = TasksGraph.GetTask(taskId); + + YQL_ENSURE(!task.ComputeActorId); + YQL_ENSURE(!task.Meta.Completed); + + PendingComputeTasks.erase(taskId); +} + TProgressStat::TEntry TKqpPlanner::CalculateConsumptionUpdate() { TProgressStat::TEntry consumption; diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h index 639c53737055..eed887d6e9bc 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.h +++ b/ydb/core/kqp/executer_actor/kqp_planner.h @@ -74,6 +74,7 @@ class TKqpPlanner { std::unique_ptr AssignTasksToNodes(); bool AcknowledgeCA(ui64 taskId, TActorId computeActor, const NYql::NDqProto::TEvComputeActorState* state); void CompletedCA(ui64 taskId, TActorId computeActor); + void TaskNotStarted(ui64 taskId); TProgressStat::TEntry CalculateConsumptionUpdate(); void ShiftConsumption(); void Submit(); From 64e1e8c8a05b9d0f2fc8612be5a1164ae25dc851 Mon Sep 17 00:00:00 2001 From: Ivan Lezhankin Date: Thu, 15 Aug 2024 22:14:34 +0300 Subject: [PATCH 03/12] Send event from CA to Executer on terminate --- ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 2 ++ ydb/core/kqp/executer_actor/kqp_planner.cpp | 3 ++- .../yql/dq/actors/compute/dq_compute_actor_impl.h | 12 ++++++++++++ 3 files changed, 16 insertions(+), 1 deletion(-) diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 19b641b46710..087c5d0cc4be 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -2605,6 +2605,8 @@ class TKqpDataExecuter : public TKqpExecuterBaseBecome(&TThis::WaitShutdownState); + LOG_I("Waiting for shutdown of " << Planner->GetPendingComputeTasks().size() << " tasks and " + << Planner->GetPendingComputeActors().size() << " compute actors"); TActivationContext::Schedule(TDuration::Seconds(10), new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPoison)); } } else { diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index 81ed87cf153e..926dda2700ef 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -554,7 +554,8 @@ void TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) { YQL_ENSURE(it != PendingComputeActors.end()); LastStats.emplace_back(std::move(it->second)); PendingComputeActors.erase(it); - return; + + LOG_I("Compute actor has finished execution: " << computeActor.ToString()); } void TKqpPlanner::TaskNotStarted(ui64 taskId) { diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 9536882a2a7a..a51d31c38274 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -515,6 +515,18 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped RuntimeSettings.TerminateHandler(success, issues); } + // Send final state to executer to inform about termination. + { + auto ev = MakeHolder(); + auto& record = ev->Record; + + record.SetState(NDqProto::COMPUTE_STATE_FINISHED); + record.SetStatusCode(NDqProto::StatusIds::ABORTED); + record.SetTaskId(Task.GetId()); + + this->Send(ExecuterId, ev.Release()); + } + this->PassAway(); Terminated = true; } From fd130021d56512ae358c176a382d165618298ca8 Mon Sep 17 00:00:00 2001 From: Ivan Lezhankin Date: Fri, 16 Aug 2024 15:17:25 +0300 Subject: [PATCH 04/12] Better handle Finalize() --- ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 5 ++--- ydb/core/kqp/executer_actor/kqp_executer_impl.h | 2 ++ 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 087c5d0cc4be..b9a7e97c2b74 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -214,8 +214,6 @@ class TKqpDataExecuter : public TKqpExecuterBaseRecord.MutableResponse(); FillResponseStats(Ydb::StatusIds::SUCCESS); @@ -282,6 +280,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseGetTypeName()); // ignore all other events } } diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 98d65e16d182..f82bf5a62333 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -1686,6 +1686,7 @@ class TKqpExecuterBase : public TActorBootstrapped { void TimeoutError(TActorId abortSender) { if (AlreadyReplied) { + LOG_E("Timeout when we already replied - not good" << Endl << TBackTrace().PrintToString() << Endl); return; } @@ -1747,6 +1748,7 @@ class TKqpExecuterBase : public TActorBootstrapped { google::protobuf::RepeatedPtrField* issues) { if (AlreadyReplied) { + LOG_E("Error when we already replied - not good" << Endl << TBackTrace().PrintToString() << Endl); return; } From 89671e1acd0b2c6adcce53204d9ffe2af143a2fc Mon Sep 17 00:00:00 2001 From: Ivan Lezhankin Date: Fri, 16 Aug 2024 20:25:02 +0300 Subject: [PATCH 05/12] Send termination event only when !success --- ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 6 +++++- ydb/core/kqp/executer_actor/kqp_executer_impl.h | 2 +- ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h | 2 +- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index b9a7e97c2b74..6e692a178b6a 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -2627,6 +2627,10 @@ class TKqpDataExecuter : public TKqpExecuterBaseGet()->Record.GetState() == NDqProto::COMPUTE_STATE_FINISHED) { YQL_ENSURE(Planner); + if (ev->Get()->Record.GetStatusCode() != NDqProto::StatusIds::ABORTED) { + return; + } + TActorId actor = ev->Sender; ui64 taskId = ev->Get()->Record.GetTaskId(); @@ -2636,7 +2640,7 @@ class TKqpDataExecuter : public TKqpExecuterBase { protected: void PassAway() override { - YQL_ENSURE(ResponseEv); + YQL_ENSURE(AlreadyReplied && ResponseEv); this->Send(Target, ResponseEv.release()); for (auto channelPair: ResultChannelProxies) { diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index a51d31c38274..1e850495a161 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -516,7 +516,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped } // Send final state to executer to inform about termination. - { + if (!success) { auto ev = MakeHolder(); auto& record = ev->Record; From 335e8056b61217b957cfcbcf89d5d25585116eb5 Mon Sep 17 00:00:00 2001 From: Ivan Lezhankin Date: Mon, 19 Aug 2024 12:30:05 +0300 Subject: [PATCH 06/12] =?UTF-8?q?PassAway=20=E2=86=92=20Shutdown,=20for=20?= =?UTF-8?q?clarity?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../kqp/executer_actor/kqp_data_executer.cpp | 48 ++++++++++--------- .../kqp/executer_actor/kqp_executer_impl.h | 10 +++- 2 files changed, 33 insertions(+), 25 deletions(-) diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 6e692a178b6a..bad588f2d383 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -2598,10 +2598,10 @@ class TKqpDataExecuter : public TKqpExecuterBaseGetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) { - DoShutdown(); + PassAway(); } else { this->Become(&TThis::WaitShutdownState); LOG_I("Waiting for shutdown of " << Planner->GetPendingComputeTasks().size() << " tasks and " @@ -2609,8 +2609,25 @@ class TKqpDataExecuter : public TKqpExecuterBaseCounters->DataTxTotalTimeHistogram->Collect(totalTime.MilliSeconds()); + + // TxProxyMon compatibility + Counters->TxProxyMon->TxTotalTimeHgram->Collect(totalTime.MilliSeconds()); + Counters->TxProxyMon->TxExecuteTimeHgram->Collect(totalTime.MilliSeconds()); + + Send(MakePipePerNodeCacheID(false), new TEvPipeCache::TEvUnlink(0)); + + if (GetUseFollowers()) { + Send(MakePipePerNodeCacheID(true), new TEvPipeCache::TEvUnlink(0)); } + + TBase::PassAway(); } STATEFN(WaitShutdownState) { @@ -2637,7 +2654,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseCompletedCA(taskId, actor); if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) { - DoShutdown(); + PassAway(); } } else { // TODO: handle other states. @@ -2661,32 +2678,17 @@ class TKqpDataExecuter : public TKqpExecuterBaseGetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) { - DoShutdown(); + PassAway(); } } void HandleShutdown(TEvents::TEvPoison::TPtr& ev) { // Self-poison means timeout - don't wait anymore. - if (ev->Sender == SelfId()) { - DoShutdown(); - } - } + LOG_I("Timed out on waiting for Compute Actors to finish - forcing shutdown"); - void DoShutdown() { - auto totalTime = TInstant::Now() - StartTime; - Counters->Counters->DataTxTotalTimeHistogram->Collect(totalTime.MilliSeconds()); - - // TxProxyMon compatibility - Counters->TxProxyMon->TxTotalTimeHgram->Collect(totalTime.MilliSeconds()); - Counters->TxProxyMon->TxExecuteTimeHgram->Collect(totalTime.MilliSeconds()); - - Send(MakePipePerNodeCacheID(false), new TEvPipeCache::TEvUnlink(0)); - - if (GetUseFollowers()) { - Send(MakePipePerNodeCacheID(true), new TEvPipeCache::TEvUnlink(0)); + if (ev->Sender == SelfId()) { + PassAway(); } - - TBase::PassAway(); } private: diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 44607057fe75..2915775ea3fc 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -1713,7 +1713,7 @@ class TKqpExecuterBase : public TActorBootstrapped { LOG_E("Sending timeout response to: " << Target); Request.Transactions.crop(0); - this->PassAway(); + this->Shutdown(); } void FillResponseStats(Ydb::StatusIds::StatusCode status) { @@ -1778,7 +1778,7 @@ class TKqpExecuterBase : public TActorBootstrapped { ExecuterStateSpan.EndError(response.DebugString()); Request.Transactions.crop(0); - this->PassAway(); + this->Shutdown(); } protected: @@ -1846,6 +1846,12 @@ class TKqpExecuterBase : public TActorBootstrapped { } protected: + // Introduced separate method from `PassAway()` - to not get confused with expectations from other actors, + // that `PassAway()` should kill actor immediately. + virtual void Shutdown() { + PassAway(); + } + void PassAway() override { YQL_ENSURE(AlreadyReplied && ResponseEv); this->Send(Target, ResponseEv.release()); From 8555d1a9cfa8d7dd5dfca40c19dc79680b3c39bc Mon Sep 17 00:00:00 2001 From: Ivan Lezhankin Date: Thu, 22 Aug 2024 12:26:30 +0300 Subject: [PATCH 07/12] Add tests (no test for node disconnect) --- ydb/core/kqp/ut/query/kqp_limits_ut.cpp | 113 ++++++++++++++++++++++++ 1 file changed, 113 insertions(+) diff --git a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp index 0eebf6bec7dc..a86c3a25e597 100644 --- a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp @@ -1,7 +1,10 @@ #include +#include #include +#include #include +#include #include #include @@ -830,6 +833,116 @@ Y_UNIT_TEST_SUITE(KqpLimits) { UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::TIMEOUT); } + /* Scenario: + - prepare and run query + - observe first EvState event from CA to Executer and replace it with EvAbortExecution + - count all EvState events from all CAs + - wait for final event EvTxResponse from Executer + - expect it to happen strictly after all EvState events + */ + Y_UNIT_TEST(WaitCAsStateOnAbort) { + TKikimrRunner kikimr(TKikimrSettings().SetUseRealThreads(false)); + auto db = kikimr.RunCall([&] { return kikimr.GetTableClient(); } ); + auto session = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); } ); + + auto prepareResult = kikimr.RunCall([&] { return session.PrepareDataQuery(Q_(R"( + SELECT COUNT(*) FROM `/Root/TwoShard`; + )")).GetValueSync(); + }); + UNIT_ASSERT_VALUES_EQUAL_C(prepareResult.GetStatus(), EStatus::SUCCESS, prepareResult.GetIssues().ToString()); + auto dataQuery = prepareResult.GetQuery(); + + bool firstEvState = false; + ui32 totalEvState = 0; + TActorId executerId; + ui32 actorCount = 3; // TODO: get number of actors properly. + + auto& runtime = *kikimr.GetTestServer().GetRuntime(); + runtime.SetObserverFunc([&](TAutoPtr& ev) { + if (ev->GetTypeRewrite() == NYql::NDq::TEvDqCompute::TEvState::EventType) { + if (!firstEvState) { + executerId = ev->Recipient; + ev = new IEventHandle(ev->Recipient, ev->Sender, + new NKikimr::NKqp::TEvKqp::TEvAbortExecution(NYql::NDqProto::StatusIds::UNSPECIFIED, NYql::TIssues())); + firstEvState = true; + } + ++totalEvState; + } + return TTestActorRuntime::EEventAction::PROCESS; + }); + + auto settings = TExecDataQuerySettings().OperationTimeout(TDuration::MilliSeconds(500)); + kikimr.RunInThreadPool([&] { return dataQuery.Execute(TTxControl::BeginTx().CommitTx(), settings).GetValueSync(); }); + + TDispatchOptions opts; + opts.FinalEvents.emplace_back([&](IEventHandle& ev) { + return ev.GetTypeRewrite() == NKikimr::NKqp::TEvKqpExecuter::TEvTxResponse::EventType + && ev.Sender == executerId && totalEvState == actorCount*2; + }); + + UNIT_ASSERT(runtime.DispatchEvents(opts)); + } + + /* Scenario: + - prepare and run query + - observe first EvState event from CA to Executer and replace it with EvAbortExecution + - count all EvState events from all CAs + - drop final EvState event from last CA + - wait for final event EvTxResponse from Executer after timeout poison + - expect it to happen strictly after all EvState events + */ + Y_UNIT_TEST(WaitCAsTimeout) { + TKikimrRunner kikimr(TKikimrSettings().SetUseRealThreads(false)); + auto db = kikimr.RunCall([&] { return kikimr.GetTableClient(); } ); + auto session = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); } ); + + auto prepareResult = kikimr.RunCall([&] { return session.PrepareDataQuery(Q_(R"( + SELECT COUNT(*) FROM `/Root/TwoShard`; + )")).GetValueSync(); + }); + UNIT_ASSERT_VALUES_EQUAL_C(prepareResult.GetStatus(), EStatus::SUCCESS, prepareResult.GetIssues().ToString()); + auto dataQuery = prepareResult.GetQuery(); + + bool firstEvState = false; + bool timeoutPoison = false; + ui32 totalEvState = 0; + TActorId executerId; + ui32 actorCount = 3; // TODO: get number of actors properly. + + auto& runtime = *kikimr.GetTestServer().GetRuntime(); + runtime.SetObserverFunc([&](TAutoPtr& ev) { + if (ev->GetTypeRewrite() == NYql::NDq::TEvDqCompute::TEvState::EventType) { + if (!firstEvState) { + executerId = ev->Recipient; + ev = new IEventHandle(ev->Recipient, ev->Sender, + new NKikimr::NKqp::TEvKqp::TEvAbortExecution(NYql::NDqProto::StatusIds::UNSPECIFIED, NYql::TIssues())); + firstEvState = true; + } + ++totalEvState; + + if (totalEvState == actorCount*2) { + return TTestActorRuntime::EEventAction::DROP; + } + } + + timeoutPoison = ev->GetTypeRewrite() == TEvents::TEvPoison::EventType && totalEvState == actorCount*2 + && ev->Sender == executerId && ev->Recipient == executerId; + + return TTestActorRuntime::EEventAction::PROCESS; + }); + + auto settings = TExecDataQuerySettings().OperationTimeout(TDuration::MilliSeconds(500)); + kikimr.RunInThreadPool([&] { return dataQuery.Execute(TTxControl::BeginTx().CommitTx(), settings).GetValueSync(); }); + + TDispatchOptions opts; + opts.FinalEvents.emplace_back([&](IEventHandle& ev) { + return ev.GetTypeRewrite() == NKikimr::NKqp::TEvKqpExecuter::TEvTxResponse::EventType + && ev.Sender == executerId && totalEvState == actorCount*2 && timeoutPoison; + }); + + UNIT_ASSERT(runtime.DispatchEvents(opts)); + } + Y_UNIT_TEST(ReplySizeExceeded) { auto kikimr = DefaultKikimrRunner(); auto db = kikimr.GetTableClient(); From 7664e67be5ae7c9087e321b454f8d5b5bc3ae7fd Mon Sep 17 00:00:00 2001 From: Ivan Lezhankin Date: Thu, 22 Aug 2024 13:03:43 +0300 Subject: [PATCH 08/12] Fix test and send stats --- .gitignore | 3 +++ ydb/core/kqp/ut/query/kqp_limits_ut.cpp | 7 ++++--- ydb/core/kqp/ut/query/ya.make | 2 +- ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h | 4 ++++ 4 files changed, 12 insertions(+), 4 deletions(-) diff --git a/.gitignore b/.gitignore index f97992275a5d..bde4ce084232 100644 --- a/.gitignore +++ b/.gitignore @@ -26,6 +26,9 @@ __pycache__/ *.pb.h *.pb.cc +# Other generated +*.fbs.h + # MacOS specific .DS_Store diff --git a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp index a86c3a25e597..180e386507fb 100644 --- a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp @@ -923,11 +923,12 @@ Y_UNIT_TEST_SUITE(KqpLimits) { if (totalEvState == actorCount*2) { return TTestActorRuntime::EEventAction::DROP; } + } else if (ev->GetTypeRewrite() == TEvents::TEvPoison::EventType && totalEvState == actorCount*2 && + ev->Sender == executerId && ev->Recipient == executerId) + { + timeoutPoison = true; } - timeoutPoison = ev->GetTypeRewrite() == TEvents::TEvPoison::EventType && totalEvState == actorCount*2 - && ev->Sender == executerId && ev->Recipient == executerId; - return TTestActorRuntime::EEventAction::PROCESS; }); diff --git a/ydb/core/kqp/ut/query/ya.make b/ydb/core/kqp/ut/query/ya.make index c80a662940cc..0a515f4b86e9 100644 --- a/ydb/core/kqp/ut/query/ya.make +++ b/ydb/core/kqp/ut/query/ya.make @@ -12,7 +12,7 @@ IF (WITH_VALGRIND) SIZE(LARGE) TAG(ya:fat) ELSE() - TIMEOUT(600) + TIMEOUT(60) SIZE(MEDIUM) ENDIF() diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 1e850495a161..d626c79beba5 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -520,6 +520,10 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped auto ev = MakeHolder(); auto& record = ev->Record; + if (RuntimeSettings.StatsMode >= NDqProto::DQ_STATS_MODE_BASIC) { + FillStats(record.MutableStats(), /* last */ true); + } + record.SetState(NDqProto::COMPUTE_STATE_FINISHED); record.SetStatusCode(NDqProto::StatusIds::ABORTED); record.SetTaskId(Task.GetId()); From 4965bc0cf8a9482474e15bd8aa1799bc773f1094 Mon Sep 17 00:00:00 2001 From: Ivan Lezhankin Date: Thu, 22 Aug 2024 13:04:46 +0300 Subject: [PATCH 09/12] Fix ya.make --- ydb/core/kqp/ut/query/ya.make | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/kqp/ut/query/ya.make b/ydb/core/kqp/ut/query/ya.make index 0a515f4b86e9..c80a662940cc 100644 --- a/ydb/core/kqp/ut/query/ya.make +++ b/ydb/core/kqp/ut/query/ya.make @@ -12,7 +12,7 @@ IF (WITH_VALGRIND) SIZE(LARGE) TAG(ya:fat) ELSE() - TIMEOUT(60) + TIMEOUT(600) SIZE(MEDIUM) ENDIF() From 7b73908717315dc49842f0cf0c53b3a8cf32e0ef Mon Sep 17 00:00:00 2001 From: Ivan Lezhankin Date: Thu, 22 Aug 2024 15:30:15 +0300 Subject: [PATCH 10/12] Improve compute actor code --- .../dq/actors/compute/dq_compute_actor_impl.h | 46 ++++++------------- 1 file changed, 14 insertions(+), 32 deletions(-) diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index d626c79beba5..1718bb67e1a9 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -515,22 +515,6 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped RuntimeSettings.TerminateHandler(success, issues); } - // Send final state to executer to inform about termination. - if (!success) { - auto ev = MakeHolder(); - auto& record = ev->Record; - - if (RuntimeSettings.StatsMode >= NDqProto::DQ_STATS_MODE_BASIC) { - FillStats(record.MutableStats(), /* last */ true); - } - - record.SetState(NDqProto::COMPUTE_STATE_FINISHED); - record.SetStatusCode(NDqProto::StatusIds::ABORTED); - record.SetTaskId(Task.GetId()); - - this->Send(ExecuterId, ev.Release()); - } - this->PassAway(); Terminated = true; } @@ -570,7 +554,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped } } - void ReportStateAndMaybeDie(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues) + void ReportStateAndMaybeDie(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues, bool forceTerminate = false) { auto execEv = MakeHolder(); auto& record = execEv->Record; @@ -591,7 +575,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped this->Send(ExecuterId, execEv.Release()); - if (Checkpoints && State == NDqProto::COMPUTE_STATE_FINISHED) { + if (!forceTerminate && Checkpoints && State == NDqProto::COMPUTE_STATE_FINISHED) { // checkpointed CAs must not self-destroy return; } @@ -1048,10 +1032,6 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped auto tag = (EEvWakeupTag) ev->Get()->Tag; switch (tag) { case EEvWakeupTag::TimeoutTag: { - auto abortEv = MakeHolder(NYql::NDqProto::StatusIds::TIMEOUT, TStringBuilder() - << "Timeout event from compute actor " << this->SelfId() - << ", TxId: " << TxId << ", task: " << Task.GetId()); - if (ComputeActorSpan) { ComputeActorSpan.EndError( TStringBuilder() @@ -1060,10 +1040,8 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped ); } - this->Send(ExecuterId, abortEv.Release()); - - TerminateSources("timeout exceeded", false); - Terminate(false, "timeout exceeded"); + State = NDqProto::COMPUTE_STATE_FAILURE; + ReportStateAndMaybeDie(NYql::NDqProto::StatusIds::TIMEOUT, {TIssue("timeout exceeded")}, true); break; } case EEvWakeupTag::PeriodicStatsTag: { @@ -1087,8 +1065,9 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped switch (lostEventType) { case TEvDqCompute::TEvState::EventType: { CA_LOG_E("Handle undelivered TEvState event, abort execution"); - this->TerminateSources("executer lost", false); - Terminate(false, "executer lost"); + + TerminateSources("executer lost", false); + Terminate(false, "executer lost"); // Executer lost - no need to report state break; } default: { @@ -1134,14 +1113,17 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped InternalError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, *ev->Get()->GetIssues().begin()); return; } + TIssues issues = ev->Get()->GetIssues(); CA_LOG_E("Handle abort execution event from: " << ev->Sender << ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(ev->Get()->Record.GetStatusCode()) << ", reason: " << issues.ToOneLineString()); - bool success = ev->Get()->Record.GetStatusCode() == NYql::NDqProto::StatusIds::SUCCESS; - - this->TerminateSources(issues, success); + if (ev->Get()->Record.GetStatusCode() == NYql::NDqProto::StatusIds::SUCCESS) { + State = NDqProto::COMPUTE_STATE_FINISHED; + } else { + State = NDqProto::COMPUTE_STATE_FAILURE; + } if (ev->Sender != ExecuterId) { if (ComputeActorSpan) { @@ -1151,7 +1133,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped NActors::TActivationContext::Send(ev->Forward(ExecuterId)); } - Terminate(success, issues); + ReportStateAndMaybeDie(ev->Get()->Record.GetStatusCode(), issues, true); } void HandleExecuteBase(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr& ev) { From 4bbb822623020b289a9b092598f11e3388fc03cb Mon Sep 17 00:00:00 2001 From: Ivan Lezhankin Date: Thu, 22 Aug 2024 15:40:48 +0300 Subject: [PATCH 11/12] Handle all sorts of EvState events on shutdown --- .../kqp/executer_actor/kqp_data_executer.cpp | 20 ++++++------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index bad588f2d383..316cc8d6d53c 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -2641,23 +2641,15 @@ class TKqpDataExecuter : public TKqpExecuterBaseGet()->Record.GetState() == NDqProto::COMPUTE_STATE_FINISHED) { - YQL_ENSURE(Planner); - - if (ev->Get()->Record.GetStatusCode() != NDqProto::StatusIds::ABORTED) { - return; - } + YQL_ENSURE(Planner); - TActorId actor = ev->Sender; - ui64 taskId = ev->Get()->Record.GetTaskId(); + TActorId actor = ev->Sender; + ui64 taskId = ev->Get()->Record.GetTaskId(); - Planner->CompletedCA(taskId, actor); + Planner->CompletedCA(taskId, actor); - if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) { - PassAway(); - } - } else { - // TODO: handle other states. + if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) { + PassAway(); } } From 15c274a1fc7b81be440751584e09efa12b8b9d4d Mon Sep 17 00:00:00 2001 From: Ivan Lezhankin Date: Thu, 22 Aug 2024 19:10:20 +0300 Subject: [PATCH 12/12] One more fix --- ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 15 +++++++++------ ydb/core/kqp/ut/query/kqp_limits_ut.cpp | 13 ++++++++----- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 316cc8d6d53c..1d18d034ac9d 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -2601,6 +2601,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseGetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) { + LOG_I("Shutdown immediately - nothing to wait"); PassAway(); } else { this->Become(&TThis::WaitShutdownState); @@ -2641,15 +2642,17 @@ class TKqpDataExecuter : public TKqpExecuterBaseGet()->Record.GetState() == NDqProto::COMPUTE_STATE_FAILURE) { + YQL_ENSURE(Planner); - TActorId actor = ev->Sender; - ui64 taskId = ev->Get()->Record.GetTaskId(); + TActorId actor = ev->Sender; + ui64 taskId = ev->Get()->Record.GetTaskId(); - Planner->CompletedCA(taskId, actor); + Planner->CompletedCA(taskId, actor); - if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) { - PassAway(); + if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) { + PassAway(); + } } } diff --git a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp index 180e386507fb..1d42b49de241 100644 --- a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp @@ -860,14 +860,17 @@ Y_UNIT_TEST_SUITE(KqpLimits) { auto& runtime = *kikimr.GetTestServer().GetRuntime(); runtime.SetObserverFunc([&](TAutoPtr& ev) { if (ev->GetTypeRewrite() == NYql::NDq::TEvDqCompute::TEvState::EventType) { + ++totalEvState; if (!firstEvState) { executerId = ev->Recipient; ev = new IEventHandle(ev->Recipient, ev->Sender, new NKikimr::NKqp::TEvKqp::TEvAbortExecution(NYql::NDqProto::StatusIds::UNSPECIFIED, NYql::TIssues())); firstEvState = true; } - ++totalEvState; + } else if (ev->GetTypeRewrite() == NKikimr::NKqp::TEvKqpExecuter::TEvTxResponse::EventType && ev->Sender == executerId) { + UNIT_ASSERT_C(totalEvState == actorCount*2, "Executer sent response before waiting for CAs"); } + return TTestActorRuntime::EEventAction::PROCESS; }); @@ -912,21 +915,21 @@ Y_UNIT_TEST_SUITE(KqpLimits) { auto& runtime = *kikimr.GetTestServer().GetRuntime(); runtime.SetObserverFunc([&](TAutoPtr& ev) { if (ev->GetTypeRewrite() == NYql::NDq::TEvDqCompute::TEvState::EventType) { + ++totalEvState; if (!firstEvState) { executerId = ev->Recipient; ev = new IEventHandle(ev->Recipient, ev->Sender, new NKikimr::NKqp::TEvKqp::TEvAbortExecution(NYql::NDqProto::StatusIds::UNSPECIFIED, NYql::TIssues())); firstEvState = true; - } - ++totalEvState; - - if (totalEvState == actorCount*2) { + } else { return TTestActorRuntime::EEventAction::DROP; } } else if (ev->GetTypeRewrite() == TEvents::TEvPoison::EventType && totalEvState == actorCount*2 && ev->Sender == executerId && ev->Recipient == executerId) { timeoutPoison = true; + } else if (ev->GetTypeRewrite() == NKikimr::NKqp::TEvKqpExecuter::TEvTxResponse::EventType && ev->Sender == executerId) { + UNIT_ASSERT_C(timeoutPoison, "Executer sent response before waiting for CAs"); } return TTestActorRuntime::EEventAction::PROCESS;