Skip to content

Commit b5c55a4

Browse files
authored
PushLeftStage fix (#7465)
1 parent 20af1f5 commit b5c55a4

File tree

2 files changed

+66
-1
lines changed

2 files changed

+66
-1
lines changed

ydb/core/kqp/opt/physical/kqp_opt_phy.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,7 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
419419
{
420420
// TODO: Allow push to left stage for data queries.
421421
// It is now possible as we don't use datashard transactions for reads in data queries.
422-
bool pushLeftStage = !KqpCtx.IsDataQuery() && AllowFuseJoinInputs(node);
422+
bool pushLeftStage = (KqpCtx.IsScanQuery() || KqpCtx.Config->EnableKqpDataQueryStreamLookup) && AllowFuseJoinInputs(node);
423423
TExprBase output = DqBuildJoin(node, ctx, optCtx, *getParents(), IsGlobal,
424424
pushLeftStage, KqpCtx.Config->GetHashJoinMode()
425425
);

ydb/core/kqp/ut/join/kqp_join_ut.cpp

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -588,6 +588,71 @@ Y_UNIT_TEST_SUITE(KqpJoin) {
588588
}
589589
}
590590

591+
Y_UNIT_TEST(TwoJoinsWithQueryService) {
592+
NKikimrConfig::TAppConfig appConfig;
593+
auto serverSettings = TKikimrSettings()
594+
.SetAppConfig(appConfig)
595+
.SetWithSampleTables(false);
596+
597+
TKikimrRunner kikimr(serverSettings);
598+
auto client = kikimr.GetTableClient();
599+
auto db = kikimr.GetQueryClient();
600+
auto settings = NYdb::NQuery::TExecuteQuerySettings();
601+
602+
{
603+
auto session = client.CreateSession().GetValueSync().GetSession();
604+
const auto query = Q_(R"(
605+
CREATE TABLE ta(
606+
a Int64,
607+
b Int64,
608+
c Int64,
609+
PRIMARY KEY(a)
610+
);
611+
)");
612+
auto result = session.ExecuteSchemeQuery(query).ExtractValueSync();
613+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
614+
}
615+
{
616+
auto session = client.CreateSession().GetValueSync().GetSession();
617+
const auto query = Q_(R"(
618+
CREATE TABLE tb(
619+
b Int64,
620+
bval Int64,
621+
PRIMARY KEY(b)
622+
);
623+
)");
624+
auto result = session.ExecuteSchemeQuery(query).ExtractValueSync();
625+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
626+
}
627+
{
628+
auto session = client.CreateSession().GetValueSync().GetSession();
629+
const auto query = Q_(R"(
630+
CREATE TABLE tc(
631+
c Int64,
632+
cval Int64,
633+
PRIMARY KEY(c)
634+
);
635+
)");
636+
auto result = session.ExecuteSchemeQuery(query).ExtractValueSync();
637+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
638+
}
639+
{
640+
auto result = db.ExecuteQuery(R"(
641+
UPSERT INTO ta(a, b, c) VALUES (1, 1001, 2001), (2, 1002, 2002), (3, 1003, 2003);
642+
UPSERT INTO tb(b, bval) VALUES (1001, 1001), (1002, 1002), (1003, 1003);
643+
UPSERT INTO tc(c, cval) VALUES (2001, 2001), (2002, 2002), (2003, 2003);
644+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
645+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
646+
}
647+
{
648+
auto result = db.ExecuteQuery(R"(
649+
SELECT ta.a, tb.bval, tc.cval FROM ta INNER JOIN tb ON ta.b = tb.b LEFT JOIN tc ON ta.c = tc.cval;
650+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
651+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
652+
CompareYson(R"([[[3];[1003];[2003]];[[2];[1002];[2002]];[[1];[1001];[2001]]])", FormatResultSetYson(result.GetResultSet(0)));
653+
}
654+
}
655+
591656
// join on key prefix => index-lookup
592657
Y_UNIT_TEST(RightSemiJoin_KeyPrefix) {
593658
TKikimrRunner kikimr(SyntaxV1Settings());

0 commit comments

Comments
 (0)