Skip to content

YQ-3722 RD fixed nested type parsing #10300

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion ydb/core/fq/libs/row_dispatcher/json_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,13 @@ class TJsonFilter::TImpl {
Y_ABORT_UNLESS(columnNames.size() == columnTypes.size());
str << OffsetFieldName << ", ";
for (size_t i = 0; i < columnNames.size(); ++i) {
str << "CAST(" << columnNames[i] << " as " << columnTypes[i] << ") as " << columnNames[i] << ((i != columnNames.size() - 1) ? "," : "");
TString columnType = columnTypes[i];
if (columnType == "Json") {
columnType = "String";
} else if (columnType == "Optional<Json>") {
columnType = "Optional<String>";
}
str << "CAST(" << columnNames[i] << " as " << columnType << ") as " << columnNames[i] << ((i != columnNames.size() - 1) ? "," : "");
}
str << " FROM Input;\n";
str << "$filtered = SELECT * FROM $fields " << whereFilter << ";\n";
Expand Down
67 changes: 51 additions & 16 deletions ydb/core/fq/libs/row_dispatcher/json_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,28 @@ void TJsonParserBuffer::Clear() {
//// TJsonParser

class TJsonParser::TImpl {
// Per-column metadata stored by the parser, one entry per requested column.
struct TColumnDescription {
// Column name; ColumnsIndex keys are string_views built over this string.
std::string Name;
// Column type name as produced by SkipOptional(), i.e. with an outer "Optional<...>" wrapper removed.
TString Type;
};

public:
TImpl(const TVector<TString>& columns, const TVector<TString>& types)
: ParsedValues(columns.size())
{
Y_UNUSED(types); // TODO: Will be used for UV creation
Y_ENSURE(columns.size() == types.size(), "Number of columns and types should by equal");

Columns.reserve(columns.size());
for (const auto& column : columns) {
Columns.emplace_back(column);
for (size_t i = 0; i < columns.size(); i++) {
Columns.emplace_back(TColumnDescription{
.Name = columns[i],
.Type = SkipOptional(types[i])
});
}

ColumnsIndex.reserve(columns.size());
for (size_t i = 0; i < Columns.size(); i++) {
ColumnsIndex.emplace(std::string_view(Columns[i]), i);
for (size_t i = 0; i < columns.size(); i++) {
ColumnsIndex.emplace(std::string_view(Columns[i].Name), i);
}
}

Expand All @@ -86,6 +94,7 @@ class TJsonParser::TImpl {
simdjson::ondemand::parser parser;
parser.threaded = false;

size_t rowId = 0;
simdjson::ondemand::document_stream documents = parser.iterate_many(values, size, simdjson::dom::DEFAULT_BATCH_SIZE);
for (auto document : documents) {
for (auto item : document.get_object()) {
Expand All @@ -94,17 +103,33 @@ class TJsonParser::TImpl {
continue;
}

auto& parsedColumn = ParsedValues[it->second];
if (item.value().is_string()) {
parsedColumn.emplace_back(CreateHolderIfNeeded(
values, size, item.value().get_string().value()
));
const auto& column = Columns[it->second];

std::string_view value;
if (item.value().is_null()) {
// TODO: support optional types and create UV
continue;
} else if (column.Type == "Json") {
value = item.value().raw_json().value();
} else if (column.Type == "String" || column.Type == "Utf8") {
value = item.value().get_string().value();
} else if (item.value().is_scalar()) {
// TODO: perform type validation and create UV
value = item.value().raw_json_token().value();
} else {
parsedColumn.emplace_back(CreateHolderIfNeeded(
values, size, item.value().raw_json_token().value()
));
throw yexception() << "Failed to parse json string, expected scalar type for column '" << it->first << "' with type " << column.Type << " but got nested json, please change column type to Json.";
}

auto& parsedColumn = ParsedValues[it->second];
parsedColumn.resize(rowId);
parsedColumn.emplace_back(CreateHolderIfNeeded(values, size, value));
}
rowId++;
}
Y_ENSURE(rowId == Buffer.GetNumberValues(), "Unexpected number of json documents");

for (auto& parsedColumn : ParsedValues) {
parsedColumn.resize(Buffer.GetNumberValues());
}
return ParsedValues;
}
Expand All @@ -119,7 +144,7 @@ class TJsonParser::TImpl {
TString GetDescription() const {
TStringBuilder description = TStringBuilder() << "Columns: ";
for (const auto& column : Columns) {
description << "'" << column << "' ";
description << "'" << column.Name << "':" << column.Type << " ";
}
description << "\nBuffer size: " << Buffer.GetNumberValues() << ", finished: " << Buffer.GetFinished();
return description;
Expand All @@ -128,7 +153,7 @@ class TJsonParser::TImpl {
TString GetDebugString(const TVector<TVector<std::string_view>>& parsedValues) const {
TStringBuilder result;
for (size_t i = 0; i < Columns.size(); ++i) {
result << "Parsed column '" << Columns[i] << "': ";
result << "Parsed column '" << Columns[i].Name << "': ";
for (const auto& value : parsedValues[i]) {
result << "'" << value << "' ";
}
Expand All @@ -146,8 +171,18 @@ class TJsonParser::TImpl {
return Buffer.AddHolder(value);
}

// Strips one outer "Optional<...>" wrapper from a type name.
//
// Names that do not begin with "Optional" are returned untouched. A name that
// begins with "Optional" must have the exact shape "Optional<...>"; otherwise
// Y_ENSURE fails with "Unexpected type". Only a single level is removed.
static TString SkipOptional(const TString& type) {
    if (!type.StartsWith("Optional")) {
        return type;
    }

    TStringBuf innerType = type;
    Y_ENSURE(innerType.SkipPrefix("Optional<"), "Unexpected type");
    Y_ENSURE(innerType.ChopSuffix(">"), "Unexpected type");
    return TString(innerType);
}

private:
TVector<std::string> Columns;
TVector<TColumnDescription> Columns;
absl::flat_hash_map<std::string_view, size_t> ColumnsIndex;

TJsonParserBuffer Buffer;
Expand Down
53 changes: 52 additions & 1 deletion ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ Y_UNIT_TEST_SUITE(TJsonParserTests) {
MakeParser({"a1", "a2"});

TJsonParserBuffer& buffer = Parser->GetBuffer();
buffer.AddValue(R"({"a1": "hello1", "a2": 101, "event": "event1"})");
buffer.AddValue(R"({"a1": "hello1", "a2": "101", "event": "event1"})");
buffer.AddValue(R"({"a1": "hello1", "a2": "101", "event": "event2"})");
buffer.AddValue(R"({"a2": "101", "a1": "hello1", "event": "event3"})");

Expand All @@ -133,6 +133,57 @@ Y_UNIT_TEST_SUITE(TJsonParserTests) {
}
}

// A column that is absent from a document, or present with a JSON null,
// must yield an empty value for that row while the row count is preserved.
Y_UNIT_TEST_F(MissingFields, TFixture) {
    MakeParser({"a1", "a2"});

    TJsonParserBuffer& input = Parser->GetBuffer();
    input.AddValue(R"({"a1": "hello1", "a2": "101", "event": "event1"})");
    input.AddValue(R"({"a1": "hello1", "event": "event2"})");               // row 1: "a2" is absent
    input.AddValue(R"({"a2": "101", "a1": null, "event": "event3"})");      // row 2: "a1" is null

    ParsedValues = Parser->Parse();
    ResultNumberValues = ParsedValues.front().size();
    UNIT_ASSERT_VALUES_EQUAL(3, ResultNumberValues);

    for (size_t row = 0; row < ResultNumberValues; ++row) {
        const bool hasA1 = row != 2;
        const bool hasA2 = row != 1;

        const auto& parsedRow = GetParsedRow(row);
        UNIT_ASSERT_VALUES_EQUAL_C(2, parsedRow.size(), row);
        UNIT_ASSERT_VALUES_EQUAL_C(hasA1 ? "hello1" : "", parsedRow.front(), row);
        UNIT_ASSERT_VALUES_EQUAL_C(hasA2 ? "101" : "", parsedRow.back(), row);
    }
}

// A column declared as Optional<Json> must receive the raw JSON text of a
// nested object or array, while the sibling String column is parsed as usual.
Y_UNIT_TEST_F(NestedTypes, TFixture) {
    MakeParser({"nested", "a1"}, {"Optional<Json>", "String"});

    TJsonParserBuffer& input = Parser->GetBuffer();
    input.AddValue(R"({"a1": "hello1", "nested": {"key": "value"}})");
    input.AddValue(R"({"a1": "hello1", "nested": ["key1", "key2"]})");

    ParsedValues = Parser->Parse();
    ResultNumberValues = ParsedValues.front().size();
    UNIT_ASSERT_VALUES_EQUAL(2, ResultNumberValues);

    // Row 0: the nested value is a JSON object.
    const auto& objectRow = GetParsedRow(0);
    UNIT_ASSERT_VALUES_EQUAL(2, objectRow.size());
    UNIT_ASSERT_VALUES_EQUAL("{\"key\": \"value\"}", objectRow.front());
    UNIT_ASSERT_VALUES_EQUAL("hello1", objectRow.back());

    // Row 1: the nested value is a JSON array.
    const auto& arrayRow = GetParsedRow(1);
    UNIT_ASSERT_VALUES_EQUAL(2, arrayRow.size());
    UNIT_ASSERT_VALUES_EQUAL("[\"key1\", \"key2\"]", arrayRow.front());
    UNIT_ASSERT_VALUES_EQUAL("hello1", arrayRow.back());
}

// A String column fed a JSON number must fail: the expected error is the
// simdjson INCORRECT_TYPE exception checked below.
Y_UNIT_TEST_F(StringTypeValidation, TFixture) {
MakeParser({"a1"}, {"String"});
UNIT_ASSERT_EXCEPTION_CONTAINS(PushToParser(R"({"a1": 1234})"), simdjson::simdjson_error, "INCORRECT_TYPE: The JSON element does not have the requested type.");
}

// A non-Json scalar column (Int32 here) fed a nested JSON object must be
// rejected with a yexception that advises switching the column type to Json.
Y_UNIT_TEST_F(JsonTypeValidation, TFixture) {
MakeParser({"a1"}, {"Int32"});
UNIT_ASSERT_EXCEPTION_CONTAINS(PushToParser(R"({"a1": {"key": "value"}})"), yexception, "Failed to parse json string, expected scalar type for column 'a1' with type Int32 but got nested json, please change column type to Json.");
}

Y_UNIT_TEST_F(ThrowExceptionByError, TFixture) {
MakeParser({"a2", "a1"});
UNIT_ASSERT_EXCEPTION_CONTAINS(PushToParser(R"(ydb)"), simdjson::simdjson_error, "INCORRECT_TYPE: The JSON element does not have the requested type.");
Expand Down
31 changes: 31 additions & 0 deletions ydb/tests/fq/yds/test_row_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,37 @@ def test_scheme_error(self, kikimr, client):
stop_yds_query(client, query_id)
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0)

# End-to-end check that a column declared as Json carries nested objects and
# arrays from the input topic to the output topic.
@yq_v1
def test_nested_types(self, kikimr, client):
# Create the YDS connection with shared reading enabled.
client.create_yds_connection(
YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
)
self.init_topics("test_nested_types")

# The query forwards only the `data` column of rows whose event is "event1" or "event2".
sql = Rf'''
INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
SELECT data FROM {YDS_CONNECTION}.`{self.input_topic}`
WITH (format=json_each_row, SCHEMA (time UInt64 NOT NULL, data Json NOT NULL, event String NOT NULL))
WHERE event = "event1" or event = "event2";'''

query_id = start_yds_query(kikimr, client, sql)
# Wait until exactly one FQ_ROW_DISPATCHER_SESSION actor is reported before writing data.
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)

# One row with a nested object and one with a nested array in `data`.
data = [
'{"time": 101, "data": {"key": "value"}, "event": "event1"}',
'{"time": 102, "data": ["key1", "key2"], "event": "event2"}',
]

self.write_stream(data)
# The output topic must contain the nested values exactly as they appeared in the input.
expected = [
'{"key": "value"}',
'["key1", "key2"]'
]
assert self.read_stream(len(expected), topic_path=self.output_topic) == expected

# Check the DQ_PQ_READ_ACTOR count is 1, then stop the query.
wait_actor_count(kikimr, "DQ_PQ_READ_ACTOR", 1)
stop_yds_query(client, query_id)

@yq_v1
def test_filter(self, kikimr, client):
client.create_yds_connection(
Expand Down
Loading