diff --git a/ydb/core/external_sources/object_storage.cpp b/ydb/core/external_sources/object_storage.cpp
index ecf72445a11c..1cca2267a091 100644
--- a/ydb/core/external_sources/object_storage.cpp
+++ b/ydb/core/external_sources/object_storage.cpp
@@ -150,6 +150,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
         }
         const bool hasPartitioning = objectStorage.projection_size() || objectStorage.partitioned_by_size();
         issues.AddIssues(ValidateFormatSetting(objectStorage.format(), objectStorage.format_setting(), location, hasPartitioning));
+        issues.AddIssues(ValidateSchema(schema));
         issues.AddIssues(ValidateJsonListFormat(objectStorage.format(), schema, objectStorage.partitioned_by()));
         issues.AddIssues(ValidateRawFormat(objectStorage.format(), schema, objectStorage.partitioned_by()));
         if (hasPartitioning) {
@@ -268,6 +269,26 @@ struct TObjectStorageExternalSource : public IExternalSource {
         return issues;
     }
 
+    // Rejects schemas that contain a double optional (Optional<Optional<...>>) column.
+    // Each offending column contributes one issue built with Ydb::StatusIds::BAD_REQUEST;
+    // an empty result means no such column was found.
+    template <typename TScheme>
+    static NYql::TIssues ValidateSchema(const TScheme& schema) {
+        NYql::TIssues issues;
+        for (const auto& column : schema.column()) {
+            // Bind by reference: a by-value `auto` here copies the whole type message per column.
+            const auto& type = column.type();
+            if (type.has_optional_type() && type.optional_type().item().has_optional_type()) {
+                issues.AddIssue(MakeErrorIssue(
+                    Ydb::StatusIds::BAD_REQUEST,
+                    TStringBuilder{} << "Double optional types are not supported (you have '"
+                        << column.name() << " " << NYdb::TType(type).ToString() << "' field)"));
+            }
+        }
+
+        return issues;
+    }
+
     template
     static NYql::TIssues ValidateJsonListFormat(const TString& format, const TScheme& schema, const google::protobuf::RepeatedPtrField& partitionedBy) {
         NYql::TIssues issues;
diff --git a/ydb/core/external_sources/object_storage_ut.cpp b/ydb/core/external_sources/object_storage_ut.cpp
index d5bc4a655d3f..129ad8febd7d 100644
--- a/ydb/core/external_sources/object_storage_ut.cpp
+++ b/ydb/core/external_sources/object_storage_ut.cpp
@@ -55,6 +55,15 @@ Y_UNIT_TEST_SUITE(ObjectStorageTest) {
         UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Date, Timestamp and Interval types are not allowed in json_list format");
     }
 
+    Y_UNIT_TEST(FailedOptionalTypeValidation) {
+        auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false);
+        NKikimrExternalSources::TSchema schema;
+        NKikimrExternalSources::TGeneral general;
+        auto newColumn = schema.add_column();
+        newColumn->mutable_type()->mutable_optional_type()->mutable_item()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::INT32);
+        UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Double optional types are not supported");
+    }
+
     Y_UNIT_TEST(WildcardsValidation) {
         auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false);
         NKikimrExternalSources::TSchema schema;
diff --git a/ydb/library/yql/providers/s3/common/util.cpp b/ydb/library/yql/providers/s3/common/util.cpp
index 594040188745..074b116cc8c2 100644
--- a/ydb/library/yql/providers/s3/common/util.cpp
+++ b/ydb/library/yql/providers/s3/common/util.cpp
@@ -48,4 +48,25 @@ TString UrlEscapeRet(const TStringBuf from) {
     return to;
 }
 
+// Rejects row schemas that contain a double optional (Optional<Optional<...>>) column.
+// Every offending column adds its own error to ctx; the result is false if any was added.
+bool ValidateS3ReadWriteSchema(const TStructExprType* schemaStructRowType, TExprContext& ctx) {
+    bool isValid = true;
+    for (const TItemExprType* item : schemaStructRowType->GetItems()) {
+        const TTypeAnnotationNode* rowType = item->GetItemType();
+        if (rowType->GetKind() != ETypeAnnotationKind::Optional) {
+            continue;
+        }
+
+        // Peel the outer Optional; a second Optional directly beneath it is rejected.
+        rowType = rowType->Cast<TOptionalExprType>()->GetItemType();
+        if (rowType->GetKind() == ETypeAnnotationKind::Optional) {
+            ctx.AddError(TIssue(TStringBuilder() << "Double optional types are not supported (you have '"
+                << item->GetName() << " " << FormatType(item->GetItemType()) << "' field)"));
+            isValid = false;
+        }
+    }
+    return isValid;
+}
+
 }
diff --git a/ydb/library/yql/providers/s3/common/util.h b/ydb/library/yql/providers/s3/common/util.h
index 8767742e31f5..d364e971078f 100644
--- a/ydb/library/yql/providers/s3/common/util.h
+++ b/ydb/library/yql/providers/s3/common/util.h
@@ -2,6 +2,7 @@
 
 #include
 #include
+#include <yql/essentials/ast/yql_expr.h>
 
 namespace NYql::NS3Util {
 
@@ -12,4 +13,8 @@ TIssues AddParentIssue(const TStringBuilder& prefix, TIssues&& issues);
 // '#', '?'
 TString UrlEscapeRet(const TStringBuf from);
 
+// Returns false and adds an error to ctx for each column of schemaStructRowType
+// whose type is a double optional (Optional<Optional<...>>); returns true otherwise.
+bool ValidateS3ReadWriteSchema(const TStructExprType* schemaStructRowType, TExprContext& ctx);
+
 }
diff --git a/ydb/library/yql/providers/s3/common/ya.make b/ydb/library/yql/providers/s3/common/ya.make
index 4927e3b10d66..058d0ca92fd2 100644
--- a/ydb/library/yql/providers/s3/common/ya.make
+++ b/ydb/library/yql/providers/s3/common/ya.make
@@ -18,6 +18,7 @@ PEERDIR(
     ydb/library/yql/providers/s3/events
+    yql/essentials/ast
     yql/essentials/public/issue
     yql/essentials/public/issue/protos
 )
 
 IF (CLANG AND NOT WITH_VALGRIND)
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
index ecda21b92813..350431400b7e 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
@@ -2,6 +2,7 @@
 
 #include
 #include
+#include <ydb/library/yql/providers/s3/common/util.h>
 #include
 #include
 
@@ -26,6 +27,7 @@ TExprNode::TListType GetPartitionKeys(const TExprNode::TPtr& partBy) {
 
     return {};
 }
+
 }
 
 namespace {
@@ -74,6 +76,10 @@ class TS3DataSinkTypeAnnotationTransformer : public TVisitorTransformerBase {
             return TStatus::Error;
         }
 
+        if (!NS3Util::ValidateS3ReadWriteSchema(sourceType->Cast<TStructExprType>(), ctx)) {
+            return TStatus::Error;
+        }
+
         auto target = input->Child(TS3WriteObject::idx_Target);
         if (!TS3Target::Match(target)) {
             ctx.AddError(TIssue(ctx.GetPosition(target->Pos()), "Expected S3 target."));
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
index 6bfb9b641345..a94de088fc65 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
@@ -1,6 +1,7 @@
 #include "yql_s3_provider_impl.h"
 
 #include
+#include <ydb/library/yql/providers/s3/common/util.h>
 #include
 #include
 #include
@@ -491,6 +492,10 @@ class TS3DataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
         auto format = s3Object.Format().Ref().Content();
         const TStructExprType* structRowType = rowType->Cast();
 
+        if (!NS3Util::ValidateS3ReadWriteSchema(structRowType, ctx)) {
+            return TStatus::Error;
+        }
+
         THashSet columns;
         for (const TItemExprType* item : structRowType->GetItems()) {
             columns.emplace(item->GetName());
diff --git a/ydb/tests/fq/s3/test_s3_0.py b/ydb/tests/fq/s3/test_s3_0.py
index e5ea35c0abc4..1e527706db24 100644
--- a/ydb/tests/fq/s3/test_s3_0.py
+++ b/ydb/tests/fq/s3/test_s3_0.py
@@ -1114,3 +1114,55 @@ def wait_checkpoints(require_query_is_on=False):
 
         client.abort_query(query_id)
         client.wait_query(query_id)
+
+    @yq_v2
+    @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
+    def test_double_optional_types_validation(self, kikimr, s3, client, unique_prefix):
+        resource = boto3.resource(
+            "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
+        )
+
+        bucket = resource.Bucket("fbucket")
+        bucket.create(ACL='public-read')
+        bucket.objects.all().delete()
+
+        s3_client = boto3.client(
+            "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
+        )
+
+        fruits = '''Fruit,Price,Weight
+Banana,3,100
+Apple,2,22
+Pear,15,33'''
+        s3_client.put_object(Body=fruits, Bucket='fbucket', Key='fruits.csv', ContentType='text/plain')
+
+        kikimr.control_plane.wait_bootstrap(1)
+        storage_connection_name = unique_prefix + "fruitbucket"
+        client.create_storage_connection(storage_connection_name, "fbucket")
+
+        # Reading with a double optional column in the declared schema must fail.
+        sql = f'''
+            SELECT *
+            FROM `{storage_connection_name}`.`fruits.csv`
+            WITH (format='csv_with_names', SCHEMA (
+                Name Int32??,
+            ));
+            '''
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.FAILED)
+        issues = str(client.describe_query(query_id).result.query.issue)
+
+        assert "Double optional types are not supported" in issues, "Incorrect issues: " + issues
+
+        # Writing a double optional column must fail with the same message.
+        sql = f'''
+            INSERT INTO `{storage_connection_name}`.`insert/`
+            WITH
+            (
+                FORMAT="csv_with_names"
+            )
+            SELECT CAST(42 AS Int32??) as Weight;'''
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.FAILED)
+        issues = str(client.describe_query(query_id).result.query.issue)
+
+        assert "Double optional types are not supported" in issues, "Incorrect issues: " + issues