Skip to content

Commit 64082b5

Browse files
committed
feat(fuse): try to improve object storage read, merge io requests
1 parent 479329f commit 64082b5

File tree

8 files changed

+290
-7
lines changed

8 files changed

+290
-7
lines changed

src/common/base/src/rangemap/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
mod range_map;
1616
mod range_map_key;
17+
mod range_merger;
1718

1819
pub use range_map::RangeMap;
1920
pub use range_map_key::RangeMapKey;
21+
pub use range_merger::RangeMerger;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
/// Merge overlapping ranges.
16+
/// If we have 3 ranges: [1,3], [2,4], [6,8]
17+
/// The final range stack will be: [1,4], [6,8]
18+
/// If we set the max_gap_size to 2:
19+
/// The final range stack will be: [1,8]
20+
use std::cmp;
21+
use std::ops::Range;
22+
23+
/// Decide whether `other` may be folded into `this`.
///
/// Returns `false` outright once `this` is already at least `max_range_size`
/// long. Otherwise `this` is extended on its right-hand side by
/// `max_gap_size`, and the result is `true` when either endpoint of `other`
/// lies inside that extended window, or when `other` covers the whole window.
///
/// All arithmetic saturates, so an arbitrarily large `max_gap_size` cannot
/// overflow `u64`, and a malformed `this` (end before start) is treated as
/// having length zero instead of underflowing.
fn overlaps(this: &Range<u64>, other: &Range<u64>, max_gap_size: u64, max_range_size: u64) -> bool {
    // A range that has reached the size cap never absorbs anything else.
    if this.end.saturating_sub(this.start) >= max_range_size {
        return false;
    }

    let end_with_hole = this.end.saturating_add(max_gap_size);
    let window = this.start..=end_with_hole;

    window.contains(&other.start)
        || window.contains(&other.end)
        // `other` starts before and ends after the window: neither endpoint
        // is inside it, yet the two ranges clearly intersect.
        || (other.start < this.start && other.end > end_with_hole)
}
33+
34+
/// Grow `this` in place to the smallest range covering both `this` and `other`.
fn merge(this: &mut Range<u64>, other: &Range<u64>) {
    let lower = this.start.min(other.start);
    let upper = this.end.max(other.end);
    *this = lower..upper;
}
39+
40+
#[derive(Debug, Clone)]
41+
pub struct RangeMerger {
42+
max_gap_size: u64,
43+
max_range_size: u64,
44+
ranges: Vec<Range<u64>>,
45+
}
46+
47+
impl RangeMerger {
48+
pub fn from_iter<I>(iter: I, max_gap_size: u64, max_range_size: u64) -> Self
49+
where I: IntoIterator<Item = std::ops::Range<u64>> {
50+
let mut raw_ranges: Vec<_> = iter.into_iter().collect();
51+
raw_ranges.sort_by(|a, b| a.start.cmp(&b.start));
52+
53+
let mut rs = RangeMerger {
54+
max_gap_size,
55+
max_range_size,
56+
ranges: Vec::with_capacity(raw_ranges.len()),
57+
};
58+
59+
for range in &raw_ranges {
60+
rs.add(range);
61+
}
62+
63+
rs
64+
}
65+
66+
pub fn ranges(&self) -> Vec<Range<u64>> {
67+
self.ranges.clone()
68+
}
69+
70+
fn add(&mut self, range: &Range<u64>) {
71+
if let Some(last) = self.ranges.last_mut() {
72+
if overlaps(last, range, self.max_gap_size, self.max_range_size) {
73+
merge(last, range);
74+
return;
75+
}
76+
}
77+
78+
self.ranges.push(range.clone());
79+
}
80+
}

src/common/base/tests/it/main.rs

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
mod pool;
1616
mod pool_retry;
1717
mod progress;
18+
mod range_merger;
1819
mod runtime;
1920
mod runtime_tracker;
2021
mod stoppable;
+80
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::fmt;
16+
use std::fmt::Formatter;
17+
18+
use common_base::rangemap::RangeMerger;
19+
use common_exception::Result;
20+
21+
/// Test helper that renders a list of ranges as consecutive `[start,end] ` items.
struct Array(Vec<std::ops::Range<u64>>);

impl fmt::Display for Array {
    /// Writes every range as `[start,end] ` (note the trailing space), stopping
    /// at the first formatter error.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        self.0
            .iter()
            .try_for_each(|r| write!(f, "[{},{}] ", r.start, r.end))
    }
}
30+
31+
/// With a zero gap allowance, only ranges whose start falls inside (or exactly
/// at the end of) the current merged range are folded together.
#[test]
fn test_range_merger() -> Result<()> {
    let input = [3..6, 1..5, 7..11, 8..9, 9..12, 4..8, 13..15, 18..20];

    let merged = RangeMerger::from_iter(input, 0, 100).ranges();
    assert_eq!(Array(merged).to_string(), "[1,12] [13,15] [18,20] ");

    Ok(())
}
42+
43+
/// Exercises `max_gap_size` and `max_range_size` against one fixed input set.
#[test]
fn test_range_merger_with_gap() -> Result<()> {
    let v = [3..6, 1..5, 7..11, 8..9, 9..12, 4..8, 13..15, 18..20];

    // (max_gap_size, max_range_size, expected rendering)
    let cases: [(u64, u64, &str); 4] = [
        (1, 100, "[1,15] [18,20] "),
        (2, 100, "[1,15] [18,20] "),
        (3, 100, "[1,20] "),
        // max_range_size = 4: a range that is already 4 or more long stops
        // absorbing its successors, so several overlapping outputs remain.
        (3, 4, "[1,5] [3,8] [7,11] [8,12] [13,20] "),
    ];

    for (max_gap_size, max_range_size, expect) in cases {
        let mr = RangeMerger::from_iter(v.clone(), max_gap_size, max_range_size);
        let actual = format!("{}", Array(mr.ranges()));
        assert_eq!(
            actual, expect,
            "max_gap_size = {}, max_range_size = {}",
            max_gap_size, max_range_size
        );
    }

    Ok(())
}

src/query/settings/src/lib.rs

+42
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,28 @@ impl Settings {
181181
desc: "The maximum number of concurrent IO requests. By default the value is determined automatically.",
182182
possible_values: None,
183183
},
184+
// max_storage_io_requests_merge_gap
185+
SettingValue {
186+
default_value: UserSettingValue::UInt64(16 * 1024),
187+
user_setting: UserSetting::create(
188+
"max_storage_io_requests_merge_gap",
189+
UserSettingValue::UInt64(16 * 1024),
190+
),
191+
level: ScopeLevel::Session,
192+
desc: "The maximum gap bytes of adjusting to merge two IO requests. By default the value is 16KB",
193+
possible_values: None,
194+
},
195+
// max_storage_io_requests_page_size
196+
SettingValue {
197+
default_value: UserSettingValue::UInt64(256 * 1024),
198+
user_setting: UserSetting::create(
199+
"max_storage_io_requests_page_size",
200+
UserSettingValue::UInt64(256 * 1024),
201+
),
202+
level: ScopeLevel::Session,
203+
desc: "The maximum bytes of one IO request read. By default the value is 256KB",
204+
possible_values: None,
205+
},
184206
// flight_client_timeout
185207
SettingValue {
186208
default_value: UserSettingValue::UInt64(60),
@@ -561,6 +583,26 @@ impl Settings {
561583
self.try_set_u64(key, val, false)
562584
}
563585

586+
/// Read the `max_storage_io_requests_merge_gap` setting as a `u64`.
pub fn get_max_storage_io_requests_merge_gap(&self) -> Result<u64> {
    self.try_get_u64("max_storage_io_requests_merge_gap")
}
590+
591+
/// Store `val` under the `max_storage_io_requests_merge_gap` setting.
///
/// The trailing `false` is forwarded unchanged to `try_set_u64`; its meaning
/// is defined by that helper.
pub fn set_max_storage_io_requests_merge_gap(&self, val: u64) -> Result<()> {
    self.try_set_u64("max_storage_io_requests_merge_gap", val, false)
}
595+
596+
/// Read the `max_storage_io_requests_page_size` setting as a `u64`.
pub fn get_max_storage_io_requests_page_size(&self) -> Result<u64> {
    self.try_get_u64("max_storage_io_requests_page_size")
}
600+
601+
/// Store `val` under the `max_storage_io_requests_page_size` setting.
///
/// The trailing `false` is forwarded unchanged to `try_set_u64`; its meaning
/// is defined by that helper.
pub fn set_max_storage_io_requests_page_size(&self, val: u64) -> Result<()> {
    self.try_set_u64("max_storage_io_requests_page_size", val, false)
}
605+
564606
// Get max_execute_time.
565607
pub fn get_max_execute_time(&self) -> Result<u64> {
566608
self.try_get_u64("max_execute_time")

src/query/storages/fuse/src/io/read/block_reader_parquet.rs

+71-3
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use std::collections::hash_map::Entry;
1616
use std::collections::HashMap;
1717
use std::sync::Arc;
18+
use std::time::SystemTime;
1819

1920
use common_arrow::arrow::array::Array;
2021
use common_arrow::arrow::chunk::Chunk;
@@ -28,9 +29,11 @@ use common_arrow::parquet::metadata::ColumnDescriptor;
2829
use common_arrow::parquet::read::BasicDecompressor;
2930
use common_arrow::parquet::read::PageMetaData;
3031
use common_arrow::parquet::read::PageReader;
32+
use common_base::rangemap::RangeMerger;
3133
use common_base::runtime::UnlimitedFuture;
3234
use common_catalog::plan::PartInfoPtr;
3335
use common_catalog::plan::Projection;
36+
use common_catalog::table_context::TableContext;
3437
use common_datablocks::DataBlock;
3538
use common_datavalues::DataSchemaRef;
3639
use common_exception::ErrorCode;
@@ -45,6 +48,7 @@ use futures::TryStreamExt;
4548
use opendal::Object;
4649
use opendal::Operator;
4750
use tracing::debug_span;
51+
use tracing::info;
4852
use tracing::Instrument;
4953

5054
use crate::fuse_part::FusePartInfo;
@@ -270,8 +274,27 @@ impl BlockReader {
270274
self.try_next_block(&mut deserializer)
271275
}
272276

273-
pub async fn read_columns_data(&self, part: PartInfoPtr) -> Result<Vec<(usize, Vec<u8>)>> {
274-
let part = FusePartInfo::from_part(&part)?;
277+
/// Merge overlap io request to one.
278+
fn merge_io_requests(
279+
max_gap_size: u64,
280+
max_range_size: u64,
281+
part: &PartInfoPtr,
282+
) -> Result<Vec<std::ops::Range<u64>>> {
283+
let part = FusePartInfo::from_part(part)?;
284+
let ranges = part
285+
.columns_meta
286+
.values()
287+
.map(|v| (v.offset..v.offset + v.len))
288+
.collect::<Vec<_>>();
289+
Ok(RangeMerger::from_iter(ranges, max_gap_size, max_range_size).ranges())
290+
}
291+
292+
pub async fn read_columns_data(
293+
&self,
294+
ctx: Arc<dyn TableContext>,
295+
raw_part: PartInfoPtr,
296+
) -> Result<Vec<(usize, Vec<u8>)>> {
297+
let part = FusePartInfo::from_part(&raw_part)?;
275298
let columns = self.projection.project_column_leaves(&self.column_leaves)?;
276299
let indices = Self::build_projection_indices(&columns);
277300
let mut join_handlers = Vec::with_capacity(indices.len());
@@ -286,7 +309,41 @@ impl BlockReader {
286309
)));
287310
}
288311

289-
futures::future::try_join_all(join_handlers).await
312+
let now = SystemTime::now();
313+
let res = futures::future::try_join_all(join_handlers).await;
314+
let normal_cost = now.elapsed().unwrap().as_millis();
315+
316+
// Merge io requests.
317+
let max_gap_size = ctx.get_settings().get_max_storage_io_requests_merge_gap()?;
318+
let max_range_size = ctx.get_settings().get_max_storage_io_requests_page_size()?;
319+
let ranges = Self::merge_io_requests(max_gap_size, max_range_size, &raw_part)?;
320+
let mut merge_io_handlers = Vec::with_capacity(ranges.len());
321+
for (index, range) in ranges.iter().enumerate() {
322+
merge_io_handlers.push(UnlimitedFuture::create(Self::read_range(
323+
self.operator.object(&part.location),
324+
index,
325+
range.start,
326+
range.end,
327+
)));
328+
}
329+
let now = SystemTime::now();
330+
let _ = futures::future::try_join_all(merge_io_handlers).await;
331+
let merge_cost = now.elapsed().unwrap().as_millis();
332+
333+
info!(
334+
"async read normal partition={}, count={}, took:{} ms",
335+
part.location,
336+
part.columns_meta.len(),
337+
normal_cost,
338+
);
339+
info!(
340+
"async read merge partition={}, count={}, took:{} ms",
341+
part.location,
342+
ranges.len(),
343+
merge_cost,
344+
);
345+
346+
res
290347
}
291348

292349
pub fn support_blocking_api(&self) -> bool {
@@ -316,6 +373,17 @@ impl BlockReader {
316373
Ok(results)
317374
}
318375

376+
pub async fn read_range(
377+
o: Object,
378+
index: usize,
379+
start: u64,
380+
end: u64,
381+
) -> Result<(usize, Vec<u8>)> {
382+
let chunk = o.range_read(start..end).await?;
383+
384+
Ok((index, chunk))
385+
}
386+
319387
pub async fn read_column(
320388
o: Object,
321389
index: usize,

src/query/storages/fuse/src/operations/fuse_parquet_source.rs

+7-2
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,10 @@ impl Processor for FuseParquetSource {
283283
async fn async_process(&mut self) -> Result<()> {
284284
match std::mem::replace(&mut self.state, State::Finish) {
285285
State::ReadDataPrewhere(Some(part)) => {
286-
let chunks = self.prewhere_reader.read_columns_data(part.clone()).await?;
286+
let chunks = self
287+
.prewhere_reader
288+
.read_columns_data(self.ctx.clone(), part.clone())
289+
.await?;
287290

288291
if self.prewhere_filter.is_some() {
289292
self.state = State::PrewhereFilter(part, chunks);
@@ -295,7 +298,9 @@ impl Processor for FuseParquetSource {
295298
}
296299
State::ReadDataRemain(part, prewhere_data) => {
297300
if let Some(remain_reader) = self.remain_reader.as_ref() {
298-
let chunks = remain_reader.read_columns_data(part.clone()).await?;
301+
let chunks = remain_reader
302+
.read_columns_data(self.ctx.clone(), part.clone())
303+
.await?;
299304
self.state = State::Deserialize(part, chunks, Some(prewhere_data));
300305
Ok(())
301306
} else {

src/query/storages/fuse/src/operations/mutation/deletion/deletion_source.rs

+7-2
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,10 @@ impl Processor for DeletionSource {
298298
self.index = deletion_part.index;
299299
self.origin_stats = deletion_part.cluster_stats.clone();
300300
let part = deletion_part.inner_part.clone();
301-
let chunks = self.block_reader.read_columns_data(part.clone()).await?;
301+
let chunks = self
302+
.block_reader
303+
.read_columns_data(self.ctx.clone(), part.clone())
304+
.await?;
302305
self.state = State::FilterData(part, chunks);
303306
}
304307
State::ReadRemain {
@@ -307,7 +310,9 @@ impl Processor for DeletionSource {
307310
filter,
308311
} => {
309312
if let Some(remain_reader) = self.remain_reader.as_ref() {
310-
let chunks = remain_reader.read_columns_data(part.clone()).await?;
313+
let chunks = remain_reader
314+
.read_columns_data(self.ctx.clone(), part.clone())
315+
.await?;
311316
self.state = State::MergeRemain {
312317
part,
313318
chunks,

0 commit comments

Comments
 (0)