Skip to content

Commit 89fd4d0

Browse files
authored
Merge compute limits (#8923)
1 parent f837701 commit 89fd4d0

25 files changed

+1091
-52
lines changed

ydb/core/kqp/compute_actor/kqp_compute_actor.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,8 @@ using namespace NYql::NDqProto;
134134
// Factory for the scan compute actor: allocates an NScanPrivate::TKqpScanComputeActor with `new`
// and returns it as a raw IActor*. This function does not register the actor; that is left to the caller.
// In the added ('+') lines of this hunk the signature gains a trailing `schedulingOptions` parameter,
// which is moved into the actor constructor as its FIRST argument, ahead of executerId.
// asyncIoFactory, traceId and arena are moved into the actor as well; settings and memoryLimits are
// passed through by const reference.
// NOTE(review): `task` is handed over as a raw pointer — its lifetime is presumably tied to `arena`; confirm.
IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, ui64 lockTxId, ui32 lockNodeId,
135135
TDqTask* task, IDqAsyncIoFactory::TPtr asyncIoFactory,
136136
const NYql::NDq::TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
137-
TIntrusivePtr<NActors::TProtoArenaHolder> arena) {
138-
return new NScanPrivate::TKqpScanComputeActor(executerId, txId, lockTxId, lockNodeId, task, std::move(asyncIoFactory),
137+
TIntrusivePtr<NActors::TProtoArenaHolder> arena, TComputeActorSchedulingOptions schedulingOptions) {
138+
return new NScanPrivate::TKqpScanComputeActor(std::move(schedulingOptions), executerId, txId, lockTxId, lockNodeId, task, std::move(asyncIoFactory),
139139
settings, memoryLimits, std::move(traceId), std::move(arena));
140140
}
141141

ydb/core/kqp/compute_actor/kqp_compute_actor.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <ydb/core/kqp/compute_actor/kqp_compute_events.h>
44
#include <ydb/core/kqp/counters/kqp_counters.h>
55
#include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h>
6+
#include <ydb/core/kqp/runtime/kqp_compute_scheduler.h>
67
#include <ydb/core/scheme/scheme_tabledefs.h>
78
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
89
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
@@ -48,12 +49,12 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqPr
4849
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits,
4950
NWilson::TTraceId traceId,
5051
TIntrusivePtr<NActors::TProtoArenaHolder> arena,
51-
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings);
52+
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, TComputeActorSchedulingOptions);
5253

5354
IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, ui64 lockTxId, ui32 lockNodeId,
5455
NYql::NDqProto::TDqTask* task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
5556
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
56-
TIntrusivePtr<NActors::TProtoArenaHolder> arena);
57+
TIntrusivePtr<NActors::TProtoArenaHolder> arena, TComputeActorSchedulingOptions);
5758

5859
IActor* CreateKqpScanFetcher(const NKikimrKqp::TKqpSnapshot& snapshot, std::vector<NActors::TActorId>&& computeActors,
5960
const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const NYql::NDq::TComputeRuntimeSettings& settings,

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
104104
ApplyConfig(config);
105105
}
106106

107-
void ApplyConfig(const NKikimrConfig::TTableServiceConfig::TResourceManager& config)
107+
void ApplyConfig(const NKikimrConfig::TTableServiceConfig::TResourceManager& config) override
108108
{
109109
MkqlLightProgramMemoryLimit.store(config.GetMkqlLightProgramMemoryLimit());
110110
MkqlHeavyProgramMemoryLimit.store(config.GetMkqlHeavyProgramMemoryLimit());
@@ -114,7 +114,7 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
114114
MinMemFreeSize.store(config.GetMinMemFreeSize());
115115
}
116116

117-
TActorStartResult CreateKqpComputeActor(TCreateArgs&& args) {
117+
TActorStartResult CreateKqpComputeActor(TCreateArgs&& args) override {
118118
NYql::NDq::TComputeMemoryLimits memoryLimits;
119119
memoryLimits.ChannelBufferSize = 0;
120120
memoryLimits.MkqlLightProgramMemoryLimit = MkqlLightProgramMemoryLimit.load();
@@ -213,7 +213,8 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
213213
auto& info = args.ComputesByStages->UpsertTaskWithScan(*args.Task, meta, !AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead());
214214
IActor* computeActor = CreateKqpScanComputeActor(args.ExecuterId, args.TxId, args.LockTxId, args.LockNodeId, args.Task,
215215
AsyncIoFactory, runtimeSettings, memoryLimits,
216-
std::move(args.TraceId), std::move(args.Arena));
216+
std::move(args.TraceId), std::move(args.Arena),
217+
std::move(args.SchedulingOptions));
217218
TActorId result = TlsActivationContext->Register(computeActor);
218219
info.MutableActorIds().emplace_back(result);
219220
return result;
@@ -223,7 +224,8 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
223224
GUCSettings = std::make_shared<TGUCSettings>(args.SerializedGUCSettings);
224225
}
225226
IActor* computeActor = ::NKikimr::NKqp::CreateKqpComputeActor(args.ExecuterId, args.TxId, args.Task, AsyncIoFactory,
226-
runtimeSettings, memoryLimits, std::move(args.TraceId), std::move(args.Arena), FederatedQuerySetup, GUCSettings);
227+
runtimeSettings, memoryLimits, std::move(args.TraceId), std::move(args.Arena), FederatedQuerySetup, GUCSettings,
228+
std::move(args.SchedulingOptions));
227229
return args.ShareMailbox ? TlsActivationContext->AsActorContext().RegisterWithSameMailbox(computeActor) :
228230
TlsActivationContext->AsActorContext().Register(computeActor);
229231
}

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
77
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
88

9+
#include <ydb/core/kqp/runtime/kqp_compute_scheduler.h>
10+
911
#include <vector>
1012

1113
namespace NKikimr::NKqp {
@@ -124,6 +126,7 @@ struct IKqpNodeComputeActorFactory {
124126
const TMaybe<NYql::NDqProto::TRlPath>& RlPath;
125127
TComputeStagesWithScan* ComputesByStages = nullptr;
126128
std::shared_ptr<IKqpNodeState> State = nullptr;
129+
TComputeActorSchedulingOptions SchedulingOptions = {};
127130
};
128131

129132
typedef std::variant<TActorId, NKikimr::NKqp::NRm::TKqpRMAllocateResult> TActorStartResult;
@@ -137,4 +140,4 @@ std::shared_ptr<IKqpNodeComputeActorFactory> MakeKqpCaFactory(const NKikimrConfi
137140
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
138141
const std::optional<TKqpFederatedQuerySetup> federatedQuerySetup);
139142

140-
} // namespace NKikimr::NKqp::NComputeActor
143+
} // namespace NKikimr::NKqp::NComputeActor

ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ TKqpComputeActor::TKqpComputeActor(const TActorId& executerId, ui64 txId, NDqPro
1414
IDqAsyncIoFactory::TPtr asyncIoFactory,
1515
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
1616
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena,
17-
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings)
18-
: TBase(executerId, txId, task, std::move(asyncIoFactory), AppData()->FunctionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId), std::move(arena), GUCSettings)
17+
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, TComputeActorSchedulingOptions schedulingOptions)
18+
: TBase(std::move(schedulingOptions), executerId, txId, task, std::move(asyncIoFactory), AppData()->FunctionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId), std::move(arena), GUCSettings)
1919
, ComputeCtx(settings.StatsMode)
2020
, FederatedQuerySetup(federatedQuerySetup)
2121
{
@@ -121,9 +121,12 @@ void TKqpComputeActor::DoBootstrap() {
121121

122122
ContinueExecute();
123123
Become(&TKqpComputeActor::StateFunc);
124+
125+
TBase::DoBoostrap();
124126
}
125127

126128
STFUNC(TKqpComputeActor::StateFunc) {
129+
CA_LOG_D("CA StateFunc " << ev->GetTypeRewrite());
127130
try {
128131
switch (ev->GetTypeRewrite()) {
129132
hFunc(TEvKqpCompute::TEvScanInitActor, HandleExecute);
@@ -278,10 +281,10 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::T
278281
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
279282
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena,
280283
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup,
281-
const TGUCSettings::TPtr& GUCSettings)
284+
const TGUCSettings::TPtr& GUCSettings, TComputeActorSchedulingOptions cpuOptions)
282285
{
283286
return new TKqpComputeActor(executerId, txId, task, std::move(asyncIoFactory),
284-
settings, memoryLimits, std::move(traceId), std::move(arena), federatedQuerySetup, GUCSettings);
287+
settings, memoryLimits, std::move(traceId), std::move(arena), federatedQuerySetup, GUCSettings, std::move(cpuOptions));
285288
}
286289

287290
} // namespace NKqp

ydb/core/kqp/compute_actor/kqp_pure_compute_actor.h

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,16 @@
88
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
99
#include <ydb/core/kqp/runtime/kqp_compute.h>
1010
#include <ydb/core/kqp/runtime/kqp_scan_data.h>
11+
#include <ydb/core/kqp/runtime/kqp_compute_scheduler.h>
1112
#include <ydb/core/sys_view/scan.h>
1213
#include <ydb/library/yverify_stream/yverify_stream.h>
1314

14-
#include <ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h>
15-
1615

1716
namespace NKikimr {
1817
namespace NKqp {
1918

20-
class TKqpComputeActor : public TDqSyncComputeActorBase<TKqpComputeActor> {
21-
using TBase = TDqSyncComputeActorBase<TKqpComputeActor>;
19+
class TKqpComputeActor : public TSchedulableComputeActorBase<TKqpComputeActor> {
20+
using TBase = TSchedulableComputeActorBase<TKqpComputeActor>;
2221

2322
public:
2423
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
@@ -29,7 +28,8 @@ class TKqpComputeActor : public TDqSyncComputeActorBase<TKqpComputeActor> {
2928
IDqAsyncIoFactory::TPtr asyncIoFactory,
3029
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
3130
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena,
32-
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings);
31+
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
32+
TComputeActorSchedulingOptions);
3333

3434
void DoBootstrap();
3535

@@ -68,7 +68,8 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::T
6868
IDqAsyncIoFactory::TPtr asyncIoFactory,
6969
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
7070
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena,
71-
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings);
71+
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
72+
TComputeActorSchedulingOptions);
7273

7374
} // namespace NKqp
7475
} // namespace NKikimr

ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@ static constexpr TDuration RL_MAX_BATCH_DELAY = TDuration::Seconds(50);
2323

2424
} // anonymous namespace
2525

26-
TKqpScanComputeActor::TKqpScanComputeActor(const TActorId& executerId, ui64 txId, ui64 lockTxId, ui32 lockNodeId,
26+
TKqpScanComputeActor::TKqpScanComputeActor(TComputeActorSchedulingOptions cpuOptions, const TActorId& executerId, ui64 txId, ui64 lockTxId, ui32 lockNodeId,
2727
NDqProto::TDqTask* task, IDqAsyncIoFactory::TPtr asyncIoFactory,
2828
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
2929
TIntrusivePtr<NActors::TProtoArenaHolder> arena)
30-
: TBase(executerId, txId, task, std::move(asyncIoFactory), AppData()->FunctionRegistry, settings,
30+
: TBase(std::move(cpuOptions), executerId, txId, task, std::move(asyncIoFactory), AppData()->FunctionRegistry, settings,
3131
memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId), std::move(arena))
3232
, ComputeCtx(settings.StatsMode)
3333
, LockTxId(lockTxId)
@@ -251,6 +251,8 @@ void TKqpScanComputeActor::DoBootstrap() {
251251
ScanData->TaskId = GetTask().GetId();
252252
ScanData->TableReader = CreateKqpTableReader(*ScanData);
253253
Become(&TKqpScanComputeActor::StateFunc);
254+
255+
TBase::DoBoostrap();
254256
}
255257

256258
}

ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,15 @@
22
#include "kqp_scan_events.h"
33

44
#include <ydb/core/kqp/runtime/kqp_scan_data.h>
5-
#include <ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h>
5+
#include <ydb/core/kqp/runtime/kqp_compute_scheduler.h>
66
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
77
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
88

99
namespace NKikimr::NKqp::NScanPrivate {
1010

11-
class TKqpScanComputeActor: public NYql::NDq::TDqSyncComputeActorBase<TKqpScanComputeActor> {
11+
class TKqpScanComputeActor: public TSchedulableComputeActorBase<TKqpScanComputeActor> {
1212
private:
13-
using TBase = NYql::NDq::TDqSyncComputeActorBase<TKqpScanComputeActor>;
13+
using TBase = TSchedulableComputeActorBase<TKqpScanComputeActor>;
1414

1515
NMiniKQL::TKqpScanComputeContext ComputeCtx;
1616
NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta Meta;
@@ -65,7 +65,7 @@ class TKqpScanComputeActor: public NYql::NDq::TDqSyncComputeActorBase<TKqpScanCo
6565
return NKikimrServices::TActivity::KQP_SCAN_COMPUTE_ACTOR;
6666
}
6767

68-
TKqpScanComputeActor(const TActorId& executerId, ui64 txId, ui64 lockTxId, ui32 lockNodeId,
68+
TKqpScanComputeActor(TComputeActorSchedulingOptions, const TActorId& executerId, ui64 txId, ui64 lockTxId, ui32 lockNodeId,
6969
NYql::NDqProto::TDqTask* task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
7070
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
7171
TIntrusivePtr<NActors::TProtoArenaHolder> arena);

ydb/core/kqp/counters/kqp_counters.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
#include <ydb/core/sys_view/service/sysview_service.h>
99

1010
#include <ydb/library/actors/core/log.h>
11-
1211
#include <util/generic/size_literals.h>
1312

1413
#include <ydb/library/yql/core/issue/protos/issue_id.pb.h>
@@ -829,6 +828,13 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co
829828
"PhyTx/ScanTxTotalTimeMs", NMonitoring::ExponentialHistogram(20, 2, 1));
830829

831830
FullScansExecuted = KqpGroup->GetCounter("FullScans", true);
831+
832+
SchedulerThrottled = KqpGroup->GetCounter("NodeScheduler/ThrottledUs", true);
833+
SchedulerCapacity = KqpGroup->GetCounter("NodeScheduler/Capacity");
834+
ComputeActorExecutions = KqpGroup->GetHistogram("NodeScheduler/BatchUs", NMonitoring::ExponentialHistogram(20, 2, 1));
835+
ComputeActorDelays = KqpGroup->GetHistogram("NodeScheduler/Delays", NMonitoring::ExponentialHistogram(20, 2, 1));
836+
ThrottledActorsSpuriousActivations = KqpGroup->GetCounter("NodeScheduler/SpuriousActivations", true);
837+
SchedulerDelays = KqpGroup->GetHistogram("NodeScheduler/Delay", NMonitoring::ExponentialHistogram(20, 2, 1));
832838
}
833839

834840
::NMonitoring::TDynamicCounterPtr TKqpCounters::GetKqpCounters() const {

ydb/core/kqp/counters/kqp_counters.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,14 @@ class TKqpCounters : public TKqpCountersBase, public NYql::NDq::TSpillingCounter
409409
::NMonitoring::TDynamicCounters::TCounterPtr DataShardIteratorMessages;
410410
::NMonitoring::TDynamicCounters::TCounterPtr IteratorDeliveryProblems;
411411

412+
// Scheduler signals
413+
::NMonitoring::TDynamicCounters::TCounterPtr SchedulerThrottled;
414+
::NMonitoring::TDynamicCounters::TCounterPtr SchedulerCapacity;
415+
NMonitoring::THistogramPtr ComputeActorExecutions;
416+
NMonitoring::THistogramPtr ComputeActorDelays;
417+
::NMonitoring::TDynamicCounters::TCounterPtr ThrottledActorsSpuriousActivations;
418+
NMonitoring::THistogramPtr SchedulerDelays;
419+
412420
// Sequences counters
413421
::NMonitoring::TDynamicCounters::TCounterPtr SequencerActorsCount;
414422
::NMonitoring::TDynamicCounters::TCounterPtr SequencerErrors;

ydb/core/kqp/executer_actor/kqp_planner.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ void BuildInitialTaskResources(const TKqpTasksGraph& graph, ui64 taskId, TTaskRe
5353
ret.HeavyProgram = opts.GetHasMapJoin();
5454
}
5555

56+
// Decides whether a per-node CPU limit applies to the request described by `ctx`.
// Returns true only when all three conditions hold:
//   1. ctx->PoolId evaluates to true in a boolean context (presumably "non-empty" — confirm the type);
//   2. ctx->PoolConfig holds a value;
//   3. that config's TotalCpuLimitPercentPerNode is strictly greater than zero.
// `ctx` is dereferenced unconditionally, so callers must pass a non-null pointer.
// NOTE(review): `ctx` is taken by value, so each call copies the intrusive pointer;
// a const reference would likely suffice — confirm before changing.
bool LimitCPU(TIntrusivePtr<TUserRequestContext> ctx) {
57+
return ctx->PoolId && ctx->PoolConfig.has_value() && ctx->PoolConfig->TotalCpuLimitPercentPerNode > 0;
58+
}
59+
5660
}
5761

5862
bool TKqpPlanner::UseMockEmptyPlanner = false;
@@ -101,6 +105,10 @@ TKqpPlanner::TKqpPlanner(TKqpPlanner::TArgs&& args)
101105
LOG_E("Database not set, use " << Database);
102106
}
103107
}
108+
109+
if (LimitCPU(UserRequestContext)) {
110+
AllowSinglePartitionOpt = false;
111+
}
104112
}
105113

106114
// ResourcesSnapshot, ResourceEstimations
@@ -223,6 +231,13 @@ std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::SerializeReque
223231
request.SetSerializedGUCSettings(SerializedGUCSettings);
224232
}
225233

234+
request.SetSchedulerGroup(UserRequestContext->PoolId);
235+
request.SetDatabase(Database);
236+
if (UserRequestContext->PoolConfig.has_value()) {
237+
request.SetMemoryPoolPercent(UserRequestContext->PoolConfig->QueryMemoryLimitPercentPerNode);
238+
request.SetMaxCpuShare(UserRequestContext->PoolConfig->TotalCpuLimitPercentPerNode / 100.0);
239+
}
240+
226241
return result;
227242
}
228243

0 commit comments

Comments
 (0)