14
14
15
15
use std:: collections:: hash_map:: Entry ;
16
16
use std:: collections:: HashMap ;
17
+ use std:: sync:: atomic:: Ordering ;
17
18
use std:: sync:: Arc ;
18
19
use std:: time:: SystemTime ;
19
20
@@ -72,6 +73,8 @@ impl BlockReader {
72
73
let column_leaves = ColumnLeaves :: new_from_schema ( & arrow_schema) ;
73
74
74
75
Ok ( Arc :: new ( BlockReader {
76
+ normal_all_cost : Arc :: new ( 0 . into ( ) ) ,
77
+ merge_all_cost : Arc :: new ( 0 . into ( ) ) ,
75
78
operator,
76
79
projection,
77
80
projected_schema,
@@ -284,6 +287,7 @@ impl BlockReader {
284
287
let indices = Self :: build_projection_indices ( & columns) ;
285
288
let mut join_handlers = Vec :: with_capacity ( indices. len ( ) ) ;
286
289
290
+ // Normal read.
287
291
let mut ranges = vec ! [ ] ;
288
292
let mut normal_read_bytes = 0u64 ;
289
293
for ( index, _) in indices {
@@ -301,7 +305,9 @@ impl BlockReader {
301
305
302
306
let now = SystemTime :: now ( ) ;
303
307
let res = futures:: future:: try_join_all ( join_handlers) . await ;
304
- let normal_cost = now. elapsed ( ) . unwrap ( ) . as_millis ( ) ;
308
+ let normal_cost = now. elapsed ( ) . unwrap ( ) . as_millis ( ) as u64 ;
309
+ self . normal_all_cost
310
+ . fetch_add ( normal_cost, Ordering :: Release ) ;
305
311
306
312
// Merge io requests.
307
313
let max_gap_size = ctx. get_settings ( ) . get_max_storage_io_requests_merge_gap ( ) ?;
@@ -321,21 +327,24 @@ impl BlockReader {
321
327
}
322
328
let now = SystemTime :: now ( ) ;
323
329
let _ = futures:: future:: try_join_all ( merge_io_handlers) . await ;
324
- let merge_cost = now. elapsed ( ) . unwrap ( ) . as_millis ( ) ;
330
+ let merge_cost = now. elapsed ( ) . unwrap ( ) . as_millis ( ) as u64 ;
331
+ self . merge_all_cost . fetch_add ( merge_cost, Ordering :: Release ) ;
325
332
326
333
info ! (
327
- "async read norma partition={}, count={}, bytes={}, took:{} ms" ,
334
+ "norma read partition={}, count={}, bytes={}, took:{} ms, all :{} ms" ,
328
335
part. location,
329
336
part. columns_meta. len( ) ,
330
337
normal_read_bytes,
331
338
normal_cost,
339
+ self . normal_all_cost. load( Ordering :: Acquire ) ,
332
340
) ;
333
341
info ! (
334
- "async read merge partition={}, count={}, bytes={}, took:{} ms" ,
342
+ "merge read partition={}, count={}, bytes={}, took:{} ms, all :{} ms" ,
335
343
part. location,
336
344
ranges. len( ) ,
337
345
merge_read_bytes,
338
346
merge_cost,
347
+ self . merge_all_cost. load( Ordering :: Acquire )
339
348
) ;
340
349
341
350
res
0 commit comments