@@ -274,21 +274,6 @@ impl BlockReader {
274
274
self . try_next_block ( & mut deserializer)
275
275
}
276
276
277
- /// Merge overlap io request to one.
278
- fn merge_io_requests (
279
- max_gap_size : u64 ,
280
- max_range_size : u64 ,
281
- part : & PartInfoPtr ,
282
- ) -> Result < Vec < std:: ops:: Range < u64 > > > {
283
- let part = FusePartInfo :: from_part ( part) ?;
284
- let ranges = part
285
- . columns_meta
286
- . values ( )
287
- . map ( |v| ( v. offset ..v. offset + v. len ) )
288
- . collect :: < Vec < _ > > ( ) ;
289
- Ok ( RangeMerger :: from_iter ( ranges, max_gap_size, max_range_size) . ranges ( ) )
290
- }
291
-
292
277
pub async fn read_columns_data (
293
278
& self ,
294
279
ctx : Arc < dyn TableContext > ,
@@ -299,9 +284,12 @@ impl BlockReader {
299
284
let indices = Self :: build_projection_indices ( & columns) ;
300
285
let mut join_handlers = Vec :: with_capacity ( indices. len ( ) ) ;
301
286
287
+ let mut ranges = vec ! [ ] ;
302
288
let mut normal_read_bytes = 0u64 ;
303
289
for ( index, _) in indices {
304
290
let column_meta = & part. columns_meta [ & index] ;
291
+ ranges. push ( column_meta. offset ..( column_meta. offset + column_meta. len ) ) ;
292
+
305
293
normal_read_bytes += column_meta. len ;
306
294
join_handlers. push ( UnlimitedFuture :: create ( Self :: read_column (
307
295
self . operator . object ( & part. location ) ,
@@ -318,11 +306,12 @@ impl BlockReader {
318
306
// Merge io requests.
319
307
let max_gap_size = ctx. get_settings ( ) . get_max_storage_io_requests_merge_gap ( ) ?;
320
308
let max_range_size = ctx. get_settings ( ) . get_max_storage_io_requests_page_size ( ) ?;
321
- let ranges = Self :: merge_io_requests ( max_gap_size , max_range_size , & raw_part ) ? ;
309
+ let ranges = RangeMerger :: from_iter ( ranges , max_gap_size , max_range_size ) . ranges ( ) ;
322
310
let mut merge_io_handlers = Vec :: with_capacity ( ranges. len ( ) ) ;
323
311
let mut merge_read_bytes = 0u64 ;
324
312
for ( index, range) in ranges. iter ( ) . enumerate ( ) {
325
- merge_read_bytes += range. end - range. start ;
313
+ let len = range. end - range. start ;
314
+ merge_read_bytes += len;
326
315
merge_io_handlers. push ( UnlimitedFuture :: create ( Self :: read_range (
327
316
self . operator . object ( & part. location ) ,
328
317
index,
0 commit comments