Skip to content

Commit 15c274a

Browse files
committed
One more fix
1 parent 4bbb822 commit 15c274a

File tree

2 files changed

+17
-11
lines changed

2 files changed

+17
-11
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2601,6 +2601,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
26012601
void Shutdown() override {
26022602
if (Planner) {
26032603
if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
2604+
LOG_I("Shutdown immediately - nothing to wait");
26042605
PassAway();
26052606
} else {
26062607
this->Become(&TThis::WaitShutdownState);
@@ -2641,15 +2642,17 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
26412642
}
26422643

26432644
void HandleShutdown(TEvDqCompute::TEvState::TPtr& ev) {
2644-
YQL_ENSURE(Planner);
2645+
if (ev->Get()->Record.GetState() == NDqProto::COMPUTE_STATE_FAILURE) {
2646+
YQL_ENSURE(Planner);
26452647

2646-
TActorId actor = ev->Sender;
2647-
ui64 taskId = ev->Get()->Record.GetTaskId();
2648+
TActorId actor = ev->Sender;
2649+
ui64 taskId = ev->Get()->Record.GetTaskId();
26482650

2649-
Planner->CompletedCA(taskId, actor);
2651+
Planner->CompletedCA(taskId, actor);
26502652

2651-
if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
2652-
PassAway();
2653+
if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
2654+
PassAway();
2655+
}
26532656
}
26542657
}
26552658

ydb/core/kqp/ut/query/kqp_limits_ut.cpp

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -860,14 +860,17 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
860860
auto& runtime = *kikimr.GetTestServer().GetRuntime();
861861
runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
862862
if (ev->GetTypeRewrite() == NYql::NDq::TEvDqCompute::TEvState::EventType) {
863+
++totalEvState;
863864
if (!firstEvState) {
864865
executerId = ev->Recipient;
865866
ev = new IEventHandle(ev->Recipient, ev->Sender,
866867
new NKikimr::NKqp::TEvKqp::TEvAbortExecution(NYql::NDqProto::StatusIds::UNSPECIFIED, NYql::TIssues()));
867868
firstEvState = true;
868869
}
869-
++totalEvState;
870+
} else if (ev->GetTypeRewrite() == NKikimr::NKqp::TEvKqpExecuter::TEvTxResponse::EventType && ev->Sender == executerId) {
871+
UNIT_ASSERT_C(totalEvState == actorCount*2, "Executer sent response before waiting for CAs");
870872
}
873+
871874
return TTestActorRuntime::EEventAction::PROCESS;
872875
});
873876

@@ -912,21 +915,21 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
912915
auto& runtime = *kikimr.GetTestServer().GetRuntime();
913916
runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
914917
if (ev->GetTypeRewrite() == NYql::NDq::TEvDqCompute::TEvState::EventType) {
918+
++totalEvState;
915919
if (!firstEvState) {
916920
executerId = ev->Recipient;
917921
ev = new IEventHandle(ev->Recipient, ev->Sender,
918922
new NKikimr::NKqp::TEvKqp::TEvAbortExecution(NYql::NDqProto::StatusIds::UNSPECIFIED, NYql::TIssues()));
919923
firstEvState = true;
920-
}
921-
++totalEvState;
922-
923-
if (totalEvState == actorCount*2) {
924+
} else {
924925
return TTestActorRuntime::EEventAction::DROP;
925926
}
926927
} else if (ev->GetTypeRewrite() == TEvents::TEvPoison::EventType && totalEvState == actorCount*2 &&
927928
ev->Sender == executerId && ev->Recipient == executerId)
928929
{
929930
timeoutPoison = true;
931+
} else if (ev->GetTypeRewrite() == NKikimr::NKqp::TEvKqpExecuter::TEvTxResponse::EventType && ev->Sender == executerId) {
932+
UNIT_ASSERT_C(timeoutPoison, "Executer sent response before waiting for CAs");
930933
}
931934

932935
return TTestActorRuntime::EEventAction::PROCESS;

0 commit comments

Comments
 (0)