Skip to content

Commit 9d29e83

Browse files
authored
Merge eba7296 into 9b5182d
2 parents 9b5182d + eba7296 commit 9d29e83

File tree

9 files changed

+74
-63
lines changed

9 files changed

+74
-63
lines changed

ydb/library/workload/abstract/workload_query_generator.h

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ class IBulkDataGenerator {
5252
, Size(size)
5353
{}
5454

55-
struct TDataPortion: public TAtomicRefCount<TDataPortion>, TMoveOnly {
55+
class TDataPortion: public TAtomicRefCount<TDataPortion>, TMoveOnly {
56+
public:
5657
struct TCsv {
5758
TCsv(TString&& data, const TString& formatString = TString())
5859
: Data(std::move(data))
@@ -71,16 +72,26 @@ class IBulkDataGenerator {
7172
TString Schema;
7273
};
7374

75+
using TDataType = std::variant<NYdb::TValue, TCsv, TArrow>;
76+
7477
template<class T>
7578
TDataPortion(const TString& table, T&& data, ui64 size)
7679
: Table(table)
77-
, Data(std::move(data))
7880
, Size(size)
81+
, Data(std::move(data))
7982
{}
8083

81-
TString Table;
82-
std::variant<NYdb::TValue, TCsv, TArrow> Data;
83-
ui64 Size;
84+
virtual ~TDataPortion() = default;
85+
virtual void SetSendResult(const NYdb::TStatus& status) {
86+
Y_UNUSED(status);
87+
}
88+
TDataType& MutableData() {
89+
return Data;
90+
}
91+
YDB_READONLY_DEF(TString, Table);
92+
YDB_READONLY(ui64, Size, 0);
93+
private:
94+
TDataType Data;
8495
};
8596

8697
using TDataPortionPtr = TIntrusivePtr<TDataPortion>;

ydb/library/workload/benchmark_base/state.cpp

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,9 @@ TGeneratorStateProcessor::TGeneratorStateProcessor(const TFsPath& path, bool cle
2525
}
2626
}
2727

28-
void TGeneratorStateProcessor::AddPortion(const TString& source, ui64 from, ui64 size) {
29-
InProcess.Get().emplace_back(TInProcessPortion{source, from, size});
30-
}
31-
32-
void TGeneratorStateProcessor::FinishPortions() {
33-
bool needSave = false;
28+
void TGeneratorStateProcessor::FinishPortion(const TString& source, ui64 from, ui64 size) {
3429
auto g = Guard(Lock);
35-
for (const auto& p: InProcess.Get()) {
36-
needSave |= StateImpl[p.Source].FinishPortion(p.From, p.Size, State[p.Source].Position);
37-
}
38-
InProcess.Get().clear();
30+
bool needSave = StateImpl[source].FinishPortion(from, size, State[source].Position);
3931
if (needSave) {
4032
Save();
4133
}

ydb/library/workload/benchmark_base/state.h

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#pragma once
22

3+
#include <ydb/library/workload/abstract/workload_query_generator.h>
34
#include <ydb/library/accessor/accessor.h>
45
#include <util/generic/deque.h>
56
#include <util/generic/map.h>
@@ -18,8 +19,7 @@ class TGeneratorStateProcessor {
1819

1920
public:
2021
TGeneratorStateProcessor(const TFsPath& path, bool clear);
21-
void AddPortion(const TString& source, ui64 from, ui64 size);
22-
void FinishPortions();
22+
void FinishPortion(const TString& source, ui64 from, ui64 size);
2323
YDB_READONLY_DEF(TState, State);
2424

2525
private:
@@ -37,12 +37,6 @@ class TGeneratorStateProcessor {
3737
TMap<ui64, TThreadSourceState> ThreadsState;
3838
};
3939

40-
struct TInProcessPortion {
41-
TString Source;
42-
ui64 From;
43-
ui64 Size;
44-
};
45-
4640
private:
4741
void Load();
4842
void Save() const;
@@ -52,7 +46,25 @@ class TGeneratorStateProcessor {
5246
TFsPath TmpPath;
5347
TMap<TString, TSourceStateImpl> StateImpl;
5448
TAdaptiveLock Lock;
55-
Y_THREAD(TVector<TInProcessPortion>) InProcess;
49+
};
50+
51+
class TDataPortionWithState: public IBulkDataGenerator::TDataPortion {
52+
public:
53+
template<class T>
54+
TDataPortionWithState(TGeneratorStateProcessor* stateProcessor, const TString& table, T&& data, ui64 position, ui64 size)
55+
: TDataPortion(table, std::move(data), size)
56+
, Position(position)
57+
, StateProcessor(stateProcessor)
58+
{}
59+
60+
virtual void SetSendResult(const NYdb::TStatus& status) {
61+
if (StateProcessor && status.IsSuccess()) {
62+
StateProcessor->FinishPortion(GetTable(), Position, GetSize());
63+
}
64+
}
65+
private:
66+
ui64 Position;
67+
TGeneratorStateProcessor* StateProcessor;
5668
};
5769

5870
}

ydb/library/workload/benchmark_base/state_ut.cpp

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,38 +6,34 @@
66
Y_UNIT_TEST_SUITE(DataGeneratorState) {
77
Y_UNIT_TEST(PortionProcessing) {
88
TFsPath path;
9-
NYdbWorkload::TGeneratorStateProcessor proccessor(path);
10-
proccessor.AddPortion("test1", 0, 10);
9+
NYdbWorkload::TGeneratorStateProcessor proccessor(path, true);
1110
UNIT_ASSERT(!proccessor.GetState().contains("test1"));
12-
proccessor.FinishPortions();
11+
proccessor.FinishPortion("test1", 0, 10);
1312
UNIT_ASSERT_EQUAL(proccessor.GetState().at("test1").Position, 10);
1413
TThread t([&proccessor](){
15-
proccessor.AddPortion("test1", 10, 10);
16-
proccessor.AddPortion("test1", 50, 10);
17-
proccessor.FinishPortions();
14+
proccessor.FinishPortion("test1", 10, 10);
15+
proccessor.FinishPortion("test1", 50, 10);
1816
});
1917
t.Start();
2018
t.Join();
2119
UNIT_ASSERT_EQUAL(proccessor.GetState().at("test1").Position, 20);
22-
proccessor.AddPortion("test1", 20, 30);
23-
proccessor.FinishPortions();
20+
proccessor.FinishPortion("test1", 20, 30);
2421
UNIT_ASSERT_EQUAL(proccessor.GetState().at("test1").Position, 60);
2522
};
2623

2724
Y_UNIT_TEST(SaveLoad) {
2825
TFsPath path("test_state.json");
2926
path.DeleteIfExists();
3027

31-
NYdbWorkload::TGeneratorStateProcessor proccessor1(path);
32-
proccessor1.AddPortion("test1", 0, 10);
33-
proccessor1.AddPortion("test2", 0, 15);
34-
proccessor1.AddPortion("test3", 0, 17);
35-
proccessor1.FinishPortions();
28+
NYdbWorkload::TGeneratorStateProcessor proccessor1(path, true);
29+
proccessor1.FinishPortion("test1", 0, 10);
30+
proccessor1.FinishPortion("test2", 0, 15);
31+
proccessor1.FinishPortion("test3", 0, 17);
3632
UNIT_ASSERT_EQUAL(proccessor1.GetState().at("test1").Position, 10);
3733
UNIT_ASSERT_EQUAL(proccessor1.GetState().at("test2").Position, 15);
3834
UNIT_ASSERT_EQUAL(proccessor1.GetState().at("test3").Position, 17);
3935

40-
NYdbWorkload::TGeneratorStateProcessor proccessor2(path);
36+
NYdbWorkload::TGeneratorStateProcessor proccessor2(path, false);
4137
UNIT_ASSERT_EQUAL(proccessor2.GetState().at("test1").Position, 10);
4238
UNIT_ASSERT_EQUAL(proccessor2.GetState().at("test2").Position, 15);
4339
UNIT_ASSERT_EQUAL(proccessor2.GetState().at("test3").Position, 17);

ydb/library/workload/benchmark_base/ut/ya.make

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
UNITTEST_FOR(kikimr/tests/acceptance/olap/cli/workload_base)
1+
UNITTEST_FOR(ydb/library/workload/benchmark_base)
22
SRCS(
33
state_ut.cpp
44
)

ydb/library/workload/benchmark_base/ya.make

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,7 @@ PEERDIR(
1616
GENERATE_ENUM_SERIALIZATION(workload.h)
1717

1818
END()
19+
20+
RECURSE_FOR_TESTS(
21+
ut
22+
)

ydb/library/workload/clickbench/data_generator.cpp

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,6 @@ TClickbenchWorkloadDataInitializerGenerator::TDataGenerartor::TDataGenerartor(co
4545
}
4646

4747
IBulkDataGenerator::TDataPortions TClickbenchWorkloadDataInitializerGenerator::TDataGenerartor::GenerateDataPortion() {
48-
if (Owner.StateProcessor) {
49-
Owner.StateProcessor->FinishPortions();
50-
}
5148
while (true) {
5249
size_t index;
5350
TFile::TPtr file;
@@ -139,13 +136,13 @@ class TClickbenchWorkloadDataInitializerGenerator::TDataGenerartor::TCsvFileBase
139136
data.reserve(lines.size() * 10000);
140137
data << Header << Endl;
141138
data << JoinSeq("\n", lines) << Endl;
142-
if (Owner.Owner.StateProcessor) {
143-
Owner.Owner.StateProcessor->AddPortion(Path, Readed, lines.size());
144-
}
139+
const ui64 position = Readed;
145140
Readed += lines.size();
146-
return MakeIntrusive<TDataPortion>(
141+
return MakeIntrusive<TDataPortionWithState>(
142+
Owner.Owner.StateProcessor.Get(),
147143
Owner.Owner.Params.GetFullTableName(nullptr),
148144
TDataPortion::TCsv(std::move(data), Foramt),
145+
position,
149146
lines.size()
150147
);
151148
}

ydb/library/workload/tpch/data_generator.cpp

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,9 @@ void TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::TContext::Append
8585
const auto path = Owner.GetFullTableName(tdefs[TableNum].name);
8686
if (Builder) {
8787
Builder->EndList();
88-
result.push_back(MakeIntrusive<TDataPortion>(path, Builder->Build(), Count));
88+
result.push_back(MakeIntrusive<TDataPortionWithState>(Owner.Owner.StateProcessor.Get(), path, Builder->Build(), Start, Count));
8989
} else if (Csv) {
90-
result.push_back(MakeIntrusive<TDataPortion>(path, TDataPortion::TCsv(std::move(Csv), TWorkloadGeneratorBase::TsvFormatString), Count));
90+
result.push_back(MakeIntrusive<TDataPortionWithState>(Owner.Owner.StateProcessor.Get(), path, TDataPortion::TCsv(std::move(Csv), TWorkloadGeneratorBase::TsvFormatString), Start, Count));
9191
}
9292
}
9393

@@ -103,9 +103,6 @@ TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::TBulkDataGenerator(co
103103
{}
104104

105105
TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::TDataPortions TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::GenerateDataPortion() {
106-
if (Owner.StateProcessor) {
107-
Owner.StateProcessor->FinishPortions();
108-
}
109106
TDataPortions result;
110107
if (TableSize == 0) {
111108
return result;
@@ -132,9 +129,6 @@ TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::TDataPortions TTpchWo
132129
}
133130
ctxs.front().SetCount(count);
134131
ctxs.front().SetStart((tdefs[TableNum].base * Owner.GetScale() / Owner.GetProcessCount()) * Owner.GetProcessIndex() + Generated + 1);
135-
if (Owner.StateProcessor) {
136-
Owner.StateProcessor->AddPortion(TString(GetName()), Generated, count);
137-
}
138132
Generated += count;
139133
}
140134
GenerateRows(ctxs);

ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -73,23 +73,23 @@ TAsyncStatus TWorkloadCommandImport::TUploadCommand::SendDataPortion(NYdbWorkloa
7373
auto convertResult = [](const NTable::TAsyncBulkUpsertResult& result) {
7474
return TStatus(result.GetValueSync());
7575
};
76-
if (auto* value = std::get_if<TValue>(&portion->Data)) {
77-
return TableClient->BulkUpsert(portion->Table, std::move(*value)).Apply(convertResult);
76+
if (auto* value = std::get_if<TValue>(&portion->MutableData())) {
77+
return TableClient->BulkUpsert(portion->GetTable(), std::move(*value)).Apply(convertResult);
7878
}
7979
NRetry::TRetryOperationSettings retrySettings;
8080
retrySettings.RetryUndefined(true);
81-
retrySettings.MaxRetries(10000);
82-
if (auto* value = std::get_if<NYdbWorkload::IBulkDataGenerator::TDataPortion::TCsv>(&portion->Data)) {
81+
retrySettings.MaxRetries(30);
82+
if (auto* value = std::get_if<NYdbWorkload::IBulkDataGenerator::TDataPortion::TCsv>(&portion->MutableData())) {
8383
return TableClient->RetryOperation([value, portion, convertResult](NTable::TTableClient& client) {
8484
NTable::TBulkUpsertSettings settings;
8585
settings.FormatSettings(value->FormatString);
86-
return client.BulkUpsert(portion->Table, NTable::EDataFormat::CSV, value->Data, TString(), settings)
86+
return client.BulkUpsert(portion->GetTable(), NTable::EDataFormat::CSV, value->Data, TString(), settings)
8787
.Apply(convertResult);
8888
}, retrySettings);
8989
}
90-
if (auto* value = std::get_if<NYdbWorkload::IBulkDataGenerator::TDataPortion::TArrow>(&portion->Data)) {
90+
if (auto* value = std::get_if<NYdbWorkload::IBulkDataGenerator::TDataPortion::TArrow>(&portion->MutableData())) {
9191
return TableClient->RetryOperation([value, portion, convertResult](NTable::TTableClient& client) {
92-
return client.BulkUpsert(portion->Table, NTable::EDataFormat::ApacheArrow, value->Data, value->Schema)
92+
return client.BulkUpsert(portion->GetTable(), NTable::EDataFormat::ApacheArrow, value->Data, value->Schema)
9393
.Apply(convertResult);
9494
}, retrySettings);
9595
}
@@ -106,12 +106,14 @@ void TWorkloadCommandImport::TUploadCommand::ProcessDataGenerator(std::shared_pt
106106
return SendDataPortion(data).Apply(
107107
[ar, data, this](const TAsyncStatus& result) {
108108
const auto& res = result.GetValueSync();
109+
data->SetSendResult(res);
109110
auto guard = Guard(Lock);
110111
if (!res.IsSuccess()) {
111-
Cerr << "Bulk upset to " << data->Table << " failed, " << res.GetStatus() << ", " << res.GetIssues().ToString() << Endl;
112+
Cerr << "Bulk upset to " << data->GetTable() << " failed, " << res.GetStatus() << ", " << res.GetIssues().ToString() << Endl;
112113
AtomicIncrement(ErrorsCount);
114+
} else {
115+
Bar->AddProgress(data->GetSize());
113116
}
114-
Bar->AddProgress(data->Size);
115117
});
116118
}
117119
)
@@ -120,6 +122,9 @@ void TWorkloadCommandImport::TUploadCommand::ProcessDataGenerator(std::shared_pt
120122
sendings.pop_front();
121123
}
122124
}
125+
if (AtomicGet(ErrorsCount)) {
126+
break;
127+
}
123128
}
124129
NThreading::WaitAll(sendings).GetValueSync();
125130
} catch (...) {

0 commit comments

Comments
 (0)