10
10
#include < ydb/library/yql/providers/dq/common/attrs.h>
11
11
#include < ydb/library/yql/providers/dq/actors/dynamic_nameserver.h>
12
12
#include < ydb/library/yql/providers/dq/actors/resource_allocator.h>
13
+ #include < ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
14
+ #include < ydb/library/yql/dq/integration/transform/yql_dq_task_transform.h>
15
+ #include < ydb/library/yql/dq/comp_nodes/yql_common_dq_factory.h>
16
+ #include < ydb/library/yql/providers/common/comp_nodes/yql_factory.h>
17
+ #include < ydb/library/yql/minikql/comp_nodes/mkql_factories.h>
18
+ #include < ydb/library/yql/dq/transform/yql_common_dq_transform.h>
19
+ #include < ydb/library/yql/providers/dq/task_runner/tasks_runner_local.h>
13
20
14
21
using namespace NYql ;
15
22
using namespace NActors ;
@@ -178,11 +185,26 @@ class TGlobalWorkerManagerTest: public TTestBase {
178
185
TActorSetupCmd{gwmActor, TMailboxType::Simple, 0 });
179
186
180
187
// Local WM.
188
+ FunctionRegistry_ = CreateFunctionRegistry (NKikimr::NMiniKQL::CreateBuiltinRegistry ());
189
+ auto dqCompFactory = NKikimr::NMiniKQL::GetCompositeWithBuiltinFactory ({
190
+ NYql::GetCommonDqFactory (),
191
+ NKikimr::NMiniKQL::GetYqlFactory ()
192
+ });
193
+
194
+ auto dqTaskTransformFactory = NYql::CreateCompositeTaskTransformFactory ({
195
+ NYql::CreateCommonDqTaskTransformFactory ()
196
+ });
197
+
198
+ auto patternCache = std::make_shared<NKikimr::NMiniKQL::TComputationPatternLRUCache>(NKikimr::NMiniKQL::TComputationPatternLRUCache::Config (200_MB, 200_MB));
199
+
200
+ auto factory = NTaskRunnerProxy::CreateFactory (FunctionRegistry_.Get (), dqCompFactory, dqTaskTransformFactory, patternCache, true );
181
201
for (ui32 i = 1 ; i < nodesNumber; i++) {
182
202
NYql::NDqs::TLocalWorkerManagerOptions lwmOptions;
183
203
lwmOptions.TaskRunnerInvokerFactory = new NDqs::TTaskRunnerInvokerFactory ();
184
204
lwmOptions.TaskRunnerActorFactory = NYql::NDq::NTaskRunnerActor::CreateTaskRunnerActorFactory (
185
205
lwmOptions.Factory , lwmOptions.TaskRunnerInvokerFactory );
206
+ lwmOptions.FunctionRegistry = FunctionRegistry_.Get ();
207
+ lwmOptions.Factory = factory;
186
208
auto localWM = CreateLocalWorkerManager (lwmOptions);
187
209
ActorRuntime_->AddLocalService (MakeWorkerManagerActorID (NodeId (i)),
188
210
TActorSetupCmd{localWM, TMailboxType::Simple, 0 }, i);
@@ -192,6 +214,10 @@ class TGlobalWorkerManagerTest: public TTestBase {
192
214
193
215
ActorRuntime_->Initialize ();
194
216
217
+ for (ui32 i = 1 ; i < nodesNumber; i++) {
218
+ ActorRuntime_->GetLogSettings (i)->Mask = 0xffffffff ;
219
+ }
220
+
195
221
NActors::TDispatchOptions options;
196
222
options.FinalEvents .emplace_back (NActors::TEvents::TSystem::Bootstrap, nodesNumber);
197
223
ActorRuntime_->DispatchEvents (options);
@@ -360,6 +386,8 @@ class TGlobalWorkerManagerTest: public TTestBase {
360
386
TActorId RegisterResourceAllocator (const ui32 workersCount, const TActorId& execActor) const {
361
387
TIntrusivePtr<NMonitoring::TDynamicCounters> counters = MakeIntrusive<NMonitoring::TDynamicCounters>();
362
388
auto gwmActor = MakeWorkerManagerActorID (NodeId ());
389
+ TVector<NYql::NDqProto::TDqTask> tasks (workersCount);
390
+ // auto allocator = CreateResourceAllocator(gwmActor, execActor, execActor, workersCount, "TraceId", new TDqConfiguration(), counters, tasks, "sync");
363
391
auto allocator = CreateResourceAllocator (gwmActor, execActor, execActor, workersCount, " TraceId" , new TDqConfiguration (), counters);
364
392
const auto allocatorId = ActorRuntime_->Register (allocator);
365
393
return allocatorId;
@@ -434,6 +462,7 @@ class TGlobalWorkerManagerTest: public TTestBase {
434
462
}
435
463
436
464
THolder<NActors::TTestActorRuntimeBase> ActorRuntime_;
465
+ TIntrusivePtr<NKikimr::NMiniKQL::IFunctionRegistry> FunctionRegistry_;
437
466
};
438
467
439
468
UNIT_TEST_SUITE_REGISTRATION (TGlobalWorkerManagerTest)
0 commit comments