Skip to content

Commit 5634318

Browse files
authored
Merge pull request #7302 from sundy-li/topn_pruner
feat(storage): add topn pruner
2 parents 016a3b5 + 134b32d commit 5634318

File tree

5 files changed

+179
-48
lines changed

5 files changed

+179
-48
lines changed

src/query/service/tests/it/storages/fuse/pruning.rs

+47-38
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use common_planners::col;
2626
use common_planners::lit;
2727
use common_planners::sub;
2828
use common_planners::CreateTablePlan;
29+
use common_planners::Expression;
2930
use common_planners::Extras;
3031
use databend_query::interpreters::CreateTableInterpreter;
3132
use databend_query::interpreters::Interpreter;
@@ -145,49 +146,57 @@ async fn test_block_pruner() -> Result<()> {
145146
let reader = MetaReaders::table_snapshot_reader(ctx.clone());
146147
let snapshot = reader.read(snapshot_loc.as_str(), None, 1).await?;
147148

148-
// nothing will be pruned
149-
let push_downs = None;
150-
let blocks = apply_block_pruning(
151-
snapshot.clone(),
152-
table.get_table_info().schema(),
153-
&push_downs,
154-
ctx.clone(),
155-
)
156-
.await?;
157-
let rows = blocks.iter().map(|b| b.row_count as usize).sum::<usize>();
158-
assert_eq!(rows, num_blocks * row_per_block);
159-
assert_eq!(num_blocks, blocks.len());
160-
161-
// fully pruned
162-
let mut extra = Extras::default();
163-
// max value of col a is
164-
let pred = col("a").gt(lit(30u64));
165-
extra.filters = vec![pred];
166-
167-
let blocks = apply_block_pruning(
168-
snapshot.clone(),
169-
table.get_table_info().schema(),
170-
&Some(extra),
171-
ctx.clone(),
172-
)
173-
.await?;
174-
assert_eq!(0, blocks.len());
149+
// fully pruned: no block satisfies `a > 30`
150+
let mut e1 = Extras::default();
151+
e1.filters = vec![col("a").gt(lit(30u64))];
175152

176153
// some blocks pruned
177-
let mut extra = Extras::default();
154+
let mut e2 = Extras::default();
178155
let max_val_of_b = 6u64;
179-
let pred = col("a").gt(lit(0u64)).and(col("b").gt(lit(max_val_of_b)));
180-
extra.filters = vec![pred];
156+
e2.filters = vec![col("a").gt(lit(0u64)).and(col("b").gt(lit(max_val_of_b)))];
157+
let b2 = num_blocks - max_val_of_b as usize - 1;
158+
159+
// Sort asc Limit
160+
let mut e3 = Extras::default();
161+
e3.order_by = vec![Expression::Sort {
162+
expr: Box::new(col("b")),
163+
asc: true,
164+
nulls_first: false,
165+
origin_expr: Box::new(col("b")),
166+
}];
167+
e3.limit = Some(3);
168+
169+
// Sort desc Limit
170+
let mut e4 = Extras::default();
171+
e4.order_by = vec![Expression::Sort {
172+
expr: Box::new(col("b")),
173+
asc: false,
174+
nulls_first: false,
175+
origin_expr: Box::new(col("b")),
176+
}];
177+
e4.limit = Some(3);
178+
179+
let extras = vec![
180+
(None, num_blocks, num_blocks * row_per_block),
181+
(Some(e1), 0, 0),
182+
(Some(e2), b2, b2 * row_per_block),
183+
(Some(e3), 3, 3 * row_per_block),
184+
(Some(e4), 4, 4 * row_per_block),
185+
];
181186

182-
let blocks = apply_block_pruning(
183-
snapshot.clone(),
184-
table.get_table_info().schema(),
185-
&Some(extra),
186-
ctx.clone(),
187-
)
188-
.await?;
187+
for (extra, expected_blocks, expected_rows) in extras {
188+
let blocks = apply_block_pruning(
189+
snapshot.clone(),
190+
table.get_table_info().schema(),
191+
&extra,
192+
ctx.clone(),
193+
)
194+
.await?;
189195

190-
assert_eq!((num_blocks - max_val_of_b as usize - 1), blocks.len());
196+
let rows = blocks.iter().map(|b| b.row_count as usize).sum::<usize>();
197+
assert_eq!(expected_rows, rows);
198+
assert_eq!(expected_blocks, blocks.len());
199+
}
191200

192201
Ok(())
193202
}

src/query/storages/fuse/src/operations/read_partitions.rs

-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ impl FuseTable {
5151
.into_iter()
5252
.map(|(_, v)| v)
5353
.collect::<Vec<_>>();
54-
5554
let partitions_total = snapshot.summary.block_count as usize;
5655
self.read_partitions_with_metas(ctx, push_downs, block_metas, partitions_total)
5756
}

src/query/storages/fuse/src/pruning/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,6 @@ mod bloom_pruner;
1616
mod limiter;
1717
mod pruning_executor;
1818
mod range_pruner;
19+
mod topn_pruner;
20+
1921
pub use pruning_executor::BlockPruner;

src/query/storages/fuse/src/pruning/pruning_executor.rs

+28-9
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use super::bloom_pruner;
3333
use crate::io::MetaReaders;
3434
use crate::pruning::limiter;
3535
use crate::pruning::range_pruner;
36+
use crate::pruning::topn_pruner;
3637

3738
pub struct BlockPruner {
3839
table_snapshot: Arc<TableSnapshot>,
@@ -150,15 +151,33 @@ impl BlockPruner {
150151
.map_err(|e| ErrorCode::StorageOther(format!("block pruning failure, {}", e)))?;
151152

152153
// 3. collect the result
153-
tracing::debug_span!("collect_result").in_scope(|| {
154-
// flatten the collected block metas
155-
let metas = joint
156-
.into_iter()
157-
.collect::<Result<Vec<_>>>()?
158-
.into_iter()
159-
.flatten();
160-
Ok(metas.collect())
161-
})
154+
let metas: Result<Vec<(usize, BlockMeta)>> = tracing::debug_span!("collect_result")
155+
.in_scope(|| {
156+
// flatten the collected block metas
157+
let metas = joint
158+
.into_iter()
159+
.collect::<Result<Vec<_>>>()?
160+
.into_iter()
161+
.flatten();
162+
Ok(metas.collect())
163+
});
164+
let metas = metas?;
165+
166+
// if there are both an ordering clause and a limit clause, use the topn pruner
167+
168+
if push_down
169+
.as_ref()
170+
.filter(|p| !p.order_by.is_empty() && p.limit.is_some())
171+
.is_some()
172+
{
173+
let push_down = push_down.as_ref().unwrap();
174+
let limit = push_down.limit.unwrap();
175+
let sort = push_down.order_by.clone();
176+
let tpruner = topn_pruner::TopNPrunner::new(schema, sort, limit);
177+
return tpruner.prune(metas);
178+
}
179+
180+
Ok(metas)
162181
}
163182

164183
async fn all_the_blocks(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use common_datavalues::DataSchemaRef;
16+
use common_exception::ErrorCode;
17+
use common_exception::Result;
18+
use common_fuse_meta::meta::BlockMeta;
19+
use common_fuse_meta::meta::ColumnStatistics;
20+
use common_planners::Expression;
21+
22+
/// Prunes block metas for `ORDER BY <column> LIMIT <n>` style push-downs,
/// using the per-block column statistics of the sort column.
pub(crate) struct TopNPrunner {
23+
// Table schema; `prune` uses it to resolve the sort column name to an index.
schema: DataSchemaRef,
24+
// Order-by expressions; `prune` only acts when there is exactly one entry.
sort: Vec<Expression>,
25+
// Maximum number of block metas that `prune` keeps.
limit: usize,
26+
}
27+
28+
impl TopNPrunner {
29+
pub(crate) fn new(schema: DataSchemaRef, sort: Vec<Expression>, limit: usize) -> Self {
30+
Self {
31+
schema,
32+
sort,
33+
limit,
34+
}
35+
}
36+
}
37+
38+
impl TopNPrunner {
39+
pub(crate) fn prune(&self, metas: Vec<(usize, BlockMeta)>) -> Result<Vec<(usize, BlockMeta)>> {
40+
if self.sort.len() != 1 {
41+
return Ok(metas);
42+
}
43+
44+
if self.limit >= metas.len() {
45+
return Ok(metas);
46+
}
47+
48+
let (sort, asc, nulls_first) = match &self.sort[0] {
49+
Expression::Sort {
50+
expr,
51+
asc,
52+
nulls_first,
53+
..
54+
} => (expr, asc, nulls_first),
55+
_ => unreachable!(),
56+
};
57+
58+
// Currently, we only support topn on single-column sort.
59+
// TODO: support monadic + multi expression + order by cluster key sort.
60+
let column = if let Expression::Column(c) = sort.as_ref() {
61+
c
62+
} else {
63+
return Ok(metas);
64+
};
65+
66+
let sort_idx = if let Ok(index) = self.schema.index_of(column.as_str()) {
67+
index as u32
68+
} else {
69+
return Ok(metas);
70+
};
71+
72+
let mut id_stats = metas
73+
.iter()
74+
.map(|(id, meta)| {
75+
let stat = meta.col_stats.get(&sort_idx).ok_or_else(|| {
76+
ErrorCode::UnknownException(format!(
77+
"Unable to get the colStats by ColumnId: {}",
78+
sort_idx
79+
))
80+
})?;
81+
Ok((*id, stat.clone(), meta.clone()))
82+
})
83+
.collect::<Result<Vec<(usize, ColumnStatistics, BlockMeta)>>>()?;
84+
85+
id_stats.sort_by(|a, b| {
86+
if a.1.null_count + b.1.null_count != 0 && *nulls_first {
87+
return a.1.null_count.cmp(&b.1.null_count).reverse();
88+
}
89+
// no nulls
90+
if *asc {
91+
a.1.min.cmp(&b.1.min)
92+
} else {
93+
a.1.max.cmp(&b.1.max).reverse()
94+
}
95+
});
96+
Ok(id_stats
97+
.iter()
98+
.map(|s| (s.0, s.2.clone()))
99+
.take(self.limit as usize)
100+
.collect())
101+
}
102+
}

0 commit comments

Comments
 (0)