Skip to content

Commit ce6ce71

Browse files
committed
add max_range_size
1 parent edbf57c commit ce6ce71

File tree

3 files changed

+36
-20
lines changed

3 files changed

+36
-20
lines changed

src/common/base/src/rangemap/range_merger.rs

+14-8
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,14 @@ use std::cmp;
2121
use std::ops::Range;
2222

2323
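/// Check whether `other` starts or ends within `this` extended by the allowed gap; never true once `this` has reached the maximum range size.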
24-
fn overlaps(this: &Range<u64>, other: &Range<u64>, hole: u64) -> bool {
25-
let end_with_hole = this.end + hole;
26-
(other.start >= this.start && other.start <= end_with_hole)
27-
|| (other.end >= this.start && other.end <= end_with_hole)
24+
fn overlaps(this: &Range<u64>, other: &Range<u64>, max_gap_size: u64, max_range_size: u64) -> bool {
25+
if (this.end - this.start) >= max_range_size {
26+
false
27+
} else {
28+
let end_with_hole = this.end + max_gap_size;
29+
(other.start >= this.start && other.start <= end_with_hole)
30+
|| (other.end >= this.start && other.end <= end_with_hole)
31+
}
2832
}
2933

3034
/// Merge to one.
@@ -35,18 +39,20 @@ fn merge(this: &mut Range<u64>, other: &Range<u64>) {
3539

3640
#[derive(Debug, Clone)]
3741
pub struct RangeMerger {
38-
max_merge_gap: u64,
42+
max_gap_size: u64,
43+
max_range_size: u64,
3944
ranges: Vec<Range<u64>>,
4045
}
4146

4247
impl RangeMerger {
43-
pub fn from_iter<I>(iter: I, max_merge_gap: u64) -> Self
48+
pub fn from_iter<I>(iter: I, max_gap_size: u64, max_range_size: u64) -> Self
4449
where I: IntoIterator<Item = std::ops::Range<u64>> {
4550
let mut raw_ranges: Vec<_> = iter.into_iter().collect();
4651
raw_ranges.sort_by(|a, b| a.start.cmp(&b.start));
4752

4853
let mut rs = RangeMerger {
49-
max_merge_gap,
54+
max_gap_size,
55+
max_range_size,
5056
ranges: Vec::with_capacity(raw_ranges.len()),
5157
};
5258

@@ -63,7 +69,7 @@ impl RangeMerger {
6369

6470
fn add(&mut self, range: &Range<u64>) {
6571
if let Some(last) = self.ranges.last_mut() {
66-
if overlaps(last, range, self.max_merge_gap) {
72+
if overlaps(last, range, self.max_gap_size, self.max_range_size) {
6773
merge(last, range);
6874
return;
6975
}

src/common/base/tests/it/range_merger.rs

+16-8
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ impl fmt::Display for Array {
3232
fn test_range_merger() -> Result<()> {
3333
let v = [3..6, 1..5, 7..11, 8..9, 9..12, 4..8, 13..15, 18..20];
3434

35-
let mr = RangeMerger::from_iter(v, 0);
35+
let mr = RangeMerger::from_iter(v, 0, 100);
3636
let actual = format!("{}", Array(mr.ranges()));
3737
let expect = "[1,12] [13,15] [18,20] ";
3838
assert_eq!(actual, expect);
@@ -41,32 +41,40 @@ fn test_range_merger() -> Result<()> {
4141
}
4242

4343
#[test]
44-
fn test_range_merger_with_hole() -> Result<()> {
44+
fn test_range_merger_with_gap() -> Result<()> {
4545
let v = [3..6, 1..5, 7..11, 8..9, 9..12, 4..8, 13..15, 18..20];
4646

47-
// Hole = 1
47+
// max_gap_size = 1
4848
{
49-
let mr = RangeMerger::from_iter(v.clone(), 1);
49+
let mr = RangeMerger::from_iter(v.clone(), 1, 100);
5050
let actual = format!("{}", Array(mr.ranges()));
5151
let expect = "[1,15] [18,20] ";
5252
assert_eq!(actual, expect);
5353
}
5454

55-
// Hole = 2
55+
// max_gap_size = 2
5656
{
57-
let mr = RangeMerger::from_iter(v.clone(), 2);
57+
let mr = RangeMerger::from_iter(v.clone(), 2, 100);
5858
let actual = format!("{}", Array(mr.ranges()));
5959
let expect = "[1,15] [18,20] ";
6060
assert_eq!(actual, expect);
6161
}
6262

63-
// Hole = 3
63+
// max_gap_size = 3
6464
{
65-
let mr = RangeMerger::from_iter(v, 3);
65+
let mr = RangeMerger::from_iter(v.clone(), 3, 100);
6666
let actual = format!("{}", Array(mr.ranges()));
6767
let expect = "[1,20] ";
6868
assert_eq!(actual, expect);
6969
}
7070

71+
// max_gap_size = 3, max_range_size = 4
72+
{
73+
let mr = RangeMerger::from_iter(v, 3, 4);
74+
let actual = format!("{}", Array(mr.ranges()));
75+
let expect = "[1,5] [3,8] [7,11] [8,12] [13,20] ";
76+
assert_eq!(actual, expect);
77+
}
78+
7179
Ok(())
7280
}

src/query/storages/fuse/fuse/src/io/read/block_reader_parquet.rs

+6-4
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,8 @@ impl BlockReader {
342342

343343
/// Merge overlapping I/O requests into one.
344344
fn merge_io_requests(
345-
max_merge_gap: u64,
345+
max_gap_size: u64,
346+
max_range_size: u64,
346347
part: &PartInfoPtr,
347348
) -> Result<Vec<std::ops::Range<u64>>> {
348349
let part = FusePartInfo::from_part(part)?;
@@ -351,7 +352,7 @@ impl BlockReader {
351352
.values()
352353
.map(|v| (v.offset..v.offset + v.len))
353354
.collect::<Vec<_>>();
354-
Ok(RangeMerger::from_iter(ranges, max_merge_gap).ranges())
355+
Ok(RangeMerger::from_iter(ranges, max_gap_size, max_range_size).ranges())
355356
}
356357

357358
pub async fn read_columns_data(
@@ -380,8 +381,9 @@ impl BlockReader {
380381
let normal_cost = now.elapsed().unwrap().as_millis();
381382

382383
// Merge io requests.
383-
let max_merge_gap = ctx.get_settings().get_max_storage_io_requests_merge_gap()?;
384-
let ranges = Self::merge_io_requests(max_merge_gap, &raw_part)?;
384+
let max_gap_size = ctx.get_settings().get_max_storage_io_requests_merge_gap()?;
385+
let max_range_size = ctx.get_settings().get_max_storage_io_requests_page_size()?;
386+
let ranges = Self::merge_io_requests(max_gap_size, max_range_size, &raw_part)?;
385387
let mut merge_io_handlers = Vec::with_capacity(ranges.len());
386388
for (index, range) in ranges.iter().enumerate() {
387389
merge_io_handlers.push(UnlimitedFuture::create(Self::read_range(

0 commit comments

Comments
 (0)