Skip to content

Commit cb5a70d

Browse files
authored
Move yql dq job core in os (#1464)
1 parent 6517468 commit cb5a70d

File tree

4 files changed

+453
-0
lines changed

4 files changed

+453
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,377 @@
1+
#include "dq_worker.h"
2+
3+
#include <ydb/library/yql/utils/signals/signals.h>
4+
#include <ydb/library/yql/utils/bind_in_range.h>
5+
6+
#include <ydb/library/yql/providers/dq/stats_collector/pool_stats_collector.h>
7+
#include <ydb/library/yql/providers/dq/actors/yt/nodeid_assigner.h>
8+
#include <ydb/library/yql/providers/dq/actors/dynamic_nameserver.h>
9+
#include <ydb/library/yql/providers/dq/service/interconnect_helpers.h>
10+
#include <ydb/library/yql/providers/dq/global_worker_manager/coordination_helper.h>
11+
12+
#include <ydb/library/yql/providers/dq/runtime/file_cache.h>
13+
#include <ydb/library/yql/providers/dq/runtime/runtime_data.h>
14+
#include <ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h>
15+
16+
#include <ydb/library/yql/dq/actors/spilling/spilling_file.h>
17+
18+
#include <ydb/library/yql/utils/log/log.h>
19+
#include <ydb/library/yql/utils/log/tls_backend.h>
20+
#include <ydb/library/yql/utils/yql_panic.h>
21+
#include <ydb/library/yql/utils/range_walker.h>
22+
23+
#include <yt/yt/core/actions/invoker.h>
24+
#include <yt/yt/core/concurrency/action_queue.h>
25+
#include <yt/yt/core/concurrency/thread_pool.h>
26+
27+
#include <library/cpp/protobuf/util/pb_io.h>
28+
29+
#include <util/system/fs.h>
30+
#include <util/stream/file.h>
31+
#include <util/system/env.h>
32+
#include <util/system/shellcommand.h>
33+
34+
using namespace NYql::NDqs;
35+
36+
namespace {
37+
template <typename TMessage>
38+
THolder<TMessage> ParseProtoConfig(const TString& cfgFile) {
39+
auto config = MakeHolder<TMessage>();
40+
TString configData = TFileInput(cfgFile).ReadAll();;
41+
42+
using ::google::protobuf::TextFormat;
43+
if (!TextFormat::ParseFromString(configData, config.Get())) {
44+
YQL_LOG(ERROR) << "Bad format of dq_vanilla_job configuration";
45+
return {};
46+
}
47+
48+
return config;
49+
}
50+
51+
static NThreading::TPromise<void> ShouldContinue = NThreading::NewPromise<void>();
52+
53+
static void OnTerminate(int) {
54+
ShouldContinue.SetValue();
55+
}
56+
57+
class TSerializedTaskRunnerInvoker: public ITaskRunnerInvoker {
58+
public:
59+
TSerializedTaskRunnerInvoker(const NYT::IInvokerPtr& invoker)
60+
: Invoker(NYT::NConcurrency::CreateSerializedInvoker(invoker))
61+
{ }
62+
63+
void Invoke(const std::function<void(void)>& f) override {
64+
Invoker->Invoke(BIND(f));
65+
}
66+
67+
private:
68+
const NYT::IInvokerPtr Invoker;
69+
};
70+
71+
class TConcurrentInvokerFactory: public ITaskRunnerInvokerFactory {
72+
public:
73+
TConcurrentInvokerFactory(int capacity)
74+
: ThreadPool(NYT::NConcurrency::CreateThreadPool(capacity, "WorkerActor"))
75+
{ }
76+
77+
ITaskRunnerInvoker::TPtr Create() override {
78+
return new TSerializedTaskRunnerInvoker(ThreadPool->GetInvoker());
79+
}
80+
81+
NYT::NConcurrency::IThreadPoolPtr ThreadPool;
82+
};
83+
84+
void ConfigurePorto(const NYql::NProto::TDqConfig::TYtBackend& config, const TString portoCtl) {
85+
TString settings[][2] = {
86+
{"enable_porto", "isolate"},
87+
{"respawn", "true"}
88+
};
89+
int nSettings = 2;
90+
{
91+
TShellCommand cmd(portoCtl, {"create", "Outer"});
92+
cmd.Run().Wait();
93+
}
94+
for (int i = 0; i < nSettings; i++) {
95+
TShellCommand cmd(portoCtl, {"set", "Outer", settings[i][0], settings[i][1]});
96+
cmd.Run().Wait();
97+
}
98+
for (const auto& attr : config.GetPortoSettings().GetSetting()) {
99+
TShellCommand cmd(portoCtl, {"set", "Outer", attr.GetName(), attr.GetValue()});
100+
cmd.Run().Wait();
101+
}
102+
{
103+
TShellCommand cmd(portoCtl, {"start", "Outer"});
104+
cmd.Run().Wait();
105+
}
106+
{
107+
TShellCommand cmd(portoCtl, {"wait", "Outer"});
108+
cmd.Run().Wait();
109+
}
110+
}
111+
}
112+
113+
namespace NYql::NDq::NWorker {
114+
115+
void TDefaultWorkerConfigurator::ConfigureMetrics(const THolder<NYql::NProto::TLoggingConfig>& /*loggerConfig*/, const THolder<NActors::TActorSystem>& /*actorSystem*/, const NProto::TDqConfig::TYtBackend& /*backendConfig*/, const TResourceManagerOptions& /*rmOptions*/, ui32 /*nodeId*/) const {
116+
}
117+
118+
NDq::IDqAsyncIoFactory::TPtr TDefaultWorkerConfigurator::CreateAsyncIoFactory() const {
119+
return MakeIntrusive<NYql::NDq::TDqAsyncIoFactory>();
120+
}
121+
122+
void TDefaultWorkerConfigurator::OnWorkerFinish() {
123+
}
124+
125+
TWorkerJob::TWorkerJob()
126+
: WorkerConfigurator(MakeHolder<TDefaultWorkerConfigurator>(TDefaultWorkerConfigurator()))
127+
{ }
128+
129+
void TWorkerJob::SetConfigFile(const TString& configFile) {
130+
ConfigFile = configFile;
131+
}
132+
133+
void TWorkerJob::SetWorkerConfigurator(THolder<IWorkerConfigurator> workerConfigurator) {
134+
WorkerConfigurator = std::move(workerConfigurator);
135+
}
136+
137+
void TWorkerJob::Do() {
138+
139+
auto loggerConfig = MakeHolder<NYql::NProto::TLoggingConfig>();
140+
141+
ui16 startPort = 0;
142+
143+
auto deterministicMode = !!GetEnv("YQL_DETERMINISTIC_MODE");
144+
145+
YQL_ENSURE(TryFromString<ui16>(GetEnv(NCommonJobVars::ACTOR_PORT), startPort),
146+
"Invalid service config port env var empty");
147+
148+
ui32 tryNodeId;
149+
YQL_ENSURE(TryFromString<ui32>(GetEnv(NCommonJobVars::ACTOR_NODE_ID, "0"), tryNodeId),
150+
"Invalid nodeId env var");
151+
152+
if (!ConfigFile.empty()) {
153+
loggerConfig = ParseProtoConfig<NYql::NProto::TLoggingConfig>(ConfigFile);
154+
155+
for (auto& logDest : *loggerConfig->MutableLogDest()) {
156+
if (logDest.GetType() == NYql::NProto::TLoggingConfig::FILE) {
157+
TString logFile = logDest.GetTarget() + "." + ToString(tryNodeId);
158+
logDest.SetTarget(logFile);
159+
}
160+
}
161+
162+
loggerConfig->SetAllComponentsLevel(NYql::NProto::TLoggingConfig::TRACE);
163+
} else {
164+
loggerConfig->SetAllComponentsLevel(NYql::NProto::TLoggingConfig::DEBUG);
165+
}
166+
NYql::NLog::InitLogger(*loggerConfig, false);
167+
InitSignals();
168+
169+
TString fileCacheDir = GetEnv(NCommonJobVars::UDFS_PATH);
170+
TString ytCoordinatorStr = GetEnv(TString("YT_SECURE_VAULT_") + NCommonJobVars::YT_COORDINATOR);
171+
172+
TString ytBackendStr = GetEnv(TString("YT_SECURE_VAULT_") + NCommonJobVars::YT_BACKEND);
173+
174+
TString operationId = GetEnv("YT_OPERATION_ID");
175+
TString jobId = GetEnv("YT_JOB_ID");
176+
177+
TString operationSize = GetEnv(NCommonJobVars::OPERATION_SIZE);
178+
179+
NProto::TDqConfig::TYtCoordinator coordinatorConfig;
180+
TStringInput inputStream1(ytCoordinatorStr);
181+
ParseFromTextFormat(inputStream1, coordinatorConfig, EParseFromTextFormatOption::AllowUnknownField);
182+
183+
NProto::TDqConfig::TYtBackend backendConfig;
184+
TStringInput inputStream2(ytBackendStr);
185+
ParseFromTextFormat(inputStream2, backendConfig, EParseFromTextFormatOption::AllowUnknownField);
186+
187+
TRangeWalker<int> portWalker(startPort, startPort+100);
188+
auto ports = BindInRange(portWalker);
189+
190+
auto [host, ip] = NYql::NDqs::GetLocalAddress(
191+
coordinatorConfig.HasHostName() ? &coordinatorConfig.GetHostName() : nullptr
192+
);
193+
194+
auto coordinator = CreateCoordiantionHelper(coordinatorConfig, NProto::TDqConfig::TScheduler(), "worker_node", ports[1].Addr.GetPort(), host, ip);
195+
i64 cacheSize = backendConfig.HasCacheSize()
196+
? backendConfig.GetCacheSize()
197+
: 16000000000L;
198+
TIntrusivePtr<IFileCache> fileCache = new TFileCache(fileCacheDir + "/cache", cacheSize);
199+
NFs::SymLink(fileCacheDir, "file_cache"); // COMPAT
200+
TString layerDir = fileCacheDir + "/layer";
201+
if (backendConfig.GetPortoLayer().size() > 0) {
202+
NFs::MakeDirectoryRecursive(layerDir + "/mnt/work");
203+
for (const auto& layerPath : backendConfig.GetPortoLayer()) {
204+
auto pos = layerPath.rfind('/');
205+
auto archive = layerPath.substr(pos+1);
206+
TShellCommand cmd("tar", {"xf", archive, "-C", layerDir});
207+
cmd.Run().Wait();
208+
}
209+
} else {
210+
NFs::MakeDirectoryRecursive("mnt/work");
211+
NFs::MakeDirectoryRecursive("usr/local/bin");
212+
}
213+
214+
int capacity = backendConfig.GetWorkerCapacity()
215+
? backendConfig.GetWorkerCapacity()
216+
: 1;
217+
218+
NYql::NTaskRunnerProxy::TPipeFactoryOptions pfOptions;
219+
pfOptions.ExecPath = GetExecPath();
220+
pfOptions.FileCache = fileCache;
221+
if (deterministicMode) {
222+
YQL_LOG(DEBUG) << "deterministicMode On";
223+
pfOptions.Env["YQL_DETERMINISTIC_MODE"] = "1";
224+
}
225+
if (backendConfig.GetEnforceJobUtc()) {
226+
pfOptions.Env["TZ"] = "UTC0";
227+
}
228+
pfOptions.EnablePorto = backendConfig.GetEnablePorto() == "isolate";
229+
pfOptions.PortoLayer = backendConfig.GetPortoLayer().size() == 0 ? "" : layerDir;
230+
pfOptions.MaxProcesses = capacity*1.5;
231+
pfOptions.ContainerName = "Outer";
232+
233+
TResourceManagerOptions rmOptions;
234+
rmOptions.YtBackend = backendConfig;
235+
rmOptions.FileCache = fileCache;
236+
rmOptions.TmpDir = fileCacheDir + "/tmp";
237+
238+
if (NFs::Exists(layerDir + "/usr/bin/portoctl")) {
239+
TString dst = fileCache->GetDir() + "/portoctl";
240+
NFs::Copy(layerDir + "/usr/bin/portoctl", dst);
241+
NFs::HardLink(layerDir + "/usr/bin/portoctl", layerDir + "/usr/sbin/portoctl"); // workaround PORTO-997
242+
chmod(dst.c_str(), 0755);
243+
pfOptions.PortoCtlPath = dst;
244+
rmOptions.DieOnFileAbsence = dst; // die on file absence
245+
}
246+
247+
Cerr << host + ":" + ip << Endl;
248+
249+
THashMap<TString, TString> attributes;
250+
attributes[NCommonAttrs::OPERATIONID_ATTR] = operationId;
251+
attributes[NCommonAttrs::OPERATIONSIZE_ATTR] = operationSize;
252+
attributes[NCommonAttrs::JOBID_ATTR] = jobId;
253+
attributes[NCommonAttrs::CLUSTERNAME_ATTR] = backendConfig.GetClusterName();
254+
255+
auto nodeIdOpt = (tryNodeId == 0)
256+
? TMaybe<ui32>()
257+
: TMaybe<ui32>(tryNodeId);
258+
auto nodeId = coordinator->GetNodeId(
259+
nodeIdOpt,
260+
{},
261+
static_cast<ui32>(NDqs::ENodeIdLimits::MinWorkerNodeId),
262+
static_cast<ui32>(NDqs::ENodeIdLimits::MaxWorkerNodeId),
263+
attributes);
264+
265+
Y_ABORT_UNLESS(
266+
static_cast<ui32>(NDqs::ENodeIdLimits::MinWorkerNodeId) <= nodeId &&
267+
nodeId < static_cast<ui32>(NDqs::ENodeIdLimits::MaxWorkerNodeId));
268+
269+
Cerr << "My nodeId: " << nodeId << Endl;
270+
271+
Cerr << "Configure porto" << Endl;
272+
if (backendConfig.GetEnablePorto() == "isolate") {
273+
ConfigurePorto(backendConfig, pfOptions.PortoCtlPath);
274+
}
275+
Cerr << "Configure porto done" << Endl;
276+
277+
auto dqSensors = GetSensorsGroupFor(NSensorComponent::kDq);
278+
THolder<NActors::TActorSystemSetup> setup;
279+
TIntrusivePtr<NActors::NLog::TSettings> logSettings;
280+
std::tie(setup, logSettings) = BuildActorSetup(
281+
nodeId,
282+
ip,
283+
ports[1].Addr.GetPort(),
284+
ports[1].Socket->Release(),
285+
{},
286+
dqSensors,
287+
[](const TIntrusivePtr<NActors::TTableNameserverSetup>& setup) {
288+
return NYql::NDqs::CreateDynamicNameserver(setup);
289+
},
290+
Nothing(),
291+
backendConfig.GetICSettings());
292+
293+
auto statsCollector = CreateStatsCollector(5, *setup.Get(), dqSensors);
294+
295+
auto actorSystem = MakeHolder<NActors::TActorSystem>(setup, nullptr, logSettings);
296+
297+
actorSystem->Start();
298+
299+
actorSystem->Register(statsCollector);
300+
301+
TVector<TString> hostPortPairs;
302+
for (auto hostPortPair : coordinatorConfig.GetServiceNodeHostPort()) {
303+
hostPortPairs.emplace_back(hostPortPair);
304+
// tests
305+
if (hostPortPair.StartsWith("localhost")) {
306+
rmOptions.ExitOnPingFail = true;
307+
}
308+
}
309+
310+
WorkerConfigurator->ConfigureMetrics(loggerConfig, actorSystem, backendConfig, rmOptions, nodeId);
311+
312+
// rmOptions.MetricsRegistry = CreateMetricsRegistry(dqSensors); // send metrics to gwm, unsupported
313+
auto resolver = coordinator->CreateServiceNodeResolver(actorSystem.Get(), hostPortPairs);
314+
actorSystem->Register(coordinator->CreateServiceNodePinger(resolver, rmOptions, attributes));
315+
316+
NLog::YqlLogger().UpdateProcInfo(jobId + "/" + GetGuidAsString(coordinator->GetRuntimeData()->WorkerId));
317+
318+
// For testing only
319+
THashMap<TString, TString> clusterMapping;
320+
clusterMapping["plato"] = backendConfig.GetClusterName();
321+
322+
auto proxyFactory = NTaskRunnerProxy::CreatePipeFactory(pfOptions);
323+
ITaskRunnerInvokerFactory::TPtr invokerFactory = new TConcurrentInvokerFactory(2*capacity);
324+
#if defined(Y_YQL_DQ_TASK_RUNNER_ACTOR_FACTORY_COMPATIBILITY_1)
325+
auto taskRunnerActorFactory = NTaskRunnerActor::CreateTaskRunnerActorFactory(proxyFactory, invokerFactory, coordinator->GetRuntimeData());
326+
#else
327+
auto taskRunnerActorFactory = NTaskRunnerActor::CreateTaskRunnerActorFactory(proxyFactory, invokerFactory, nullptr, coordinator->GetRuntimeData());
328+
#endif
329+
330+
TLocalWorkerManagerOptions lwmOptions;
331+
lwmOptions.Factory = proxyFactory;
332+
lwmOptions.TaskRunnerActorFactory = taskRunnerActorFactory;
333+
lwmOptions.AsyncIoFactory = WorkerConfigurator->CreateAsyncIoFactory();
334+
lwmOptions.RuntimeData = coordinator->GetRuntimeData();
335+
lwmOptions.TaskRunnerInvokerFactory = invokerFactory;
336+
lwmOptions.ClusterNamesMapping = clusterMapping;
337+
lwmOptions.ComputeActorOwnsCounters = true;
338+
339+
auto resman = NDqs::CreateLocalWorkerManager(lwmOptions);
340+
341+
auto workerManagerActorId = actorSystem->Register(resman);
342+
actorSystem->RegisterLocalService(MakeWorkerManagerActorID(nodeId), workerManagerActorId);
343+
344+
if (backendConfig.HasSpillingSettings()) {
345+
auto spilling = NDq::CreateDqLocalFileSpillingService(
346+
NDq::TFileSpillingServiceConfig {
347+
.Root = backendConfig.GetSpillingSettings().GetRoot(),
348+
.MaxTotalSize = backendConfig.GetSpillingSettings().GetMaxTotalSize(),
349+
.MaxFileSize = backendConfig.GetSpillingSettings().GetMaxFileSize(),
350+
.MaxFilePartSize = backendConfig.GetSpillingSettings().GetMaxFilePartSize(),
351+
.IoThreadPoolWorkersCount = backendConfig.GetSpillingSettings().GetIoThreadPoolWorkersCount(),
352+
.IoThreadPoolQueueSize = backendConfig.GetSpillingSettings().GetIoThreadPoolQueueSize(),
353+
.CleanupOnShutdown = backendConfig.GetSpillingSettings().GetCleanupOnShutdown()
354+
},
355+
MakeIntrusive<NDq::TSpillingCounters>(dqSensors)
356+
);
357+
auto spillingActor = actorSystem->Register(spilling);
358+
actorSystem->RegisterLocalService(NDq::MakeDqLocalFileSpillingServiceID(nodeId), spillingActor);
359+
}
360+
361+
auto endFuture = ShouldContinue.GetFuture();
362+
363+
signal(SIGINT, &OnTerminate);
364+
signal(SIGTERM, &OnTerminate);
365+
signal(SIGPIPE, SIG_IGN);
366+
367+
// run forever
368+
369+
endFuture.Wait();
370+
WorkerConfigurator->OnWorkerFinish();
371+
actorSystem->Stop();
372+
dqSensors->OutputHtml(Cerr);
373+
}
374+
375+
REGISTER_VANILLA_JOB(TWorkerJob);
376+
377+
} // namespace NYql::NDq::NWorker

0 commit comments

Comments
 (0)