Skip to content

Commit 16145e3

Browse files
avevadVadim Averin
and
Vadim Averin
authored
Add support for RANGE and FOLDER in yqlrun (#5407)
Co-authored-by: Vadim Averin <[email protected]>
1 parent 6ff66f6 commit 16145e3

File tree

16 files changed

+117
-66
lines changed

16 files changed

+117
-66
lines changed

ydb/library/yql/providers/yt/gateway/file/yql_yt_file.cpp

+40-23
Original file line numberDiff line numberDiff line change
@@ -453,18 +453,26 @@ class TYtFileGateway : public IYtGateway {
453453
auto pos = options.Pos();
454454
try {
455455
TSession* session = GetSession(options);
456+
456457
TSet<TString> uniqueTables;
457-
if (options.Prefix().empty() && options.Suffix().empty()) {
458-
for (auto& x : Services_->GetTablesMapping()) {
459-
TVector<TString> parts;
460-
Split(x.first, ".", parts);
461-
if (parts.size() > 2 && parts[0] == YtProviderName) {
462-
if (!parts[2].StartsWith(TStringBuf("Input"))) {
463-
continue;
464-
}
465-
uniqueTables.insert(parts[2]);
466-
}
458+
const auto fullPrefix = options.Prefix().Empty() ? TString() : (options.Prefix() + '/');
459+
const auto fullSuffix = options.Suffix().Empty() ? TString() : ('/' + options.Suffix());
460+
for (const auto& [tableName, _] : Services_->GetTablesMapping()) {
461+
TVector<TString> parts;
462+
Split(tableName, ".", parts);
463+
if (parts.size() != 3) {
464+
continue;
465+
}
466+
if (parts[0] != YtProviderName || parts[1] != options.Cluster()) {
467+
continue;
468+
}
469+
if (!parts[2].StartsWith(fullPrefix)) {
470+
continue;
471+
}
472+
if (!parts[2].EndsWith(fullSuffix)) {
473+
continue;
467474
}
475+
uniqueTables.insert(parts[2]);
468476
}
469477

470478
TTableRangeResult res;
@@ -484,8 +492,11 @@ class TYtFileGateway : public IYtGateway {
484492
TProgramBuilder pgmBuilder(builder.GetTypeEnvironment(), *Services_->GetFunctionRegistry());
485493

486494
TVector<TRuntimeNode> strings;
487-
for (auto& x: uniqueTables) {
488-
strings.push_back(pgmBuilder.NewDataLiteral<NUdf::EDataSlot::String>(x));
495+
for (auto& tableName: uniqueTables) {
496+
auto stripped = TStringBuf(tableName);
497+
stripped.SkipPrefix(fullPrefix);
498+
stripped.ChopSuffix(fullSuffix);
499+
strings.push_back(pgmBuilder.NewDataLiteral<NUdf::EDataSlot::String>(TString(stripped)));
489500
}
490501

491502
auto inputNode = pgmBuilder.AsList(strings);
@@ -504,7 +515,10 @@ class TYtFileGateway : public IYtGateway {
504515
const auto& value = compGraph->GetValue();
505516
const auto it = value.GetListIterator();
506517
for (NUdf::TUnboxedValue current; it.Next(current);) {
507-
res.Tables.push_back(TCanonizedPath{TString(current.AsStringRef()), Nothing(), {}, Nothing()});
518+
TString tableName = TString(current.AsStringRef());
519+
tableName.prepend(fullPrefix);
520+
tableName.append(fullSuffix);
521+
res.Tables.push_back(TCanonizedPath{std::move(tableName), Nothing(), {}, Nothing()});
508522
}
509523
}
510524
else {
@@ -527,17 +541,20 @@ class TYtFileGateway : public IYtGateway {
527541
auto pos = options.Pos();
528542
try {
529543
TSet<TString> uniqueTables;
530-
if (options.Prefix().empty()) {
531-
for (auto& x : Services_->GetTablesMapping()) {
532-
TVector<TString> parts;
533-
Split(x.first, ".", parts);
534-
if (parts.size() > 2 && parts[0] == YtProviderName) {
535-
if (!parts[2].StartsWith(TStringBuf("Input"))) {
536-
continue;
537-
}
538-
uniqueTables.insert(parts[2]);
539-
}
544+
const auto fullPrefix = options.Prefix().Empty() ? "" : (options.Prefix() + '/');
545+
for (const auto& [tableName, _] : Services_->GetTablesMapping()) {
546+
TVector<TString> parts;
547+
Split(tableName, ".", parts);
548+
if (parts.size() != 3) {
549+
continue;
550+
}
551+
if (parts[0] != YtProviderName || parts[1] != options.Cluster()) {
552+
continue;
553+
}
554+
if (!parts[2].StartsWith(fullPrefix)) {
555+
continue;
540556
}
557+
uniqueTables.insert(parts[2]);
541558
}
542559

543560
TVector<TFolderResult::TFolderItem> items;

ydb/library/yql/providers/yt/gateway/file/yql_yt_file_services.cpp

+5-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ TString TYtFileServices::GetTablePath(TStringBuf cluster, TStringBuf table, bool
3131
return TString(TFsPath(TmpDir) / TString(table.substr(4)).append(TStringBuf(".tmp")));
3232
}
3333

34-
auto fullTableName = TString(YtProviderName).append('.').append(cluster).append('.').append(table);
34+
const auto tablePrefix = TString(YtProviderName).append('.').append(cluster);
35+
const auto fullTableName = TString(tablePrefix).append('.').append(table);
3536
if (!noLocks) {
3637
auto guard = Guard(Mutex);
3738
if (auto p = Locks.FindPtr(fullTableName)) {
@@ -41,6 +42,9 @@ TString TYtFileServices::GetTablePath(TStringBuf cluster, TStringBuf table, bool
4142
if (auto p = TablesMapping.FindPtr(fullTableName)) {
4243
return *p;
4344
}
45+
if (auto dirPtr = TablesDirMapping.FindPtr(tablePrefix)) {
46+
return TFsPath(*dirPtr) / TString(table).append(".txt");
47+
}
4448
ythrow yexception() << "Table not found: " << cluster << '.' << table;
4549
}
4650

ydb/library/yql/providers/yt/gateway/file/yql_yt_file_services.h

+12-3
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ class TYtFileServices: public TThrRefBase {
2222
~TYtFileServices();
2323

2424
static TPtr Make(const NKikimr::NMiniKQL::IFunctionRegistry* registry, const THashMap<TString, TString>& mapping = {},
25-
TFileStoragePtr fileStorage = {}, const TString& tmpDir = {}, bool keepTempTables = false)
25+
TFileStoragePtr fileStorage = {}, const TString& tmpDir = {}, bool keepTempTables = false, const THashMap<TString, TString>& dirMapping = {})
2626
{
27-
return new TYtFileServices(registry, mapping, fileStorage, tmpDir.empty() ? GetSystemTempDir() : tmpDir, keepTempTables);
27+
return new TYtFileServices(registry, mapping, fileStorage, tmpDir.empty() ? GetSystemTempDir() : tmpDir, keepTempTables, dirMapping);
2828
}
2929

3030
const NKikimr::NMiniKQL::IFunctionRegistry* GetFunctionRegistry() const {
@@ -55,9 +55,17 @@ class TYtFileServices: public TThrRefBase {
5555
}
5656

5757
private:
58-
TYtFileServices(const NKikimr::NMiniKQL::IFunctionRegistry* registry, const THashMap<TString, TString>& mapping, TFileStoragePtr fileStorage, const TString& tmpDir, bool keepTempTables)
58+
TYtFileServices(
59+
const NKikimr::NMiniKQL::IFunctionRegistry* registry,
60+
const THashMap<TString, TString>& mapping,
61+
TFileStoragePtr fileStorage,
62+
const TString& tmpDir,
63+
bool keepTempTables,
64+
const THashMap<TString, TString>& dirMapping
65+
)
5966
: FunctionRegistry(registry)
6067
, TablesMapping(mapping)
68+
, TablesDirMapping(dirMapping)
6169
, TmpDir(tmpDir)
6270
, KeepTempTables(keepTempTables)
6371
{
@@ -71,6 +79,7 @@ class TYtFileServices: public TThrRefBase {
7179
TFileStoragePtr FileStorage;
7280
const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry;
7381
THashMap<TString, TString> TablesMapping; // [cluster].[name] -> [file path]
82+
THashMap<TString, TString> TablesDirMapping; // [cluster] -> [dir path]
7483
TString TmpDir;
7584
bool KeepTempTables;
7685

ydb/library/yql/tests/s-expressions/suites/ManyInputTables/SplitTableRange.yql

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
#comment
33
(let mr_source (DataSource 'yt 'plato))
44
(let world (Configure! world (DataSource '"yt" '"$all") '"Attr" '"maxinputtables" '"2"))
5-
(let x (Read! world mr_source (Key '('table (MrTableRange '""))) (Void) '()))
5+
(let x (Read! world mr_source (Key '('table (MrTableRange '"" (lambda '($i) (And (>= $i (String '"Input1")) (<= $i (String '"Input~")))) '""))) (Void) '()))
66
(let world (Left! x))
77
(let tables (Right! x))
88
(let tables (Sort tables '((Bool 'true) (Bool 'true)) (lambda '(item) '((Member item 'key) (Member item 'subkey)))))

ydb/library/yql/tests/s-expressions/suites/Udf/RecordRemapRangeDiffUdf.yql

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
#comment
33
(let mr_source (DataSource 'yt 'plato))
44
(let x (Read! world mr_source
5-
(Key '('table (MrTableRange '"")))
5+
(Key '('table (MrTableRange '"" (lambda '($i) (And (>= $i (String '"Input1")) (<= $i (String '"Input~")))) '"")))
66
'('key 'subkey 'value) '()))
77
(let world (Left! x))
88
(let table (Right! x))

ydb/library/yql/tests/s-expressions/suites/Udf/RecordRemapRangeDiffUdfPartial.yql

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
#comment
33
(let mr_source (DataSource 'yt 'plato))
44
(let x (Read! world mr_source
5-
(Key '('table (MrTableRange '"")))
5+
(Key '('table (MrTableRange '"" (lambda '($i) (And (>= $i (String '"Input1")) (<= $i (String '"Input~")))) '"")))
66
'('key 'subkey 'value) '()))
77
(let world (Left! x))
88
(let table (Right! x))

ydb/library/yql/tests/s-expressions/suites/Udf/RecordRemapRangeSameUdf.yql

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
#comment
33
(let mr_source (DataSource 'yt 'plato))
44
(let x (Read! world mr_source
5-
(Key '('table (MrTableRange '"")))
5+
(Key '('table (MrTableRange '"" (lambda '($i) (And (>= $i (String '"Input1")) (<= $i (String '"Input~")))) '"")))
66
'('key 'subkey 'value) '()))
77
(let world (Left! x))
88
(let table (Right! x))

ydb/library/yql/tests/s-expressions/suites/Udf/RecordRemapWeakRange.yql

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
(
22
#comment
33
(let mr_source (DataSource 'yt 'plato))
4-
(let x (Read! world mr_source (Key '('table (MrTableRange '""))) '('key) '()))
4+
(let x (Read! world mr_source (Key '('table (MrTableRange '"" (lambda '($i) (And (>= $i (String '"Input")) (<= $i (String '"Input~")))) '""))) '('key) '()))
55
(let world (Left! x))
66
(let table1 (Right! x))
77
(let res_sink (DataSink 'result))

ydb/library/yql/tests/sql/dq_file/part3/canondata/result.json

+3-3
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,9 @@
9595
],
9696
"test.test[action-subquery_merge_nested_subquery-default.txt-Debug]": [
9797
{
98-
"checksum": "87afe2e82ba47dd25434edfbf30701f9",
99-
"size": 467,
100-
"uri": "https://{canondata_backend}/1899731/5f48750839c300c592c921895adce61b6bdd10c7/resource.tar.gz#test.test_action-subquery_merge_nested_subquery-default.txt-Debug_/opt.yql_patched"
98+
"checksum": "286067a89e2bb5a177c709e7eefa85de",
99+
"size": 621,
100+
"uri": "https://{canondata_backend}/1881367/d9d29884f2b57bc77cedad27e7b24a2adc89e30d/resource.tar.gz#test.test_action-subquery_merge_nested_subquery-default.txt-Debug_/opt.yql_patched"
101101
}
102102
],
103103
"test.test[action-subquery_merge_nested_subquery-default.txt-Plan]": [

ydb/library/yql/tests/sql/hybrid_file/part2/canondata/result.json

+3-3
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,9 @@
5757
],
5858
"test.test[action-subquery_merge_nested_subquery-default.txt-Debug]": [
5959
{
60-
"checksum": "d32a3afdd03b6999f6e278e75caf9d06",
61-
"size": 466,
62-
"uri": "https://{canondata_backend}/1936842/51593b2a750dbb036388d012a30fa937edaab5f0/resource.tar.gz#test.test_action-subquery_merge_nested_subquery-default.txt-Debug_/opt.yql_patched"
60+
"checksum": "9b9bd9b5c87918ed2451269a9938d2c4",
61+
"size": 620,
62+
"uri": "https://{canondata_backend}/1942278/8b999765d5b3ee30766f09cb37b290aab0244d6e/resource.tar.gz#test.test_action-subquery_merge_nested_subquery-default.txt-Debug_/opt.yql_patched"
6363
}
6464
],
6565
"test.test[action-subquery_merge_nested_subquery-default.txt-Plan]": [

ydb/library/yql/tests/sql/sql2yql/canondata/result.json

+18-18
Original file line numberDiff line numberDiff line change
@@ -456,9 +456,9 @@
456456
],
457457
"test_sql2yql.test[action-insert_each_from_folder]": [
458458
{
459-
"checksum": "dfda31784ae59d009f003ee44ca0f4a2",
460-
"size": 3140,
461-
"uri": "https://{canondata_backend}/1937027/973c239492ba32946806ddc66cf0af4b38c06ae8/resource.tar.gz#test_sql2yql.test_action-insert_each_from_folder_/sql.yql"
459+
"checksum": "7c5460adea49ac910a8da3e167784434",
460+
"size": 3446,
461+
"uri": "https://{canondata_backend}/1920236/49d24809be769996fac3586d53f86d8c72bdeb78/resource.tar.gz#test_sql2yql.test_action-insert_each_from_folder_/sql.yql"
462462
}
463463
],
464464
"test_sql2yql.test[action-lambda_arg_count]": [
@@ -694,9 +694,9 @@
694694
],
695695
"test_sql2yql.test[action-subquery_merge_nested_world]": [
696696
{
697-
"checksum": "0eba6f388da133f0f09d55d25be70567",
698-
"size": 3226,
699-
"uri": "https://{canondata_backend}/1784117/d56ae82ad9d30397a41490647be1bd2124718f98/resource.tar.gz#test_sql2yql.test_action-subquery_merge_nested_world_/sql.yql"
697+
"checksum": "54c3a4cecc1ea4b1bf1fc7ed4b67d9a9",
698+
"size": 3613,
699+
"uri": "https://{canondata_backend}/1920236/49d24809be769996fac3586d53f86d8c72bdeb78/resource.tar.gz#test_sql2yql.test_action-subquery_merge_nested_world_/sql.yql"
700700
}
701701
],
702702
"test_sql2yql.test[action-subquery_opt_args]": [
@@ -729,9 +729,9 @@
729729
],
730730
"test_sql2yql.test[action-table_content_before_from_folder]": [
731731
{
732-
"checksum": "e4fed412d365d928d7b9d1c18af8144d",
733-
"size": 3452,
734-
"uri": "https://{canondata_backend}/1784117/d56ae82ad9d30397a41490647be1bd2124718f98/resource.tar.gz#test_sql2yql.test_action-table_content_before_from_folder_/sql.yql"
732+
"checksum": "74563a298fcb8f48e9090ad7d5d29add",
733+
"size": 3831,
734+
"uri": "https://{canondata_backend}/1920236/49d24809be769996fac3586d53f86d8c72bdeb78/resource.tar.gz#test_sql2yql.test_action-table_content_before_from_folder_/sql.yql"
735735
}
736736
],
737737
"test_sql2yql.test[agg_apply-avg_const_interval]": [
@@ -19447,9 +19447,9 @@
1944719447
],
1944819448
"test_sql_format.test[action-insert_each_from_folder]": [
1944919449
{
19450-
"checksum": "42fbd294c401e7a47b4319533de45aee",
19451-
"size": 366,
19452-
"uri": "https://{canondata_backend}/1880306/64654158d6bfb1289c66c626a8162239289559d0/resource.tar.gz#test_sql_format.test_action-insert_each_from_folder_/formatted.sql"
19450+
"checksum": "c51ad51888f8e943d2edc55f296bf202",
19451+
"size": 389,
19452+
"uri": "https://{canondata_backend}/1925821/6c2f883a1c33f02b8bcef287229b2b73dd762cff/resource.tar.gz#test_sql_format.test_action-insert_each_from_folder_/formatted.sql"
1945319453
}
1945419454
],
1945519455
"test_sql_format.test[action-lambda_arg_count]": [
@@ -19685,9 +19685,9 @@
1968519685
],
1968619686
"test_sql_format.test[action-subquery_merge_nested_world]": [
1968719687
{
19688-
"checksum": "02cdaf09ad3c7c45b643f6e37cd099f1",
19689-
"size": 292,
19690-
"uri": "https://{canondata_backend}/1880306/64654158d6bfb1289c66c626a8162239289559d0/resource.tar.gz#test_sql_format.test_action-subquery_merge_nested_world_/formatted.sql"
19688+
"checksum": "8c0ad08eb1ba55aa5f7c977d6029a8c7",
19689+
"size": 325,
19690+
"uri": "https://{canondata_backend}/1925821/6c2f883a1c33f02b8bcef287229b2b73dd762cff/resource.tar.gz#test_sql_format.test_action-subquery_merge_nested_world_/formatted.sql"
1969119691
}
1969219692
],
1969319693
"test_sql_format.test[action-subquery_opt_args]": [
@@ -19720,9 +19720,9 @@
1972019720
],
1972119721
"test_sql_format.test[action-table_content_before_from_folder]": [
1972219722
{
19723-
"checksum": "55901f202fac5c4226ff35047872c116",
19724-
"size": 258,
19725-
"uri": "https://{canondata_backend}/1880306/64654158d6bfb1289c66c626a8162239289559d0/resource.tar.gz#test_sql_format.test_action-table_content_before_from_folder_/formatted.sql"
19723+
"checksum": "45dce176f9281a8754c8da9514085c3f",
19724+
"size": 287,
19725+
"uri": "https://{canondata_backend}/1925821/6c2f883a1c33f02b8bcef287229b2b73dd762cff/resource.tar.gz#test_sql_format.test_action-table_content_before_from_folder_/formatted.sql"
1972619726
}
1972719727
],
1972819728
"test_sql_format.test[agg_apply-avg_const_interval]": [

ydb/library/yql/tests/sql/suites/action/insert_each_from_folder.sql

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use plato;
66
$list = (
77
select aggregate_list(Path) from (
88
select Path from folder("")
9-
where Type = "table"
9+
where Type = "table" and Path like "Input%"
1010
order by Path desc
1111
limit 30
1212
)

ydb/library/yql/tests/sql/suites/action/subquery_merge_nested_world.sql

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use plato;
44

55
DEFINE SUBQUERY $s($_i) AS
6-
$t = SELECT AGGREGATE_LIST(Path) FROM FOLDER('');
6+
$t = SELECT AGGREGATE_LIST(Path) FROM FOLDER('') WHERE Path LIKE "Input%";
77
SELECT
88
*
99
FROM EACH($t);

ydb/library/yql/tests/sql/suites/action/table_content_before_from_folder.sql

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,5 @@
33
use plato;
44
pragma yt.EvaluationTableSizeLimit="1";
55
select * from Input limit 1;
6-
$tables = (select aggregate_list(Path) as dates from folder(""));
6+
$tables = (select aggregate_list(Path) as dates from folder("") where Path like "Input%");
77
select count(*) from each($tables);

ydb/library/yql/tests/sql/yt_native_file/part3/canondata/result.json

+6-6
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,9 @@
7878
],
7979
"test.test[action-subquery_merge_nested_subquery-default.txt-Debug]": [
8080
{
81-
"checksum": "1dc78c0483e5fa3351b051e8e9de8679",
82-
"size": 399,
83-
"uri": "https://{canondata_backend}/1924537/3d2705efadc402a8a106ede4d6132e0dbc7b7516/resource.tar.gz#test.test_action-subquery_merge_nested_subquery-default.txt-Debug_/opt.yql"
81+
"checksum": "77b06488833b67e7d27ef6221c3418e5",
82+
"size": 553,
83+
"uri": "https://{canondata_backend}/1920236/df296391156851095e326102a665e0179365e8d4/resource.tar.gz#test.test_action-subquery_merge_nested_subquery-default.txt-Debug_/opt.yql"
8484
}
8585
],
8686
"test.test[action-subquery_merge_nested_subquery-default.txt-Plan]": [
@@ -92,9 +92,9 @@
9292
],
9393
"test.test[action-subquery_merge_nested_subquery-default.txt-Results]": [
9494
{
95-
"checksum": "ab3678f99412b6d14f44db6b69d41e5f",
96-
"size": 668,
97-
"uri": "https://{canondata_backend}/1871182/5a4449eecae85d31c6f70eca057fe425dfafe11b/resource.tar.gz#test.test_action-subquery_merge_nested_subquery-default.txt-Results_/results.txt"
95+
"checksum": "6a84f2308a09ca7b3f129ecebf10f516",
96+
"size": 747,
97+
"uri": "https://{canondata_backend}/1920236/df296391156851095e326102a665e0179365e8d4/resource.tar.gz#test.test_action-subquery_merge_nested_subquery-default.txt-Results_/results.txt"
9898
}
9999
],
100100
"test.test[agg_phases-min_by-default.txt-Debug]": [

0 commit comments

Comments
 (0)