Skip to content

Commit ca68997

Browse files
authored
Fix generator state work (#6219)
1 parent 6f872bd commit ca68997

File tree

10 files changed

+113
-99
lines changed

10 files changed

+113
-99
lines changed

ydb/library/workload/abstract/workload_query_generator.h

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ class IBulkDataGenerator {
5252
, Size(size)
5353
{}
5454

55-
struct TDataPortion: public TAtomicRefCount<TDataPortion>, TMoveOnly {
55+
class TDataPortion: public TAtomicRefCount<TDataPortion>, TMoveOnly {
56+
public:
5657
struct TCsv {
5758
TCsv(TString&& data, const TString& formatString = TString())
5859
: Data(std::move(data))
@@ -71,16 +72,26 @@ class IBulkDataGenerator {
7172
TString Schema;
7273
};
7374

75+
using TDataType = std::variant<NYdb::TValue, TCsv, TArrow>;
76+
7477
template<class T>
7578
TDataPortion(const TString& table, T&& data, ui64 size)
7679
: Table(table)
77-
, Data(std::move(data))
7880
, Size(size)
81+
, Data(std::move(data))
7982
{}
8083

81-
TString Table;
82-
std::variant<NYdb::TValue, TCsv, TArrow> Data;
83-
ui64 Size;
84+
virtual ~TDataPortion() = default;
85+
virtual void SetSendResult(const NYdb::TStatus& status) {
86+
Y_UNUSED(status);
87+
}
88+
TDataType& MutableData() {
89+
return Data;
90+
}
91+
YDB_READONLY_DEF(TString, Table);
92+
YDB_READONLY(ui64, Size, 0);
93+
private:
94+
TDataType Data;
8495
};
8596

8697
using TDataPortionPtr = TIntrusivePtr<TDataPortion>;

ydb/library/workload/benchmark_base/state.cpp

Lines changed: 12 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
#include <library/cpp/json/json_writer.h>
44
#include <library/cpp/json/json_reader.h>
55
#include <util/stream/file.h>
6-
#include <thread>
76

87
namespace NYdbWorkload {
98

@@ -25,48 +24,28 @@ TGeneratorStateProcessor::TGeneratorStateProcessor(const TFsPath& path, bool cle
2524
}
2625
}
2726

28-
void TGeneratorStateProcessor::AddPortion(const TString& source, ui64 from, ui64 size) {
29-
InProcess.Get().emplace_back(TInProcessPortion{source, from, size});
30-
}
31-
32-
void TGeneratorStateProcessor::FinishPortions() {
33-
bool needSave = false;
27+
void TGeneratorStateProcessor::FinishPortion(const TString& source, ui64 from, ui64 size) {
3428
auto g = Guard(Lock);
35-
for (const auto& p: InProcess.Get()) {
36-
needSave |= StateImpl[p.Source].FinishPortion(p.From, p.Size, State[p.Source].Position);
37-
}
38-
InProcess.Get().clear();
29+
bool needSave = StateImpl[source].FinishPortion(from, size, State[source].Position);
3930
if (needSave) {
4031
Save();
4132
}
4233
}
4334

4435
bool TGeneratorStateProcessor::TSourceStateImpl::FinishPortion(ui64 from, ui64 size, ui64& position) {
45-
const ui64 threadId = std::hash<std::thread::id>()(std::this_thread::get_id());
46-
ThreadsState[threadId].FinishedPortions.emplace_back(TSourcePortion{from, size});
47-
TVector<TThreadSourceState::TFinishedPortions*> portions;
48-
for (auto& [t, ss]: ThreadsState) {
49-
while (!ss.FinishedPortions.empty() && ss.FinishedPortions.front().first < position) {
50-
ss.FinishedPortions.pop_front();
51-
}
52-
if (!ss.FinishedPortions.empty()) {
53-
portions.push_back(&ss.FinishedPortions);
54-
}
36+
auto i = FinishedPortions.begin();
37+
while (i != FinishedPortions.end() && i->first < from) {
38+
++i;
5539
}
56-
Y_VERIFY(!portions.empty());
57-
auto portionsCmp = [](auto l, auto r) {return l->front().first > r->front().first;};
58-
std::make_heap(portions.begin(), portions.end(), portionsCmp);
40+
FinishedPortions.emplace(i, TSourcePortion{from, size});
5941
bool result = false;
60-
while (!portions.empty() && portions.front()->front().first == position) {
42+
while (!FinishedPortions.empty() && FinishedPortions.front().first < position) {
43+
FinishedPortions.pop_front();
44+
}
45+
while (!FinishedPortions.empty() && FinishedPortions.front().first == position) {
6146
result = true;
62-
position += portions.front()->front().second;
63-
std::pop_heap(portions.begin(), portions.end(), portionsCmp);
64-
portions.back()->pop_front();
65-
if (portions.back()->empty()) {
66-
portions.pop_back();
67-
} else {
68-
std::push_heap(portions.begin(), portions.end(), portionsCmp);
69-
}
47+
position += FinishedPortions.front().second;
48+
FinishedPortions.pop_front();
7049
}
7150
return result;
7251
}
Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#pragma once
22

3+
#include <ydb/library/workload/abstract/workload_query_generator.h>
34
#include <ydb/library/accessor/accessor.h>
45
#include <util/generic/deque.h>
56
#include <util/generic/map.h>
@@ -18,29 +19,17 @@ class TGeneratorStateProcessor {
1819

1920
public:
2021
TGeneratorStateProcessor(const TFsPath& path, bool clear);
21-
void AddPortion(const TString& source, ui64 from, ui64 size);
22-
void FinishPortions();
22+
void FinishPortion(const TString& source, ui64 from, ui64 size);
2323
YDB_READONLY_DEF(TState, State);
2424

2525
private:
2626
using TSourcePortion = std::pair<ui64, ui64>;
27-
struct TThreadSourceState {
28-
using TFinishedPortions = TDeque<TSourcePortion>;
29-
TFinishedPortions FinishedPortions;
30-
};
31-
3227
class TSourceStateImpl {
3328
public:
3429
bool FinishPortion(ui64 from, ui64 size, ui64& position);
3530

3631
private:
37-
TMap<ui64, TThreadSourceState> ThreadsState;
38-
};
39-
40-
struct TInProcessPortion {
41-
TString Source;
42-
ui64 From;
43-
ui64 Size;
32+
TList<TSourcePortion> FinishedPortions;
4433
};
4534

4635
private:
@@ -52,7 +41,27 @@ class TGeneratorStateProcessor {
5241
TFsPath TmpPath;
5342
TMap<TString, TSourceStateImpl> StateImpl;
5443
TAdaptiveLock Lock;
55-
Y_THREAD(TVector<TInProcessPortion>) InProcess;
44+
};
45+
46+
class TDataPortionWithState: public IBulkDataGenerator::TDataPortion {
47+
public:
48+
template<class T>
49+
TDataPortionWithState(TGeneratorStateProcessor* stateProcessor, const TString& table, const TString& stateSource, T&& data, ui64 position, ui64 size)
50+
: TDataPortion(table, std::move(data), size)
51+
, StateSource(stateSource)
52+
, Position(position)
53+
, StateProcessor(stateProcessor)
54+
{}
55+
56+
virtual void SetSendResult(const NYdb::TStatus& status) {
57+
if (StateProcessor && status.IsSuccess()) {
58+
StateProcessor->FinishPortion(StateSource, Position, GetSize());
59+
}
60+
}
61+
private:
62+
TString StateSource;
63+
ui64 Position;
64+
TGeneratorStateProcessor* StateProcessor;
5665
};
5766

5867
}

ydb/library/workload/benchmark_base/state_ut.cpp

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,38 +6,34 @@
66
Y_UNIT_TEST_SUITE(DataGeneratorState) {
77
Y_UNIT_TEST(PortionProcessing) {
88
TFsPath path;
9-
NYdbWorkload::TGeneratorStateProcessor proccessor(path);
10-
proccessor.AddPortion("test1", 0, 10);
9+
NYdbWorkload::TGeneratorStateProcessor proccessor(path, true);
1110
UNIT_ASSERT(!proccessor.GetState().contains("test1"));
12-
proccessor.FinishPortions();
11+
proccessor.FinishPortion("test1", 0, 10);
1312
UNIT_ASSERT_EQUAL(proccessor.GetState().at("test1").Position, 10);
1413
TThread t([&proccessor](){
15-
proccessor.AddPortion("test1", 10, 10);
16-
proccessor.AddPortion("test1", 50, 10);
17-
proccessor.FinishPortions();
14+
proccessor.FinishPortion("test1", 10, 10);
15+
proccessor.FinishPortion("test1", 50, 10);
1816
});
1917
t.Start();
2018
t.Join();
2119
UNIT_ASSERT_EQUAL(proccessor.GetState().at("test1").Position, 20);
22-
proccessor.AddPortion("test1", 20, 30);
23-
proccessor.FinishPortions();
20+
proccessor.FinishPortion("test1", 20, 30);
2421
UNIT_ASSERT_EQUAL(proccessor.GetState().at("test1").Position, 60);
2522
};
2623

2724
Y_UNIT_TEST(SaveLoad) {
2825
TFsPath path("test_state.json");
2926
path.DeleteIfExists();
3027

31-
NYdbWorkload::TGeneratorStateProcessor proccessor1(path);
32-
proccessor1.AddPortion("test1", 0, 10);
33-
proccessor1.AddPortion("test2", 0, 15);
34-
proccessor1.AddPortion("test3", 0, 17);
35-
proccessor1.FinishPortions();
28+
NYdbWorkload::TGeneratorStateProcessor proccessor1(path, true);
29+
proccessor1.FinishPortion("test1", 0, 10);
30+
proccessor1.FinishPortion("test2", 0, 15);
31+
proccessor1.FinishPortion("test3", 0, 17);
3632
UNIT_ASSERT_EQUAL(proccessor1.GetState().at("test1").Position, 10);
3733
UNIT_ASSERT_EQUAL(proccessor1.GetState().at("test2").Position, 15);
3834
UNIT_ASSERT_EQUAL(proccessor1.GetState().at("test3").Position, 17);
3935

40-
NYdbWorkload::TGeneratorStateProcessor proccessor2(path);
36+
NYdbWorkload::TGeneratorStateProcessor proccessor2(path, false);
4137
UNIT_ASSERT_EQUAL(proccessor2.GetState().at("test1").Position, 10);
4238
UNIT_ASSERT_EQUAL(proccessor2.GetState().at("test2").Position, 15);
4339
UNIT_ASSERT_EQUAL(proccessor2.GetState().at("test3").Position, 17);

ydb/library/workload/benchmark_base/ut/ya.make

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
UNITTEST_FOR(kikimr/tests/acceptance/olap/cli/workload_base)
1+
UNITTEST_FOR(ydb/library/workload/benchmark_base)
22
SRCS(
33
state_ut.cpp
44
)

ydb/library/workload/benchmark_base/ya.make

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,7 @@ PEERDIR(
1616
GENERATE_ENUM_SERIALIZATION(workload.h)
1717

1818
END()
19+
20+
RECURSE_FOR_TESTS(
21+
ut
22+
)

ydb/library/workload/clickbench/data_generator.cpp

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,6 @@ TClickbenchWorkloadDataInitializerGenerator::TDataGenerartor::TDataGenerartor(co
4545
}
4646

4747
IBulkDataGenerator::TDataPortions TClickbenchWorkloadDataInitializerGenerator::TDataGenerartor::GenerateDataPortion() {
48-
if (Owner.StateProcessor) {
49-
Owner.StateProcessor->FinishPortions();
50-
}
5148
while (true) {
5249
size_t index;
5350
TFile::TPtr file;
@@ -139,13 +136,14 @@ class TClickbenchWorkloadDataInitializerGenerator::TDataGenerartor::TCsvFileBase
139136
data.reserve(lines.size() * 10000);
140137
data << Header << Endl;
141138
data << JoinSeq("\n", lines) << Endl;
142-
if (Owner.Owner.StateProcessor) {
143-
Owner.Owner.StateProcessor->AddPortion(Path, Readed, lines.size());
144-
}
139+
const ui64 position = Readed;
145140
Readed += lines.size();
146-
return MakeIntrusive<TDataPortion>(
141+
return MakeIntrusive<TDataPortionWithState>(
142+
Owner.Owner.StateProcessor.Get(),
147143
Owner.Owner.Params.GetFullTableName(nullptr),
144+
Path,
148145
TDataPortion::TCsv(std::move(data), Foramt),
146+
position,
149147
lines.size()
150148
);
151149
}

ydb/library/workload/tpch/data_generator.cpp

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,10 @@ ui64 TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::CalcCountToGener
6464
return rowCount / owner.GetProcessCount() + extraRows - position;
6565
}
6666

67-
TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::TContext::TContext(const TBulkDataGenerator& owner, int tableNum)
67+
TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::TContext::TContext(const TBulkDataGenerator& owner, int tableNum, TGeneratorStateProcessor* state)
6868
: Owner(owner)
6969
, TableNum(tableNum)
70+
, State(state)
7071
{}
7172

7273
NYdb::TValueBuilder& TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::TContext::GetBuilder() {
@@ -85,9 +86,23 @@ void TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::TContext::Append
8586
const auto path = Owner.GetFullTableName(tdefs[TableNum].name);
8687
if (Builder) {
8788
Builder->EndList();
88-
result.push_back(MakeIntrusive<TDataPortion>(path, Builder->Build(), Count));
89+
result.push_back(MakeIntrusive<TDataPortionWithState>(
90+
State,
91+
path,
92+
tdefs[TableNum].name,
93+
Builder->Build(),
94+
Start - 1,
95+
Count
96+
));
8997
} else if (Csv) {
90-
result.push_back(MakeIntrusive<TDataPortion>(path, TDataPortion::TCsv(std::move(Csv), TWorkloadGeneratorBase::TsvFormatString), Count));
98+
result.push_back(MakeIntrusive<TDataPortionWithState>(
99+
State,
100+
path,
101+
tdefs[TableNum].name,
102+
TDataPortion::TCsv(std::move(Csv), TWorkloadGeneratorBase::TsvFormatString),
103+
Start - 1,
104+
Count
105+
));
91106
}
92107
}
93108

@@ -103,27 +118,26 @@ TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::TBulkDataGenerator(co
103118
{}
104119

105120
TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::TDataPortions TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::GenerateDataPortion() {
106-
if (Owner.StateProcessor) {
107-
Owner.StateProcessor->FinishPortions();
108-
}
109121
TDataPortions result;
110122
if (TableSize == 0) {
111123
return result;
112124
}
113125
TContexts ctxs;
114-
ctxs.emplace_back(*this, TableNum);
126+
ctxs.emplace_back(*this, TableNum, Owner.StateProcessor.Get());
115127
if (tdefs[TableNum].child != NONE) {
116-
ctxs.emplace_back(*this, tdefs[TableNum].child);
128+
ctxs.emplace_back(*this, tdefs[TableNum].child, nullptr);
117129
}
118130
with_lock(NumbersLock) {
119131
if (!Generated) {
120132
if (Owner.GetProcessCount() > 1) {
121133
DSS_HUGE e;
122134
set_state(TableNum, Owner.GetScale(), Owner.GetProcessCount(), Owner.GetProcessIndex() + 1, &e);
123135
}
124-
if (!!Owner.StateProcessor && Owner.StateProcessor->GetState().contains(GetName())) {
125-
Generated = Owner.StateProcessor->GetState().at(TString(GetName())).Position;
126-
GenSeed(TableNum, Generated);
136+
if (!!Owner.StateProcessor) {
137+
if (const auto* state = MapFindPtr(Owner.StateProcessor->GetState(), GetName())) {
138+
Generated = state->Position;
139+
GenSeed(TableNum, Generated);
140+
}
127141
}
128142
}
129143
const auto count = TableSize > Generated ? std::min(ui64(TableSize - Generated), Owner.Params.BulkSize) : 0;
@@ -132,9 +146,6 @@ TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::TDataPortions TTpchWo
132146
}
133147
ctxs.front().SetCount(count);
134148
ctxs.front().SetStart((tdefs[TableNum].base * Owner.GetScale() / Owner.GetProcessCount()) * Owner.GetProcessIndex() + Generated + 1);
135-
if (Owner.StateProcessor) {
136-
Owner.StateProcessor->AddPortion(TString(GetName()), Generated, count);
137-
}
138149
Generated += count;
139150
}
140151
GenerateRows(ctxs);

ydb/library/workload/tpch/data_generator.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ class TTpchWorkloadDataInitializerGenerator: public TWorkloadDataInitializerBase
3131
protected:
3232
class TContext {
3333
public:
34-
TContext(const TBulkDataGenerator& owner, int tableNum);
34+
TContext(const TBulkDataGenerator& owner, int tableNum, TGeneratorStateProcessor* state);
3535
NYdb::TValueBuilder& GetBuilder();
3636
TStringBuilder& GetCsv();
3737
void AppendPortions(TDataPortions& result);
@@ -43,6 +43,7 @@ class TTpchWorkloadDataInitializerGenerator: public TWorkloadDataInitializerBase
4343
TStringBuilder Csv;
4444
const TBulkDataGenerator& Owner;
4545
int TableNum;
46+
TGeneratorStateProcessor* State;
4647
};
4748

4849
using TContexts = TVector<TContext>;

0 commit comments

Comments
 (0)