Skip to content

Commit 12e4bb0

Browse files
authored
Merge pull request #2783 from dantengsky/fix-2782
ISSUE-2782: rm unnecessary clones while merging col stats & some minor file name refactoring
2 parents 1b1d703 + 7814128 commit 12e4bb0

File tree

9 files changed

+145
-127
lines changed

9 files changed

+145
-127
lines changed

query/src/datasources/table/fuse/mod.rs

+6-4
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,18 @@
1313
// limitations under the License.
1414
//
1515

16+
mod append;
1617
pub(crate) mod index;
1718
pub(crate) mod io;
1819
mod meta;
20+
mod read;
21+
mod read_plan;
1922
mod table;
20-
mod table_do_append;
21-
mod table_do_read;
22-
mod table_do_read_partitions;
23-
mod table_do_truncate;
23+
mod truncate;
2424
pub(crate) mod util;
2525

26+
#[cfg(test)]
27+
mod read_plan_test;
2628
#[cfg(test)]
2729
mod table_test;
2830
#[cfg(test)]

query/src/datasources/table/fuse/table_do_read_partitions.rs renamed to query/src/datasources/table/fuse/read_plan.rs

+38-1
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,20 @@
1313
// limitations under the License.
1414
//
1515

16+
use std::collections::HashSet;
17+
1618
use common_base::BlockingWait;
1719
use common_context::IOContext;
1820
use common_context::TableIOContext;
1921
use common_dal::read_obj;
2022
use common_exception::Result;
2123
use common_planners::Extras;
24+
use common_planners::Part;
2225
use common_planners::Partitions;
2326
use common_planners::Statistics;
2427

2528
use super::index;
29+
use crate::datasources::table::fuse::BlockMeta;
2630
use crate::datasources::table::fuse::FuseTable;
2731

2832
impl FuseTable {
@@ -43,10 +47,43 @@ impl FuseTable {
4347
}
4448
.wait_in(&io_ctx.get_runtime(), None)??;
4549

46-
let (statistics, parts) = Self::to_partitions(&block_metas, push_downs);
50+
let (statistics, parts) = to_partitions(&block_metas, push_downs);
4751
Ok((statistics, parts))
4852
} else {
4953
Ok((Statistics::default(), vec![]))
5054
}
5155
}
5256
}
57+
58+
pub(crate) fn to_partitions(
59+
blocks_metas: &[BlockMeta],
60+
push_downs: Option<Extras>,
61+
) -> (Statistics, Partitions) {
62+
let proj_cols =
63+
push_downs.and_then(|extras| extras.projection.map(HashSet::<usize>::from_iter));
64+
blocks_metas.iter().fold(
65+
(Statistics::default(), Partitions::default()),
66+
|(mut stats, mut parts), block_meta| {
67+
parts.push(Part {
68+
name: block_meta.location.location.clone(),
69+
version: 0,
70+
});
71+
72+
stats.read_rows += block_meta.row_count as usize;
73+
74+
match &proj_cols {
75+
Some(proj) => {
76+
stats.read_bytes += block_meta
77+
.col_stats
78+
.iter()
79+
.filter(|(cid, _)| proj.contains(&(**cid as usize)))
80+
.map(|(_, col_stats)| col_stats.in_memory_size)
81+
.sum::<u64>() as usize
82+
}
83+
None => stats.read_bytes += block_meta.block_size as usize,
84+
}
85+
86+
(stats, parts)
87+
},
88+
)
89+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
// Copyright 2021 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
16+
use std::collections::HashMap;
17+
18+
use common_datavalues::DataValue;
19+
use common_exception::Result;
20+
use common_planners::Extras;
21+
22+
use crate::datasources::table::fuse::read_plan::to_partitions;
23+
use crate::datasources::table::fuse::BlockLocation;
24+
use crate::datasources::table::fuse::BlockMeta;
25+
use crate::datasources::table::fuse::ColStats;
26+
27+
#[test]
fn test_to_partitions() -> Result<()> {
    // Setup: 5 identical blocks, each with 10 columns whose in-memory
    // size equals the column id (so expected sums are easy to compute).
    let num_of_col = 10;
    let num_of_block = 5;

    let col_stats_gen = |col_size| ColStats {
        min: DataValue::Int8(Some(1)),
        max: DataValue::Int8(Some(2)),
        null_count: 0,
        in_memory_size: col_size as u64,
    };

    // NOTE: ranges are already iterators — no `.into_iter()` needed.
    let cols_stats = (0..num_of_col)
        .map(|col_id| (col_id as u32, col_stats_gen(col_id)))
        .collect::<HashMap<_, _>>();

    let block_meta = BlockMeta {
        row_count: 0,
        // Block size is the sum of all column in-memory sizes.
        block_size: cols_stats
            .values()
            .map(|col_stats| col_stats.in_memory_size)
            .sum(),
        col_stats: cols_stats.clone(),
        location: BlockLocation {
            location: "".to_string(),
            meta_size: 0,
        },
    };

    let blocks_metas = (0..num_of_block)
        .map(|_| block_meta.clone())
        .collect::<Vec<_>>();

    // CASE I: no projection — read_bytes is the full block size per block.
    let (s, _) = to_partitions(&blocks_metas, None);
    let expected_block_size: u64 = cols_stats
        .values()
        .map(|col_stats| col_stats.in_memory_size)
        .sum();
    assert_eq!(expected_block_size * num_of_block, s.read_bytes as u64);

    // CASE II: column pruning — projection keeps only the odd column ids.
    let proj = (0..num_of_col)
        .filter(|v| v & 1 != 0)
        .collect::<Vec<usize>>();

    // For each block, the size we expect after pruning.
    let expected_block_size: u64 = cols_stats
        .iter()
        .filter(|(cid, _)| proj.contains(&(**cid as usize)))
        .map(|(_, col_stats)| col_stats.in_memory_size)
        .sum();

    // Kick off with the projection pushed down.
    let push_down = Some(Extras {
        projection: Some(proj),
        filters: vec![],
        limit: None,
        order_by: vec![],
    });
    let (stats, _) = to_partitions(&blocks_metas, push_down);
    assert_eq!(expected_block_size * num_of_block, stats.read_bytes as u64);
    Ok(())
}

query/src/datasources/table/fuse/table.rs

-37
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@
1414
//
1515

1616
use std::any::Any;
17-
use std::collections::HashSet;
18-
use std::iter::FromIterator;
1917
use std::sync::Arc;
2018

2119
use common_context::DataContext;
@@ -26,7 +24,6 @@ use common_exception::Result;
2624
use common_meta_types::TableInfo;
2725
use common_planners::Extras;
2826
use common_planners::InsertIntoPlan;
29-
use common_planners::Part;
3027
use common_planners::Partitions;
3128
use common_planners::ReadDataSourcePlan;
3229
use common_planners::Statistics;
@@ -35,7 +32,6 @@ use common_streams::SendableDataBlockStream;
3532

3633
use super::util;
3734
use crate::catalogs::Table;
38-
use crate::datasources::table::fuse::BlockMeta;
3935
use crate::datasources::table::fuse::TableSnapshot;
4036

4137
pub struct FuseTable {
@@ -122,37 +118,4 @@ impl FuseTable {
122118
Ok(None)
123119
}
124120
}
125-
126-
pub(crate) fn to_partitions(
127-
blocks_metas: &[BlockMeta],
128-
push_downs: Option<Extras>,
129-
) -> (Statistics, Partitions) {
130-
let proj_cols =
131-
push_downs.and_then(|extras| extras.projection.map(HashSet::<usize>::from_iter));
132-
blocks_metas.iter().fold(
133-
(Statistics::default(), Partitions::default()),
134-
|(mut stats, mut parts), block_meta| {
135-
parts.push(Part {
136-
name: block_meta.location.location.clone(),
137-
version: 0,
138-
});
139-
140-
stats.read_rows += block_meta.row_count as usize;
141-
142-
match &proj_cols {
143-
Some(proj) => {
144-
stats.read_bytes += block_meta
145-
.col_stats
146-
.iter()
147-
.filter(|(cid, _)| proj.contains(&(**cid as usize)))
148-
.map(|(_, col_stats)| col_stats.in_memory_size)
149-
.sum::<u64>() as usize
150-
}
151-
None => stats.read_bytes += block_meta.block_size as usize,
152-
}
153-
154-
(stats, parts)
155-
},
156-
)
157-
}
158121
}

query/src/datasources/table/fuse/table_test.rs

-77
Original file line numberDiff line numberDiff line change
@@ -13,24 +13,17 @@
1313
// limitations under the License.
1414
//
1515

16-
use std::collections::HashMap;
1716
use std::sync::Arc;
1817

1918
use common_base::tokio;
20-
use common_datavalues::DataValue;
2119
use common_exception::Result;
22-
use common_planners::Extras;
2320
use common_planners::ReadDataSourcePlan;
2421
use common_planners::TruncateTablePlan;
2522
use futures::TryStreamExt;
2623

2724
use crate::catalogs::Catalog;
2825
use crate::catalogs::ToReadDataSourcePlan;
2926
use crate::datasources::table::fuse::table_test_fixture::TestFixture;
30-
use crate::datasources::table::fuse::BlockLocation;
31-
use crate::datasources::table::fuse::BlockMeta;
32-
use crate::datasources::table::fuse::ColStats;
33-
use crate::datasources::table::fuse::FuseTable;
3427

3528
#[tokio::test]
3629
async fn test_fuse_table_simple_case() -> Result<()> {
@@ -202,73 +195,3 @@ async fn test_fuse_table_truncate() -> Result<()> {
202195

203196
Ok(())
204197
}
205-
206-
#[test]
207-
fn test_fuse_table_to_partitions_stats_with_col_pruning() -> Result<()> {
208-
// setup
209-
let num_of_col = 10;
210-
let num_of_block = 5;
211-
212-
let col_stats_gen = |col_size| ColStats {
213-
min: DataValue::Int8(Some(1)),
214-
max: DataValue::Int8(Some(2)),
215-
null_count: 0,
216-
in_memory_size: col_size as u64,
217-
};
218-
219-
let cols_stats = (0..num_of_col)
220-
.into_iter()
221-
.map(|col_id| (col_id as u32, col_stats_gen(col_id)))
222-
.collect::<HashMap<_, _>>();
223-
224-
let block_meta = BlockMeta {
225-
row_count: 0,
226-
block_size: cols_stats
227-
.iter()
228-
.map(|(_, col_stats)| col_stats.in_memory_size)
229-
.sum(),
230-
col_stats: cols_stats.clone(),
231-
location: BlockLocation {
232-
location: "".to_string(),
233-
meta_size: 0,
234-
},
235-
};
236-
237-
let blocks_metas = (0..num_of_block)
238-
.into_iter()
239-
.map(|_| block_meta.clone())
240-
.collect::<Vec<_>>();
241-
242-
// CASE I: no projection
243-
let (s, _) = FuseTable::to_partitions(&blocks_metas, None);
244-
let expected_block_size: u64 = cols_stats
245-
.iter()
246-
.map(|(_, col_stats)| col_stats.in_memory_size)
247-
.sum();
248-
assert_eq!(expected_block_size * num_of_block, s.read_bytes as u64);
249-
250-
// CASE II: col pruning
251-
// projection which keeps the odd ones
252-
let proj = (0..num_of_col)
253-
.into_iter()
254-
.filter(|v| v & 1 != 0)
255-
.collect::<Vec<usize>>();
256-
257-
// for each block, the block size we expects (after pruning)
258-
let expected_block_size: u64 = cols_stats
259-
.iter()
260-
.filter(|(cid, _)| proj.contains(&(**cid as usize)))
261-
.map(|(_, col_stats)| col_stats.in_memory_size)
262-
.sum();
263-
264-
// kick off
265-
let push_down = Some(Extras {
266-
projection: Some(proj),
267-
filters: vec![],
268-
limit: None,
269-
order_by: vec![],
270-
});
271-
let (stats, _) = FuseTable::to_partitions(&blocks_metas, push_down);
272-
assert_eq!(expected_block_size * num_of_block, stats.read_bytes as u64);
273-
Ok(())
274-
}

query/src/datasources/table/fuse/util/statistic_helper.rs

+6-8
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414
//
1515

16+
use std::borrow::Borrow;
1617
use std::collections::hash_map::Entry;
1718
use std::collections::HashMap;
1819

@@ -137,15 +138,15 @@ pub(super) fn block_stats(data_block: &DataBlock) -> Result<BlockStats> {
137138
.collect()
138139
}
139140

140-
pub fn column_stats_reduce_with_schema(
141-
stats: &[HashMap<ColumnId, ColStats>],
141+
pub fn column_stats_reduce_with_schema<T: Borrow<HashMap<ColumnId, ColStats>>>(
142+
stats: &[T],
142143
schema: &DataSchema,
143144
) -> Result<HashMap<ColumnId, ColStats>> {
144145
let len = stats.len();
145146

146147
// transpose Vec<HashMap<_,(_,_)>> to HashMap<_, (_, Vec<_>)>
147148
let col_stat_list = stats.iter().fold(HashMap::new(), |acc, item| {
148-
item.iter().fold(
149+
item.borrow().iter().fold(
149150
acc,
150151
|mut acc: HashMap<ColumnId, Vec<&ColStats>>, (col_id, stats)| {
151152
let entry = acc.entry(*col_id);
@@ -185,7 +186,7 @@ pub fn column_stats_reduce_with_schema(
185186

186187
// TODO
187188
// for some data types, we shall balance the accuracy and the length
188-
// e.g. for a string col, which max value is "xxxxxxxxxxxx....", we record the max as something like "y"
189+
// e.g. for a string col, which max value is "abcdef....", we record the max as something like "b"
189190
let min =
190191
common_datavalues::DataValue::try_into_data_array(min_stats.as_slice(), data_type)?
191192
.min()?;
@@ -210,10 +211,7 @@ pub fn merge_stats(schema: &DataSchema, l: &Stats, r: &Stats) -> Result<Stats> {
210211
block_count: l.block_count + r.block_count,
211212
uncompressed_byte_size: l.uncompressed_byte_size + r.uncompressed_byte_size,
212213
compressed_byte_size: l.compressed_byte_size + r.compressed_byte_size,
213-
col_stats: util::column_stats_reduce_with_schema(
214-
&[l.col_stats.clone(), r.col_stats.clone()],
215-
schema,
216-
)?,
214+
col_stats: util::column_stats_reduce_with_schema(&[&l.col_stats, &r.col_stats], schema)?,
217215
};
218216
Ok(s)
219217
}

0 commit comments

Comments
 (0)