Skip to content

Commit dec33cc

Browse files
authored
Improve csv import, step 4 (#11907)
1 parent e0b1bef commit dec33cc

File tree

1 file changed

+53
-37
lines changed

1 file changed

+53
-37
lines changed

ydb/public/lib/ydb_cli/import/import.cpp

Lines changed: 53 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ class TJobInFlightManager {
306306

307307
private:
308308
size_t GetSemaphoreMaxValue(size_t orderNum) const {
309-
return Max(1ul, MaxJobInFlight / CurrentFileCount
309+
return Max((size_t)1, MaxJobInFlight / CurrentFileCount
310310
// One more thread for the first <MaxJobInFlight % CurrentFileCount> managers to utilize max jobs inflight
311311
+ (orderNum < MaxJobInFlight % CurrentFileCount ? 1 : 0));
312312
}
@@ -341,7 +341,7 @@ class TImportFileClient::TImpl {
341341
TStatus UpsertCsvByBlocks(const TString& filePath,
342342
const TString& dbPath);
343343
TAsyncStatus UpsertTValueBuffer(const TString& dbPath, TValueBuilder& builder);
344-
TAsyncStatus UpsertTValueBuffer(const TString& dbPath, TValue&& rows);
344+
TAsyncStatus UpsertTValueBuffer(const TString& dbPath, std::function<TValue()>&& buildFunc);
345345
TStatus UpsertJson(IInputStream &input, const TString &dbPath, std::optional<ui64> inputSizeHint,
346346
ProgressCallbackFunc & progressCallback);
347347
TStatus UpsertParquet(const TString& filename, const TString& dbPath, ProgressCallbackFunc & progressCallback);
@@ -389,11 +389,8 @@ TImportFileClient::TImpl::TImpl(const TDriver& driver, const TClientCommand::TCo
389389
.MaxRetries(TImportFileSettings::MaxRetries)
390390
.Idempotent(true)
391391
.Verbose(rootConfig.IsVerbose());
392-
IThreadPool::TParams poolParams;
393-
if (!Settings.NewlineDelimited_) {
394-
poolParams.SetBlocking(true);
395-
}
396-
ProcessingPool = CreateThreadPool(Settings.Threads_, 0, poolParams);
392+
ProcessingPool = CreateThreadPool(Settings.Threads_, 0,
393+
IThreadPool::TParams().SetThreadNamePrefix("CsvProcessing"));
397394
RequestsInflight = std::make_unique<std::counting_semaphore<>>(Settings.MaxInFlightRequests_);
398395
}
399396

@@ -578,15 +575,24 @@ TAsyncStatus TImportFileClient::TImpl::UpsertTValueBuffer(const TString& dbPath,
578575
}
579576

580577
inline
581-
TAsyncStatus TImportFileClient::TImpl::UpsertTValueBuffer(const TString& dbPath, TValue&& rows) {
582-
auto retryFunc = [this, dbPath, rows = std::move(rows)]
578+
TAsyncStatus TImportFileClient::TImpl::UpsertTValueBuffer(const TString& dbPath, std::function<TValue()>&& buildFunc) {
579+
// For the first attempt values are built before acquiring request inflight semaphore
580+
std::optional<TValue> prebuiltValue = buildFunc();
581+
auto retryFunc = [this, &dbPath, buildFunc = std::move(buildFunc), prebuiltValue = std::move(prebuiltValue)]
583582
(NYdb::NTable::TTableClient& tableClient) mutable -> TAsyncStatus {
584-
NYdb::TValue rowsCopy(rows.GetType(), rows.GetProto());
585-
return tableClient.BulkUpsert(dbPath, std::move(rowsCopy), UpsertSettings)
586-
.Apply([](const NYdb::NTable::TAsyncBulkUpsertResult& bulkUpsertResult) {
587-
NYdb::TStatus status = bulkUpsertResult.GetValueSync();
588-
return NThreading::MakeFuture(status);
589-
});
583+
auto buildTValueAndSendRequest = [this, &buildFunc, &dbPath, &tableClient, &prebuiltValue]() {
584+
// For every retry attempt after first request build value from strings again
585+
// to prevent copying data in retryFunc in a happy way when there is only one request
586+
TValue builtValue = prebuiltValue.has_value() ? std::move(prebuiltValue.value()) : buildFunc();
587+
prebuiltValue = std::nullopt;
588+
return tableClient.BulkUpsert(dbPath, std::move(builtValue), UpsertSettings)
589+
.Apply([](const NYdb::NTable::TAsyncBulkUpsertResult& bulkUpsertResult) {
590+
NYdb::TStatus status = bulkUpsertResult.GetValueSync();
591+
return NThreading::MakeFuture(status);
592+
});
593+
};
594+
// Running heavy building task on processing pool:
595+
return NThreading::Async(std::move(buildTValueAndSendRequest), *ProcessingPool);
590596
};
591597
if (!RequestsInflight->try_acquire()) {
592598
if (Settings.Verbose_ && Settings.NewlineDelimited_) {
@@ -644,18 +650,22 @@ TStatus TImportFileClient::TImpl::UpsertCsv(IInputStream& input,
644650
std::vector<TString> buffer;
645651

646652
auto upsertCsvFunc = [&, jobInflightManager](std::vector<TString>&& buffer, ui64 row) {
647-
try {
648-
UpsertTValueBuffer(dbPath, parser.BuildList(buffer, filePath, row))
653+
auto buildFunc = [jobInflightManager, &parser, buffer = std::move(buffer), &filePath, row, this] () mutable {
654+
try {
655+
return parser.BuildList(buffer, filePath, row);
656+
} catch (const std::exception& e) {
657+
if (!Failed.exchange(true)) {
658+
ErrorStatus = MakeHolder<TStatus>(MakeStatus(EStatus::INTERNAL_ERROR, e.what()));
659+
}
660+
jobInflightManager->ReleaseJob();
661+
throw;
662+
}
663+
};
664+
UpsertTValueBuffer(dbPath, std::move(buildFunc))
649665
.Apply([jobInflightManager](const TAsyncStatus& asyncStatus) {
650666
jobInflightManager->ReleaseJob();
651667
return asyncStatus;
652668
});
653-
} catch (const std::exception& e) {
654-
if (!Failed.exchange(true)) {
655-
ErrorStatus = MakeHolder<TStatus>(MakeStatus(EStatus::INTERNAL_ERROR, e.what()));
656-
}
657-
jobInflightManager->ReleaseJob();
658-
}
659669
};
660670

661671
while (TString line = splitter.ConsumeLine()) {
@@ -738,9 +748,11 @@ TStatus TImportFileClient::TImpl::UpsertCsv(IInputStream& input,
738748
TStatus TImportFileClient::TImpl::UpsertCsvByBlocks(const TString& filePath,
739749
const TString& dbPath) {
740750
TString headerRow;
741-
ui64 maxThreads = Max(1ul, Settings.Threads_ / CurrentFileCount);
751+
ui64 maxThreads = Max((size_t)1, Settings.Threads_ / CurrentFileCount);
742752
TCsvFileReader splitter(filePath, Settings, headerRow, maxThreads);
743753
ui64 threadCount = splitter.GetSplitCount();
754+
THolder<IThreadPool> readingPool = CreateThreadPool(maxThreads, 0,
755+
IThreadPool::TParams().SetThreadNamePrefix("CsvReading"));
744756
// MaxInFlightRequests_ requests in flight on server and threadCount threads building TValue
745757
size_t maxJobInflightTotal = threadCount + Settings.MaxInFlightRequests_;
746758

@@ -765,19 +777,23 @@ TStatus TImportFileClient::TImpl::UpsertCsvByBlocks(const TString& filePath,
765777
// Job ends on receiving final request (after all retries)
766778
std::counting_semaphore<> jobsInflight(maxJobInflight);
767779
auto upsertCsvFunc = [&](std::vector<TString>&& buffer) {
768-
jobsInflight.acquire();
769-
try {
770-
UpsertTValueBuffer(dbPath, parser.BuildList(buffer, filePath))
771-
.Apply([&jobsInflight](const TAsyncStatus& asyncStatus) {
772-
jobsInflight.release();
773-
return asyncStatus;
774-
});
775-
} catch (const std::exception& e) {
776-
if (!Failed.exchange(true)) {
777-
ErrorStatus = MakeHolder<TStatus>(MakeStatus(EStatus::INTERNAL_ERROR, e.what()));
780+
auto buildFunc = [&jobsInflight, &parser, buffer = std::move(buffer), &filePath, this]() mutable {
781+
try {
782+
return parser.BuildList(buffer, filePath);
783+
} catch (const std::exception& e) {
784+
if (!Failed.exchange(true)) {
785+
ErrorStatus = MakeHolder<TStatus>(MakeStatus(EStatus::INTERNAL_ERROR, e.what()));
786+
}
787+
jobsInflight.release();
788+
throw;
778789
}
779-
jobsInflight.release();
780-
}
790+
};
791+
jobsInflight.acquire();
792+
UpsertTValueBuffer(dbPath, std::move(buildFunc))
793+
.Apply([&jobsInflight](const TAsyncStatus& asyncStatus) {
794+
jobsInflight.release();
795+
return asyncStatus;
796+
});
781797
};
782798
std::vector<TAsyncStatus> inFlightRequests;
783799
std::vector<TString> buffer;
@@ -846,7 +862,7 @@ TStatus TImportFileClient::TImpl::UpsertCsvByBlocks(const TString& filePath,
846862
promise.SetValue();
847863
};
848864

849-
if (!ProcessingPool->AddFunc(workerFunc)) {
865+
if (!readingPool->AddFunc(workerFunc)) {
850866
return MakeStatus(EStatus::INTERNAL_ERROR, "Couldn't add worker func");
851867
}
852868
}

0 commit comments

Comments
 (0)