Skip to content

Commit 0982055

Browse files
authored
Fixed session per file in CLI ydb import file (#7785)
1 parent 79898e6 commit 0982055

File tree

2 files changed

+88
-100
lines changed

2 files changed

+88
-100
lines changed

ydb/public/lib/ydb_cli/import/import.cpp

+81-95
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,43 @@ TStatus WaitForQueue(const size_t maxQueueSize, std::vector<TAsyncStatus>& inFli
7979
return MakeStatus();
8080
}
8181

82+
/// Constructs the CSV parser for an import run and assigns it to `parser`.
///
/// With a header (settings.Header_ and/or settings.HeaderRow_ set): the header text is
/// `defaultHeader` when settings.Header_ is set, and is overridden by settings.HeaderRow_
/// when that is set. A trailing "\r\n" and then a trailing "\n" are stripped. If the
/// remaining text ends with settings.Delimiter_, that delimiter is stripped too and
/// `removeLastDelimiter` is set to true.
///
/// Without a header: column names are taken from `dbTableInfo`, which must be non-null
/// (enforced by Y_ENSURE_BT); `removeLastDelimiter` is left untouched.
///
/// In both cases only the first character of settings.Delimiter_ is passed to the parser.
///
/// NOTE(review): `defaultHeader` is read only when settings.Header_ is set, but the caller
/// evaluates the argument unconditionally. Passing `splitter.ConsumeLine()` therefore takes
/// a line off the input even when no in-file header is expected -- confirm the call sites.
void InitCsvParser(TCsvParser& parser,
                   bool& removeLastDelimiter,
                   TString&& defaultHeader,
                   const TImportFileSettings& settings,
                   const std::map<TString, TType>* columnTypes,
                   const NTable::TTableDescription* dbTableInfo) {
    const bool headerProvided = settings.Header_ || settings.HeaderRow_;
    if (!headerProvided) {
        // No header from the file or the settings: fall back to the table's own column list.
        Y_ENSURE_BT(dbTableInfo);
        TVector<TString> columnNames;
        for (const auto& tableColumn : dbTableInfo->GetColumns()) {
            columnNames.push_back(tableColumn.Name);
        }
        parser = TCsvParser(std::move(columnNames), settings.Delimiter_[0], settings.NullValue_, columnTypes);
        return;
    }

    TString header;
    if (settings.Header_) {
        header = std::move(defaultHeader);
    }
    if (settings.HeaderRow_) {
        // An explicitly configured header row wins over the one read from the input.
        header = settings.HeaderRow_;
    }

    // Removes `suffix` from the end of `header` if present; reports whether it did.
    const auto chopSuffix = [&header](const TString& suffix) {
        if (!header.EndsWith(suffix)) {
            return false;
        }
        header.erase(header.Size() - suffix.Size());
        return true;
    };

    chopSuffix("\r\n");
    chopSuffix("\n");
    if (chopSuffix(settings.Delimiter_)) {
        // The header ends with a delimiter, so data lines are expected to end with one too.
        removeLastDelimiter = true;
    }

    parser = TCsvParser(std::move(header), settings.Delimiter_[0], settings.NullValue_, columnTypes);
}
118+
82119
FHANDLE GetStdinFileno() {
83120
#if defined(_win32_)
84121
return GetStdHandle(STD_INPUT_HANDLE);
@@ -222,9 +259,8 @@ class TCsvFileReader {
222259
} // namespace
223260

224261
TImportFileClient::TImportFileClient(const TDriver& driver, const TClientCommand::TConfig& rootConfig)
225-
: OperationClient(std::make_shared<NOperation::TOperationClient>(driver))
262+
: TableClient(std::make_shared<NTable::TTableClient>(driver))
226263
, SchemeClient(std::make_shared<NScheme::TSchemeClient>(driver))
227-
, TableClient(std::make_shared<NTable::TTableClient>(driver))
228264
{
229265
RetrySettings
230266
.MaxRetries(TImportFileSettings::MaxRetries)
@@ -239,11 +275,25 @@ TStatus TImportFileClient::Import(const TVector<TString>& filePaths, const TStri
239275
TStringBuilder() << "Illegal delimiter for TSV format, only tab is allowed");
240276
}
241277

242-
auto result = NDump::DescribePath(*SchemeClient, dbPath);
243-
auto resultStatus = result.GetStatus();
244-
if (resultStatus != EStatus::SUCCESS) {
245-
return MakeStatus(EStatus::SCHEME_ERROR,
246-
TStringBuilder() << result.GetIssues().ToString() << dbPath);
278+
auto resultStatus = TableClient->RetryOperationSync(
279+
[this, dbPath](NTable::TSession session) {
280+
auto result = session.DescribeTable(dbPath).ExtractValueSync();
281+
if (result.IsSuccess()) {
282+
DbTableInfo = std::make_unique<const NTable::TTableDescription>(result.GetTableDescription());
283+
}
284+
return result;
285+
}, NTable::TRetryOperationSettings{RetrySettings}.MaxRetries(10));
286+
287+
if (!resultStatus.IsSuccess()) {
288+
/// TODO: Remove this after server fix: https://github.com/ydb-platform/ydb/issues/7791
289+
if (resultStatus.GetStatus() == EStatus::SCHEME_ERROR) {
290+
auto describePathResult = NDump::DescribePath(*SchemeClient, dbPath);
291+
if (describePathResult.GetStatus() != EStatus::SUCCESS) {
292+
return MakeStatus(EStatus::SCHEME_ERROR,
293+
TStringBuilder() << describePathResult.GetIssues().ToString() << dbPath);
294+
}
295+
}
296+
return resultStatus;
247297
}
248298

249299
UpsertSettings
@@ -374,45 +424,13 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath,
374424

375425
TCountingInput countInput(&input);
376426
NCsvFormat::TLinesSplitter splitter(countInput);
377-
TCsvParser parser;
378-
bool RemoveLastDelimiter = false;
379427

380-
NTable::TCreateSessionResult sessionResult = TableClient->GetSession(NTable::TCreateSessionSettings()).GetValueSync();
381-
if (!sessionResult.IsSuccess())
382-
return sessionResult;
383-
NTable::TDescribeTableResult tableResult = sessionResult.GetSession().DescribeTable(dbPath).GetValueSync();
384-
if (!tableResult.IsSuccess())
385-
return tableResult;
428+
auto columnTypes = GetColumnTypes();
429+
ValidateTValueUpsertTable();
386430

387-
auto columnTypes = GetColumnTypes(tableResult.GetTableDescription());
388-
ValidateTable(tableResult.GetTableDescription());
389-
390-
if (settings.Header_ || settings.HeaderRow_) {
391-
TString headerRow;
392-
if (settings.Header_) {
393-
headerRow = splitter.ConsumeLine();
394-
}
395-
if (settings.HeaderRow_) {
396-
headerRow = settings.HeaderRow_;
397-
}
398-
if (headerRow.EndsWith("\r\n")) {
399-
headerRow.erase(headerRow.Size() - 2);
400-
}
401-
if (headerRow.EndsWith("\n")) {
402-
headerRow.erase(headerRow.Size() - 1);
403-
}
404-
if (headerRow.EndsWith(settings.Delimiter_)) {
405-
RemoveLastDelimiter = true;
406-
headerRow.erase(headerRow.Size() - settings.Delimiter_.Size());
407-
}
408-
parser = TCsvParser(std::move(headerRow), settings.Delimiter_[0], settings.NullValue_, &columnTypes);
409-
} else {
410-
TVector<TString> columns;
411-
for (const auto& column : tableResult.GetTableDescription().GetColumns()) {
412-
columns.push_back(column.Name);
413-
}
414-
parser = TCsvParser(std::move(columns), settings.Delimiter_[0], settings.NullValue_, &columnTypes);
415-
}
431+
TCsvParser parser;
432+
bool removeLastDelimiter = false;
433+
InitCsvParser(parser, removeLastDelimiter, splitter.ConsumeLine(), settings, &columnTypes, DbTableInfo.get());
416434

417435
for (ui32 i = 0; i < settings.SkipRows_; ++i) {
418436
splitter.ConsumeLine();
@@ -450,7 +468,7 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath,
450468
readBytes += line.Size();
451469
batchBytes += line.Size();
452470

453-
if (RemoveLastDelimiter) {
471+
if (removeLastDelimiter) {
454472
if (!line.EndsWith(settings.Delimiter_)) {
455473
return MakeStatus(EStatus::BAD_REQUEST,
456474
"According to the header, lines should end with a delimiter");
@@ -498,42 +516,14 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath,
498516
TStatus TImportFileClient::UpsertCsvByBlocks(const TString& filePath, const TString& dbPath, const TImportFileSettings& settings) {
499517
TMaxInflightGetter inFlightGetter(settings.MaxInFlightRequests_, FilesCount);
500518
TString headerRow;
501-
TCsvParser parser;
502519
TCsvFileReader splitter(filePath, settings, headerRow, inFlightGetter);
503-
bool RemoveLastDelimiter = false;
504-
505-
NTable::TCreateSessionResult sessionResult = TableClient->GetSession(NTable::TCreateSessionSettings()).GetValueSync();
506-
if (!sessionResult.IsSuccess())
507-
return sessionResult;
508-
NTable::TDescribeTableResult tableResult = sessionResult.GetSession().DescribeTable(dbPath).GetValueSync();
509-
if (!tableResult.IsSuccess())
510-
return tableResult;
511520

512-
auto columnTypes = GetColumnTypes(tableResult.GetTableDescription());
513-
ValidateTable(tableResult.GetTableDescription());
521+
auto columnTypes = GetColumnTypes();
522+
ValidateTValueUpsertTable();
514523

515-
if (settings.Header_ || settings.HeaderRow_) {
516-
if (settings.HeaderRow_) {
517-
headerRow = settings.HeaderRow_;
518-
}
519-
if (headerRow.EndsWith("\r\n")) {
520-
headerRow.erase(headerRow.Size() - 2);
521-
}
522-
if (headerRow.EndsWith("\n")) {
523-
headerRow.erase(headerRow.Size() - 1);
524-
}
525-
if (headerRow.EndsWith(settings.Delimiter_)) {
526-
RemoveLastDelimiter = true;
527-
headerRow.erase(headerRow.Size() - settings.Delimiter_.Size());
528-
}
529-
parser = TCsvParser(std::move(headerRow), settings.Delimiter_[0], settings.NullValue_, &columnTypes);
530-
} else {
531-
TVector<TString> columns;
532-
for (const auto& column : tableResult.GetTableDescription().GetColumns()) {
533-
columns.push_back(column.Name);
534-
}
535-
parser = TCsvParser(std::move(columns), settings.Delimiter_[0], settings.NullValue_, &columnTypes);
536-
}
524+
TCsvParser parser;
525+
bool removeLastDelimiter = false;
526+
InitCsvParser(parser, removeLastDelimiter, std::move(headerRow), settings, &columnTypes, DbTableInfo.get());
537527

538528
TType lineType = parser.GetColumnsType();
539529

@@ -565,7 +555,7 @@ TStatus TImportFileClient::UpsertCsvByBlocks(const TString& filePath, const TStr
565555
}
566556
readBytes += line.size();
567557
batchBytes += line.size();
568-
if (RemoveLastDelimiter) {
558+
if (removeLastDelimiter) {
569559
if (!line.EndsWith(settings.Delimiter_)) {
570560
return MakeStatus(EStatus::BAD_REQUEST,
571561
"According to the header, lines should end with a delimiter");
@@ -611,15 +601,8 @@ TStatus TImportFileClient::UpsertCsvByBlocks(const TString& filePath, const TStr
611601

612602
TStatus TImportFileClient::UpsertJson(IInputStream& input, const TString& dbPath, const TImportFileSettings& settings,
613603
std::optional<ui64> inputSizeHint, ProgressCallbackFunc & progressCallback) {
614-
NTable::TCreateSessionResult sessionResult = TableClient->GetSession(NTable::TCreateSessionSettings()).GetValueSync();
615-
if (!sessionResult.IsSuccess())
616-
return sessionResult;
617-
NTable::TDescribeTableResult tableResult = sessionResult.GetSession().DescribeTable(dbPath).GetValueSync();
618-
if (!tableResult.IsSuccess())
619-
return tableResult;
620-
621-
const TType tableType = GetTableType(tableResult.GetTableDescription());
622-
ValidateTable(tableResult.GetTableDescription());
604+
const TType tableType = GetTableType();
605+
ValidateTValueUpsertTable();
623606
const NYdb::EBinaryStringEncoding stringEncoding =
624607
(settings.Format_ == EOutputFormat::JsonBase64) ? NYdb::EBinaryStringEncoding::Base64 :
625608
NYdb::EBinaryStringEncoding::Unicode;
@@ -818,36 +801,39 @@ TAsyncStatus TImportFileClient::UpsertParquetBuffer(const TString& dbPath, const
818801
return TableClient->RetryOperation(upsert, RetrySettings);
819802
}
820803

821-
TType TImportFileClient::GetTableType(const NTable::TTableDescription& tableDescription) {
804+
/// Builds a struct type with one member per table column (column name -> column type),
/// using the table description stored in DbTableInfo.
/// DbTableInfo must be set; this is enforced by Y_ENSURE_BT.
TType TImportFileClient::GetTableType() {
    TTypeBuilder structBuilder;
    structBuilder.BeginStruct();
    Y_ENSURE_BT(DbTableInfo);
    for (const auto& tableColumn : DbTableInfo->GetTableColumns()) {
        structBuilder.AddMember(tableColumn.Name, tableColumn.Type);
    }
    structBuilder.EndStruct();
    return structBuilder.Build();
}
831815

832-
std::map<TString, TType> TImportFileClient::GetColumnTypes(const NTable::TTableDescription& tableDescription) {
816+
std::map<TString, TType> TImportFileClient::GetColumnTypes() {
833817
std::map<TString, TType> columnTypes;
834-
const auto& columns = tableDescription.GetTableColumns();
818+
Y_ENSURE_BT(DbTableInfo);
819+
const auto& columns = DbTableInfo->GetTableColumns();
835820
for (auto it = columns.begin(); it != columns.end(); it++) {
836821
columnTypes.insert({(*it).Name, (*it).Type});
837822
}
838823
return columnTypes;
839824
}
840825

841-
void TImportFileClient::ValidateTable(const NTable::TTableDescription& tableDescription) {
842-
auto columnTypes = GetColumnTypes(tableDescription);
826+
void TImportFileClient::ValidateTValueUpsertTable() {
827+
auto columnTypes = GetColumnTypes();
843828
bool hasPgType = false;
844829
for (const auto& [_, type] : columnTypes) {
845830
if (TTypeParser(type).GetKind() == TTypeParser::ETypeKind::Pg) {
846831
hasPgType = true;
847832
break;
848833
}
849834
}
850-
if (tableDescription.GetStoreType() == NTable::EStoreType::Column && hasPgType) {
835+
Y_ENSURE_BT(DbTableInfo);
836+
if (DbTableInfo->GetStoreType() == NTable::EStoreType::Column && hasPgType) {
851837
throw TMisuseException() << "Import into column table with Pg type columns in not supported";
852838
}
853839
}

ydb/public/lib/ydb_cli/import/import.h

+7-5
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,14 @@ class TImportFileClient {
6666
TStatus Import(const TVector<TString>& fsPaths, const TString& dbPath, const TImportFileSettings& settings = {});
6767

6868
private:
69-
std::shared_ptr<NOperation::TOperationClient> OperationClient;
70-
std::shared_ptr<NScheme::TSchemeClient> SchemeClient;
7169
std::shared_ptr<NTable::TTableClient> TableClient;
70+
std::shared_ptr<NScheme::TSchemeClient> SchemeClient;
7271

7372
NTable::TBulkUpsertSettings UpsertSettings;
7473
NTable::TRetryOperationSettings RetrySettings;
7574

75+
std::unique_ptr<const NTable::TTableDescription> DbTableInfo;
76+
7677
std::atomic<ui64> FilesCount;
7778

7879
static constexpr ui32 VerboseModeReadSize = 1 << 27; // 128 MiB
@@ -86,13 +87,14 @@ class TImportFileClient {
8687

8788
TStatus UpsertJson(IInputStream &input, const TString &dbPath, const TImportFileSettings &settings,
8889
std::optional<ui64> inputSizeHint, ProgressCallbackFunc & progressCallback);
89-
TType GetTableType(const NTable::TTableDescription& tableDescription);
90-
std::map<TString, TType> GetColumnTypes(const NTable::TTableDescription& tableDescription);
91-
void ValidateTable(const NTable::TTableDescription& tableDescription);
9290

9391
TStatus UpsertParquet(const TString& filename, const TString& dbPath, const TImportFileSettings& settings,
9492
ProgressCallbackFunc & progressCallback);
9593
TAsyncStatus UpsertParquetBuffer(const TString& dbPath, const TString& buffer, const TString& strSchema);
94+
95+
TType GetTableType();
96+
std::map<TString, TType> GetColumnTypes();
97+
void ValidateTValueUpsertTable();
9698
};
9799

98100
}

0 commit comments

Comments
 (0)