From 0790396054c812b211fdf17a0e5789fb0106f274 Mon Sep 17 00:00:00 2001 From: Vitalii Gridnev Date: Tue, 13 Aug 2024 01:57:34 +0300 Subject: [PATCH] add stats for queries with errors --- .../kqp/executer_actor/kqp_data_executer.cpp | 34 ---- .../kqp/executer_actor/kqp_executer_impl.h | 146 +++++++----------- ydb/core/kqp/executer_actor/kqp_planner.cpp | 81 ++++++++-- ydb/core/kqp/executer_actor/kqp_planner.h | 9 +- .../kqp/executer_actor/kqp_scan_executer.cpp | 26 ---- ydb/core/kqp/executer_actor/kqp_tasks_graph.h | 1 + .../kqp/session_actor/kqp_session_actor.cpp | 21 ++- ydb/core/kqp/ut/query/kqp_limits_ut.cpp | 38 ++++- ydb/core/kqp/ut/query/kqp_stats_ut.cpp | 35 +++-- 9 files changed, 206 insertions(+), 185 deletions(-) diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 807f20db6ed4..fd16b7b441a6 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -204,40 +204,6 @@ class TKqpDataExecuter : public TKqpExecuterBaseCollectStatsByLongTasks && HasOlapTable; - } - - void FillResponseStats(Ydb::StatusIds::StatusCode status) { - auto& response = *ResponseEv->Record.MutableResponse(); - - response.SetStatus(status); - - if (Stats) { - ReportEventElapsedTime(); - - Stats->FinishTs = TInstant::Now(); - Stats->Finish(); - - if (LogStatsByLongTasks() || CollectFullStats(Request.StatsMode)) { - for (ui32 txId = 0; txId < Request.Transactions.size(); ++txId) { - const auto& tx = Request.Transactions[txId].Body; - auto planWithStats = AddExecStatsToTxPlan(tx->GetPlan(), response.GetResult().GetStats()); - response.MutableResult()->MutableStats()->AddTxPlansWithStats(planWithStats); - } - } - - if (LogStatsByLongTasks()) { - const auto& txPlansWithStats = response.GetResult().GetStats().GetTxPlansWithStats(); - if (!txPlansWithStats.empty()) { - LOG_N("Full stats: " << txPlansWithStats); - } - } - - Stats.reset(); - } - } - void Finalize() { 
if (LocksBroken) { TString message = "Transaction locks invalidated."; diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 0555a10cb3fa..3165625a01aa 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -408,48 +408,28 @@ class TKqpExecuterBase : public TActorBootstrapped { } } + YQL_ENSURE(Planner); + bool populateChannels = Planner->AcknowledgeCA(taskId, computeActor, &state); + switch (state.GetState()) { case NYql::NDqProto::COMPUTE_STATE_UNKNOWN: { YQL_ENSURE(false, "unexpected state from " << computeActor << ", task: " << taskId); return; } - case NYql::NDqProto::COMPUTE_STATE_FAILURE: { - ReplyErrorAndDie(NYql::NDq::DqStatusToYdbStatus(state.GetStatusCode()), state.MutableIssues()); - return; - } - case NYql::NDqProto::COMPUTE_STATE_EXECUTING: { - // initial TEvState event from Compute Actor - // there can be race with RM answer - if (Planner) { - if (Planner->GetPendingComputeTasks().erase(taskId)) { - auto it = Planner->GetPendingComputeActors().emplace(computeActor, TProgressStat()); - YQL_ENSURE(it.second); - - if (state.HasStats()) { - it.first->second.Set(state.GetStats()); - } - - auto& task = TasksGraph.GetTask(taskId); - task.ComputeActorId = computeActor; - - THashMap> updates; - CollectTaskChannelsUpdates(task, updates); - PropagateChannelsUpdates(updates); - } else { - auto it = Planner->GetPendingComputeActors().find(computeActor); - if (it != Planner->GetPendingComputeActors().end()) { - if (state.HasStats()) { - it->second.Set(state.GetStats()); - } - } - } + if (populateChannels) { + auto& task = TasksGraph.GetTask(taskId); + THashMap> updates; + CollectTaskChannelsUpdates(task, updates); + PropagateChannelsUpdates(updates); } break; } + case NYql::NDqProto::COMPUTE_STATE_FAILURE: case NYql::NDqProto::COMPUTE_STATE_FINISHED: { + ExtraData[computeActor].Swap(state.MutableExtraData()); if (Stats) { 
Stats->AddComputeActorStats( computeActor.NodeId(), @@ -457,37 +437,19 @@ class TKqpExecuterBase : public TActorBootstrapped { TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs()) ); } - ExtraData[computeActor].Swap(state.MutableExtraData()); LastTaskId = taskId; LastComputeActorId = computeActor.ToString(); - - if (Planner) { - auto it = Planner->GetPendingComputeActors().find(computeActor); - if (it == Planner->GetPendingComputeActors().end()) { - LOG_W("Got execution state for compute actor: " << computeActor - << ", task: " << taskId - << ", state: " << NYql::NDqProto::EComputeState_Name((NYql::NDqProto::EComputeState) state.GetState()) - << ", too early (waiting reply from RM)"); - - if (Planner && Planner->GetPendingComputeTasks().erase(taskId)) { - LOG_E("Got execution state for compute actor: " << computeActor - << ", for unknown task: " << state.GetTaskId() - << ", state: " << NYql::NDqProto::EComputeState_Name((NYql::NDqProto::EComputeState) state.GetState())); - return; - } - } else { - if (state.HasStats()) { - it->second.Set(state.GetStats()); - } - LastStats.emplace_back(std::move(it->second)); - Planner->GetPendingComputeActors().erase(it); - YQL_ENSURE(Planner->GetPendingComputeTasks().find(taskId) == Planner->GetPendingComputeTasks().end()); - } - } + YQL_ENSURE(Planner); + Planner->CompletedCA(taskId, computeActor); } } + if (state.GetState() == NYql::NDqProto::COMPUTE_STATE_FAILURE) { + ReplyErrorAndDie(NYql::NDq::DqStatusToYdbStatus(state.GetStatusCode()), state.MutableIssues()); + return; + } + static_cast(this)->CheckExecutionComplete(); } @@ -683,20 +645,14 @@ class TKqpExecuterBase : public TActorBootstrapped { auto taskId = startedTask.GetTaskId(); auto& task = TasksGraph.GetTask(taskId); - task.ComputeActorId = ActorIdFromProto(startedTask.GetActorId()); - - LOG_D("Executing task: " << taskId << " on compute actor: " << task.ComputeActorId); - - if (Planner) { - if 
(Planner->GetPendingComputeTasks().erase(taskId) == 0) { - LOG_D("Executing task: " << taskId << ", compute actor: " << task.ComputeActorId << ", already finished"); - } else { - auto result = Planner->GetPendingComputeActors().emplace(std::make_pair(task.ComputeActorId, TProgressStat())); - YQL_ENSURE(result.second); - - CollectTaskChannelsUpdates(task, channelsUpdates); - } + TActorId computeActorId = ActorIdFromProto(startedTask.GetActorId()); + LOG_D("Executing task: " << taskId << " on compute actor: " << computeActorId); + YQL_ENSURE(Planner); + bool channelUpdates = Planner->AcknowledgeCA(taskId, computeActorId, nullptr); + if (channelUpdates) { + CollectTaskChannelsUpdates(task, channelsUpdates); } + } PropagateChannelsUpdates(channelsUpdates); @@ -789,16 +745,9 @@ class TKqpExecuterBase : public TActorBootstrapped { LastResourceUsageUpdate = now; TProgressStat::TEntry consumption; - if (Planner) { - for (const auto& p : Planner->GetPendingComputeActors()) { - const auto& t = p.second.GetLastUsage(); - consumption += t; - } - } - for (const auto& p : LastStats) { - const auto& t = p.GetLastUsage(); - consumption += t; + if (Planner) { + consumption += Planner->CalculateConsumptionUpdate(); } auto ru = NRuCalc::CalcRequestUnit(consumption); @@ -811,13 +760,7 @@ class TKqpExecuterBase : public TActorBootstrapped { return; if (Planner) { - for (auto& p : Planner->GetPendingComputeActors()) { - p.second.Update(); - } - } - - for (auto& p : LastStats) { - p.Update(); + Planner->ShiftConsumption(); } if (Request.RlPath) { @@ -1758,7 +1701,7 @@ class TKqpExecuterBase : public TActorBootstrapped { ExecuterSpan.EndError(TStringBuilder() << NYql::NDqProto::StatusIds_StatusCode_Name(status)); } - static_cast(this)->FillResponseStats(Ydb::StatusIds::TIMEOUT); + FillResponseStats(Ydb::StatusIds::TIMEOUT); // TEvAbortExecution can come from either ComputeActor or SessionActor (== Target). 
if (abortSender != Target) { @@ -1775,6 +1718,34 @@ class TKqpExecuterBase : public TActorBootstrapped { this->PassAway(); } + void FillResponseStats(Ydb::StatusIds::StatusCode status) { + auto& response = *ResponseEv->Record.MutableResponse(); + + response.SetStatus(status); + + if (Stats) { + ReportEventElapsedTime(); + + Stats->FinishTs = TInstant::Now(); + Stats->Finish(); + + if (Stats->CollectStatsByLongTasks || CollectFullStats(Request.StatsMode)) { + for (ui32 txId = 0; txId < Request.Transactions.size(); ++txId) { + const auto& tx = Request.Transactions[txId].Body; + auto planWithStats = AddExecStatsToTxPlan(tx->GetPlan(), response.GetResult().GetStats()); + response.MutableResult()->MutableStats()->AddTxPlansWithStats(planWithStats); + } + } + + if (Stats->CollectStatsByLongTasks) { + const auto& txPlansWithStats = response.GetResult().GetStats().GetTxPlansWithStats(); + if (!txPlansWithStats.empty()) { + LOG_N("Full stats: " << txPlansWithStats); + } + } + } + } + virtual void ReplyErrorAndDie(Ydb::StatusIds::StatusCode status, google::protobuf::RepeatedPtrField* issues) { @@ -1794,7 +1765,8 @@ class TKqpExecuterBase : public TActorBootstrapped { AlreadyReplied = true; auto& response = *ResponseEv->Record.MutableResponse(); - response.SetStatus(status); + FillResponseStats(status); + response.MutableIssues()->Swap(issues); LOG_T("ReplyErrorAndDie. 
Response: " << response.DebugString() @@ -1972,8 +1944,6 @@ class TKqpExecuterBase : public TActorBootstrapped { TActorId KqpShardsResolverId; THashMap ExtraData; - TVector LastStats; - TInstant StartResolveTime; TInstant LastResourceUsageUpdate; diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index 5d0a24d26964..3a13c9fb7f6e 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -389,12 +389,7 @@ TString TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize) TActorId* actorId = std::get_if(&startResult); Y_ABORT_UNLESS(actorId); - task.ComputeActorId = *actorId; - - LOG_D("Executing task: " << taskId << " on compute actor: " << task.ComputeActorId); - - auto result = PendingComputeActors.emplace(task.ComputeActorId, TProgressStat()); - YQL_ENSURE(result.second); + AcknowledgeCA(taskId, *actorId, nullptr); return TString(); } @@ -479,8 +474,6 @@ std::unique_ptr TKqpPlanner::PlanExecution() { if (!result.empty()) { return MakeActorStartFailureError(ExecuterId, result); } - - PendingComputeTasks.erase(taskId); } } } @@ -522,11 +515,79 @@ void TKqpPlanner::Unsubscribe() { } } -THashMap& TKqpPlanner::GetPendingComputeActors() { +bool TKqpPlanner::AcknowledgeCA(ui64 taskId, TActorId computeActor, const NYql::NDqProto::TEvComputeActorState* state) { + auto& task = TasksGraph.GetTask(taskId); + if (!task.ComputeActorId) { + task.ComputeActorId = computeActor; + PendingComputeTasks.erase(taskId); + auto [it, success] = PendingComputeActors.try_emplace(computeActor); + YQL_ENSURE(success); + if (state && state->HasStats()) { + it->second.Set(state->GetStats()); + } + + return true; + } + + YQL_ENSURE(task.ComputeActorId == computeActor); + auto it = PendingComputeActors.find(computeActor); + if (!task.Meta.Completed) { + YQL_ENSURE(it != PendingComputeActors.end()); + } + + if (it != PendingComputeActors.end() && state && state->HasStats()) { + 
it->second.Set(state->GetStats()); + } + + return false; +} + +void TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) { + auto& task = TasksGraph.GetTask(taskId); + if (task.Meta.Completed) { + YQL_ENSURE(!PendingComputeActors.contains(computeActor)); + return; + } + + task.Meta.Completed = true; + auto it = PendingComputeActors.find(computeActor); + YQL_ENSURE(it != PendingComputeActors.end()); + LastStats.emplace_back(std::move(it->second)); + PendingComputeActors.erase(it); + return; +} + +TProgressStat::TEntry TKqpPlanner::CalculateConsumptionUpdate() { + TProgressStat::TEntry consumption; + + for (const auto& p : PendingComputeActors) { + const auto& t = p.second.GetLastUsage(); + consumption += t; + } + + for (const auto& p : LastStats) { + const auto& t = p.GetLastUsage(); + consumption += t; + } + + return consumption; +} + +void TKqpPlanner::ShiftConsumption() { + for (auto& p : PendingComputeActors) { + p.second.Update(); + } + + for (auto& p : LastStats) { + p.Update(); + } +} + +const THashMap& TKqpPlanner::GetPendingComputeActors() { return PendingComputeActors; } -THashSet& TKqpPlanner::GetPendingComputeTasks() { +const THashSet& TKqpPlanner::GetPendingComputeTasks() { return PendingComputeTasks; } diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h index 03ce07758cf5..639c53737055 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.h +++ b/ydb/core/kqp/executer_actor/kqp_planner.h @@ -72,12 +72,16 @@ class TKqpPlanner { bool SendStartKqpTasksRequest(ui32 requestId, const TActorId& target); std::unique_ptr PlanExecution(); std::unique_ptr AssignTasksToNodes(); + bool AcknowledgeCA(ui64 taskId, TActorId computeActor, const NYql::NDqProto::TEvComputeActorState* state); + void CompletedCA(ui64 taskId, TActorId computeActor); + TProgressStat::TEntry CalculateConsumptionUpdate(); + void ShiftConsumption(); void Submit(); ui32 GetCurrentRetryDelay(ui32 requestId); void Unsubscribe(); - THashMap& 
GetPendingComputeActors(); - THashSet& GetPendingComputeTasks(); + const THashMap& GetPendingComputeActors(); + const THashSet& GetPendingComputeTasks(); ui32 GetnScanTasks(); ui32 GetnComputeTasks(); @@ -134,6 +138,7 @@ class TKqpPlanner { std::shared_ptr ResourceManager_; std::shared_ptr CaFactory_; TIntrusivePtr TxInfo; + TVector LastStats; public: static bool UseMockEmptyPlanner; // for tests: if true then use TKqpMockEmptyPlanner that leads to the error diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index 3edc45600f94..277e77da71e0 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -272,32 +272,6 @@ class TKqpScanExecuter : public TKqpExecuterBaseRecord.MutableResponse(); - - response.SetStatus(status); - - if (Stats) { - ReportEventElapsedTime(); - - Stats->FinishTs = TInstant::Now(); - Stats->Finish(); - - if (Stats->CollectStatsByLongTasks || CollectFullStats(Request.StatsMode)) { - const auto& tx = Request.Transactions[0].Body; - auto planWithStats = AddExecStatsToTxPlan(tx->GetPlan(), response.GetResult().GetStats()); - response.MutableResult()->MutableStats()->AddTxPlansWithStats(planWithStats); - } - - if (Stats->CollectStatsByLongTasks) { - const auto& txPlansWithStats = response.GetResult().GetStats().GetTxPlansWithStats(); - if (!txPlansWithStats.empty()) { - LOG_N("Full stats: " << txPlansWithStats); - } - } - } - } - void Finalize() { FillResponseStats(Ydb::StatusIds::SUCCESS); diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h index e9141c7d5f84..1fd735a5062c 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h @@ -171,6 +171,7 @@ struct TTaskMeta { ui32 Type = Unknown; TActorId ResultChannelActorId; + bool Completed = false; THashMap TaskParams; // Params for sources/sinks TVector ReadRanges; // 
Partitioning for sources THashMap SecureParams; diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 65bd3b1b5c45..bf8bfd5014c7 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -352,8 +352,9 @@ class TKqpSessionActor : public TActorBootstrapped { auto abortEv = TEvKqp::TEvAbortExecution::Aborted("Client lost"); // any status code can be here Send(ExecuterId, abortEv.Release()); + } else { + Cleanup(); } - Cleanup(); } void Handle(TEvKqp::TEvQueryRequest::TPtr& ev) { @@ -1425,6 +1426,12 @@ class TKqpSessionActor : public TActorBootstrapped { ExecuterId = TActorId{}; + auto& executerResults = *response->MutableResult(); + if (executerResults.HasStats()) { + QueryState->QueryStats.Executions.emplace_back(); + QueryState->QueryStats.Executions.back().Swap(executerResults.MutableStats()); + } + if (response->GetStatus() != Ydb::StatusIds::SUCCESS) { const auto executionType = ev->ExecutionType; @@ -1480,16 +1487,10 @@ class TKqpSessionActor : public TActorBootstrapped { QueryState->TxCtx->Locks.LockHandle = std::move(ev->LockHandle); } - auto& executerResults = *response->MutableResult(); if (!MergeLocksWithTxResult(executerResults)) { return; } - if (executerResults.HasStats()) { - QueryState->QueryStats.Executions.emplace_back(); - QueryState->QueryStats.Executions.back().Swap(executerResults.MutableStats()); - } - if (!response->GetIssues().empty()){ NYql::IssuesFromMessage(response->GetIssues(), QueryState->Issues); } @@ -1583,6 +1584,10 @@ class TKqpSessionActor : public TActorBootstrapped { void FillStats(NKikimrKqp::TEvQueryResponse* record) { YQL_ENSURE(QueryState); + // workaround to ensure that request was not transferred to worker.
+ if (WorkerId || !QueryState->RequestEv) { + return; + } FillSystemViewQueryStats(record); @@ -2218,6 +2223,8 @@ class TKqpSessionActor : public TActorBootstrapped { } } + FillStats(&QueryResponse->Record.GetRef()); + if (issues) { for (auto& i : *issues) { response->AddQueryIssues()->Swap(&i); diff --git a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp index eb88d79fe861..0eebf6bec7dc 100644 --- a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp @@ -14,7 +14,7 @@ using namespace NYdb::NTable; using namespace NResourceBroker; -NKikimrResourceBroker::TResourceBrokerConfig MakeResourceBrokerTestConfig() { +NKikimrResourceBroker::TResourceBrokerConfig MakeResourceBrokerTestConfig(ui32 multiplier = 1) { NKikimrResourceBroker::TResourceBrokerConfig config; auto queue = config.AddQueues(); @@ -26,7 +26,7 @@ NKikimrResourceBroker::TResourceBrokerConfig MakeResourceBrokerTestConfig() { queue->SetName("queue_kqp_resource_manager"); queue->SetWeight(20); queue->MutableLimit()->AddResource(4); - queue->MutableLimit()->AddResource(50'000); + queue->MutableLimit()->AddResource(33554453 * multiplier); auto task = config.AddTasks(); task->SetName("unknown"); @@ -181,7 +181,38 @@ Y_UNIT_TEST_SUITE(KqpLimits) { )"), TTxControl::BeginTx().CommitTx()).ExtractValueSync(); result.GetIssues().PrintTo(Cerr); - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::PRECONDITION_FAILED); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::OVERLOADED); + UNIT_ASSERT_C(result.GetIssues().ToString().Contains("Mkql memory limit exceeded"), result.GetIssues().ToString()); + } + + Y_UNIT_TEST(ComputeActorMemoryAllocationFailureQueryService) { + auto app = NKikimrConfig::TAppConfig(); + app.MutableTableServiceConfig()->MutableResourceManager()->SetMkqlLightProgramMemoryLimit(10); + app.MutableTableServiceConfig()->MutableResourceManager()->SetQueryMemoryLimit(2000); + + 
app.MutableResourceBrokerConfig()->CopyFrom(MakeResourceBrokerTestConfig(4)); + + TKikimrRunner kikimr(app); + CreateLargeTable(kikimr, 0, 0, 0); + + kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_SLOW_LOG, NActors::NLog::PRI_ERROR); + + auto db = kikimr.GetQueryClient(); + NYdb::NQuery::TExecuteQuerySettings querySettings; + querySettings.StatsMode(NYdb::NQuery::EStatsMode::Full); + + auto result = db.ExecuteQuery(Q1_(R"( + SELECT * FROM `/Root/LargeTable`; + )"), NQuery::TTxControl::BeginTx().CommitTx(), querySettings).ExtractValueSync(); + result.GetIssues().PrintTo(Cerr); + + auto stats = result.GetStats(); + + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::OVERLOADED); + UNIT_ASSERT_C(result.GetIssues().ToString().Contains("Mkql memory limit exceeded"), result.GetIssues().ToString()); + UNIT_ASSERT(stats.Defined()); + + Cerr << stats->ToString(true) << Endl; } Y_UNIT_TEST(DatashardProgramSize) { @@ -1038,6 +1069,7 @@ Y_UNIT_TEST_SUITE(KqpLimits) { UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); UNIT_ASSERT(result.GetStats()); + Cerr << result.GetStats()->ToString(true) << Endl; UNIT_ASSERT(result.GetStats()->GetPlan()); NJson::TJsonValue plan; diff --git a/ydb/core/kqp/ut/query/kqp_stats_ut.cpp b/ydb/core/kqp/ut/query/kqp_stats_ut.cpp index 0f1221423669..1192c399f334 100644 --- a/ydb/core/kqp/ut/query/kqp_stats_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_stats_ut.cpp @@ -451,7 +451,7 @@ Y_UNIT_TEST_TWIN(StreamLookupStats, StreamLookupJoin) { }); } -Y_UNIT_TEST(SysViewTimeout) { +Y_UNIT_TEST(SysViewClientLost) { TKikimrRunner kikimr; CreateLargeTable(kikimr, 500000, 10, 100, 5000, 1); @@ -490,12 +490,13 @@ Y_UNIT_TEST(SysViewTimeout) { auto settings = TStreamExecScanQuerySettings(); settings.ClientTimeout(TDuration::MilliSeconds(50)); - TStringStream request; - request << R"( + TStringStream timeoutedRequestStream; + timeoutedRequestStream << R"( SELECT COUNT(*) FROM `/Root/LargeTable` 
WHERE SUBSTRING(DataText, 50, 5) = "22222"; )"; + TString timeoutedRequest = timeoutedRequestStream.Str(); - auto result = db.StreamExecuteScanQuery(request.Str(), settings).GetValueSync(); + auto result = db.StreamExecuteScanQuery(timeoutedRequest, settings).GetValueSync(); if (result.IsSuccess()) { try { @@ -510,7 +511,13 @@ Y_UNIT_TEST(SysViewTimeout) { UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED); } + ui32 timeoutedCount = 0; + ui32 iterations = 10; + while (timeoutedCount == 0 && iterations > 0) { + iterations--; + Sleep(TDuration::Seconds(1)); + TStringStream request; request << "SELECT * FROM `/Root/.sys/top_queries_by_read_bytes_one_hour` ORDER BY Duration"; @@ -518,7 +525,6 @@ Y_UNIT_TEST(SysViewTimeout) { UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); ui64 queryCount = 0; - ui64 rowsCount = 0; for (;;) { auto streamPart = it.ReadNext().GetValueSync(); if (!streamPart.IsSuccess()) { @@ -533,17 +539,16 @@ Y_UNIT_TEST(SysViewTimeout) { while (parser.TryNextRow()) { auto value = parser.ColumnParser("QueryText").GetOptionalUtf8(); UNIT_ASSERT(value); - if (*value == request.Str()) { + if (*value == timeoutedRequest) { queryCount++; } - rowsCount++; } } } - - UNIT_ASSERT(queryCount == 1); - UNIT_ASSERT(rowsCount == 2); + timeoutedCount = queryCount; } + + UNIT_ASSERT(timeoutedCount == 1); } Y_UNIT_TEST(SysViewCancelled) { @@ -582,9 +587,9 @@ Y_UNIT_TEST(SysViewCancelled) { UNIT_ASSERT(rowsCount == 1); } - auto prepareResult = session.PrepareDataQuery(Q_(R"( - SELECT COUNT(*) FROM `/Root/LargeTable` WHERE SUBSTRING(DataText, 50, 5) = "33333"; - )")).GetValueSync(); + TStringStream cancelledRequest; + cancelledRequest << "SELECT COUNT(*) FROM `/Root/LargeTable` WHERE SUBSTRING(DataText, 50, 5) = \"33333\""; + auto prepareResult = session.PrepareDataQuery(cancelledRequest.Str()).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(prepareResult.GetStatus(), NYdb::EStatus::SUCCESS, prepareResult.GetIssues().ToString()); 
auto dataQuery = prepareResult.GetQuery(); @@ -619,7 +624,7 @@ Y_UNIT_TEST(SysViewCancelled) { while (parser.TryNextRow()) { auto value = parser.ColumnParser("QueryText").GetOptionalUtf8(); UNIT_ASSERT(value); - if (*value == request.Str()) { + if (*value == cancelledRequest.Str()) { queryCount++; } rowsCount++; @@ -628,7 +633,7 @@ Y_UNIT_TEST(SysViewCancelled) { } UNIT_ASSERT(queryCount == 1); - UNIT_ASSERT(rowsCount == 2); + UNIT_ASSERT(rowsCount == 3); } }