Skip to content

Commit 7f4190a

Browse files
authored
Merge 1bf5738 into 3079189
2 parents 3079189 + 1bf5738 commit 7f4190a

File tree

2 files changed

+104
-11
lines changed

2 files changed

+104
-11
lines changed

ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp

+72
Original file line numberDiff line numberDiff line change
@@ -1591,6 +1591,78 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
15911591
UNIT_ASSERT_STRING_CONTAINS(queryExecutionOperation.GetIssues().ToString(), "\"/Root/external_table\" is expected to be external data source");
15921592
}
15931593
}
1594+
1595+
Y_UNIT_TEST(QueryWithNoDataInS3) {
1596+
const TString externalDataSourceName = "tpc_h_s3_storage_connection";
1597+
const TString bucket = "test_bucket_no_data";
1598+
1599+
Aws::S3::S3Client s3Client = MakeS3Client();
1600+
CreateBucket(bucket, s3Client);
1601+
// Uncomment if you want to compare with query with data
1602+
//UploadObject(bucket, "l/l", R"json({"l_extendedprice": 0.0, "l_discount": 1.0, "l_partkey": 1})json", s3Client);
1603+
//UploadObject(bucket, "p/p", R"json({"p_partkey": 1, "p_type": "t"})json", s3Client);
1604+
1605+
auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make());
1606+
auto client = kikimr->GetQueryClient();
1607+
1608+
{
1609+
const TString query = fmt::format(R"sql(
1610+
CREATE EXTERNAL DATA SOURCE `{external_source}` WITH (
1611+
SOURCE_TYPE="ObjectStorage",
1612+
LOCATION="{location}",
1613+
AUTH_METHOD="NONE"
1614+
);
1615+
)sql",
1616+
"external_source"_a = externalDataSourceName,
1617+
"location"_a = GetBucketLocation(bucket)
1618+
);
1619+
auto result = client.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync();
1620+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
1621+
}
1622+
1623+
{
1624+
// YQ-2750
1625+
const TString query = fmt::format(R"sql(
1626+
$border = Date("1994-08-01");
1627+
select
1628+
100.00 * sum(case
1629+
when StartsWith(p.p_type, 'PROMO')
1630+
then l.l_extendedprice * (1 - l.l_discount)
1631+
else 0
1632+
end) / sum(l.l_extendedprice * (1 - l.l_discount)) as promo_revenue
1633+
from
1634+
{external_source}.`l/` with ( schema (
1635+
l_extendedprice double,
1636+
l_discount double,
1637+
l_partkey int64,
1638+
l_shipdate date
1639+
),
1640+
format = "json_each_row"
1641+
) as l
1642+
join
1643+
{external_source}.`p/` with ( schema (
1644+
p_partkey int64,
1645+
p_type string
1646+
),
1647+
format = "json_each_row"
1648+
) as p
1649+
on
1650+
l.l_partkey = p.p_partkey
1651+
where
1652+
cast(l.l_shipdate as timestamp) >= $border
1653+
and cast(l.l_shipdate as timestamp) < ($border + Interval("P31D"));
1654+
)sql",
1655+
"external_source"_a = externalDataSourceName
1656+
);
1657+
auto result = client.ExecuteQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync();
1658+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
1659+
auto rs = result.GetResultSetParser(0);
1660+
UNIT_ASSERT_VALUES_EQUAL(rs.RowsCount(), 1);
1661+
rs.TryNextRow();
1662+
TMaybe<double> sum = rs.ColumnParser(0).GetOptionalDouble();
1663+
UNIT_ASSERT(!sum);
1664+
}
1665+
}
15941666
}
15951667

15961668
} // namespace NKikimr::NKqp

ydb/library/yql/core/common_opt/yql_co_simple1.cpp

+32-11
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ TExprNode::TPtr OptimizePgCastOverPgConst(const TExprNode::TPtr& input, TExprCon
7575
return input;
7676
}
7777
auto val = input->Child(0);
78-
if (!val->IsCallable("PgConst")) {
78+
if (!val->IsCallable("PgConst")) {
7979
return input;
8080
}
8181

@@ -85,7 +85,7 @@ TExprNode::TPtr OptimizePgCastOverPgConst(const TExprNode::TPtr& input, TExprCon
8585
YQL_CLOG(DEBUG, Core) << "Remove PgCast unknown->text over PgConst";
8686
return ctx.ChangeChild(*val, 1, castToType);
8787
}
88-
88+
8989
return input;
9090
}
9191

@@ -6282,17 +6282,38 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) {
62826282
return node;
62836283
};
62846284

6285-
map["ShuffleByKeys"] = map["PartitionsByKeys"] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) {
6285+
map["PartitionsByKeys"] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) {
62866286
if (IsEmpty(node->Head(), *optCtx.Types)) {
62876287
YQL_CLOG(DEBUG, Core) << node->Content() << " over empty input.";
6288-
auto lambdaResult = ctx.Builder(node->Pos()).Apply(node->Tail()).With(0, KeepConstraints(node->HeadPtr(), node->Tail().Head().Head(), ctx)).Seal().Build();
6289-
if (node->IsCallable("ShuffleByKeys")) {
6290-
auto lambdaType = node->Tail().GetTypeAnn();
6291-
if (lambdaType->GetKind() == ETypeAnnotationKind::Optional) {
6292-
lambdaResult = ctx.NewCallable(lambdaResult->Pos(), "ToList", { lambdaResult });
6293-
} else if (lambdaType->GetKind() == ETypeAnnotationKind::Stream) {
6294-
lambdaResult = ctx.NewCallable(lambdaResult->Pos(), "ForwardList", { lambdaResult });
6295-
}
6288+
6289+
TExprNode::TPtr sequence = KeepConstraints(node->HeadPtr(), node->Tail().Head().Head(), ctx);
6290+
auto lambdaResult = ctx.Builder(node->Pos()).Apply(node->Tail()).With(0, sequence).Seal().Build();
6291+
return lambdaResult;
6292+
}
6293+
return node;
6294+
};
6295+
6296+
map["ShuffleByKeys"] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) {
6297+
if (IsEmpty(node->Head(), *optCtx.Types)) {
6298+
YQL_CLOG(DEBUG, Core) << node->Content() << " over empty input.";
6299+
6300+
auto& lambdaArg = node->Tail().Head().Head();
6301+
6302+
TExprNode::TPtr sequence = node->HeadPtr();
6303+
auto* sequenceType = sequence->GetTypeAnn();
6304+
YQL_ENSURE(sequenceType, "No argument type for sequence in " << node->Content());
6305+
6306+
if (sequenceType->GetKind() != ETypeAnnotationKind::Stream) {
6307+
sequence = ctx.NewCallable(sequence->Pos(), "ToStream", { sequence });
6308+
}
6309+
sequence = KeepConstraints(sequence, lambdaArg, ctx);
6310+
6311+
auto lambdaResult = ctx.Builder(node->Pos()).Apply(node->Tail()).With(0, sequence).Seal().Build();
6312+
auto lambdaType = node->Tail().GetTypeAnn();
6313+
if (lambdaType->GetKind() == ETypeAnnotationKind::Optional) {
6314+
lambdaResult = ctx.NewCallable(lambdaResult->Pos(), "ToList", { lambdaResult });
6315+
} else if (lambdaType->GetKind() == ETypeAnnotationKind::Stream || lambdaType->GetKind() == ETypeAnnotationKind::Flow) {
6316+
lambdaResult = ctx.NewCallable(lambdaResult->Pos(), "ForwardList", { lambdaResult });
62966317
}
62976318
return lambdaResult;
62986319
}

0 commit comments

Comments
 (0)