Skip to content

merge memory limitations features and staff to stable-24-3 #6803

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/driver_lib/run/kikimr_services_initializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2149,7 +2149,7 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu

// Create resource manager
auto rm = NKqp::CreateKqpResourceManagerActor(Config.GetTableServiceConfig().GetResourceManager(), nullptr,
{}, kqpProxySharedResources);
{}, kqpProxySharedResources, NodeId);
setup->LocalServices.push_back(std::make_pair(
NKqp::MakeKqpRmServiceID(NodeId),
TActorSetupCmd(rm, TMailboxType::HTSwap, appData->UserPoolId)));
Expand Down
146 changes: 93 additions & 53 deletions ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,137 +6,174 @@

namespace NKikimr::NKqp::NComputeActor {


// Memory quota manager for a single KQP compute task. Extends
// NYql::NDq::TGuaranteeQuotaManager and forwards extra-quota requests to the
// KQP resource manager.
// NOTE(review): this listing comes from a diff view, so several statements and
// members appear twice (a TxId/TaskId form and a Tx/Task form) — presumably the
// removed and the added variant of the same line; confirm against the real file.
struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager {

// Stores the collaborators and passes `limit` to the base quota manager as
// both of its constructor arguments.
TMemoryQuotaManager(std::shared_ptr<NRm::IKqpResourceManager> resourceManager
, NRm::EKqpMemoryPool memoryPool
, std::shared_ptr<IKqpNodeState> state
, ui64 txId
, ui64 taskId
, TIntrusivePtr<NRm::TTxState> tx
, TIntrusivePtr<NRm::TTaskState> task
, ui64 limit
, ui64 reasonableSpillingTreshold)
: NYql::NDq::TGuaranteeQuotaManager(limit, limit)
, ResourceManager(std::move(resourceManager))
, MemoryPool(memoryPool)
, State(std::move(state))
, TxId(txId)
, TaskId(taskId)
, Tx(std::move(tx))
, Task(std::move(task))
, ReasonableSpillingTreshold(reasonableSpillingTreshold)
{
}

// Reports task termination (with the last recorded Success flag) to the node
// state and releases the task's resources in the resource manager.
~TMemoryQuotaManager() override {
State->OnTaskTerminate(TxId, TaskId, Success);
ResourceManager->FreeResources(TxId, TaskId);
// The Tx/Task variant tolerates a null State.
if (State) {
State->OnTaskTerminate(Tx->TxId, Task->TaskId, Success);
}

ResourceManager->FreeResources(Tx, Task);
}

// Asks the resource manager for `extraSize` more memory from MemoryPool.
// Returns false (after logging a warning) when the request is refused,
// true otherwise.
bool AllocateExtraQuota(ui64 extraSize) override {
auto result = ResourceManager->AllocateResources(TxId, TaskId,
auto result = ResourceManager->AllocateResources(Tx, Task,
NRm::TKqpResourcesRequest{.MemoryPool = MemoryPool, .Memory = extraSize});

if (!result) {
AFL_WARN(NKikimrServices::KQP_COMPUTE)
("problem", "cannot_allocate_memory")
("tx_id", TxId)
("task_id", TaskId)
("tx_id", Tx->TxId)
("task_id", Task->TaskId)
("memory", extraSize);

return false;
}

// Remember the query-wide total reported by the resource manager.
TotalQueryAllocationsSize = result.TotalAllocatedQueryMemory;

return true;
}

// Returns `extraSize` of previously granted extra memory to the resource manager.
void FreeExtraQuota(ui64 extraSize) override {
ResourceManager->FreeResources(TxId, TaskId,
NRm::TKqpResourcesRequest{.MemoryPool = MemoryPool, .Memory = extraSize}
);
NRm::TKqpResourcesRequest request = NRm::TKqpResourcesRequest{.MemoryPool = MemoryPool, .Memory = extraSize};
// NOTE(review): FitRequest is not visible here; presumably it adjusts the
// request to what the task actually holds — TODO confirm.
ResourceManager->FreeResources(Tx, Task, Task->FitRequest(request));
}

// True once the allocated query memory reaches ReasonableSpillingTreshold.
bool IsReasonableToUseSpilling() const override {
return TotalQueryAllocationsSize >= ReasonableSpillingTreshold;
return Tx->GetExtraMemoryAllocatedSize() >= ReasonableSpillingTreshold;
}

// Textual description of memory consumption, delegated to the transaction state.
TString MemoryConsumptionDetails() const override {
return Tx->ToString();
}

// Logs the compute actor's completion and records its outcome; the stored
// flag is what the destructor later passes to OnTaskTerminate.
void TerminateHandler(bool success, const NYql::TIssues& issues) {
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)
("problem", "finish_compute_actor")
("tx_id", TxId)("task_id", TaskId)("success", success)("message", issues.ToOneLineString());
("tx_id", Tx->TxId)("task_id", Task->TaskId)("success", success)("message", issues.ToOneLineString());
Success = success;
}

// Resource manager that grants and reclaims memory for this task.
std::shared_ptr<NRm::IKqpResourceManager> ResourceManager;
// Pool used in every resource request built by this manager.
NRm::EKqpMemoryPool MemoryPool;
// Node state notified on task termination.
std::shared_ptr<IKqpNodeState> State;
ui64 TxId;
ui64 TaskId;
// Transaction and task state objects handed to the resource manager calls.
TIntrusivePtr<NRm::TTxState> Tx;
TIntrusivePtr<NRm::TTaskState> Task;
// Outcome recorded by TerminateHandler; stays true if it is never called.
bool Success = true;
// Last TotalAllocatedQueryMemory value returned by AllocateResources.
ui64 TotalQueryAllocationsSize = 0;
// Threshold compared against allocated memory in IsReasonableToUseSpilling.
ui64 ReasonableSpillingTreshold = 0;
};

class TKqpCaFactory : public IKqpNodeComputeActorFactory {
NKikimrConfig::TTableServiceConfig::TResourceManager Config;
std::shared_ptr<NRm::IKqpResourceManager> ResourceManager_;
NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
const std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;

std::atomic<ui64> MkqlLightProgramMemoryLimit = 0;
std::atomic<ui64> MkqlHeavyProgramMemoryLimit = 0;
std::atomic<ui64> MinChannelBufferSize = 0;
std::atomic<ui64> ReasonableSpillingTreshold = 0;

public:
TKqpCaFactory(const NKikimrConfig::TTableServiceConfig::TResourceManager& config,
std::shared_ptr<NRm::IKqpResourceManager> resourceManager,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const std::optional<TKqpFederatedQuerySetup> federatedQuerySetup)
: Config(config)
, ResourceManager_(resourceManager)
: ResourceManager_(resourceManager)
, AsyncIoFactory(asyncIoFactory)
, FederatedQuerySetup(federatedQuerySetup)
{}
{
ApplyConfig(config);
}

TActorId CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqProto::TDqTask* dqTask,
const NYql::NDq::TComputeRuntimeSettings& settings,
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena, const TString& serializedGUCSettings,
TComputeStagesWithScan& computesByStage, ui64 outputChunkMaxSize, std::shared_ptr<IKqpNodeState> state,
NRm::EKqpMemoryPool memoryPool, ui32 numberOfTasks)
void ApplyConfig(const NKikimrConfig::TTableServiceConfig::TResourceManager& config)
{
MkqlLightProgramMemoryLimit.store(config.GetMkqlLightProgramMemoryLimit());
MkqlHeavyProgramMemoryLimit.store(config.GetMkqlHeavyProgramMemoryLimit());
MinChannelBufferSize.store(config.GetMinChannelBufferSize());
ReasonableSpillingTreshold.store(config.GetReasonableSpillingTreshold());
}

TActorStartResult CreateKqpComputeActor(TCreateArgs&& args) {
NYql::NDq::TComputeMemoryLimits memoryLimits;
memoryLimits.ChannelBufferSize = 0;
memoryLimits.MkqlLightProgramMemoryLimit = Config.GetMkqlLightProgramMemoryLimit();
memoryLimits.MkqlHeavyProgramMemoryLimit = Config.GetMkqlHeavyProgramMemoryLimit();
memoryLimits.MkqlLightProgramMemoryLimit = MkqlLightProgramMemoryLimit.load();
memoryLimits.MkqlHeavyProgramMemoryLimit = MkqlHeavyProgramMemoryLimit.load();

auto estimation = ResourceManager_->EstimateTaskResources(*args.Task, args.NumberOfTasks);
NRm::TKqpResourcesRequest resourcesRequest;
resourcesRequest.MemoryPool = args.MemoryPool;
resourcesRequest.ExecutionUnits = 1;
resourcesRequest.Memory = memoryLimits.MkqlLightProgramMemoryLimit;

TIntrusivePtr<NRm::TTaskState> task = MakeIntrusive<NRm::TTaskState>(args.Task->GetId(), args.TxInfo->CreatedAt);

auto estimation = EstimateTaskResources(*dqTask, Config, numberOfTasks);
auto rmResult = ResourceManager_->AllocateResources(
args.TxInfo, task, resourcesRequest);

if (!rmResult) {
return NRm::TKqpRMAllocateResult{rmResult};
}

{
ui32 inputChannelsCount = 0;
for (auto&& i : dqTask->GetInputs()) {
for (auto&& i : args.Task->GetInputs()) {
inputChannelsCount += i.ChannelsSize();
}

memoryLimits.ChannelBufferSize = std::max<ui32>(estimation.ChannelBufferMemoryLimit / std::max<ui32>(1, inputChannelsCount), Config.GetMinChannelBufferSize());
memoryLimits.OutputChunkMaxSize = outputChunkMaxSize;
memoryLimits.ChannelBufferSize = std::max<ui32>(estimation.ChannelBufferMemoryLimit / std::max<ui32>(1, inputChannelsCount), MinChannelBufferSize.load());
memoryLimits.OutputChunkMaxSize = args.OutputChunkMaxSize;
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "channel_info")
("ch_size", estimation.ChannelBufferMemoryLimit)
("ch_count", estimation.ChannelBuffersCount)
("ch_limit", memoryLimits.ChannelBufferSize)
("inputs", dqTask->InputsSize())
("inputs", args.Task->InputsSize())
("input_channels_count", inputChannelsCount);
}

auto& taskOpts = dqTask->GetProgram().GetSettings();
auto& taskOpts = args.Task->GetProgram().GetSettings();
auto limit = taskOpts.GetHasMapJoin() || taskOpts.GetHasStateAggregation()
? memoryLimits.MkqlHeavyProgramMemoryLimit
: memoryLimits.MkqlLightProgramMemoryLimit;

memoryLimits.MemoryQuotaManager = std::make_shared<TMemoryQuotaManager>(
ResourceManager_,
memoryPool,
std::move(state),
txId,
dqTask->GetId(),
args.MemoryPool,
std::move(args.State),
std::move(args.TxInfo),
std::move(task),
limit,
Config.GetReasonableSpillingTreshold());
ReasonableSpillingTreshold.load());

auto runtimeSettings = args.RuntimeSettings;
runtimeSettings.ExtraMemoryAllocationPool = args.MemoryPool;
runtimeSettings.UseSpilling = args.WithSpilling;
runtimeSettings.StatsMode = args.StatsMode;

if (args.Deadline) {
runtimeSettings.Timeout = args.Deadline - TAppData::TimeProvider->Now();
}

if (args.RlPath) {
runtimeSettings.RlPath = args.RlPath;
}

auto runtimeSettings = settings;
NYql::NDq::IMemoryQuotaManager::TWeakPtr memoryQuotaManager = memoryLimits.MemoryQuotaManager;
runtimeSettings.TerminateHandler = [memoryQuotaManager]
(bool success, const NYql::TIssues& issues) {
Expand All @@ -157,29 +194,32 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
};

ETableKind tableKind = ETableKind::Unknown;
if (dqTask->HasMetaId()) {
YQL_ENSURE(computesByStage.GetMetaById(*dqTask, meta) || dqTask->GetMeta().UnpackTo(&meta), "cannot take meta on MetaId exists in tasks");
if (args.Task->HasMetaId()) {
YQL_ENSURE(args.ComputesByStages);
YQL_ENSURE(args.ComputesByStages->GetMetaById(*args.Task, meta) || args.Task->GetMeta().UnpackTo(&meta), "cannot take meta on MetaId exists in tasks");
tableKind = tableKindExtract(meta);
} else if (dqTask->GetMeta().UnpackTo(&meta)) {
} else if (args.Task->GetMeta().UnpackTo(&meta)) {
tableKind = tableKindExtract(meta);
}

if (tableKind == ETableKind::Datashard || tableKind == ETableKind::Olap) {
auto& info = computesByStage.UpsertTaskWithScan(*dqTask, meta, !AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead());
IActor* computeActor = CreateKqpScanComputeActor(executerId, txId, dqTask,
YQL_ENSURE(args.ComputesByStages);
auto& info = args.ComputesByStages->UpsertTaskWithScan(*args.Task, meta, !AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead());
IActor* computeActor = CreateKqpScanComputeActor(args.ExecuterId, args.TxId, args.Task,
AsyncIoFactory, runtimeSettings, memoryLimits,
std::move(traceId), std::move(arena));
std::move(args.TraceId), std::move(args.Arena));
TActorId result = TlsActivationContext->Register(computeActor);
info.MutableActorIds().emplace_back(result);
return result;
} else {
std::shared_ptr<TGUCSettings> GUCSettings;
if (!serializedGUCSettings.empty()) {
GUCSettings = std::make_shared<TGUCSettings>(serializedGUCSettings);
if (!args.SerializedGUCSettings.empty()) {
GUCSettings = std::make_shared<TGUCSettings>(args.SerializedGUCSettings);
}
IActor* computeActor = ::NKikimr::NKqp::CreateKqpComputeActor(executerId, txId, dqTask, AsyncIoFactory,
runtimeSettings, memoryLimits, std::move(traceId), std::move(arena), FederatedQuerySetup, GUCSettings);
return TlsActivationContext->Register(computeActor);
IActor* computeActor = ::NKikimr::NKqp::CreateKqpComputeActor(args.ExecuterId, args.TxId, args.Task, AsyncIoFactory,
runtimeSettings, memoryLimits, std::move(args.TraceId), std::move(args.Arena), FederatedQuerySetup, GUCSettings);
return args.ShareMailbox ? TlsActivationContext->AsActorContext().RegisterWithSameMailbox(computeActor) :
TlsActivationContext->AsActorContext().Register(computeActor);
}
}
};
Expand Down
30 changes: 25 additions & 5 deletions ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,31 @@ struct IKqpNodeComputeActorFactory {
virtual ~IKqpNodeComputeActorFactory() = default;

public:
virtual NActors::TActorId CreateKqpComputeActor(const NActors::TActorId& executerId, ui64 txId, NYql::NDqProto::TDqTask* task,
const NYql::NDq::TComputeRuntimeSettings& settings,
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena, const TString& serializedGUCSettings,
TComputeStagesWithScan& computeStages, ui64 outputChunkMaxSize, std::shared_ptr<IKqpNodeState> state,
NKikimr::NKqp::NRm::EKqpMemoryPool memoryPool, ui32 numberOfTasks) = 0;
// Argument bundle for CreateKqpComputeActor. Field notes below describe how the
// factory implementation in kqp_compute_actor_factory.cpp consumes each value.
// NOTE(review): several members are references (ExecuterId, RuntimeSettings,
// SerializedGUCSettings, Deadline, RlPath); the caller must keep the referenced
// objects alive for the whole CreateKqpComputeActor call.
struct TCreateArgs {
// Executer actor id, forwarded to the created compute actor.
const NActors::TActorId& ExecuterId;
// Transaction id, forwarded to the created compute actor.
const ui64 TxId;
// DQ task to run; its inputs, program settings and meta are read by the factory.
NYql::NDqProto::TDqTask* Task;
// Transaction state used for resource allocation and then handed to the
// task's memory quota manager.
TIntrusivePtr<NRm::TTxState> TxInfo;
// Base runtime settings; the factory copies them and overrides a few fields.
const NYql::NDq::TComputeRuntimeSettings& RuntimeSettings;
// Moved into the created compute actor.
NWilson::TTraceId TraceId;
// Moved into the created compute actor.
TIntrusivePtr<NActors::TProtoArenaHolder> Arena;
// When non-empty, used to construct TGUCSettings for a non-scan compute actor.
const TString& SerializedGUCSettings;
// Passed to the resource manager's EstimateTaskResources.
const ui32 NumberOfTasks;
// Copied into the compute memory limits as OutputChunkMaxSize.
const ui64 OutputChunkMaxSize;
// Pool for the initial resource request and for extra memory allocations.
const NKikimr::NKqp::NRm::EKqpMemoryPool MemoryPool;
// Copied into the runtime settings as UseSpilling.
const bool WithSpilling;
// Copied into the runtime settings as StatsMode.
const NYql::NDqProto::EDqStatsMode StatsMode;
// When set, the runtime timeout is computed as Deadline minus the current time.
const TInstant& Deadline;
// For non-scan actors: register in the caller's mailbox instead of a new one.
const bool ShareMailbox;
// When set, copied into the runtime settings as RlPath.
const TMaybe<NYql::NDqProto::TRlPath>& RlPath;
// Must be non-null when the task has a MetaId or is a datashard/OLAP scan
// (the factory enforces this with YQL_ENSURE).
TComputeStagesWithScan* ComputesByStages = nullptr;
// Optional node state, moved into the task's memory quota manager.
std::shared_ptr<IKqpNodeState> State = nullptr;
};

typedef std::variant<TActorId, NKikimr::NKqp::NRm::TKqpRMAllocateResult> TActorStartResult;
virtual TActorStartResult CreateKqpComputeActor(TCreateArgs&& args) = 0;

virtual void ApplyConfig(const NKikimrConfig::TTableServiceConfig::TResourceManager& config) = 0;
};

std::shared_ptr<IKqpNodeComputeActorFactory> MakeKqpCaFactory(const NKikimrConfig::TTableServiceConfig::TResourceManager& config,
Expand Down
5 changes: 1 addition & 4 deletions ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,7 @@ STFUNC(TKqpComputeActor::StateFunc) {
BaseStateFuncBody(ev);
}
} catch (const TMemoryLimitExceededException& e) {
InternalError(TIssuesIds::KIKIMR_PRECONDITION_FAILED, TStringBuilder()
<< "Mkql memory limit exceeded, limit: " << GetMkqlMemoryLimit()
<< ", host: " << HostName()
<< ", canAllocateExtraMemory: " << CanAllocateExtraMemory);
TBase::OnMemoryLimitExceptionHandler();
} catch (const NMiniKQL::TKqpEnsureFail& e) {
InternalError((TIssuesIds::EIssueCode) e.GetCode(), e.GetMessage());
} catch (const yexception& e) {
Expand Down
5 changes: 1 addition & 4 deletions ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,7 @@ class TKqpScanComputeActor: public NYql::NDq::TDqSyncComputeActorBase<TKqpScanCo
BaseStateFuncBody(ev);
}
} catch (const TMemoryLimitExceededException& e) {
const TString sInfo = TStringBuilder() << "Mkql memory limit exceeded, limit: " << GetMkqlMemoryLimit()
<< ", host: " << HostName() << ", canAllocateExtraMemory: " << CanAllocateExtraMemory;
CA_LOG_E("ERROR:" + sInfo);
InternalError(NYql::NDqProto::StatusIds::PRECONDITION_FAILED, NYql::TIssuesIds::KIKIMR_PRECONDITION_FAILED, sInfo);
TBase::OnMemoryLimitExceptionHandler();
} catch (const yexception& e) {
InternalError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, NYql::TIssuesIds::DEFAULT_ERROR, e.what());
}
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/kqp/counters/kqp_counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,10 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co
RmExternalMemory = KqpGroup->GetCounter("RM/ExternalMemory", false);
RmNotEnoughMemory = KqpGroup->GetCounter("RM/NotEnoughMemory", true);
RmNotEnoughComputeActors = KqpGroup->GetCounter("RM/NotEnoughComputeActors", true);
RmOnStartAllocs = KqpGroup->GetCounter("Rm/OnStartAllocs", true);
RmExtraMemAllocs = KqpGroup->GetCounter("RM/ExtraMemAllocs", true);
RmExtraMemFree = KqpGroup->GetCounter("RM/ExtraMemFree", true);
RmOnCompleteFree = KqpGroup->GetCounter("RM/OnCompleteFree", true);
RmInternalError = KqpGroup->GetCounter("RM/InternalError", true);
RmSnapshotLatency = KqpGroup->GetHistogram(
"RM/SnapshotLatency", NMonitoring::ExponentialHistogram(20, 2, 1));
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/kqp/counters/kqp_counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ class TKqpCounters : public TKqpCountersBase, public NYql::NDq::TSpillingCounter
::NMonitoring::TDynamicCounterPtr WorkloadManagerGroup;

::NMonitoring::TDynamicCounters::TCounterPtr FullScansExecuted;

// Lease updates counters
::NMonitoring::THistogramPtr LeaseUpdateLatency;
::NMonitoring::THistogramPtr RunActorLeaseUpdateBacklog;
Expand All @@ -377,6 +377,9 @@ class TKqpCounters : public TKqpCountersBase, public NYql::NDq::TSpillingCounter
::NMonitoring::TDynamicCounters::TCounterPtr RmNotEnoughMemory;
::NMonitoring::TDynamicCounters::TCounterPtr RmNotEnoughComputeActors;
::NMonitoring::TDynamicCounters::TCounterPtr RmExtraMemAllocs;
::NMonitoring::TDynamicCounters::TCounterPtr RmOnStartAllocs;
::NMonitoring::TDynamicCounters::TCounterPtr RmExtraMemFree;
::NMonitoring::TDynamicCounters::TCounterPtr RmOnCompleteFree;
::NMonitoring::TDynamicCounters::TCounterPtr RmInternalError;
NMonitoring::THistogramPtr RmSnapshotLatency;
NMonitoring::THistogramPtr NodeServiceStartEventDelivery;
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2489,7 +2489,9 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
.FederatedQuerySetup = FederatedQuerySetup,
.OutputChunkMaxSize = Request.OutputChunkMaxSize,
.GUCSettings = GUCSettings,
.MayRunTasksLocally = mayRunTasksLocally
.MayRunTasksLocally = mayRunTasksLocally,
.ResourceManager_ = Request.ResourceManager_,
.CaFactory_ = Request.CaFactory_
});

auto err = Planner->PlanExecution();
Expand Down
Loading
Loading