Skip to content

Commit 6a2d1fe

Browse files
Fixed a problem in RBO in MapJoin implementation (#7044)
1 parent 568d2c2 commit 6a2d1fe

File tree

18 files changed

+1122
-885
lines changed

18 files changed

+1122
-885
lines changed

ydb/core/kqp/opt/physical/kqp_opt_phy.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,7 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
430430
// It is now possible as we don't use datashard transactions for reads in data queries.
431431
bool pushLeftStage = !KqpCtx.IsDataQuery() && AllowFuseJoinInputs(node);
432432
TExprBase output = DqBuildJoin(node, ctx, optCtx, *getParents(), IsGlobal,
433-
pushLeftStage, KqpCtx.Config->GetHashJoinMode()
433+
pushLeftStage, KqpCtx.Config->GetHashJoinMode(), false
434434
);
435435
DumpAppliedRule("BuildJoin", node.Ptr(), output.Ptr(), ctx);
436436
return output;

ydb/core/kqp/ut/join/kqp_join_ut.cpp

Lines changed: 157 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -874,10 +874,11 @@ Y_UNIT_TEST_SUITE(KqpJoin) {
874874
}
875875
{
876876
auto result = db.ExecuteQuery(R"(
877-
SELECT ta.a, tb.bval, tc.cval FROM ta INNER JOIN tb ON ta.b = tb.b LEFT JOIN tc ON ta.c = tc.cval;
877+
SELECT ta.a, tb.bval, tc.cval FROM ta INNER JOIN tb ON ta.b = tb.b LEFT JOIN tc ON ta.c = tc.cval
878+
ORDER BY ta.a, tb.bval, tc.cval;
878879
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
879880
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
880-
CompareYson(R"([[[1];[1001];[2001]];[[3];[1003];[2003]];[[2];[1002];[2002]]])", FormatResultSetYson(result.GetResultSet(0)));
881+
CompareYson(R"([[[1];[1001];[2001]];[[2];[1002];[2002]];[[3];[1003];[2003]]])", FormatResultSetYson(result.GetResultSet(0)));
881882
}
882883
}
883884

@@ -1741,6 +1742,160 @@ Y_UNIT_TEST_SUITE(KqpJoin) {
17411742
UNIT_ASSERT(result.GetQueryPlan().Contains("Lookup"));
17421743
}
17431744
}
1745+
1746+
Y_UNIT_TEST(ComplexJoin) {
1747+
TKikimrRunner kikimr;
1748+
auto db = kikimr.GetTableClient();
1749+
auto session = db.CreateSession().GetValueSync().GetSession();
1750+
1751+
{ // init tables
1752+
AssertSuccessResult(session.ExecuteSchemeQuery(R"(
1753+
--!syntax_v1
1754+
1755+
CREATE TABLE Input1
1756+
(
1757+
k1 String,
1758+
v1 String,
1759+
u1 String,
1760+
PRIMARY KEY (k1, v1, u1)
1761+
);
1762+
1763+
CREATE TABLE Input2
1764+
(
1765+
k2 String,
1766+
v2 String,
1767+
u2 String,
1768+
PRIMARY KEY (k2, v2, u2)
1769+
);
1770+
1771+
CREATE TABLE Input3
1772+
(
1773+
k3 String,
1774+
v3 String,
1775+
u3 String,
1776+
PRIMARY KEY (k3, v3, u3)
1777+
);
1778+
1779+
CREATE TABLE Input4
1780+
(
1781+
k4 String,
1782+
v4 String,
1783+
u4 String,
1784+
PRIMARY KEY (k4, v4, u4)
1785+
);
1786+
1787+
CREATE TABLE Input5
1788+
(
1789+
k5 String,
1790+
v5 String,
1791+
u5 String,
1792+
PRIMARY KEY (k5, v5, u5)
1793+
);
1794+
)").GetValueSync());
1795+
1796+
auto result = session.ExecuteDataQuery(R"(
1797+
--!syntax_v1
1798+
1799+
REPLACE INTO Input1 (k1, v1, u1) VALUES
1800+
("01","1","01"),
1801+
("01","2","02"),
1802+
("02","1","03"),
1803+
("02","2","05"),
1804+
("02","2","06"),
1805+
("03",NULL,"07"),
1806+
("03","1","08"),
1807+
("03","2","09"),
1808+
("04","1","10"),
1809+
("04","2","11"),
1810+
("05","1","12"),
1811+
("05","2","13"),
1812+
("06","1","14"),
1813+
("06","2","15"),
1814+
("07","1","16"),
1815+
("07","2","17"),
1816+
("08","1","18"),
1817+
("08","2","19"),
1818+
("09","1","20"),
1819+
("09","2","21"),
1820+
("10","1","22"),
1821+
("10","2","23");
1822+
1823+
REPLACE INTO Input2 (k2, v2, u2) VALUES
1824+
("02","1","01"),
1825+
("02","1","02"),
1826+
("02","2","03"),
1827+
("02","2","04"),
1828+
("03","1","05"),
1829+
("03","2","06"),
1830+
("04","1","07"),
1831+
("04","2","08"),
1832+
("05","1","09"),
1833+
("05","2","10"),
1834+
("06","1","11"),
1835+
("06","2","12"),
1836+
("07","1","13"),
1837+
("07","2","14"),
1838+
("09","1","15"),
1839+
("09","2","16");
1840+
1841+
REPLACE INTO Input3 (k3, v3, u3) VALUES
1842+
(NULL,"1","01"),
1843+
(NULL,"2","02"),
1844+
("04","1","03"),
1845+
("04","2","04"),
1846+
("05","1","05"),
1847+
("05","2","06");
1848+
1849+
REPLACE INTO Input4 (k4, v4, u4) VALUES
1850+
("03",NULL,"01"),
1851+
("03","1","02"),
1852+
("03","2","03"),
1853+
("04","1","04"),
1854+
("04","2","05"),
1855+
("05","1","06"),
1856+
("05","2","07"),
1857+
("06","1","08"),
1858+
("06","2","09"),
1859+
("07","1","10"),
1860+
("07","2","11"),
1861+
("08","1","12"),
1862+
("08","2","13");
1863+
1864+
REPLACE INTO Input5 (k5, v5, u5) VALUES
1865+
(NULL,"1","01"),
1866+
(NULL,"2","02"),
1867+
("02","1","03"),
1868+
("02","1","04"),
1869+
("02","2","05"),
1870+
("02","2","06"),
1871+
("03","1","07"),
1872+
("03","2","08"),
1873+
("09","1","09"),
1874+
("09","2","10");
1875+
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
1876+
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
1877+
}
1878+
1879+
{
1880+
auto result = session.ExecuteDataQuery(R"(
1881+
--!syntax_v1
1882+
1883+
pragma ydb.OverrideStatistics = '{"/Root/Input1": {"n_rows":10000}, "/Root/Input2" : {"n_rows":10000}, "/Root/Input3":{"n_rows":10000}, "/Root/Input4":{"n_rows":10000}, "/Root/Input5":{"n_rows":10000}}';
1884+
1885+
$rightSemi = select * from Input2 as b right semi join Input1 as a on a.v1 = b.v2 and a.k1 = b.k2;
1886+
$leftOnly = select * from $rightSemi as rs left only join Input3 as c on rs.k1 = c.k3 and rs.v1 = c.v3;
1887+
$right = select * from Input4 as d right join $leftOnly as lo on d.v4 = lo.v1 and lo.k1 = d.k4;
1888+
$inner = select * from $right as r join any Input5 as e on r.k1 = e.k5 and e.v5 = r.v1;
1889+
1890+
select * from $inner order by u1,u5;
1891+
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
1892+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
1893+
Cout << FormatResultSetYson(result.GetResultSet(0));
1894+
CompareYson(R"(
1895+
[[["02"];#;["02"];["03"];#;["03"];["1"];#;["1"]];[["02"];#;["02"];["03"];#;["04"];["1"];#;["1"]];[["02"];#;["02"];["05"];#;["05"];["2"];#;["2"]];[["02"];#;["02"];["05"];#;["06"];["2"];#;["2"]];[["02"];#;["02"];["06"];#;["05"];["2"];#;["2"]];[["02"];#;["02"];["06"];#;["06"];["2"];#;["2"]];[["03"];["03"];["03"];["08"];["02"];["07"];["1"];["1"];["1"]];[["03"];["03"];["03"];["09"];["03"];["08"];["2"];["2"];["2"]];[["09"];#;["09"];["20"];#;["09"];["1"];#;["1"]];[["09"];#;["09"];["21"];#;["10"];["2"];#;["2"]]]
1896+
)", FormatResultSetYson(result.GetResultSet(0)));
1897+
}
1898+
}
17441899
}
17451900

17461901
} // namespace NKqp

ydb/core/kqp/ut/query/kqp_explain_ut.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
8585
NJson::ReadJsonTree(*res.PlanJson, &plan, true);
8686
UNIT_ASSERT(ValidatePlanNodeIds(plan));
8787

88-
auto join = FindPlanNodeByKv(plan, "Node Type", "Aggregate-InnerJoin (MapJoin)");
88+
auto join = FindPlanNodeByKv(plan, "Node Type", "Aggregate-InnerJoin (MapJoin)-Filter-TableFullScan");
8989
UNIT_ASSERT(join.IsDefined());
9090
auto left = FindPlanNodeByKv(join, "Table", "EightShard");
9191
UNIT_ASSERT(left.IsDefined());
@@ -112,7 +112,7 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
112112
NJson::ReadJsonTree(*res.PlanJson, &plan, true);
113113
UNIT_ASSERT(ValidatePlanNodeIds(plan));
114114

115-
auto join = FindPlanNodeByKv(plan, "Node Type", "Aggregate-InnerJoin (MapJoin)");
115+
auto join = FindPlanNodeByKv(plan, "Node Type", "Aggregate-InnerJoin (MapJoin)-Filter-TableFullScan");
116116
UNIT_ASSERT(join.IsDefined());
117117
auto left = FindPlanNodeByKv(join, "Table", "EightShard");
118118
UNIT_ASSERT(left.IsDefined());
@@ -202,7 +202,7 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
202202
auto join = FindPlanNodeByKv(
203203
plan,
204204
"Node Type",
205-
"Aggregate-InnerJoin (MapJoin)"
205+
"Aggregate-InnerJoin (MapJoin)-Filter-TableFullScan"
206206
);
207207

208208
UNIT_ASSERT(join.IsDefined());
@@ -365,9 +365,9 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
365365
NJson::ReadJsonTree(*res.PlanJson, &plan, true);
366366
UNIT_ASSERT(ValidatePlanNodeIds(plan));
367367

368-
auto join1 = FindPlanNodeByKv(plan, "Node Type", "Sort-InnerJoin (MapJoin)");
368+
auto join1 = FindPlanNodeByKv(plan, "Node Type", "Sort-InnerJoin (MapJoin)-Filter");
369369
UNIT_ASSERT(join1.IsDefined());
370-
auto join2 = FindPlanNodeByKv(plan, "Node Type", "Aggregate-InnerJoin (MapJoin)");
370+
auto join2 = FindPlanNodeByKv(plan, "Node Type", "Aggregate-InnerJoin (MapJoin)-Filter");
371371
UNIT_ASSERT(join2.IsDefined());
372372
}
373373

ydb/library/yql/dq/opt/dq_opt_join.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -624,6 +624,19 @@ TExprBase DqBuildPhyJoin(const TDqJoin& join, bool pushLeftStage, TExprContext&
624624
return join;
625625
}
626626

627+
TExprNode::TListType flags;
628+
if (const auto maybeFlags = join.Flags()) {
629+
flags = maybeFlags.Cast().Ref().ChildrenList();
630+
}
631+
632+
for (auto& flag : flags) {
633+
if (flag->IsAtom("LeftAny") || flag->IsAtom("RightAny")) {
634+
ctx.AddError(TIssue(ctx.GetPosition(join.Ptr()->Pos()), "ANY join kind is not currently supported"));
635+
return join;
636+
}
637+
}
638+
639+
627640
YQL_ENSURE(join.LeftInput().Maybe<TDqCnUnionAll>());
628641
TDqCnUnionAll leftCn = join.LeftInput().Cast<TDqCnUnionAll>();
629642

ydb/library/yql/dq/opt/dq_opt_join.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ NNodes::TExprBase DqRewriteEquiJoin(const NNodes::TExprBase& node, EHashJoinMode
1919
NNodes::TExprBase DqBuildPhyJoin(const NNodes::TDqJoin& join, bool pushLeftStage, TExprContext& ctx, IOptimizationContext& optCtx);
2020

2121
NNodes::TExprBase DqBuildJoin(const NNodes::TExprBase& node, TExprContext& ctx,
22-
IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage, bool pushLeftStage, EHashJoinMode hashJoin = EHashJoinMode::Off);
22+
IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage, bool pushLeftStage, EHashJoinMode hashJoin = EHashJoinMode::Off, bool shuffleMapJoin = true);
2323

2424
NNodes::TExprBase DqBuildHashJoin(const NNodes::TDqJoin& join, EHashJoinMode mode, TExprContext& ctx, IOptimizationContext& optCtx);
2525

ydb/library/yql/dq/opt/dq_opt_phy.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2654,7 +2654,7 @@ TMaybeNode<TDqJoin> DqFlipJoin(const TDqJoin& join, TExprContext& ctx) {
26542654

26552655

26562656
TExprBase DqBuildJoin(const TExprBase& node, TExprContext& ctx, IOptimizationContext& optCtx,
2657-
const TParentsMap& parentsMap, bool allowStageMultiUsage, bool pushLeftStage, EHashJoinMode hashJoin)
2657+
const TParentsMap& parentsMap, bool allowStageMultiUsage, bool pushLeftStage, EHashJoinMode hashJoin, bool shuffleMapJoin)
26582658
{
26592659
if (!node.Maybe<TDqJoin>()) {
26602660
return node;
@@ -2692,7 +2692,7 @@ TExprBase DqBuildJoin(const TExprBase& node, TExprContext& ctx, IOptimizationCon
26922692
return node;
26932693
}
26942694

2695-
if (useHashJoin) {
2695+
if (useHashJoin && (hashJoin == EHashJoinMode::GraceAndSelf || shuffleMapJoin)) {
26962696
return DqBuildHashJoin(join, hashJoin, ctx, optCtx);
26972697
}
26982698

0 commit comments

Comments
 (0)