Skip to content

Commit eca5b6e

Browse files
authored
YQ-3722 RD added messages accumulating (#10380)
1 parent 11f9117 commit eca5b6e

File tree

9 files changed

+248
-158
lines changed

9 files changed

+248
-158
lines changed

ydb/core/fq/libs/config/protos/row_dispatcher.proto

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,18 @@ message TRowDispatcherCoordinatorConfig {
1313
string CoordinationNodePath = 2;
1414
bool LocalMode = 3; // Use only local row_dispatcher.
1515
}
16+
17+
// Batching settings for the row dispatcher's JSON parser.
// NOTE(review): how these values reach the parser is not visible here — confirm at the place the config is consumed.
message TJsonParserConfig {
18+
uint64 BatchSizeBytes = 1; // Batch size in bytes (per the field name); presumably the accumulated-size threshold — confirm.
19+
uint64 BatchCreationTimeoutMs = 2; // Batch creation timeout in milliseconds (per the field name).
20+
}
21+
1622
// Top-level row dispatcher configuration.
message TRowDispatcherConfig {
1723
bool Enabled = 1;
1824
uint64 TimeoutBeforeStartSessionSec = 2;
1925
uint64 SendStatusPeriodSec = 3;
2026
uint64 MaxSessionUsedMemory = 4;
2127
bool WithoutConsumer = 5;
28+
TJsonParserConfig JsonParser = 7; // JSON parser batching settings. Declared before field 6; only the field number matters on the wire.
2229
TRowDispatcherCoordinatorConfig Coordinator = 6;
2330
}

ydb/core/fq/libs/row_dispatcher/json_filter.cpp

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ class TFilterInputSpec : public NYql::NPureCalc::TInputSpecBase {
5454
TVector<NYT::TNode> Schemas;
5555
};
5656

57-
class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<ui64, const TVector<TVector<std::string_view>>&>> {
57+
class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<TVector<std::string_view>>&>> {
5858
public:
5959
TFilterInputConsumer(
6060
const TFilterInputSpec& spec,
@@ -92,7 +92,7 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<ui64, c
9292
}
9393
}
9494

95-
void OnObject(std::pair<ui64, const TVector<TVector<std::string_view>>&> values) override {
95+
void OnObject(std::pair<const TVector<ui64>&, const TVector<TVector<std::string_view>>&> values) override {
9696
Y_ENSURE(FieldsPositions.size() == values.second.size());
9797

9898
NKikimr::NMiniKQL::TThrowingBindTerminator bind;
@@ -108,7 +108,7 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<ui64, c
108108
static_cast<ui32>(values.second.size() + 1),
109109
items);
110110

111-
items[OffsetPosition] = NYql::NUdf::TUnboxedValuePod(values.first++);
111+
items[OffsetPosition] = NYql::NUdf::TUnboxedValuePod(values.first[rowId]);
112112

113113
size_t fieldId = 0;
114114
for (const auto& column : values.second) {
@@ -200,7 +200,7 @@ struct NYql::NPureCalc::TInputSpecTraits<TFilterInputSpec> {
200200
static constexpr bool IsPartial = false;
201201
static constexpr bool SupportPushStreamMode = true;
202202

203-
using TConsumerType = THolder<NYql::NPureCalc::IConsumer<std::pair<ui64, const TVector<TVector<std::string_view>>&>>>;
203+
using TConsumerType = THolder<NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<TVector<std::string_view>>&>>>;
204204

205205
static TConsumerType MakeConsumer(
206206
const TFilterInputSpec& spec,
@@ -242,9 +242,9 @@ class TJsonFilter::TImpl {
242242
LOG_ROW_DISPATCHER_DEBUG("Program created");
243243
}
244244

245-
void Push(ui64 offset, const TVector<TVector<std::string_view>>& values) {
245+
void Push(const TVector<ui64>& offsets, const TVector<TVector<std::string_view>>& values) {
246246
Y_ENSURE(values, "Expected non empty schema");
247-
InputConsumer->OnObject(std::make_pair(offset, values));
247+
InputConsumer->OnObject(std::make_pair(offsets, values));
248248
}
249249

250250
TString GetSql() const {
@@ -277,7 +277,7 @@ class TJsonFilter::TImpl {
277277

278278
private:
279279
THolder<NYql::NPureCalc::TPushStreamProgram<TFilterInputSpec, TFilterOutputSpec>> Program;
280-
THolder<NYql::NPureCalc::IConsumer<std::pair<ui64, const TVector<TVector<std::string_view>>&>>> InputConsumer;
280+
THolder<NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<TVector<std::string_view>>&>>> InputConsumer;
281281
const TString Sql;
282282
};
283283

@@ -292,8 +292,8 @@ TJsonFilter::TJsonFilter(
292292
TJsonFilter::~TJsonFilter() {
293293
}
294294

295-
void TJsonFilter::Push(ui64 offset, const TVector<TVector<std::string_view>>& values) {
296-
Impl->Push(offset, values);
295+
// Public entry point: forwards the batch (row offsets plus column values) to the implementation object.
void TJsonFilter::Push(const TVector<ui64>& offsets, const TVector<TVector<std::string_view>>& values) {
296+
Impl->Push(offsets, values);
297297
}
298298

299299
TString TJsonFilter::GetSql() {

ydb/core/fq/libs/row_dispatcher/json_filter.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ class TJsonFilter {
1818

1919
~TJsonFilter();
2020

21-
void Push(ui64 offset, const TVector<TVector<std::string_view>>& values);
21+
void Push(const TVector<ui64>& offsets, const TVector<TVector<std::string_view>>& values);
2222
TString GetSql();
2323

2424
private:

ydb/core/fq/libs/row_dispatcher/json_parser.cpp

Lines changed: 126 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -10,49 +10,72 @@ namespace {
1010

1111
TString LogPrefix = "JsonParser: ";
1212

13-
} // anonymous namespace
13+
struct TJsonParserBuffer {
14+
size_t NumberValues = 0;
15+
bool Finished = false;
16+
TInstant CreationStartTime = TInstant::Now();
17+
TVector<ui64> Offsets = {};
18+
19+
// True when the buffer holds at least one value and Finish() has not been called on it yet.
bool IsReady() const {
20+
return !Finished && NumberValues > 0;
21+
}
1422

15-
namespace NFq {
23+
// Current length of the accumulated Values string.
size_t GetSize() const {
24+
return Values.size();
25+
}
1626

17-
//// TParserBuffer
27+
// Pre-allocates storage for `size` bytes of message data and `numberValues` offsets.
void Reserve(size_t size, size_t numberValues) {
28+
// Twice the payload plus simdjson padding. NOTE(review): the extra room is presumably for
// AddHolder(), which requires its data to fit into the existing capacity — confirm.
Values.reserve(2 * (size + simdjson::SIMDJSON_PADDING));
29+
Offsets.reserve(numberValues);
30+
}
1831

19-
TJsonParserBuffer::TJsonParserBuffer()
20-
: NumberValues(0)
21-
, Finished(false)
22-
{}
32+
// Appends the payload and offset of every message to the buffer.
// Fails (Y_ENSURE) if the buffer has already been finished.
void AddMessages(const TVector<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage>& messages) {
33+
Y_ENSURE(!Finished, "Cannot add messages into finished buffer");
2334

24-
void TJsonParserBuffer::Reserve(size_t size) {
25-
Y_ENSURE(!Finished, "Cannot reserve finished buffer");
26-
Values.reserve(2 * (size + simdjson::SIMDJSON_PADDING));
27-
}
35+
// First pass: total payload size, so storage can be reserved once before appending.
size_t messagesSize = 0;
36+
for (const auto& message : messages) {
37+
messagesSize += message.GetData().size();
38+
}
2839

29-
void TJsonParserBuffer::AddValue(const TString& value) {
30-
Y_ENSURE(!Finished, "Cannot add value into finished buffer");
31-
NumberValues++;
32-
Values << value;
33-
}
40+
NumberValues += messages.size();
41+
Reserve(Values.size() + messagesSize, NumberValues);
42+
// Second pass: payloads are concatenated into Values; Offsets keeps one entry per message, in the same order.
for (const auto& message : messages) {
43+
Values << message.GetData();
44+
Offsets.emplace_back(message.GetOffset());
45+
}
46+
}
3447

35-
std::string_view TJsonParserBuffer::AddHolder(std::string_view value) {
36-
Y_ENSURE(Values.size() + value.size() <= Values.capacity(), "Requested too large holders");
37-
const size_t startPos = Values.size();
38-
Values << value;
39-
return std::string_view(Values).substr(startPos, value.length());
40-
}
48+
// Copies `value` to the end of Values and returns a view of that copy.
// Fails (Y_ENSURE) if the copy would not fit into the already reserved capacity.
// NOTE(review): the capacity check presumably guarantees the append cannot reallocate and
// invalidate views handed out earlier — confirm.
std::string_view AddHolder(std::string_view value) {
49+
Y_ENSURE(Values.size() + value.size() <= Values.capacity(), "Requested too large holders");
50+
const size_t startPos = Values.size();
51+
Values << value;
52+
return std::string_view(Values).substr(startPos, value.length());
53+
}
4154

42-
std::pair<const char*, size_t> TJsonParserBuffer::Finish() {
43-
Y_ENSURE(!Finished, "Cannot finish buffer twice");
44-
Finished = true;
45-
Values << TString(simdjson::SIMDJSON_PADDING, ' ');
46-
Values.reserve(2 * Values.size());
47-
return {Values.data(), Values.size()};
48-
}
55+
// Seals the buffer and returns a pointer to the accumulated data together with its size.
// May be called only once per fill cycle (Y_ENSURE); Clear() re-opens the buffer.
std::pair<const char*, size_t> Finish() {
56+
Y_ENSURE(!Finished, "Cannot finish buffer twice");
57+
Finished = true;
58+
// Trailing spaces of simdjson padding length; the returned size includes them.
Values << TString(simdjson::SIMDJSON_PADDING, ' ');
59+
// Double the capacity before handing out the pointer; AddHolder() later appends within this capacity.
Values.reserve(2 * Values.size());
60+
return {Values.data(), Values.size()};
61+
}
4962

50-
void TJsonParserBuffer::Clear() {
51-
Y_ENSURE(Finished, "Cannot clear not finished buffer");
52-
NumberValues = 0;
53-
Finished = false;
54-
Values.clear();
55-
}
63+
// Resets a finished buffer so that a new batch can be accumulated.
// Fails (Y_ENSURE) if the buffer has not been finished.
void Clear() {
64+
Y_ENSURE(Finished, "Cannot clear not finished buffer");
65+
NumberValues = 0;
66+
Finished = false;
67+
// The batch creation timer restarts from the moment of clearing.
CreationStartTime = TInstant::Now();
68+
Values.clear();
69+
Offsets.clear();
70+
}
71+
72+
private:
73+
TStringBuilder Values = {};
74+
};
75+
76+
} // anonymous namespace
77+
78+
namespace NFq {
5679

5780
//// TJsonParser
5881

@@ -63,10 +86,13 @@ class TJsonParser::TImpl {
6386
};
6487

6588
public:
66-
TImpl(const TVector<TString>& columns, const TVector<TString>& types)
67-
: ParsedValues(columns.size())
89+
TImpl(const TVector<TString>& columns, const TVector<TString>& types, ui64 batchSize, TDuration batchCreationTimeout)
90+
: BatchSize(batchSize)
91+
, BatchCreationTimeout(batchCreationTimeout)
92+
, ParsedValues(columns.size())
6893
{
6994
// Every column needs exactly one type. Fixed a typo in the message ("should by" -> "should be").
Y_ENSURE(columns.size() == types.size(), "Number of columns and types should be equal");
95+
LOG_ROW_DISPATCHER_INFO("Simdjson active implementation " << simdjson::get_active_implementation()->name());
7096

7197
Columns.reserve(columns.size());
7298
for (size_t i = 0; i < columns.size(); i++) {
@@ -80,22 +106,51 @@ class TJsonParser::TImpl {
80106
for (size_t i = 0; i < columns.size(); i++) {
81107
ColumnsIndex.emplace(std::string_view(Columns[i].Name), i);
82108
}
109+
110+
Buffer.Reserve(BatchSize, 1);
111+
Parser.threaded = false;
112+
}
113+
114+
// A batch is ready to parse when the buffer holds unfinished values and either its size has
// reached BatchSize or BatchCreationTimeout has elapsed since Buffer.CreationStartTime.
bool IsReady() const {
115+
return Buffer.IsReady() && (Buffer.GetSize() >= BatchSize || TInstant::Now() - Buffer.CreationStartTime >= BatchCreationTimeout);
116+
}
117+
118+
// Time at which the current batch reaches its creation timeout, or TInstant::Zero() when the
// buffer holds nothing to parse.
TInstant GetCreationDeadline() const {
119+
return Buffer.IsReady() ? Buffer.CreationStartTime + BatchCreationTimeout : TInstant::Zero();
120+
}
121+
122+
// Number of values accumulated and not yet parsed; 0 when the buffer is empty or already finished.
size_t GetNumberValues() const {
123+
return Buffer.IsReady() ? Buffer.NumberValues : 0;
124+
}
125+
126+
// Offsets of the messages currently held in the buffer, in the order they were added.
// Unlike GetNumberValues(), this does not check the buffer state.
// Marked const: it only reads state, matching the sibling getters and the const public wrapper.
const TVector<ui64>& GetOffsets() const {
127+
return Buffer.Offsets;
128+
}
129+
130+
// Adds messages to the current batch. An empty list is ignored.
void AddMessages(const TVector<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage>& messages) {
131+
if (messages.empty()) {
132+
return;
133+
}
134+
135+
// A finished buffer (Parse() finishes it) is recycled lazily, on the first add that follows.
if (Buffer.Finished) {
136+
Buffer.Clear();
137+
}
138+
Buffer.AddMessages(messages);
83139
}
84140

85141
const TVector<TVector<std::string_view>>& Parse() {
142+
Y_ENSURE(Buffer.IsReady(), "Nothing to parse");
143+
86144
const auto [values, size] = Buffer.Finish();
87145
LOG_ROW_DISPATCHER_TRACE("Parse values:\n" << values);
88146

89147
for (auto& parsedColumn : ParsedValues) {
90148
parsedColumn.clear();
91-
parsedColumn.reserve(Buffer.GetNumberValues());
149+
parsedColumn.reserve(Buffer.NumberValues);
92150
}
93151

94-
simdjson::ondemand::parser parser;
95-
parser.threaded = false;
96-
97152
size_t rowId = 0;
98-
simdjson::ondemand::document_stream documents = parser.iterate_many(values, size, simdjson::dom::DEFAULT_BATCH_SIZE);
153+
simdjson::ondemand::document_stream documents = Parser.iterate_many(values, size, simdjson::dom::DEFAULT_BATCH_SIZE);
99154
for (auto document : documents) {
100155
for (auto item : document.get_object()) {
101156
const auto it = ColumnsIndex.find(item.escaped_key().value());
@@ -126,27 +181,20 @@ class TJsonParser::TImpl {
126181
}
127182
rowId++;
128183
}
129-
Y_ENSURE(rowId == Buffer.GetNumberValues(), "Unexpected number of json documents");
184+
Y_ENSURE(rowId == Buffer.NumberValues, "Unexpected number of json documents");
130185

131186
for (auto& parsedColumn : ParsedValues) {
132-
parsedColumn.resize(Buffer.GetNumberValues());
187+
parsedColumn.resize(Buffer.NumberValues);
133188
}
134189
return ParsedValues;
135190
}
136191

137-
TJsonParserBuffer& GetBuffer() {
138-
if (Buffer.GetFinished()) {
139-
Buffer.Clear();
140-
}
141-
return Buffer;
142-
}
143-
144192
// Builds a human-readable summary: every column as 'name':type, followed by the buffer state.
TString GetDescription() const {
145193
TStringBuilder description = TStringBuilder() << "Columns: ";
146194
for (const auto& column : Columns) {
147195
description << "'" << column.Name << "':" << column.Type << " ";
148196
}
149-
description << "\nBuffer size: " << Buffer.GetNumberValues() << ", finished: " << Buffer.GetFinished();
197+
// Value count, accumulated buffer length and the finished flag.
description << "\nNumber values in buffer: " << Buffer.NumberValues << ", buffer size: " << Buffer.GetSize() << ", finished: " << Buffer.Finished;
150198
return description;
151199
}
152200

@@ -182,22 +230,42 @@ class TJsonParser::TImpl {
182230
}
183231

184232
private:
233+
const ui64 BatchSize;
234+
const TDuration BatchCreationTimeout;
185235
TVector<TColumnDescription> Columns;
186236
absl::flat_hash_map<std::string_view, size_t> ColumnsIndex;
187237

188238
TJsonParserBuffer Buffer;
239+
simdjson::ondemand::parser Parser;
240+
189241
TVector<TVector<std::string_view>> ParsedValues;
190242
};
191243

192-
TJsonParser::TJsonParser(const TVector<TString>& columns, const TVector<TString>& types)
193-
: Impl(std::make_unique<TJsonParser::TImpl>(columns, types))
244+
// Creates the implementation object, passing through the column names, their types, the batch
// size threshold and the batch creation timeout.
TJsonParser::TJsonParser(const TVector<TString>& columns, const TVector<TString>& types, ui64 batchSize, TDuration batchCreationTimeout)
245+
: Impl(std::make_unique<TJsonParser::TImpl>(columns, types, batchSize, batchCreationTimeout))
194246
{}
195247

196248
TJsonParser::~TJsonParser() {
197249
}
198250

199-
TJsonParserBuffer& TJsonParser::GetBuffer() {
200-
return Impl->GetBuffer();
251+
// Forwards the messages to the implementation, which appends them to the current batch.
void TJsonParser::AddMessages(const TVector<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage>& messages) {
252+
Impl->AddMessages(messages);
253+
}
254+
255+
// Forwards to the implementation: reports whether the accumulated batch is ready to be parsed.
bool TJsonParser::IsReady() const {
256+
return Impl->IsReady();
257+
}
258+
259+
// Forwards to the implementation: returns the creation deadline of the current batch.
TInstant TJsonParser::GetCreationDeadline() const {
260+
return Impl->GetCreationDeadline();
261+
}
262+
263+
// Forwards to the implementation: returns the number of values waiting to be parsed.
size_t TJsonParser::GetNumberValues() const {
264+
return Impl->GetNumberValues();
265+
}
266+
267+
// Forwards to the implementation: returns the offsets of the buffered messages.
// The reference points into the implementation's buffer.
const TVector<ui64>& TJsonParser::GetOffsets() const {
268+
return Impl->GetOffsets();
201269
}
202270

203271
const TVector<TVector<std::string_view>>& TJsonParser::Parse() {
@@ -212,8 +280,8 @@ TString TJsonParser::GetDebugString(const TVector<TVector<std::string_view>>& pa
212280
return Impl->GetDebugString(parsedValues);
213281
}
214282

215-
std::unique_ptr<TJsonParser> NewJsonParser(const TVector<TString>& columns, const TVector<TString>& types) {
216-
return std::unique_ptr<TJsonParser>(new TJsonParser(columns, types));
283+
// Factory: creates a TJsonParser for the given columns/types with the given batching settings.
// NOTE(review): uses `new` rather than std::make_unique — presumably because of constructor
// accessibility declared in the header; confirm before changing.
std::unique_ptr<TJsonParser> NewJsonParser(const TVector<TString>& columns, const TVector<TString>& types, ui64 batchSize, TDuration batchCreationTimeout) {
284+
return std::unique_ptr<TJsonParser>(new TJsonParser(columns, types, batchSize, batchCreationTimeout));
217285
}
218286

219287
} // namespace NFq

0 commit comments

Comments
 (0)