Skip to content

Commit 0d73bf9

Browse files
authored
merge dc locality (#8500)
1 parent 3c07380 commit 0d73bf9

File tree

6 files changed

+55
-25
lines changed

6 files changed

+55
-25
lines changed

ydb/core/kqp/executer_actor/kqp_planner.cpp

Lines changed: 41 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -343,33 +343,58 @@ std::unique_ptr<IEventHandle> TKqpPlanner::AssignTasksToNodes() {
343343
planner->SetLogFunc([TxId = TxId, &UserRequestContext = UserRequestContext](TStringBuf msg) { LOG_D(msg); });
344344
}
345345

346-
THashMap<ui64, size_t> nodeIdtoIdx;
347-
for (size_t idx = 0; idx < ResourcesSnapshot.size(); ++idx) {
348-
nodeIdtoIdx[ResourcesSnapshot[idx].nodeid()] = idx;
349-
}
350-
351346
LogMemoryStatistics([TxId = TxId, &UserRequestContext = UserRequestContext](TStringBuf msg) { LOG_D(msg); });
352347

353-
auto plan = planner->Plan(ResourcesSnapshot, ResourceEstimations);
348+
ui64 selfNodeId = ExecuterId.NodeId();
349+
TString selfNodeDC;
354350

355-
if (!plan.empty()) {
356-
for (auto& group : plan) {
357-
for(ui64 taskId: group.TaskIds) {
358-
auto [it, success] = alreadyAssigned.emplace(taskId, group.NodeId);
359-
if (success) {
360-
TasksPerNode[group.NodeId].push_back(taskId);
361-
}
362-
}
351+
TVector<const NKikimrKqp::TKqpNodeResources*> allNodes;
352+
TVector<const NKikimrKqp::TKqpNodeResources*> executerDcNodes;
353+
allNodes.reserve(ResourcesSnapshot.size());
354+
355+
for(auto& snapNode: ResourcesSnapshot) {
356+
const TString& dc = snapNode.GetKqpProxyNodeResources().GetDataCenterId();
357+
if (snapNode.GetNodeId() == selfNodeId) {
358+
selfNodeDC = dc;
359+
break;
363360
}
361+
}
364362

365-
return nullptr;
366-
} else {
363+
for(auto& snapNode: ResourcesSnapshot) {
364+
allNodes.push_back(&snapNode);
365+
if (selfNodeDC == snapNode.GetKqpProxyNodeResources().GetDataCenterId()) {
366+
executerDcNodes.push_back(&snapNode);
367+
}
368+
}
369+
370+
TVector<IKqpPlannerStrategy::TResult> plan;
371+
372+
if (!executerDcNodes.empty() && placingOptions.PreferLocalDatacenterExecution) {
373+
plan = planner->Plan(executerDcNodes, ResourceEstimations);
374+
}
375+
376+
if (plan.empty()) {
377+
plan = planner->Plan(allNodes, ResourceEstimations);
378+
}
379+
380+
if (plan.empty()) {
367381
LogMemoryStatistics([TxId = TxId, &UserRequestContext = UserRequestContext](TStringBuf msg) { LOG_E(msg); });
368382

369383
auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::PRECONDITION_FAILED,
370384
TStringBuilder() << "Not enough resources to execute query. " << "TraceId: " << UserRequestContext->TraceId);
371385
return std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.Release());
372386
}
387+
388+
for (auto& group : plan) {
389+
for(ui64 taskId: group.TaskIds) {
390+
auto [it, success] = alreadyAssigned.emplace(taskId, group.NodeId);
391+
if (success) {
392+
TasksPerNode[group.NodeId].push_back(taskId);
393+
}
394+
}
395+
}
396+
397+
return nullptr;
373398
}
374399

375400
const IKqpGateway::TKqpSnapshot& TKqpPlanner::GetSnapshot() const {

ydb/core/kqp/executer_actor/kqp_planner_strategy.cpp

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -90,16 +90,16 @@ class TNodesManager {
9090
return result;
9191
}
9292

93-
TNodesManager(const TVector<NKikimrKqp::TKqpNodeResources>& nodeResources) {
93+
TNodesManager(const TVector<const NKikimrKqp::TKqpNodeResources*>& nodeResources) {
9494
for (auto& node : nodeResources) {
95-
if (!node.GetAvailableComputeActors()) {
95+
if (!node->GetAvailableComputeActors()) {
9696
continue;
9797
}
9898
Nodes.emplace_back(TNodeDesc{
99-
node.GetNodeId(),
100-
ActorIdFromProto(node.GetResourceManagerActorId()),
101-
node.GetTotalMemory() - node.GetUsedMemory(),
102-
node.GetAvailableComputeActors(),
99+
node->GetNodeId(),
100+
ActorIdFromProto(node->GetResourceManagerActorId()),
101+
node->GetTotalMemory() - node->GetUsedMemory(),
102+
node->GetAvailableComputeActors(),
103103
{}
104104
});
105105
}
@@ -111,7 +111,7 @@ class TKqpGreedyPlanner : public IKqpPlannerStrategy {
111111
public:
112112
~TKqpGreedyPlanner() override {}
113113

114-
TVector<TResult> Plan(const TVector<NKikimrKqp::TKqpNodeResources>& nodeResources,
114+
TVector<TResult> Plan(const TVector<const NKikimrKqp::TKqpNodeResources*>& nodeResources,
115115
const TVector<TTaskResourceEstimation>& tasks) override
116116
{
117117
TVector<TResult> result;
@@ -161,7 +161,7 @@ class TKqpMockEmptyPlanner : public IKqpPlannerStrategy {
161161
public:
162162
~TKqpMockEmptyPlanner() override {}
163163

164-
TVector<TResult> Plan(const TVector<NKikimrKqp::TKqpNodeResources>&,
164+
TVector<TResult> Plan(const TVector<const NKikimrKqp::TKqpNodeResources*>&,
165165
const TVector<TTaskResourceEstimation>&) override
166166
{
167167
return {};

ydb/core/kqp/executer_actor/kqp_planner_strategy.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ class IKqpPlannerStrategy {
2323
TVector<ui64> TaskIds;
2424
};
2525

26-
virtual TVector<TResult> Plan(const TVector<NKikimrKqp::TKqpNodeResources>& nodeResources,
26+
virtual TVector<TResult> Plan(const TVector<const NKikimrKqp::TKqpNodeResources*>& nodeResources,
2727
const TVector<TTaskResourceEstimation>& estimatedResources) = 0;
2828

2929
protected:

ydb/core/kqp/rm_service/kqp_rm_service.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ class TKqpResourceManager : public IKqpResourceManager {
146146
return TPlannerPlacingOptions{
147147
.MaxNonParallelTasksExecutionLimit = MaxNonParallelTasksExecutionLimit.load(),
148148
.MaxNonParallelTopStageExecutionLimit = MaxNonParallelTopStageExecutionLimit.load(),
149+
.PreferLocalDatacenterExecution = PreferLocalDatacenterExecution.load(),
149150
};
150151
}
151152

@@ -423,6 +424,7 @@ class TKqpResourceManager : public IKqpResourceManager {
423424
QueryMemoryLimit.store(config.GetQueryMemoryLimit());
424425
MaxNonParallelTopStageExecutionLimit.store(config.GetMaxNonParallelTopStageExecutionLimit());
425426
MaxNonParallelTasksExecutionLimit.store(config.GetMaxNonParallelTasksExecutionLimit());
427+
PreferLocalDatacenterExecution.store(config.GetPreferLocalDatacenterExecution());
426428
}
427429

428430
ui32 GetNodeId() override {
@@ -471,6 +473,7 @@ class TKqpResourceManager : public IKqpResourceManager {
471473
std::atomic<i64> ExternalDataQueryMemory = 0;
472474
std::atomic<ui64> MaxNonParallelTopStageExecutionLimit = 1;
473475
std::atomic<ui64> MaxNonParallelTasksExecutionLimit = 8;
476+
std::atomic<bool> PreferLocalDatacenterExecution = true;
474477

475478
// current state
476479
std::atomic<ui64> LastResourceBrokerTaskId = 0;

ydb/core/kqp/rm_service/kqp_rm_service.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ struct TKqpLocalNodeResources {
205205
/// Placement settings consumed by the KQP task planner when it assigns
/// tasks to nodes. The defaults here (8 / 1 / true) match the defaults of
/// the same-named fields in TTableServiceConfig shown in this commit.
///
/// - MaxNonParallelTasksExecutionLimit, MaxNonParallelTopStageExecutionLimit:
///   numeric limits; their exact use is not visible in this diff
///   (NOTE(review): presumably thresholds below which tasks are not spread
///   across nodes -- confirm against the planner).
/// - PreferLocalDatacenterExecution: when true, and at least one node in the
///   resource snapshot reports the same data center id as the executer's
///   node, planning is first attempted over those nodes only; if that
///   attempt yields an empty plan, planning is retried over all nodes.
struct TPlannerPlacingOptions {
206206
ui64 MaxNonParallelTasksExecutionLimit = 8;
207207
ui64 MaxNonParallelTopStageExecutionLimit = 1;
208+
bool PreferLocalDatacenterExecution = true;
208209
};
209210

210211
/// per node singleton with instant API

ydb/core/protos/table_service_config.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ message TTableServiceConfig {
4949

5050
optional uint64 MaxNonParallelTasksExecutionLimit = 25 [default = 8];
5151
optional uint64 MaxNonParallelTopStageExecutionLimit = 26 [default = 1];
52+
optional bool PreferLocalDatacenterExecution = 27 [ default = true ];
5253
}
5354

5455
message TSpillingServiceConfig {

0 commit comments

Comments
 (0)