6
6
7
7
namespace NKikimr ::NKqp::NComputeActor {
8
8
9
+
9
10
struct TMemoryQuotaManager : public NYql ::NDq::TGuaranteeQuotaManager {
10
11
11
12
TMemoryQuotaManager (std::shared_ptr<NRm::IKqpResourceManager> resourceManager
12
13
, NRm::EKqpMemoryPool memoryPool
13
14
, std::shared_ptr<IKqpNodeState> state
14
- , ui64 txId
15
- , ui64 taskId
15
+ , TIntrusivePtr<NRm::TTxState> tx
16
+ , TIntrusivePtr<NRm::TTaskState> task
16
17
, ui64 limit
17
18
, ui64 reasonableSpillingTreshold)
18
19
: NYql::NDq::TGuaranteeQuotaManager(limit, limit)
19
20
, ResourceManager(std::move(resourceManager))
20
21
, MemoryPool(memoryPool)
21
22
, State(std::move(state))
22
- , TxId(txId )
23
- , TaskId(taskId )
23
+ , Tx(std::move(tx) )
24
+ , Task(std::move(task) )
24
25
, ReasonableSpillingTreshold(reasonableSpillingTreshold)
25
26
{
26
27
}
27
28
28
29
~TMemoryQuotaManager () override {
29
- State->OnTaskTerminate (TxId, TaskId, Success);
30
- ResourceManager->FreeResources (TxId, TaskId);
30
+ if (State) {
31
+ State->OnTaskTerminate (Tx->TxId , Task->TaskId , Success);
32
+ }
33
+
34
+ ResourceManager->FreeResources (Tx, Task);
31
35
}
32
36
33
37
bool AllocateExtraQuota (ui64 extraSize) override {
34
- auto result = ResourceManager->AllocateResources (TxId, TaskId ,
38
+ auto result = ResourceManager->AllocateResources (Tx, Task ,
35
39
NRm::TKqpResourcesRequest{.MemoryPool = MemoryPool, .Memory = extraSize});
36
40
37
41
if (!result) {
38
42
AFL_WARN (NKikimrServices::KQP_COMPUTE)
39
43
(" problem" , " cannot_allocate_memory" )
40
- (" tx_id" , TxId)
41
- (" task_id" , TaskId)
44
+ (" tx_id" , Tx-> TxId )
45
+ (" task_id" , Task-> TaskId )
42
46
(" memory" , extraSize);
43
47
44
48
return false ;
45
49
}
46
50
47
- TotalQueryAllocationsSize = result.TotalAllocatedQueryMemory ;
48
-
49
51
return true ;
50
52
}
51
53
52
54
void FreeExtraQuota (ui64 extraSize) override {
53
- ResourceManager->FreeResources (TxId, TaskId,
54
- NRm::TKqpResourcesRequest{.MemoryPool = MemoryPool, .Memory = extraSize}
55
- );
55
+ NRm::TKqpResourcesRequest request = NRm::TKqpResourcesRequest{.MemoryPool = MemoryPool, .Memory = extraSize};
56
+ ResourceManager->FreeResources (Tx, Task, Task->FitRequest (request));
56
57
}
57
58
58
59
bool IsReasonableToUseSpilling () const override {
59
- return TotalQueryAllocationsSize >= ReasonableSpillingTreshold;
60
+ return Tx->GetExtraMemoryAllocatedSize () >= ReasonableSpillingTreshold;
61
+ }
62
+
63
+ TString MemoryConsumptionDetails () const override {
64
+ return Tx->ToString ();
60
65
}
61
66
62
67
void TerminateHandler (bool success, const NYql::TIssues& issues) {
63
68
AFL_DEBUG (NKikimrServices::KQP_COMPUTE)
64
69
(" problem" , " finish_compute_actor" )
65
- (" tx_id" , TxId)(" task_id" , TaskId)(" success" , success)(" message" , issues.ToOneLineString ());
70
+ (" tx_id" , Tx-> TxId )(" task_id" , Task-> TaskId )(" success" , success)(" message" , issues.ToOneLineString ());
66
71
Success = success;
67
72
}
68
73
69
74
std::shared_ptr<NRm::IKqpResourceManager> ResourceManager;
70
75
NRm::EKqpMemoryPool MemoryPool;
71
76
std::shared_ptr<IKqpNodeState> State;
72
- ui64 TxId ;
73
- ui64 TaskId ;
77
+ TIntrusivePtr<NRm::TTxState> Tx ;
78
+ TIntrusivePtr<NRm::TTaskState> Task ;
74
79
bool Success = true ;
75
- ui64 TotalQueryAllocationsSize = 0 ;
76
80
ui64 ReasonableSpillingTreshold = 0 ;
77
81
};
78
82
79
83
class TKqpCaFactory : public IKqpNodeComputeActorFactory {
80
- NKikimrConfig::TTableServiceConfig::TResourceManager Config;
81
84
std::shared_ptr<NRm::IKqpResourceManager> ResourceManager_;
82
85
NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
83
86
const std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
84
87
88
+ std::atomic<ui64> MkqlLightProgramMemoryLimit = 0 ;
89
+ std::atomic<ui64> MkqlHeavyProgramMemoryLimit = 0 ;
90
+ std::atomic<ui64> MinChannelBufferSize = 0 ;
91
+ std::atomic<ui64> ReasonableSpillingTreshold = 0 ;
92
+
85
93
public:
86
94
TKqpCaFactory (const NKikimrConfig::TTableServiceConfig::TResourceManager& config,
87
95
std::shared_ptr<NRm::IKqpResourceManager> resourceManager,
88
96
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
89
97
const std::optional<TKqpFederatedQuerySetup> federatedQuerySetup)
90
- : Config(config)
91
- , ResourceManager_(resourceManager)
98
+ : ResourceManager_(resourceManager)
92
99
, AsyncIoFactory(asyncIoFactory)
93
100
, FederatedQuerySetup(federatedQuerySetup)
94
- {}
101
+ {
102
+ ApplyConfig (config);
103
+ }
95
104
96
- TActorId CreateKqpComputeActor (const TActorId& executerId, ui64 txId, NYql::NDqProto::TDqTask* dqTask,
97
- const NYql::NDq::TComputeRuntimeSettings& settings,
98
- NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena, const TString& serializedGUCSettings,
99
- TComputeStagesWithScan& computesByStage, ui64 outputChunkMaxSize, std::shared_ptr<IKqpNodeState> state,
100
- NRm::EKqpMemoryPool memoryPool, ui32 numberOfTasks)
105
+ void ApplyConfig (const NKikimrConfig::TTableServiceConfig::TResourceManager& config)
101
106
{
107
+ MkqlLightProgramMemoryLimit.store (config.GetMkqlLightProgramMemoryLimit ());
108
+ MkqlHeavyProgramMemoryLimit.store (config.GetMkqlHeavyProgramMemoryLimit ());
109
+ MinChannelBufferSize.store (config.GetMinChannelBufferSize ());
110
+ ReasonableSpillingTreshold.store (config.GetReasonableSpillingTreshold ());
111
+ }
112
+
113
+ TActorStartResult CreateKqpComputeActor (TCreateArgs&& args) {
102
114
NYql::NDq::TComputeMemoryLimits memoryLimits;
103
115
memoryLimits.ChannelBufferSize = 0 ;
104
- memoryLimits.MkqlLightProgramMemoryLimit = Config.GetMkqlLightProgramMemoryLimit ();
105
- memoryLimits.MkqlHeavyProgramMemoryLimit = Config.GetMkqlHeavyProgramMemoryLimit ();
116
+ memoryLimits.MkqlLightProgramMemoryLimit = MkqlLightProgramMemoryLimit.load ();
117
+ memoryLimits.MkqlHeavyProgramMemoryLimit = MkqlHeavyProgramMemoryLimit.load ();
118
+
119
+ auto estimation = ResourceManager_->EstimateTaskResources (*args.Task , args.NumberOfTasks );
120
+ NRm::TKqpResourcesRequest resourcesRequest;
121
+ resourcesRequest.MemoryPool = args.MemoryPool ;
122
+ resourcesRequest.ExecutionUnits = 1 ;
123
+ resourcesRequest.Memory = memoryLimits.MkqlLightProgramMemoryLimit ;
124
+
125
+ TIntrusivePtr<NRm::TTaskState> task = MakeIntrusive<NRm::TTaskState>(args.Task ->GetId (), args.TxInfo ->CreatedAt );
106
126
107
- auto estimation = EstimateTaskResources (*dqTask, Config, numberOfTasks);
127
+ auto rmResult = ResourceManager_->AllocateResources (
128
+ args.TxInfo , task, resourcesRequest);
129
+
130
+ if (!rmResult) {
131
+ return NRm::TKqpRMAllocateResult{rmResult};
132
+ }
108
133
109
134
{
110
135
ui32 inputChannelsCount = 0 ;
111
- for (auto && i : dqTask ->GetInputs ()) {
136
+ for (auto && i : args. Task ->GetInputs ()) {
112
137
inputChannelsCount += i.ChannelsSize ();
113
138
}
114
139
115
- memoryLimits.ChannelBufferSize = std::max<ui32>(estimation.ChannelBufferMemoryLimit / std::max<ui32>(1 , inputChannelsCount), Config. GetMinChannelBufferSize ());
116
- memoryLimits.OutputChunkMaxSize = outputChunkMaxSize ;
140
+ memoryLimits.ChannelBufferSize = std::max<ui32>(estimation.ChannelBufferMemoryLimit / std::max<ui32>(1 , inputChannelsCount), MinChannelBufferSize. load ());
141
+ memoryLimits.OutputChunkMaxSize = args. OutputChunkMaxSize ;
117
142
AFL_DEBUG (NKikimrServices::KQP_COMPUTE)(" event" , " channel_info" )
118
143
(" ch_size" , estimation.ChannelBufferMemoryLimit )
119
144
(" ch_count" , estimation.ChannelBuffersCount )
120
145
(" ch_limit" , memoryLimits.ChannelBufferSize )
121
- (" inputs" , dqTask ->InputsSize ())
146
+ (" inputs" , args. Task ->InputsSize ())
122
147
(" input_channels_count" , inputChannelsCount);
123
148
}
124
149
125
- auto & taskOpts = dqTask ->GetProgram ().GetSettings ();
150
+ auto & taskOpts = args. Task ->GetProgram ().GetSettings ();
126
151
auto limit = taskOpts.GetHasMapJoin () || taskOpts.GetHasStateAggregation ()
127
152
? memoryLimits.MkqlHeavyProgramMemoryLimit
128
153
: memoryLimits.MkqlLightProgramMemoryLimit ;
129
154
130
155
memoryLimits.MemoryQuotaManager = std::make_shared<TMemoryQuotaManager>(
131
156
ResourceManager_,
132
- memoryPool ,
133
- std::move (state ),
134
- txId ,
135
- dqTask-> GetId ( ),
157
+ args. MemoryPool ,
158
+ std::move (args. State ),
159
+ std::move (args. TxInfo ) ,
160
+ std::move (task ),
136
161
limit,
137
- Config.GetReasonableSpillingTreshold ());
162
+ ReasonableSpillingTreshold.load ());
163
+
164
+ auto runtimeSettings = args.RuntimeSettings ;
165
+ runtimeSettings.ExtraMemoryAllocationPool = args.MemoryPool ;
166
+ runtimeSettings.UseSpilling = args.WithSpilling ;
167
+ runtimeSettings.StatsMode = args.StatsMode ;
168
+
169
+ if (args.Deadline ) {
170
+ runtimeSettings.Timeout = args.Deadline - TAppData::TimeProvider->Now ();
171
+ }
172
+
173
+ if (args.RlPath ) {
174
+ runtimeSettings.RlPath = args.RlPath ;
175
+ }
138
176
139
- auto runtimeSettings = settings;
140
177
NYql::NDq::IMemoryQuotaManager::TWeakPtr memoryQuotaManager = memoryLimits.MemoryQuotaManager ;
141
178
runtimeSettings.TerminateHandler = [memoryQuotaManager]
142
179
(bool success, const NYql::TIssues& issues) {
@@ -157,29 +194,32 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
157
194
};
158
195
159
196
ETableKind tableKind = ETableKind::Unknown;
160
- if (dqTask->HasMetaId ()) {
161
- YQL_ENSURE (computesByStage.GetMetaById (*dqTask, meta) || dqTask->GetMeta ().UnpackTo (&meta), " cannot take meta on MetaId exists in tasks" );
197
+ if (args.Task ->HasMetaId ()) {
198
+ YQL_ENSURE (args.ComputesByStages );
199
+ YQL_ENSURE (args.ComputesByStages ->GetMetaById (*args.Task , meta) || args.Task ->GetMeta ().UnpackTo (&meta), " cannot take meta on MetaId exists in tasks" );
162
200
tableKind = tableKindExtract (meta);
163
- } else if (dqTask ->GetMeta ().UnpackTo (&meta)) {
201
+ } else if (args. Task ->GetMeta ().UnpackTo (&meta)) {
164
202
tableKind = tableKindExtract (meta);
165
203
}
166
204
167
205
if (tableKind == ETableKind::Datashard || tableKind == ETableKind::Olap) {
168
- auto & info = computesByStage.UpsertTaskWithScan (*dqTask, meta, !AppData ()->FeatureFlags .GetEnableSeparationComputeActorsFromRead ());
169
- IActor* computeActor = CreateKqpScanComputeActor (executerId, txId, dqTask,
206
+ YQL_ENSURE (args.ComputesByStages );
207
+ auto & info = args.ComputesByStages ->UpsertTaskWithScan (*args.Task , meta, !AppData ()->FeatureFlags .GetEnableSeparationComputeActorsFromRead ());
208
+ IActor* computeActor = CreateKqpScanComputeActor (args.ExecuterId , args.TxId , args.Task ,
170
209
AsyncIoFactory, runtimeSettings, memoryLimits,
171
- std::move (traceId ), std::move (arena ));
210
+ std::move (args. TraceId ), std::move (args. Arena ));
172
211
TActorId result = TlsActivationContext->Register (computeActor);
173
212
info.MutableActorIds ().emplace_back (result);
174
213
return result;
175
214
} else {
176
215
std::shared_ptr<TGUCSettings> GUCSettings;
177
- if (!serializedGUCSettings .empty ()) {
178
- GUCSettings = std::make_shared<TGUCSettings>(serializedGUCSettings );
216
+ if (!args. SerializedGUCSettings .empty ()) {
217
+ GUCSettings = std::make_shared<TGUCSettings>(args. SerializedGUCSettings );
179
218
}
180
- IActor* computeActor = ::NKikimr::NKqp::CreateKqpComputeActor (executerId, txId, dqTask, AsyncIoFactory,
181
- runtimeSettings, memoryLimits, std::move (traceId), std::move (arena), FederatedQuerySetup, GUCSettings);
182
- return TlsActivationContext->Register (computeActor);
219
+ IActor* computeActor = ::NKikimr::NKqp::CreateKqpComputeActor (args.ExecuterId , args.TxId , args.Task , AsyncIoFactory,
220
+ runtimeSettings, memoryLimits, std::move (args.TraceId ), std::move (args.Arena ), FederatedQuerySetup, GUCSettings);
221
+ return args.ShareMailbox ? TlsActivationContext->AsActorContext ().RegisterWithSameMailbox (computeActor) :
222
+ TlsActivationContext->AsActorContext ().Register (computeActor);
183
223
}
184
224
}
185
225
};
0 commit comments