Skip to content

Commit 5e3c70a

Browse files
authored
Merge pull request #7273 from TCeason/limit_fix_test
feat(planner): Push limit down further
2 parents 3af1e5b + b14a0c7 commit 5e3c70a

33 files changed

+801
-124
lines changed

src/query/service/src/sql/executor/physical_plan_builder.rs

+25-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use std::sync::Arc;
1717

1818
use common_exception::ErrorCode;
1919
use common_exception::Result;
20+
use common_planners::Expression;
2021
use common_planners::Extras;
2122
use common_planners::Projection;
2223
use common_planners::StageKind;
@@ -92,6 +93,28 @@ impl PhysicalPlanBuilder {
9293
})
9394
.transpose()?;
9495

96+
let order_by = scan
97+
.order_by
98+
.clone()
99+
.map(|items| {
100+
let builder =
101+
ExpressionBuilderWithoutRenaming::create(self.metadata.clone());
102+
items
103+
.into_iter()
104+
.map(|item| {
105+
builder
106+
.build_column_ref(item.index)
107+
.map(|c| Expression::Sort {
108+
expr: Box::new(c.clone()),
109+
asc: item.asc,
110+
nulls_first: item.nulls_first,
111+
origin_expr: Box::new(c),
112+
})
113+
})
114+
.collect::<Result<Vec<_>>>()
115+
})
116+
.transpose()?;
117+
95118
let table_entry = metadata.table(scan.table_index);
96119
let table = table_entry.table.clone();
97120
let table_schema = table.schema();
@@ -130,7 +153,8 @@ impl PhysicalPlanBuilder {
130153
let push_downs = Extras {
131154
projection: Some(projection),
132155
filters: push_down_filters.unwrap_or_default(),
133-
..Default::default()
156+
limit: scan.limit,
157+
order_by: order_by.unwrap_or_default(),
134158
};
135159

136160
let source = table

src/query/service/src/sql/optimizer/heuristic/decorrelate.rs

+2
Original file line numberDiff line numberDiff line change
@@ -455,6 +455,8 @@ impl SubqueryRewriter {
455455
table_index,
456456
columns: self.derived_columns.values().cloned().collect(),
457457
push_down_predicates: None,
458+
limit: None,
459+
order_by: None,
458460
}
459461
.into(),
460462
);

src/query/service/src/sql/optimizer/heuristic/mod.rs

+7-2
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,11 @@ pub static DEFAULT_REWRITE_RULES: Lazy<Vec<RuleID>> = Lazy::new(|| {
5050
RuleID::MergeFilter,
5151
RuleID::MergeEvalScalar,
5252
RuleID::MergeProject,
53+
RuleID::PushDownLimitProject,
54+
RuleID::PushDownLimitSort,
55+
RuleID::PushDownLimitOuterJoin,
56+
RuleID::PushDownLimitScan,
57+
RuleID::PushDownSortScan,
5358
RuleID::PushDownFilterEvalScalar,
5459
RuleID::PushDownFilterProject,
5560
RuleID::PushDownFilterJoin,
@@ -132,7 +137,7 @@ impl HeuristicOptimizer {
132137
for expr in s_expr.children() {
133138
optimized_children.push(self.optimize_expression(expr)?);
134139
}
135-
let optimized_expr = SExpr::create(s_expr.plan().clone(), optimized_children, None);
140+
let optimized_expr = s_expr.replace_children(optimized_children);
136141
let result = self.apply_transform_rules(&optimized_expr, &self.rules)?;
137142

138143
Ok(result)
@@ -161,8 +166,8 @@ impl HeuristicOptimizer {
161166
for rule in rule_list.iter() {
162167
let mut state = TransformState::new();
163168
if s_expr.match_pattern(rule.pattern()) && !s_expr.applied_rule(&rule.id()) {
164-
rule.apply(&s_expr, &mut state)?;
165169
s_expr.apply_rule(&rule.id());
170+
rule.apply(&s_expr, &mut state)?;
166171
if !state.results().is_empty() {
167172
// Recursive optimize the result
168173
let result = &state.results()[0];

src/query/service/src/sql/optimizer/heuristic/prune_columns.rs

+2
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ impl ColumnPruner {
8787
table_index: p.table_index,
8888
columns: used,
8989
push_down_predicates: p.push_down_predicates.clone(),
90+
limit: p.limit,
91+
order_by: p.order_by.clone(),
9092
})))
9193
}
9294
RelOperator::LogicalInnerJoin(p) => {

src/query/service/src/sql/optimizer/rule/factory.rs

+10
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@ use crate::sql::optimizer::rule::rewrite::RuleMergeEvalScalar;
2626
use crate::sql::optimizer::rule::rewrite::RuleMergeFilter;
2727
use crate::sql::optimizer::rule::rewrite::RuleMergeProject;
2828
use crate::sql::optimizer::rule::rewrite::RulePushDownFilterScan;
29+
use crate::sql::optimizer::rule::rewrite::RulePushDownLimitOuterJoin;
30+
use crate::sql::optimizer::rule::rewrite::RulePushDownLimitProject;
31+
use crate::sql::optimizer::rule::rewrite::RulePushDownLimitScan;
32+
use crate::sql::optimizer::rule::rewrite::RulePushDownLimitSort;
33+
use crate::sql::optimizer::rule::rewrite::RulePushDownSortScan;
2934
use crate::sql::optimizer::rule::rewrite::RuleSplitAggregate;
3035
use crate::sql::optimizer::rule::rule_implement_get::RuleImplementGet;
3136
use crate::sql::optimizer::rule::rule_implement_hash_join::RuleImplementHashJoin;
@@ -48,6 +53,11 @@ impl RuleFactory {
4853
RuleID::PushDownFilterEvalScalar => Ok(Box::new(RulePushDownFilterEvalScalar::new())),
4954
RuleID::PushDownFilterJoin => Ok(Box::new(RulePushDownFilterJoin::new())),
5055
RuleID::PushDownFilterScan => Ok(Box::new(RulePushDownFilterScan::new())),
56+
RuleID::PushDownLimitProject => Ok(Box::new(RulePushDownLimitProject::new())),
57+
RuleID::PushDownLimitScan => Ok(Box::new(RulePushDownLimitScan::new())),
58+
RuleID::PushDownSortScan => Ok(Box::new(RulePushDownSortScan::new())),
59+
RuleID::PushDownLimitOuterJoin => Ok(Box::new(RulePushDownLimitOuterJoin::new())),
60+
RuleID::PushDownLimitSort => Ok(Box::new(RulePushDownLimitSort::new())),
5161
RuleID::EliminateFilter => Ok(Box::new(RuleEliminateFilter::new())),
5262
RuleID::EliminateProject => Ok(Box::new(RuleEliminateProject::new())),
5363
RuleID::MergeProject => Ok(Box::new(RuleMergeProject::new())),

src/query/service/src/sql/optimizer/rule/mod.rs

+10
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,11 @@ pub enum RuleID {
5050
PushDownFilterEvalScalar,
5151
PushDownFilterJoin,
5252
PushDownFilterScan,
53+
PushDownLimitOuterJoin,
54+
PushDownLimitProject,
55+
PushDownLimitSort,
56+
PushDownLimitScan,
57+
PushDownSortScan,
5358
EliminateEvalScalar,
5459
EliminateFilter,
5560
EliminateProject,
@@ -72,6 +77,11 @@ impl Display for RuleID {
7277
RuleID::PushDownFilterEvalScalar => write!(f, "PushDownFilterEvalScalar"),
7378
RuleID::PushDownFilterJoin => write!(f, "PushDownFilterJoin"),
7479
RuleID::PushDownFilterScan => write!(f, "PushDownFilterScan"),
80+
RuleID::PushDownLimitOuterJoin => write!(f, "PushDownLimitOuterJoin"),
81+
RuleID::PushDownLimitProject => write!(f, "PushDownLimitProject"),
82+
RuleID::PushDownLimitSort => write!(f, "PushDownLimitSort"),
83+
RuleID::PushDownLimitScan => write!(f, "PushDownLimitScan"),
84+
RuleID::PushDownSortScan => write!(f, "PushDownSortScan"),
7585
RuleID::EliminateEvalScalar => write!(f, "EliminateEvalScalar"),
7686
RuleID::EliminateFilter => write!(f, "EliminateFilter"),
7787
RuleID::EliminateProject => write!(f, "EliminateProject"),

src/query/service/src/sql/optimizer/rule/rewrite/mod.rs

+10
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ mod rule_push_down_filter_eval_scalar;
2424
mod rule_push_down_filter_join;
2525
mod rule_push_down_filter_project;
2626
mod rule_push_down_filter_scan;
27+
mod rule_push_down_limit_join;
28+
mod rule_push_down_limit_project;
29+
mod rule_push_down_limit_scan;
30+
mod rule_push_down_limit_sort;
31+
mod rule_push_down_sort_scan;
2732
mod rule_split_aggregate;
2833

2934
pub use rule_eliminate_eval_scalar::RuleEliminateEvalScalar;
@@ -38,4 +43,9 @@ pub use rule_push_down_filter_eval_scalar::RulePushDownFilterEvalScalar;
3843
pub use rule_push_down_filter_join::RulePushDownFilterJoin;
3944
pub use rule_push_down_filter_project::RulePushDownFilterProject;
4045
pub use rule_push_down_filter_scan::RulePushDownFilterScan;
46+
pub use rule_push_down_limit_join::RulePushDownLimitOuterJoin;
47+
pub use rule_push_down_limit_project::RulePushDownLimitProject;
48+
pub use rule_push_down_limit_scan::RulePushDownLimitScan;
49+
pub use rule_push_down_limit_sort::RulePushDownLimitSort;
50+
pub use rule_push_down_sort_scan::RulePushDownSortScan;
4151
pub use rule_split_aggregate::RuleSplitAggregate;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use crate::sql::optimizer::rule::Rule;
16+
use crate::sql::optimizer::rule::TransformState;
17+
use crate::sql::optimizer::RuleID;
18+
use crate::sql::optimizer::SExpr;
19+
use crate::sql::plans::JoinType;
20+
use crate::sql::plans::Limit;
21+
use crate::sql::plans::LogicalInnerJoin;
22+
use crate::sql::plans::PatternPlan;
23+
use crate::sql::plans::RelOp;
24+
use crate::sql::plans::RelOperator;
25+
26+
/// Input: Limit
27+
/// |
28+
/// Left Outer Join
29+
/// / \
30+
/// * *
31+
///
32+
/// Output:
33+
/// Limit
34+
/// |
35+
/// Left Outer Join
36+
/// / \
37+
/// Limit Limit
38+
/// / \
39+
/// * *
40+
pub struct RulePushDownLimitOuterJoin {
41+
id: RuleID,
42+
pattern: SExpr,
43+
}
44+
45+
impl RulePushDownLimitOuterJoin {
46+
pub fn new() -> Self {
47+
Self {
48+
id: RuleID::PushDownLimitOuterJoin,
49+
pattern: SExpr::create_unary(
50+
PatternPlan {
51+
plan_type: RelOp::Limit,
52+
}
53+
.into(),
54+
SExpr::create_binary(
55+
PatternPlan {
56+
plan_type: RelOp::LogicalInnerJoin,
57+
}
58+
.into(),
59+
SExpr::create_leaf(
60+
PatternPlan {
61+
plan_type: RelOp::Pattern,
62+
}
63+
.into(),
64+
),
65+
SExpr::create_leaf(
66+
PatternPlan {
67+
plan_type: RelOp::Pattern,
68+
}
69+
.into(),
70+
),
71+
),
72+
),
73+
}
74+
}
75+
}
76+
77+
impl Rule for RulePushDownLimitOuterJoin {
78+
fn id(&self) -> RuleID {
79+
self.id
80+
}
81+
82+
fn apply(&self, s_expr: &SExpr, state: &mut TransformState) -> common_exception::Result<()> {
83+
let limit: Limit = s_expr.plan().clone().try_into()?;
84+
if limit.limit.is_some() {
85+
let child = s_expr.child(0)?;
86+
let join: LogicalInnerJoin = child.plan().clone().try_into()?;
87+
match join.join_type {
88+
JoinType::Left | JoinType::Full => {
89+
state.add_result(s_expr.replace_children(vec![child.replace_children(vec![
90+
SExpr::create_unary(
91+
RelOperator::Limit(limit.clone()),
92+
child.child(0)?.clone(),
93+
),
94+
SExpr::create_unary(RelOperator::Limit(limit), child.child(1)?.clone()),
95+
])]))
96+
}
97+
_ => {}
98+
}
99+
}
100+
Ok(())
101+
}
102+
103+
fn pattern(&self) -> &SExpr {
104+
&self.pattern
105+
}
106+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use common_exception::Result;
16+
17+
use crate::sql::optimizer::rule::Rule;
18+
use crate::sql::optimizer::rule::TransformState;
19+
use crate::sql::optimizer::RuleID;
20+
use crate::sql::optimizer::SExpr;
21+
use crate::sql::plans::Limit;
22+
use crate::sql::plans::PatternPlan;
23+
use crate::sql::plans::RelOp;
24+
25+
/// Input: Limit
26+
/// \
27+
/// Project
28+
/// \
29+
/// *
30+
///
31+
/// Output: Project
32+
/// \
33+
/// Limit
34+
/// \
35+
/// *
36+
pub struct RulePushDownLimitProject {
37+
id: RuleID,
38+
pattern: SExpr,
39+
}
40+
41+
impl RulePushDownLimitProject {
42+
pub fn new() -> Self {
43+
Self {
44+
id: RuleID::PushDownLimitProject,
45+
pattern: SExpr::create_unary(
46+
PatternPlan {
47+
plan_type: RelOp::Limit,
48+
}
49+
.into(),
50+
SExpr::create_unary(
51+
PatternPlan {
52+
plan_type: RelOp::Project,
53+
}
54+
.into(),
55+
SExpr::create_leaf(
56+
PatternPlan {
57+
plan_type: RelOp::Pattern,
58+
}
59+
.into(),
60+
),
61+
),
62+
),
63+
}
64+
}
65+
}
66+
67+
impl Rule for RulePushDownLimitProject {
68+
fn id(&self) -> RuleID {
69+
self.id
70+
}
71+
72+
fn apply(&self, s_expr: &SExpr, state: &mut TransformState) -> Result<()> {
73+
let limit: Limit = s_expr.plan().clone().try_into()?;
74+
if limit.limit.is_none() {
75+
return Ok(());
76+
}
77+
let project = s_expr.child(0)?;
78+
state.add_result(project.replace_children(vec![
79+
s_expr.replace_children(vec![project.child(0)?.clone()]),
80+
]));
81+
Ok(())
82+
}
83+
84+
fn pattern(&self) -> &SExpr {
85+
&self.pattern
86+
}
87+
}

0 commit comments

Comments
 (0)