Skip to content

Commit 827cf58

Browse files
authored
Fix resharding, add logging to writer (#10864)
1 parent 0227c59 commit 827cf58

File tree

6 files changed

+61
-47
lines changed

6 files changed

+61
-47
lines changed

ydb/public/lib/ydb_cli/dump/dump.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ class TClient::TImpl {
3434
}
3535

3636
TRestoreResult Restore(const TString& fsPath, const TString& dbPath, const TRestoreSettings& settings) {
37-
auto client = TRestoreClient(Driver, *Log);
37+
auto client = TRestoreClient(Driver, Log);
3838
return client.Restore(fsPath, dbPath, settings);
3939
}
4040

ydb/public/lib/ydb_cli/dump/restore_impl.cpp

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,9 @@
77
#include <ydb/public/lib/ydb_cli/common/recursive_list.h>
88
#include <ydb/public/lib/ydb_cli/common/recursive_remove.h>
99
#include <ydb/public/lib/ydb_cli/common/retry_func.h>
10+
#include <ydb/public/lib/ydb_cli/dump/util/log.h>
1011
#include <ydb/public/lib/ydb_cli/dump/util/util.h>
1112

12-
#include <library/cpp/logger/log.h>
13-
1413
#include <util/generic/hash.h>
1514
#include <util/generic/hash_set.h>
1615
#include <util/generic/maybe.h>
@@ -19,17 +18,6 @@
1918
#include <util/string/builder.h>
2019
#include <util/string/join.h>
2120

22-
#define LOG_IMPL(log, level, message) \
23-
if (log.FiltrationLevel() >= level) { \
24-
log.Write(level, TStringBuilder() << message); \
25-
} \
26-
Y_SEMICOLON_GUARD
27-
28-
#define LOG_D(message) LOG_IMPL(Log, ELogPriority::TLOG_DEBUG, message)
29-
#define LOG_I(message) LOG_IMPL(Log, ELogPriority::TLOG_INFO, message)
30-
#define LOG_W(message) LOG_IMPL(Log, ELogPriority::TLOG_WARNING, message)
31-
#define LOG_E(message) LOG_IMPL(Log, ELogPriority::TLOG_ERR, message)
32-
3321
namespace NYdb {
3422
namespace NDump {
3523

@@ -48,7 +36,7 @@ bool IsFileExists(const TFsPath& path) {
4836
return path.Exists() && path.IsFile();
4937
}
5038

51-
Ydb::Table::CreateTableRequest ReadTableScheme(const TString& fsPath, TLog& log) {
39+
Ydb::Table::CreateTableRequest ReadTableScheme(const TString& fsPath, const TLog* log) {
5240
LOG_IMPL(log, ELogPriority::TLOG_DEBUG, "Read scheme from " << fsPath.Quote());
5341
Ydb::Table::CreateTableRequest proto;
5442
Y_ENSURE(google::protobuf::TextFormat::ParseFromString(TFileInput(fsPath).ReadAll(), &proto));
@@ -64,7 +52,7 @@ TTableDescription TableDescriptionWithoutIndexesFromProto(Ydb::Table::CreateTabl
6452
return TableDescriptionFromProto(proto);
6553
}
6654

67-
Ydb::Scheme::ModifyPermissionsRequest ReadPermissions(const TString& fsPath, TLog& log) {
55+
Ydb::Scheme::ModifyPermissionsRequest ReadPermissions(const TString& fsPath, const TLog* log) {
6856
LOG_IMPL(log, ELogPriority::TLOG_DEBUG, "Read ACL from " << fsPath.Quote());
6957
Ydb::Scheme::ModifyPermissionsRequest proto;
7058
Y_ENSURE(google::protobuf::TextFormat::ParseFromString(TFileInput(fsPath).ReadAll(), &proto));
@@ -95,12 +83,12 @@ bool IsOperationStarted(TStatus operationStatus) {
9583

9684
} // anonymous
9785

98-
TRestoreClient::TRestoreClient(const TDriver& driver, TLog& log)
99-
: Log(log)
100-
, ImportClient(driver)
86+
TRestoreClient::TRestoreClient(const TDriver& driver, const std::shared_ptr<TLog>& log)
87+
: ImportClient(driver)
10188
, OperationClient(driver)
10289
, SchemeClient(driver)
10390
, TableClient(driver)
91+
, Log(log)
10492
{
10593
}
10694

@@ -257,7 +245,7 @@ TRestoreResult TRestoreClient::RestoreTable(const TFsPath& fsPath, const TString
257245
TStringBuilder() << "There is incomplete file in folder: " << fsPath.GetPath());
258246
}
259247

260-
auto scheme = ReadTableScheme(fsPath.Child(SCHEME_FILE_NAME), Log);
248+
auto scheme = ReadTableScheme(fsPath.Child(SCHEME_FILE_NAME), Log.get());
261249
auto dumpedDesc = TableDescriptionFromProto(scheme);
262250

263251
if (dumpedDesc.GetAttributes().contains(DOC_API_TABLE_VERSION_ATTR) && settings.SkipDocumentTables_) {
@@ -384,7 +372,7 @@ TRestoreResult TRestoreClient::RestoreData(const TFsPath& fsPath, const TString&
384372
}
385373

386374
accumulator.Reset(CreateImportDataAccumulator(desc, *actualDesc, settings));
387-
writer.Reset(CreateImportDataWriter(dbPath, desc, ImportClient, TableClient, accumulator.Get(), settings));
375+
writer.Reset(CreateImportDataWriter(dbPath, desc, ImportClient, TableClient, accumulator.Get(), settings, Log));
388376

389377
break;
390378
}
@@ -504,7 +492,7 @@ TRestoreResult TRestoreClient::RestorePermissions(const TFsPath& fsPath, const T
504492

505493
LOG_D("Restore ACL " << fsPath.GetPath().Quote() << " to " << dbPath.Quote());
506494

507-
auto permissions = ReadPermissions(fsPath.Child(PERMISSIONS_FILE_NAME), Log);
495+
auto permissions = ReadPermissions(fsPath.Child(PERMISSIONS_FILE_NAME), Log.get());
508496
return ModifyPermissions(SchemeClient, dbPath, TModifyPermissionsSettings(permissions));
509497
}
510498

ydb/public/lib/ydb_cli/dump/restore_impl.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,16 +47,16 @@ class TRestoreClient {
4747
TRestoreResult RestorePermissions(const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, const THashSet<TString>& oldEntries);
4848

4949
public:
50-
explicit TRestoreClient(const TDriver& driver, TLog& log);
50+
explicit TRestoreClient(const TDriver& driver, const std::shared_ptr<TLog>& log);
5151

5252
TRestoreResult Restore(const TString& fsPath, const TString& dbPath, const TRestoreSettings& settings = {});
5353

5454
private:
55-
TLog& Log;
5655
NImport::TImportClient ImportClient;
5756
NOperation::TOperationClient OperationClient;
5857
NScheme::TSchemeClient SchemeClient;
5958
NTable::TTableClient TableClient;
59+
std::shared_ptr<TLog> Log;
6060

6161
}; // TRestoreClient
6262

ydb/public/lib/ydb_cli/dump/restore_import_data.cpp

Lines changed: 31 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "restore_import_data.h"
22

33
#include <ydb/public/lib/ydb_cli/common/retry_func.h>
4+
#include <ydb/public/lib/ydb_cli/dump/util/log.h>
45
#include <ydb/public/lib/ydb_cli/dump/util/util.h>
56

67
#include <library/cpp/string_utils/quote/quote.h>
@@ -662,29 +663,29 @@ class TTableRows {
662663
return false;
663664
}
664665

665-
TString GetData(ui64 memLimit, ui64 batchSize, bool force = false) {
666+
TString GetData(ui64 memLimit, ui64 batchSize, bool force = false) {
666667
Y_ENSURE(HasData(memLimit, batchSize, force));
667668
Y_ENSURE(!ByMemSize.empty());
668669
Y_ENSURE(!ByRecordsSize.empty());
669670
Y_ENSURE(!ByMemSize.begin()->second.empty());
670671
Y_ENSURE(!ByRecordsSize.begin()->second.empty());
671672

672673
auto get = [this, batchSize](TRowsBy& from) {
673-
auto it = from.begin()->second.begin();
674-
auto& rows = (*it)->second;
674+
auto it = *from.begin()->second.begin();
675+
auto& rows = it->second;
675676

676-
RemoveFromSizeTracker(ByMemSize, rows.MemSize(), *it);
677-
RemoveFromSizeTracker(ByRecordsSize, rows.RecordsSize(), *it);
677+
RemoveFromSizeTracker(ByMemSize, rows.MemSize(), it);
678+
RemoveFromSizeTracker(ByRecordsSize, rows.RecordsSize(), it);
678679

679680
MemSize -= rows.MemSize();
680681
auto ret = rows.Serialize(batchSize);
681682
MemSize += rows.MemSize();
682683

683684
if (rows.MemSize()) {
684-
Y_ENSURE(ByMemSize[rows.MemSize()].insert(*it).second);
685+
Y_ENSURE(ByMemSize[rows.MemSize()].insert(it).second);
685686
}
686687
if (rows.RecordsSize()) {
687-
Y_ENSURE(ByRecordsSize[rows.RecordsSize()].insert(*it).second);
688+
Y_ENSURE(ByRecordsSize[rows.RecordsSize()].insert(it).second);
688689
}
689690

690691
return ret;
@@ -739,9 +740,16 @@ class TDataAccumulator: public NPrivate::IDataAccumulator {
739740
return Rows.GetData(MemLimit, BatchSize, force);
740741
}
741742

742-
void Reshard(const TVector<TKeyRange>& keyRanges) {
743+
void Reshard(const TVector<TKeyRange>& keyRanges, const TString& data) {
743744
TGuard<TMutex> lock(Mutex);
744745
Rows.Reshard(keyRanges);
746+
747+
TStringInput input(data);
748+
TString line;
749+
750+
while (input.ReadLine(line)) {
751+
Rows.Add(KeyBuilder.Build(line), std::move(line));
752+
}
745753
}
746754

747755
private:
@@ -792,6 +800,7 @@ class TDataWriter: public NPrivate::IDataWriter {
792800
}
793801

794802
if (retryNumber == maxRetries) {
803+
LOG_E("There is no retries left, last result: " << importResult.GetIssues().ToOneLineString());
795804
return false;
796805
}
797806

@@ -801,19 +810,12 @@ class TDataWriter: public NPrivate::IDataWriter {
801810
TMaybe<TTableDescription> desc;
802811
auto descResult = DescribeTable(TableClient, Path, desc);
803812
if (!descResult.IsSuccess()) {
813+
LOG_E("Describe table " << Path.Quote() << " failed: " << descResult.GetIssues().ToOneLineString());
804814
return false;
805815
}
806816

807-
Accumulator->Reshard(desc->GetKeyRanges());
808-
809-
TStringInput input(data);
810-
TString line;
811-
812-
while (input.ReadLine(line)) {
813-
Accumulator->Feed(std::move(line));
814-
}
815-
816-
break;
817+
Accumulator->Reshard(desc->GetKeyRanges(), data);
818+
return true;
817819
}
818820

819821
case EStatus::ABORTED:
@@ -855,12 +857,14 @@ class TDataWriter: public NPrivate::IDataWriter {
855857
const TRestoreSettings& settings,
856858
TImportClient& importClient,
857859
TTableClient& tableClient,
858-
NPrivate::IDataAccumulator* accumulator)
860+
NPrivate::IDataAccumulator* accumulator,
861+
const std::shared_ptr<TLog>& log)
859862
: Path(path)
860863
, Settings(MakeSettings(settings, desc))
861864
, ImportClient(importClient)
862865
, TableClient(tableClient)
863866
, Accumulator(dynamic_cast<TDataAccumulator*>(accumulator))
867+
, Log(log)
864868
, RateLimiterSettings(settings.RateLimiterSettings_)
865869
, RequestLimiter(RateLimiterSettings.GetRps(), RateLimiterSettings.GetRps())
866870
{
@@ -871,7 +875,10 @@ class TDataWriter: public NPrivate::IDataWriter {
871875
}
872876

873877
bool Push(TString&& data) override {
874-
Y_ENSURE(data.size() < TRestoreSettings::MaxBytesPerRequest, "Data is too long");
878+
if (data.size() >= TRestoreSettings::MaxBytesPerRequest) {
879+
LOG_E("Data is too long");
880+
return false;
881+
}
875882

876883
if (IsStopped()) {
877884
return false;
@@ -896,6 +903,7 @@ class TDataWriter: public NPrivate::IDataWriter {
896903
TImportClient& ImportClient;
897904
TTableClient& TableClient;
898905
TDataAccumulator* Accumulator;
906+
const std::shared_ptr<TLog> Log;
899907

900908
const TRateLimiterSettings RateLimiterSettings;
901909

@@ -922,8 +930,9 @@ NPrivate::IDataWriter* CreateImportDataWriter(
922930
TImportClient& importClient,
923931
TTableClient& tableClient,
924932
NPrivate::IDataAccumulator* accumulator,
925-
const TRestoreSettings& settings) {
926-
return new TDataWriter(path, desc, settings, importClient, tableClient, accumulator);
933+
const TRestoreSettings& settings,
934+
const std::shared_ptr<TLog>& log) {
935+
return new TDataWriter(path, desc, settings, importClient, tableClient, accumulator, log);
927936
}
928937

929938
} // NDump

ydb/public/lib/ydb_cli/dump/restore_import_data.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
#include "restore_impl.h"
44

5+
class TLog;
6+
57
namespace NYdb {
68
namespace NDump {
79

@@ -16,7 +18,8 @@ NPrivate::IDataWriter* CreateImportDataWriter(
1618
NImport::TImportClient& importClient,
1719
NTable::TTableClient& tableClient,
1820
NPrivate::IDataAccumulator* accumulator,
19-
const TRestoreSettings& settings);
21+
const TRestoreSettings& settings,
22+
const std::shared_ptr<TLog>& log);
2023

2124
} // NDump
2225
} // NYdb
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
#pragma once
2+
3+
#include <library/cpp/logger/log.h>
4+
5+
#define LOG_IMPL(log, level, message) \
6+
if (log->FiltrationLevel() >= level) { \
7+
log->Write(level, TStringBuilder() << message); \
8+
} \
9+
Y_SEMICOLON_GUARD
10+
11+
#define LOG_D(message) LOG_IMPL(Log, ELogPriority::TLOG_DEBUG, message)
12+
#define LOG_I(message) LOG_IMPL(Log, ELogPriority::TLOG_INFO, message)
13+
#define LOG_W(message) LOG_IMPL(Log, ELogPriority::TLOG_WARNING, message)
14+
#define LOG_E(message) LOG_IMPL(Log, ELogPriority::TLOG_ERR, message)

0 commit comments

Comments
 (0)