Skip to content

Commit 795d4a7

Browse files
authored
Merge 3996850 into 8074419
2 parents 8074419 + 3996850 commit 795d4a7

File tree

11 files changed

+416
-236
lines changed

11 files changed

+416
-236
lines changed

ydb/core/fq/libs/row_dispatcher/json_filter.cpp

Lines changed: 47 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#include <ydb/library/yql/providers/common/schema/parser/yql_type_parser.h>
12
#include <ydb/library/yql/public/udf/udf_version.h>
23
#include <ydb/library/yql/public/purecalc/purecalc.h>
34
#include <ydb/library/yql/public/purecalc/io_specs/mkql/spec.h>
@@ -10,6 +11,8 @@
1011
#include <ydb/core/fq/libs/common/util.h>
1112
#include <ydb/core/fq/libs/row_dispatcher/json_filter.h>
1213

14+
#include <cxxabi.h>
15+
1316

1417
namespace {
1518

@@ -23,6 +26,12 @@ NYT::TNode CreateTypeNode(const TString& fieldType) {
2326
.Add(fieldType);
2427
}
2528

29+
NYT::TNode CreateOptionalTypeNode(const TString& fieldType) {
30+
return NYT::TNode::CreateList()
31+
.Add("OptionalType")
32+
.Add(CreateTypeNode(fieldType));
33+
}
34+
2635
void AddField(NYT::TNode& node, const TString& fieldName, const TString& fieldType) {
2736
node.Add(
2837
NYT::TNode::CreateList()
@@ -31,18 +40,29 @@ void AddField(NYT::TNode& node, const TString& fieldName, const TString& fieldTy
3140
);
3241
}
3342

34-
void AddOptionalField(NYT::TNode& node, const TString& fieldName, const TString& fieldType) {
35-
node.Add(NYT::TNode::CreateList()
36-
.Add(fieldName)
37-
.Add(NYT::TNode::CreateList().Add("OptionalType").Add(CreateTypeNode(fieldType)))
43+
void AddTypedField(NYT::TNode& node, const TString& fieldName, const TString& fieldTypeYson) {
44+
NYT::TNode parsedType;
45+
Y_ENSURE(NYql::NCommon::ParseYson(parsedType, fieldTypeYson, Cerr), "Invalid field type");
46+
47+
// TODO: remove this when the re-parsing is removed from pq read actor
48+
if (parsedType == CreateTypeNode("Json")) {
49+
parsedType = CreateTypeNode("String");
50+
} else if (parsedType == CreateOptionalTypeNode("Json")) {
51+
parsedType = CreateOptionalTypeNode("String");
52+
}
53+
54+
node.Add(
55+
NYT::TNode::CreateList()
56+
.Add(fieldName)
57+
.Add(parsedType)
3858
);
3959
}
4060

41-
NYT::TNode MakeInputSchema(const TVector<TString>& columns) {
61+
NYT::TNode MakeInputSchema(const TVector<TString>& columns, const TVector<TString>& types) {
4262
auto structMembers = NYT::TNode::CreateList();
4363
AddField(structMembers, OffsetFieldName, "Uint64");
44-
for (const auto& col : columns) {
45-
AddOptionalField(structMembers, col, "String");
64+
for (size_t i = 0; i < columns.size(); ++i) {
65+
AddTypedField(structMembers, columns[i], types[i]);
4666
}
4767
return NYT::TNode::CreateList().Add("StructType").Add(std::move(structMembers));
4868
}
@@ -68,7 +88,7 @@ class TFilterInputSpec : public NYql::NPureCalc::TInputSpecBase {
6888
TVector<NYT::TNode> Schemas;
6989
};
7090

71-
class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<TVector<std::string_view>>&>> {
91+
class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>&>> {
7292
public:
7393
TFilterInputConsumer(
7494
const TFilterInputSpec& spec,
@@ -106,15 +126,15 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<const T
106126
}
107127
}
108128

109-
void OnObject(std::pair<const TVector<ui64>&, const TVector<TVector<std::string_view>>&> values) override {
129+
void OnObject(std::pair<const TVector<ui64>&, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>&> values) override {
110130
Y_ENSURE(FieldsPositions.size() == values.second.size());
111131

112132
NKikimr::NMiniKQL::TThrowingBindTerminator bind;
113133
with_lock (Worker->GetScopedAlloc()) {
114134
auto& holderFactory = Worker->GetGraph().GetHolderFactory();
115135

116136
// TODO: use blocks here
117-
for (size_t rowId = 0; rowId < values.second.front().size(); ++rowId) {
137+
for (size_t rowId = 0; rowId < values.second.front()->size(); ++rowId) {
118138
NYql::NUdf::TUnboxedValue* items = nullptr;
119139

120140
NYql::NUdf::TUnboxedValue result = Cache.NewArray(
@@ -126,13 +146,16 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<const T
126146

127147
size_t fieldId = 0;
128148
for (const auto& column : values.second) {
129-
items[FieldsPositions[fieldId++]] = column[rowId].data() // Check that std::string_view was initialized in json_parser
130-
? NKikimr::NMiniKQL::MakeString(column[rowId]).MakeOptional()
131-
: NKikimr::NUdf::TUnboxedValuePod();
149+
items[FieldsPositions[fieldId++]] = column->at(rowId);
132150
}
133151

134152
Worker->Push(std::move(result));
135153
}
154+
155+
// Clear cache after each object because
156+
// values allocated on another allocator and should be released
157+
Cache.Clear();
158+
Worker->GetGraph().Invalidate();
136159
}
137160
}
138161

@@ -216,7 +239,7 @@ struct NYql::NPureCalc::TInputSpecTraits<TFilterInputSpec> {
216239
static constexpr bool IsPartial = false;
217240
static constexpr bool SupportPushStreamMode = true;
218241

219-
using TConsumerType = THolder<NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<TVector<std::string_view>>&>>>;
242+
using TConsumerType = THolder<NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>&>>>;
220243

221244
static TConsumerType MakeConsumer(
222245
const TFilterInputSpec& spec,
@@ -244,12 +267,15 @@ class TJsonFilter::TImpl {
244267
const TVector<TString>& types,
245268
const TString& whereFilter,
246269
TCallback callback)
247-
: Sql(GenerateSql(columns, types, whereFilter)) {
270+
: Sql(GenerateSql(whereFilter)) {
271+
Y_ENSURE(columns.size() == types.size(), "Number of columns and types should by equal");
248272
auto factory = NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions());
249273

274+
// Program should be stateless because input values
275+
// allocated on another allocator and should be released
250276
LOG_ROW_DISPATCHER_DEBUG("Creating program...");
251277
Program = factory->MakePushStreamProgram(
252-
TFilterInputSpec(MakeInputSchema(columns)),
278+
TFilterInputSpec(MakeInputSchema(columns, types)),
253279
TFilterOutputSpec(MakeOutputSchema()),
254280
Sql,
255281
NYql::NPureCalc::ETranslationMode::SQL
@@ -258,7 +284,7 @@ class TJsonFilter::TImpl {
258284
LOG_ROW_DISPATCHER_DEBUG("Program created");
259285
}
260286

261-
void Push(const TVector<ui64>& offsets, const TVector<TVector<std::string_view>>& values) {
287+
void Push(const TVector<ui64>& offsets, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>& values) {
262288
Y_ENSURE(values, "Expected non empty schema");
263289
InputConsumer->OnObject(std::make_pair(offsets, values));
264290
}
@@ -268,29 +294,9 @@ class TJsonFilter::TImpl {
268294
}
269295

270296
private:
271-
TString GenerateSql(const TVector<TString>& columnNames, const TVector<TString>& columnTypes, const TString& whereFilter) {
297+
TString GenerateSql(const TString& whereFilter) {
272298
TStringStream str;
273-
str << "$fields = SELECT ";
274-
Y_ABORT_UNLESS(columnNames.size() == columnTypes.size());
275-
str << OffsetFieldName << ", ";
276-
for (size_t i = 0; i < columnNames.size(); ++i) {
277-
TString columnType = columnTypes[i];
278-
TString columnName = NFq::EncloseAndEscapeString(columnNames[i], '`');
279-
if (columnType == "Json") {
280-
columnType = "String";
281-
} else if (columnType == "Optional<Json>") {
282-
columnType = "Optional<String>";
283-
}
284-
285-
if (columnType.StartsWith("Optional")) {
286-
str << "IF(" << columnName << " IS NOT NULL, Unwrap(CAST(" << columnName << " as " << columnType << ")), NULL)";
287-
} else {
288-
str << "Unwrap(CAST(" << columnName << " as " << columnType << "))";
289-
}
290-
str << " as " << columnName << ((i != columnNames.size() - 1) ? "," : "");
291-
}
292-
str << " FROM Input;\n";
293-
str << "$filtered = SELECT * FROM $fields " << whereFilter << ";\n";
299+
str << "$filtered = SELECT * FROM Input " << whereFilter << ";\n";
294300

295301
str << "SELECT " << OffsetFieldName << ", Unwrap(Json::SerializeJson(Yson::From(RemoveMembers(TableRow(), [\"" << OffsetFieldName;
296302
str << "\"])))) as data FROM $filtered";
@@ -300,7 +306,7 @@ class TJsonFilter::TImpl {
300306

301307
private:
302308
THolder<NYql::NPureCalc::TPushStreamProgram<TFilterInputSpec, TFilterOutputSpec>> Program;
303-
THolder<NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<TVector<std::string_view>>&>>> InputConsumer;
309+
THolder<NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>&>>> InputConsumer;
304310
const TString Sql;
305311
};
306312

@@ -315,7 +321,7 @@ TJsonFilter::TJsonFilter(
315321
TJsonFilter::~TJsonFilter() {
316322
}
317323

318-
void TJsonFilter::Push(const TVector<ui64>& offsets, const TVector<TVector<std::string_view>>& values) {
324+
void TJsonFilter::Push(const TVector<ui64>& offsets, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>& values) {
319325
Impl->Push(offsets, values);
320326
}
321327

ydb/core/fq/libs/row_dispatcher/json_filter.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
#pragma once
22

3-
#include <ydb/library/yql/public/udf/udf_data_type.h>
4-
#include <ydb/library/yql/public/udf/udf_value.h>
3+
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
54

65
namespace NFq {
76

@@ -18,7 +17,7 @@ class TJsonFilter {
1817

1918
~TJsonFilter();
2019

21-
void Push(const TVector<ui64>& offsets, const TVector<TVector<std::string_view>>& values);
20+
void Push(const TVector<ui64>& offsets, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>& values);
2221
TString GetSql();
2322

2423
private:

0 commit comments

Comments
 (0)