Skip to content

Commit 6067a04

Browse files
Additional validation for s3 reads/writes (#12082)
1 parent 541cba7 commit 6067a04

File tree

8 files changed

+110
-0
lines changed

8 files changed

+110
-0
lines changed

ydb/core/external_sources/object_storage.cpp

+17
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
150150
}
151151
const bool hasPartitioning = objectStorage.projection_size() || objectStorage.partitioned_by_size();
152152
issues.AddIssues(ValidateFormatSetting(objectStorage.format(), objectStorage.format_setting(), location, hasPartitioning));
153+
issues.AddIssues(ValidateSchema(schema));
153154
issues.AddIssues(ValidateJsonListFormat(objectStorage.format(), schema, objectStorage.partitioned_by()));
154155
issues.AddIssues(ValidateRawFormat(objectStorage.format(), schema, objectStorage.partitioned_by()));
155156
if (hasPartitioning) {
@@ -268,6 +269,22 @@ struct TObjectStorageExternalSource : public IExternalSource {
268269
return issues;
269270
}
270271

272+
template<typename TScheme>
273+
static NYql::TIssues ValidateSchema(const TScheme& schema) {
274+
NYql::TIssues issues;
275+
for (const auto& column: schema.column()) {
276+
const auto type = column.type();
277+
if (type.has_optional_type() && type.optional_type().item().has_optional_type()) {
278+
issues.AddIssue(MakeErrorIssue(
279+
Ydb::StatusIds::BAD_REQUEST,
280+
TStringBuilder{} << "Double optional types are not supported (you have '"
281+
<< column.name() << " " << NYdb::TType(column.type()).ToString() << "' field)"));
282+
}
283+
}
284+
285+
return issues;
286+
}
287+
271288
template<typename TScheme>
272289
static NYql::TIssues ValidateJsonListFormat(const TString& format, const TScheme& schema, const google::protobuf::RepeatedPtrField<TString>& partitionedBy) {
273290
NYql::TIssues issues;

ydb/core/external_sources/object_storage_ut.cpp

+9
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,15 @@ Y_UNIT_TEST_SUITE(ObjectStorageTest) {
5555
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Date, Timestamp and Interval types are not allowed in json_list format");
5656
}
5757

58+
Y_UNIT_TEST(FailedOptionalTypeValidation) {
59+
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false);
60+
NKikimrExternalSources::TSchema schema;
61+
NKikimrExternalSources::TGeneral general;
62+
auto newColumn = schema.add_column();
63+
newColumn->mutable_type()->mutable_optional_type()->mutable_item()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::INT32);
64+
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Double optional types are not supported");
65+
}
66+
5867
Y_UNIT_TEST(WildcardsValidation) {
5968
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false);
6069
NKikimrExternalSources::TSchema schema;

ydb/library/yql/providers/s3/common/util.cpp

+16
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,20 @@ TString UrlEscapeRet(const TStringBuf from) {
4848
return to;
4949
}
5050

51+
bool ValidateS3ReadWriteSchema(const TStructExprType* schemaStructRowType, TExprContext& ctx) {
52+
for (const TItemExprType* item : schemaStructRowType->GetItems()) {
53+
const TTypeAnnotationNode* rowType = item->GetItemType();
54+
if (rowType->GetKind() == ETypeAnnotationKind::Optional) {
55+
rowType = rowType->Cast<TOptionalExprType>()->GetItemType();
56+
}
57+
58+
if (rowType->GetKind() == ETypeAnnotationKind::Optional) {
59+
ctx.AddError(TIssue(TStringBuilder() << "Double optional types are not supported (you have '"
60+
<< item->GetName() << " " << FormatType(item->GetItemType()) << "' field)"));
61+
return false;
62+
}
63+
}
64+
return true;
65+
}
66+
5167
}

ydb/library/yql/providers/s3/common/util.h

+3
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include <util/string/builder.h>
44
#include <yql/essentials/public/issue/yql_issue.h>
5+
#include <yql/essentials/ast/yql_expr.h>
56

67
namespace NYql::NS3Util {
78

@@ -12,4 +13,6 @@ TIssues AddParentIssue(const TStringBuilder& prefix, TIssues&& issues);
1213
// '#', '?'
1314
TString UrlEscapeRet(const TStringBuf from);
1415

16+
bool ValidateS3ReadWriteSchema(const TStructExprType* schemaStructRowType, TExprContext& ctx);
17+
1518
}

ydb/library/yql/providers/s3/common/ya.make

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ PEERDIR(
1818
ydb/library/yql/providers/s3/events
1919
yql/essentials/public/issue
2020
yql/essentials/public/issue/protos
21+
yql/essentials/ast
2122
)
2223

2324
IF (CLANG AND NOT WITH_VALGRIND)

ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp

+6
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include <yql/essentials/core/expr_nodes/yql_expr_nodes.h>
44
#include <yql/essentials/core/yql_opt_utils.h>
5+
#include <ydb/library/yql/providers/s3/common/util.h>
56
#include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h>
67

78
#include <yql/essentials/providers/common/provider/yql_provider.h>
@@ -26,6 +27,7 @@ TExprNode::TListType GetPartitionKeys(const TExprNode::TPtr& partBy) {
2627

2728
return {};
2829
}
30+
2931
}
3032

3133
namespace {
@@ -74,6 +76,10 @@ class TS3DataSinkTypeAnnotationTransformer : public TVisitorTransformerBase {
7476
return TStatus::Error;
7577
}
7678

79+
if (!NS3Util::ValidateS3ReadWriteSchema(sourceType->Cast<TStructExprType>(), ctx)) {
80+
return TStatus::Error;
81+
}
82+
7783
auto target = input->Child(TS3WriteObject::idx_Target);
7884
if (!TS3Target::Match(target)) {
7985
ctx.AddError(TIssue(ctx.GetPosition(target->Pos()), "Expected S3 target."));

ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp

+5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "yql_s3_provider_impl.h"
22

33
#include <yql/essentials/core/expr_nodes/yql_expr_nodes.h>
4+
#include <ydb/library/yql/providers/s3/common/util.h>
45
#include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h>
56
#include <ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h>
67
#include <ydb/library/yql/providers/s3/range_helpers/path_list_reader.h>
@@ -491,6 +492,10 @@ class TS3DataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
491492
auto format = s3Object.Format().Ref().Content();
492493
const TStructExprType* structRowType = rowType->Cast<TStructExprType>();
493494

495+
if (!NS3Util::ValidateS3ReadWriteSchema(structRowType, ctx)) {
496+
return TStatus::Error;
497+
}
498+
494499
THashSet<TStringBuf> columns;
495500
for (const TItemExprType* item : structRowType->GetItems()) {
496501
columns.emplace(item->GetName());

ydb/tests/fq/s3/test_s3_0.py

+53
Original file line numberDiff line numberDiff line change
@@ -1114,3 +1114,56 @@ def wait_checkpoints(require_query_is_on=False):
11141114

11151115
client.abort_query(query_id)
11161116
client.wait_query(query_id)
1117+
1118+
@yq_v2
1119+
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
1120+
def test_double_optional_types_validation(self, kikimr, s3, client, unique_prefix):
1121+
resource = boto3.resource(
1122+
"s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
1123+
)
1124+
1125+
bucket = resource.Bucket("fbucket")
1126+
bucket.create(ACL='public-read')
1127+
bucket.objects.all().delete()
1128+
1129+
s3_client = boto3.client(
1130+
"s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
1131+
)
1132+
1133+
fruits = '''Fruit,Price,Weight
1134+
Banana,3,100
1135+
Apple,2,22
1136+
Pear,15,33'''
1137+
s3_client.put_object(Body=fruits, Bucket='fbucket', Key='fruits.csv', ContentType='text/plain')
1138+
1139+
kikimr.control_plane.wait_bootstrap(1)
1140+
storage_connection_name = unique_prefix + "fruitbucket"
1141+
client.create_storage_connection(storage_connection_name, "fbucket")
1142+
1143+
sql = f'''
1144+
SELECT *
1145+
FROM `{storage_connection_name}`.`fruits.csv`
1146+
WITH (format='csv_with_names', SCHEMA (
1147+
Name Int32??,
1148+
));
1149+
'''
1150+
1151+
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
1152+
client.wait_query_status(query_id, fq.QueryMeta.FAILED)
1153+
issues = str(client.describe_query(query_id).result.query.issue)
1154+
1155+
assert "Double optional types are not supported" in issues, "Incorrect issues: " + issues
1156+
1157+
sql = f'''
1158+
INSERT INTO `{storage_connection_name}`.`insert/`
1159+
WITH
1160+
(
1161+
FORMAT="csv_with_names"
1162+
)
1163+
SELECT CAST(42 AS Int32??) as Weight;'''
1164+
1165+
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
1166+
client.wait_query_status(query_id, fq.QueryMeta.FAILED)
1167+
issues = str(client.describe_query(query_id).result.query.issue)
1168+
1169+
assert "Double optional types are not supported" in issues, "Incorrect issues: " + issues

0 commit comments

Comments
 (0)