Skip to content

Commit c520e07

Browse files
authored
Merge 9d94100 into 66d836f
2 parents 66d836f + 9d94100 commit c520e07

32 files changed

+734
-666
lines changed

ydb/core/driver_lib/run/kikimr_services_initializers.cpp

+1-1
Original file line number | Diff line number | Diff line change
@@ -2149,7 +2149,7 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu
21492149

21502150
// Create resource manager
21512151
auto rm = NKqp::CreateKqpResourceManagerActor(Config.GetTableServiceConfig().GetResourceManager(), nullptr,
2152-
{}, kqpProxySharedResources);
2152+
{}, kqpProxySharedResources, NodeId);
21532153
setup->LocalServices.push_back(std::make_pair(
21542154
NKqp::MakeKqpRmServiceID(NodeId),
21552155
TActorSetupCmd(rm, TMailboxType::HTSwap, appData->UserPoolId)));

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp

+93-53
Original file line number | Diff line number | Diff line change
@@ -6,137 +6,174 @@
66

77
namespace NKikimr::NKqp::NComputeActor {
88

9+
910
struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager {
1011

1112
TMemoryQuotaManager(std::shared_ptr<NRm::IKqpResourceManager> resourceManager
1213
, NRm::EKqpMemoryPool memoryPool
1314
, std::shared_ptr<IKqpNodeState> state
14-
, ui64 txId
15-
, ui64 taskId
15+
, TIntrusivePtr<NRm::TTxState> tx
16+
, TIntrusivePtr<NRm::TTaskState> task
1617
, ui64 limit
1718
, ui64 reasonableSpillingTreshold)
1819
: NYql::NDq::TGuaranteeQuotaManager(limit, limit)
1920
, ResourceManager(std::move(resourceManager))
2021
, MemoryPool(memoryPool)
2122
, State(std::move(state))
22-
, TxId(txId)
23-
, TaskId(taskId)
23+
, Tx(std::move(tx))
24+
, Task(std::move(task))
2425
, ReasonableSpillingTreshold(reasonableSpillingTreshold)
2526
{
2627
}
2728

2829
~TMemoryQuotaManager() override {
29-
State->OnTaskTerminate(TxId, TaskId, Success);
30-
ResourceManager->FreeResources(TxId, TaskId);
30+
if (State) {
31+
State->OnTaskTerminate(Tx->TxId, Task->TaskId, Success);
32+
}
33+
34+
ResourceManager->FreeResources(Tx, Task);
3135
}
3236

3337
bool AllocateExtraQuota(ui64 extraSize) override {
34-
auto result = ResourceManager->AllocateResources(TxId, TaskId,
38+
auto result = ResourceManager->AllocateResources(Tx, Task,
3539
NRm::TKqpResourcesRequest{.MemoryPool = MemoryPool, .Memory = extraSize});
3640

3741
if (!result) {
3842
AFL_WARN(NKikimrServices::KQP_COMPUTE)
3943
("problem", "cannot_allocate_memory")
40-
("tx_id", TxId)
41-
("task_id", TaskId)
44+
("tx_id", Tx->TxId)
45+
("task_id", Task->TaskId)
4246
("memory", extraSize);
4347

4448
return false;
4549
}
4650

47-
TotalQueryAllocationsSize = result.TotalAllocatedQueryMemory;
48-
4951
return true;
5052
}
5153

5254
void FreeExtraQuota(ui64 extraSize) override {
53-
ResourceManager->FreeResources(TxId, TaskId,
54-
NRm::TKqpResourcesRequest{.MemoryPool = MemoryPool, .Memory = extraSize}
55-
);
55+
NRm::TKqpResourcesRequest request = NRm::TKqpResourcesRequest{.MemoryPool = MemoryPool, .Memory = extraSize};
56+
ResourceManager->FreeResources(Tx, Task, Task->FitRequest(request));
5657
}
5758

5859
bool IsReasonableToUseSpilling() const override {
59-
return TotalQueryAllocationsSize >= ReasonableSpillingTreshold;
60+
return Tx->GetExtraMemoryAllocatedSize() >= ReasonableSpillingTreshold;
61+
}
62+
63+
TString MemoryConsumptionDetails() const override {
64+
return Tx->ToString();
6065
}
6166

6267
void TerminateHandler(bool success, const NYql::TIssues& issues) {
6368
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)
6469
("problem", "finish_compute_actor")
65-
("tx_id", TxId)("task_id", TaskId)("success", success)("message", issues.ToOneLineString());
70+
("tx_id", Tx->TxId)("task_id", Task->TaskId)("success", success)("message", issues.ToOneLineString());
6671
Success = success;
6772
}
6873

6974
std::shared_ptr<NRm::IKqpResourceManager> ResourceManager;
7075
NRm::EKqpMemoryPool MemoryPool;
7176
std::shared_ptr<IKqpNodeState> State;
72-
ui64 TxId;
73-
ui64 TaskId;
77+
TIntrusivePtr<NRm::TTxState> Tx;
78+
TIntrusivePtr<NRm::TTaskState> Task;
7479
bool Success = true;
75-
ui64 TotalQueryAllocationsSize = 0;
7680
ui64 ReasonableSpillingTreshold = 0;
7781
};
7882

7983
class TKqpCaFactory : public IKqpNodeComputeActorFactory {
80-
NKikimrConfig::TTableServiceConfig::TResourceManager Config;
8184
std::shared_ptr<NRm::IKqpResourceManager> ResourceManager_;
8285
NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
8386
const std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
8487

88+
std::atomic<ui64> MkqlLightProgramMemoryLimit = 0;
89+
std::atomic<ui64> MkqlHeavyProgramMemoryLimit = 0;
90+
std::atomic<ui64> MinChannelBufferSize = 0;
91+
std::atomic<ui64> ReasonableSpillingTreshold = 0;
92+
8593
public:
8694
TKqpCaFactory(const NKikimrConfig::TTableServiceConfig::TResourceManager& config,
8795
std::shared_ptr<NRm::IKqpResourceManager> resourceManager,
8896
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
8997
const std::optional<TKqpFederatedQuerySetup> federatedQuerySetup)
90-
: Config(config)
91-
, ResourceManager_(resourceManager)
98+
: ResourceManager_(resourceManager)
9299
, AsyncIoFactory(asyncIoFactory)
93100
, FederatedQuerySetup(federatedQuerySetup)
94-
{}
101+
{
102+
ApplyConfig(config);
103+
}
95104

96-
TActorId CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqProto::TDqTask* dqTask,
97-
const NYql::NDq::TComputeRuntimeSettings& settings,
98-
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena, const TString& serializedGUCSettings,
99-
TComputeStagesWithScan& computesByStage, ui64 outputChunkMaxSize, std::shared_ptr<IKqpNodeState> state,
100-
NRm::EKqpMemoryPool memoryPool, ui32 numberOfTasks)
105+
void ApplyConfig(const NKikimrConfig::TTableServiceConfig::TResourceManager& config)
101106
{
107+
MkqlLightProgramMemoryLimit.store(config.GetMkqlLightProgramMemoryLimit());
108+
MkqlHeavyProgramMemoryLimit.store(config.GetMkqlHeavyProgramMemoryLimit());
109+
MinChannelBufferSize.store(config.GetMinChannelBufferSize());
110+
ReasonableSpillingTreshold.store(config.GetReasonableSpillingTreshold());
111+
}
112+
113+
TActorStartResult CreateKqpComputeActor(TCreateArgs&& args) {
102114
NYql::NDq::TComputeMemoryLimits memoryLimits;
103115
memoryLimits.ChannelBufferSize = 0;
104-
memoryLimits.MkqlLightProgramMemoryLimit = Config.GetMkqlLightProgramMemoryLimit();
105-
memoryLimits.MkqlHeavyProgramMemoryLimit = Config.GetMkqlHeavyProgramMemoryLimit();
116+
memoryLimits.MkqlLightProgramMemoryLimit = MkqlLightProgramMemoryLimit.load();
117+
memoryLimits.MkqlHeavyProgramMemoryLimit = MkqlHeavyProgramMemoryLimit.load();
118+
119+
auto estimation = ResourceManager_->EstimateTaskResources(*args.Task, args.NumberOfTasks);
120+
NRm::TKqpResourcesRequest resourcesRequest;
121+
resourcesRequest.MemoryPool = args.MemoryPool;
122+
resourcesRequest.ExecutionUnits = 1;
123+
resourcesRequest.Memory = memoryLimits.MkqlLightProgramMemoryLimit;
124+
125+
TIntrusivePtr<NRm::TTaskState> task = MakeIntrusive<NRm::TTaskState>(args.Task->GetId(), args.TxInfo->CreatedAt);
106126

107-
auto estimation = EstimateTaskResources(*dqTask, Config, numberOfTasks);
127+
auto rmResult = ResourceManager_->AllocateResources(
128+
args.TxInfo, task, resourcesRequest);
129+
130+
if (!rmResult) {
131+
return NRm::TKqpRMAllocateResult{rmResult};
132+
}
108133

109134
{
110135
ui32 inputChannelsCount = 0;
111-
for (auto&& i : dqTask->GetInputs()) {
136+
for (auto&& i : args.Task->GetInputs()) {
112137
inputChannelsCount += i.ChannelsSize();
113138
}
114139

115-
memoryLimits.ChannelBufferSize = std::max<ui32>(estimation.ChannelBufferMemoryLimit / std::max<ui32>(1, inputChannelsCount), Config.GetMinChannelBufferSize());
116-
memoryLimits.OutputChunkMaxSize = outputChunkMaxSize;
140+
memoryLimits.ChannelBufferSize = std::max<ui32>(estimation.ChannelBufferMemoryLimit / std::max<ui32>(1, inputChannelsCount), MinChannelBufferSize.load());
141+
memoryLimits.OutputChunkMaxSize = args.OutputChunkMaxSize;
117142
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "channel_info")
118143
("ch_size", estimation.ChannelBufferMemoryLimit)
119144
("ch_count", estimation.ChannelBuffersCount)
120145
("ch_limit", memoryLimits.ChannelBufferSize)
121-
("inputs", dqTask->InputsSize())
146+
("inputs", args.Task->InputsSize())
122147
("input_channels_count", inputChannelsCount);
123148
}
124149

125-
auto& taskOpts = dqTask->GetProgram().GetSettings();
150+
auto& taskOpts = args.Task->GetProgram().GetSettings();
126151
auto limit = taskOpts.GetHasMapJoin() || taskOpts.GetHasStateAggregation()
127152
? memoryLimits.MkqlHeavyProgramMemoryLimit
128153
: memoryLimits.MkqlLightProgramMemoryLimit;
129154

130155
memoryLimits.MemoryQuotaManager = std::make_shared<TMemoryQuotaManager>(
131156
ResourceManager_,
132-
memoryPool,
133-
std::move(state),
134-
txId,
135-
dqTask->GetId(),
157+
args.MemoryPool,
158+
std::move(args.State),
159+
std::move(args.TxInfo),
160+
std::move(task),
136161
limit,
137-
Config.GetReasonableSpillingTreshold());
162+
ReasonableSpillingTreshold.load());
163+
164+
auto runtimeSettings = args.RuntimeSettings;
165+
runtimeSettings.ExtraMemoryAllocationPool = args.MemoryPool;
166+
runtimeSettings.UseSpilling = args.WithSpilling;
167+
runtimeSettings.StatsMode = args.StatsMode;
168+
169+
if (args.Deadline) {
170+
runtimeSettings.Timeout = args.Deadline - TAppData::TimeProvider->Now();
171+
}
172+
173+
if (args.RlPath) {
174+
runtimeSettings.RlPath = args.RlPath;
175+
}
138176

139-
auto runtimeSettings = settings;
140177
NYql::NDq::IMemoryQuotaManager::TWeakPtr memoryQuotaManager = memoryLimits.MemoryQuotaManager;
141178
runtimeSettings.TerminateHandler = [memoryQuotaManager]
142179
(bool success, const NYql::TIssues& issues) {
@@ -157,29 +194,32 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
157194
};
158195

159196
ETableKind tableKind = ETableKind::Unknown;
160-
if (dqTask->HasMetaId()) {
161-
YQL_ENSURE(computesByStage.GetMetaById(*dqTask, meta) || dqTask->GetMeta().UnpackTo(&meta), "cannot take meta on MetaId exists in tasks");
197+
if (args.Task->HasMetaId()) {
198+
YQL_ENSURE(args.ComputesByStages);
199+
YQL_ENSURE(args.ComputesByStages->GetMetaById(*args.Task, meta) || args.Task->GetMeta().UnpackTo(&meta), "cannot take meta on MetaId exists in tasks");
162200
tableKind = tableKindExtract(meta);
163-
} else if (dqTask->GetMeta().UnpackTo(&meta)) {
201+
} else if (args.Task->GetMeta().UnpackTo(&meta)) {
164202
tableKind = tableKindExtract(meta);
165203
}
166204

167205
if (tableKind == ETableKind::Datashard || tableKind == ETableKind::Olap) {
168-
auto& info = computesByStage.UpsertTaskWithScan(*dqTask, meta, !AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead());
169-
IActor* computeActor = CreateKqpScanComputeActor(executerId, txId, dqTask,
206+
YQL_ENSURE(args.ComputesByStages);
207+
auto& info = args.ComputesByStages->UpsertTaskWithScan(*args.Task, meta, !AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead());
208+
IActor* computeActor = CreateKqpScanComputeActor(args.ExecuterId, args.TxId, args.Task,
170209
AsyncIoFactory, runtimeSettings, memoryLimits,
171-
std::move(traceId), std::move(arena));
210+
std::move(args.TraceId), std::move(args.Arena));
172211
TActorId result = TlsActivationContext->Register(computeActor);
173212
info.MutableActorIds().emplace_back(result);
174213
return result;
175214
} else {
176215
std::shared_ptr<TGUCSettings> GUCSettings;
177-
if (!serializedGUCSettings.empty()) {
178-
GUCSettings = std::make_shared<TGUCSettings>(serializedGUCSettings);
216+
if (!args.SerializedGUCSettings.empty()) {
217+
GUCSettings = std::make_shared<TGUCSettings>(args.SerializedGUCSettings);
179218
}
180-
IActor* computeActor = ::NKikimr::NKqp::CreateKqpComputeActor(executerId, txId, dqTask, AsyncIoFactory,
181-
runtimeSettings, memoryLimits, std::move(traceId), std::move(arena), FederatedQuerySetup, GUCSettings);
182-
return TlsActivationContext->Register(computeActor);
219+
IActor* computeActor = ::NKikimr::NKqp::CreateKqpComputeActor(args.ExecuterId, args.TxId, args.Task, AsyncIoFactory,
220+
runtimeSettings, memoryLimits, std::move(args.TraceId), std::move(args.Arena), FederatedQuerySetup, GUCSettings);
221+
return args.ShareMailbox ? TlsActivationContext->AsActorContext().RegisterWithSameMailbox(computeActor) :
222+
TlsActivationContext->AsActorContext().Register(computeActor);
183223
}
184224
}
185225
};

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h

+25-5
Original file line number | Diff line number | Diff line change
@@ -103,11 +103,31 @@ struct IKqpNodeComputeActorFactory {
103103
virtual ~IKqpNodeComputeActorFactory() = default;
104104

105105
public:
106-
virtual NActors::TActorId CreateKqpComputeActor(const NActors::TActorId& executerId, ui64 txId, NYql::NDqProto::TDqTask* task,
107-
const NYql::NDq::TComputeRuntimeSettings& settings,
108-
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena, const TString& serializedGUCSettings,
109-
TComputeStagesWithScan& computeStages, ui64 outputChunkMaxSize, std::shared_ptr<IKqpNodeState> state,
110-
NKikimr::NKqp::NRm::EKqpMemoryPool memoryPool, ui32 numberOfTasks) = 0;
106+
struct TCreateArgs {
107+
const NActors::TActorId& ExecuterId;
108+
const ui64 TxId;
109+
NYql::NDqProto::TDqTask* Task;
110+
TIntrusivePtr<NRm::TTxState> TxInfo;
111+
const NYql::NDq::TComputeRuntimeSettings& RuntimeSettings;
112+
NWilson::TTraceId TraceId;
113+
TIntrusivePtr<NActors::TProtoArenaHolder> Arena;
114+
const TString& SerializedGUCSettings;
115+
const ui32 NumberOfTasks;
116+
const ui64 OutputChunkMaxSize;
117+
const NKikimr::NKqp::NRm::EKqpMemoryPool MemoryPool;
118+
const bool WithSpilling;
119+
const NYql::NDqProto::EDqStatsMode StatsMode;
120+
const TInstant& Deadline;
121+
const bool ShareMailbox;
122+
const TMaybe<NYql::NDqProto::TRlPath>& RlPath;
123+
TComputeStagesWithScan* ComputesByStages = nullptr;
124+
std::shared_ptr<IKqpNodeState> State = nullptr;
125+
};
126+
127+
typedef std::variant<TActorId, NKikimr::NKqp::NRm::TKqpRMAllocateResult> TActorStartResult;
128+
virtual TActorStartResult CreateKqpComputeActor(TCreateArgs&& args) = 0;
129+
130+
virtual void ApplyConfig(const NKikimrConfig::TTableServiceConfig::TResourceManager& config) = 0;
111131
};
112132

113133
std::shared_ptr<IKqpNodeComputeActorFactory> MakeKqpCaFactory(const NKikimrConfig::TTableServiceConfig::TResourceManager& config,

ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp

+1-4
Original file line number | Diff line number | Diff line change
@@ -133,10 +133,7 @@ STFUNC(TKqpComputeActor::StateFunc) {
133133
BaseStateFuncBody(ev);
134134
}
135135
} catch (const TMemoryLimitExceededException& e) {
136-
InternalError(TIssuesIds::KIKIMR_PRECONDITION_FAILED, TStringBuilder()
137-
<< "Mkql memory limit exceeded, limit: " << GetMkqlMemoryLimit()
138-
<< ", host: " << HostName()
139-
<< ", canAllocateExtraMemory: " << CanAllocateExtraMemory);
136+
TBase::OnMemoryLimitExceptionHandler();
140137
} catch (const NMiniKQL::TKqpEnsureFail& e) {
141138
InternalError((TIssuesIds::EIssueCode) e.GetCode(), e.GetMessage());
142139
} catch (const yexception& e) {

ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h

+1-4
Original file line number | Diff line number | Diff line change
@@ -46,10 +46,7 @@ class TKqpScanComputeActor: public NYql::NDq::TDqSyncComputeActorBase<TKqpScanCo
4646
BaseStateFuncBody(ev);
4747
}
4848
} catch (const TMemoryLimitExceededException& e) {
49-
const TString sInfo = TStringBuilder() << "Mkql memory limit exceeded, limit: " << GetMkqlMemoryLimit()
50-
<< ", host: " << HostName() << ", canAllocateExtraMemory: " << CanAllocateExtraMemory;
51-
CA_LOG_E("ERROR:" + sInfo);
52-
InternalError(NYql::NDqProto::StatusIds::PRECONDITION_FAILED, NYql::TIssuesIds::KIKIMR_PRECONDITION_FAILED, sInfo);
49+
TBase::OnMemoryLimitExceptionHandler();
5350
} catch (const yexception& e) {
5451
InternalError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, NYql::TIssuesIds::DEFAULT_ERROR, e.what());
5552
}

ydb/core/kqp/counters/kqp_counters.cpp

+3
Original file line number | Diff line number | Diff line change
@@ -776,7 +776,10 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co
776776
RmExternalMemory = KqpGroup->GetCounter("RM/ExternalMemory", false);
777777
RmNotEnoughMemory = KqpGroup->GetCounter("RM/NotEnoughMemory", true);
778778
RmNotEnoughComputeActors = KqpGroup->GetCounter("RM/NotEnoughComputeActors", true);
779+
// Registered under the same upper-case "RM/" prefix as the neighbouring resource-manager counters.
RmOnStartAllocs = KqpGroup->GetCounter("RM/OnStartAllocs", true);
779780
RmExtraMemAllocs = KqpGroup->GetCounter("RM/ExtraMemAllocs", true);
781+
RmExtraMemFree = KqpGroup->GetCounter("RM/ExtraMemFree", true);
782+
RmOnCompleteFree = KqpGroup->GetCounter("RM/OnCompleteFree", true);
780783
RmInternalError = KqpGroup->GetCounter("RM/InternalError", true);
781784
RmSnapshotLatency = KqpGroup->GetHistogram(
782785
"RM/SnapshotLatency", NMonitoring::ExponentialHistogram(20, 2, 1));

ydb/core/kqp/counters/kqp_counters.h

+4-1
Original file line number | Diff line number | Diff line change
@@ -350,7 +350,7 @@ class TKqpCounters : public TKqpCountersBase, public NYql::NDq::TSpillingCounter
350350
::NMonitoring::TDynamicCounterPtr WorkloadManagerGroup;
351351

352352
::NMonitoring::TDynamicCounters::TCounterPtr FullScansExecuted;
353-
353+
354354
// Lease updates counters
355355
::NMonitoring::THistogramPtr LeaseUpdateLatency;
356356
::NMonitoring::THistogramPtr RunActorLeaseUpdateBacklog;
@@ -377,6 +377,9 @@ class TKqpCounters : public TKqpCountersBase, public NYql::NDq::TSpillingCounter
377377
::NMonitoring::TDynamicCounters::TCounterPtr RmNotEnoughMemory;
378378
::NMonitoring::TDynamicCounters::TCounterPtr RmNotEnoughComputeActors;
379379
::NMonitoring::TDynamicCounters::TCounterPtr RmExtraMemAllocs;
380+
::NMonitoring::TDynamicCounters::TCounterPtr RmOnStartAllocs;
381+
::NMonitoring::TDynamicCounters::TCounterPtr RmExtraMemFree;
382+
::NMonitoring::TDynamicCounters::TCounterPtr RmOnCompleteFree;
380383
::NMonitoring::TDynamicCounters::TCounterPtr RmInternalError;
381384
NMonitoring::THistogramPtr RmSnapshotLatency;
382385
NMonitoring::THistogramPtr NodeServiceStartEventDelivery;

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

+3-1
Original file line number | Diff line number | Diff line change
@@ -2489,7 +2489,9 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
24892489
.FederatedQuerySetup = FederatedQuerySetup,
24902490
.OutputChunkMaxSize = Request.OutputChunkMaxSize,
24912491
.GUCSettings = GUCSettings,
2492-
.MayRunTasksLocally = mayRunTasksLocally
2492+
.MayRunTasksLocally = mayRunTasksLocally,
2493+
.ResourceManager_ = Request.ResourceManager_,
2494+
.CaFactory_ = Request.CaFactory_
24932495
});
24942496

24952497
auto err = Planner->PlanExecution();

0 commit comments

Comments
 (0)