Skip to content

Commit cb2aea6

Browse files
authored
Support of mrjob in dqrun (#4243)
1 parent b96a902 commit cb2aea6

File tree

2 files changed

+38
-0
lines changed

2 files changed

+38
-0
lines changed

ydb/library/yql/tools/dqrun/dqrun.cpp

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@
8484
#include <library/cpp/getopt/last_getopt.h>
8585
#include <library/cpp/logger/priority.h>
8686
#include <library/cpp/protobuf/util/pb_io.h>
87+
#include <library/cpp/digest/md5/md5.h>
8788
#include <ydb/library/actors/http/http_proxy.h>
8889

8990
#include <util/generic/string.h>
@@ -164,6 +165,28 @@ void ReadGatewaysConfig(const TString& configFile, TGatewaysConfig* config, THas
164165
}
165166
}
166167

168+
// Applies command-line overrides to the YT section of the gateways config.
//
// config       - gateways config modified in place through config->MutableYt().
// mrJobBin     - path to the mrjob binary. When empty, MrJobBin and MrJobBinMd5 are
//                cleared; otherwise the path and the result of MD5::File(mrJobBin)
//                are stored.
// mrJobUdfsDir - directory with UDFs for MR jobs. When empty, MrJobUdfsDir is cleared;
//                otherwise it is set to this value.
// numThreads   - written to GatewayThreads unconditionally.
// keepTemp     - appended to DefaultSettings as KeepTempTables = "yes" / "no".
void PatchGatewaysConfig(TGatewaysConfig* config, const TString& mrJobBin, const TString& mrJobUdfsDir,
    size_t numThreads, bool keepTemp)
{
    auto* ytConfig = config->MutableYt();
    ytConfig->SetGatewayThreads(numThreads);

    if (mrJobBin.empty()) {
        ytConfig->ClearMrJobBin();
        // Drop the digest together with the path so that a value loaded from the
        // config file cannot outlive the binary it described.
        ytConfig->ClearMrJobBinMd5();
    } else {
        ytConfig->SetMrJobBin(mrJobBin);
        // NOTE(review): how MD5::File reports an unreadable file is not visible here --
        // confirm whether the result needs to be validated before it is stored.
        ytConfig->SetMrJobBinMd5(MD5::File(mrJobBin));
    }

    if (mrJobUdfsDir.empty()) {
        ytConfig->ClearMrJobUdfsDir();
    } else {
        ytConfig->SetMrJobUdfsDir(mrJobUdfsDir);
    }

    // Always appended as a new entry (Add()), after whatever DefaultSettings already holds.
    auto* attr = ytConfig->MutableDefaultSettings()->Add();
    attr->SetName("KeepTempTables");
    attr->SetValue(keepTemp ? "yes" : "no");
}
189+
167190
TFileStoragePtr CreateFS(const TString& paramsFile, const TString& defYtServer) {
168191
TFileStorageConfig params;
169192
LoadFsConfigFromFile(paramsFile ? paramsFile : "../../../../../yql/cfg/local/fs.conf", params);
@@ -454,6 +477,9 @@ int RunMain(int argc, const char* argv[])
454477
int verbosity = 3;
455478
bool showLog = false;
456479
bool emulateYt = false;
480+
TString mrJobBin;
481+
TString mrJobUdfsDir;
482+
size_t numYtThreads = 1;
457483
TString token = GetEnv("YQL_TOKEN");
458484
if (!token) {
459485
TString home = GetEnv("HOME");
@@ -550,6 +576,16 @@ int RunMain(int argc, const char* argv[])
550576
.Optional()
551577
.NoArgument()
552578
.SetFlag(&udfResolverFilterSyscalls);
579+
opts.AddLongOption("mrjob-bin", "Path to mrjob binary")
580+
.Optional()
581+
.StoreResult(&mrJobBin);
582+
opts.AddLongOption("mrjob-udfsdir", "Path to udfs for mr jobs")
583+
.Optional()
584+
.StoreResult(&mrJobUdfsDir);
585+
opts.AddLongOption("yt-threads", "YT gateway threads")
586+
.Optional()
587+
.RequiredArgument("COUNT")
588+
.StoreResult(&numYtThreads);
553589
opts.AddLongOption('v', "verbosity", "Log verbosity level")
554590
.Optional()
555591
.RequiredArgument("LEVEL")
@@ -737,6 +773,7 @@ int RunMain(int argc, const char* argv[])
737773

738774
TGatewaysConfig gatewaysConfig;
739775
ReadGatewaysConfig(gatewaysCfgFile, &gatewaysConfig, sqlFlags);
776+
PatchGatewaysConfig(&gatewaysConfig, mrJobBin, mrJobUdfsDir, numYtThreads, res.Has("keep-temp"));
740777
if (runOptions.AnalyzeQuery) {
741778
auto* setting = gatewaysConfig.MutableDq()->AddDefaultSettings();
742779
setting->SetName("AnalyzeQuery");

ydb/library/yql/tools/dqrun/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ ENDIF()
2525
library/cpp/logger
2626
library/cpp/resource
2727
library/cpp/yson
28+
library/cpp/digest/md5
2829
yt/cpp/mapreduce/interface
2930
ydb/library/yql/sql/pg
3031
ydb/library/yql/core/facade

0 commit comments

Comments
 (0)