Skip to content

Apply the last stats received from terminated CAs #8356

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits on Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 6 additions & 12 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
if (ev->Get()->Record.GetState() == NDqProto::COMPUTE_STATE_FAILURE) {
CancelProposal(0);
}
HandleComputeStats(ev);
HandleComputeState(ev);
}

void HandlePrepare(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
Expand Down Expand Up @@ -1008,7 +1008,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
hFunc(TEvInterconnect::TEvNodeDisconnected, HandleDisconnected);
hFunc(TEvKqpNode::TEvStartKqpTasksResponse, HandleStartKqpTasksResponse);
hFunc(TEvTxProxy::TEvProposeTransactionStatus, HandleExecute);
hFunc(TEvDqCompute::TEvState, HandleComputeStats);
hFunc(TEvDqCompute::TEvState, HandleComputeState);
hFunc(NYql::NDq::TEvDqCompute::TEvChannelData, HandleChannelData);
hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck);
hFunc(TEvKqp::TEvAbortExecution, HandleExecute);
Expand Down Expand Up @@ -2607,6 +2607,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
this->Become(&TThis::WaitShutdownState);
LOG_I("Waiting for shutdown of " << Planner->GetPendingComputeTasks().size() << " tasks and "
<< Planner->GetPendingComputeActors().size() << " compute actors");
// TODO(ilezhankin): the timeout for awaiting CA shutdown should be configurable.
TActivationContext::Schedule(TDuration::Seconds(10), new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPoison));
}
} else {
Expand Down Expand Up @@ -2642,17 +2643,10 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
}

void HandleShutdown(TEvDqCompute::TEvState::TPtr& ev) {
if (ev->Get()->Record.GetState() == NDqProto::COMPUTE_STATE_FAILURE) {
YQL_ENSURE(Planner);

TActorId actor = ev->Sender;
ui64 taskId = ev->Get()->Record.GetTaskId();

Planner->CompletedCA(taskId, actor);
HandleComputeStats(ev);

if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
PassAway();
}
if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
PassAway();
}
}

Expand Down
57 changes: 39 additions & 18 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
this->Send(channelComputeActorId, ackEv.Release(), /* TODO: undelivery */ 0, /* cookie */ channelId);
}

void HandleComputeStats(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) {
bool HandleComputeStats(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) {
TActorId computeActor = ev->Sender;
auto& state = ev->Get()->Record;
ui64 taskId = state.GetTaskId();
Expand Down Expand Up @@ -409,7 +409,39 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
}

YQL_ENSURE(Planner);
bool populateChannels = Planner->AcknowledgeCA(taskId, computeActor, &state);
bool ack = Planner->AcknowledgeCA(taskId, computeActor, &state);

switch (state.GetState()) {
case NYql::NDqProto::COMPUTE_STATE_FAILURE:
case NYql::NDqProto::COMPUTE_STATE_FINISHED:
// Don't finalize stats twice.
if (Planner->CompletedCA(taskId, computeActor)) {
ExtraData[computeActor].Swap(state.MutableExtraData());

if (Stats) {
Stats->AddComputeActorStats(
computeActor.NodeId(),
std::move(*state.MutableStats()),
TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs())
);
}

LastTaskId = taskId;
LastComputeActorId = computeActor.ToString();
}
default:
; // ignore all other states.
}

return ack;
}

void HandleComputeState(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) {
TActorId computeActor = ev->Sender;
auto& state = ev->Get()->Record;
ui64 taskId = state.GetTaskId();

bool populateChannels = HandleComputeStats(ev);

switch (state.GetState()) {
case NYql::NDqProto::COMPUTE_STATE_UNKNOWN: {
Expand All @@ -427,22 +459,8 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
break;
}

case NYql::NDqProto::COMPUTE_STATE_FAILURE:
case NYql::NDqProto::COMPUTE_STATE_FINISHED: {
ExtraData[computeActor].Swap(state.MutableExtraData());
if (Stats) {
Stats->AddComputeActorStats(
computeActor.NodeId(),
std::move(*state.MutableStats()),
TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs())
);
}

LastTaskId = taskId;
LastComputeActorId = computeActor.ToString();
YQL_ENSURE(Planner);
Planner->CompletedCA(taskId, computeActor);
}
default:
; // ignore all other states.
}

if (state.GetState() == NYql::NDqProto::COMPUTE_STATE_FAILURE) {
Expand Down Expand Up @@ -1854,6 +1872,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {

void PassAway() override {
YQL_ENSURE(AlreadyReplied && ResponseEv);

// Refresh the response stats with the last stats received from terminated CAs, but keep the status.
FillResponseStats(ResponseEv->Record.GetResponse().GetStatus());
this->Send(Target, ResponseEv.release());

for (auto channelPair: ResultChannelProxies) {
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/kqp/executer_actor/kqp_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -542,11 +542,11 @@ bool TKqpPlanner::AcknowledgeCA(ui64 taskId, TActorId computeActor, const NYql::
return false;
}

void TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) {
bool TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) {
auto& task = TasksGraph.GetTask(taskId);
if (task.Meta.Completed) {
YQL_ENSURE(!PendingComputeActors.contains(computeActor));
return;
return false;
}

task.Meta.Completed = true;
Expand All @@ -556,6 +556,8 @@ void TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) {
PendingComputeActors.erase(it);

LOG_I("Compute actor has finished execution: " << computeActor.ToString());

return true;
}

void TKqpPlanner::TaskNotStarted(ui64 taskId) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/kqp_planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class TKqpPlanner {
std::unique_ptr<IEventHandle> PlanExecution();
std::unique_ptr<IEventHandle> AssignTasksToNodes();
bool AcknowledgeCA(ui64 taskId, TActorId computeActor, const NYql::NDqProto::TEvComputeActorState* state);
void CompletedCA(ui64 taskId, TActorId computeActor);
bool CompletedCA(ui64 taskId, TActorId computeActor);
void TaskNotStarted(ui64 taskId);
TProgressStat::TEntry CalculateConsumptionUpdate();
void ShiftConsumption();
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Sc
STATEFN(ExecuteState) {
try {
switch (ev->GetTypeRewrite()) {
hFunc(TEvDqCompute::TEvState, HandleComputeStats);
hFunc(TEvDqCompute::TEvState, HandleComputeState);
hFunc(TEvDqCompute::TEvChannelData, HandleChannelData); // from CA
hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck);
hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution);
Expand Down
Loading