Skip to content

Commit 88a9d6c

Browse files
committed
Revert "Apply the last stats received from terminated CAs (ydb-platform#8356)"
This reverts commit 93089ee.
1 parent 3519a81 commit 88a9d6c

File tree

5 files changed

+38
-56
lines changed

5 files changed

+38
-56
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 14 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -547,7 +547,7 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
547547
if (ev->Get()->Record.GetState() == NDqProto::COMPUTE_STATE_FAILURE) {
548548
CancelProposal(0);
549549
}
550-
HandleComputeState(ev);
550+
HandleComputeStats(ev);
551551
}
552552

553553
void HandlePrepare(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
@@ -1016,7 +1016,7 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
10161016
hFunc(TEvInterconnect::TEvNodeDisconnected, HandleDisconnected);
10171017
hFunc(TEvKqpNode::TEvStartKqpTasksResponse, HandleStartKqpTasksResponse);
10181018
hFunc(TEvTxProxy::TEvProposeTransactionStatus, HandleExecute);
1019-
hFunc(TEvDqCompute::TEvState, HandleComputeState);
1019+
hFunc(TEvDqCompute::TEvState, HandleComputeStats);
10201020
hFunc(NYql::NDq::TEvDqCompute::TEvChannelData, HandleChannelData);
10211021
hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck);
10221022
hFunc(TEvKqp::TEvAbortExecution, HandleExecute);
@@ -2258,7 +2258,7 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
22582258
// Volatile transactions must always use generic readsets
22592259
VolatileTx ||
22602260
// Transactions with topics must always use generic readsets
2261-
!topicTxs.empty() ||
2261+
!topicTxs.empty() ||
22622262
// HTAP transactions always use generic readsets
22632263
!evWriteTxs.empty());
22642264

@@ -2644,8 +2644,7 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
26442644
} else {
26452645
this->Become(&TThis::WaitShutdownState);
26462646
LOG_I("Waiting for shutdown of " << Planner->GetPendingComputeTasks().size() << " tasks and "
2647-
<< Planner->GetPendingComputeActors().size() << " compute actors");
2648-
// TODO(ilezhankin): the CA awaiting timeout should be configurable.
2647+
<< Planner->GetPendingComputeActors().size() << " compute actors");
26492648
TActivationContext::Schedule(TDuration::Seconds(10), new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPoison));
26502649
}
26512650
} else {
@@ -2681,10 +2680,17 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
26812680
}
26822681

26832682
void HandleShutdown(TEvDqCompute::TEvState::TPtr& ev) {
2684-
HandleComputeStats(ev);
2683+
if (ev->Get()->Record.GetState() == NDqProto::COMPUTE_STATE_FAILURE) {
2684+
YQL_ENSURE(Planner);
26852685

2686-
if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
2687-
PassAway();
2686+
TActorId actor = ev->Sender;
2687+
ui64 taskId = ev->Get()->Record.GetTaskId();
2688+
2689+
Planner->CompletedCA(taskId, actor);
2690+
2691+
if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
2692+
PassAway();
2693+
}
26882694
}
26892695
}
26902696

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 20 additions & 42 deletions
Original file line number | Diff line number | Diff line change
@@ -378,7 +378,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
378378
this->Send(channelComputeActorId, ackEv.Release(), /* TODO: undelivery */ 0, /* cookie */ channelId);
379379
}
380380

381-
bool HandleComputeStats(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) {
381+
void HandleComputeStats(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) {
382382
TActorId computeActor = ev->Sender;
383383
auto& state = ev->Get()->Record;
384384
ui64 taskId = state.GetTaskId();
@@ -407,42 +407,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
407407
}
408408

409409
YQL_ENSURE(Planner);
410-
bool ack = Planner->AcknowledgeCA(taskId, computeActor, &state);
411-
412-
switch (state.GetState()) {
413-
case NYql::NDqProto::COMPUTE_STATE_FAILURE:
414-
case NYql::NDqProto::COMPUTE_STATE_FINISHED:
415-
// Don't finalize stats twice.
416-
if (Planner->CompletedCA(taskId, computeActor)) {
417-
auto& extraData = ExtraData[computeActor];
418-
extraData.TaskId = taskId;
419-
extraData.Data.Swap(state.MutableExtraData());
420-
421-
422-
if (Stats) {
423-
Stats->AddComputeActorStats(
424-
computeActor.NodeId(),
425-
std::move(*state.MutableStats()),
426-
TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs())
427-
);
428-
}
429-
430-
LastTaskId = taskId;
431-
LastComputeActorId = computeActor.ToString();
432-
}
433-
default:
434-
; // ignore all other states.
435-
}
436-
437-
return ack;
438-
}
439-
440-
void HandleComputeState(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) {
441-
TActorId computeActor = ev->Sender;
442-
auto& state = ev->Get()->Record;
443-
ui64 taskId = state.GetTaskId();
444-
445-
bool populateChannels = HandleComputeStats(ev);
410+
bool populateChannels = Planner->AcknowledgeCA(taskId, computeActor, &state);
446411

447412
switch (state.GetState()) {
448413
case NYql::NDqProto::COMPUTE_STATE_UNKNOWN: {
@@ -460,8 +425,24 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
460425
break;
461426
}
462427

463-
default:
464-
; // ignore all other states.
428+
case NYql::NDqProto::COMPUTE_STATE_FAILURE:
429+
case NYql::NDqProto::COMPUTE_STATE_FINISHED: {
430+
auto& extraData = ExtraData[computeActor];
431+
extraData.TaskId = taskId;
432+
extraData.Data.Swap(state.MutableExtraData());
433+
if (Stats) {
434+
Stats->AddComputeActorStats(
435+
computeActor.NodeId(),
436+
std::move(*state.MutableStats()),
437+
TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs())
438+
);
439+
}
440+
441+
LastTaskId = taskId;
442+
LastComputeActorId = computeActor.ToString();
443+
YQL_ENSURE(Planner);
444+
Planner->CompletedCA(taskId, computeActor);
445+
}
465446
}
466447

467448
if (state.GetState() == NYql::NDqProto::COMPUTE_STATE_FAILURE) {
@@ -1851,9 +1832,6 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
18511832

18521833
void PassAway() override {
18531834
YQL_ENSURE(AlreadyReplied && ResponseEv);
1854-
1855-
// Actualize stats with the last stats from terminated CAs, but keep the status.
1856-
FillResponseStats(ResponseEv->Record.GetResponse().GetStatus());
18571835
this->Send(Target, ResponseEv.release());
18581836

18591837
for (auto channelPair: ResultChannelProxies) {

ydb/core/kqp/executer_actor/kqp_planner.cpp

Lines changed: 2 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -626,11 +626,11 @@ bool TKqpPlanner::AcknowledgeCA(ui64 taskId, TActorId computeActor, const NYql::
626626
return false;
627627
}
628628

629-
bool TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) {
629+
void TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) {
630630
auto& task = TasksGraph.GetTask(taskId);
631631
if (task.Meta.Completed) {
632632
YQL_ENSURE(!PendingComputeActors.contains(computeActor));
633-
return false;
633+
return;
634634
}
635635

636636
task.Meta.Completed = true;
@@ -640,8 +640,6 @@ bool TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) {
640640
PendingComputeActors.erase(it);
641641

642642
LOG_I("Compute actor has finished execution: " << computeActor.ToString());
643-
644-
return true;
645643
}
646644

647645
void TKqpPlanner::TaskNotStarted(ui64 taskId) {

ydb/core/kqp/executer_actor/kqp_planner.h

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -75,7 +75,7 @@ class TKqpPlanner {
7575
std::unique_ptr<IEventHandle> PlanExecution();
7676
std::unique_ptr<IEventHandle> AssignTasksToNodes();
7777
bool AcknowledgeCA(ui64 taskId, TActorId computeActor, const NYql::NDqProto::TEvComputeActorState* state);
78-
bool CompletedCA(ui64 taskId, TActorId computeActor);
78+
void CompletedCA(ui64 taskId, TActorId computeActor);
7979
void TaskNotStarted(ui64 taskId);
8080
TProgressStat::TEntry CalculateConsumptionUpdate();
8181
void ShiftConsumption();

ydb/core/kqp/executer_actor/kqp_scan_executer.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -110,7 +110,7 @@ class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Sc
110110
STATEFN(ExecuteState) {
111111
try {
112112
switch (ev->GetTypeRewrite()) {
113-
hFunc(TEvDqCompute::TEvState, HandleComputeState);
113+
hFunc(TEvDqCompute::TEvState, HandleComputeStats);
114114
hFunc(TEvDqCompute::TEvChannelData, HandleChannelData); // from CA
115115
hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck);
116116
hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution);

0 commit comments

Comments
 (0)