Skip to content

Commit 5ff44aa

Browse files
authored
Merge 1560482 into cb09349
2 parents cb09349 + 1560482 commit 5ff44aa

File tree

11 files changed

+432
-238
lines changed

11 files changed

+432
-238
lines changed

ydb/core/fq/libs/row_dispatcher/json_filter.cpp

Lines changed: 45 additions & 42 deletions
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,4 @@
1+
#include <ydb/library/yql/providers/common/schema/parser/yql_type_parser.h>
12
#include <ydb/library/yql/public/udf/udf_version.h>
23
#include <ydb/library/yql/public/purecalc/purecalc.h>
34
#include <ydb/library/yql/public/purecalc/io_specs/mkql/spec.h>
@@ -10,7 +11,6 @@
1011
#include <ydb/core/fq/libs/common/util.h>
1112
#include <ydb/core/fq/libs/row_dispatcher/json_filter.h>
1213

13-
1414
namespace {
1515

1616
using TCallback = NFq::TJsonFilter::TCallback;
@@ -23,6 +23,12 @@ NYT::TNode CreateTypeNode(const TString& fieldType) {
2323
.Add(fieldType);
2424
}
2525

26+
NYT::TNode CreateOptionalTypeNode(const TString& fieldType) {
27+
return NYT::TNode::CreateList()
28+
.Add("OptionalType")
29+
.Add(CreateTypeNode(fieldType));
30+
}
31+
2632
void AddField(NYT::TNode& node, const TString& fieldName, const TString& fieldType) {
2733
node.Add(
2834
NYT::TNode::CreateList()
@@ -31,18 +37,29 @@ void AddField(NYT::TNode& node, const TString& fieldName, const TString& fieldTy
3137
);
3238
}
3339

34-
void AddOptionalField(NYT::TNode& node, const TString& fieldName, const TString& fieldType) {
35-
node.Add(NYT::TNode::CreateList()
36-
.Add(fieldName)
37-
.Add(NYT::TNode::CreateList().Add("OptionalType").Add(CreateTypeNode(fieldType)))
40+
void AddTypedField(NYT::TNode& node, const TString& fieldName, const TString& fieldTypeYson) {
41+
NYT::TNode parsedType;
42+
Y_ENSURE(NYql::NCommon::ParseYson(parsedType, fieldTypeYson, Cerr), "Invalid field type");
43+
44+
// TODO: remove this when the re-parsing is removed from pq read actor
45+
if (parsedType == CreateTypeNode("Json")) {
46+
parsedType = CreateTypeNode("String");
47+
} else if (parsedType == CreateOptionalTypeNode("Json")) {
48+
parsedType = CreateOptionalTypeNode("String");
49+
}
50+
51+
node.Add(
52+
NYT::TNode::CreateList()
53+
.Add(fieldName)
54+
.Add(parsedType)
3855
);
3956
}
4057

41-
NYT::TNode MakeInputSchema(const TVector<TString>& columns) {
58+
NYT::TNode MakeInputSchema(const TVector<TString>& columns, const TVector<TString>& types) {
4259
auto structMembers = NYT::TNode::CreateList();
4360
AddField(structMembers, OffsetFieldName, "Uint64");
44-
for (const auto& col : columns) {
45-
AddOptionalField(structMembers, col, "String");
61+
for (size_t i = 0; i < columns.size(); ++i) {
62+
AddTypedField(structMembers, columns[i], types[i]);
4663
}
4764
return NYT::TNode::CreateList().Add("StructType").Add(std::move(structMembers));
4865
}
@@ -68,7 +85,7 @@ class TFilterInputSpec : public NYql::NPureCalc::TInputSpecBase {
6885
TVector<NYT::TNode> Schemas;
6986
};
7087

71-
class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<TVector<std::string_view>>&>> {
88+
class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>&>> {
7289
public:
7390
TFilterInputConsumer(
7491
const TFilterInputSpec& spec,
@@ -106,15 +123,15 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<const T
106123
}
107124
}
108125

109-
void OnObject(std::pair<const TVector<ui64>&, const TVector<TVector<std::string_view>>&> values) override {
126+
void OnObject(std::pair<const TVector<ui64>&, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>&> values) override {
110127
Y_ENSURE(FieldsPositions.size() == values.second.size());
111128

112129
NKikimr::NMiniKQL::TThrowingBindTerminator bind;
113130
with_lock (Worker->GetScopedAlloc()) {
114131
auto& holderFactory = Worker->GetGraph().GetHolderFactory();
115132

116133
// TODO: use blocks here
117-
for (size_t rowId = 0; rowId < values.second.front().size(); ++rowId) {
134+
for (size_t rowId = 0; rowId < values.second.front()->size(); ++rowId) {
118135
NYql::NUdf::TUnboxedValue* items = nullptr;
119136

120137
NYql::NUdf::TUnboxedValue result = Cache.NewArray(
@@ -126,13 +143,16 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<const T
126143

127144
size_t fieldId = 0;
128145
for (const auto& column : values.second) {
129-
items[FieldsPositions[fieldId++]] = column[rowId].data() // Check that std::string_view was initialized in json_parser
130-
? NKikimr::NMiniKQL::MakeString(column[rowId]).MakeOptional()
131-
: NKikimr::NUdf::TUnboxedValuePod();
146+
items[FieldsPositions[fieldId++]] = column->at(rowId);
132147
}
133148

134149
Worker->Push(std::move(result));
135150
}
151+
152+
// Clear the cache after each object because the
153+
// values are allocated on another allocator and should be released
154+
Cache.Clear();
155+
Worker->GetGraph().Invalidate();
136156
}
137157
}
138158

@@ -216,7 +236,7 @@ struct NYql::NPureCalc::TInputSpecTraits<TFilterInputSpec> {
216236
static constexpr bool IsPartial = false;
217237
static constexpr bool SupportPushStreamMode = true;
218238

219-
using TConsumerType = THolder<NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<TVector<std::string_view>>&>>>;
239+
using TConsumerType = THolder<NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>&>>>;
220240

221241
static TConsumerType MakeConsumer(
222242
const TFilterInputSpec& spec,
@@ -244,12 +264,15 @@ class TJsonFilter::TImpl {
244264
const TVector<TString>& types,
245265
const TString& whereFilter,
246266
TCallback callback)
247-
: Sql(GenerateSql(columns, types, whereFilter)) {
267+
: Sql(GenerateSql(whereFilter)) {
268+
Y_ENSURE(columns.size() == types.size(), "Number of columns and types should by equal");
248269
auto factory = NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions());
249270

271+
// The program should be stateless because the input values
272+
// are allocated on another allocator and should be released
250273
LOG_ROW_DISPATCHER_DEBUG("Creating program...");
251274
Program = factory->MakePushStreamProgram(
252-
TFilterInputSpec(MakeInputSchema(columns)),
275+
TFilterInputSpec(MakeInputSchema(columns, types)),
253276
TFilterOutputSpec(MakeOutputSchema()),
254277
Sql,
255278
NYql::NPureCalc::ETranslationMode::SQL
@@ -258,7 +281,7 @@ class TJsonFilter::TImpl {
258281
LOG_ROW_DISPATCHER_DEBUG("Program created");
259282
}
260283

261-
void Push(const TVector<ui64>& offsets, const TVector<TVector<std::string_view>>& values) {
284+
void Push(const TVector<ui64>& offsets, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>& values) {
262285
Y_ENSURE(values, "Expected non empty schema");
263286
InputConsumer->OnObject(std::make_pair(offsets, values));
264287
}
@@ -268,29 +291,9 @@ class TJsonFilter::TImpl {
268291
}
269292

270293
private:
271-
TString GenerateSql(const TVector<TString>& columnNames, const TVector<TString>& columnTypes, const TString& whereFilter) {
294+
TString GenerateSql(const TString& whereFilter) {
272295
TStringStream str;
273-
str << "$fields = SELECT ";
274-
Y_ABORT_UNLESS(columnNames.size() == columnTypes.size());
275-
str << OffsetFieldName << ", ";
276-
for (size_t i = 0; i < columnNames.size(); ++i) {
277-
TString columnType = columnTypes[i];
278-
TString columnName = NFq::EncloseAndEscapeString(columnNames[i], '`');
279-
if (columnType == "Json") {
280-
columnType = "String";
281-
} else if (columnType == "Optional<Json>") {
282-
columnType = "Optional<String>";
283-
}
284-
285-
if (columnType.StartsWith("Optional")) {
286-
str << "IF(" << columnName << " IS NOT NULL, Unwrap(CAST(" << columnName << " as " << columnType << ")), NULL)";
287-
} else {
288-
str << "Unwrap(CAST(" << columnName << " as " << columnType << "))";
289-
}
290-
str << " as " << columnName << ((i != columnNames.size() - 1) ? "," : "");
291-
}
292-
str << " FROM Input;\n";
293-
str << "$filtered = SELECT * FROM $fields " << whereFilter << ";\n";
296+
str << "$filtered = SELECT * FROM Input " << whereFilter << ";\n";
294297

295298
str << "SELECT " << OffsetFieldName << ", Unwrap(Json::SerializeJson(Yson::From(RemoveMembers(TableRow(), [\"" << OffsetFieldName;
296299
str << "\"])))) as data FROM $filtered";
@@ -300,7 +303,7 @@ class TJsonFilter::TImpl {
300303

301304
private:
302305
THolder<NYql::NPureCalc::TPushStreamProgram<TFilterInputSpec, TFilterOutputSpec>> Program;
303-
THolder<NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<TVector<std::string_view>>&>>> InputConsumer;
306+
THolder<NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>&>>> InputConsumer;
304307
const TString Sql;
305308
};
306309

@@ -315,7 +318,7 @@ TJsonFilter::TJsonFilter(
315318
TJsonFilter::~TJsonFilter() {
316319
}
317320

318-
void TJsonFilter::Push(const TVector<ui64>& offsets, const TVector<TVector<std::string_view>>& values) {
321+
void TJsonFilter::Push(const TVector<ui64>& offsets, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>& values) {
319322
Impl->Push(offsets, values);
320323
}
321324

ydb/core/fq/libs/row_dispatcher/json_filter.h

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,6 @@
11
#pragma once
22

3-
#include <ydb/library/yql/public/udf/udf_data_type.h>
4-
#include <ydb/library/yql/public/udf/udf_value.h>
3+
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
54

65
namespace NFq {
76

@@ -18,7 +17,7 @@ class TJsonFilter {
1817

1918
~TJsonFilter();
2019

21-
void Push(const TVector<ui64>& offsets, const TVector<TVector<std::string_view>>& values);
20+
void Push(const TVector<ui64>& offsets, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>& values);
2221
TString GetSql();
2322

2423
private:

0 commit comments

Comments
 (0)