Skip to content

Commit a5bf388

Browse files
ignoring columns with unsupported parquet and json types with type inferring (#9524)
1 parent 77a5f3a commit a5bf388

File tree

2 files changed

+53
-33
lines changed

2 files changed

+53
-33
lines changed

ydb/core/external_sources/object_storage/inference/arrow_inferencinator.cpp

+9-33
Original file line numberDiff line numberDiff line change
@@ -126,32 +126,10 @@ bool ArrowToYdbType(Ydb::Type& maybeOptionalType, const arrow::DataType& type, s
126126
case arrow::Type::LIST: { // TODO: is ok?
127127
return false;
128128
}
129-
case arrow::Type::STRUCT: { // TODO: is ok?
130-
auto& structType = *resType.mutable_struct_type();
131-
for (const auto& field : type.fields()) {
132-
auto& member = *structType.add_members();
133-
auto& memberType = *member.mutable_type();
134-
if (!ArrowToYdbType(memberType, *field->type(), config)) {
135-
return false;
136-
}
137-
member.mutable_name()->assign(field->name().data(), field->name().size());
138-
}
139-
return true;
140-
}
129+
case arrow::Type::STRUCT:
141130
case arrow::Type::SPARSE_UNION:
142-
case arrow::Type::DENSE_UNION: { // TODO: is ok?
143-
auto& variant = *resType.mutable_variant_type()->mutable_struct_items();
144-
for (const auto& field : type.fields()) {
145-
auto& member = *variant.add_members();
146-
if (!ArrowToYdbType(*member.mutable_type(), *field->type(), config)) {
147-
return false;
148-
}
149-
if (field->name().empty()) {
150-
return false;
151-
}
152-
member.mutable_name()->assign(field->name().data(), field->name().size());
153-
}
154-
return true;
131+
case arrow::Type::DENSE_UNION: {
132+
return false;
155133
}
156134
case arrow::Type::DICTIONARY: // TODO: is representable?
157135
return false;
@@ -325,17 +303,15 @@ class TArrowInferencinator : public NActors::TActorBootstrapped<TArrowInferencin
325303
auto& arrowFields = std::get<ArrowFields>(mbArrowFields);
326304
std::vector<Ydb::Column> ydbFields;
327305
for (const auto& field : arrowFields) {
328-
if (field->name().empty()) {
306+
Ydb::Column column;
307+
if (!ArrowToYdbType(*column.mutable_type(), *field->type(), file.Config)) {
329308
continue;
330309
}
331-
ydbFields.emplace_back();
332-
auto& ydbField = ydbFields.back();
333-
if (!ArrowToYdbType(*ydbField.mutable_type(), *field->type(), file.Config)) {
334-
ctx.Send(RequesterId_, MakeErrorSchema(file.Path, NFq::TIssuesIds::UNSUPPORTED, TStringBuilder{} << "couldn't convert arrow type to ydb: " << field->ToString()));
335-
RequesterId_ = {};
336-
return;
310+
if (field->name().empty()) {
311+
continue;
337312
}
338-
ydbField.mutable_name()->assign(field->name());
313+
column.mutable_name()->assign(field->name());
314+
ydbFields.push_back(column);
339315
}
340316

341317
ctx.Send(RequesterId_, new TEvInferredFileSchema(file.Path, std::move(ydbFields)));

ydb/tests/fq/s3/test_s3_0.py

+44
Original file line numberDiff line numberDiff line change
@@ -624,6 +624,50 @@ def test_inference_null_column_name(self, kikimr, s3, client, unique_prefix):
624624
assert result_set.rows[2].items[1].int64_value == 15
625625
assert sum(kikimr.control_plane.get_metering(1)) == 10
626626

627+
@yq_v2
628+
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
629+
def test_inference_unsupported_types(self, kikimr, s3, client, unique_prefix):
630+
resource = boto3.resource(
631+
"s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
632+
)
633+
634+
bucket = resource.Bucket("fbucket")
635+
bucket.create(ACL='public-read')
636+
bucket.objects.all().delete()
637+
638+
s3_client = boto3.client(
639+
"s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
640+
)
641+
642+
fruits = '''{ "a" : [10, 20, 30] , "b" : { "key" : "value" }, "c" : 10 }
643+
{ "a" : [10, 20, 30] , "b" : { "key" : "value" }, "c" : 20 }
644+
{ "a" : [10, 20, 30] , "b" : { "key" : "value" }, "c" : 30 }'''
645+
s3_client.put_object(Body=fruits, Bucket='fbucket', Key='fruits.json', ContentType='text/plain')
646+
kikimr.control_plane.wait_bootstrap(1)
647+
storage_connection_name = unique_prefix + "fruitbucket"
648+
client.create_storage_connection(storage_connection_name, "fbucket")
649+
650+
sql = f'''
651+
SELECT *
652+
FROM `{storage_connection_name}`.`fruits.json`
653+
WITH (format=json_each_row, with_infer='true');
654+
'''
655+
656+
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
657+
client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
658+
659+
data = client.get_result_data(query_id)
660+
result_set = data.result.result_set
661+
logging.debug(str(result_set))
662+
assert len(result_set.columns) == 1
663+
assert result_set.columns[0].name == "c"
664+
assert result_set.columns[0].type.optional_type.item.type_id == ydb.Type.INT64
665+
assert len(result_set.rows) == 3
666+
assert result_set.rows[0].items[0].int64_value == 10
667+
assert result_set.rows[1].items[0].int64_value == 20
668+
assert result_set.rows[2].items[0].int64_value == 30
669+
assert sum(kikimr.control_plane.get_metering(1)) == 10
670+
627671
@yq_all
628672
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
629673
def test_csv_with_hopping(self, kikimr, s3, client, unique_prefix):

0 commit comments

Comments
 (0)