Skip to content

Merge stable 24 3 #8545

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 83 additions & 22 deletions ydb/core/kqp/executer_actor/kqp_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ bool TKqpPlanner::UseMockEmptyPlanner = false;
// Task can allocate extra memory during execution.
// So, we estimate total memory amount required for task as apriori task size multiplied by this constant.
constexpr ui32 MEMORY_ESTIMATION_OVERFLOW = 2;
constexpr ui32 MAX_NON_PARALLEL_TASKS_EXECUTION_LIMIT = 8;

TKqpPlanner::TKqpPlanner(TKqpPlanner::TArgs&& args)
: TxId(args.TxId)
Expand Down Expand Up @@ -256,9 +255,18 @@ std::unique_ptr<IEventHandle> TKqpPlanner::AssignTasksToNodes() {

auto localResources = ResourceManager_->GetLocalResources();
Y_UNUSED(MEMORY_ESTIMATION_OVERFLOW);

auto placingOptions = ResourceManager_->GetPlacingOptions();

bool singleNodeExecutionMakeSence = (
ResourceEstimations.size() <= placingOptions.MaxNonParallelTasksExecutionLimit ||
// all readers are located on the one node.
TasksPerNode.size() == 1
);

if (LocalRunMemoryEst * MEMORY_ESTIMATION_OVERFLOW <= localResources.Memory[NRm::EKqpMemoryPool::ScanQuery] &&
ResourceEstimations.size() <= localResources.ExecutionUnits &&
ResourceEstimations.size() <= MAX_NON_PARALLEL_TASKS_EXECUTION_LIMIT)
singleNodeExecutionMakeSence)
{
ui64 selfNodeId = ExecuterId.NodeId();
for(ui64 taskId: ComputeTasks) {
Expand Down Expand Up @@ -293,47 +301,100 @@ std::unique_ptr<IEventHandle> TKqpPlanner::AssignTasksToNodes() {
return std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.Release());
}

std::vector<ui64> deepestTasks;
ui64 maxLevel = 0;
for(auto& task: TasksGraph.GetTasks()) {
// const auto& task = TasksGraph.GetTask(taskId);
const auto& stageInfo = TasksGraph.GetStageInfo(task.StageId);
const NKqpProto::TKqpPhyStage& stage = stageInfo.Meta.GetStage(stageInfo.Id);
const ui64 stageLevel = stage.GetProgram().GetSettings().GetStageLevel();

if (stageLevel > maxLevel) {
maxLevel = stageLevel;
deepestTasks.clear();
}

if (stageLevel == maxLevel) {
deepestTasks.push_back(task.Id);
}
}

THashMap<ui64, ui64> alreadyAssigned;
for(auto& [nodeId, tasks] : TasksPerNode) {
for(ui64 taskId: tasks) {
alreadyAssigned.emplace(taskId, nodeId);
}
}

if (deepestTasks.size() <= placingOptions.MaxNonParallelTopStageExecutionLimit) {
// looks like the merge / union all connection
for(ui64 taskId: deepestTasks) {
auto [it, success] = alreadyAssigned.emplace(taskId, ExecuterId.NodeId());
if (success) {
TasksPerNode[ExecuterId.NodeId()].push_back(taskId);
}
}
}

auto planner = (UseMockEmptyPlanner ? CreateKqpMockEmptyPlanner() : CreateKqpGreedyPlanner()); // KqpMockEmptyPlanner is a mock planner for tests

auto ctx = TlsActivationContext->AsActorContext();
if (ctx.LoggerSettings() && ctx.LoggerSettings()->Satisfies(NActors::NLog::PRI_DEBUG, NKikimrServices::KQP_EXECUTER)) {
planner->SetLogFunc([TxId = TxId, &UserRequestContext = UserRequestContext](TStringBuf msg) { LOG_D(msg); });
}

THashMap<ui64, size_t> nodeIdtoIdx;
for (size_t idx = 0; idx < ResourcesSnapshot.size(); ++idx) {
nodeIdtoIdx[ResourcesSnapshot[idx].nodeid()] = idx;
}

LogMemoryStatistics([TxId = TxId, &UserRequestContext = UserRequestContext](TStringBuf msg) { LOG_D(msg); });

auto plan = planner->Plan(ResourcesSnapshot, ResourceEstimations);
ui64 selfNodeId = ExecuterId.NodeId();
TString selfNodeDC;

THashMap<ui64, ui64> alreadyAssigned;
for(auto& [nodeId, tasks] : TasksPerNode) {
for(ui64 taskId: tasks) {
alreadyAssigned.emplace(taskId, nodeId);
TVector<const NKikimrKqp::TKqpNodeResources*> allNodes;
TVector<const NKikimrKqp::TKqpNodeResources*> executerDcNodes;
allNodes.reserve(ResourcesSnapshot.size());

for(auto& snapNode: ResourcesSnapshot) {
const TString& dc = snapNode.GetKqpProxyNodeResources().GetDataCenterId();
if (snapNode.GetNodeId() == selfNodeId) {
selfNodeDC = dc;
break;
}
}

if (!plan.empty()) {
for (auto& group : plan) {
for(ui64 taskId: group.TaskIds) {
auto [it, success] = alreadyAssigned.emplace(taskId, group.NodeId);
if (success) {
TasksPerNode[group.NodeId].push_back(taskId);
}
}
for(auto& snapNode: ResourcesSnapshot) {
allNodes.push_back(&snapNode);
if (selfNodeDC == snapNode.GetKqpProxyNodeResources().GetDataCenterId()) {
executerDcNodes.push_back(&snapNode);
}
}

return nullptr;
} else {
TVector<IKqpPlannerStrategy::TResult> plan;

if (!executerDcNodes.empty() && placingOptions.PreferLocalDatacenterExecution) {
plan = planner->Plan(executerDcNodes, ResourceEstimations);
}

if (plan.empty()) {
plan = planner->Plan(allNodes, ResourceEstimations);
}

if (plan.empty()) {
LogMemoryStatistics([TxId = TxId, &UserRequestContext = UserRequestContext](TStringBuf msg) { LOG_E(msg); });

auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::PRECONDITION_FAILED,
TStringBuilder() << "Not enough resources to execute query. " << "TraceId: " << UserRequestContext->TraceId);
return std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.Release());
}

for (auto& group : plan) {
for(ui64 taskId: group.TaskIds) {
auto [it, success] = alreadyAssigned.emplace(taskId, group.NodeId);
if (success) {
TasksPerNode[group.NodeId].push_back(taskId);
}
}
}

return nullptr;
}

const IKqpGateway::TKqpSnapshot& TKqpPlanner::GetSnapshot() const {
Expand Down
16 changes: 8 additions & 8 deletions ydb/core/kqp/executer_actor/kqp_planner_strategy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,16 @@ class TNodesManager {
return result;
}

TNodesManager(const TVector<NKikimrKqp::TKqpNodeResources>& nodeResources) {
TNodesManager(const TVector<const NKikimrKqp::TKqpNodeResources*>& nodeResources) {
for (auto& node : nodeResources) {
if (!node.GetAvailableComputeActors()) {
if (!node->GetAvailableComputeActors()) {
continue;
}
Nodes.emplace_back(TNodeDesc{
node.GetNodeId(),
ActorIdFromProto(node.GetResourceManagerActorId()),
node.GetTotalMemory() - node.GetUsedMemory(),
node.GetAvailableComputeActors(),
node->GetNodeId(),
ActorIdFromProto(node->GetResourceManagerActorId()),
node->GetTotalMemory() - node->GetUsedMemory(),
node->GetAvailableComputeActors(),
{}
});
}
Expand All @@ -111,7 +111,7 @@ class TKqpGreedyPlanner : public IKqpPlannerStrategy {
public:
~TKqpGreedyPlanner() override {}

TVector<TResult> Plan(const TVector<NKikimrKqp::TKqpNodeResources>& nodeResources,
TVector<TResult> Plan(const TVector<const NKikimrKqp::TKqpNodeResources*>& nodeResources,
const TVector<TTaskResourceEstimation>& tasks) override
{
TVector<TResult> result;
Expand Down Expand Up @@ -161,7 +161,7 @@ class TKqpMockEmptyPlanner : public IKqpPlannerStrategy {
public:
~TKqpMockEmptyPlanner() override {}

TVector<TResult> Plan(const TVector<NKikimrKqp::TKqpNodeResources>&,
TVector<TResult> Plan(const TVector<const NKikimrKqp::TKqpNodeResources*>&,
const TVector<TTaskResourceEstimation>&) override
{
return {};
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/kqp_planner_strategy.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class IKqpPlannerStrategy {
TVector<ui64> TaskIds;
};

virtual TVector<TResult> Plan(const TVector<NKikimrKqp::TKqpNodeResources>& nodeResources,
virtual TVector<TResult> Plan(const TVector<const NKikimrKqp::TKqpNodeResources*>& nodeResources,
const TVector<TTaskResourceEstimation>& estimatedResources) = 0;

protected:
Expand Down
14 changes: 14 additions & 0 deletions ydb/core/kqp/rm_service/kqp_rm_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,14 @@ class TKqpResourceManager : public IKqpResourceManager {
return Counters;
}

// Returns a consistent-by-field snapshot of the planner placement knobs.
// Each knob is read with an atomic load because ApplyConfig may store new
// values concurrently; the result is returned by value (a plain copy).
TPlannerPlacingOptions GetPlacingOptions() override {
return TPlannerPlacingOptions{
.MaxNonParallelTasksExecutionLimit = MaxNonParallelTasksExecutionLimit.load(),
.MaxNonParallelTopStageExecutionLimit = MaxNonParallelTopStageExecutionLimit.load(),
.PreferLocalDatacenterExecution = PreferLocalDatacenterExecution.load(),
};
}

void CreateResourceInfoExchanger(
const NKikimrConfig::TTableServiceConfig::TResourceManager::TInfoExchangerSettings& settings) {
PublishResourcesByExchanger = true;
Expand Down Expand Up @@ -414,6 +422,9 @@ class TKqpResourceManager : public IKqpResourceManager {
MinChannelBufferSize.store(config.GetMinChannelBufferSize());
MaxTotalChannelBuffersSize.store(config.GetMaxTotalChannelBuffersSize());
QueryMemoryLimit.store(config.GetQueryMemoryLimit());
MaxNonParallelTopStageExecutionLimit.store(config.GetMaxNonParallelTopStageExecutionLimit());
MaxNonParallelTasksExecutionLimit.store(config.GetMaxNonParallelTasksExecutionLimit());
PreferLocalDatacenterExecution.store(config.GetPreferLocalDatacenterExecution());
}

ui32 GetNodeId() override {
Expand Down Expand Up @@ -460,6 +471,9 @@ class TKqpResourceManager : public IKqpResourceManager {
std::atomic<i32> ExecutionUnitsLimit;
TLimitedResource<ui64> ScanQueryMemoryResource;
std::atomic<i64> ExternalDataQueryMemory = 0;
std::atomic<ui64> MaxNonParallelTopStageExecutionLimit = 1;
std::atomic<ui64> MaxNonParallelTasksExecutionLimit = 8;
std::atomic<bool> PreferLocalDatacenterExecution = true;

// current state
std::atomic<ui64> LastResourceBrokerTaskId = 0;
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/kqp/rm_service/kqp_rm_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,12 @@ struct TKqpLocalNodeResources {
std::array<ui64, EKqpMemoryPool::Count> Memory;
};

// Knobs that control where TKqpPlanner places compute tasks.
// Defaults match the TTableServiceConfig proto defaults; the resource
// manager overwrites them from config via atomic stores in ApplyConfig.
struct TPlannerPlacingOptions {
// If the query has at most this many tasks, running them all on the
// executer's own node is considered acceptable (single-node execution).
ui64 MaxNonParallelTasksExecutionLimit = 8;
// If the number of deepest-stage tasks (highest StageLevel) is at most
// this limit, those tasks are pinned to the executer's node.
ui64 MaxNonParallelTopStageExecutionLimit = 1;
// When true, the planner first tries to place tasks only on nodes in
// the executer's datacenter, falling back to all nodes if that fails.
bool PreferLocalDatacenterExecution = true;
};

/// per node singleton with instant API
class IKqpResourceManager : private TNonCopyable {
public:
Expand All @@ -211,6 +217,7 @@ class IKqpResourceManager : private TNonCopyable {

virtual TKqpRMAllocateResult AllocateResources(TIntrusivePtr<TTxState>& tx, TIntrusivePtr<TTaskState>& task, const TKqpResourcesRequest& resources) = 0;

virtual TPlannerPlacingOptions GetPlacingOptions() = 0;
virtual TTaskResourceEstimation EstimateTaskResources(const NYql::NDqProto::TDqTask& task, const ui32 tasksCount) = 0;
virtual void EstimateTaskResources(TTaskResourceEstimation& result, const ui32 tasksCount) = 0;

Expand Down
4 changes: 4 additions & 0 deletions ydb/core/protos/table_service_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ message TTableServiceConfig {

optional uint64 MinMemAllocSize = 23 [default = 8388608]; // 8 MiB
optional uint64 MinMemFreeSize = 24 [default = 33554432]; // 32 MiB

optional uint64 MaxNonParallelTasksExecutionLimit = 25 [default = 8];
optional uint64 MaxNonParallelTopStageExecutionLimit = 26 [default = 1];
optional bool PreferLocalDatacenterExecution = 27 [ default = true ];
}

message TSpillingServiceConfig {
Expand Down
Loading