Skip to content

Commit d82c9ad

Browse files
authored
fix saveload logic & support loading custom udfs (#6144)
1 parent a48b189 commit d82c9ad

File tree

4 files changed

+63
-9
lines changed

4 files changed

+63
-9
lines changed

ydb/tools/query_replay_yt/main.cpp

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,28 @@
1313

1414
#include <yt/cpp/mapreduce/interface/logging/logger.h>
1515

16+
#include <util/string/split.h>

#include <utility>
17+
1618
using namespace NActors;
1719

20+
// Pairs every UDF path with the bare file name derived from it.
//
// For each entry of `udfs` the result holds {original path, last non-empty
// "/"-separated component of that path}. Input order is preserved and
// duplicates are kept; callers that need uniqueness must filter themselves.
//
// Throws (via Y_ENSURE) when a path has no non-empty component at all,
// e.g. "" or "/".
TVector<std::pair<TString, TString>> GetJobFiles(const TVector<TString>& udfs) {
    TVector<std::pair<TString, TString>> result;
    result.reserve(udfs.size());

    for (const TString& udf : udfs) {
        TVector<TString> pathParts;
        Split(udf.data(), "/", pathParts);
        // Ignore trailing empty components so "dir/libfoo.so/" still yields "libfoo.so".
        while (!pathParts.empty() && pathParts.back().empty()) {
            pathParts.pop_back();
        }

        Y_ENSURE(!pathParts.empty(), "Cannot derive a file name from udf path '" << udf << "'");

        result.emplace_back(udf, pathParts.back());
    }

    return result;
}
37+
1838
class TQueryReplayMapper
1939
: public NYT::IMapper<NYT::TTableReader<NYT::TNode>, NYT::TTableWriter<NYT::TNode>>
2040
{
@@ -25,7 +45,8 @@ class TQueryReplayMapper
2545
TIntrusivePtr<NKikimr::NMiniKQL::IMutableFunctionRegistry> FunctionRegistry;
2646
TIntrusivePtr<NKikimr::NKqp::TModuleResolverState> ModuleResolverState;
2747

28-
TQueryReplayConfig Config;
48+
TVector<TString> UdfFiles;
49+
ui32 ActorSystemThreadsCount = 5;
2950

3051
TString GetFailReason(const TQueryReplayEvents::TCheckQueryPlanStatus& status) {
3152
switch (status) {
@@ -58,16 +79,30 @@ class TQueryReplayMapper
5879

5980
public:
6081
TQueryReplayMapper() = default;
61-
TQueryReplayMapper(const TQueryReplayConfig& config) : Config(config) {
62-
}
82+
83+
Y_SAVELOAD_JOB(UdfFiles, ActorSystemThreadsCount);
84+
85+
// Builds a mapper configured with the UDF files to load and the number of
// ActorSystem threads to use; both values are consumed later in Start().
// `udfFiles` is a by-value sink parameter, so it is moved into the member
// instead of being copied a second time.
TQueryReplayMapper(TVector<TString> udfFiles, ui32 actorSystemThreadsCount)
    : UdfFiles(std::move(udfFiles))
    , ActorSystemThreadsCount(actorSystemThreadsCount)
{}
6389

6490
void Start(NYT::TTableWriter<NYT::TNode>*) override {
6591
TypeRegistry.Reset(new NKikimr::NScheme::TKikimrTypeRegistry());
6692
FunctionRegistry.Reset(NKikimr::NMiniKQL::CreateFunctionRegistry(NKikimr::NMiniKQL::CreateBuiltinRegistry())->Clone());
6793
NKikimr::NMiniKQL::FillStaticModules(*FunctionRegistry);
94+
NKikimr::NMiniKQL::TUdfModuleRemappings remappings;
95+
THashSet<TString> usedUdfPaths;
96+
97+
for(const auto& [_, udfPath]: GetJobFiles(UdfFiles)) {
98+
if (usedUdfPaths.insert(udfPath).second) {
99+
FunctionRegistry->LoadUdfs(udfPath, remappings, 0);
100+
}
101+
}
102+
68103
AppData.Reset(new NKikimr::TAppData(0, 0, 0, 0, {}, TypeRegistry.Get(), FunctionRegistry.Get(), nullptr, nullptr));
69104
AppData->Counters = MakeIntrusive<NMonitoring::TDynamicCounters>(new NMonitoring::TDynamicCounters());
70-
auto setup = BuildActorSystemSetup(Config.ActorSystemThreadsCount);
105+
auto setup = BuildActorSystemSetup(ActorSystemThreadsCount);
71106
ActorSystem.Reset(new TActorSystem(setup, AppData.Get()));
72107
ActorSystem->Start();
73108
ActorSystem->Register(NKikimr::NKqp::CreateKqpResourceManagerActor({}, nullptr));
@@ -164,9 +199,17 @@ int main(int argc, const char** argv) {
164199
NYT::TMapOperationSpec spec;
165200
spec.AddInput<NYT::TNode>(config.SrcPath);
166201
spec.AddOutput<NYT::TNode>(NYT::TRichYPath(config.DstPath).Schema(OutputSchema()));
167-
spec.MapperSpec(NYT::TUserJobSpec().MemoryLimit(5_GB));
168202

169-
client->Map(spec, new TQueryReplayMapper(config));
203+
auto userJobSpec = NYT::TUserJobSpec();
204+
userJobSpec.MemoryLimit(1_GB);
205+
206+
for(const auto& [udf, udfInJob]: GetJobFiles(config.UdfFiles)) {
207+
userJobSpec.AddLocalFile(udf, NYT::TAddLocalFileOptions().PathInJob(udfInJob));
208+
}
209+
210+
spec.MapperSpec(userJobSpec);
211+
212+
client->Map(spec, new TQueryReplayMapper(config.UdfFiles, config.ActorSystemThreadsCount));
170213

171214
return EXIT_SUCCESS;
172215
}

ydb/tools/query_replay_yt/query_compiler.cpp

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ class TReplayCompileActor: public TActorBootstrapped<TReplayCompileActor> {
221221
, Config(MakeIntrusive<TKikimrConfiguration>())
222222
, FunctionRegistry(functionRegistry)
223223
{
224+
Config->EnableKqpScanQueryStreamLookup = true;
224225
}
225226

226227
void Bootstrap() {
@@ -278,8 +279,16 @@ class TReplayCompileActor: public TActorBootstrapped<TReplayCompileActor> {
278279
case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT:
279280
AsyncCompileResult = KqpHost->PrepareGenericScript(Query->Text, prepareSettings);
280281
break;
281-
case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY:
282-
case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY:
282+
case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY: {
283+
prepareSettings.ConcurrentResults = false;
284+
AsyncCompileResult = KqpHost->PrepareGenericQuery(Query->Text, prepareSettings, nullptr);
285+
break;
286+
}
287+
case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY: {
288+
AsyncCompileResult = KqpHost->PrepareGenericQuery(Query->Text, prepareSettings, nullptr);
289+
break;
290+
}
291+
283292
default:
284293
YQL_ENSURE(false, "Unexpected query type: " << Query->Settings.QueryType);
285294
}

ydb/tools/query_replay_yt/query_replay.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ void TQueryReplayConfig::ParseConfig(int argc, const char** argv) {
1717
opts.AddLongOption("cluster", "YT cluster").StoreResult(&Cluster).Required();
1818
opts.AddLongOption("src-path", "Source table path").StoreResult(&SrcPath).Required();
1919
opts.AddLongOption("dst-path", "Target table path").StoreResult(&DstPath).Required();
20-
opts.AddLongOption("threads", "Number of ActorSystem threads").StoreResult(&DstPath);
20+
opts.AddLongOption("threads", "Number of ActorSystem threads").StoreResult(&ActorSystemThreadsCount);
21+
opts.AddLongOption("udf-file", "UDFS to load").AppendTo(&UdfFiles);
2122

2223
NLastGetopt::TOptsParseResult parseResult(&opts, argc, argv);
2324
}

ydb/tools/query_replay_yt/query_replay.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ struct TQueryReplayConfig {
1818
TString SrcPath;
1919
TString DstPath;
2020
ui32 ActorSystemThreadsCount = 5;
21+
TVector<TString> UdfFiles;
2122

2223
void ParseConfig(int argc, const char** argv);
2324
};

0 commit comments

Comments
 (0)