Skip to content

Commit 463bc97

Browse files
authored
YQ-3704 RD use CH udf in json parsing (#9878)
1 parent 264fb8c commit 463bc97

File tree

6 files changed

+71
-20
lines changed

6 files changed

+71
-20
lines changed

ydb/core/fq/libs/row_dispatcher/json_parser.cpp

Lines changed: 47 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -230,8 +230,9 @@ class TJsonParser::TImpl {
230230
public:
231231
TImpl(
232232
const TVector<TString>& columns,
233+
const TVector<TString>& types,
233234
TCallback callback)
234-
: Sql(GenerateSql(columns)) {
235+
: Sql(GenerateSql(columns, types)) {
235236
auto options = NYql::NPureCalc::TProgramFactoryOptions();
236237
auto factory = NYql::NPureCalc::MakeProgramFactory(options);
237238

@@ -240,7 +241,7 @@ class TJsonParser::TImpl {
240241
TParserInputSpec(),
241242
TParserOutputSpec(MakeOutputSchema(columns)),
242243
Sql,
243-
NYql::NPureCalc::ETranslationMode::SQL
244+
NYql::NPureCalc::ETranslationMode::SExpr
244245
);
245246
LOG_ROW_DISPATCHER_DEBUG("Program created");
246247
InputConsumer = Program->Apply(MakeHolder<TParserOutputConsumer>(callback));
@@ -257,19 +258,51 @@ class TJsonParser::TImpl {
257258
}
258259

259260
private:
260-
TString GenerateSql(const TVector<TString>& columns) {
261-
TStringStream str;
262-
str << "$json = SELECT CAST(data AS Json) as `Json`, " << OffsetFieldName << " FROM Input;";
263-
str << "\nSELECT " << OffsetFieldName << ", ";
264-
for (auto it = columns.begin(); it != columns.end(); ++it) {
265-
str << R"(CAST(Unwrap(JSON_VALUE(`Json`, "$.)" << *it << "\")) as String) as "
266-
<< *it << ((it != columns.end() - 1) ? "," : "");
261+
// Builds the text of the program that parses incoming rows.
// Despite the name, the generated text is an s-expression program, not SQL:
// it wraps the 'ClickHouseClient.ParseFormat UDF (format 'json_each_row) and
// maps every input row to a struct holding the offset field plus one member
// per requested column.
//
// columnNames - names of the JSON fields to extract; used both as members of
//               the UDF output struct and as members of the result struct.
// columnTypes - type name for each column, same order and size as columnNames;
//               an outer Optional<...> wrapper is stripped before use.
// Returns the generated program text (also written to the debug log).
//
// NOTE(review): column names and type names are inserted into the program
// verbatim, with no quoting or escaping -- confirm callers only pass
// identifiers that are safe to embed.
TString GenerateSql(const TVector<TString>& columnNames, const TVector<TString>& columnTypes) {
262+
// Aborts the process when the two vectors differ in size.
Y_ABORT_UNLESS(columnNames.size() == columnTypes.size(), "Unexpected column types size");
263+
264+
TStringStream udfOutputType;
265+
TStringStream resultType;
266+
// Per column, emit one struct-member entry for the UDF output type (typed
// with the column's declared type) and one entry for the result struct
// (the parsed member SafeCast to $string_type).
for (size_t i = 0; i < columnNames.size(); ++i) {
267+
// Entries are separated by a single space; nothing follows the last one.
const TString& lastSymbol = i + 1 == columnNames.size() ? "" : " ";
268+
const TString& column = columnNames[i];
269+
// SkipOptional returns by value; the const reference keeps that temporary alive.
const TString& type = SkipOptional(columnTypes[i]);
270+
271+
udfOutputType << "'('" << column << " (DataType '" << type << "))" << lastSymbol;
272+
resultType << "'('" << column << " (SafeCast (Member $parsed '" << column << ") $string_type))" << lastSymbol;
267273
}
268-
str << " FROM $json;";
274+
275+
// Program layout: the UDF input is a tuple of (row `data` member, offset
// member); the UDF output is a tuple of (parsed struct, Uint64). The final
// Map turns each output tuple into a struct with the offset field followed
// by the per-column entries collected in resultType.
TStringStream str;
276+
str << R"(
277+
(
278+
(let $string_type (DataType 'String))
279+
280+
(let $input_type (TupleType $string_type (DataType 'Uint64)))
281+
(let $output_type (TupleType (StructType )" << udfOutputType.Str() << R"() (DataType 'Uint64)))
282+
(let $udf_argument_type (TupleType $input_type (StructType) $output_type))
283+
(let $udf_callable_type (CallableType '('1) '((StreamType $output_type)) '((StreamType $input_type)) '((OptionalType (DataType 'Utf8)))))
284+
(let $udf (Udf 'ClickHouseClient.ParseFormat (Void) $udf_argument_type 'json_each_row $udf_callable_type (VoidType) '"" '()))
285+
286+
(return (Map (Apply $udf (Map (Self '0) (lambda '($input) (block '(
287+
(return '((Member $input 'data) (Member $input ')" << OffsetFieldName << R"()))
288+
))))) (lambda '($output) (block '(
289+
(let $parsed (Nth $output '0))
290+
(return (AsStruct '(')" << OffsetFieldName << R"( (Nth $output '1)) )" << resultType.Str() << R"())
291+
)))))
292+
)
293+
)";
269294
// The full generated program is logged at debug level before being returned.
LOG_ROW_DISPATCHER_DEBUG("GenerateSql " << str.Str());
270295
return str.Str();
271296
}
272297

298+
// Returns the type name with one outer `Optional<...>` wrapper removed.
// Names that do not start with "Optional" are returned unchanged.
// A name that starts with "Optional" but is not of the exact form
// `Optional<...>` aborts the process.
static TString SkipOptional(TStringBuf type) {
    if (!type.StartsWith("Optional")) {
        // Nothing to unwrap.
        return TString(type);
    }

    // Both calls trim `type` in place and report whether the affix was present.
    Y_ABORT_UNLESS(type.SkipPrefix("Optional<"));
    Y_ABORT_UNLESS(type.ChopSuffix(">"));
    return TString(type);
}
305+
273306
private:
274307
THolder<NYql::NPureCalc::TPushStreamProgram<TParserInputSpec, TParserOutputSpec>> Program;
275308
THolder<NYql::NPureCalc::IConsumer<TInputConsumerArg>> InputConsumer;
@@ -278,8 +311,9 @@ class TJsonParser::TImpl {
278311

279312
// Constructs the parser by creating its TImpl, forwarding `columns`, `types`
// and `callback` to the implementation unchanged.
TJsonParser::TJsonParser(
280313
const TVector<TString>& columns,
314+
const TVector<TString>& types,
281315
TCallback callback)
282-
: Impl(std::make_unique<TJsonParser::TImpl>(columns, callback)) {
316+
: Impl(std::make_unique<TJsonParser::TImpl>(columns, types, callback)) {
283317
}
284318

285319
TJsonParser::~TJsonParser() {
@@ -295,8 +329,9 @@ TString TJsonParser::GetSql() {
295329

296330
std::unique_ptr<TJsonParser> NewJsonParser(
297331
const TVector<TString>& columns,
332+
const TVector<TString>& types,
298333
TCallback callback) {
299-
return std::unique_ptr<TJsonParser>(new TJsonParser(columns, callback));
334+
return std::unique_ptr<TJsonParser>(new TJsonParser(columns, types, callback));
300335
}
301336

302337
} // namespace NFq

ydb/core/fq/libs/row_dispatcher/json_parser.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ class TJsonParser {
1313
public:
1414
TJsonParser(
1515
const TVector<TString>& columns,
16+
const TVector<TString>& types,
1617
TCallback callback);
1718
~TJsonParser();
1819
void Push(ui64 offset, const TString& value);
@@ -25,6 +26,7 @@ class TJsonParser {
2526

2627
std::unique_ptr<TJsonParser> NewJsonParser(
2728
const TVector<TString>& columns,
29+
const TVector<TString>& types,
2830
TJsonParser::TCallback callback);
2931

3032
} // namespace NFq

ydb/core/fq/libs/row_dispatcher/topic_session.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -676,6 +676,7 @@ void TTopicSession::InitParser(const NYql::NPq::NProto::TDqPqTopicSource& source
676676
NActors::TActorSystem* actorSystem = NActors::TActivationContext::ActorSystem();
677677
Parser = NewJsonParser(
678678
GetVector(sourceParams.GetColumns()),
679+
GetVector(sourceParams.GetColumnTypes()),
679680
[actorSystem, selfId = SelfId()](ui64 offset, TList<TString>&& value){
680681
actorSystem->Send(selfId, new NFq::TEvPrivate::TEvDataParsed(offset, std::move(value)));
681682
});

ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66
#include <ydb/core/testlib/actors/test_runtime.h>
77
#include <ydb/core/testlib/basics/helpers.h>
88
#include <ydb/core/testlib/actor_helpers.h>
9+
10+
#include <ydb/library/yql/public/purecalc/common/interface.h>
11+
912
#include <library/cpp/testing/unittest/registar.h>
1013

1114
namespace {
@@ -31,10 +34,19 @@ class TFixture : public NUnitTest::TBaseFixture {
3134
}
3235
}
3336

37+
// Creates the parser under test for the given columns and column types.
// A program compilation failure is turned into a test failure whose message
// contains the error text, the generated query and the reported issues.
void MakeParser(TVector<TString> columns, TVector<TString> types, NFq::TJsonParser::TCallback callback) {
    try {
        Parser = NFq::NewJsonParser(
            columns,
            types,
            callback);
    } catch (const NYql::NPureCalc::TCompileError& compileError) {
        // Caught by const reference: avoids copying (and possibly slicing) the exception.
        UNIT_ASSERT_C(false, TStringBuilder() << "Failed to create json parser: " << compileError.what() << "\nQuery text:\n" << compileError.GetYql() << "Reason:\n" << compileError.GetIssues());
    }
}
47+
3448
// Convenience overload: creates the parser with every column typed as "String".
void MakeParser(TVector<TString> columns, NFq::TJsonParser::TCallback callback) {
35-
Parser = NFq::NewJsonParser(
36-
columns,
37-
callback);
49+
MakeParser(columns, TVector<TString>(columns.size(), "String"), callback);
3850
}
3951

4052
TActorSystemStub actorSystemStub;
@@ -46,11 +58,11 @@ Y_UNIT_TEST_SUITE(TJsonParserTests) {
4658
Y_UNIT_TEST_F(Simple1, TFixture) {
4759
TList<TString> result;
4860
ui64 resultOffset;
49-
MakeParser({"a1", "a2"}, [&](ui64 offset, TList<TString>&& value){
61+
MakeParser({"a1", "a2"}, {"String", "Optional<Uint64>"}, [&](ui64 offset, TList<TString>&& value){
5062
resultOffset = offset;
5163
result = std::move(value);
5264
});
53-
Parser->Push(5, R"({"a1": "hello1", "a2": "101", "event": "event1"})");
65+
Parser->Push(5, R"({"a1": "hello1", "a2": 101, "event": "event1"})");
5466
UNIT_ASSERT_VALUES_EQUAL(5, resultOffset);
5567
UNIT_ASSERT_VALUES_EQUAL(2, result.size());
5668
UNIT_ASSERT_VALUES_EQUAL("hello1", result.front());
@@ -102,7 +114,7 @@ Y_UNIT_TEST_SUITE(TJsonParserTests) {
102114
Y_UNIT_TEST_F(ThrowExceptionByError, TFixture) {
103115

104116
MakeParser({"a2", "a1"}, [&](ui64, TList<TString>&&){ });
105-
UNIT_ASSERT_EXCEPTION_CONTAINS(Parser->Push(5, R"(ydb)"), yexception, " Failed to unwrap empty optional");
117+
UNIT_ASSERT_EXCEPTION_CONTAINS(Parser->Push(5, R"(ydb)"), yexception, "DB::ParsingException: Cannot parse input: expected '{' before: 'ydb': (at row 1)");
106118
}
107119
}
108120

ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ class TFixture : public NUnitTest::TBaseFixture {
8181
settings.SetDatabase(GetDefaultPqDatabase());
8282
settings.AddColumns("dt");
8383
settings.AddColumns("value");
84-
settings.AddColumnTypes("UInt64");
84+
settings.AddColumnTypes("Uint64");
8585
settings.AddColumnTypes("String");
8686
if (!emptyPredicate) {
8787
settings.SetPredicate("WHERE true");
@@ -263,7 +263,7 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
263263
const std::vector<TString> data = { "not json", "noch einmal / nicht json" };
264264
PQWrite(data, topicName);
265265

266-
ExpectSessionError(ReadActorId1, "Failed to unwrap empty optional");
266+
ExpectSessionError(ReadActorId1, "DB::ParsingException: Cannot parse input: expected '{' before: 'not json': (at row 1)");
267267
StopSession(ReadActorId1, source);
268268
}
269269

ydb/core/fq/libs/row_dispatcher/ut/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ PEERDIR(
2020
ydb/library/yql/udfs/common/yson2
2121
ydb/tests/fq/pq_async_io
2222
ydb/library/yql/sql/pg_dummy
23+
ydb/library/yql/udfs/common/clickhouse/client
2324
)
2425

2526
SIZE(MEDIUM)

0 commit comments

Comments
 (0)