Skip to content

Commit 09fb3c0

Browse files
authored
Merge branch 'main' into add-default-role
2 parents bc2b978 + 530bb49 commit 09fb3c0

File tree

12 files changed

+397
-8
lines changed

12 files changed

+397
-8
lines changed

query/src/sql/optimizer/heuristic/mod.rs

+10-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
mod decorrelate;
1616
mod implement;
17+
mod prune_columns;
1718
mod rule_list;
1819
mod subquery_rewriter;
1920

@@ -24,6 +25,7 @@ use once_cell::sync::Lazy;
2425

2526
use super::rule::RuleID;
2627
use super::util::validate_distributed_query;
28+
use super::ColumnSet;
2729
use crate::sessions::QueryContext;
2830
use crate::sql::optimizer::heuristic::decorrelate::decorrelate_subquery;
2931
use crate::sql::optimizer::heuristic::implement::HeuristicImplementor;
@@ -35,6 +37,7 @@ use crate::sql::optimizer::RelExpr;
3537
use crate::sql::optimizer::RequiredProperty;
3638
use crate::sql::optimizer::SExpr;
3739
use crate::sql::plans::Exchange;
40+
use crate::sql::BindContext;
3841
use crate::sql::MetadataRef;
3942

4043
pub static DEFAULT_REWRITE_RULES: Lazy<Vec<RuleID>> = Lazy::new(|| {
@@ -61,6 +64,7 @@ pub struct HeuristicOptimizer {
6164
implementor: HeuristicImplementor,
6265

6366
_ctx: Arc<QueryContext>,
67+
bind_context: Box<BindContext>,
6468
metadata: MetadataRef,
6569

6670
enable_distributed_optimization: bool,
@@ -69,6 +73,7 @@ pub struct HeuristicOptimizer {
6973
impl HeuristicOptimizer {
7074
pub fn new(
7175
ctx: Arc<QueryContext>,
76+
bind_context: Box<BindContext>,
7277
metadata: MetadataRef,
7378
rules: RuleList,
7479
enable_distributed_optimization: bool,
@@ -78,6 +83,7 @@ impl HeuristicOptimizer {
7883
implementor: HeuristicImplementor::new(),
7984

8085
_ctx: ctx,
86+
bind_context,
8187
metadata,
8288
enable_distributed_optimization,
8389
}
@@ -89,7 +95,10 @@ impl HeuristicOptimizer {
8995
}
9096

9197
fn post_optimize(&mut self, s_expr: SExpr) -> Result<SExpr> {
92-
Ok(s_expr)
98+
let pruner = prune_columns::ColumnPruner::new(self.metadata.clone());
99+
let require_columns: ColumnSet =
100+
self.bind_context.columns.iter().map(|c| c.index).collect();
101+
pruner.prune_columns(&s_expr, require_columns)
93102
}
94103

95104
pub fn optimize(&mut self, s_expr: SExpr) -> Result<SExpr> {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use common_exception::ErrorCode;
16+
use common_exception::Result;
17+
18+
use crate::sql::find_smallest_column;
19+
use crate::sql::optimizer::ColumnSet;
20+
use crate::sql::optimizer::SExpr;
21+
use crate::sql::plans::Aggregate;
22+
use crate::sql::plans::EvalScalar;
23+
use crate::sql::plans::LogicalGet;
24+
use crate::sql::plans::Project;
25+
use crate::sql::plans::RelOperator;
26+
use crate::sql::MetadataRef;
27+
use crate::sql::ScalarExpr;
28+
29+
pub struct ColumnPruner {
30+
metadata: MetadataRef,
31+
}
32+
33+
impl ColumnPruner {
34+
pub fn new(metadata: MetadataRef) -> Self {
35+
Self { metadata }
36+
}
37+
38+
pub fn prune_columns(&self, expr: &SExpr, require_columns: ColumnSet) -> Result<SExpr> {
39+
match expr.plan() {
40+
// For project and aggregate, collect required columns for its child
41+
RelOperator::Project(p) => Ok(SExpr::create_unary(
42+
RelOperator::Project(p.clone()),
43+
self.keep_required_columns(expr.child(0)?, p.columns.clone())?,
44+
)),
45+
RelOperator::Aggregate(p) => {
46+
let mut used = p.group_items.iter().fold(ColumnSet::new(), |acc, v| {
47+
acc.union(&v.scalar.used_columns()).cloned().collect()
48+
});
49+
used = p.aggregate_functions.iter().fold(used, |acc, v| {
50+
acc.union(&v.scalar.used_columns()).cloned().collect()
51+
});
52+
Ok(SExpr::create_unary(
53+
RelOperator::Aggregate(p.clone()),
54+
self.keep_required_columns(expr.child(0)?, used)?,
55+
))
56+
}
57+
// For the other plan nodes, keep searching for Project node with required columns
58+
p => {
59+
let children = expr
60+
.children()
61+
.iter()
62+
.map(|expr| self.prune_columns(expr, require_columns.clone()))
63+
.collect::<Result<Vec<_>>>()?;
64+
Ok(SExpr::create(p.clone(), children, None))
65+
}
66+
}
67+
}
68+
69+
/// Keep columns referenced by parent plan node.
70+
/// `required` contains columns referenced by its ancestors. When a node has multiple children,
71+
/// the required columns for each child could be different and we may include columns not needed
72+
/// by a specific child. Columns should be skipped once we found it not exist in the subtree as we
73+
/// visit a plan node.
74+
fn keep_required_columns(&self, expr: &SExpr, mut required: ColumnSet) -> Result<SExpr> {
75+
match expr.plan() {
76+
RelOperator::LogicalGet(p) => {
77+
let mut used: ColumnSet = required.intersection(&p.columns).cloned().collect();
78+
if used.is_empty() {
79+
let columns = self.metadata.read().columns_by_table_index(p.table_index);
80+
let smallest_index = find_smallest_column(&columns);
81+
used.insert(smallest_index);
82+
}
83+
84+
Ok(SExpr::create_leaf(RelOperator::LogicalGet(LogicalGet {
85+
table_index: p.table_index,
86+
columns: used,
87+
push_down_predicates: p.push_down_predicates.clone(),
88+
})))
89+
}
90+
RelOperator::LogicalInnerJoin(p) => {
91+
// Include columns referenced in left conditions
92+
let left = p.left_conditions.iter().fold(required.clone(), |acc, v| {
93+
acc.union(&v.used_columns()).cloned().collect()
94+
});
95+
// Include columns referenced in left conditions
96+
let right = p.right_conditions.iter().fold(required.clone(), |acc, v| {
97+
acc.union(&v.used_columns()).cloned().collect()
98+
});
99+
100+
let others = p.other_conditions.iter().fold(required, |acc, v| {
101+
acc.union(&v.used_columns()).cloned().collect()
102+
});
103+
104+
Ok(SExpr::create_binary(
105+
RelOperator::LogicalInnerJoin(p.clone()),
106+
self.keep_required_columns(
107+
expr.child(0)?,
108+
left.union(&others).cloned().collect(),
109+
)?,
110+
self.keep_required_columns(
111+
expr.child(1)?,
112+
right.union(&others).cloned().collect(),
113+
)?,
114+
))
115+
}
116+
RelOperator::Project(p) => {
117+
let used: ColumnSet = p.columns.intersection(&required).cloned().collect();
118+
Ok(SExpr::create_unary(
119+
RelOperator::Project(Project {
120+
columns: used.clone(),
121+
}),
122+
self.keep_required_columns(expr.child(0)?, used)?,
123+
))
124+
}
125+
RelOperator::EvalScalar(p) => {
126+
let mut used = vec![];
127+
// Only keep columns needed by parent plan.
128+
p.items.iter().for_each(|s| {
129+
if !required.contains(&s.index) {
130+
return;
131+
}
132+
used.push(s.clone());
133+
s.scalar.used_columns().iter().for_each(|c| {
134+
required.insert(*c);
135+
})
136+
});
137+
Ok(SExpr::create_unary(
138+
RelOperator::EvalScalar(EvalScalar { items: used }),
139+
self.keep_required_columns(expr.child(0)?, required)?,
140+
))
141+
}
142+
RelOperator::Filter(p) => {
143+
let used = p.predicates.iter().fold(required, |acc, v| {
144+
acc.union(&v.used_columns()).cloned().collect()
145+
});
146+
Ok(SExpr::create_unary(
147+
RelOperator::Filter(p.clone()),
148+
self.keep_required_columns(expr.child(0)?, used)?,
149+
))
150+
}
151+
RelOperator::Aggregate(p) => {
152+
let mut used = vec![];
153+
for item in &p.aggregate_functions {
154+
if required.contains(&item.index) {
155+
for c in item.scalar.used_columns() {
156+
required.insert(c);
157+
}
158+
used.push(item.clone());
159+
}
160+
}
161+
p.group_items.iter().for_each(|i| {
162+
// If the group item comes from a complex expression, we only include the final
163+
// column index here. The used columns will be included in its EvalScalar child.
164+
required.insert(i.index);
165+
});
166+
Ok(SExpr::create_unary(
167+
RelOperator::Aggregate(Aggregate {
168+
group_items: p.group_items.clone(),
169+
aggregate_functions: used,
170+
from_distinct: p.from_distinct,
171+
mode: p.mode,
172+
}),
173+
self.keep_required_columns(expr.child(0)?, required)?,
174+
))
175+
}
176+
RelOperator::Sort(p) => {
177+
p.items.iter().for_each(|s| {
178+
required.insert(s.index);
179+
});
180+
Ok(SExpr::create_unary(
181+
RelOperator::Sort(p.clone()),
182+
self.keep_required_columns(expr.child(0)?, required)?,
183+
))
184+
}
185+
RelOperator::Limit(p) => Ok(SExpr::create_unary(
186+
RelOperator::Limit(p.clone()),
187+
self.keep_required_columns(expr.child(0)?, required)?,
188+
)),
189+
190+
_ => Err(ErrorCode::LogicalError(
191+
"Attempting to prune columns of a physical plan is not allowed",
192+
)),
193+
}
194+
}
195+
}

query/src/sql/optimizer/mod.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ pub use s_expr::SExpr;
4242

4343
use self::util::contains_local_table_scan;
4444
use super::plans::Plan;
45+
use super::BindContext;
4546
use crate::sessions::QueryContext;
4647
pub use crate::sql::optimizer::heuristic::RuleList;
4748
pub use crate::sql::optimizer::rule::RuleID;
@@ -77,7 +78,7 @@ pub fn optimize(
7778
metadata,
7879
rewrite_kind,
7980
} => Ok(Plan::Query {
80-
s_expr: optimize_query(ctx, opt_ctx, metadata.clone(), s_expr)?,
81+
s_expr: optimize_query(ctx, opt_ctx, metadata.clone(), bind_context.clone(), s_expr)?,
8182
bind_context,
8283
metadata,
8384
rewrite_kind,
@@ -114,6 +115,7 @@ pub fn optimize_query(
114115
ctx: Arc<QueryContext>,
115116
opt_ctx: Arc<OptimizerContext>,
116117
metadata: MetadataRef,
118+
bind_context: Box<BindContext>,
117119
s_expr: SExpr,
118120
) -> Result<SExpr> {
119121
let rules = RuleList::create(DEFAULT_REWRITE_RULES.clone())?;
@@ -123,7 +125,8 @@ pub fn optimize_query(
123125
let enable_distributed_query = opt_ctx.config.enable_distributed_optimization
124126
&& !contains_local_table_scan(&s_expr, &metadata);
125127

126-
let mut heuristic = HeuristicOptimizer::new(ctx, metadata, rules, enable_distributed_query);
128+
let mut heuristic =
129+
HeuristicOptimizer::new(ctx, bind_context, metadata, rules, enable_distributed_query);
127130
let optimized = heuristic.optimize(s_expr)?;
128131
// TODO: enable cascades optimizer
129132
// let mut cascades = CascadesOptimizer::create(ctx);

query/src/sql/planner/metadata.rs

+16
Original file line numberDiff line numberDiff line change
@@ -199,3 +199,19 @@ pub fn optimize_remove_count_args(name: &str, distinct: bool, args: &[&Expr]) ->
199199
.iter()
200200
.all(|expr| matches!(expr, Expr::Literal{lit,..} if *lit!=Literal::Null))
201201
}
202+
203+
pub fn find_smallest_column(entries: &[ColumnEntry]) -> usize {
204+
debug_assert!(!entries.is_empty());
205+
206+
let mut smallest_index = 0;
207+
let mut smallest_size = usize::MAX;
208+
for (column_index, column_entry) in entries.iter().enumerate() {
209+
if let Ok(bytes) = column_entry.data_type.data_type_id().numeric_byte_size() {
210+
if smallest_size > bytes {
211+
smallest_size = bytes;
212+
smallest_index = column_index;
213+
}
214+
}
215+
}
216+
smallest_index
217+
}

query/src/sql/planner/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ mod semantic;
3535
pub use binder::Binder;
3636
pub use binder::ColumnBinding;
3737
pub use format::FormatTreeNode;
38+
pub use metadata::find_smallest_column;
3839
pub use metadata::ColumnEntry;
3940
pub use metadata::Metadata;
4041
pub use metadata::MetadataRef;

query/src/sql/planner/plans/aggregate.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use crate::sql::plans::PhysicalOperator;
2626
use crate::sql::plans::RelOp;
2727
use crate::sql::plans::ScalarItem;
2828

29-
#[derive(Clone, Debug, PartialEq, Eq)]
29+
#[derive(Clone, Debug, PartialEq, Eq, Copy)]
3030
pub enum AggregateMode {
3131
Partial,
3232
Final,

query/tests/it/sql/optimizer/heuristic/exchange.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,14 @@ async fn run_cluster_test(ctx: Arc<QueryContext>, suite: &Suite) -> Result<Strin
4848

4949
let result = match plan {
5050
Plan::Query {
51-
s_expr, metadata, ..
51+
s_expr,
52+
metadata,
53+
bind_context,
54+
..
5255
} => {
5356
let mut heuristic_opt = HeuristicOptimizer::new(
5457
ctx.clone(),
58+
bind_context,
5559
metadata.clone(),
5660
RuleList::create(suite.rules.clone())?,
5761
true,

query/tests/it/sql/optimizer/heuristic/mod.rs

+6-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
mod exchange;
1616
mod join;
17+
mod prune_columns;
1718
mod select;
1819
mod subquery;
1920

@@ -55,10 +56,14 @@ async fn run_test(ctx: Arc<QueryContext>, suite: &Suite) -> Result<String> {
5556

5657
let result = match plan {
5758
Plan::Query {
58-
s_expr, metadata, ..
59+
s_expr,
60+
metadata,
61+
bind_context,
62+
..
5963
} => {
6064
let mut heuristic_opt = HeuristicOptimizer::new(
6165
ctx.clone(),
66+
bind_context,
6267
metadata.clone(),
6368
RuleList::create(suite.rules.clone())?,
6469
false,

0 commit comments

Comments
 (0)