Skip to content

Commit bc843db

Browse files
authored
YQL-17542 move allocator ownership from TDqTaskRunner to actors (#1335)
1 parent 9f3844c commit bc843db

25 files changed

+152
-101
lines changed

ydb/core/fq/libs/init/init.cpp

+4-2
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ void Init(
246246
if (!mkqlAllocSize) {
247247
mkqlAllocSize = 30_MB;
248248
}
249+
Y_ABORT_UNLESS(appData->FunctionRegistry);
249250
NYql::NDqs::TLocalWorkerManagerOptions lwmOptions;
250251
lwmOptions.Counters = workerManagerCounters;
251252
lwmOptions.Factory = NYql::NTaskRunnerProxy::CreateFactory(appData->FunctionRegistry, dqCompFactory, dqTaskTransformFactory, nullptr, false);
@@ -257,8 +258,9 @@ void Init(
257258
lwmOptions.MkqlProgramHardMemoryLimit = protoConfig.GetResourceManager().GetMkqlTaskHardMemoryLimit();
258259
lwmOptions.MkqlMinAllocSize = mkqlAllocSize;
259260
lwmOptions.TaskRunnerActorFactory = NYql::NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory(
260-
[=](const NYql::NDq::TDqTaskSettings& task, NYql::NDqProto::EDqStatsMode statsMode, const NYql::NDq::TLogFunc&) {
261-
return lwmOptions.Factory->Get(task, statsMode);
261+
*appData->FunctionRegistry,
262+
[=](NKikimr::NMiniKQL::TScopedAlloc& alloc, const NYql::NDq::TDqTaskSettings& task, NYql::NDqProto::EDqStatsMode statsMode, const NYql::NDq::TLogFunc&) {
263+
return lwmOptions.Factory->Get(alloc, task, statsMode);
262264
});
263265
if (protoConfig.GetRateLimiter().GetDataPlaneEnabled()) {
264266
lwmOptions.QuoterServiceActorId = NFq::YqQuoterServiceActorId();

ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp

+1-2
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ void TKqpComputeActor::DoBootstrap() {
4646
execCtx.ComputeCtx = &ComputeCtx;
4747
execCtx.ComputationFactory = NMiniKQL::GetKqpActorComputeFactory(&ComputeCtx);
4848
execCtx.ApplyCtx = nullptr;
49-
execCtx.Alloc = nullptr;
5049
execCtx.TypeEnv = nullptr;
5150
execCtx.PatternCache = GetKqpResourceManager()->GetPatternCache();
5251

@@ -68,7 +67,7 @@ void TKqpComputeActor::DoBootstrap() {
6867
settings.ReadRanges.push_back(readRange);
6968
}
7069

71-
auto taskRunner = MakeDqTaskRunner(execCtx, settings, logger);
70+
auto taskRunner = MakeDqTaskRunner(TBase::GetAllocator(), execCtx, settings, logger);
7271
SetTaskRunner(taskRunner);
7372

7473
auto wakeup = [this]{ ContinueExecute(); };

ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp

+1-2
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,6 @@ void TKqpScanComputeActor::DoBootstrap() {
188188
execCtx.RandomProvider = TAppData::RandomProvider.Get();
189189
execCtx.TimeProvider = TAppData::TimeProvider.Get();
190190
execCtx.ApplyCtx = nullptr;
191-
execCtx.Alloc = nullptr;
192191
execCtx.TypeEnv = nullptr;
193192
execCtx.PatternCache = GetKqpResourceManager()->GetPatternCache();
194193

@@ -219,7 +218,7 @@ void TKqpScanComputeActor::DoBootstrap() {
219218
};
220219
}
221220

222-
auto taskRunner = MakeDqTaskRunner(execCtx, settings, logger);
221+
auto taskRunner = MakeDqTaskRunner(GetAllocator(), execCtx, settings, logger);
223222
TBase::SetTaskRunner(taskRunner);
224223

225224
auto wakeup = [this] { ContinueExecute(); };

ydb/core/kqp/executer_actor/kqp_literal_executer.cpp

+6-7
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ using namespace NYql::NDq;
1919

2020
namespace {
2121

22-
std::unique_ptr<TDqTaskRunnerContext> CreateTaskRunnerContext(NMiniKQL::TKqpComputeContextBase* computeCtx, NMiniKQL::TScopedAlloc* alloc,
22+
std::unique_ptr<TDqTaskRunnerContext> CreateTaskRunnerContext(NMiniKQL::TKqpComputeContextBase* computeCtx,
2323
NMiniKQL::TTypeEnvironment* typeEnv)
2424
{
2525
std::unique_ptr<TDqTaskRunnerContext> context = std::make_unique<TDqTaskRunnerContext>();
@@ -44,7 +44,6 @@ std::unique_ptr<TDqTaskRunnerContext> CreateTaskRunnerContext(NMiniKQL::TKqpComp
4444
return nullptr;
4545
};
4646

47-
context->Alloc = alloc;
4847
context->TypeEnv = typeEnv;
4948
context->ApplyCtx = nullptr;
5049
return context;
@@ -167,12 +166,12 @@ class TKqpLiteralExecuter {
167166

168167
// task runner settings
169168
ComputeCtx = std::make_unique<NMiniKQL::TKqpComputeContextBase>();
170-
RunnerContext = CreateTaskRunnerContext(ComputeCtx.get(), &Request.TxAlloc->Alloc, &Request.TxAlloc->TypeEnv);
169+
RunnerContext = CreateTaskRunnerContext(ComputeCtx.get(), &Request.TxAlloc->TypeEnv);
171170
RunnerContext->PatternCache = GetKqpResourceManager()->GetPatternCache();
172171
TDqTaskRunnerSettings settings = CreateTaskRunnerSettings(Request.StatsMode);
173172

174173
for (auto& task : TasksGraph.GetTasks()) {
175-
RunTask(task, *RunnerContext, settings);
174+
RunTask(Request.TxAlloc->Alloc, task, *RunnerContext, settings);
176175

177176
if (TerminateIfTimeout()) {
178177
return;
@@ -183,7 +182,7 @@ class TKqpLiteralExecuter {
183182
UpdateCounters();
184183
}
185184

186-
void RunTask(TTask& task, const TDqTaskRunnerContext& context, const TDqTaskRunnerSettings& settings) {
185+
void RunTask(NMiniKQL::TScopedAlloc& alloc, TTask& task, const TDqTaskRunnerContext& context, const TDqTaskRunnerSettings& settings) {
187186
auto& stageInfo = TasksGraph.GetStageInfo(task.StageId);
188187
auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
189188

@@ -218,7 +217,7 @@ class TKqpLiteralExecuter {
218217
<< message);
219218
};
220219

221-
auto taskRunner = MakeDqTaskRunner(context, settings, log);
220+
auto taskRunner = MakeDqTaskRunner(alloc, context, settings, log);
222221
TaskRunners.emplace_back(taskRunner);
223222

224223
auto taskSettings = NDq::TDqTaskSettings(&protoTask);
@@ -228,7 +227,7 @@ class TKqpLiteralExecuter {
228227
auto status = taskRunner->Run();
229228
YQL_ENSURE(status == ERunStatus::Finished);
230229

231-
with_lock (*context.Alloc) { // allocator is used only by outputChannel->PopAll()
230+
with_lock (alloc) { // allocator is used only by outputChannel->PopAll()
232231
for (auto& taskOutput : task.Outputs) {
233232
for (ui64 outputChannelId : taskOutput.Channels) {
234233
auto outputChannel = taskRunner->GetOutputChannel(outputChannelId);

ydb/core/kqp/runtime/kqp_tasks_runner.cpp

+7-6
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,11 @@ IDqOutputConsumer::TPtr KqpBuildOutputConsumer(const NDqProto::TTaskOutput& outp
7070

7171

7272
TKqpTasksRunner::TKqpTasksRunner(google::protobuf::RepeatedPtrField<NDqProto::TDqTask>&& tasks,
73+
NKikimr::NMiniKQL::TScopedAlloc& alloc,
7374
const TDqTaskRunnerContext& execCtx, const TDqTaskRunnerSettings& settings, const TLogFunc& logFunc)
7475
: LogFunc(logFunc)
75-
, Alloc(execCtx.Alloc)
76+
, Alloc(alloc)
7677
{
77-
YQL_ENSURE(execCtx.Alloc);
7878
YQL_ENSURE(execCtx.TypeEnv);
7979

8080
ApplyCtx = dynamic_cast<NMiniKQL::TKqpDatashardApplyContext *>(execCtx.ApplyCtx);
@@ -86,7 +86,7 @@ TKqpTasksRunner::TKqpTasksRunner(google::protobuf::RepeatedPtrField<NDqProto::TD
8686
try {
8787
for (auto&& task : tasks) {
8888
ui64 taskId = task.GetId();
89-
auto runner = MakeDqTaskRunner(execCtx, settings, logFunc);
89+
auto runner = MakeDqTaskRunner(alloc, execCtx, settings, logFunc);
9090
if (auto* stats = runner->GetStats()) {
9191
Stats.emplace(taskId, stats);
9292
}
@@ -230,15 +230,16 @@ const NYql::NDq::TDqTaskSettings& TKqpTasksRunner::GetTask(ui64 taskId) const {
230230

231231
TGuard<NMiniKQL::TScopedAlloc> TKqpTasksRunner::BindAllocator(TMaybe<ui64> memoryLimit) {
232232
if (memoryLimit) {
233-
Alloc->SetLimit(*memoryLimit);
233+
Alloc.SetLimit(*memoryLimit);
234234
}
235-
return TGuard(*Alloc);
235+
return TGuard(Alloc);
236236
}
237237

238238
TIntrusivePtr<TKqpTasksRunner> CreateKqpTasksRunner(google::protobuf::RepeatedPtrField<NDqProto::TDqTask>&& tasks,
239+
NKikimr::NMiniKQL::TScopedAlloc& alloc,
239240
const TDqTaskRunnerContext& execCtx, const TDqTaskRunnerSettings& settings, const TLogFunc& logFunc)
240241
{
241-
return new TKqpTasksRunner(std::move(tasks), execCtx, settings, logFunc);
242+
return new TKqpTasksRunner(std::move(tasks), alloc, execCtx, settings, logFunc);
242243
}
243244

244245
} // namespace NKqp

ydb/core/kqp/runtime/kqp_tasks_runner.h

+4-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ NYql::NDq::IDqOutputConsumer::TPtr KqpBuildOutputConsumer(const NYql::NDqProto::
1616
class TKqpTasksRunner : public TSimpleRefCount<TKqpTasksRunner>, private TNonCopyable {
1717
public:
1818
TKqpTasksRunner(google::protobuf::RepeatedPtrField<NYql::NDqProto::TDqTask>&& tasks,
19+
NKikimr::NMiniKQL::TScopedAlloc& alloc,
1920
const NYql::NDq::TDqTaskRunnerContext& execCtx, const NYql::NDq::TDqTaskRunnerSettings& settings,
2021
const NYql::NDq::TLogFunc& logFunc);
2122

@@ -50,15 +51,15 @@ class TKqpTasksRunner : public TSimpleRefCount<TKqpTasksRunner>, private TNonCop
5051
// otherwise use particular memory limit
5152
TGuard<NMiniKQL::TScopedAlloc> BindAllocator(TMaybe<ui64> memoryLimit = Nothing());
5253

53-
ui64 GetAllocatedMemory() const { return Alloc->GetAllocated(); }
54+
ui64 GetAllocatedMemory() const { return Alloc.GetAllocated(); }
5455

5556
const TMap<ui64, const NYql::NDq::TDqTaskRunnerStats*> GetTasksStats() const { return Stats; }
5657
private:
5758
TMap<ui64, TIntrusivePtr<NYql::NDq::IDqTaskRunner>> TaskRunners;
5859
TMap<ui64, NYql::NDq::TDqTaskSettings> Tasks;
5960
TMap<ui64, const NYql::NDq::TDqTaskRunnerStats*> Stats;
6061
NYql::NDq::TLogFunc LogFunc;
61-
NMiniKQL::TScopedAlloc* Alloc;
62+
NMiniKQL::TScopedAlloc& Alloc;
6263
NMiniKQL::TKqpComputeContextBase* ComputeCtx;
6364
NMiniKQL::TKqpDatashardApplyContext* ApplyCtx;
6465

@@ -72,6 +73,7 @@ class TKqpTasksRunner : public TSimpleRefCount<TKqpTasksRunner>, private TNonCop
7273

7374

7475
TIntrusivePtr<TKqpTasksRunner> CreateKqpTasksRunner(google::protobuf::RepeatedPtrField<NYql::NDqProto::TDqTask>&& tasks,
76+
NKikimr::NMiniKQL::TScopedAlloc& alloc,
7577
const NYql::NDq::TDqTaskRunnerContext& execCtx, const NYql::NDq::TDqTaskRunnerSettings& settings,
7678
const NYql::NDq::TLogFunc& logFunc);
7779

ydb/core/tx/datashard/datashard__engine_host.cpp

+2-3
Original file line numberDiff line numberDiff line change
@@ -521,7 +521,6 @@ TEngineBay::TEngineBay(TDataShard* self, TTransactionContext& txc, const TActorC
521521
KqpExecCtx.RandomProvider = TAppData::RandomProvider.Get();
522522
KqpExecCtx.TimeProvider = TAppData::TimeProvider.Get();
523523
KqpExecCtx.ApplyCtx = KqpApplyCtx.Get();
524-
KqpExecCtx.Alloc = KqpAlloc.Get();
525524
KqpExecCtx.TypeEnv = KqpTypeEnv.Get();
526525
if (auto rm = NKqp::TryGetKqpResourceManager()) {
527526
KqpExecCtx.PatternCache = rm->GetPatternCache();
@@ -701,9 +700,9 @@ NKqp::TKqpTasksRunner& TEngineBay::GetKqpTasksRunner(NKikimrTxDataShard::TKqpTra
701700

702701
settings.OptLLVM = "OFF";
703702
settings.TerminateOnError = false;
704-
703+
Y_ABORT_UNLESS(KqpAlloc);
705704
KqpAlloc->SetLimit(10_MB);
706-
KqpTasksRunner = NKqp::CreateKqpTasksRunner(std::move(*tx.MutableTasks()), KqpExecCtx, settings, KqpLogFunc);
705+
KqpTasksRunner = NKqp::CreateKqpTasksRunner(std::move(*tx.MutableTasks()), *KqpAlloc.Get(), KqpExecCtx, settings, KqpLogFunc);
707706
}
708707

709708
return *KqpTasksRunner;

ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ class TDqComputeActor : public TDqComputeActorBase<TDqComputeActor> {
5454
};
5555
}
5656

57-
auto taskRunner = TaskRunnerFactory(Task, RuntimeSettings.StatsMode, logger);
57+
auto taskRunner = TaskRunnerFactory(GetAllocator(), Task, RuntimeSettings.StatsMode, logger);
5858
SetTaskRunner(taskRunner);
5959
auto wakeup = [this]{ ContinueExecute(EResumeSource::CABootstrapWakeup); };
6060
TDqTaskRunnerExecutionContext execCtx(TxId, RuntimeSettings.UseSpilling, std::move(wakeup));

ydb/library/yql/dq/actors/compute/dq_compute_actor.h

+4-1
Original file line numberDiff line numberDiff line change
@@ -364,8 +364,11 @@ struct TComputeMemoryLimits {
364364
IMemoryQuotaManager::TPtr MemoryQuotaManager;
365365
};
366366

367+
//temporary flag to integarate changes in interface
368+
#define Y_YQL_DQ_TASK_RUNNER_REQUIRES_ALLOCATOR 1
369+
367370
using TTaskRunnerFactory = std::function<
368-
TIntrusivePtr<IDqTaskRunner>(const TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const TLogFunc& logFunc)
371+
TIntrusivePtr<IDqTaskRunner>(NKikimr::NMiniKQL::TScopedAlloc& alloc, const TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const TLogFunc& logFunc)
369372
>;
370373

371374
void FillAsyncStats(NDqProto::TDqAsyncBufferStats& proto, TDqAsyncStats stats);

ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h

+16-5
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,12 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
206206
, Running(!Task.GetCreateSuspended())
207207
, PassExceptions(passExceptions)
208208
{
209+
Alloc = std::make_shared<NKikimr::NMiniKQL::TScopedAlloc>(
210+
__LOCATION__,
211+
NKikimr::TAlignedPagePoolCounters(),
212+
FunctionRegistry->SupportsSizedAllocators(),
213+
false
214+
);
209215
InitMonCounters(taskCounters);
210216
InitializeTask();
211217
if (ownMemoryQuota) {
@@ -626,8 +632,8 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
626632
void InternalError(NYql::NDqProto::StatusIds::StatusCode statusCode, TIssues issues) {
627633
CA_LOG_E(InternalErrorLogString(statusCode, issues));
628634
if (TaskRunner) {
629-
TaskRunner->GetAllocatorPtr()->InvalidateMemInfo();
630-
TaskRunner->GetAllocatorPtr()->DisableStrictAllocationCheck();
635+
TaskRunner->GetAllocator().InvalidateMemInfo();
636+
TaskRunner->GetAllocator().DisableStrictAllocationCheck();
631637
}
632638
State = NDqProto::COMPUTE_STATE_FAILURE;
633639
ReportStateAndMaybeDie(statusCode, issues);
@@ -1365,6 +1371,10 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
13651371
}
13661372
}
13671373

1374+
protected:
1375+
NKikimr::NMiniKQL::TScopedAlloc& GetAllocator() {
1376+
return *Alloc.get();
1377+
}
13681378
private:
13691379
virtual const TDqMemoryQuota::TProfileStats* GetMemoryProfileStats() const {
13701380
Y_ABORT_UNLESS(MemoryQuota);
@@ -1586,7 +1596,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
15861596
.TypeEnv = typeEnv,
15871597
.HolderFactory = holderFactory,
15881598
.TaskCounters = TaskCounters,
1589-
.Alloc = TaskRunner ? TaskRunner->GetAllocatorPtr() : nullptr,
1599+
.Alloc = TaskRunner ? Alloc : nullptr,
15901600
.MemoryQuotaManager = MemoryLimits.MemoryQuotaManager,
15911601
.SourceSettings = (!settings.empty() ? settings.at(inputIndex) : nullptr),
15921602
.Arena = Task.GetArena(),
@@ -1619,7 +1629,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
16191629
.TypeEnv = typeEnv,
16201630
.HolderFactory = holderFactory,
16211631
.ProgramBuilder = *transform.ProgramBuilder,
1622-
.Alloc = TaskRunner->GetAllocatorPtr(),
1632+
.Alloc = Alloc,
16231633
.TraceId = ComputeActorSpan.GetTraceId()
16241634
});
16251635
} catch (const std::exception& ex) {
@@ -2222,7 +2232,8 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
22222232

22232233
LastSendStatsTime = now;
22242234
}
2225-
2235+
private:
2236+
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc; //must be declared on top to be destroyed after all the rest
22262237
protected:
22272238
const NActors::TActorId ExecuterId;
22282239
const TTxId TxId;

ydb/library/yql/dq/actors/task_runner/task_runner_actor.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ struct ITaskRunnerActorFactory {
4949
THolder<NYql::NDq::TDqMemoryQuota>&& memoryQuota = {}) = 0;
5050
};
5151

52-
ITaskRunnerActorFactory::TPtr CreateLocalTaskRunnerActorFactory(const TTaskRunnerFactory& factory);
52+
ITaskRunnerActorFactory::TPtr CreateLocalTaskRunnerActorFactory(const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry, const TTaskRunnerFactory& factory);
5353

5454
} // namespace NTaskRunnerActor
5555

ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp

+19-7
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,23 @@ class TLocalTaskRunnerActor
3838
public:
3939
static constexpr char ActorName[] = "YQL_DQ_TASK_RUNNER";
4040

41-
TLocalTaskRunnerActor(ITaskRunnerActor::ICallbacks* parent, const TTaskRunnerFactory& factory, const TTxId& txId, ui64 taskId, THashSet<ui32>&& inputChannelsWithDisabledCheckpoints, THolder<NYql::NDq::TDqMemoryQuota>&& memoryQuota)
41+
TLocalTaskRunnerActor(ITaskRunnerActor::ICallbacks* parent, const TTaskRunnerFactory& factory, const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry, const TTxId& txId, ui64 taskId, THashSet<ui32>&& inputChannelsWithDisabledCheckpoints, THolder<NYql::NDq::TDqMemoryQuota>&& memoryQuota)
4242
: TActor<TLocalTaskRunnerActor>(&TLocalTaskRunnerActor::Handler)
43+
, FuncRegistry(funcRegistry)
4344
, Parent(parent)
4445
, Factory(factory)
4546
, TxId(txId)
4647
, TaskId(taskId)
4748
, InputChannelsWithDisabledCheckpoints(std::move(inputChannelsWithDisabledCheckpoints))
4849
, MemoryQuota(std::move(memoryQuota))
49-
{ }
50+
{
51+
Alloc = std::make_unique<NKikimr::NMiniKQL::TScopedAlloc>(
52+
__LOCATION__,
53+
NKikimr::TAlignedPagePoolCounters(),
54+
FuncRegistry.SupportsSizedAllocators(),
55+
false
56+
);
57+
}
5058

5159
~TLocalTaskRunnerActor()
5260
{ }
@@ -407,7 +415,7 @@ class TLocalTaskRunnerActor
407415
void OnDqTask(TEvTaskRunnerCreate::TPtr& ev) {
408416
ParentId = ev->Sender;
409417
auto settings = NDq::TDqTaskSettings(&ev->Get()->Task);
410-
TaskRunner = Factory(settings, ev->Get()->StatsMode, [this](const TString& message) {
418+
TaskRunner = Factory(*Alloc.get(), settings, ev->Get()->StatsMode, [this](const TString& message) {
411419
LOG_D(message);
412420
});
413421

@@ -463,6 +471,8 @@ class TLocalTaskRunnerActor
463471
THolder<TEvDq::TEvAbortExecution> GetError(const TString& message) {
464472
return MakeHolder<TEvDq::TEvAbortExecution>(NYql::NDqProto::StatusIds::BAD_REQUEST, TVector<TIssue>{TIssue(message).SetCode(TIssuesIds::DQ_GATEWAY_ERROR, TSeverityIds::S_ERROR)});
465473
}
474+
const NKikimr::NMiniKQL::IFunctionRegistry& FuncRegistry;
475+
std::unique_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
466476

467477
NActors::TActorId ParentId;
468478
ITaskRunnerActor::ICallbacks* Parent;
@@ -477,8 +487,9 @@ class TLocalTaskRunnerActor
477487
};
478488

479489
struct TLocalTaskRunnerActorFactory: public ITaskRunnerActorFactory {
480-
TLocalTaskRunnerActorFactory(const TTaskRunnerFactory& factory)
490+
TLocalTaskRunnerActorFactory(const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry, const TTaskRunnerFactory& factory)
481491
: Factory(factory)
492+
, FuncRegistry(funcRegistry)
482493
{ }
483494

484495
std::tuple<ITaskRunnerActor*, NActors::IActor*> Create(
@@ -488,19 +499,20 @@ struct TLocalTaskRunnerActorFactory: public ITaskRunnerActorFactory {
488499
THashSet<ui32>&& inputChannelsWithDisabledCheckpoints,
489500
THolder<NYql::NDq::TDqMemoryQuota>&& memoryQuota) override
490501
{
491-
auto* actor = new TLocalTaskRunnerActor(parent, Factory, txId, taskId, std::move(inputChannelsWithDisabledCheckpoints), std::move(memoryQuota));
502+
auto* actor = new TLocalTaskRunnerActor(parent, Factory, FuncRegistry, txId, taskId, std::move(inputChannelsWithDisabledCheckpoints), std::move(memoryQuota));
492503
return std::make_tuple(
493504
static_cast<ITaskRunnerActor*>(actor),
494505
static_cast<NActors::IActor*>(actor)
495506
);
496507
}
497508

498509
TTaskRunnerFactory Factory;
510+
const NKikimr::NMiniKQL::IFunctionRegistry& FuncRegistry;
499511
};
500512

501-
ITaskRunnerActorFactory::TPtr CreateLocalTaskRunnerActorFactory(const TTaskRunnerFactory& factory)
513+
ITaskRunnerActorFactory::TPtr CreateLocalTaskRunnerActorFactory(const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry, const TTaskRunnerFactory& factory)
502514
{
503-
return ITaskRunnerActorFactory::TPtr(new TLocalTaskRunnerActorFactory(factory));
515+
return ITaskRunnerActorFactory::TPtr(new TLocalTaskRunnerActorFactory(funcRegistry, factory));
504516
}
505517

506518
} // namespace NTaskRunnerActor

0 commit comments

Comments
 (0)