Skip to content

Commit 8c6ee31

Browse files
authored
ca factory in executer to support extra allocations and memory tracking everywhere (#6084)
1 parent 77cb18f commit 8c6ee31

21 files changed

+309
-289
lines changed

ydb/core/driver_lib/run/kikimr_services_initializers.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2149,7 +2149,7 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu
21492149

21502150
// Create resource manager
21512151
auto rm = NKqp::CreateKqpResourceManagerActor(Config.GetTableServiceConfig().GetResourceManager(), nullptr,
2152-
{}, kqpProxySharedResources);
2152+
{}, kqpProxySharedResources, NodeId);
21532153
setup->LocalServices.push_back(std::make_pair(
21542154
NKqp::MakeKqpRmServiceID(NodeId),
21552155
TActorSetupCmd(rm, TMailboxType::HTSwap, appData->UserPoolId)));

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp

Lines changed: 62 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
namespace NKikimr::NKqp::NComputeActor {
88

9+
910
struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager {
1011

1112
TMemoryQuotaManager(std::shared_ptr<NRm::IKqpResourceManager> resourceManager
@@ -26,7 +27,10 @@ struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager {
2627
}
2728

2829
~TMemoryQuotaManager() override {
29-
State->OnTaskTerminate(TxId, TaskId, Success);
30+
if (State) {
31+
State->OnTaskTerminate(TxId, TaskId, Success);
32+
}
33+
3034
ResourceManager->FreeResources(TxId, TaskId);
3135
}
3236

@@ -77,66 +81,86 @@ struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager {
7781
};
7882

7983
class TKqpCaFactory : public IKqpNodeComputeActorFactory {
80-
NKikimrConfig::TTableServiceConfig::TResourceManager Config;
8184
std::shared_ptr<NRm::IKqpResourceManager> ResourceManager_;
8285
NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
8386
const std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
8487

88+
std::atomic<ui64> MkqlLightProgramMemoryLimit = 0;
89+
std::atomic<ui64> MkqlHeavyProgramMemoryLimit = 0;
90+
std::atomic<ui64> MinChannelBufferSize = 0;
91+
std::atomic<ui64> ReasonableSpillingTreshold = 0;
92+
8593
public:
8694
TKqpCaFactory(const NKikimrConfig::TTableServiceConfig::TResourceManager& config,
8795
std::shared_ptr<NRm::IKqpResourceManager> resourceManager,
8896
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
8997
const std::optional<TKqpFederatedQuerySetup> federatedQuerySetup)
90-
: Config(config)
91-
, ResourceManager_(resourceManager)
98+
: ResourceManager_(resourceManager)
9299
, AsyncIoFactory(asyncIoFactory)
93100
, FederatedQuerySetup(federatedQuerySetup)
94-
{}
101+
{
102+
ApplyConfig(config);
103+
}
95104

96-
TActorId CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqProto::TDqTask* dqTask,
97-
const NYql::NDq::TComputeRuntimeSettings& settings,
98-
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena, const TString& serializedGUCSettings,
99-
TComputeStagesWithScan& computesByStage, ui64 outputChunkMaxSize, std::shared_ptr<IKqpNodeState> state,
100-
NRm::EKqpMemoryPool memoryPool, ui32 numberOfTasks)
105+
void ApplyConfig(const NKikimrConfig::TTableServiceConfig::TResourceManager& config)
101106
{
107+
MkqlLightProgramMemoryLimit.store(config.GetMkqlLightProgramMemoryLimit());
108+
MkqlHeavyProgramMemoryLimit.store(config.GetMkqlHeavyProgramMemoryLimit());
109+
MinChannelBufferSize.store(config.GetMinChannelBufferSize());
110+
ReasonableSpillingTreshold.store(config.GetReasonableSpillingTreshold());
111+
}
112+
113+
TActorId CreateKqpComputeActor(TCreateArgs&& args) {
102114
NYql::NDq::TComputeMemoryLimits memoryLimits;
103115
memoryLimits.ChannelBufferSize = 0;
104-
memoryLimits.MkqlLightProgramMemoryLimit = Config.GetMkqlLightProgramMemoryLimit();
105-
memoryLimits.MkqlHeavyProgramMemoryLimit = Config.GetMkqlHeavyProgramMemoryLimit();
116+
memoryLimits.MkqlLightProgramMemoryLimit = MkqlLightProgramMemoryLimit.load();
117+
memoryLimits.MkqlHeavyProgramMemoryLimit = MkqlHeavyProgramMemoryLimit.load();
106118

107-
auto estimation = EstimateTaskResources(*dqTask, Config, numberOfTasks);
119+
auto estimation = ResourceManager_->EstimateTaskResources(*args.Task, args.NumberOfTasks);
108120

109121
{
110122
ui32 inputChannelsCount = 0;
111-
for (auto&& i : dqTask->GetInputs()) {
123+
for (auto&& i : args.Task->GetInputs()) {
112124
inputChannelsCount += i.ChannelsSize();
113125
}
114126

115-
memoryLimits.ChannelBufferSize = std::max<ui32>(estimation.ChannelBufferMemoryLimit / std::max<ui32>(1, inputChannelsCount), Config.GetMinChannelBufferSize());
116-
memoryLimits.OutputChunkMaxSize = outputChunkMaxSize;
127+
memoryLimits.ChannelBufferSize = std::max<ui32>(estimation.ChannelBufferMemoryLimit / std::max<ui32>(1, inputChannelsCount), MinChannelBufferSize.load());
128+
memoryLimits.OutputChunkMaxSize = args.OutputChunkMaxSize;
117129
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "channel_info")
118130
("ch_size", estimation.ChannelBufferMemoryLimit)
119131
("ch_count", estimation.ChannelBuffersCount)
120132
("ch_limit", memoryLimits.ChannelBufferSize)
121-
("inputs", dqTask->InputsSize())
133+
("inputs", args.Task->InputsSize())
122134
("input_channels_count", inputChannelsCount);
123135
}
124136

125-
auto& taskOpts = dqTask->GetProgram().GetSettings();
137+
auto& taskOpts = args.Task->GetProgram().GetSettings();
126138
auto limit = taskOpts.GetHasMapJoin() || taskOpts.GetHasStateAggregation()
127139
? memoryLimits.MkqlHeavyProgramMemoryLimit
128140
: memoryLimits.MkqlLightProgramMemoryLimit;
129141

130142
memoryLimits.MemoryQuotaManager = std::make_shared<TMemoryQuotaManager>(
131143
ResourceManager_,
132-
memoryPool,
133-
std::move(state),
134-
txId,
135-
dqTask->GetId(),
144+
args.MemoryPool,
145+
std::move(args.State),
146+
args.TxId,
147+
args.Task->GetId(),
136148
limit,
137-
Config.GetReasonableSpillingTreshold());
149+
ReasonableSpillingTreshold.load());
150+
151+
auto runtimeSettings = args.RuntimeSettings;
152+
runtimeSettings.ExtraMemoryAllocationPool = args.MemoryPool;
153+
runtimeSettings.UseSpilling = args.WithSpilling;
154+
runtimeSettings.StatsMode = args.StatsMode;
155+
156+
if (args.Deadline) {
157+
runtimeSettings.Timeout = args.Deadline - TAppData::TimeProvider->Now();
158+
}
159+
160+
if (args.RlPath) {
161+
runtimeSettings.RlPath = args.RlPath;
162+
}
138163

139-
auto runtimeSettings = settings;
140164
NYql::NDq::IMemoryQuotaManager::TWeakPtr memoryQuotaManager = memoryLimits.MemoryQuotaManager;
141165
runtimeSettings.TerminateHandler = [memoryQuotaManager]
142166
(bool success, const NYql::TIssues& issues) {
@@ -157,29 +181,32 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
157181
};
158182

159183
ETableKind tableKind = ETableKind::Unknown;
160-
if (dqTask->HasMetaId()) {
161-
YQL_ENSURE(computesByStage.GetMetaById(*dqTask, meta) || dqTask->GetMeta().UnpackTo(&meta), "cannot take meta on MetaId exists in tasks");
184+
if (args.Task->HasMetaId()) {
185+
YQL_ENSURE(args.ComputesByStages);
186+
YQL_ENSURE(args.ComputesByStages->GetMetaById(*args.Task, meta) || args.Task->GetMeta().UnpackTo(&meta), "cannot take meta on MetaId exists in tasks");
162187
tableKind = tableKindExtract(meta);
163-
} else if (dqTask->GetMeta().UnpackTo(&meta)) {
188+
} else if (args.Task->GetMeta().UnpackTo(&meta)) {
164189
tableKind = tableKindExtract(meta);
165190
}
166191

167192
if (tableKind == ETableKind::Datashard || tableKind == ETableKind::Olap) {
168-
auto& info = computesByStage.UpsertTaskWithScan(*dqTask, meta, !AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead());
169-
IActor* computeActor = CreateKqpScanComputeActor(executerId, txId, dqTask,
193+
YQL_ENSURE(args.ComputesByStages);
194+
auto& info = args.ComputesByStages->UpsertTaskWithScan(*args.Task, meta, !AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead());
195+
IActor* computeActor = CreateKqpScanComputeActor(args.ExecuterId, args.TxId, args.Task,
170196
AsyncIoFactory, runtimeSettings, memoryLimits,
171-
std::move(traceId), std::move(arena));
197+
std::move(args.TraceId), std::move(args.Arena));
172198
TActorId result = TlsActivationContext->Register(computeActor);
173199
info.MutableActorIds().emplace_back(result);
174200
return result;
175201
} else {
176202
std::shared_ptr<TGUCSettings> GUCSettings;
177-
if (!serializedGUCSettings.empty()) {
178-
GUCSettings = std::make_shared<TGUCSettings>(serializedGUCSettings);
203+
if (!args.SerializedGUCSettings.empty()) {
204+
GUCSettings = std::make_shared<TGUCSettings>(args.SerializedGUCSettings);
179205
}
180-
IActor* computeActor = ::NKikimr::NKqp::CreateKqpComputeActor(executerId, txId, dqTask, AsyncIoFactory,
181-
runtimeSettings, memoryLimits, std::move(traceId), std::move(arena), FederatedQuerySetup, GUCSettings);
182-
return TlsActivationContext->Register(computeActor);
206+
IActor* computeActor = ::NKikimr::NKqp::CreateKqpComputeActor(args.ExecuterId, args.TxId, args.Task, AsyncIoFactory,
207+
runtimeSettings, memoryLimits, std::move(args.TraceId), std::move(args.Arena), FederatedQuerySetup, GUCSettings);
208+
return args.ShareMailbox ? TlsActivationContext->AsActorContext().RegisterWithSameMailbox(computeActor) :
209+
TlsActivationContext->AsActorContext().Register(computeActor);
183210
}
184211
}
185212
};

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -103,11 +103,29 @@ struct IKqpNodeComputeActorFactory {
103103
virtual ~IKqpNodeComputeActorFactory() = default;
104104

105105
public:
106-
virtual NActors::TActorId CreateKqpComputeActor(const NActors::TActorId& executerId, ui64 txId, NYql::NDqProto::TDqTask* task,
107-
const NYql::NDq::TComputeRuntimeSettings& settings,
108-
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena, const TString& serializedGUCSettings,
109-
TComputeStagesWithScan& computeStages, ui64 outputChunkMaxSize, std::shared_ptr<IKqpNodeState> state,
110-
NKikimr::NKqp::NRm::EKqpMemoryPool memoryPool, ui32 numberOfTasks) = 0;
106+
struct TCreateArgs {
107+
const NActors::TActorId& ExecuterId;
108+
const ui64 TxId;
109+
NYql::NDqProto::TDqTask* Task;
110+
const NYql::NDq::TComputeRuntimeSettings& RuntimeSettings;
111+
NWilson::TTraceId TraceId;
112+
TIntrusivePtr<NActors::TProtoArenaHolder> Arena;
113+
const TString& SerializedGUCSettings;
114+
const ui32 NumberOfTasks;
115+
const ui64 OutputChunkMaxSize;
116+
const NKikimr::NKqp::NRm::EKqpMemoryPool MemoryPool;
117+
const bool WithSpilling;
118+
const NYql::NDqProto::EDqStatsMode StatsMode;
119+
const TInstant& Deadline;
120+
const bool ShareMailbox;
121+
const TMaybe<NYql::NDqProto::TRlPath>& RlPath;
122+
TComputeStagesWithScan* ComputesByStages = nullptr;
123+
std::shared_ptr<IKqpNodeState> State = nullptr;
124+
};
125+
126+
virtual NActors::TActorId CreateKqpComputeActor(TCreateArgs&& args) = 0;
127+
128+
virtual void ApplyConfig(const NKikimrConfig::TTableServiceConfig::TResourceManager& config) = 0;
111129
};
112130

113131
std::shared_ptr<IKqpNodeComputeActorFactory> MakeKqpCaFactory(const NKikimrConfig::TTableServiceConfig::TResourceManager& config,

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2489,7 +2489,9 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
24892489
.FederatedQuerySetup = FederatedQuerySetup,
24902490
.OutputChunkMaxSize = Request.OutputChunkMaxSize,
24912491
.GUCSettings = GUCSettings,
2492-
.MayRunTasksLocally = mayRunTasksLocally
2492+
.MayRunTasksLocally = mayRunTasksLocally,
2493+
.ResourceManager_ = Request.ResourceManager_,
2494+
.CaFactory_ = Request.CaFactory_
24932495
});
24942496

24952497
auto err = Planner->PlanExecution();

0 commit comments

Comments
 (0)