Skip to content

Commit ef7e2a9

Browse files
authored
Allow multiple join-broadcasts in single stage (#7556) (#7713)
1 parent b12cda0 commit ef7e2a9

File tree

2 files changed

+52
-7
lines changed

2 files changed

+52
-7
lines changed

ydb/core/kqp/executer_actor/kqp_executer_impl.h

+3-7
Original file line numberDiff line numberDiff line change
@@ -1300,13 +1300,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
13001300
const auto& input = stage.GetInputs(inputIndex);
13011301

13021302
// Current assumptions:
1303-
// 1. `Broadcast` can not be the 1st stage input unless it's a single input
1304-
// 2. All stage's inputs, except 1st one, must be a `Broadcast` or `UnionAll`
1305-
if (inputIndex == 0) {
1306-
if (stage.InputsSize() > 1) {
1307-
YQL_ENSURE(input.GetTypeCase() != NKqpProto::TKqpPhyConnection::kBroadcast);
1308-
}
1309-
} else {
1303+
// 1. All stage's inputs, except 1st one, must be a `Broadcast` or `UnionAll`
1304+
// 2. Stages where 1st input is `Broadcast` are not partitioned.
1305+
if (inputIndex > 0) {
13101306
switch (input.GetTypeCase()) {
13111307
case NKqpProto::TKqpPhyConnection::kBroadcast:
13121308
case NKqpProto::TKqpPhyConnection::kHashShuffle:

ydb/core/kqp/ut/opt/kqp_ne_ut.cpp

+49
Original file line numberDiff line numberDiff line change
@@ -3955,6 +3955,55 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
39553955
AssertTableReads(result, "/Root/SecondaryKeys/Index/indexImplTable", 1);
39563956
}
39573957

3958+
Y_UNIT_TEST(MultipleBroadcastJoin) {
3959+
TKikimrSettings kisettings;
3960+
NKikimrConfig::TAppConfig appConfig;
3961+
appConfig.MutableTableServiceConfig()->SetIndexAutoChooseMode(NKikimrConfig::TTableServiceConfig_EIndexAutoChooseMode_MAX_USED_PREFIX);
3962+
kisettings.SetAppConfig(appConfig);
3963+
3964+
TKikimrRunner kikimr(kisettings);
3965+
3966+
auto db = kikimr.GetTableClient();
3967+
auto client = kikimr.GetQueryClient();
3968+
auto session = db.CreateSession().GetValueSync().GetSession();
3969+
3970+
{
3971+
auto session = db.CreateSession().GetValueSync().GetSession();
3972+
AssertSuccessResult(session.ExecuteSchemeQuery(R"(
3973+
--!syntax_v1
3974+
3975+
create table demo_ba(id text, some text, ref1 text, ref2 text, primary key(id));
3976+
create table demo_ref1(id text, code text, some text, primary key(id), index ix_code global on (code));
3977+
create table demo_ref2(id text, code text, some text, primary key(id), index ix_code global on (code));
3978+
)").GetValueSync());
3979+
}
3980+
3981+
auto query = R"(
3982+
select ba_0.id, ba_0.some,
3983+
r_1.id, r_1.some, r_1.code,
3984+
r_2.id, r_2.some, r_2.code
3985+
from demo_ba ba_0
3986+
left join demo_ref1 r_1 on r_1.id=ba_0.ref1
3987+
left join demo_ref2 r_2 on r_2.code=ba_0.ref2
3988+
where ba_0.id in ("ba#10"u,"ba#20"u,"ba#30"u,"ba#40"u,"ba#50"u,"ba#60"u,"ba#70"u,"ba#80"u,"ba#90"u,"ba#100"u);
3989+
)";
3990+
3991+
auto settings = NYdb::NQuery::TExecuteQuerySettings()
3992+
.Syntax(NYdb::NQuery::ESyntax::YqlV1)
3993+
.ConcurrentResultSets(false);
3994+
{
3995+
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
3996+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
3997+
//CompareYson(R"([[[1];["321"]]])", FormatResultSetYson(result.GetResultSet(0)));
3998+
//CompareYson(R"([[["111"];[1]]])", FormatResultSetYson(result.GetResultSet(1)));
3999+
}
4000+
{
4001+
auto it = client.StreamExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
4002+
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
4003+
Cerr << StreamResultToYson(it);
4004+
}
4005+
4006+
}
39584007

39594008
Y_UNIT_TEST_TWIN(ComplexLookupLimit, NewPredicateExtract) {
39604009
TKikimrSettings settings;

0 commit comments

Comments
 (0)