Skip to content

Commit 283b204

Browse files
GrigoriyPA and kardymonds
authored and committed
YQ-3722 RD fixed nested type parsing (ydb-platform#10300)
1 parent 7441eef commit 283b204

File tree

4 files changed

+141
-18
lines changed

4 files changed

+141
-18
lines changed

ydb/core/fq/libs/row_dispatcher/json_filter.cpp

+7-1
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,13 @@ class TJsonFilter::TImpl {
258258
Y_ABORT_UNLESS(columnNames.size() == columnTypes.size());
259259
str << OffsetFieldName << ", ";
260260
for (size_t i = 0; i < columnNames.size(); ++i) {
261-
str << "CAST(" << columnNames[i] << " as " << columnTypes[i] << ") as " << columnNames[i] << ((i != columnNames.size() - 1) ? "," : "");
261+
TString columnType = columnTypes[i];
262+
if (columnType == "Json") {
263+
columnType = "String";
264+
} else if (columnType == "Optional<Json>") {
265+
columnType = "Optional<String>";
266+
}
267+
str << "CAST(" << columnNames[i] << " as " << columnType << ") as " << columnNames[i] << ((i != columnNames.size() - 1) ? "," : "");
262268
}
263269
str << " FROM Input;\n";
264270
str << "$filtered = SELECT * FROM $fields " << whereFilter << ";\n";

ydb/core/fq/libs/row_dispatcher/json_parser.cpp

+51-16
Original file line numberDiff line numberDiff line change
@@ -57,20 +57,28 @@ void TJsonParserBuffer::Clear() {
5757
//// TJsonParser
5858

5959
class TJsonParser::TImpl {
60+
struct TColumnDescription {
61+
std::string Name;
62+
TString Type;
63+
};
64+
6065
public:
6166
TImpl(const TVector<TString>& columns, const TVector<TString>& types)
6267
: ParsedValues(columns.size())
6368
{
64-
Y_UNUSED(types); // TODO: Will be used for UV creation
69+
Y_ENSURE(columns.size() == types.size(), "Number of columns and types should by equal");
6570

6671
Columns.reserve(columns.size());
67-
for (const auto& column : columns) {
68-
Columns.emplace_back(column);
72+
for (size_t i = 0; i < columns.size(); i++) {
73+
Columns.emplace_back(TColumnDescription{
74+
.Name = columns[i],
75+
.Type = SkipOptional(types[i])
76+
});
6977
}
7078

7179
ColumnsIndex.reserve(columns.size());
72-
for (size_t i = 0; i < Columns.size(); i++) {
73-
ColumnsIndex.emplace(std::string_view(Columns[i]), i);
80+
for (size_t i = 0; i < columns.size(); i++) {
81+
ColumnsIndex.emplace(std::string_view(Columns[i].Name), i);
7482
}
7583
}
7684

@@ -86,6 +94,7 @@ class TJsonParser::TImpl {
8694
simdjson::ondemand::parser parser;
8795
parser.threaded = false;
8896

97+
size_t rowId = 0;
8998
simdjson::ondemand::document_stream documents = parser.iterate_many(values, size, simdjson::dom::DEFAULT_BATCH_SIZE);
9099
for (auto document : documents) {
91100
for (auto item : document.get_object()) {
@@ -94,17 +103,33 @@ class TJsonParser::TImpl {
94103
continue;
95104
}
96105

97-
auto& parsedColumn = ParsedValues[it->second];
98-
if (item.value().is_string()) {
99-
parsedColumn.emplace_back(CreateHolderIfNeeded(
100-
values, size, item.value().get_string().value()
101-
));
106+
const auto& column = Columns[it->second];
107+
108+
std::string_view value;
109+
if (item.value().is_null()) {
110+
// TODO: support optional types and create UV
111+
continue;
112+
} else if (column.Type == "Json") {
113+
value = item.value().raw_json().value();
114+
} else if (column.Type == "String" || column.Type == "Utf8") {
115+
value = item.value().get_string().value();
116+
} else if (item.value().is_scalar()) {
117+
// TODO: perform type validation and create UV
118+
value = item.value().raw_json_token().value();
102119
} else {
103-
parsedColumn.emplace_back(CreateHolderIfNeeded(
104-
values, size, item.value().raw_json_token().value()
105-
));
120+
throw yexception() << "Failed to parse json string, expected scalar type for column '" << it->first << "' with type " << column.Type << " but got nested json, please change column type to Json.";
106121
}
122+
123+
auto& parsedColumn = ParsedValues[it->second];
124+
parsedColumn.resize(rowId);
125+
parsedColumn.emplace_back(CreateHolderIfNeeded(values, size, value));
107126
}
127+
rowId++;
128+
}
129+
Y_ENSURE(rowId == Buffer.GetNumberValues(), "Unexpected number of json documents");
130+
131+
for (auto& parsedColumn : ParsedValues) {
132+
parsedColumn.resize(Buffer.GetNumberValues());
108133
}
109134
return ParsedValues;
110135
}
@@ -119,7 +144,7 @@ class TJsonParser::TImpl {
119144
TString GetDescription() const {
120145
TStringBuilder description = TStringBuilder() << "Columns: ";
121146
for (const auto& column : Columns) {
122-
description << "'" << column << "' ";
147+
description << "'" << column.Name << "':" << column.Type << " ";
123148
}
124149
description << "\nBuffer size: " << Buffer.GetNumberValues() << ", finished: " << Buffer.GetFinished();
125150
return description;
@@ -128,7 +153,7 @@ class TJsonParser::TImpl {
128153
TString GetDebugString(const TVector<TVector<std::string_view>>& parsedValues) const {
129154
TStringBuilder result;
130155
for (size_t i = 0; i < Columns.size(); ++i) {
131-
result << "Parsed column '" << Columns[i] << "': ";
156+
result << "Parsed column '" << Columns[i].Name << "': ";
132157
for (const auto& value : parsedValues[i]) {
133158
result << "'" << value << "' ";
134159
}
@@ -146,8 +171,18 @@ class TJsonParser::TImpl {
146171
return Buffer.AddHolder(value);
147172
}
148173

174+
static TString SkipOptional(const TString& type) {
175+
if (type.StartsWith("Optional")) {
176+
TStringBuf optionalType = type;
177+
Y_ENSURE(optionalType.SkipPrefix("Optional<"), "Unexpected type");
178+
Y_ENSURE(optionalType.ChopSuffix(">"), "Unexpected type");
179+
return TString(optionalType);
180+
}
181+
return type;
182+
}
183+
149184
private:
150-
TVector<std::string> Columns;
185+
TVector<TColumnDescription> Columns;
151186
absl::flat_hash_map<std::string_view, size_t> ColumnsIndex;
152187

153188
TJsonParserBuffer Buffer;

ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp

+52-1
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ Y_UNIT_TEST_SUITE(TJsonParserTests) {
118118
MakeParser({"a1", "a2"});
119119

120120
TJsonParserBuffer& buffer = Parser->GetBuffer();
121-
buffer.AddValue(R"({"a1": "hello1", "a2": 101, "event": "event1"})");
121+
buffer.AddValue(R"({"a1": "hello1", "a2": "101", "event": "event1"})");
122122
buffer.AddValue(R"({"a1": "hello1", "a2": "101", "event": "event2"})");
123123
buffer.AddValue(R"({"a2": "101", "a1": "hello1", "event": "event3"})");
124124

@@ -133,6 +133,57 @@ Y_UNIT_TEST_SUITE(TJsonParserTests) {
133133
}
134134
}
135135

136+
Y_UNIT_TEST_F(MissingFields, TFixture) {
137+
MakeParser({"a1", "a2"});
138+
139+
TJsonParserBuffer& buffer = Parser->GetBuffer();
140+
buffer.AddValue(R"({"a1": "hello1", "a2": "101", "event": "event1"})");
141+
buffer.AddValue(R"({"a1": "hello1", "event": "event2"})");
142+
buffer.AddValue(R"({"a2": "101", "a1": null, "event": "event3"})");
143+
144+
ParsedValues = Parser->Parse();
145+
ResultNumberValues = ParsedValues.front().size();
146+
UNIT_ASSERT_VALUES_EQUAL(3, ResultNumberValues);
147+
for (size_t i = 0; i < ResultNumberValues; ++i) {
148+
const auto& result = GetParsedRow(i);
149+
UNIT_ASSERT_VALUES_EQUAL_C(2, result.size(), i);
150+
UNIT_ASSERT_VALUES_EQUAL_C(i != 2 ? "hello1" : "", result.front(), i);
151+
UNIT_ASSERT_VALUES_EQUAL_C(i != 1 ? "101" : "", result.back(), i);
152+
}
153+
}
154+
155+
Y_UNIT_TEST_F(NestedTypes, TFixture) {
156+
MakeParser({"nested", "a1"}, {"Optional<Json>", "String"});
157+
158+
TJsonParserBuffer& buffer = Parser->GetBuffer();
159+
buffer.AddValue(R"({"a1": "hello1", "nested": {"key": "value"}})");
160+
buffer.AddValue(R"({"a1": "hello1", "nested": ["key1", "key2"]})");
161+
162+
ParsedValues = Parser->Parse();
163+
ResultNumberValues = ParsedValues.front().size();
164+
UNIT_ASSERT_VALUES_EQUAL(2, ResultNumberValues);
165+
166+
const auto& nestedJson = GetParsedRow(0);
167+
UNIT_ASSERT_VALUES_EQUAL(2, nestedJson.size());
168+
UNIT_ASSERT_VALUES_EQUAL("{\"key\": \"value\"}", nestedJson.front());
169+
UNIT_ASSERT_VALUES_EQUAL("hello1", nestedJson.back());
170+
171+
const auto& nestedList = GetParsedRow(1);
172+
UNIT_ASSERT_VALUES_EQUAL(2, nestedList.size());
173+
UNIT_ASSERT_VALUES_EQUAL("[\"key1\", \"key2\"]", nestedList.front());
174+
UNIT_ASSERT_VALUES_EQUAL("hello1", nestedList.back());
175+
}
176+
177+
Y_UNIT_TEST_F(StringTypeValidation, TFixture) {
178+
MakeParser({"a1"}, {"String"});
179+
UNIT_ASSERT_EXCEPTION_CONTAINS(PushToParser(R"({"a1": 1234})"), simdjson::simdjson_error, "INCORRECT_TYPE: The JSON element does not have the requested type.");
180+
}
181+
182+
Y_UNIT_TEST_F(JsonTypeValidation, TFixture) {
183+
MakeParser({"a1"}, {"Int32"});
184+
UNIT_ASSERT_EXCEPTION_CONTAINS(PushToParser(R"({"a1": {"key": "value"}})"), yexception, "Failed to parse json string, expected scalar type for column 'a1' with type Int32 but got nested json, please change column type to Json.");
185+
}
186+
136187
Y_UNIT_TEST_F(ThrowExceptionByError, TFixture) {
137188
MakeParser({"a2", "a1"});
138189
UNIT_ASSERT_EXCEPTION_CONTAINS(PushToParser(R"(ydb)"), simdjson::simdjson_error, "INCORRECT_TYPE: The JSON element does not have the requested type.");

ydb/tests/fq/yds/test_row_dispatcher.py

+31
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,37 @@ def test_scheme_error(self, kikimr, client):
218218
stop_yds_query(client, query_id)
219219
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0)
220220

221+
@yq_v1
222+
def test_nested_types(self, kikimr, client):
223+
client.create_yds_connection(
224+
YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
225+
)
226+
self.init_topics("test_nested_types")
227+
228+
sql = Rf'''
229+
INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
230+
SELECT data FROM {YDS_CONNECTION}.`{self.input_topic}`
231+
WITH (format=json_each_row, SCHEMA (time UInt64 NOT NULL, data Json NOT NULL, event String NOT NULL))
232+
WHERE event = "event1" or event = "event2";'''
233+
234+
query_id = start_yds_query(kikimr, client, sql)
235+
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)
236+
237+
data = [
238+
'{"time": 101, "data": {"key": "value"}, "event": "event1"}',
239+
'{"time": 102, "data": ["key1", "key2"], "event": "event2"}',
240+
]
241+
242+
self.write_stream(data)
243+
expected = [
244+
'{"key": "value"}',
245+
'["key1", "key2"]'
246+
]
247+
assert self.read_stream(len(expected), topic_path=self.output_topic) == expected
248+
249+
wait_actor_count(kikimr, "DQ_PQ_READ_ACTOR", 1)
250+
stop_yds_query(client, query_id)
251+
221252
@yq_v1
222253
def test_filter(self, kikimr, client):
223254
client.create_yds_connection(

0 commit comments

Comments
 (0)