Skip to content

Commit 5136f15

Browse files
authored
Merge eeface3 into 5130358
2 parents 5130358 + eeface3 commit 5136f15

File tree

15 files changed

+107
-48
lines changed

15 files changed

+107
-48
lines changed

ydb/library/yql/providers/yt/gateway/file/yql_yt_file.cpp

Lines changed: 48 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -453,18 +453,26 @@ class TYtFileGateway : public IYtGateway {
453453
auto pos = options.Pos();
454454
try {
455455
TSession* session = GetSession(options);
456+
456457
TSet<TString> uniqueTables;
457-
if (options.Prefix().empty() && options.Suffix().empty()) {
458-
for (auto& x : Services_->GetTablesMapping()) {
459-
TVector<TString> parts;
460-
Split(x.first, ".", parts);
461-
if (parts.size() > 2 && parts[0] == YtProviderName) {
462-
if (!parts[2].StartsWith(TStringBuf("Input"))) {
463-
continue;
464-
}
465-
uniqueTables.insert(parts[2]);
466-
}
458+
const auto fullPrefix = options.Prefix().Empty() ? TString() : (options.Prefix() + '/');
459+
const auto fullSuffix = options.Suffix().Empty() ? TString() : ('/' + options.Suffix());
460+
for (const auto& [tableName, _] : Services_->GetTablesMapping()) {
461+
TVector<TString> parts;
462+
Split(tableName, ".", parts);
463+
if (parts.size() != 3) {
464+
continue;
465+
}
466+
if (parts[0] != YtProviderName || parts[1] != options.Cluster()) {
467+
continue;
467468
}
469+
if (!parts[2].StartsWith(fullPrefix)) {
470+
continue;
471+
}
472+
if (!parts[2].EndsWith(fullSuffix)) {
473+
continue;
474+
}
475+
uniqueTables.insert(parts[2]);
468476
}
469477

470478
TTableRangeResult res;
@@ -484,8 +492,15 @@ class TYtFileGateway : public IYtGateway {
484492
TProgramBuilder pgmBuilder(builder.GetTypeEnvironment(), *Services_->GetFunctionRegistry());
485493

486494
TVector<TRuntimeNode> strings;
487-
for (auto& x: uniqueTables) {
488-
strings.push_back(pgmBuilder.NewDataLiteral<NUdf::EDataSlot::String>(x));
495+
for (auto& tableName: uniqueTables) {
496+
TStringBuf strippedTableName = tableName;
497+
if (!options.Prefix().Empty()) {
498+
strippedTableName.Skip(options.Prefix().Size() + 1);
499+
}
500+
if (!options.Suffix().Empty()) {
501+
strippedTableName.Chop(1 + options.Suffix().Size());
502+
}
503+
strings.push_back(pgmBuilder.NewDataLiteral<NUdf::EDataSlot::String>(TString(strippedTableName)));
489504
}
490505

491506
auto inputNode = pgmBuilder.AsList(strings);
@@ -504,7 +519,14 @@ class TYtFileGateway : public IYtGateway {
504519
const auto& value = compGraph->GetValue();
505520
const auto it = value.GetListIterator();
506521
for (NUdf::TUnboxedValue current; it.Next(current);) {
507-
res.Tables.push_back(TCanonizedPath{TString(current.AsStringRef()), Nothing(), {}, Nothing()});
522+
TString tableName = TString(current.AsStringRef());
523+
if (!options.Prefix().Empty()) {
524+
tableName = TString(options.Prefix()).append('/').append(tableName);
525+
}
526+
if (!options.Suffix().Empty()) {
527+
tableName = TString(tableName).append('/').append(options.Suffix());
528+
}
529+
res.Tables.push_back(TCanonizedPath{std::move(tableName), Nothing(), {}, Nothing()});
508530
}
509531
}
510532
else {
@@ -527,17 +549,20 @@ class TYtFileGateway : public IYtGateway {
527549
auto pos = options.Pos();
528550
try {
529551
TSet<TString> uniqueTables;
530-
if (options.Prefix().empty()) {
531-
for (auto& x : Services_->GetTablesMapping()) {
532-
TVector<TString> parts;
533-
Split(x.first, ".", parts);
534-
if (parts.size() > 2 && parts[0] == YtProviderName) {
535-
if (!parts[2].StartsWith(TStringBuf("Input"))) {
536-
continue;
537-
}
538-
uniqueTables.insert(parts[2]);
539-
}
552+
const auto fullPrefix = options.Prefix().Empty() ? "" : (options.Prefix() + '/');
553+
for (const auto& [tableName, _] : Services_->GetTablesMapping()) {
554+
TVector<TString> parts;
555+
Split(tableName, ".", parts);
556+
if (parts.size() != 3) {
557+
continue;
558+
}
559+
if (parts[0] != YtProviderName || parts[1] != options.Cluster()) {
560+
continue;
561+
}
562+
if (!parts[2].StartsWith(fullPrefix)) {
563+
continue;
540564
}
565+
uniqueTables.insert(parts[2]);
541566
}
542567

543568
TVector<TFolderResult::TFolderItem> items;

ydb/library/yql/providers/yt/gateway/file/yql_yt_file_services.cpp

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -31,7 +31,8 @@ TString TYtFileServices::GetTablePath(TStringBuf cluster, TStringBuf table, bool
3131
return TString(TFsPath(TmpDir) / TString(table.substr(4)).append(TStringBuf(".tmp")));
3232
}
3333

34-
auto fullTableName = TString(YtProviderName).append('.').append(cluster).append('.').append(table);
34+
const auto tablePrefix = TString(YtProviderName).append('.').append(cluster);
35+
const auto fullTableName = TString(tablePrefix).append('.').append(table);
3536
if (!noLocks) {
3637
auto guard = Guard(Mutex);
3738
if (auto p = Locks.FindPtr(fullTableName)) {
@@ -41,6 +42,9 @@ TString TYtFileServices::GetTablePath(TStringBuf cluster, TStringBuf table, bool
4142
if (auto p = TablesMapping.FindPtr(fullTableName)) {
4243
return *p;
4344
}
45+
if (auto dirPtr = TablesDirMapping.FindPtr(tablePrefix)) {
46+
return TFsPath(*dirPtr) / TString(table).append(".txt");
47+
}
4448
ythrow yexception() << "Table not found: " << cluster << '.' << table;
4549
}
4650

ydb/library/yql/providers/yt/gateway/file/yql_yt_file_services.h

Lines changed: 12 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -22,9 +22,9 @@ class TYtFileServices: public TThrRefBase {
2222
~TYtFileServices();
2323

2424
static TPtr Make(const NKikimr::NMiniKQL::IFunctionRegistry* registry, const THashMap<TString, TString>& mapping = {},
25-
TFileStoragePtr fileStorage = {}, const TString& tmpDir = {}, bool keepTempTables = false)
25+
TFileStoragePtr fileStorage = {}, const TString& tmpDir = {}, bool keepTempTables = false, const THashMap<TString, TString>& dirMapping = {})
2626
{
27-
return new TYtFileServices(registry, mapping, fileStorage, tmpDir.empty() ? GetSystemTempDir() : tmpDir, keepTempTables);
27+
return new TYtFileServices(registry, mapping, fileStorage, tmpDir.empty() ? GetSystemTempDir() : tmpDir, keepTempTables, dirMapping);
2828
}
2929

3030
const NKikimr::NMiniKQL::IFunctionRegistry* GetFunctionRegistry() const {
@@ -55,9 +55,17 @@ class TYtFileServices: public TThrRefBase {
5555
}
5656

5757
private:
58-
TYtFileServices(const NKikimr::NMiniKQL::IFunctionRegistry* registry, const THashMap<TString, TString>& mapping, TFileStoragePtr fileStorage, const TString& tmpDir, bool keepTempTables)
58+
TYtFileServices(
59+
const NKikimr::NMiniKQL::IFunctionRegistry* registry,
60+
const THashMap<TString, TString>& mapping,
61+
TFileStoragePtr fileStorage,
62+
const TString& tmpDir,
63+
bool keepTempTables,
64+
const THashMap<TString, TString>& dirMapping
65+
)
5966
: FunctionRegistry(registry)
6067
, TablesMapping(mapping)
68+
, TablesDirMapping(dirMapping)
6169
, TmpDir(tmpDir)
6270
, KeepTempTables(keepTempTables)
6371
{
@@ -71,6 +79,7 @@ class TYtFileServices: public TThrRefBase {
7179
TFileStoragePtr FileStorage;
7280
const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry;
7381
THashMap<TString, TString> TablesMapping; // [cluster].[name] -> [file path]
82+
THashMap<TString, TString> TablesDirMapping; // [cluster] -> [dir path]
7483
TString TmpDir;
7584
bool KeepTempTables;
7685

ydb/library/yql/tests/s-expressions/suites/ManyInputTables/SplitTableRange.yql

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,7 @@
22
#comment
33
(let mr_source (DataSource 'yt 'plato))
44
(let world (Configure! world (DataSource '"yt" '"$all") '"Attr" '"maxinputtables" '"2"))
5-
(let x (Read! world mr_source (Key '('table (MrTableRange '""))) (Void) '()))
5+
(let x (Read! world mr_source (Key '('table (MrTableRange '"" '"Input" '"Input~"))) (Void) '()))
66
(let world (Left! x))
77
(let tables (Right! x))
88
(let tables (Sort tables '((Bool 'true) (Bool 'true)) (lambda '(item) '((Member item 'key) (Member item 'subkey)))))

ydb/library/yql/tests/s-expressions/suites/Udf/RecordRemapRangeDiffUdf.yql

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,7 @@
22
#comment
33
(let mr_source (DataSource 'yt 'plato))
44
(let x (Read! world mr_source
5-
(Key '('table (MrTableRange '"")))
5+
(Key '('table (MrTableRange '"" '"Input" '"Input~")))
66
'('key 'subkey 'value) '()))
77
(let world (Left! x))
88
(let table (Right! x))

ydb/library/yql/tests/s-expressions/suites/Udf/RecordRemapRangeDiffUdfPartial.yql

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,7 @@
22
#comment
33
(let mr_source (DataSource 'yt 'plato))
44
(let x (Read! world mr_source
5-
(Key '('table (MrTableRange '"")))
5+
(Key '('table (MrTableRange '"" '"Input" '"Input~")))
66
'('key 'subkey 'value) '()))
77
(let world (Left! x))
88
(let table (Right! x))

ydb/library/yql/tests/s-expressions/suites/Udf/RecordRemapRangeSameUdf.yql

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,7 @@
22
#comment
33
(let mr_source (DataSource 'yt 'plato))
44
(let x (Read! world mr_source
5-
(Key '('table (MrTableRange '"")))
5+
(Key '('table (MrTableRange '"" '"Input" '"Input~")))
66
'('key 'subkey 'value) '()))
77
(let world (Left! x))
88
(let table (Right! x))

ydb/library/yql/tests/s-expressions/suites/Udf/RecordRemapWeakRange.yql

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,7 @@
11
(
22
#comment
33
(let mr_source (DataSource 'yt 'plato))
4-
(let x (Read! world mr_source (Key '('table (MrTableRange '""))) '('key) '()))
4+
(let x (Read! world mr_source (Key '('table (MrTableRange '"" '"Input" '"Input~"))) '('key) '()))
55
(let world (Left! x))
66
(let table1 (Right! x))
77
(let res_sink (DataSink 'result))

ydb/library/yql/tests/sql/dq_file/part3/canondata/result.json

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -95,9 +95,9 @@
9595
],
9696
"test.test[action-subquery_merge_nested_subquery-default.txt-Debug]": [
9797
{
98-
"checksum": "87afe2e82ba47dd25434edfbf30701f9",
99-
"size": 467,
100-
"uri": "https://{canondata_backend}/1899731/5f48750839c300c592c921895adce61b6bdd10c7/resource.tar.gz#test.test_action-subquery_merge_nested_subquery-default.txt-Debug_/opt.yql_patched"
98+
"checksum": "286067a89e2bb5a177c709e7eefa85de",
99+
"size": 621,
100+
"uri": "https://{canondata_backend}/1881367/d9d29884f2b57bc77cedad27e7b24a2adc89e30d/resource.tar.gz#test.test_action-subquery_merge_nested_subquery-default.txt-Debug_/opt.yql_patched"
101101
}
102102
],
103103
"test.test[action-subquery_merge_nested_subquery-default.txt-Plan]": [

ydb/library/yql/tests/sql/hybrid_file/part2/canondata/result.json

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -57,9 +57,9 @@
5757
],
5858
"test.test[action-subquery_merge_nested_subquery-default.txt-Debug]": [
5959
{
60-
"checksum": "d32a3afdd03b6999f6e278e75caf9d06",
61-
"size": 466,
62-
"uri": "https://{canondata_backend}/1936842/51593b2a750dbb036388d012a30fa937edaab5f0/resource.tar.gz#test.test_action-subquery_merge_nested_subquery-default.txt-Debug_/opt.yql_patched"
60+
"checksum": "9b9bd9b5c87918ed2451269a9938d2c4",
61+
"size": 620,
62+
"uri": "https://{canondata_backend}/1942278/8b999765d5b3ee30766f09cb37b290aab0244d6e/resource.tar.gz#test.test_action-subquery_merge_nested_subquery-default.txt-Debug_/opt.yql_patched"
6363
}
6464
],
6565
"test.test[action-subquery_merge_nested_subquery-default.txt-Plan]": [

ydb/library/yql/tests/sql/suites/action/insert_each_from_folder.sql

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -6,7 +6,7 @@ use plato;
66
$list = (
77
select aggregate_list(Path) from (
88
select Path from folder("")
9-
where Type = "table"
9+
where Type = "table" and Path like "Input%"
1010
order by Path desc
1111
limit 30
1212
)

ydb/library/yql/tests/sql/suites/action/subquery_merge_nested_world.sql

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,7 @@
33
use plato;
44

55
DEFINE SUBQUERY $s($_i) AS
6-
$t = SELECT AGGREGATE_LIST(Path) FROM FOLDER('');
6+
$t = SELECT AGGREGATE_LIST(Path) FROM FOLDER('') WHERE Path LIKE "Input%";
77
SELECT
88
*
99
FROM EACH($t);

ydb/library/yql/tests/sql/suites/action/table_content_before_from_folder.sql

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -3,5 +3,5 @@
33
use plato;
44
pragma yt.EvaluationTableSizeLimit="1";
55
select * from Input limit 1;
6-
$tables = (select aggregate_list(Path) as dates from folder(""));
6+
$tables = (select aggregate_list(Path) as dates from folder("") where Path like "Input%");
77
select count(*) from each($tables);

ydb/library/yql/tests/sql/yt_native_file/part3/canondata/result.json

Lines changed: 6 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -78,9 +78,9 @@
7878
],
7979
"test.test[action-subquery_merge_nested_subquery-default.txt-Debug]": [
8080
{
81-
"checksum": "1dc78c0483e5fa3351b051e8e9de8679",
82-
"size": 399,
83-
"uri": "https://{canondata_backend}/1924537/3d2705efadc402a8a106ede4d6132e0dbc7b7516/resource.tar.gz#test.test_action-subquery_merge_nested_subquery-default.txt-Debug_/opt.yql"
81+
"checksum": "77b06488833b67e7d27ef6221c3418e5",
82+
"size": 553,
83+
"uri": "https://{canondata_backend}/1920236/df296391156851095e326102a665e0179365e8d4/resource.tar.gz#test.test_action-subquery_merge_nested_subquery-default.txt-Debug_/opt.yql"
8484
}
8585
],
8686
"test.test[action-subquery_merge_nested_subquery-default.txt-Plan]": [
@@ -92,9 +92,9 @@
9292
],
9393
"test.test[action-subquery_merge_nested_subquery-default.txt-Results]": [
9494
{
95-
"checksum": "ab3678f99412b6d14f44db6b69d41e5f",
96-
"size": 668,
97-
"uri": "https://{canondata_backend}/1871182/5a4449eecae85d31c6f70eca057fe425dfafe11b/resource.tar.gz#test.test_action-subquery_merge_nested_subquery-default.txt-Results_/results.txt"
95+
"checksum": "6a84f2308a09ca7b3f129ecebf10f516",
96+
"size": 747,
97+
"uri": "https://{canondata_backend}/1920236/df296391156851095e326102a665e0179365e8d4/resource.tar.gz#test.test_action-subquery_merge_nested_subquery-default.txt-Results_/results.txt"
9898
}
9999
],
100100
"test.test[agg_phases-min_by-default.txt-Debug]": [

ydb/library/yql/tools/yqlrun/yqlrun.cpp

Lines changed: 22 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,6 @@
11
#include "gateway_spec.h"
22

3+
#include <filesystem>
34
#include <ydb/library/yql/tools/yqlrun/http/yql_server.h>
45

56
#include <ydb/library/yql/providers/yt/gateway/file/yql_yt_file.h>
@@ -380,7 +381,9 @@ int Main(int argc, const char *argv[])
380381
TOpts opts = TOpts::Default();
381382
TString programFile;
382383
TVector<TString> tablesMappingList;
384+
TVector<TString> tablesDirMappingList;
383385
THashMap<TString, TString> tablesMapping;
386+
THashMap<TString, TString> tablesDirMapping;
384387
TVector<TString> filesMappingList;
385388
TUserDataTable filesMapping;
386389
TVector<TString> urlsMappingList;
@@ -413,6 +416,7 @@ int Main(int argc, const char *argv[])
413416
opts.AddLongOption('s', "sql", "program is SQL query").NoArgument();
414417
opts.AddLongOption("pg", "program has PG syntax").NoArgument();
415418
opts.AddLongOption('t', "table", "table@file").AppendTo(&tablesMappingList);
419+
opts.AddLongOption("tables-dir", "cluster@dir").AppendTo(&tablesDirMappingList);
416420
opts.AddLongOption('C', "cluster", "set cluster to service mapping").RequiredArgument("name@service").Handler(new TStoreMappingFunctor(&clusterMapping));
417421
opts.AddLongOption("ndebug", "should be at first argument, do not show debug info in error output").NoArgument();
418422
opts.AddLongOption("parse-only", "exit after program has been parsed").NoArgument();
@@ -502,6 +506,23 @@ int Main(int argc, const char *argv[])
502506
tablesMapping[tableName] = filePath;
503507
}
504508

509+
for (auto& s : tablesDirMappingList) {
510+
TStringBuf clusterName, dirPath;
511+
TStringBuf(s).Split('@', clusterName, dirPath);
512+
if (clusterName.empty() || dirPath.empty()) {
513+
Cerr << "Incorrect table directory mapping, expected form cluster@dir, e.g. yt.plato@/tmp/tables" << Endl;
514+
return 1;
515+
}
516+
tablesDirMapping[clusterName] = dirPath;
517+
for (const auto& entry : std::filesystem::recursive_directory_iterator(std::string(dirPath))) {
518+
if (entry.is_regular_file() && entry.path().has_extension() && entry.path().extension() == ".txt") {
519+
auto tableName = TString(clusterName) + '.' + std::filesystem::relative(entry.path(), std::string(dirPath));
520+
tableName = tableName.substr(0, tableName.Size() - 4); // remove .txt extension
521+
tablesMapping[tableName] = entry.path().string();
522+
}
523+
}
524+
}
525+
505526
if (hasValidate) {
506527
for (auto& s : filesMappingList) {
507528
TStringBuf fileName, filePath;
@@ -644,7 +665,7 @@ int Main(int argc, const char *argv[])
644665
bool emulateOutputForMultirun = false;
645666
if (hasValidate) {
646667
if (gatewayTypes.contains(YtProviderName) || res.Has("opt-collision")) {
647-
auto yqlNativeServices = NFile::TYtFileServices::Make(funcRegistry.Get(), tablesMapping, fileStorage, tmpDir, res.Has("keep-temp"));
668+
auto yqlNativeServices = NFile::TYtFileServices::Make(funcRegistry.Get(), tablesMapping, fileStorage, tmpDir, res.Has("keep-temp"), tablesDirMapping);
648669
auto ytNativeGateway = CreateYtFileGateway(yqlNativeServices, &emulateOutputForMultirun);
649670
dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytNativeGateway));
650671
}

0 commit comments

Comments
 (0)