Skip to content

Commit 7d6972b

Browse files
committed
add global cost
1 parent e079428 commit 7d6972b

File tree

2 files changed

+17
-4
lines changed

2 files changed

+17
-4
lines changed

src/query/storages/fuse/src/io/read/block_reader.rs

+5
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::sync::atomic::AtomicU64;
16+
use std::sync::Arc;
17+
1518
use common_arrow::parquet::metadata::SchemaDescriptor;
1619
use common_catalog::plan::Projection;
1720
use common_datavalues::DataSchemaRef;
@@ -21,6 +24,8 @@ use opendal::Operator;
2124
// TODO: make BlockReader as a trait.
2225
#[derive(Clone)]
2326
pub struct BlockReader {
27+
pub normal_all_cost: Arc<AtomicU64>,
28+
pub merge_all_cost: Arc<AtomicU64>,
2429
pub(crate) operator: Operator,
2530
pub(crate) projection: Projection,
2631
pub(crate) projected_schema: DataSchemaRef,

src/query/storages/fuse/src/io/read/block_reader_parquet.rs

+12-4
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
use std::collections::hash_map::Entry;
1616
use std::collections::HashMap;
17+
use std::sync::atomic::Ordering;
1718
use std::sync::Arc;
1819
use std::time::SystemTime;
1920

@@ -72,6 +73,8 @@ impl BlockReader {
7273
let column_leaves = ColumnLeaves::new_from_schema(&arrow_schema);
7374

7475
Ok(Arc::new(BlockReader {
76+
normal_all_cost: Arc::new(0.into()),
77+
merge_all_cost: Arc::new(0.into()),
7578
operator,
7679
projection,
7780
projected_schema,
@@ -301,7 +304,9 @@ impl BlockReader {
301304

302305
let now = SystemTime::now();
303306
let res = futures::future::try_join_all(join_handlers).await;
304-
let normal_cost = now.elapsed().unwrap().as_millis();
307+
let normal_cost = now.elapsed().unwrap().as_millis() as u64;
308+
self.normal_all_cost
309+
.fetch_add(normal_cost, Ordering::Release);
305310

306311
// Merge io requests.
307312
let max_gap_size = ctx.get_settings().get_max_storage_io_requests_merge_gap()?;
@@ -321,21 +326,24 @@ impl BlockReader {
321326
}
322327
let now = SystemTime::now();
323328
let _ = futures::future::try_join_all(merge_io_handlers).await;
324-
let merge_cost = now.elapsed().unwrap().as_millis();
329+
let merge_cost = now.elapsed().unwrap().as_millis() as u64;
330+
self.merge_all_cost.fetch_add(merge_cost, Ordering::Release);
325331

326332
info!(
327-
"async read norma partition={}, count={}, bytes={}, took:{} ms",
333+
"norma read partition={}, count={}, bytes={}, took:{} ms, all:{} ms",
328334
part.location,
329335
part.columns_meta.len(),
330336
normal_read_bytes,
331337
normal_cost,
338+
self.normal_all_cost.load(Ordering::Acquire),
332339
);
333340
info!(
334-
"async read merge partition={}, count={}, bytes={}, took:{} ms",
341+
"merge read partition={}, count={}, bytes={}, took:{} ms, all:{} ms",
335342
part.location,
336343
ranges.len(),
337344
merge_read_bytes,
338345
merge_cost,
346+
self.merge_all_cost.load(Ordering::Acquire)
339347
);
340348

341349
res

0 commit comments

Comments
 (0)