Skip to content

Commit 3fb7fa2

Browse files
authored
PushLeftStage fix (#7349)
1 parent 32218fe commit 3fb7fa2

File tree

2 files changed

+66
-1
lines changed

2 files changed

+66
-1
lines changed

ydb/core/kqp/opt/physical/kqp_opt_phy.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,7 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
428428
{
429429
// TODO: Allow push to left stage for data queries.
430430
// It is now possible as we don't use datashard transactions for reads in data queries.
431-
bool pushLeftStage = !KqpCtx.IsDataQuery() && AllowFuseJoinInputs(node);
431+
bool pushLeftStage = (KqpCtx.IsScanQuery() || KqpCtx.Config->EnableKqpDataQueryStreamLookup) && AllowFuseJoinInputs(node);
432432
TExprBase output = DqBuildJoin(node, ctx, optCtx, *getParents(), IsGlobal,
433433
pushLeftStage, KqpCtx.Config->GetHashJoinMode()
434434
);

ydb/core/kqp/ut/join/kqp_join_ut.cpp

+65
Original file line numberDiff line numberDiff line change
@@ -816,6 +816,71 @@ Y_UNIT_TEST_SUITE(KqpJoin) {
816816
}
817817
}
818818

819+
Y_UNIT_TEST(TwoJoinsWithQueryService) {
820+
NKikimrConfig::TAppConfig appConfig;
821+
auto serverSettings = TKikimrSettings()
822+
.SetAppConfig(appConfig)
823+
.SetWithSampleTables(false);
824+
825+
TKikimrRunner kikimr(serverSettings);
826+
auto client = kikimr.GetTableClient();
827+
auto db = kikimr.GetQueryClient();
828+
auto settings = NYdb::NQuery::TExecuteQuerySettings();
829+
830+
{
831+
auto session = client.CreateSession().GetValueSync().GetSession();
832+
const auto query = Q_(R"(
833+
CREATE TABLE ta(
834+
a Int64,
835+
b Int64,
836+
c Int64,
837+
PRIMARY KEY(a)
838+
);
839+
)");
840+
auto result = session.ExecuteSchemeQuery(query).ExtractValueSync();
841+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
842+
}
843+
{
844+
auto session = client.CreateSession().GetValueSync().GetSession();
845+
const auto query = Q_(R"(
846+
CREATE TABLE tb(
847+
b Int64,
848+
bval Int64,
849+
PRIMARY KEY(b)
850+
);
851+
)");
852+
auto result = session.ExecuteSchemeQuery(query).ExtractValueSync();
853+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
854+
}
855+
{
856+
auto session = client.CreateSession().GetValueSync().GetSession();
857+
const auto query = Q_(R"(
858+
CREATE TABLE tc(
859+
c Int64,
860+
cval Int64,
861+
PRIMARY KEY(c)
862+
);
863+
)");
864+
auto result = session.ExecuteSchemeQuery(query).ExtractValueSync();
865+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
866+
}
867+
{
868+
auto result = db.ExecuteQuery(R"(
869+
UPSERT INTO ta(a, b, c) VALUES (1, 1001, 2001), (2, 1002, 2002), (3, 1003, 2003);
870+
UPSERT INTO tb(b, bval) VALUES (1001, 1001), (1002, 1002), (1003, 1003);
871+
UPSERT INTO tc(c, cval) VALUES (2001, 2001), (2002, 2002), (2003, 2003);
872+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
873+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
874+
}
875+
{
876+
auto result = db.ExecuteQuery(R"(
877+
SELECT ta.a, tb.bval, tc.cval FROM ta INNER JOIN tb ON ta.b = tb.b LEFT JOIN tc ON ta.c = tc.cval;
878+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
879+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
880+
CompareYson(R"([[[3];[1003];[2003]];[[2];[1002];[2002]];[[1];[1001];[2001]]])", FormatResultSetYson(result.GetResultSet(0)));
881+
}
882+
}
883+
819884
// join on key prefix => index-lookup
820885
Y_UNIT_TEST(RightSemiJoin_KeyPrefix) {
821886
TKikimrRunner kikimr(SyntaxV1Settings());

0 commit comments

Comments
 (0)