Skip to content

Add support for RANGE and FOLDER in yqlrun #5407

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 46 additions & 23 deletions ydb/library/yql/providers/yt/gateway/file/yql_yt_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -453,18 +453,24 @@ class TYtFileGateway : public IYtGateway {
auto pos = options.Pos();
try {
TSession* session = GetSession(options);

TSet<TString> uniqueTables;
if (options.Prefix().empty() && options.Suffix().empty()) {
for (auto& x : Services_->GetTablesMapping()) {
TVector<TString> parts;
Split(x.first, ".", parts);
if (parts.size() > 2 && parts[0] == YtProviderName) {
if (!parts[2].StartsWith(TStringBuf("Input"))) {
continue;
}
uniqueTables.insert(parts[2]);
}
for (const auto& [tableName, _] : Services_->GetTablesMapping()) {
TVector<TString> parts;
Split(tableName, ".", parts);
if (parts.size() != 3) {
continue;
}
if (parts[0] != YtProviderName || parts[1] != options.Cluster()) {
continue;
}
if (!options.Prefix().Empty() && !parts[2].StartsWith(options.Prefix() + '/')) {
continue;
}
if (!options.Suffix().Empty() && !parts[2].EndsWith('/' + options.Suffix())) {
continue;
}
uniqueTables.insert(parts[2]);
}

TTableRangeResult res;
Expand All @@ -484,8 +490,16 @@ class TYtFileGateway : public IYtGateway {
TProgramBuilder pgmBuilder(builder.GetTypeEnvironment(), *Services_->GetFunctionRegistry());

TVector<TRuntimeNode> strings;
for (auto& x: uniqueTables) {
strings.push_back(pgmBuilder.NewDataLiteral<NUdf::EDataSlot::String>(x));
for (auto& tableName: uniqueTables) {
size_t beg = 0, end = 0;
if (!options.Prefix().Empty()) {
beg = options.Prefix().Size() + 1;
}
if (!options.Suffix().Empty()) {
end = tableName.Size() - (1 + options.Suffix().Size());
}
auto strippedTableName = tableName.substr(beg, end - beg);
strings.push_back(pgmBuilder.NewDataLiteral<NUdf::EDataSlot::String>(strippedTableName));
}

auto inputNode = pgmBuilder.AsList(strings);
Expand All @@ -504,7 +518,14 @@ class TYtFileGateway : public IYtGateway {
const auto& value = compGraph->GetValue();
const auto it = value.GetListIterator();
for (NUdf::TUnboxedValue current; it.Next(current);) {
res.Tables.push_back(TCanonizedPath{TString(current.AsStringRef()), Nothing(), {}, Nothing()});
TString tableName = TString(current.AsStringRef());
if (!options.Prefix().Empty()) {
tableName = options.Prefix() + '/' + tableName;
}
if (!options.Suffix().Empty()) {
tableName = tableName + '/' + options.Suffix();
}
res.Tables.push_back(TCanonizedPath{tableName, Nothing(), {}, Nothing()});
}
}
else {
Expand All @@ -527,17 +548,19 @@ class TYtFileGateway : public IYtGateway {
auto pos = options.Pos();
try {
TSet<TString> uniqueTables;
if (options.Prefix().empty()) {
for (auto& x : Services_->GetTablesMapping()) {
TVector<TString> parts;
Split(x.first, ".", parts);
if (parts.size() > 2 && parts[0] == YtProviderName) {
if (!parts[2].StartsWith(TStringBuf("Input"))) {
continue;
}
uniqueTables.insert(parts[2]);
}
for (const auto& [tableName, _] : Services_->GetTablesMapping()) {
TVector<TString> parts;
Split(tableName, ".", parts);
if (parts.size() != 3) {
continue;
}
if (parts[0] != YtProviderName || parts[1] != options.Cluster()) {
continue;
}
if (!options.Prefix().Empty() && !parts[2].StartsWith(options.Prefix() + '/')) {
continue;
}
uniqueTables.insert(parts[2]);
}

TVector<TFolderResult::TFolderItem> items;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ TString TYtFileServices::GetTablePath(TStringBuf cluster, TStringBuf table, bool
return TString(TFsPath(TmpDir) / TString(table.substr(4)).append(TStringBuf(".tmp")));
}

auto tablePrefix = TString(YtProviderName).append('.').append(cluster);
auto fullTableName = TString(YtProviderName).append('.').append(cluster).append('.').append(table);
if (!noLocks) {
auto guard = Guard(Mutex);
Expand All @@ -41,6 +42,9 @@ TString TYtFileServices::GetTablePath(TStringBuf cluster, TStringBuf table, bool
if (auto p = TablesMapping.FindPtr(fullTableName)) {
return *p;
}
if (auto dirPtr = TablesDirMapping.FindPtr(tablePrefix)) {
return TFsPath(*dirPtr) / TString(table).append(".txt");
}
ythrow yexception() << "Table not found: " << cluster << '.' << table;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ class TYtFileServices: public TThrRefBase {
~TYtFileServices();

static TPtr Make(const NKikimr::NMiniKQL::IFunctionRegistry* registry, const THashMap<TString, TString>& mapping = {},
TFileStoragePtr fileStorage = {}, const TString& tmpDir = {}, bool keepTempTables = false)
TFileStoragePtr fileStorage = {}, const TString& tmpDir = {}, bool keepTempTables = false, const THashMap<TString, TString>& dirMapping = {})
{
return new TYtFileServices(registry, mapping, fileStorage, tmpDir.empty() ? GetSystemTempDir() : tmpDir, keepTempTables);
return new TYtFileServices(registry, mapping, fileStorage, tmpDir.empty() ? GetSystemTempDir() : tmpDir, keepTempTables, dirMapping);
}

const NKikimr::NMiniKQL::IFunctionRegistry* GetFunctionRegistry() const {
Expand Down Expand Up @@ -55,9 +55,17 @@ class TYtFileServices: public TThrRefBase {
}

private:
TYtFileServices(const NKikimr::NMiniKQL::IFunctionRegistry* registry, const THashMap<TString, TString>& mapping, TFileStoragePtr fileStorage, const TString& tmpDir, bool keepTempTables)
TYtFileServices(
const NKikimr::NMiniKQL::IFunctionRegistry* registry,
const THashMap<TString, TString>& mapping,
TFileStoragePtr fileStorage,
const TString& tmpDir,
bool keepTempTables,
const THashMap<TString, TString>& dirMapping
)
: FunctionRegistry(registry)
, TablesMapping(mapping)
, TablesDirMapping(dirMapping)
, TmpDir(tmpDir)
, KeepTempTables(keepTempTables)
{
Expand All @@ -71,6 +79,7 @@ class TYtFileServices: public TThrRefBase {
TFileStoragePtr FileStorage;
const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry;
THashMap<TString, TString> TablesMapping; // [cluster].[name] -> [file path]
THashMap<TString, TString> TablesDirMapping; // [cluster] -> [dir path]
TString TmpDir;
bool KeepTempTables;

Expand Down
23 changes: 22 additions & 1 deletion ydb/library/yql/tools/yqlrun/yqlrun.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "gateway_spec.h"

#include <filesystem>
#include <ydb/library/yql/tools/yqlrun/http/yql_server.h>

#include <ydb/library/yql/providers/yt/gateway/file/yql_yt_file.h>
Expand Down Expand Up @@ -380,7 +381,9 @@ int Main(int argc, const char *argv[])
TOpts opts = TOpts::Default();
TString programFile;
TVector<TString> tablesMappingList;
TVector<TString> tablesDirMappingList;
THashMap<TString, TString> tablesMapping;
THashMap<TString, TString> tablesDirMapping;
TVector<TString> filesMappingList;
TUserDataTable filesMapping;
TVector<TString> urlsMappingList;
Expand Down Expand Up @@ -413,6 +416,7 @@ int Main(int argc, const char *argv[])
opts.AddLongOption('s', "sql", "program is SQL query").NoArgument();
opts.AddLongOption("pg", "program has PG syntax").NoArgument();
opts.AddLongOption('t', "table", "table@file").AppendTo(&tablesMappingList);
opts.AddLongOption("tables-dir", "cluster@dir").AppendTo(&tablesDirMappingList);
opts.AddLongOption('C', "cluster", "set cluster to service mapping").RequiredArgument("name@service").Handler(new TStoreMappingFunctor(&clusterMapping));
opts.AddLongOption("ndebug", "should be at first argument, do not show debug info in error output").NoArgument();
opts.AddLongOption("parse-only", "exit after program has been parsed").NoArgument();
Expand Down Expand Up @@ -502,6 +506,23 @@ int Main(int argc, const char *argv[])
tablesMapping[tableName] = filePath;
}

for (auto& s : tablesDirMappingList) {
TStringBuf clusterName, dirPath;
TStringBuf(s).Split('@', clusterName, dirPath);
if (clusterName.empty() || dirPath.empty()) {
Cerr << "Incorrect table directory mapping, expected form cluster@dir, e.g. yt.plato@/tmp/tables" << Endl;
return 1;
}
tablesDirMapping[clusterName] = dirPath;
for (const auto& entry : std::filesystem::recursive_directory_iterator(std::string(dirPath))) {
if (entry.is_regular_file() && entry.path().has_extension() && entry.path().extension() == ".txt") {
auto tableName = TString(clusterName) + '.' + std::filesystem::relative(entry.path(), std::string(dirPath));
tableName = tableName.substr(0, tableName.Size() - 4); // remove .txt extension
tablesMapping[tableName] = entry.path().string();
}
}
}

if (hasValidate) {
for (auto& s : filesMappingList) {
TStringBuf fileName, filePath;
Expand Down Expand Up @@ -644,7 +665,7 @@ int Main(int argc, const char *argv[])
bool emulateOutputForMultirun = false;
if (hasValidate) {
if (gatewayTypes.contains(YtProviderName) || res.Has("opt-collision")) {
auto yqlNativeServices = NFile::TYtFileServices::Make(funcRegistry.Get(), tablesMapping, fileStorage, tmpDir, res.Has("keep-temp"));
auto yqlNativeServices = NFile::TYtFileServices::Make(funcRegistry.Get(), tablesMapping, fileStorage, tmpDir, res.Has("keep-temp"), tablesDirMapping);
auto ytNativeGateway = CreateYtFileGateway(yqlNativeServices, &emulateOutputForMultirun);
dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytNativeGateway));
}
Expand Down
Loading