From 4a8729b635365b0385e0066be5ba23099b085300 Mon Sep 17 00:00:00 2001 From: Vitalii Gridnev Date: Tue, 9 Jul 2024 11:11:50 +0400 Subject: [PATCH 1/5] ca factory in executer to support extra allocations and memory tracking everywhere (#6084) --- .../run/kikimr_services_initializers.cpp | 2 +- .../kqp_compute_actor_factory.cpp | 97 +++++++++------ .../compute_actor/kqp_compute_actor_factory.h | 28 ++++- .../kqp/executer_actor/kqp_data_executer.cpp | 4 +- ydb/core/kqp/executer_actor/kqp_planner.cpp | 89 ++++++-------- ydb/core/kqp/executer_actor/kqp_planner.h | 7 +- .../kqp/executer_actor/kqp_scan_executer.cpp | 5 +- ydb/core/kqp/gateway/kqp_gateway.h | 11 ++ .../kqp/node_service/kqp_node_service.cpp | 105 ++++++++--------- ydb/core/kqp/node_service/kqp_node_service.h | 2 + .../kqp/proxy_service/kqp_proxy_service.cpp | 10 +- .../rm_service/kqp_resource_estimation.cpp | 28 +---- .../kqp/rm_service/kqp_resource_estimation.h | 4 - .../rm_service/kqp_resource_estimation_ut.cpp | 53 --------- ydb/core/kqp/rm_service/kqp_rm_service.cpp | 110 ++++++++++++------ ydb/core/kqp/rm_service/kqp_rm_service.h | 9 +- ydb/core/kqp/rm_service/kqp_rm_ut.cpp | 2 +- ydb/core/kqp/rm_service/ut/ya.make | 1 - .../kqp/session_actor/kqp_session_actor.cpp | 16 ++- .../kqp/session_actor/kqp_session_actor.h | 13 ++- ydb/core/testlib/test_client.cpp | 2 +- 21 files changed, 309 insertions(+), 289 deletions(-) delete mode 100644 ydb/core/kqp/rm_service/kqp_resource_estimation_ut.cpp diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index f482af16e987..53a4ee27983c 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -2149,7 +2149,7 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu // Create resource manager auto rm = NKqp::CreateKqpResourceManagerActor(Config.GetTableServiceConfig().GetResourceManager(), nullptr, - {}, kqpProxySharedResources); + {}, kqpProxySharedResources, NodeId); setup->LocalServices.push_back(std::make_pair( NKqp::MakeKqpRmServiceID(NodeId), TActorSetupCmd(rm, TMailboxType::HTSwap, appData->UserPoolId))); diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp index 9122f611e39c..6ae5ae6fc2b3 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp @@ -6,6 +6,7 @@ namespace NKikimr::NKqp::NComputeActor { + struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager { TMemoryQuotaManager(std::shared_ptr resourceManager @@ -26,7 +27,10 @@ struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager { } ~TMemoryQuotaManager() override { - State->OnTaskTerminate(TxId, TaskId, Success); + if (State) { + State->OnTaskTerminate(TxId, TaskId, Success); + } + ResourceManager->FreeResources(TxId, TaskId); } @@ -77,66 +81,86 @@ struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager { }; class TKqpCaFactory : public IKqpNodeComputeActorFactory { - NKikimrConfig::TTableServiceConfig::TResourceManager Config; std::shared_ptr ResourceManager_; NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory; const std::optional FederatedQuerySetup; + std::atomic MkqlLightProgramMemoryLimit = 0; + std::atomic MkqlHeavyProgramMemoryLimit = 0; + std::atomic MinChannelBufferSize = 0; + std::atomic ReasonableSpillingTreshold = 0; + public: TKqpCaFactory(const NKikimrConfig::TTableServiceConfig::TResourceManager& config, std::shared_ptr resourceManager, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const std::optional federatedQuerySetup) - : Config(config) - , ResourceManager_(resourceManager) + : ResourceManager_(resourceManager) , AsyncIoFactory(asyncIoFactory) , FederatedQuerySetup(federatedQuerySetup) - {} + { + ApplyConfig(config); + } - TActorId CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqProto::TDqTask* dqTask, - const NYql::NDq::TComputeRuntimeSettings& settings, - NWilson::TTraceId traceId, TIntrusivePtr arena, const TString& serializedGUCSettings, - TComputeStagesWithScan& computesByStage, ui64 outputChunkMaxSize, std::shared_ptr state, - NRm::EKqpMemoryPool memoryPool, ui32 numberOfTasks) + void ApplyConfig(const NKikimrConfig::TTableServiceConfig::TResourceManager& config) { + MkqlLightProgramMemoryLimit.store(config.GetMkqlLightProgramMemoryLimit()); + MkqlHeavyProgramMemoryLimit.store(config.GetMkqlHeavyProgramMemoryLimit()); + MinChannelBufferSize.store(config.GetMinChannelBufferSize()); + ReasonableSpillingTreshold.store(config.GetReasonableSpillingTreshold()); + } + + TActorId CreateKqpComputeActor(TCreateArgs&& args) { NYql::NDq::TComputeMemoryLimits memoryLimits; memoryLimits.ChannelBufferSize = 0; - memoryLimits.MkqlLightProgramMemoryLimit = Config.GetMkqlLightProgramMemoryLimit(); - memoryLimits.MkqlHeavyProgramMemoryLimit = Config.GetMkqlHeavyProgramMemoryLimit(); + memoryLimits.MkqlLightProgramMemoryLimit = MkqlLightProgramMemoryLimit.load(); + memoryLimits.MkqlHeavyProgramMemoryLimit = MkqlHeavyProgramMemoryLimit.load(); - auto estimation = EstimateTaskResources(*dqTask, Config, numberOfTasks); + auto estimation = ResourceManager_->EstimateTaskResources(*args.Task, args.NumberOfTasks); { ui32 inputChannelsCount = 0; - for (auto&& i : dqTask->GetInputs()) { + for (auto&& i : args.Task->GetInputs()) { inputChannelsCount += i.ChannelsSize(); } - memoryLimits.ChannelBufferSize = std::max(estimation.ChannelBufferMemoryLimit / std::max(1, inputChannelsCount), Config.GetMinChannelBufferSize()); - memoryLimits.OutputChunkMaxSize = outputChunkMaxSize; + memoryLimits.ChannelBufferSize = std::max(estimation.ChannelBufferMemoryLimit / std::max(1, inputChannelsCount), MinChannelBufferSize.load()); + memoryLimits.OutputChunkMaxSize = args.OutputChunkMaxSize; AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "channel_info") ("ch_size", estimation.ChannelBufferMemoryLimit) ("ch_count", estimation.ChannelBuffersCount) ("ch_limit", memoryLimits.ChannelBufferSize) - ("inputs", dqTask->InputsSize()) + ("inputs", args.Task->InputsSize()) ("input_channels_count", inputChannelsCount); } - auto& taskOpts = dqTask->GetProgram().GetSettings(); + auto& taskOpts = args.Task->GetProgram().GetSettings(); auto limit = taskOpts.GetHasMapJoin() || taskOpts.GetHasStateAggregation() ? memoryLimits.MkqlHeavyProgramMemoryLimit : memoryLimits.MkqlLightProgramMemoryLimit; memoryLimits.MemoryQuotaManager = std::make_shared( ResourceManager_, - memoryPool, - std::move(state), - txId, - dqTask->GetId(), + args.MemoryPool, + std::move(args.State), + args.TxId, + args.Task->GetId(), limit, - Config.GetReasonableSpillingTreshold()); + ReasonableSpillingTreshold.load()); + + auto runtimeSettings = args.RuntimeSettings; + runtimeSettings.ExtraMemoryAllocationPool = args.MemoryPool; + runtimeSettings.UseSpilling = args.WithSpilling; + runtimeSettings.StatsMode = args.StatsMode; + + if (args.Deadline) { + runtimeSettings.Timeout = args.Deadline - TAppData::TimeProvider->Now(); + } + + if (args.RlPath) { + runtimeSettings.RlPath = args.RlPath; + } - auto runtimeSettings = settings; NYql::NDq::IMemoryQuotaManager::TWeakPtr memoryQuotaManager = memoryLimits.MemoryQuotaManager; runtimeSettings.TerminateHandler = [memoryQuotaManager] (bool success, const NYql::TIssues& issues) { @@ -157,29 +181,32 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory { }; ETableKind tableKind = ETableKind::Unknown; - if (dqTask->HasMetaId()) { - YQL_ENSURE(computesByStage.GetMetaById(*dqTask, meta) || dqTask->GetMeta().UnpackTo(&meta), "cannot take meta on MetaId exists in tasks"); + if (args.Task->HasMetaId()) { + YQL_ENSURE(args.ComputesByStages); + YQL_ENSURE(args.ComputesByStages->GetMetaById(*args.Task, meta) || args.Task->GetMeta().UnpackTo(&meta), "cannot take meta on MetaId exists in tasks"); tableKind = tableKindExtract(meta); - } else if (dqTask->GetMeta().UnpackTo(&meta)) { + } else if (args.Task->GetMeta().UnpackTo(&meta)) { tableKind = tableKindExtract(meta); } if (tableKind == ETableKind::Datashard || tableKind == ETableKind::Olap) { - auto& info = computesByStage.UpsertTaskWithScan(*dqTask, meta, !AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead()); - IActor* computeActor = CreateKqpScanComputeActor(executerId, txId, dqTask, + YQL_ENSURE(args.ComputesByStages); + auto& info = args.ComputesByStages->UpsertTaskWithScan(*args.Task, meta, !AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead()); + IActor* computeActor = CreateKqpScanComputeActor(args.ExecuterId, args.TxId, args.Task, AsyncIoFactory, runtimeSettings, memoryLimits, - std::move(traceId), std::move(arena)); + std::move(args.TraceId), std::move(args.Arena)); TActorId result = TlsActivationContext->Register(computeActor); info.MutableActorIds().emplace_back(result); return result; } else { std::shared_ptr GUCSettings; - if (!serializedGUCSettings.empty()) { - GUCSettings = std::make_shared(serializedGUCSettings); + if (!args.SerializedGUCSettings.empty()) { + GUCSettings = std::make_shared(args.SerializedGUCSettings); } - IActor* computeActor = ::NKikimr::NKqp::CreateKqpComputeActor(executerId, txId, dqTask, AsyncIoFactory, - runtimeSettings, memoryLimits, std::move(traceId), std::move(arena), FederatedQuerySetup, GUCSettings); - return TlsActivationContext->Register(computeActor); + IActor* computeActor = ::NKikimr::NKqp::CreateKqpComputeActor(args.ExecuterId, args.TxId, args.Task, AsyncIoFactory, + runtimeSettings, memoryLimits, std::move(args.TraceId), std::move(args.Arena), FederatedQuerySetup, GUCSettings); + return args.ShareMailbox ? TlsActivationContext->AsActorContext().RegisterWithSameMailbox(computeActor) : + TlsActivationContext->AsActorContext().Register(computeActor); } } }; diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h index 13f0fa11a0da..e89fcabce098 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h @@ -103,11 +103,29 @@ struct IKqpNodeComputeActorFactory { virtual ~IKqpNodeComputeActorFactory() = default; public: - virtual NActors::TActorId CreateKqpComputeActor(const NActors::TActorId& executerId, ui64 txId, NYql::NDqProto::TDqTask* task, - const NYql::NDq::TComputeRuntimeSettings& settings, - NWilson::TTraceId traceId, TIntrusivePtr arena, const TString& serializedGUCSettings, - TComputeStagesWithScan& computeStages, ui64 outputChunkMaxSize, std::shared_ptr state, - NKikimr::NKqp::NRm::EKqpMemoryPool memoryPool, ui32 numberOfTasks) = 0; + struct TCreateArgs { + const NActors::TActorId& ExecuterId; + const ui64 TxId; + NYql::NDqProto::TDqTask* Task; + const NYql::NDq::TComputeRuntimeSettings& RuntimeSettings; + NWilson::TTraceId TraceId; + TIntrusivePtr Arena; + const TString& SerializedGUCSettings; + const ui32 NumberOfTasks; + const ui64 OutputChunkMaxSize; + const NKikimr::NKqp::NRm::EKqpMemoryPool MemoryPool; + const bool WithSpilling; + const NYql::NDqProto::EDqStatsMode StatsMode; + const TInstant& Deadline; + const bool ShareMailbox; + const TMaybe& RlPath; + TComputeStagesWithScan* ComputesByStages = nullptr; + std::shared_ptr State = nullptr; + }; + + virtual NActors::TActorId CreateKqpComputeActor(TCreateArgs&& args) = 0; + + virtual void ApplyConfig(const NKikimrConfig::TTableServiceConfig::TResourceManager& config) = 0; }; std::shared_ptr MakeKqpCaFactory(const NKikimrConfig::TTableServiceConfig::TResourceManager& config, diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 9bc57b456d02..87dd3ec0142e 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -2489,7 +2489,9 @@ class TKqpDataExecuter : public TKqpExecuterBasePlanExecution(); diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index dcc6049176bf..b611624d1ff1 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -81,7 +81,14 @@ TKqpPlanner::TKqpPlanner(TKqpPlanner::TArgs&& args) , OutputChunkMaxSize(args.OutputChunkMaxSize) , GUCSettings(std::move(args.GUCSettings)) , MayRunTasksLocally(args.MayRunTasksLocally) + , ResourceManager_(args.ResourceManager_) + , CaFactory_(args.CaFactory_) { + + if (GUCSettings) { + SerializedGUCSettings = GUCSettings->SerializeToString(); + } + if (!Database) { // a piece of magic for tests if (const auto& domain = AppData()->DomainsInfo->Domain) { @@ -205,8 +212,8 @@ std::unique_ptr TKqpPlanner::SerializeReque request.SetOutputChunkMaxSize(OutputChunkMaxSize); } - if (GUCSettings) { - request.SetSerializedGUCSettings(GUCSettings->SerializeToString()); + if (SerializedGUCSettings) { + request.SetSerializedGUCSettings(SerializedGUCSettings); } return result; @@ -242,7 +249,7 @@ std::unique_ptr TKqpPlanner::AssignTasksToNodes() { PrepareToProcess(); - auto localResources = GetKqpResourceManager()->GetLocalResources(); + auto localResources = ResourceManager_->GetLocalResources(); Y_UNUSED(MEMORY_ESTIMATION_OVERFLOW); if (LocalRunMemoryEst * MEMORY_ESTIMATION_OVERFLOW <= localResources.Memory[NRm::EKqpMemoryPool::ScanQuery] && ResourceEstimations.size() <= localResources.ExecutionUnits && @@ -257,7 +264,7 @@ std::unique_ptr TKqpPlanner::AssignTasksToNodes() { } if (ResourcesSnapshot.empty()) { - ResourcesSnapshot = std::move(GetKqpResourceManager()->GetClusterResources()); + ResourcesSnapshot = std::move(ResourceManager_->GetClusterResources()); } if (ResourcesSnapshot.empty() || (ResourcesSnapshot.size() == 1 && ResourcesSnapshot[0].GetNodeId() == ExecuterId.NodeId())) { @@ -330,50 +337,28 @@ const IKqpGateway::TKqpSnapshot& TKqpPlanner::GetSnapshot() const { // optimizeProtoForLocalExecution - if we want to execute compute actor locally and don't want to serialize & then deserialize proto message // instead we just give ptr to proto message and after that we swap/copy it -void TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, bool shareMailbox, bool optimizeProtoForLocalExecution) { - +void TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize) { auto& task = TasksGraph.GetTask(taskId); - NYql::NDqProto::TDqTask* taskDesc = ArenaSerializeTaskToProto(TasksGraph, task, /* serializeAsyncIoSettings = */ !optimizeProtoForLocalExecution); - + NYql::NDqProto::TDqTask* taskDesc = ArenaSerializeTaskToProto(TasksGraph, task, true); NYql::NDq::TComputeRuntimeSettings settings; - if (Deadline) { - settings.Timeout = Deadline - TAppData::TimeProvider->Now(); - } - - settings.ExtraMemoryAllocationPool = NRm::EKqpMemoryPool::DataQuery; - settings.FailOnUndelivery = true; - settings.StatsMode = GetDqStatsMode(StatsMode); - settings.UseSpilling = WithSpilling; - NYql::NDq::TComputeMemoryLimits limits; - limits.ChannelBufferSize = 32_MB; // Depends on NYql::NDq::TDqOutputChannelSettings::ChunkSizeLimit (now 48 MB) with a ratio of 1.5 - limits.OutputChunkMaxSize = OutputChunkMaxSize; - limits.MkqlLightProgramMemoryLimit = MkqlMemoryLimit > 0 ? std::min(500_MB, MkqlMemoryLimit) : 500_MB; - limits.MkqlHeavyProgramMemoryLimit = MkqlMemoryLimit > 0 ? std::min(2_GB, MkqlMemoryLimit) : 2_GB; - - auto& taskOpts = taskDesc->GetProgram().GetSettings(); - auto limit = taskOpts.GetHasMapJoin() /* || opts.GetHasSort()*/ - ? limits.MkqlHeavyProgramMemoryLimit - : limits.MkqlLightProgramMemoryLimit; - - limits.MemoryQuotaManager = std::make_shared(limit * 2, limit); - - auto computeActor = NKikimr::NKqp::CreateKqpComputeActor(ExecuterId, TxId, taskDesc, AsyncIoFactory, - settings, limits, ExecuterSpan.GetTraceId(), TasksGraph.GetMeta().GetArenaIntrusivePtr(), FederatedQuerySetup, GUCSettings); - - if (optimizeProtoForLocalExecution) { - TVector& taskSourceSettings = static_cast(computeActor)->MutableTaskSourceSettings(); - taskSourceSettings.assign(task.Inputs.size(), nullptr); - for (size_t i = 0; i < task.Inputs.size(); ++i) { - const auto input = task.Inputs[i]; - if (input.Type() == NYql::NDq::TTaskInputType::Source && Y_LIKELY(input.Meta.SourceSettings)) { - taskSourceSettings[i] = (&(*input.Meta.SourceSettings)); - } - } - } - - auto computeActorId = shareMailbox ? TlsActivationContext->AsActorContext().RegisterWithSameMailbox(computeActor) : TlsActivationContext->AsActorContext().Register(computeActor); - task.ComputeActorId = computeActorId; + task.ComputeActorId = CaFactory_->CreateKqpComputeActor({ + .ExecuterId = ExecuterId, + .TxId = TxId, + .Task = taskDesc, + .RuntimeSettings = settings, + .TraceId = NWilson::TTraceId(ExecuterSpan.GetTraceId()), + .Arena = TasksGraph.GetMeta().GetArenaIntrusivePtr(), + .SerializedGUCSettings = SerializedGUCSettings, + .NumberOfTasks = computeTasksSize, + .OutputChunkMaxSize = OutputChunkMaxSize, + .MemoryPool = NRm::EKqpMemoryPool::DataQuery, + .WithSpilling = WithSpilling, + .StatsMode = GetDqStatsMode(StatsMode), + .Deadline = Deadline, + .ShareMailbox = (computeTasksSize <= 1), + .RlPath = Nothing() + }); LOG_D("Executing task: " << taskId << " on compute actor: " << task.ComputeActorId); @@ -415,9 +400,8 @@ std::unique_ptr TKqpPlanner::PlanExecution() { // explicit requirement to execute task on the same node because it has dependencies // on datashard tx. if (LocalComputeTasks) { - bool shareMailbox = (ComputeTasks.size() <= 1); for (ui64 taskId : ComputeTasks) { - ExecuteDataComputeTask(taskId, shareMailbox, /* optimizeProtoForLocalExecution = */ true); + ExecuteDataComputeTask(taskId, ComputeTasks.size()); } ComputeTasks.clear(); } @@ -427,7 +411,7 @@ std::unique_ptr TKqpPlanner::PlanExecution() { // to execute this task locally so we can avoid useless overhead for remote task launching. for (auto& [shardId, tasks]: TasksPerNode) { for (ui64 taskId: tasks) { - ExecuteDataComputeTask(taskId, true, /* optimizeProtoForLocalExecution = */ true); + ExecuteDataComputeTask(taskId, tasks.size()); } } @@ -452,9 +436,8 @@ std::unique_ptr TKqpPlanner::PlanExecution() { auto tasksOnNodeIt = TasksPerNode.find(ExecuterId.NodeId()); if (tasksOnNodeIt != TasksPerNode.end()) { auto& tasks = tasksOnNodeIt->second; - const bool shareMailbox = (tasks.size() <= 1); for (ui64 taskId: tasks) { - ExecuteDataComputeTask(taskId, shareMailbox, /* optimizeProtoForLocalExecution = */ true); + ExecuteDataComputeTask(taskId, tasks.size()); PendingComputeTasks.erase(taskId); } } @@ -506,8 +489,6 @@ THashSet& TKqpPlanner::GetPendingComputeTasks() { } void TKqpPlanner::PrepareToProcess() { - auto rmConfig = GetKqpResourceManager()->GetConfig(); - ui32 tasksCount = ComputeTasks.size(); for (auto& [shardId, tasks] : TasksPerNode) { tasksCount += tasks.size(); @@ -518,7 +499,7 @@ void TKqpPlanner::PrepareToProcess() { for (size_t i = 0; i < ComputeTasks.size(); ++i) { BuildInitialTaskResources(TasksGraph, ComputeTasks[i], ResourceEstimations[i]); - EstimateTaskResources(rmConfig, ResourceEstimations[i], ComputeTasks.size()); + ResourceManager_->EstimateTaskResources(ResourceEstimations[i], ComputeTasks.size()); LocalRunMemoryEst += ResourceEstimations[i].TotalMemoryLimit; } @@ -526,7 +507,7 @@ void TKqpPlanner::PrepareToProcess() { for(auto& [nodeId, tasks] : TasksPerNode) { for (ui64 taskId: tasks) { BuildInitialTaskResources(TasksGraph, taskId, ResourceEstimations[currentEst]); - EstimateTaskResources(rmConfig, ResourceEstimations[currentEst], tasks.size()); + ResourceManager_->EstimateTaskResources(ResourceEstimations[currentEst], tasks.size()); LocalRunMemoryEst += ResourceEstimations[currentEst].TotalMemoryLimit; ++currentEst; } diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h index 01efd3e79454..574827bf917b 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.h +++ b/ydb/core/kqp/executer_actor/kqp_planner.h @@ -64,6 +64,8 @@ class TKqpPlanner { const ui64 OutputChunkMaxSize = 0; const TGUCSettings::TPtr GUCSettings; const bool MayRunTasksLocally = false; + const std::shared_ptr& ResourceManager_; + const std::shared_ptr& CaFactory_; }; TKqpPlanner(TKqpPlanner::TArgs&& args); @@ -83,7 +85,7 @@ class TKqpPlanner { private: const IKqpGateway::TKqpSnapshot& GetSnapshot() const; - void ExecuteDataComputeTask(ui64 taskId, bool shareMailbox, bool optimizeProtoForLocalExecution); + void ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize); void PrepareToProcess(); TString GetEstimationsInfo() const; @@ -128,6 +130,9 @@ class TKqpPlanner { const ui64 OutputChunkMaxSize; const TGUCSettings::TPtr GUCSettings; const bool MayRunTasksLocally; + TString SerializedGUCSettings; + std::shared_ptr ResourceManager_; + std::shared_ptr CaFactory_; public: static bool UseMockEmptyPlanner; // for tests: if true then use TKqpMockEmptyPlanner that leads to the error diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index 771b15510290..e98361797ca1 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -339,7 +339,10 @@ class TKqpScanExecuter : public TKqpExecuterBase ResourceManager_; + std::shared_ptr CaFactory_; NKikimrKqp::EIsolationLevel IsolationLevel = NKikimrKqp::ISOLATION_LEVEL_UNDEFINED; TMaybe RlPath; bool NeedTxId = true; diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp index 1416a56be23a..ce0a9d9dc805 100644 --- a/ydb/core/kqp/node_service/kqp_node_service.cpp +++ b/ydb/core/kqp/node_service/kqp_node_service.cpp @@ -60,11 +60,16 @@ class TKqpNodeService : public TActorBootstrapped { return NKikimrServices::TActivity::KQP_NODE_SERVICE; } - TKqpNodeService(const NKikimrConfig::TTableServiceConfig& config, const TIntrusivePtr& counters, + TKqpNodeService(const NKikimrConfig::TTableServiceConfig& config, + std::shared_ptr resourceManager, + std::shared_ptr caFactory, + const TIntrusivePtr& counters, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const std::optional& federatedQuerySetup) : Config(config.GetResourceManager()) , Counters(counters) + , ResourceManager_(std::move(resourceManager)) + , CaFactory_(std::move(caFactory)) , AsyncIoFactory(std::move(asyncIoFactory)) , FederatedQuerySetup(federatedQuerySetup) , State_(std::make_shared()) @@ -159,29 +164,13 @@ class TKqpNodeService : public TActorBootstrapped { memoryPool = NRm::EKqpMemoryPool::Unspecified; } - ui32 requestChannels = 0; - ui64 totalMemory = 0; - for (auto& dqTask : *msg.MutableTasks()) { - auto estimation = EstimateTaskResources(dqTask, Config, msg.GetTasks().size()); - LOG_D("Resource estimation complete" - << ", TxId: " << txId << ", task id: " << dqTask.GetId() << ", node id: " << SelfId().NodeId() - << ", estimated resources: " << estimation.ToString()); - - NKqpNode::TTaskContext& taskCtx = request.InFlyTasks[dqTask.GetId()]; - YQL_ENSURE(taskCtx.TaskId == 0); - taskCtx.TaskId = dqTask.GetId(); - - LOG_D("TxId: " << txId << ", task: " << taskCtx.TaskId << ", requested memory: " << estimation.TotalMemoryLimit); - totalMemory += estimation.TotalMemoryLimit; - requestChannels += estimation.ChannelBuffersCount; - } - - LOG_D("TxId: " << txId << ", channels: " << requestChannels - << ", computeActors: " << msg.GetTasks().size() << ", memory: " << totalMemory); - TVector allocatedTasks; allocatedTasks.reserve(msg.GetTasks().size()); - for (auto& task : request.InFlyTasks) { + for (auto& task : *msg.MutableTasks()) { + NKqpNode::TTaskContext& taskCtx = request.InFlyTasks[task.GetId()]; + YQL_ENSURE(taskCtx.TaskId == 0); + taskCtx.TaskId = task.GetId(); + NRm::TKqpResourcesRequest resourcesRequest; resourcesRequest.MemoryPool = memoryPool; resourcesRequest.ExecutionUnits = 1; @@ -190,34 +179,24 @@ class TKqpNodeService : public TActorBootstrapped { // we have to allocate memory instead of reserve only. currently, this memory will not be used for request processing. resourcesRequest.Memory = (1 << 19) /* 512kb limit for check that memory exists for processing with minimal requirements */; - auto result = ResourceManager()->AllocateResources(txId, task.first, resourcesRequest); + auto result = ResourceManager_->AllocateResources(txId, task.GetId(), resourcesRequest); if (!result) { for (ui64 taskId : allocatedTasks) { - ResourceManager()->FreeResources(txId, taskId); + ResourceManager_->FreeResources(txId, taskId); } ReplyError(txId, request.Executer, msg, result.GetStatus(), result.GetFailReason()); return; } - allocatedTasks.push_back(task.first); + allocatedTasks.push_back(task.GetId()); } auto reply = MakeHolder(); reply->Record.SetTxId(txId); NYql::NDq::TComputeRuntimeSettings runtimeSettingsBase; - runtimeSettingsBase.ExtraMemoryAllocationPool = memoryPool; - runtimeSettingsBase.FailOnUndelivery = msgRtSettings.GetExecType() != NYql::NDqProto::TComputeRuntimeSettings::SCAN; - - runtimeSettingsBase.StatsMode = msgRtSettings.GetStatsMode(); - runtimeSettingsBase.UseSpilling = msgRtSettings.GetUseSpilling(); - - if (msgRtSettings.HasRlPath()) { - runtimeSettingsBase.RlPath = msgRtSettings.GetRlPath(); - } - runtimeSettingsBase.ReportStatsSettings = NYql::NDq::TReportStatsSettings{MinStatInterval, MaxStatInterval}; TShardsScanningPolicy scanPolicy(Config.GetShardsScanningPolicy()); @@ -228,16 +207,37 @@ class TKqpNodeService : public TActorBootstrapped { ev->Get()->Record.GetSerializedGUCSettings() : ""; // start compute actors + TMaybe rlPath = Nothing(); + if (msgRtSettings.HasRlPath()) { + rlPath.ConstructInPlace(msgRtSettings.GetRlPath()); + } + const ui32 tasksCount = msg.GetTasks().size(); for (int i = 0; i < msg.GetTasks().size(); ++i) { auto& dqTask = *msg.MutableTasks(i); auto& taskCtx = request.InFlyTasks[dqTask.GetId()]; + taskCtx.TaskId = dqTask.GetId(); YQL_ENSURE(taskCtx.TaskId != 0); - taskCtx.ComputeActorId = CaFactory()->CreateKqpComputeActor( - request.Executer, txId, &dqTask, runtimeSettingsBase, - NWilson::TTraceId(ev->TraceId), ev->Get()->Arena, serializedGUCSettings, computesByStage, - msg.GetOutputChunkMaxSize(), State_, memoryPool, tasksCount); + taskCtx.ComputeActorId = CaFactory_->CreateKqpComputeActor({ + .ExecuterId = request.Executer, + .TxId = txId, + .Task = &dqTask, + .RuntimeSettings = runtimeSettingsBase, + .TraceId = NWilson::TTraceId(ev->TraceId), + .Arena = ev->Get()->Arena, + .SerializedGUCSettings = serializedGUCSettings, + .NumberOfTasks = tasksCount, + .OutputChunkMaxSize = msg.GetOutputChunkMaxSize(), + .MemoryPool = memoryPool, + .WithSpilling = msgRtSettings.GetUseSpilling(), + .StatsMode = msgRtSettings.GetStatsMode(), + .Deadline = TInstant(), + .ShareMailbox = false, + .RlPath = rlPath, + .ComputesByStages = &computesByStage, + .State = State_ + }); LOG_D("TxId: " << txId << ", executing task: " << taskCtx.TaskId << " on compute actor: " << taskCtx.ComputeActorId); @@ -348,6 +348,8 @@ class TKqpNodeService : public TActorBootstrapped { LOG_I("Updated table service config: " << Config.DebugString()); } + CaFactory_->ApplyConfig(event.GetConfig().GetTableServiceConfig().GetResourceManager()); + if (event.GetConfig().GetTableServiceConfig().HasIteratorReadsRetrySettings()) { SetIteratorReadsRetrySettings(event.GetConfig().GetTableServiceConfig().GetIteratorReadsRetrySettings()); } @@ -358,6 +360,7 @@ class TKqpNodeService : public TActorBootstrapped { auto responseEv = MakeHolder(event); Send(ev->Sender, responseEv.Release(), IEventHandle::FlagTrackDelivery, ev->Cookie); + } void SetIteratorReadsQuotaSettings(const NKikimrConfig::TTableServiceConfig::TIteratorReadQuotaSettings& settings) { @@ -442,24 +445,6 @@ class TKqpNodeService : public TActorBootstrapped { Send(executer, ev.Release()); } - std::shared_ptr ResourceManager() { - if (Y_LIKELY(ResourceManager_)) { - return ResourceManager_; - } - ResourceManager_ = GetKqpResourceManager(); - return ResourceManager_; - } - - std::shared_ptr CaFactory() { - if (Y_LIKELY(CaFactory_)) { - return CaFactory_; - } - - CaFactory_ = NComputeActor::MakeKqpCaFactory( - Config, ResourceManager(), AsyncIoFactory, FederatedQuerySetup); - return CaFactory_; - } - private: NKikimrConfig::TTableServiceConfig::TResourceManager Config; TIntrusivePtr Counters; @@ -468,7 +453,6 @@ class TKqpNodeService : public TActorBootstrapped { NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory; const std::optional FederatedQuerySetup; - //state sharded by TxId std::shared_ptr State_; }; @@ -476,10 +460,13 @@ class TKqpNodeService : public TActorBootstrapped { } // anonymous namespace IActor* CreateKqpNodeService(const NKikimrConfig::TTableServiceConfig& tableServiceConfig, + std::shared_ptr resourceManager, + std::shared_ptr caFactory, TIntrusivePtr counters, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const std::optional& federatedQuerySetup) { - return new TKqpNodeService(tableServiceConfig, counters, std::move(asyncIoFactory), federatedQuerySetup); + return new TKqpNodeService(tableServiceConfig, std::move(resourceManager), std::move(caFactory), + counters, std::move(asyncIoFactory), federatedQuerySetup); } } // namespace NKqp diff --git a/ydb/core/kqp/node_service/kqp_node_service.h b/ydb/core/kqp/node_service/kqp_node_service.h index d17a56cf76b7..93f7a7a4e633 100644 --- a/ydb/core/kqp/node_service/kqp_node_service.h +++ b/ydb/core/kqp/node_service/kqp_node_service.h @@ -91,6 +91,8 @@ struct TNodeServiceState : public NKikimr::NKqp::NComputeActor::IKqpNodeState { }; NActors::IActor* CreateKqpNodeService(const NKikimrConfig::TTableServiceConfig& tableServiceConfig, + std::shared_ptr resourceManager, + std::shared_ptr caFactory, TIntrusivePtr counters, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory = nullptr, const std::optional& federatedQuerySetup = std::nullopt); diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index 23be28bcb264..58a74ee9e2c8 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -269,7 +269,11 @@ class TKqpProxyService : public TActorBootstrapped { MakeKqpCompileComputationPatternServiceID(SelfId().NodeId()), CompileComputationPatternService); } - KqpNodeService = TlsActivationContext->ExecutorThread.RegisterActor(CreateKqpNodeService(TableServiceConfig, Counters, AsyncIoFactory, FederatedQuerySetup)); + ResourceManager_ = GetKqpResourceManager(); + CaFactory_ = NComputeActor::MakeKqpCaFactory( + TableServiceConfig.GetResourceManager(), ResourceManager_, AsyncIoFactory, FederatedQuerySetup); + + KqpNodeService = TlsActivationContext->ExecutorThread.RegisterActor(CreateKqpNodeService(TableServiceConfig, ResourceManager_, CaFactory_, Counters, AsyncIoFactory, FederatedQuerySetup)); TlsActivationContext->ExecutorThread.ActorSystem->RegisterLocalService( MakeKqpNodeServiceID(SelfId().NodeId()), KqpNodeService); @@ -1479,7 +1483,7 @@ class TKqpProxyService : public TActorBootstrapped { auto config = CreateConfig(KqpSettings, workerSettings); - IActor* sessionActor = CreateKqpSessionActor(SelfId(), sessionId, KqpSettings, workerSettings, + IActor* sessionActor = CreateKqpSessionActor(SelfId(), ResourceManager_, CaFactory_, sessionId, KqpSettings, workerSettings, FederatedQuerySetup, AsyncIoFactory, ModuleResolverState, Counters, QueryServiceConfig, KqpTempTablesAgentActor); auto workerId = TlsActivationContext->ExecutorThread.RegisterActor(sessionActor, TMailboxType::HTSwap, AppData()->UserPoolId); @@ -1768,6 +1772,8 @@ class TKqpProxyService : public TActorBootstrapped { THashMap ConfigSubscriptions; THashMap TimeoutTimers; + std::shared_ptr ResourceManager_; + std::shared_ptr CaFactory_; TIntrusivePtr ShutdownState; TIntrusivePtr ModuleResolverState; diff --git a/ydb/core/kqp/rm_service/kqp_resource_estimation.cpp b/ydb/core/kqp/rm_service/kqp_resource_estimation.cpp index 5a228c190751..9c78862fcf5e 100644 --- a/ydb/core/kqp/rm_service/kqp_resource_estimation.cpp +++ b/ydb/core/kqp/rm_service/kqp_resource_estimation.cpp @@ -5,13 +5,7 @@ namespace NKikimr::NKqp { using namespace NYql::NDqProto; using namespace NKikimrConfig; -TTaskResourceEstimation EstimateTaskResources(const TDqTask& task, - const TTableServiceConfig::TResourceManager& config, const ui32 tasksCount) -{ - TTaskResourceEstimation ret = BuildInitialTaskResources(task); - EstimateTaskResources(config, ret, tasksCount); - return ret; -} + TTaskResourceEstimation BuildInitialTaskResources(const TDqTask& task) { TTaskResourceEstimation ret; @@ -23,24 +17,4 @@ TTaskResourceEstimation BuildInitialTaskResources(const TDqTask& task) { return ret; } -void EstimateTaskResources(const TTableServiceConfig::TResourceManager& config, - TTaskResourceEstimation& ret, const ui32 tasksCount) -{ - ui64 totalChannels = std::max(tasksCount, (ui32)1) * std::max(ret.ChannelBuffersCount, (ui32)1); - ui64 optimalChannelBufferSizeEstimation = totalChannels * config.GetChannelBufferSize(); - - optimalChannelBufferSizeEstimation = std::min(optimalChannelBufferSizeEstimation, config.GetMaxTotalChannelBuffersSize()); - - ret.ChannelBufferMemoryLimit = std::max(config.GetMinChannelBufferSize(), optimalChannelBufferSizeEstimation / totalChannels); - - if (ret.HeavyProgram) { - ret.MkqlProgramMemoryLimit = config.GetMkqlHeavyProgramMemoryLimit() / tasksCount; - } else { - ret.MkqlProgramMemoryLimit = config.GetMkqlLightProgramMemoryLimit() / tasksCount; - } - - ret.TotalMemoryLimit = ret.ChannelBuffersCount * ret.ChannelBufferMemoryLimit - + ret.MkqlProgramMemoryLimit; -} - } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/rm_service/kqp_resource_estimation.h b/ydb/core/kqp/rm_service/kqp_resource_estimation.h index e88c7065c2ed..2a6af1a5339c 100644 --- a/ydb/core/kqp/rm_service/kqp_resource_estimation.h +++ b/ydb/core/kqp/rm_service/kqp_resource_estimation.h @@ -30,9 +30,5 @@ struct TTaskResourceEstimation { TTaskResourceEstimation BuildInitialTaskResources(const NYql::NDqProto::TDqTask& task); -TTaskResourceEstimation EstimateTaskResources(const NYql::NDqProto::TDqTask& task, - const NKikimrConfig::TTableServiceConfig::TResourceManager& config, const ui32 tasksCount); - -void EstimateTaskResources(const NKikimrConfig::TTableServiceConfig::TResourceManager& config, TTaskResourceEstimation& result, const ui32 tasksCount); } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/rm_service/kqp_resource_estimation_ut.cpp b/ydb/core/kqp/rm_service/kqp_resource_estimation_ut.cpp deleted file mode 100644 index b741dc2220cf..000000000000 --- a/ydb/core/kqp/rm_service/kqp_resource_estimation_ut.cpp +++ /dev/null @@ -1,53 +0,0 @@ -#include -#include - -#include - -namespace NKikimr::NKqp { - -Y_UNIT_TEST_SUITE(KqpResourceEstimation) { - -Y_UNIT_TEST(TestChannelSize) { - NKikimrConfig::TTableServiceConfig::TResourceManager config; - config.SetChannelBufferSize(8_MB); - config.SetMinChannelBufferSize(2_MB); - config.SetMaxTotalChannelBuffersSize(2_GB); - config.SetMkqlLightProgramMemoryLimit(100); - - NYql::NDqProto::TDqTask task; - - // 100 input channels - auto* input = task.MutableInputs()->Add(); - for (int i = 0; i < 100; ++i) { - input->MutableChannels()->Add(); - } - - // 100 input channels - input = task.MutableInputs()->Add(); - for (int i = 0; i < 100; ++i) { - input->MutableChannels()->Add(); - } - - auto* output = task.MutableOutputs()->Add(); - output->MutableChannels()->Add(); - - auto est = EstimateTaskResources(task, config, 1); - UNIT_ASSERT_EQUAL(2, est.ChannelBuffersCount); - UNIT_ASSERT_EQUAL(est.ChannelBufferMemoryLimit, config.GetChannelBufferSize()); - - // add more channels, to be more then 256 - input = task.MutableInputs()->Add(); - for (int i = 0; i < 100; ++i) { - input->MutableChannels()->Add(); - } - - est = EstimateTaskResources(task, config, 1); - UNIT_ASSERT_EQUAL(2, est.ChannelBuffersCount); - - UNIT_ASSERT(est.ChannelBufferMemoryLimit == config.GetChannelBufferSize()); - UNIT_ASSERT(est.ChannelBufferMemoryLimit >= config.GetMinChannelBufferSize()); -} - -} // suite KqpResourceEstimation - -} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.cpp b/ydb/core/kqp/rm_service/kqp_rm_service.cpp index b02e74d3b24b..1f3116dcce67 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_service.cpp +++ b/ydb/core/kqp/rm_service/kqp_rm_service.cpp @@ -167,27 +167,27 @@ class TKqpResourceManager : public IKqpResourceManager { public: TKqpResourceManager(const NKikimrConfig::TTableServiceConfig::TResourceManager& config, TIntrusivePtr counters) - : Config(config) - , Counters(counters) - , ExecutionUnitsResource(Config.GetComputeActorsCount()) - , ExecutionUnitsLimit(Config.GetComputeActorsCount()) - , ScanQueryMemoryResource(Config.GetQueryMemoryLimit()) - , PublishResourcesByExchanger(Config.GetEnablePublishResourcesByExchanger()) { - + : Counters(counters) + , ExecutionUnitsResource(config.GetComputeActorsCount()) + , ExecutionUnitsLimit(config.GetComputeActorsCount()) + , ScanQueryMemoryResource(config.GetQueryMemoryLimit()) + , PublishResourcesByExchanger(config.GetEnablePublishResourcesByExchanger()) + { + SetConfigValues(config); } - void Bootstrap(TActorSystem* actorSystem, TActorId selfId) { + void Bootstrap(NKikimrConfig::TTableServiceConfig::TResourceManager& config, TActorSystem* actorSystem, TActorId selfId) { if (!Counters) { Counters = MakeIntrusive(AppData()->Counters); } ActorSystem = actorSystem; SelfId = selfId; - UpdatePatternCache(Config.GetKqpPatternCacheCapacityBytes(), - Config.GetKqpPatternCacheCompiledCapacityBytes(), - Config.GetKqpPatternCachePatternAccessTimesBeforeTryToCompile()); + UpdatePatternCache(config.GetKqpPatternCacheCapacityBytes(), + config.GetKqpPatternCacheCompiledCapacityBytes(), + config.GetKqpPatternCachePatternAccessTimesBeforeTryToCompile()); if (PublishResourcesByExchanger) { - CreateResourceInfoExchanger(Config.GetInfoExchangerSettings()); + CreateResourceInfoExchanger(config.GetInfoExchangerSettings()); return; } } @@ -278,7 +278,7 @@ class TKqpResourceManager : public IKqpResourceManager { hasScanQueryMemory = ScanQueryMemoryResource.Has(resources.Memory); if (hasScanQueryMemory) { ScanQueryMemoryResource.Acquire(resources.Memory); - queryMemoryLimit = Config.GetQueryMemoryLimit(); + queryMemoryLimit = QueryMemoryLimit.load(); } } // with_lock (Lock) @@ -505,16 +505,45 @@ class TKqpResourceManager : public IKqpResourceManager { return result; } - NKikimrConfig::TTableServiceConfig::TResourceManager GetConfig() override { + std::shared_ptr GetPatternCache() override { with_lock (Lock) { - return Config; + return PatternCache; } } - std::shared_ptr GetPatternCache() override { - with_lock (Lock) { - return PatternCache; + TTaskResourceEstimation EstimateTaskResources(const NYql::NDqProto::TDqTask& task, const ui32 tasksCount) override + { + TTaskResourceEstimation ret = BuildInitialTaskResources(task); + EstimateTaskResources(ret, tasksCount); + return ret; + } + + void EstimateTaskResources(TTaskResourceEstimation& ret, const ui32 tasksCount) override + { + ui64 totalChannels = std::max(tasksCount, (ui32)1) * std::max(ret.ChannelBuffersCount, (ui32)1); + ui64 optimalChannelBufferSizeEstimation = totalChannels * ChannelBufferSize.load(); + + optimalChannelBufferSizeEstimation = std::min(optimalChannelBufferSizeEstimation, MaxTotalChannelBuffersSize.load()); + + ret.ChannelBufferMemoryLimit = std::max(MinChannelBufferSize.load(), optimalChannelBufferSizeEstimation / totalChannels); + + if (ret.HeavyProgram) { + ret.MkqlProgramMemoryLimit = MkqlHeavyProgramMemoryLimit.load() / std::max(tasksCount, (ui32)1); + } else { + ret.MkqlProgramMemoryLimit = MkqlLightProgramMemoryLimit.load() / std::max(tasksCount, (ui32)1); } + + ret.TotalMemoryLimit = ret.ChannelBuffersCount * ret.ChannelBufferMemoryLimit + + ret.MkqlProgramMemoryLimit; + } + + void SetConfigValues(const NKikimrConfig::TTableServiceConfig::TResourceManager& config) { + MkqlHeavyProgramMemoryLimit.store(config.GetMkqlHeavyProgramMemoryLimit()); + MkqlLightProgramMemoryLimit.store(config.GetMkqlLightProgramMemoryLimit()); + ChannelBufferSize.store(config.GetChannelBufferSize()); + MinChannelBufferSize.store(config.GetMinChannelBufferSize()); + MaxTotalChannelBuffersSize.store(config.GetMaxTotalChannelBuffersSize()); + QueryMemoryLimit.store(config.GetQueryMemoryLimit()); } ui32 GetNodeId() override { @@ -549,7 +578,13 @@ class TKqpResourceManager : public IKqpResourceManager { TActorId SelfId; - NKikimrConfig::TTableServiceConfig::TResourceManager Config; // guarded by Lock + std::atomic QueryMemoryLimit; + std::atomic MkqlHeavyProgramMemoryLimit; + std::atomic MkqlLightProgramMemoryLimit; + std::atomic ChannelBufferSize; + std::atomic MinChannelBufferSize; + std::atomic MaxTotalChannelBuffersSize; + TIntrusivePtr Counters; TIntrusivePtr ResourceBroker; TActorSystem* ActorSystem = nullptr; @@ -601,16 +636,21 @@ class TKqpResourceManagerActor : public TActorBootstrapped counters, const TActorId& resourceBrokerId, - std::shared_ptr&& kqpProxySharedResources) - : ResourceBrokerId(resourceBrokerId ? resourceBrokerId : MakeResourceBrokerID()) + std::shared_ptr&& kqpProxySharedResources, ui32 nodeId) + : Config(config) + , ResourceBrokerId(resourceBrokerId ? resourceBrokerId : MakeResourceBrokerID()) , KqpProxySharedResources(std::move(kqpProxySharedResources)) , PublishResourcesByExchanger(config.GetEnablePublishResourcesByExchanger()) { ResourceManager = std::make_shared(config, counters); + with_lock (ResourceManagers.Lock) { + ResourceManagers.ByNodeId[nodeId] = ResourceManager; + ResourceManagers.Default = ResourceManager; + } } void Bootstrap() { - ResourceManager->Bootstrap(TlsActivationContext->ActorSystem(), SelfId()); + ResourceManager->Bootstrap(Config, TlsActivationContext->ActorSystem(), SelfId()); LOG_D("Start KqpResourceManagerActor at " << SelfId() << " with ResourceBroker at " << ResourceBrokerId); @@ -639,11 +679,6 @@ class TKqpResourceManagerActor : public TActorBootstrappedExecutionUnitsLimit.load(); ResourceManager->ExecutionUnitsLimit.store(config.GetComputeActorsCount()); ResourceManager->ExecutionUnitsResource.fetch_add((i32)config.GetComputeActorsCount() - prev); - ResourceManager->Config.Swap(&config); + ResourceManager->SetConfigValues(config); + Config.Swap(&config); } - } static void HandleWork(TEvents::TEvUndelivered::TPtr& ev) { @@ -881,11 +916,6 @@ class TKqpResourceManagerActor : public TActorBootstrappedLock) { - str << ResourceManager->Config.DebugString() << Endl; - } - str << "State storage key: " << WbState.Tenant << Endl; with_lock (ResourceManager->Lock) { str << "ScanQuery memory resource: " << ResourceManager->ScanQueryMemoryResource.ToString() << Endl; @@ -989,7 +1019,7 @@ class TKqpResourceManagerActor : public TActorBootstrapped publishScheduledAt; with_lock (ResourceManager->Lock) { - publishInterval = TDuration::Seconds(ResourceManager->Config.GetPublishStatisticsIntervalSec()); + publishInterval = TDuration::Seconds(Config.GetPublishStatisticsIntervalSec()); publishScheduledAt = ResourceManager->PublishScheduledAt; } @@ -1072,6 +1102,8 @@ class TKqpResourceManagerActor : public TActorBootstrapped counters, NActors::TActorId resourceBroker, - std::shared_ptr kqpProxySharedResources) + std::shared_ptr kqpProxySharedResources, ui32 nodeId) { - return new NRm::TKqpResourceManagerActor(config, counters, resourceBroker, std::move(kqpProxySharedResources)); + return new NRm::TKqpResourceManagerActor(config, counters, resourceBroker, std::move(kqpProxySharedResources), nodeId); } std::shared_ptr GetKqpResourceManager(TMaybe _nodeId) { @@ -1111,6 +1143,10 @@ std::shared_ptr GetKqpResourceManager(TMaybe _no } ui32 nodeId = _nodeId ? *_nodeId : TActivationContext::ActorSystem()->NodeId; + if (auto rm = TryGetKqpResourceManager(nodeId)) { + return rm; + } + Y_ABORT("KqpResourceManager not ready yet, node #%" PRIu32, nodeId); } diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.h b/ydb/core/kqp/rm_service/kqp_rm_service.h index fe5f3e7986c5..6c5ef32a0ab5 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_service.h +++ b/ydb/core/kqp/rm_service/kqp_rm_service.h @@ -11,6 +11,8 @@ #include #include +#include "kqp_resource_estimation.h" + #include #include #include @@ -87,6 +89,9 @@ class IKqpResourceManager : private TNonCopyable { using TResourcesAllocatedCallback = std::function; + virtual TTaskResourceEstimation EstimateTaskResources(const NYql::NDqProto::TDqTask& task, const ui32 tasksCount) = 0; + virtual void EstimateTaskResources(TTaskResourceEstimation& result, const ui32 tasksCount) = 0; + virtual void FreeResources(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources) = 0; virtual void FreeResources(ui64 txId, ui64 taskId) = 0; @@ -96,7 +101,6 @@ class IKqpResourceManager : private TNonCopyable { virtual TVector GetClusterResources() const = 0; virtual TKqpLocalNodeResources GetLocalResources() const = 0; - virtual NKikimrConfig::TTableServiceConfig::TResourceManager GetConfig() = 0; virtual std::shared_ptr GetPatternCache() = 0; @@ -142,7 +146,8 @@ struct TKqpProxySharedResources { NActors::IActor* CreateKqpResourceManagerActor(const NKikimrConfig::TTableServiceConfig::TResourceManager& config, TIntrusivePtr counters, NActors::TActorId resourceBroker = {}, - std::shared_ptr kqpProxySharedResources = nullptr); + std::shared_ptr kqpProxySharedResources = nullptr, + ui32 nodeId = 0); std::shared_ptr GetKqpResourceManager(TMaybe nodeId = Nothing()); std::shared_ptr TryGetKqpResourceManager(TMaybe nodeId = Nothing()); diff --git a/ydb/core/kqp/rm_service/kqp_rm_ut.cpp b/ydb/core/kqp/rm_service/kqp_rm_ut.cpp index 764e9cf8b6ce..5dcf0b43e30c 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_ut.cpp +++ b/ydb/core/kqp/rm_service/kqp_rm_ut.cpp @@ -151,7 +151,7 @@ class KqpRm : public TTestBase { void CreateKqpResourceManager( const NKikimrConfig::TTableServiceConfig::TResourceManager& config, ui32 nodeInd = 0) { auto kqpCounters = MakeIntrusive(Counters); - auto resman = CreateKqpResourceManagerActor(config, kqpCounters, ResourceBrokers[nodeInd]); + auto resman = CreateKqpResourceManagerActor(config, kqpCounters, ResourceBrokers[nodeInd], nullptr, Runtime->GetNodeId(nodeInd)); ResourceManagers.push_back(Runtime->Register(resman, nodeInd)); Runtime->RegisterService(MakeKqpResourceManagerServiceID( Runtime->GetNodeId(nodeInd)), ResourceManagers.back(), nodeInd); diff --git a/ydb/core/kqp/rm_service/ut/ya.make b/ydb/core/kqp/rm_service/ut/ya.make index 9c259fa0c3d4..fb595ee233f6 100644 --- a/ydb/core/kqp/rm_service/ut/ya.make +++ b/ydb/core/kqp/rm_service/ut/ya.make @@ -7,7 +7,6 @@ IF (SANITIZER_TYPE OR WITH_VALGRIND) ENDIF() SRCS( - kqp_resource_estimation_ut.cpp kqp_rm_ut.cpp ) diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index b66c28e20be1..10259a4cfca8 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -159,7 +159,10 @@ class TKqpSessionActor : public TActorBootstrapped { return NKikimrServices::TActivity::KQP_SESSION_ACTOR; } - TKqpSessionActor(const TActorId& owner, const TString& sessionId, const TKqpSettings::TConstPtr& kqpSettings, + TKqpSessionActor(const TActorId& owner, + std::shared_ptr resourceManager, + std::shared_ptr caFactory, + const TString& sessionId, const TKqpSettings::TConstPtr& kqpSettings, const TKqpWorkerSettings& workerSettings, std::optional federatedQuerySetup, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, @@ -168,6 +171,8 @@ class TKqpSessionActor : public TActorBootstrapped { const TActorId& kqpTempTablesAgentActor) : Owner(owner) , SessionId(sessionId) + , ResourceManager_(std::move(resourceManager)) + , CaFactory_(std::move(caFactory)) , Counters(counters) , Settings(workerSettings) , AsyncIoFactory(std::move(asyncIoFactory)) @@ -1264,6 +1269,8 @@ class TKqpSessionActor : public TActorBootstrapped { request.PerRequestDataSizeLimit = RequestControls.PerRequestDataSizeLimit; request.MaxShardCount = RequestControls.MaxShardCount; request.TraceId = QueryState ? QueryState->KqpSessionSpan.GetTraceId() : NWilson::TTraceId(); + request.CaFactory_ = CaFactory_; + request.ResourceManager_ = ResourceManager_; LOG_D("Sending to Executer TraceId: " << request.TraceId.GetTraceId() << " " << request.TraceId.GetSpanIdSize()); const bool useEvWrite = ((HasOlapTable && Settings.TableService.GetEnableOlapSink()) || (!HasOlapTable && Settings.TableService.GetEnableOltpSink())) @@ -2492,6 +2499,8 @@ class TKqpSessionActor : public TActorBootstrapped { TActorId Owner; TString SessionId; + std::shared_ptr ResourceManager_; + std::shared_ptr CaFactory_; // cached lookups to issue counters THashMap CachedIssueCounters; TInstant CreationTime; @@ -2531,7 +2540,8 @@ class TKqpSessionActor : public TActorBootstrapped { } // namespace -IActor* CreateKqpSessionActor(const TActorId& owner, const TString& sessionId, +IActor* CreateKqpSessionActor(const TActorId& owner, std::shared_ptr resourceManager, + std::shared_ptr caFactory, const TString& sessionId, const TKqpSettings::TConstPtr& kqpSettings, const TKqpWorkerSettings& workerSettings, std::optional federatedQuerySetup, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, @@ -2539,7 +2549,7 @@ IActor* CreateKqpSessionActor(const TActorId& owner, const TString& sessionId, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, const TActorId& kqpTempTablesAgentActor) { - return new TKqpSessionActor(owner, sessionId, kqpSettings, workerSettings, federatedQuerySetup, + return new TKqpSessionActor(owner, std::move(resourceManager), std::move(caFactory), sessionId, kqpSettings, workerSettings, federatedQuerySetup, std::move(asyncIoFactory), std::move(moduleResolverState), counters, queryServiceConfig, kqpTempTablesAgentActor); } diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.h b/ydb/core/kqp/session_actor/kqp_session_actor.h index bbcaa76dab6f..f26fff2b00ca 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.h +++ b/ydb/core/kqp/session_actor/kqp_session_actor.h @@ -11,6 +11,14 @@ #include #include +namespace NKikimr::NKqp::NComputeActor { + struct IKqpNodeComputeActorFactory; +} + +namespace NKikimr::NKqp::NRm { + class IKqpResourceManager; +} + namespace NKikimr::NKqp { struct TKqpWorkerSettings { @@ -48,7 +56,10 @@ struct TKqpWorkerSettings { } }; -IActor* CreateKqpSessionActor(const TActorId& owner, const TString& sessionId, +IActor* CreateKqpSessionActor(const TActorId& owner, + std::shared_ptr resourceManager_, + std::shared_ptr caFactory_, + const TString& sessionId, const TKqpSettings::TConstPtr& kqpSettings, const TKqpWorkerSettings& workerSettings, std::optional federatedQuerySetup, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index cfb7952b7a73..9028720970df 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -831,7 +831,7 @@ namespace Tests { auto kqpProxySharedResources = std::make_shared(); IActor* kqpRmService = NKqp::CreateKqpResourceManagerActor( - Settings->AppConfig->GetTableServiceConfig().GetResourceManager(), nullptr, {}, kqpProxySharedResources); + Settings->AppConfig->GetTableServiceConfig().GetResourceManager(), nullptr, {}, kqpProxySharedResources, Runtime->GetNodeId(nodeIdx)); TActorId kqpRmServiceId = Runtime->Register(kqpRmService, nodeIdx); Runtime->RegisterService(NKqp::MakeKqpRmServiceID(Runtime->GetNodeId(nodeIdx)), kqpRmServiceId, nodeIdx); From e57e16951a84d5aed7b5db9d2da6bbdcdc5ef4c9 Mon Sep 17 00:00:00 2001 From: Vitalii Gridnev Date: Thu, 11 Jul 2024 15:18:31 +0400 Subject: [PATCH 2/5] improve memory limits errors observability (add debugging details) (#6551) --- .../kqp_compute_actor_factory.cpp | 4 +++ ydb/core/kqp/rm_service/kqp_rm_service.cpp | 22 ++++++++++++ ydb/core/kqp/rm_service/kqp_rm_service.h | 1 + ydb/core/kqp/ut/query/kqp_limits_ut.cpp | 21 ++++++++++++ .../yql/dq/actors/compute/dq_compute_actor.h | 4 +++ .../compute/dq_compute_actor_async_io.h | 1 + .../dq/actors/compute/dq_compute_actor_impl.h | 34 +++++++++++++------ .../yql/providers/dq/actors/worker_actor.cpp | 4 +++ 8 files changed, 81 insertions(+), 10 deletions(-) diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp index 6ae5ae6fc2b3..9f3bb0c6d73d 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp @@ -63,6 +63,10 @@ struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager { return TotalQueryAllocationsSize >= ReasonableSpillingTreshold; } + TString MemoryConsumptionDetails() const override { + return ResourceManager->GetTxResourcesUsageDebugInfo(TxId); + } + void TerminateHandler(bool success, const NYql::TIssues& issues) { AFL_DEBUG(NKikimrServices::KQP_COMPUTE) ("problem", "finish_compute_actor") diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.cpp b/ydb/core/kqp/rm_service/kqp_rm_service.cpp index 1f3116dcce67..120c29334143 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_service.cpp +++ b/ydb/core/kqp/rm_service/kqp_rm_service.cpp @@ -107,6 +107,15 @@ struct TTxState { ui32 TxExecutionUnits = 0; TInstant CreatedAt; + TString ToString() const { + return TStringBuilder() << "TxResourcesInfo{ " + << "Memory initially granted resources: " << TxExternalDataQueryMemory + << ", extra allocations " << TxScanQueryMemory + << ", execution units: " << TxExecutionUnits + << ", started at: " << CreatedAt + << " }"; + } + TTaskState& Allocated(ui64 taskId, TInstant now, const TKqpResourcesRequest& resources, bool memoryAsExternal = false) { ui64 externalMemory = resources.ExternalMemory; ui64 resourceBrokerMemory = 0; @@ -564,6 +573,19 @@ class TKqpResourceManager : public IKqpResourceManager { ActorSystem->Send(SelfId, new TEvPrivate::TEvSchedulePublishResources); } + TString GetTxResourcesUsageDebugInfo(ui64 txId) override { + auto& txBucket = TxBucket(txId); + with_lock(txBucket.Lock) { + auto it = txBucket.Txs.find(txId); + if (it == txBucket.Txs.end()) { + return ""; + } + + return it->second.ToString(); + } + } + + void UpdatePatternCache(ui64 maxSizeBytes, ui64 maxCompiledSizeBytes, ui64 patternAccessTimesBeforeTryToCompile) { if (maxSizeBytes == 0) { PatternCache.reset(); diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.h b/ydb/core/kqp/rm_service/kqp_rm_service.h index 6c5ef32a0ab5..268160695dfd 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_service.h +++ b/ydb/core/kqp/rm_service/kqp_rm_service.h @@ -94,6 +94,7 @@ class IKqpResourceManager : private TNonCopyable { virtual void FreeResources(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources) = 0; virtual void FreeResources(ui64 txId, ui64 taskId) = 0; + virtual TString GetTxResourcesUsageDebugInfo(ui64 txId) = 0; virtual void NotifyExternalResourcesAllocated(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources) = 0; diff --git a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp index a5c993eff08d..4a65ed26cf46 100644 --- a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp @@ -128,6 +128,27 @@ Y_UNIT_TEST_SUITE(KqpLimits) { UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST); } + Y_UNIT_TEST(ComputeActorMemoryAllocationFailure) { + auto app = NKikimrConfig::TAppConfig(); + app.MutableTableServiceConfig()->MutableResourceManager()->SetMkqlLightProgramMemoryLimit(10); + app.MutableTableServiceConfig()->MutableResourceManager()->SetQueryMemoryLimit(2000); + + TKikimrRunner kikimr(app); + CreateLargeTable(kikimr, 0, 0, 0); + + kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_SLOW_LOG, NActors::NLog::PRI_ERROR); + + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + auto result = session.ExecuteDataQuery(Q1_(R"( + SELECT * FROM `/Root/LargeTable`; + )"), TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + result.GetIssues().PrintTo(Cerr); + + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::OVERLOADED); + } + Y_UNIT_TEST(DatashardProgramSize) { auto app = NKikimrConfig::TAppConfig(); app.MutableTableServiceConfig()->MutableResourceManager()->SetMkqlLightProgramMemoryLimit(1'000'000'000); diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h index b8853f283fd9..cae0328043d2 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h @@ -331,6 +331,10 @@ struct TGuaranteeQuotaManager : public IMemoryQuotaManager { return MaxMemorySize; }; + TString MemoryConsumptionDetails() const override { + return TString(); + } + virtual bool AllocateExtraQuota(ui64) { return false; } diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h index 0122b27b150a..b5d30d48521e 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h @@ -68,6 +68,7 @@ struct IMemoryQuotaManager { virtual ui64 GetCurrentQuota() const = 0; virtual ui64 GetMaxMemorySize() const = 0; virtual bool IsReasonableToUseSpilling() const = 0; + virtual TString MemoryConsumptionDetails() const = 0; }; // Source/transform. diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index e87acb1f7b7f..709448b9abbb 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -147,10 +147,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped static_cast(this)->DoBootstrap(); } catch (const NKikimr::TMemoryLimitExceededException& e) { - InternalError(NYql::NDqProto::StatusIds::OVERLOADED, TIssuesIds::KIKIMR_PRECONDITION_FAILED, TStringBuilder() - << "Mkql memory limit exceeded, limit: " << GetMkqlMemoryLimit() - << ", host: " << HostName() - << ", canAllocateExtraMemory: " << CanAllocateExtraMemory); + OnMemoryLimitExceptionHandler(); } catch (const std::exception& e) { InternalError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TIssuesIds::UNEXPECTED, e.what()); } @@ -225,10 +222,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped TComputeActorClass* self = static_cast(this); (self->*FuncBody)(ev); } catch (const NKikimr::TMemoryLimitExceededException& e) { - InternalError(NYql::NDqProto::StatusIds::OVERLOADED, TIssuesIds::KIKIMR_PRECONDITION_FAILED, TStringBuilder() - << "Mkql memory limit exceeded, limit: " << GetMkqlMemoryLimit() - << ", host: " << HostName() - << ", canAllocateExtraMemory: " << CanAllocateExtraMemory); + OnMemoryLimitExceptionHandler(); } catch (const std::exception& e) { if (PassExceptions) { throw; @@ -316,6 +310,20 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped virtual bool DoHandleChannelsAfterFinishImpl() = 0; + void OnMemoryLimitExceptionHandler() { + TString memoryConsumptionDetails = MemoryLimits.MemoryQuotaManager->MemoryConsumptionDetails(); + TStringBuilder failureReason = TStringBuilder() + << "Mkql memory limit exceeded, limit: " << GetMkqlMemoryLimit() + << ", host: " << HostName() + << ", canAllocateExtraMemory: " << CanAllocateExtraMemory; + + if (!memoryConsumptionDetails.empty()) { + failureReason << ", memory manager details: " << memoryConsumptionDetails; + } + + InternalError(NYql::NDqProto::StatusIds::OVERLOADED, TIssuesIds::KIKIMR_PRECONDITION_FAILED, failureReason); + } + void ProcessOutputsImpl(ERunStatus status) { ProcessOutputsState.LastRunStatus = status; @@ -652,7 +660,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped OutputTransformsMap.at(outputIndex).FinishIsAcknowledged = true; ContinueExecute(EResumeSource::CATransformFinished); } - + protected: //TDqComputeActorCheckpoints::ICallbacks //bool ReadyToCheckpoint() is pure and must be overriden in a derived class @@ -1490,7 +1498,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped if (inputDesc.HasSource()) { const auto watermarksMode = inputDesc.GetSource().GetWatermarksMode(); auto result = SourcesMap.emplace( - i, + i, static_cast(this)->CreateInputHelper(LogPrefix, i, watermarksMode) ); YQL_ENSURE(result.second); @@ -1668,6 +1676,9 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped } else { // in basic mode enum sources directly for (auto& [inputIndex, sourceInfo] : SourcesMap) { + if (!sourceInfo.AsyncInput) + continue; + const auto& ingressStats = sourceInfo.AsyncInput->GetIngressStats(); ingressBytes += ingressStats.Bytes; // ingress rows are usually not reported, so we count rows in task runner input @@ -1690,6 +1701,9 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped for (auto& [outputIndex, sinkInfo] : SinksMap) { if (auto* sink = GetSink(outputIndex, sinkInfo)) { + if (!sinkInfo.AsyncOutput) + continue; + const auto& egressStats = sinkInfo.AsyncOutput->GetEgressStats(); const auto& pushStats = sink->GetPushStats(); if (RuntimeSettings.CollectFull()) { diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp index b2536df28abe..f7a33da883ec 100644 --- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp @@ -82,6 +82,10 @@ class TDummyMemoryQuotaManager: public IMemoryQuotaManager { return std::numeric_limits::max(); } + TString MemoryConsumptionDetails() const override { + return TString(); + } + bool IsReasonableToUseSpilling() const override { return false; } From 7c2c3e0c55c22550bc08b81f12b32c4166fadccd Mon Sep 17 00:00:00 2001 From: Vitalii Gridnev Date: Fri, 12 Jul 2024 17:50:35 +0400 Subject: [PATCH 3/5] move allocate resources call before actor launch from node service to factory (#6583) --- .../kqp_compute_actor_factory.cpp | 13 ++++- .../compute_actor/kqp_compute_actor_factory.h | 3 +- ydb/core/kqp/executer_actor/kqp_planner.cpp | 34 ++++++++++-- ydb/core/kqp/executer_actor/kqp_planner.h | 2 +- .../kqp/node_service/kqp_node_service.cpp | 54 +++++++------------ 5 files changed, 62 insertions(+), 44 deletions(-) diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp index 9f3bb0c6d73d..5f0a48064907 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp @@ -114,13 +114,24 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory { ReasonableSpillingTreshold.store(config.GetReasonableSpillingTreshold()); } - TActorId CreateKqpComputeActor(TCreateArgs&& args) { + TActorStartResult CreateKqpComputeActor(TCreateArgs&& args) { NYql::NDq::TComputeMemoryLimits memoryLimits; memoryLimits.ChannelBufferSize = 0; memoryLimits.MkqlLightProgramMemoryLimit = MkqlLightProgramMemoryLimit.load(); memoryLimits.MkqlHeavyProgramMemoryLimit = MkqlHeavyProgramMemoryLimit.load(); auto estimation = ResourceManager_->EstimateTaskResources(*args.Task, args.NumberOfTasks); + NRm::TKqpResourcesRequest resourcesRequest; + resourcesRequest.MemoryPool = args.MemoryPool; + resourcesRequest.ExecutionUnits = 1; + resourcesRequest.Memory = memoryLimits.MkqlLightProgramMemoryLimit; + + auto rmResult = ResourceManager_->AllocateResources( + args.TxId, args.Task->GetId(), resourcesRequest); + + if (!rmResult) { + return NRm::TKqpRMAllocateResult{rmResult}; + } { ui32 inputChannelsCount = 0; diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h index e89fcabce098..db9df8830812 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h @@ -123,7 +123,8 @@ struct IKqpNodeComputeActorFactory { std::shared_ptr State = nullptr; }; - virtual NActors::TActorId CreateKqpComputeActor(TCreateArgs&& args) = 0; + typedef std::variant TActorStartResult; + virtual TActorStartResult CreateKqpComputeActor(TCreateArgs&& args) = 0; virtual void ApplyConfig(const NKikimrConfig::TTableServiceConfig::TResourceManager& config) = 0; }; diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index b611624d1ff1..c3964ab9f32d 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -37,6 +37,11 @@ std::unique_ptr CheckTaskSize(ui64 TxId, const TIntru return nullptr; } +std::unique_ptr MakeActorStartFailureError(const TActorId& executerId, const TString& reason) { + auto ev = std::make_unique(NYql::NDqProto::StatusIds::OVERLOADED, reason); + return std::make_unique(executerId, executerId, ev.release()); +} + void BuildInitialTaskResources(const TKqpTasksGraph& graph, ui64 taskId, TTaskResourceEstimation& ret) { const auto& task = graph.GetTask(taskId); const auto& stageInfo = graph.GetStageInfo(task.StageId); @@ -337,12 +342,12 @@ const IKqpGateway::TKqpSnapshot& TKqpPlanner::GetSnapshot() const { // optimizeProtoForLocalExecution - if we want to execute compute actor locally and don't want to serialize & then deserialize proto message // instead we just give ptr to proto message and after that we swap/copy it -void TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize) { +TString TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize) { auto& task = TasksGraph.GetTask(taskId); NYql::NDqProto::TDqTask* taskDesc = ArenaSerializeTaskToProto(TasksGraph, task, true); NYql::NDq::TComputeRuntimeSettings settings; - task.ComputeActorId = CaFactory_->CreateKqpComputeActor({ + auto startResult = CaFactory_->CreateKqpComputeActor({ .ExecuterId = ExecuterId, .TxId = TxId, .Task = taskDesc, @@ -360,10 +365,19 @@ void TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize) { .RlPath = Nothing() }); + if (const auto* rmResult = std::get_if(&startResult)) { + return rmResult->GetFailReason(); + } + + TActorId* actorId = std::get_if(&startResult); + Y_ABORT_UNLESS(actorId); + task.ComputeActorId = *actorId; + LOG_D("Executing task: " << taskId << " on compute actor: " << task.ComputeActorId); auto result = PendingComputeActors.emplace(task.ComputeActorId, TProgressStat()); YQL_ENSURE(result.second); + return TString(); } ui32 TKqpPlanner::GetnScanTasks() { @@ -401,7 +415,10 @@ std::unique_ptr TKqpPlanner::PlanExecution() { // on datashard tx. if (LocalComputeTasks) { for (ui64 taskId : ComputeTasks) { - ExecuteDataComputeTask(taskId, ComputeTasks.size()); + auto result = ExecuteDataComputeTask(taskId, ComputeTasks.size()); + if (!result.empty()) { + return MakeActorStartFailureError(ExecuterId, result); + } } ComputeTasks.clear(); } @@ -411,7 +428,10 @@ std::unique_ptr TKqpPlanner::PlanExecution() { // to execute this task locally so we can avoid useless overhead for remote task launching. for (auto& [shardId, tasks]: TasksPerNode) { for (ui64 taskId: tasks) { - ExecuteDataComputeTask(taskId, tasks.size()); + auto result = ExecuteDataComputeTask(taskId, tasks.size()); + if (!result.empty()) { + return MakeActorStartFailureError(ExecuterId, result); + } } } @@ -437,7 +457,11 @@ std::unique_ptr TKqpPlanner::PlanExecution() { if (tasksOnNodeIt != TasksPerNode.end()) { auto& tasks = tasksOnNodeIt->second; for (ui64 taskId: tasks) { - ExecuteDataComputeTask(taskId, tasks.size()); + auto result = ExecuteDataComputeTask(taskId, tasks.size()); + if (!result.empty()) { + return MakeActorStartFailureError(ExecuterId, result); + } + PendingComputeTasks.erase(taskId); } } diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h index 574827bf917b..08d3a5b47a58 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.h +++ b/ydb/core/kqp/executer_actor/kqp_planner.h @@ -85,7 +85,7 @@ class TKqpPlanner { private: const IKqpGateway::TKqpSnapshot& GetSnapshot() const; - void ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize); + TString ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize); void PrepareToProcess(); TString GetEstimationsInfo() const; diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp index ce0a9d9dc805..099adf41b463 100644 --- a/ydb/core/kqp/node_service/kqp_node_service.cpp +++ b/ydb/core/kqp/node_service/kqp_node_service.cpp @@ -164,35 +164,6 @@ class TKqpNodeService : public TActorBootstrapped { memoryPool = NRm::EKqpMemoryPool::Unspecified; } - TVector allocatedTasks; - allocatedTasks.reserve(msg.GetTasks().size()); - for (auto& task : *msg.MutableTasks()) { - NKqpNode::TTaskContext& taskCtx = request.InFlyTasks[task.GetId()]; - YQL_ENSURE(taskCtx.TaskId == 0); - taskCtx.TaskId = task.GetId(); - - NRm::TKqpResourcesRequest resourcesRequest; - resourcesRequest.MemoryPool = memoryPool; - resourcesRequest.ExecutionUnits = 1; - - // !!!!!!!!!!!!!!!!!!!!! - // we have to allocate memory instead of reserve only. currently, this memory will not be used for request processing. - resourcesRequest.Memory = (1 << 19) /* 512kb limit for check that memory exists for processing with minimal requirements */; - - auto result = ResourceManager_->AllocateResources(txId, task.GetId(), resourcesRequest); - - if (!result) { - for (ui64 taskId : allocatedTasks) { - ResourceManager_->FreeResources(txId, taskId); - } - - ReplyError(txId, request.Executer, msg, result.GetStatus(), result.GetFailReason()); - return; - } - - allocatedTasks.push_back(task.GetId()); - } - auto reply = MakeHolder(); reply->Record.SetTxId(txId); @@ -213,13 +184,8 @@ class TKqpNodeService : public TActorBootstrapped { } const ui32 tasksCount = msg.GetTasks().size(); - for (int i = 0; i < msg.GetTasks().size(); ++i) { - auto& dqTask = *msg.MutableTasks(i); - auto& taskCtx = request.InFlyTasks[dqTask.GetId()]; - taskCtx.TaskId = dqTask.GetId(); - YQL_ENSURE(taskCtx.TaskId != 0); - - taskCtx.ComputeActorId = CaFactory_->CreateKqpComputeActor({ + for (auto& dqTask: *msg.MutableTasks()) { + auto result = CaFactory_->CreateKqpComputeActor({ .ExecuterId = request.Executer, .TxId = txId, .Task = &dqTask, @@ -239,6 +205,22 @@ class TKqpNodeService : public TActorBootstrapped { .State = State_ }); + if (const auto* rmResult = std::get_if(&result)) { + ReplyError(txId, request.Executer, msg, rmResult->GetStatus(), rmResult->GetFailReason()); + bucket.NewRequest(std::move(request)); + TerminateTx(txId, rmResult->GetFailReason()); + return; + } + + auto& taskCtx = request.InFlyTasks[dqTask.GetId()]; + YQL_ENSURE(taskCtx.TaskId == 0); + taskCtx.TaskId = dqTask.GetId(); + YQL_ENSURE(taskCtx.TaskId != 0); + + TActorId* actorId = std::get_if(&result); + Y_ABORT_UNLESS(actorId); + taskCtx.ComputeActorId = *actorId; + LOG_D("TxId: " << txId << ", executing task: " << taskCtx.TaskId << " on compute actor: " << taskCtx.ComputeActorId); auto* startedTask = reply->Record.AddStartedTasks(); From 607b49318be5881dff4b7ea9e42b7a500b550781 Mon Sep 17 00:00:00 2001 From: Vitalii Gridnev Date: Sat, 13 Jul 2024 17:39:18 +0400 Subject: [PATCH 4/5] use same on memory limit handler everywhere (#6641) --- ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp | 5 +---- ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h | 5 +---- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp index 1c00eca6c792..ac8f34e1979b 100644 --- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp @@ -133,10 +133,7 @@ STFUNC(TKqpComputeActor::StateFunc) { BaseStateFuncBody(ev); } } catch (const TMemoryLimitExceededException& e) { - InternalError(TIssuesIds::KIKIMR_PRECONDITION_FAILED, TStringBuilder() - << "Mkql memory limit exceeded, limit: " << GetMkqlMemoryLimit() - << ", host: " << HostName() - << ", canAllocateExtraMemory: " << CanAllocateExtraMemory); + TBase::OnMemoryLimitExceptionHandler(); } catch (const NMiniKQL::TKqpEnsureFail& e) { InternalError((TIssuesIds::EIssueCode) e.GetCode(), e.GetMessage()); } catch (const yexception& e) { diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h index 69b0bb613c1a..eae7196cfe9c 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h @@ -46,10 +46,7 @@ class TKqpScanComputeActor: public NYql::NDq::TDqSyncComputeActorBase Date: Wed, 24 Jul 2024 14:10:36 +0300 Subject: [PATCH 5/5] don't lock on zero memory change (#6926) --- .../kqp_compute_actor_factory.cpp | 44 +- .../compute_actor/kqp_compute_actor_factory.h | 1 + ydb/core/kqp/counters/kqp_counters.cpp | 3 + ydb/core/kqp/counters/kqp_counters.h | 5 +- ydb/core/kqp/executer_actor/kqp_planner.cpp | 5 + ydb/core/kqp/executer_actor/kqp_planner.h | 1 + .../kqp/node_service/kqp_node_service.cpp | 4 + ydb/core/kqp/rm_service/kqp_rm_service.cpp | 384 ++++-------------- ydb/core/kqp/rm_service/kqp_rm_service.h | 136 ++++++- ydb/core/kqp/rm_service/kqp_rm_ut.cpp | 93 +++-- .../build_kqp_data_tx_out_rs_unit.cpp | 10 +- .../tx/datashard/execute_kqp_data_tx_unit.cpp | 9 +- 12 files changed, 330 insertions(+), 365 deletions(-) diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp index 5f0a48064907..ca920e7112df 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp @@ -12,75 +12,71 @@ struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager { TMemoryQuotaManager(std::shared_ptr resourceManager , NRm::EKqpMemoryPool memoryPool , std::shared_ptr state - , ui64 txId - , ui64 taskId + , TIntrusivePtr tx + , TIntrusivePtr task , ui64 limit , ui64 reasonableSpillingTreshold) : NYql::NDq::TGuaranteeQuotaManager(limit, limit) , ResourceManager(std::move(resourceManager)) , MemoryPool(memoryPool) , State(std::move(state)) - , TxId(txId) - , TaskId(taskId) + , Tx(std::move(tx)) + , Task(std::move(task)) , ReasonableSpillingTreshold(reasonableSpillingTreshold) { } ~TMemoryQuotaManager() override { if (State) { - State->OnTaskTerminate(TxId, TaskId, Success); + State->OnTaskTerminate(Tx->TxId, Task->TaskId, Success); } - ResourceManager->FreeResources(TxId, TaskId); + ResourceManager->FreeResources(Tx, Task); } bool AllocateExtraQuota(ui64 extraSize) override { - auto result = ResourceManager->AllocateResources(TxId, TaskId, + auto result = ResourceManager->AllocateResources(Tx, Task, NRm::TKqpResourcesRequest{.MemoryPool = MemoryPool, .Memory = extraSize}); if (!result) { AFL_WARN(NKikimrServices::KQP_COMPUTE) ("problem", "cannot_allocate_memory") - ("tx_id", TxId) - ("task_id", TaskId) + ("tx_id", Tx->TxId) + ("task_id", Task->TaskId) ("memory", extraSize); return false; } - TotalQueryAllocationsSize = result.TotalAllocatedQueryMemory; - return true; } void FreeExtraQuota(ui64 extraSize) override { - ResourceManager->FreeResources(TxId, TaskId, - NRm::TKqpResourcesRequest{.MemoryPool = MemoryPool, .Memory = extraSize} - ); + NRm::TKqpResourcesRequest request = NRm::TKqpResourcesRequest{.MemoryPool = MemoryPool, .Memory = extraSize}; + ResourceManager->FreeResources(Tx, Task, Task->FitRequest(request)); } bool IsReasonableToUseSpilling() const override { - return TotalQueryAllocationsSize >= ReasonableSpillingTreshold; + return Tx->GetExtraMemoryAllocatedSize() >= ReasonableSpillingTreshold; } TString MemoryConsumptionDetails() const override { - return ResourceManager->GetTxResourcesUsageDebugInfo(TxId); + return Tx->ToString(); } void TerminateHandler(bool success, const NYql::TIssues& issues) { AFL_DEBUG(NKikimrServices::KQP_COMPUTE) ("problem", "finish_compute_actor") - ("tx_id", TxId)("task_id", TaskId)("success", success)("message", issues.ToOneLineString()); + ("tx_id", Tx->TxId)("task_id", Task->TaskId)("success", success)("message", issues.ToOneLineString()); Success = success; } std::shared_ptr ResourceManager; NRm::EKqpMemoryPool MemoryPool; std::shared_ptr State; - ui64 TxId; - ui64 TaskId; + TIntrusivePtr Tx; + TIntrusivePtr Task; bool Success = true; - ui64 TotalQueryAllocationsSize = 0; ui64 ReasonableSpillingTreshold = 0; }; @@ -126,8 +122,10 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory { resourcesRequest.ExecutionUnits = 1; resourcesRequest.Memory = memoryLimits.MkqlLightProgramMemoryLimit; + TIntrusivePtr task = MakeIntrusive(args.Task->GetId(), args.TxInfo->CreatedAt); + auto rmResult = ResourceManager_->AllocateResources( - args.TxId, args.Task->GetId(), resourcesRequest); + args.TxInfo, task, resourcesRequest); if (!rmResult) { return NRm::TKqpRMAllocateResult{rmResult}; @@ -158,8 +156,8 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory { ResourceManager_, args.MemoryPool, std::move(args.State), - args.TxId, - args.Task->GetId(), + std::move(args.TxInfo), + std::move(task), limit, ReasonableSpillingTreshold.load()); diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h index db9df8830812..2b63e14e5025 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h @@ -107,6 +107,7 @@ struct IKqpNodeComputeActorFactory { const NActors::TActorId& ExecuterId; const ui64 TxId; NYql::NDqProto::TDqTask* Task; + TIntrusivePtr TxInfo; const NYql::NDq::TComputeRuntimeSettings& RuntimeSettings; NWilson::TTraceId TraceId; TIntrusivePtr Arena; diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp index a8b0b8a1a2c5..70e5169c4546 100644 --- a/ydb/core/kqp/counters/kqp_counters.cpp +++ b/ydb/core/kqp/counters/kqp_counters.cpp @@ -776,7 +776,10 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co RmExternalMemory = KqpGroup->GetCounter("RM/ExternalMemory", false); RmNotEnoughMemory = KqpGroup->GetCounter("RM/NotEnoughMemory", true); RmNotEnoughComputeActors = KqpGroup->GetCounter("RM/NotEnoughComputeActors", true); + RmOnStartAllocs = KqpGroup->GetCounter("Rm/OnStartAllocs", true); RmExtraMemAllocs = KqpGroup->GetCounter("RM/ExtraMemAllocs", true); + RmExtraMemFree = KqpGroup->GetCounter("RM/ExtraMemFree", true); + RmOnCompleteFree = KqpGroup->GetCounter("RM/OnCompleteFree", true); RmInternalError = KqpGroup->GetCounter("RM/InternalError", true); RmSnapshotLatency = KqpGroup->GetHistogram( "RM/SnapshotLatency", NMonitoring::ExponentialHistogram(20, 2, 1)); diff --git a/ydb/core/kqp/counters/kqp_counters.h b/ydb/core/kqp/counters/kqp_counters.h index 4a3328cbb0d1..f302897f1b8d 100644 --- a/ydb/core/kqp/counters/kqp_counters.h +++ b/ydb/core/kqp/counters/kqp_counters.h @@ -350,7 +350,7 @@ class TKqpCounters : public TKqpCountersBase, public NYql::NDq::TSpillingCounter ::NMonitoring::TDynamicCounterPtr WorkloadManagerGroup; ::NMonitoring::TDynamicCounters::TCounterPtr FullScansExecuted; - + // Lease updates counters ::NMonitoring::THistogramPtr LeaseUpdateLatency; ::NMonitoring::THistogramPtr RunActorLeaseUpdateBacklog; @@ -377,6 +377,9 @@ class TKqpCounters : public TKqpCountersBase, public NYql::NDq::TSpillingCounter ::NMonitoring::TDynamicCounters::TCounterPtr RmNotEnoughMemory; ::NMonitoring::TDynamicCounters::TCounterPtr RmNotEnoughComputeActors; ::NMonitoring::TDynamicCounters::TCounterPtr RmExtraMemAllocs; + ::NMonitoring::TDynamicCounters::TCounterPtr RmOnStartAllocs; + ::NMonitoring::TDynamicCounters::TCounterPtr RmExtraMemFree; + ::NMonitoring::TDynamicCounters::TCounterPtr RmOnCompleteFree; ::NMonitoring::TDynamicCounters::TCounterPtr RmInternalError; NMonitoring::THistogramPtr RmSnapshotLatency; NMonitoring::THistogramPtr NodeServiceStartEventDelivery; diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index c3964ab9f32d..7065b49936f1 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -346,11 +346,16 @@ TString TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize) auto& task = TasksGraph.GetTask(taskId); NYql::NDqProto::TDqTask* taskDesc = ArenaSerializeTaskToProto(TasksGraph, task, true); NYql::NDq::TComputeRuntimeSettings settings; + if (!TxInfo) { + TxInfo = MakeIntrusive( + TxId, TInstant::Now(), ResourceManager_->GetCounters()); + } auto startResult = CaFactory_->CreateKqpComputeActor({ .ExecuterId = ExecuterId, .TxId = TxId, .Task = taskDesc, + .TxInfo = TxInfo, .RuntimeSettings = settings, .TraceId = NWilson::TTraceId(ExecuterSpan.GetTraceId()), .Arena = TasksGraph.GetMeta().GetArenaIntrusivePtr(), diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h index 08d3a5b47a58..03ce07758cf5 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.h +++ b/ydb/core/kqp/executer_actor/kqp_planner.h @@ -133,6 +133,7 @@ class TKqpPlanner { TString SerializedGUCSettings; std::shared_ptr ResourceManager_; std::shared_ptr CaFactory_; + TIntrusivePtr TxInfo; public: static bool UseMockEmptyPlanner; // for tests: if true then use TKqpMockEmptyPlanner that leads to the error diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp index 099adf41b463..5babf184cbaa 100644 --- a/ydb/core/kqp/node_service/kqp_node_service.cpp +++ b/ydb/core/kqp/node_service/kqp_node_service.cpp @@ -183,12 +183,16 @@ class TKqpNodeService : public TActorBootstrapped { rlPath.ConstructInPlace(msgRtSettings.GetRlPath()); } + TIntrusivePtr txInfo = MakeIntrusive( + txId, TInstant::Now(), ResourceManager_->GetCounters()); + const ui32 tasksCount = msg.GetTasks().size(); for (auto& dqTask: *msg.MutableTasks()) { auto result = CaFactory_->CreateKqpComputeActor({ .ExecuterId = request.Executer, .TxId = txId, .Task = &dqTask, + .TxInfo = txInfo, .RuntimeSettings = runtimeSettingsBase, .TraceId = NWilson::TTraceId(ev->TraceId), .Arena = ev->Get()->Arena, diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.cpp b/ydb/core/kqp/rm_service/kqp_rm_service.cpp index 120c29334143..da4ffccb0158 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_service.cpp +++ b/ydb/core/kqp/rm_service/kqp_rm_service.cpp @@ -91,69 +91,6 @@ class TLimitedResource { T Used; }; -struct TTaskState { - bool AllocatedExecutionUnit = false; - ui64 ScanQueryMemory = 0; - ui64 ExternalDataQueryMemory = 0; - ui32 ExecutionUnits = 0; - ui64 ResourceBrokerTaskId = 0; - TInstant CreatedAt; -}; - -struct TTxState { - std::unordered_map Tasks; - ui64 TxScanQueryMemory = 0; - ui64 TxExternalDataQueryMemory = 0; - ui32 TxExecutionUnits = 0; - TInstant CreatedAt; - - TString ToString() const { - return TStringBuilder() << "TxResourcesInfo{ " - << "Memory initially granted resources: " << TxExternalDataQueryMemory - << ", extra allocations " << TxScanQueryMemory - << ", execution units: " << TxExecutionUnits - << ", started at: " << CreatedAt - << " }"; - } - - TTaskState& Allocated(ui64 taskId, TInstant now, const TKqpResourcesRequest& resources, bool memoryAsExternal = false) { - ui64 externalMemory = resources.ExternalMemory; - ui64 resourceBrokerMemory = 0; - if (memoryAsExternal) { - externalMemory += resources.Memory; - } else { - resourceBrokerMemory = resources.Memory; - } - - TxExternalDataQueryMemory += externalMemory; - TxScanQueryMemory += resourceBrokerMemory; - if (!CreatedAt) { - CreatedAt = now; - } - - if (resources.ExecutionUnits) { - Y_ABORT_UNLESS(!Tasks.contains(taskId)); - } - - auto& taskState = Tasks[taskId]; - taskState.ExecutionUnits += resources.ExecutionUnits; - taskState.ScanQueryMemory += resourceBrokerMemory; - taskState.ExternalDataQueryMemory += externalMemory; - if (!taskState.CreatedAt) { - taskState.CreatedAt = now; - } - - return taskState; - } -}; - -struct TTxStatesBucket { - std::unordered_map Txs; // TxId -> TxState - TMutex Lock; -}; - -constexpr ui64 BucketsCount = 64; - struct TEvPrivate { enum EEv { EvPublishResources = EventSpaceBegin(TEvents::ES_PRIVATE), @@ -201,6 +138,10 @@ class TKqpResourceManager : public IKqpResourceManager { } } + const TIntrusivePtr& GetCounters() const override { + return Counters; + } + void CreateResourceInfoExchanger( const NKikimrConfig::TTableServiceConfig::TResourceManager::TInfoExchangerSettings& settings) { PublishResourcesByExchanger = true; @@ -219,7 +160,6 @@ class TKqpResourceManager : public IKqpResourceManager { ExecutionUnitsResource.fetch_add(cnt); return false; } else { - Counters->RmComputeActors->Add(cnt); return true; } } @@ -230,11 +170,13 @@ class TKqpResourceManager : public IKqpResourceManager { } ExecutionUnitsResource.fetch_add(cnt); - Counters->RmComputeActors->Sub(cnt); } - TKqpRMAllocateResult AllocateResources(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources) override + TKqpRMAllocateResult AllocateResources(TIntrusivePtr& tx, TIntrusivePtr& task, const TKqpResourcesRequest& resources) override { + const ui64 txId = tx->TxId; + const ui64 taskId = task->TaskId; + TKqpRMAllocateResult result; if (resources.ExecutionUnits) { if (!AllocateExecutionUnits(resources.ExecutionUnits)) { @@ -257,22 +199,18 @@ class TKqpResourceManager : public IKqpResourceManager { return result; } - auto now = ActorSystem->Timestamp(); bool hasScanQueryMemory = true; ui64 queryMemoryLimit = 0; - // NOTE(gvit): the first memory request from the data query pool always satisfied. + + // NOTE(gvit): the first memory request always satisfied. // all other requests are not guaranteed to be satisfied. // In the nearest future we need to implement several layers of memory requests. bool isFirstAllocationRequest = (resources.ExecutionUnits > 0 && resources.MemoryPool == EKqpMemoryPool::DataQuery); if (isFirstAllocationRequest) { - auto& txBucket = TxBucket(txId); - with_lock(txBucket.Lock) { - auto& tx = txBucket.Txs[txId]; - tx.Allocated(taskId, now, resources, /*memoryAsExternal=*/true); - ExternalDataQueryMemory.fetch_add(resources.Memory + resources.ExternalMemory); - Counters->RmExternalMemory->Add(resources.Memory + resources.ExternalMemory); - } - + TKqpResourcesRequest newRequest = resources; + newRequest.MoveToFreeTier(); + tx->Allocated(task, newRequest); + ExternalDataQueryMemory.fetch_add(newRequest.ExternalMemory); return result; } @@ -301,170 +239,93 @@ class TKqpResourceManager : public IKqpResourceManager { ui64 rbTaskId = LastResourceBrokerTaskId.fetch_add(1) + 1; TString rbTaskName = TStringBuilder() << "kqp-" << txId << '-' << taskId << '-' << rbTaskId; - bool extraAlloc = false; - - auto& txBucket = TxBucket(txId); - with_lock (txBucket.Lock) { - Y_DEFER { - if (!result) { - auto unguard = ::Unguard(txBucket.Lock); - Counters->RmNotEnoughMemory->Inc(); - with_lock (Lock) { - ScanQueryMemoryResource.Release(resources.Memory); - } // with_lock (Lock) - } - }; - auto& tx = txBucket.Txs[txId]; - ui64 txTotalRequestedMemory = tx.TxScanQueryMemory + resources.Memory; - result.TotalAllocatedQueryMemory = txTotalRequestedMemory; - if (txTotalRequestedMemory > queryMemoryLimit) { - TStringBuilder reason; - reason << "TxId: " << txId << ", taskId: " << taskId << ". Query memory limit exceeded: " - << "requested " << txTotalRequestedMemory; - result.SetError(NKikimrKqp::TEvStartKqpTasksResponse::QUERY_MEMORY_LIMIT_EXCEEDED, reason); - return result; + Y_DEFER { + if (!result) { + Counters->RmNotEnoughMemory->Inc(); + with_lock (Lock) { + ScanQueryMemoryResource.Release(resources.Memory); + } // with_lock (Lock) } + }; - bool allocated = ResourceBroker->SubmitTaskInstant( - TEvResourceBroker::TEvSubmitTask(rbTaskId, rbTaskName, {0, resources.Memory}, "kqp_query", 0, {}), - SelfId); - - if (!allocated) { - TStringBuilder reason; - reason << "TxId: " << txId << ", taskId: " << taskId << ". Not enough ScanQueryMemory: " - << "requested " << resources.Memory; - LOG_AS_N(reason); - result.SetError(NKikimrKqp::TEvStartKqpTasksResponse::NOT_ENOUGH_MEMORY, reason); - return result; - } + ui64 txTotalRequestedMemory = tx->GetExtraMemoryAllocatedSize() + resources.Memory; + if (txTotalRequestedMemory > queryMemoryLimit) { + TStringBuilder reason; + reason << "TxId: " << txId << ", taskId: " << taskId << ". Query memory limit exceeded: " + << "requested " << txTotalRequestedMemory; + result.SetError(NKikimrKqp::TEvStartKqpTasksResponse::QUERY_MEMORY_LIMIT_EXCEEDED, reason); + return result; + } - auto& taskState = tx.Allocated(taskId, now, resources); - if (!taskState.ResourceBrokerTaskId) { - taskState.ResourceBrokerTaskId = rbTaskId; - } else { - extraAlloc = true; - bool merged = ResourceBroker->MergeTasksInstant(taskState.ResourceBrokerTaskId, rbTaskId, SelfId); - Y_ABORT_UNLESS(merged); - } - } // with_lock (txBucket.Lock) + bool allocated = ResourceBroker->SubmitTaskInstant( + TEvResourceBroker::TEvSubmitTask(rbTaskId, rbTaskName, {0, resources.Memory}, "kqp_query", 0, {}), + SelfId); - LOG_AS_D("TxId: " << txId << ", taskId: " << taskId << ". Allocated " << resources.ToString()); + if (!allocated) { + TStringBuilder reason; + reason << "TxId: " << txId << ", taskId: " << taskId << ". Not enough ScanQueryMemory: " + << "requested " << resources.Memory; + LOG_AS_N(reason); + result.SetError(NKikimrKqp::TEvStartKqpTasksResponse::NOT_ENOUGH_MEMORY, reason); + return result; + } - Counters->RmMemory->Add(resources.Memory); - if (extraAlloc) { - Counters->RmExtraMemAllocs->Inc(); + tx->Allocated(task, resources); + if (!task->ResourceBrokerTaskId) { + task->ResourceBrokerTaskId = rbTaskId; + } else { + bool merged = ResourceBroker->MergeTasksInstant(task->ResourceBrokerTaskId, rbTaskId, SelfId); + Y_ABORT_UNLESS(merged); } + LOG_AS_D("TxId: " << txId << ", taskId: " << taskId << ". Allocated " << resources.ToString()); FireResourcesPublishing(); return result; } - void FreeResources(ui64 txId, ui64 taskId) override { - FreeResources(txId, taskId, TKqpResourcesRequest{.ReleaseAllResources=true}); + void FreeResources(TIntrusivePtr& tx, TIntrusivePtr& task) override { + FreeResources(tx, task, task->FreeResourcesRequest()); } - void FreeResources(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources) override { - ui64 releaseScanQueryMemory = 0; - ui64 releaseExternalDataQueryMemory = 0; - - auto& txBucket = TxBucket(txId); - - { - TMaybe> guard; - guard.ConstructInPlace(txBucket.Lock); - - auto txIt = txBucket.Txs.find(txId); - if (txIt == txBucket.Txs.end()) { - return; - } - - auto& tx = txIt->second; - auto taskIt = tx.Tasks.find(taskId); - if (taskIt == tx.Tasks.end()) { - return; - } - - auto& task = taskIt->second; - if (resources.ReleaseAllResources && task.ExecutionUnits) { - FreeExecutionUnits(task.ExecutionUnits); - } - - if (resources.ReleaseAllResources) { - releaseExternalDataQueryMemory = task.ExternalDataQueryMemory; - releaseScanQueryMemory = task.ScanQueryMemory; - } else { - releaseScanQueryMemory = std::min(task.ScanQueryMemory, resources.Memory); - ui64 leftToRelease = resources.Memory - releaseScanQueryMemory; - releaseExternalDataQueryMemory = std::min(task.ExternalDataQueryMemory, resources.ExternalMemory + leftToRelease); - } - - task.ScanQueryMemory -= releaseScanQueryMemory; - tx.TxScanQueryMemory -= releaseScanQueryMemory; - - task.ExternalDataQueryMemory -= releaseExternalDataQueryMemory; - tx.TxExternalDataQueryMemory -= releaseExternalDataQueryMemory; + void FreeResources(TIntrusivePtr& tx, TIntrusivePtr& task, const TKqpResourcesRequest& resources) override { + if (resources.ExecutionUnits) { + FreeExecutionUnits(resources.ExecutionUnits); + } - if (task.ScanQueryMemory == 0) { - if (task.ResourceBrokerTaskId) { - bool finished = ResourceBroker->FinishTaskInstant( - TEvResourceBroker::TEvFinishTask(task.ResourceBrokerTaskId), SelfId); - Y_DEBUG_ABORT_UNLESS(finished); - task.ResourceBrokerTaskId = 0; - } + Y_ABORT_UNLESS(resources.Memory <= task->ScanQueryMemory); + if (resources.Memory > 0 && task->ResourceBrokerTaskId) { + if (resources.Memory == task->ScanQueryMemory) { + bool finished = ResourceBroker->FinishTaskInstant( + TEvResourceBroker::TEvFinishTask(task->ResourceBrokerTaskId), SelfId); + Y_DEBUG_ABORT_UNLESS(finished); + task->ResourceBrokerTaskId = 0; } else { bool reduced = ResourceBroker->ReduceTaskResourcesInstant( - taskIt->second.ResourceBrokerTaskId, {0, releaseScanQueryMemory}, SelfId); + task->ResourceBrokerTaskId, {0, resources.Memory}, SelfId); Y_DEBUG_ABORT_UNLESS(reduced); } + } - if (resources.ExecutionUnits) { - ui64 remainsTasks = tx.Tasks.size() - 1; - if (remainsTasks == 0) { - txBucket.Txs.erase(txIt); - } else { - tx.Tasks.erase(taskIt); - } - } - - i64 prev = ExternalDataQueryMemory.fetch_sub(releaseExternalDataQueryMemory); - Counters->RmExternalMemory->Sub(releaseExternalDataQueryMemory); - Y_DEBUG_ABORT_UNLESS(prev >= 0); - Counters->RmMemory->Sub(releaseScanQueryMemory); - Y_DEBUG_ABORT_UNLESS(Counters->RmMemory->Val() >= 0); - } // with_lock (txBucket.Lock) + tx->Released(task, resources); + i64 prev = ExternalDataQueryMemory.fetch_sub(resources.ExternalMemory); + Y_DEBUG_ABORT_UNLESS(prev >= 0); - with_lock (Lock) { - ScanQueryMemoryResource.Release(releaseScanQueryMemory); - } // with_lock (Lock) + if (resources.Memory > 0) { + with_lock (Lock) { + ScanQueryMemoryResource.Release(resources.Memory); + } // with_lock (Lock) + } - LOG_AS_D("TxId: " << txId << ", taskId: " << taskId << ". Released resources, " - << "ScanQueryMemory: " << releaseScanQueryMemory << ", " - << "ExternalDataQueryMemory " << releaseExternalDataQueryMemory << ", " + LOG_AS_D("TxId: " << tx->TxId << ", taskId: " << task->TaskId << ". Released resources, " + << "ScanQueryMemory: " << resources.Memory << ", " + << "ExternalDataQueryMemory " << resources.ExternalMemory << ", " << "ExecutionUnits " << resources.ExecutionUnits << "."); FireResourcesPublishing(); } - void NotifyExternalResourcesAllocated(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources) override { - LOG_AS_D("TxId: " << txId << ", taskId: " << taskId << ". External allocation: " << resources.ToString()); - - // we don't register data execution units for now - //YQL_ENSURE(resources.ExecutionUnits == 0); - YQL_ENSURE(resources.MemoryPool == EKqpMemoryPool::DataQuery); - - auto& txBucket = TxBucket(txId); - with_lock (txBucket.Lock) { - txBucket.Txs[txId].Allocated(taskId, TInstant(), resources); - ExternalDataQueryMemory.fetch_add(resources.ExternalMemory); - Counters->RmExternalMemory->Add(resources.ExternalMemory); - } // with_lock (txBucket.Lock) - - - FireResourcesPublishing(); - } - TVector GetClusterResources() const override { TVector resources; Y_ABORT_UNLESS(PublishResourcesByExchanger); @@ -559,33 +420,13 @@ class TKqpResourceManager : public IKqpResourceManager { return SelfId.NodeId(); } - TTxStatesBucket& TxBucket(ui64 txId) { - return Buckets[txId % Buckets.size()]; - } - void FireResourcesPublishing() { - with_lock (Lock) { - if (PublishScheduledAt) { - return; - } - } - - ActorSystem->Send(SelfId, new TEvPrivate::TEvSchedulePublishResources); - } - - TString GetTxResourcesUsageDebugInfo(ui64 txId) override { - auto& txBucket = TxBucket(txId); - with_lock(txBucket.Lock) { - auto it = txBucket.Txs.find(txId); - if (it == txBucket.Txs.end()) { - return ""; - } - - return it->second.ToString(); + bool prev = PublishScheduled.test_and_set(); + if (!prev) { + ActorSystem->Send(SelfId, new TEvPrivate::TEvSchedulePublishResources); } } - void UpdatePatternCache(ui64 maxSizeBytes, ui64 maxCompiledSizeBytes, ui64 patternAccessTimesBeforeTryToCompile) { if (maxSizeBytes == 0) { PatternCache.reset(); @@ -621,12 +462,9 @@ class TKqpResourceManager : public IKqpResourceManager { std::atomic ExternalDataQueryMemory = 0; // current state - std::array Buckets; std::atomic LastResourceBrokerTaskId = 0; - // schedule info (guarded by Lock) - std::optional PublishScheduledAt; - + std::atomic_flag PublishScheduled; // pattern cache for different actors std::shared_ptr PatternCache; @@ -755,9 +593,7 @@ class TKqpResourceManagerActor : public TActorBootstrappedLock) { - ResourceManager->PublishScheduledAt.reset(); - } + PublishResourcesScheduledAt.reset(); PublishResourceUsage("batching"); } @@ -949,35 +785,8 @@ class TKqpResourceManagerActor : public TActorBootstrapped publishScheduledAt; - with_lock (ResourceManager->Lock) { - publishScheduledAt = ResourceManager->PublishScheduledAt; - } - - if (publishScheduledAt) { - str << "Next publish time: " << *publishScheduledAt << Endl; - } - - str << Endl << "Transactions:" << Endl; - for (auto& bucket : ResourceManager->Buckets) { - with_lock (bucket.Lock) { - for (auto& [txId, txState] : bucket.Txs) { - str << " TxId: " << txId << Endl; - str << " ScanQuery memory: " << txState.TxScanQueryMemory << Endl; - str << " External DataQuery memory: " << txState.TxExternalDataQueryMemory << Endl; - str << " Execution units: " << txState.TxExecutionUnits << Endl; - str << " Create at: " << txState.CreatedAt << Endl; - str << " Tasks:" << Endl; - for (auto& [taskId, taskState] : txState.Tasks) { - str << " TaskId: " << taskId << Endl; - str << " ScanQuery memory: " << taskState.ScanQueryMemory << Endl; - str << " External DataQuery memory: " << taskState.ExternalDataQueryMemory << Endl; - str << " Execution units: " << taskState.ExecutionUnits << Endl; - str << " ResourceBroker TaskId: " << taskState.ResourceBrokerTaskId << Endl; - str << " Created at: " << taskState.CreatedAt << Endl; - } - } - } // with_lock (bucket.Lock) + if (PublishResourcesScheduledAt) { + str << "Next publish time: " << *PublishResourcesScheduledAt << Endl; } if (snapshot.empty()) { @@ -992,13 +801,6 @@ class TKqpResourceManagerActor : public TActorBootstrapped publishScheduledAt; - - with_lock (ResourceManager->Lock) { - publishInterval = TDuration::Seconds(Config.GetPublishStatisticsIntervalSec()); - publishScheduledAt = ResourceManager->PublishScheduledAt; - } - - if (publishScheduledAt) { + const TDuration publishInterval = TDuration::Seconds(Config.GetPublishStatisticsIntervalSec()); + if (PublishResourcesScheduledAt) { return; } auto now = ResourceManager->ActorSystem->Timestamp(); if (publishInterval && WbState.LastPublishTime && now - *WbState.LastPublishTime < publishInterval) { - publishScheduledAt = *WbState.LastPublishTime + publishInterval; - - with_lock (ResourceManager->Lock) { - ResourceManager->PublishScheduledAt = publishScheduledAt; - } + PublishResourcesScheduledAt = *WbState.LastPublishTime + publishInterval; - Schedule(*publishScheduledAt - now, new TEvPrivate::TEvPublishResources); - LOG_D("Schedule publish at " << *publishScheduledAt << ", after " << (*publishScheduledAt - now)); + Schedule(*PublishResourcesScheduledAt - now, new TEvPrivate::TEvPublishResources); + LOG_D("Schedule publish at " << *PublishResourcesScheduledAt << ", after " << (*PublishResourcesScheduledAt - now)); return; } + // starting resources publishing. + // saying resource manager that we are ready for the next publishing. + ResourceManager->PublishScheduled.clear(); + NKikimrKqp::TKqpNodeResources payload; payload.SetNodeId(SelfId().NodeId()); payload.SetTimestamp(now.Seconds()); @@ -1145,6 +940,7 @@ class TKqpResourceManagerActor : public TActorBootstrapped ResourceManager; + std::optional PublishResourcesScheduledAt; bool PublishResourcesByExchanger; std::optional SelfDataCenterId; }; diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.h b/ydb/core/kqp/rm_service/kqp_rm_service.h index 268160695dfd..7c7bd8714676 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_service.h +++ b/ydb/core/kqp/rm_service/kqp_rm_service.h @@ -42,18 +42,140 @@ struct TKqpResourcesRequest { ui64 ExternalMemory = 0; bool ReleaseAllResources = false; + void MoveToFreeTier() { + ExternalMemory += Memory; + Memory = 0; + } + TString ToString() const { return TStringBuilder() << "TKqpResourcesRequest{ MemoryPool: " << (ui32) MemoryPool << ", Memory: " << Memory << "ExternalMemory: " << ExternalMemory << " }"; } }; +class TTxState; + +class TTaskState : public TAtomicRefCount { + friend TTxState; + +public: + const ui64 TaskId = 0; + const TInstant CreatedAt; + ui64 ScanQueryMemory = 0; + ui64 ExternalDataQueryMemory = 0; + ui64 ResourceBrokerTaskId = 0; + ui32 ExecutionUnits = 0; + +public: + + // compute actor wants to release some memory. + // we distribute that memory across granted resources + TKqpResourcesRequest FitRequest(TKqpResourcesRequest& resources) { + ui64 releaseScanQueryMemory = std::min(ScanQueryMemory, resources.Memory); + ui64 leftToRelease = resources.Memory - releaseScanQueryMemory; + ui64 releaseExternalDataQueryMemory = std::min(ExternalDataQueryMemory, resources.ExternalMemory + leftToRelease); + + resources.Memory = releaseScanQueryMemory; + resources.ExternalMemory = releaseExternalDataQueryMemory; + return resources; + } + + TKqpResourcesRequest FreeResourcesRequest() const { + return TKqpResourcesRequest{ + .ExecutionUnits=ExecutionUnits, + .MemoryPool=EKqpMemoryPool::Unspecified, + .Memory=ScanQueryMemory, + .ExternalMemory=ExternalDataQueryMemory}; + } + + explicit TTaskState(ui64 taskId, TInstant createdAt) + : TaskId(taskId) + , CreatedAt(createdAt) + { + } +}; + +class TTxState : public TAtomicRefCount { + +public: + const ui64 TxId; + const TInstant CreatedAt; + TIntrusivePtr Counters; +private: + std::atomic TxScanQueryMemory = 0; + std::atomic TxExternalDataQueryMemory = 0; + std::atomic TxExecutionUnits = 0; + +public: + explicit TTxState(ui64 txId, TInstant now, TIntrusivePtr counters) + : TxId(txId) + , CreatedAt(now) + , Counters(std::move(counters)) + {} + + TString ToString() const { + return TStringBuilder() << "TxResourcesInfo{ " + << "TxId: " << TxId + << ", memory initially granted resources: " << TxExternalDataQueryMemory.load() + << ", extra allocations " << TxScanQueryMemory.load() + << ", execution units: " << TxExecutionUnits.load() + << ", started at: " << CreatedAt + << " }"; + } + + ui64 GetExtraMemoryAllocatedSize() { + return TxScanQueryMemory.load(); + } + + void Released(TIntrusivePtr& taskState, const TKqpResourcesRequest& resources) { + if (resources.ExecutionUnits) { + Counters->RmOnCompleteFree->Inc(); + } else { + Counters->RmExtraMemFree->Inc(); + } + + Counters->RmExternalMemory->Sub(resources.ExternalMemory); + TxExternalDataQueryMemory.fetch_sub(resources.ExternalMemory); + taskState->ExternalDataQueryMemory -= resources.ExternalMemory; + + TxScanQueryMemory.fetch_sub(resources.Memory); + taskState->ScanQueryMemory -= resources.Memory; + Counters->RmMemory->Sub(resources.Memory); + + TxExecutionUnits.fetch_sub(resources.ExecutionUnits); + taskState->ExecutionUnits -= resources.ExecutionUnits; + Counters->RmComputeActors->Sub(resources.ExecutionUnits); + } + + void Allocated(TIntrusivePtr& taskState, const TKqpResourcesRequest& resources) { + if (resources.ExecutionUnits > 0) { + Counters->RmOnStartAllocs->Inc(); + } + + Counters->RmExternalMemory->Add(resources.ExternalMemory); + TxExternalDataQueryMemory.fetch_add(resources.ExternalMemory); + taskState->ExternalDataQueryMemory += resources.ExternalMemory; + + TxScanQueryMemory.fetch_add(resources.Memory); + taskState->ScanQueryMemory += resources.Memory; + Counters->RmMemory->Add(resources.Memory); + if (resources.Memory) { + Counters->RmExtraMemAllocs->Inc(); + } + + TxExecutionUnits.fetch_add(resources.ExecutionUnits); + taskState->ExecutionUnits += resources.ExecutionUnits; + Counters->RmComputeActors->Add(resources.ExecutionUnits); + } +}; + /// detailed information on allocation failure struct TKqpRMAllocateResult { bool Success = true; NKikimrKqp::TEvStartKqpTasksResponse::ENotStartedTaskReason Status = NKikimrKqp::TEvStartKqpTasksResponse::INTERNAL_ERROR; TString FailReason; - ui64 TotalAllocatedQueryMemory = 0; + TIntrusivePtr TaskInfo; + TIntrusivePtr TxInfo; NKikimrKqp::TEvStartKqpTasksResponse::ENotStartedTaskReason GetStatus() const { return Status; @@ -85,19 +207,15 @@ class IKqpResourceManager : private TNonCopyable { public: virtual ~IKqpResourceManager() = default; - virtual TKqpRMAllocateResult AllocateResources(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources) = 0; + virtual const TIntrusivePtr& GetCounters() const = 0; - using TResourcesAllocatedCallback = std::function; + virtual TKqpRMAllocateResult AllocateResources(TIntrusivePtr& tx, TIntrusivePtr& task, const TKqpResourcesRequest& resources) = 0; virtual TTaskResourceEstimation EstimateTaskResources(const NYql::NDqProto::TDqTask& task, const ui32 tasksCount) = 0; virtual void EstimateTaskResources(TTaskResourceEstimation& result, const ui32 tasksCount) = 0; - virtual void FreeResources(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources) = 0; - virtual void FreeResources(ui64 txId, ui64 taskId) = 0; - virtual TString GetTxResourcesUsageDebugInfo(ui64 txId) = 0; - - virtual void NotifyExternalResourcesAllocated(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources) = 0; - + virtual void FreeResources(TIntrusivePtr& tx, TIntrusivePtr& task, const TKqpResourcesRequest& resources) = 0; + virtual void FreeResources(TIntrusivePtr& tx, TIntrusivePtr& task) = 0; virtual void RequestClusterResourcesInfo(TOnResourcesSnapshotCallback&& callback) = 0; virtual TVector GetClusterResources() const = 0; diff --git a/ydb/core/kqp/rm_service/kqp_rm_ut.cpp b/ydb/core/kqp/rm_service/kqp_rm_ut.cpp index 5dcf0b43e30c..09d7c0536254 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_ut.cpp +++ b/ydb/core/kqp/rm_service/kqp_rm_ut.cpp @@ -185,6 +185,14 @@ class KqpRm : public TTestBase { UNIT_ASSERT_VALUES_EQUAL(t->GetCounter("InFlyTasks")->Val(), infly); } + TIntrusivePtr MakeTx(ui64 txId, std::shared_ptr rm) { + return MakeIntrusive(txId, TInstant::Now(), rm->GetCounters()); + } + + TIntrusivePtr MakeTask(ui64 taskId, TIntrusivePtr tx) { + return MakeIntrusive(taskId, tx->CreatedAt); + } + void AssertResourceManagerStats( std::shared_ptr rm, ui64 scanQueryMemory, ui32 executionUnits) { Y_UNUSED(executionUnits); @@ -316,14 +324,16 @@ void KqpRm::SingleTask() { NRm::TKqpResourcesRequest request; request.MemoryPool = NRm::EKqpMemoryPool::ScanQuery; request.Memory = 100; + auto tx1 = MakeTx(1, rm); + auto task2 = MakeTask(2, tx1); - bool allocated = rm->AllocateResources(1, 2, request); + bool allocated = rm->AllocateResources(tx1, task2, request); UNIT_ASSERT(allocated); AssertResourceManagerStats(rm, 900, 90); AssertResourceBrokerSensors(0, 100, 0, 0, 1); - rm->FreeResources(1, 2); + rm->FreeResources(tx1, task2); AssertResourceManagerStats(rm, 1000, 100); AssertResourceBrokerSensors(0, 0, 0, 1, 0); } @@ -338,14 +348,23 @@ void KqpRm::ManyTasks() { request.MemoryPool = NRm::EKqpMemoryPool::ScanQuery; request.Memory = 100; + auto tx1 = MakeTx(1, rm); + TIntrusivePtr task1; + for (ui32 i = 1; i < 10; ++i) { - bool allocated = rm->AllocateResources(1, i, request); + auto task = MakeTask(i, tx1); + if (!task1) { + task1 = task; + } + + bool allocated = rm->AllocateResources(tx1, task, request); UNIT_ASSERT(allocated); AssertResourceManagerStats(rm, 1000 - 100 * i, 100 - 10 * i); AssertResourceBrokerSensors(0, 100 * i, 0, 0, i); } +/* // invalid taskId rm->FreeResources(1, 0); AssertResourceManagerStats(rm, 100, 10); @@ -355,8 +374,9 @@ void KqpRm::ManyTasks() { rm->FreeResources(10, 1); AssertResourceManagerStats(rm, 100, 10); AssertResourceBrokerSensors(0, 900, 0, 0, 9); +*/ - rm->FreeResources(1, 1); + rm->FreeResources(tx1, task1); AssertResourceManagerStats(rm, 200, 20); AssertResourceBrokerSensors(0, 800, 0, 1, 8); } @@ -371,7 +391,10 @@ void KqpRm::NotEnoughMemory() { request.MemoryPool = NRm::EKqpMemoryPool::ScanQuery; request.Memory = 10'000; - bool allocated = rm->AllocateResources(1, 2, request); + auto tx = MakeTx(1, rm); + auto task = MakeTask(2, tx); + + bool allocated = rm->AllocateResources(tx, task, request); UNIT_ASSERT(!allocated); AssertResourceManagerStats(rm, 1000, 100); @@ -389,8 +412,10 @@ void KqpRm::NotEnoughExecutionUnits() { request.Memory = 100; request.ExecutionUnits = 1000; - bool allocated = true; - allocated &= rm->AllocateResources(1, 2, request); + auto tx = MakeTx(1, rm); + auto task = MakeTask(2, tx); + + bool allocated = rm->AllocateResources(tx, task, request); UNIT_ASSERT(!allocated); AssertResourceManagerStats(rm, 1000, 100); @@ -410,12 +435,15 @@ void KqpRm::ResourceBrokerNotEnoughResources() { request.MemoryPool = NRm::EKqpMemoryPool::ScanQuery; request.Memory = 1'000; - bool allocated = rm->AllocateResources(1, 2, request); + auto tx = MakeTx(1, rm); + auto task = MakeTask(2, tx); + + bool allocated = rm->AllocateResources(tx, task, request); UNIT_ASSERT(allocated); request.MemoryPool = NRm::EKqpMemoryPool::ScanQuery; request.Memory = 100'000; - allocated = rm->AllocateResources(1, 2, request); + allocated = rm->AllocateResources(tx, task, request); UNIT_ASSERT(!allocated); AssertResourceManagerStats(rm, config.GetQueryMemoryLimit() - 1000, 90); @@ -432,11 +460,16 @@ void KqpRm::Snapshot(bool byExchanger) { request.MemoryPool = NRm::EKqpMemoryPool::ScanQuery; request.Memory = 100; request.ExecutionUnits = 10; + auto tx1 = MakeTx(1, rm); + auto tx2 = MakeTx(2, rm); - bool allocated = rm->AllocateResources(1, 2, request); + auto task2 = MakeTask(2, tx1); + auto task1 = MakeTask(1, tx2); + + bool allocated = rm->AllocateResources(tx1, task2, request); UNIT_ASSERT(allocated); - allocated &= rm->AllocateResources(2, 1, request); + allocated &= rm->AllocateResources(tx2, task1, request); UNIT_ASSERT(allocated); AssertResourceManagerStats(rm, 800, 80); @@ -446,8 +479,8 @@ void KqpRm::Snapshot(bool byExchanger) { CheckSnapshot(0, {{800, 80}, {1000, 100}}, rm); - rm->FreeResources(1, 2); - rm->FreeResources(2, 1); + rm->FreeResources(tx1, task2); + rm->FreeResources(tx2, task1); AssertResourceManagerStats(rm, 1000, 100); AssertResourceBrokerSensors(0, 0, 0, 2, 0); @@ -474,8 +507,10 @@ void KqpRm::Reduce() { NRm::TKqpResourcesRequest request; request.MemoryPool = NRm::EKqpMemoryPool::ScanQuery; request.Memory = 100; + auto tx = MakeTx(1, rm); + auto task = MakeTask(1, tx); - bool allocated = rm->AllocateResources(1, 1, request); + bool allocated = rm->AllocateResources(tx, task, request); UNIT_ASSERT(allocated); AssertResourceManagerStats(rm, 1000 - 100, 100 - 10); @@ -485,6 +520,7 @@ void KqpRm::Reduce() { reduceRequest.MemoryPool = NRm::EKqpMemoryPool::ScanQuery; reduceRequest.Memory = 70; +/* // invalid taskId rm->FreeResources(1, 0); AssertResourceManagerStats(rm, 1000 - 100, 100 - 10); @@ -494,8 +530,9 @@ void KqpRm::Reduce() { rm->FreeResources(10, 1); AssertResourceManagerStats(rm, 1000 - 100, 100 - 10); AssertResourceBrokerSensors(0, 100, 0, 0, 1); +*/ - rm->FreeResources(1, 1, reduceRequest); + rm->FreeResources(tx, task, reduceRequest); AssertResourceManagerStats(rm, 1000 - 30, 100 - 7); AssertResourceBrokerSensors(0, 30, 0, 0, 1); } @@ -517,11 +554,21 @@ void KqpRm::SnapshotSharing(bool byExchanger) { request.Memory = 100; request.ExecutionUnits = 10; + auto tx1Rm1 = MakeTx(1, rm_first); + auto tx2Rm1 = MakeTx(2, rm_first); + auto task1Rm1 = MakeTask(1, tx1Rm1); + auto task2Rm1 = MakeTask(1, tx2Rm1); + + auto tx1Rm2 = MakeTx(1, rm_second); + auto tx2Rm2 = MakeTx(2, rm_second); + auto task1Rm2 = MakeTask(1, tx1Rm2); + auto task2Rm2 = MakeTask(2, tx2Rm2); + { - bool allocated = rm_first->AllocateResources(1, 2, request); + bool allocated = rm_first->AllocateResources(tx1Rm1, task1Rm1, request); UNIT_ASSERT(allocated); - allocated &= rm_first->AllocateResources(2, 1, request); + allocated &= rm_first->AllocateResources(tx2Rm1, task2Rm1, request); UNIT_ASSERT(allocated); Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(1)); @@ -530,10 +577,10 @@ void KqpRm::SnapshotSharing(bool byExchanger) { } { - bool allocated = rm_second->AllocateResources(1, 2, request); + bool allocated = rm_second->AllocateResources(tx1Rm2, task1Rm2, request); UNIT_ASSERT(allocated); - allocated &= rm_second->AllocateResources(2, 1, request); + allocated &= rm_second->AllocateResources(tx2Rm2, task2Rm2, request); UNIT_ASSERT(allocated); Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(1)); @@ -542,8 +589,8 @@ void KqpRm::SnapshotSharing(bool byExchanger) { } { - rm_first->FreeResources(1, 2); - rm_first->FreeResources(2, 1); + rm_first->FreeResources(tx1Rm1, task1Rm1); + rm_first->FreeResources(tx2Rm1, task2Rm1); Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(1)); @@ -551,8 +598,8 @@ void KqpRm::SnapshotSharing(bool byExchanger) { } { - rm_second->FreeResources(1, 2); - rm_second->FreeResources(2, 1); + rm_second->FreeResources(tx1Rm2, task1Rm2); + rm_second->FreeResources(tx2Rm2, task2Rm2); Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(1)); diff --git a/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp b/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp index 1e661132460d..5aafd5a20ada 100644 --- a/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp +++ b/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp @@ -83,15 +83,9 @@ EExecutionStatus TBuildKqpDataTxOutRSUnit::Execute(TOperation::TPtr op, TTransac auto allocGuard = tasksRunner.BindAllocator(txc.GetMemoryLimit() - dataTx->GetTxSize()); - NKqp::NRm::TKqpResourcesRequest req; - req.MemoryPool = NKqp::NRm::EKqpMemoryPool::DataQuery; - req.ExternalMemory = txc.GetMemoryLimit(); - ui64 taskId = dataTx->GetFirstKqpTaskId(); - - NKqp::GetKqpResourceManager()->NotifyExternalResourcesAllocated(tx->GetTxId(), taskId, req); - + NKqp::GetKqpResourceManager()->GetCounters()->RmExternalMemory->Add(txc.GetMemoryLimit()); Y_DEFER { - NKqp::GetKqpResourceManager()->FreeResources(tx->GetTxId(), taskId); + NKqp::GetKqpResourceManager()->GetCounters()->RmExternalMemory->Sub(txc.GetMemoryLimit()); }; LOG_T("Operation " << *op << " (build_kqp_data_tx_out_rs) at " << tabletId diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp index 74c0d4e740a5..b61b78d7f180 100644 --- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp @@ -217,14 +217,9 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio auto allocGuard = tasksRunner.BindAllocator(txc.GetMemoryLimit() - dataTx->GetTxSize()); - NKqp::NRm::TKqpResourcesRequest req; - req.MemoryPool = NKqp::NRm::EKqpMemoryPool::DataQuery; - req.ExternalMemory = txc.GetMemoryLimit(); - ui64 taskId = dataTx->GetFirstKqpTaskId(); - NKqp::GetKqpResourceManager()->NotifyExternalResourcesAllocated(txId, taskId, req); - + NKqp::GetKqpResourceManager()->GetCounters()->RmExternalMemory->Add(txc.GetMemoryLimit()); Y_DEFER { - NKqp::GetKqpResourceManager()->FreeResources(txId, taskId); + NKqp::GetKqpResourceManager()->GetCounters()->RmExternalMemory->Sub(txc.GetMemoryLimit()); }; LOG_T("Operation " << *op << " (execute_kqp_data_tx) at " << tabletId