Skip to content

Commit da72f11

Browse files
authored
Merge 3e5e3f9 into 7bc2f70
2 parents 7bc2f70 + 3e5e3f9 commit da72f11

File tree

2 files changed

+116
-3
lines changed

2 files changed

+116
-3
lines changed

ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp

+72
Original file line numberDiff line numberDiff line change
@@ -1591,6 +1591,78 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
15911591
UNIT_ASSERT_STRING_CONTAINS(queryExecutionOperation.GetIssues().ToString(), "\"/Root/external_table\" is expected to be external data source");
15921592
}
15931593
}
1594+
1595+
Y_UNIT_TEST(QueryWithNoDataInS3) {
1596+
const TString externalDataSourceName = "tpc_h_s3_storage_connection";
1597+
const TString bucket = "test_bucket_no_data";
1598+
1599+
Aws::S3::S3Client s3Client = MakeS3Client();
1600+
CreateBucket(bucket, s3Client);
1601+
// Uncomment if you want to compare with query with data
1602+
//UploadObject(bucket, "l/l", R"json({"l_extendedprice": 0.0, "l_discount": 1.0, "l_partkey": 1})json", s3Client);
1603+
//UploadObject(bucket, "p/p", R"json({"p_partkey": 1, "p_type": "t"})json", s3Client);
1604+
1605+
auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make());
1606+
auto client = kikimr->GetQueryClient();
1607+
1608+
{
1609+
const TString query = fmt::format(R"sql(
1610+
CREATE EXTERNAL DATA SOURCE `{external_source}` WITH (
1611+
SOURCE_TYPE="ObjectStorage",
1612+
LOCATION="{location}",
1613+
AUTH_METHOD="NONE"
1614+
);
1615+
)sql",
1616+
"external_source"_a = externalDataSourceName,
1617+
"location"_a = GetBucketLocation(bucket)
1618+
);
1619+
auto result = client.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync();
1620+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
1621+
}
1622+
1623+
{
1624+
// YQ-2750
1625+
const TString query = fmt::format(R"sql(
1626+
$border = Date("1994-08-01");
1627+
select
1628+
100.00 * sum(case
1629+
when StartsWith(p.p_type, 'PROMO')
1630+
then l.l_extendedprice * (1 - l.l_discount)
1631+
else 0
1632+
end) / sum(l.l_extendedprice * (1 - l.l_discount)) as promo_revenue
1633+
from
1634+
{external_source}.`l/` with ( schema (
1635+
l_extendedprice double,
1636+
l_discount double,
1637+
l_partkey int64,
1638+
l_shipdate date
1639+
),
1640+
format = "json_each_row"
1641+
) as l
1642+
join
1643+
{external_source}.`p/` with ( schema (
1644+
p_partkey int64,
1645+
p_type string
1646+
),
1647+
format = "json_each_row"
1648+
) as p
1649+
on
1650+
l.l_partkey = p.p_partkey
1651+
where
1652+
cast(l.l_shipdate as timestamp) >= $border
1653+
and cast(l.l_shipdate as timestamp) < ($border + Interval("P31D"));
1654+
)sql",
1655+
"external_source"_a = externalDataSourceName
1656+
);
1657+
auto result = client.ExecuteQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync();
1658+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
1659+
auto rs = result.GetResultSetParser(0);
1660+
UNIT_ASSERT_VALUES_EQUAL(rs.RowsCount(), 1);
1661+
rs.TryNextRow();
1662+
TMaybe<double> sum = rs.ColumnParser(0).GetOptionalDouble();
1663+
UNIT_ASSERT(!sum);
1664+
}
1665+
}
15941666
}
15951667

15961668
} // namespace NKikimr::NKqp

ydb/library/yql/core/common_opt/yql_co_simple1.cpp

+44-3
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ TExprNode::TPtr OptimizePgCastOverPgConst(const TExprNode::TPtr& input, TExprCon
7575
return input;
7676
}
7777
auto val = input->Child(0);
78-
if (!val->IsCallable("PgConst")) {
78+
if (!val->IsCallable("PgConst")) {
7979
return input;
8080
}
8181

@@ -85,7 +85,7 @@ TExprNode::TPtr OptimizePgCastOverPgConst(const TExprNode::TPtr& input, TExprCon
8585
YQL_CLOG(DEBUG, Core) << "Remove PgCast unknown->text over PgConst";
8686
return ctx.ChangeChild(*val, 1, castToType);
8787
}
88-
88+
8989
return input;
9090
}
9191

@@ -6285,7 +6285,48 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) {
62856285
map["ShuffleByKeys"] = map["PartitionsByKeys"] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) {
62866286
if (IsEmpty(node->Head(), *optCtx.Types)) {
62876287
YQL_CLOG(DEBUG, Core) << node->Content() << " over empty input.";
6288-
auto lambdaResult = ctx.Builder(node->Pos()).Apply(node->Tail()).With(0, KeepConstraints(node->HeadPtr(), node->Tail().Head().Head(), ctx)).Seal().Build();
6288+
6289+
// lambda argument type:
6290+
auto& lambdaArg = node->Tail().Head().Head();
6291+
auto* lambdaArgType = lambdaArg.GetTypeAnn();
6292+
YQL_ENSURE(lambdaArgType, "No argument type for lambda in " << node->Content());
6293+
6294+
// sequence type:
6295+
auto* sequenceType = node->Head().GetTypeAnn();
6296+
YQL_ENSURE(sequenceType, "No argument type for sequence in " << node->Content());
6297+
6298+
TStringBuf typeConversionFunc;
6299+
if (sequenceType->GetKind() != lambdaArgType->GetKind()) {
6300+
switch (lambdaArgType->GetKind()) {
6301+
case ETypeAnnotationKind::List:
6302+
if (sequenceType->GetKind() == ETypeAnnotationKind::Optional) {
6303+
typeConversionFunc = "ToList";
6304+
} else if (IsIn({ ETypeAnnotationKind::Stream, ETypeAnnotationKind::Flow }, sequenceType->GetKind())) {
6305+
typeConversionFunc = "ForwardList";
6306+
}
6307+
break;
6308+
case ETypeAnnotationKind::Flow:
6309+
typeConversionFunc = "ToFlow";
6310+
break;
6311+
case ETypeAnnotationKind::Stream:
6312+
typeConversionFunc = "ToStream";
6313+
break;
6314+
case ETypeAnnotationKind::Optional:
6315+
if (sequenceType->GetKind() == ETypeAnnotationKind::List) {
6316+
typeConversionFunc = "ToOptional";
6317+
}
6318+
// TODO: convert from Stream/Flow to Optional
6319+
break;
6320+
default:
6321+
break;
6322+
}
6323+
}
6324+
6325+
TExprNode::TPtr sequence = KeepConstraints(node->HeadPtr(), lambdaArg, ctx);
6326+
if (typeConversionFunc) {
6327+
sequence = KeepConstraints(ctx.NewCallable(sequence->Pos(), typeConversionFunc, { sequence }), lambdaArg, ctx);
6328+
}
6329+
auto lambdaResult = ctx.Builder(node->Pos()).Apply(node->Tail()).With(0, sequence).Seal().Build();
62896330
if (node->IsCallable("ShuffleByKeys")) {
62906331
auto lambdaType = node->Tail().GetTypeAnn();
62916332
if (lambdaType->GetKind() == ETypeAnnotationKind::Optional) {

0 commit comments

Comments
 (0)