diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 32353e34cd1f..89589652d9ae 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -215,9 +215,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseRecord.MutableResponse(); - - FillResponseStats(Ydb::StatusIds::SUCCESS); + ResponseEv->Record.MutableResponse()->SetStatus(Ydb::StatusIds::SUCCESS); Counters->TxProxyMon->ReportStatusOK->Inc(); auto addLocks = [this](const auto& data) { @@ -257,7 +255,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseLockHandle = std::move(LockHandle); } - BuildLocks(*response.MutableResult()->MutableLocks(), Locks); + BuildLocks(*ResponseEv->Record.MutableResponse()->MutableResult()->MutableLocks(), Locks); } auto resultSize = ResponseEv->GetByteSize(); @@ -283,7 +281,6 @@ class TKqpDataExecuter : public TKqpExecuterBase { } void ReportEventElapsedTime() { - if (Stats) { - ui64 elapsedMicros = TlsActivationContext->GetCurrentEventTicksAsSeconds() * 1'000'000; - Stats->ExecuterCpuTime += TDuration::MicroSeconds(elapsedMicros); - } + YQL_ENSURE(Stats); + + ui64 elapsedMicros = TlsActivationContext->GetCurrentEventTicksAsSeconds() * 1'000'000; + Stats->ExecuterCpuTime += TDuration::MicroSeconds(elapsedMicros); } protected: @@ -330,11 +330,10 @@ class TKqpExecuterBase : public TActorBootstrapped { } YQL_ENSURE(channel.DstTask == 0); + YQL_ENSURE(Stats); - if (Stats) { - Stats->ResultBytes += batch.Size(); - Stats->ResultRows += batch.RowCount(); - } + Stats->ResultBytes += batch.Size(); + Stats->ResultRows += batch.RowCount(); LOG_T("Got result, channelId: " << channel.Id << ", shardId: " << task.Meta.ShardId << ", inputIndex: " << channel.DstInputIndex << ", from: " << ev->Sender @@ -391,7 +390,9 @@ class TKqpExecuterBase : public TActorBootstrapped { << ", state: " << NYql::NDqProto::EComputeState_Name((NYql::NDqProto::EComputeState) state.GetState()) << ", stats: " << state.GetStats()); - if (Stats && state.HasStats() && Request.ProgressStatsPeriod) { + YQL_ENSURE(Stats); + + if (state.HasStats() && Request.ProgressStatsPeriod) { Stats->UpdateTaskStats(taskId, state.GetStats()); auto now = TInstant::Now(); if (LastProgressStats + Request.ProgressStatsPeriod <= now) { @@ -418,13 +419,11 @@ class TKqpExecuterBase : public TActorBootstrapped { if (Planner->CompletedCA(taskId, computeActor)) { ExtraData[computeActor].Swap(state.MutableExtraData()); - if (Stats) { - Stats->AddComputeActorStats( - computeActor.NodeId(), - std::move(*state.MutableStats()), - TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs()) - ); - } + Stats->AddComputeActorStats( + computeActor.NodeId(), + std::move(*state.MutableStats()), + TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs()) + ); LastTaskId = taskId; LastComputeActorId = computeActor.ToString(); @@ -512,9 +511,9 @@ class TKqpExecuterBase : public TActorBootstrapped { auto now = TAppData::TimeProvider->Now(); StartResolveTime = now; - if (Stats) { - Stats->StartTs = now; - } + YQL_ENSURE(Stats); + + Stats->StartTs = now; } TMaybe FindReadRangesSource(const NKqpProto::TKqpPhyStage& stage) { @@ -1167,8 +1166,9 @@ class TKqpExecuterBase : public TActorBootstrapped { : Nothing(); YQL_ENSURE(!shardsResolved || nodeId); + YQL_ENSURE(Stats); - if (shardId && Stats) { + if (shardId) { Stats->AffectedShards.insert(*shardId); } @@ -1236,11 +1236,13 @@ class TKqpExecuterBase : public TActorBootstrapped { if (partitions.size() > 0 && source.GetSequentialInFlightShards() > 0 && partitions.size() > source.GetSequentialInFlightShards()) { auto [startShard, shardInfo] = MakeVirtualTablePartition(source, stageInfo, HolderFactory(), TypeEnv()); - if (Stats) { - for (auto& [shardId, _] : partitions) { - Stats->AffectedShards.insert(shardId); - } + + YQL_ENSURE(Stats); + + for (auto& [shardId, _] : partitions) { + Stats->AffectedShards.insert(shardId); } + if (shardInfo.KeyReadRanges) { addPartiton(startShard, {}, shardInfo, source.GetSequentialInFlightShards()); fillRangesForTasks(); @@ -1507,6 +1509,8 @@ class TKqpExecuterBase : public TActorBootstrapped { THashMap assignedShardsCount; auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); + YQL_ENSURE(Stats); + const auto& tableInfo = stageInfo.Meta.TableConstInfo; const auto& keyTypes = tableInfo->KeyColumnTypes; ui32 metaId = 0; @@ -1535,7 +1539,7 @@ class TKqpExecuterBase : public TActorBootstrapped { nodeShards[nodeId].emplace_back(TShardInfoWithId(i.first, std::move(i.second))); } - if (Stats && CollectProfileStats(Request.StatsMode)) { + if (CollectProfileStats(Request.StatsMode)) { for (auto&& i : nodeShards) { Stats->AddNodeShardsCount(stageInfo.Id.StageId, i.first, i.second.size()); } @@ -1720,7 +1724,7 @@ class TKqpExecuterBase : public TActorBootstrapped { ExecuterSpan.EndError(TStringBuilder() << NYql::NDqProto::StatusIds_StatusCode_Name(status)); } - FillResponseStats(Ydb::StatusIds::TIMEOUT); + ResponseEv->Record.MutableResponse()->SetStatus(Ydb::StatusIds::TIMEOUT); // TEvAbortExecution can come from either ComputeActor or SessionActor (== Target). if (abortSender != Target) { @@ -1730,38 +1734,9 @@ class TKqpExecuterBase : public TActorBootstrapped { LOG_E("Sending timeout response to: " << Target); - Request.Transactions.crop(0); this->Shutdown(); } - void FillResponseStats(Ydb::StatusIds::StatusCode status) { - auto& response = *ResponseEv->Record.MutableResponse(); - - response.SetStatus(status); - - if (Stats) { - ReportEventElapsedTime(); - - Stats->FinishTs = TInstant::Now(); - Stats->Finish(); - - if (Stats->CollectStatsByLongTasks || CollectFullStats(Request.StatsMode)) { - for (ui32 txId = 0; txId < Request.Transactions.size(); ++txId) { - const auto& tx = Request.Transactions[txId].Body; - auto planWithStats = AddExecStatsToTxPlan(tx->GetPlan(), response.GetResult().GetStats()); - response.MutableResult()->MutableStats()->AddTxPlansWithStats(planWithStats); - } - } - - if (Stats->CollectStatsByLongTasks) { - const auto& txPlansWithStats = response.GetResult().GetStats().GetTxPlansWithStats(); - if (!txPlansWithStats.empty()) { - LOG_N("Full stats: " << txPlansWithStats); - } - } - } - } - virtual void ReplyErrorAndDie(Ydb::StatusIds::StatusCode status, google::protobuf::RepeatedPtrField* issues) { @@ -1775,8 +1750,7 @@ class TKqpExecuterBase : public TActorBootstrapped { AlreadyReplied = true; auto& response = *ResponseEv->Record.MutableResponse(); - FillResponseStats(status); - + response.SetStatus(status); response.MutableIssues()->Swap(issues); LOG_T("ReplyErrorAndDie. Response: " << response.DebugString() @@ -1795,7 +1769,6 @@ class TKqpExecuterBase : public TActorBootstrapped { ExecuterSpan.EndError(response.DebugString()); ExecuterStateSpan.EndError(response.DebugString()); - Request.Transactions.crop(0); this->Shutdown(); } @@ -1873,8 +1846,35 @@ class TKqpExecuterBase : public TActorBootstrapped { void PassAway() override { YQL_ENSURE(AlreadyReplied && ResponseEv); - // Actualize stats with the last stats from terminated CAs, but keep the status. - FillResponseStats(ResponseEv->Record.GetResponse().GetStatus()); + // Fill response stats + { + auto& response = *ResponseEv->Record.MutableResponse(); + + YQL_ENSURE(Stats); + + ReportEventElapsedTime(); + + Stats->FinishTs = TInstant::Now(); + Stats->Finish(); + + if (Stats->CollectStatsByLongTasks || CollectFullStats(Request.StatsMode)) { + response.MutableResult()->MutableStats()->ClearTxPlansWithStats(); + for (ui32 txId = 0; txId < Request.Transactions.size(); ++txId) { + const auto& tx = Request.Transactions[txId].Body; + auto planWithStats = AddExecStatsToTxPlan(tx->GetPlan(), response.GetResult().GetStats()); + response.MutableResult()->MutableStats()->AddTxPlansWithStats(planWithStats); + } + } + + if (Stats->CollectStatsByLongTasks) { + const auto& txPlansWithStats = response.GetResult().GetStats().GetTxPlansWithStats(); + if (!txPlansWithStats.empty()) { + LOG_N("Full stats: " << response.GetResult().GetStats()); + } + } + } + + Request.Transactions.crop(0); this->Send(Target, ResponseEv.release()); for (auto channelPair: ResultChannelProxies) { diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp index ad05470ba1e4..177fd8567ff0 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp @@ -701,6 +701,10 @@ void TQueryExecutionStats::AddComputeActorStats(ui32 /* nodeId */, NYql::NDqProt } // checking whether the task is long + + // TODO(ilezhankin): investigate - for some reason `task.FinishTimeMs` may be large (or small?) + // enough to result in an enormous duration - triggering the "long tasks" mode. + auto taskDuration = TDuration::MilliSeconds(task.GetFinishTimeMs() - task.GetStartTimeMs()); bool longTask = taskDuration > collectLongTaskStatsTimeout; if (longTask) { @@ -1126,7 +1130,7 @@ void TQueryExecutionStats::Finish() { Result->SetCpuTimeUs(Result->GetCpuTimeUs() + ExecuterCpuTime.MicroSeconds()); Result->SetDurationUs(FinishTs.MicroSeconds() - StartTs.MicroSeconds()); - // Result->Result* feilds are (temporary?) commented out in proto due to lack of use + // Result->Result* fields are (temporary?) commented out in proto due to lack of use // // Result->SetResultBytes(ResultBytes); // Result->SetResultRows(ResultRows); diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index f7357744e14a..28a17b9a5ef5 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -276,7 +276,7 @@ class TKqpScanExecuter : public TKqpExecuterBaseRecord.MutableResponse()->SetStatus(Ydb::StatusIds::SUCCESS); LWTRACK(KqpScanExecuterFinalize, ResponseEv->Orbit, TxId, LastTaskId, LastComputeActorId, ResponseEv->ResultsSize()); diff --git a/ydb/core/kqp/opt/kqp_query_plan.cpp b/ydb/core/kqp/opt/kqp_query_plan.cpp index 7f2bec1c66e5..d3df96d72062 100644 --- a/ydb/core/kqp/opt/kqp_query_plan.cpp +++ b/ydb/core/kqp/opt/kqp_query_plan.cpp @@ -2033,9 +2033,9 @@ NJson::TJsonValue ReconstructQueryPlanRec(const NJson::TJsonValue& plan, newPlans.AppendValue(ReconstructQueryPlanRec( plan.GetMapSafe().at("Plans").GetArraySafe()[0], - 0, - planIndex, - precomputes, + 0, + planIndex, + precomputes, nodeCounter)); newPlans.AppendValue(lookupPlan); diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.h b/ydb/core/kqp/rm_service/kqp_rm_service.h index c1771cc8f836..f6cb7783561c 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_service.h +++ b/ydb/core/kqp/rm_service/kqp_rm_service.h @@ -139,9 +139,9 @@ class TTxState : public TAtomicRefCount { } TString ToString() const { - auto res = TStringBuilder() << "TxResourcesInfo{ " + auto res = TStringBuilder() << "TxResourcesInfo { " << "TxId: " << TxId - << "Database: " << Database; + << ", Database: " << Database; if (!PoolId.empty()) { res << ", PoolId: " << PoolId @@ -149,7 +149,7 @@ class TTxState : public TAtomicRefCount { } res << ", memory initially granted resources: " << TxExternalDataQueryMemory.load() - << ", extra allocations " << TxScanQueryMemory.load() + << ", tx total allocations " << TxScanQueryMemory.load() << ", execution units: " << TxExecutionUnits.load() << ", started at: " << CreatedAt << " }"; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 13a17b1d5d0b..2b8a5dae1de3 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -313,12 +313,12 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped void OnMemoryLimitExceptionHandler() { TString memoryConsumptionDetails = MemoryLimits.MemoryQuotaManager->MemoryConsumptionDetails(); TStringBuilder failureReason = TStringBuilder() - << "Mkql memory limit exceeded, limit: " << GetMkqlMemoryLimit() + << "Mkql memory limit exceeded, allocated by task " << Task.GetId() << ": " << GetMkqlMemoryLimit() << ", host: " << HostName() << ", canAllocateExtraMemory: " << CanAllocateExtraMemory; if (!memoryConsumptionDetails.empty()) { - failureReason << ", memory manager details: " << memoryConsumptionDetails; + failureReason << ", memory manager details for current node: " << memoryConsumptionDetails; } InternalError(NYql::NDqProto::StatusIds::OVERLOADED, TIssuesIds::KIKIMR_PRECONDITION_FAILED, failureReason); @@ -1825,6 +1825,8 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped } } } + } else { + // TODO: what should happen in this case? } static_cast(this)->FillExtraStats(dst, last); diff --git a/ydb/library/yql/dq/actors/protos/dq_stats.proto b/ydb/library/yql/dq/actors/protos/dq_stats.proto index ab026e3037a1..733bf969dc2c 100644 --- a/ydb/library/yql/dq/actors/protos/dq_stats.proto +++ b/ydb/library/yql/dq/actors/protos/dq_stats.proto @@ -377,7 +377,7 @@ message TDqExecutionStats { uint64 CpuTimeUs = 1; // total cpu time, executer + compute actors + ... uint64 DurationUs = 2; // execution wall time - // these feilds are never used, needs to be reviewed + // these fields are never used, needs to be reviewed reserved 3; // uint64 ResultRows = 3; reserved 4; // uint64 ResultBytes = 4;