
Commit 529d184

refactor: optimize data block processing in MergeIntoOperationAggregator (#11571)
* refactor: use a dedicated thread while decoding block data
* Revert experimental changes: no significant performance improvement was observed; back to try_join_all
* cleanup
* adjust the deserialization path so that metrics can be recorded
* shuffle segments while building the replace-into pipeline
* fix unit test
* refactor: (MergeIntoOperationAggregator) load data blocks in parallel
1 parent 1197614 commit 529d184
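
The last bullet is the heart of the change: MergeIntoOperationAggregator now loads the data blocks it needs concurrently instead of one at a time. A minimal sketch of the pattern with futures::future::try_join_all, using placeholder types and a hypothetical load_block helper in place of the real BlockReader machinery:

use futures::future::try_join_all;

// Placeholder types; the real code uses databend's DataBlock and ErrorCode.
struct DataBlock;
type Result<T> = std::result::Result<T, String>;

// Hypothetical loader standing in for the merged-IO read plus
// deserialization that BlockReader performs for each block.
async fn load_block(path: String) -> Result<DataBlock> {
    let _ = path;
    Ok(DataBlock)
}

// Await all block loads concurrently; try_join_all polls the futures
// together and short-circuits on the first error.
async fn load_blocks(paths: Vec<String>) -> Result<Vec<DataBlock>> {
    try_join_all(paths.into_iter().map(load_block)).await
}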

File tree

12 files changed (+200, -87 lines)

Cargo.lock

Lines changed: 1 addition & 0 deletions
Generated file; diff not rendered by default.

src/query/service/tests/it/storages/fuse/operations/replace_into.rs

Lines changed: 17 additions & 4 deletions
@@ -14,6 +14,7 @@
 
 use common_exception::Result;
 use common_storages_fuse::FuseTable;
+use itertools::Itertools;
 
 #[test]
 fn test_partition() -> Result<()> {
@@ -36,10 +37,22 @@ fn test_partition() -> Result<()> {
             rng.gen_range(1..number_segment)
         };
 
-        let chunks = FuseTable::partition_segments(&segments, num_partition);
-        assert_eq!(chunks.len(), num_partition);
-        for (idx, (segment_idx, _)) in chunks.clone().into_iter().flatten().enumerate() {
-            assert_eq!(idx, segment_idx)
+        let partitions = FuseTable::partition_segments(&segments, num_partition);
+        // check that the number of partitions is as expected
+        assert_eq!(partitions.len(), num_partition);
+
+        // check segments
+        let origin = segments.iter().enumerate();
+        let segment_of_chunks = partitions
+            .iter()
+            .flatten()
+            .sorted_by(|a, b| a.0.cmp(&b.0))
+            .collect::<Vec<_>>();
+
+        for (origin_idx, origin_location) in origin {
+            let (seg_idx, seg_location) = segment_of_chunks[origin_idx];
+            assert_eq!(origin_idx, *seg_idx);
+            assert_eq!(origin_location, seg_location);
         }
     }
 }
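
The rewritten test no longer assumes that flattening the partitions yields segments in their original order; it only requires that every (index, location) pair survives the split exactly once. Any strategy preserving that invariant passes, for example this round-robin sketch (the real FuseTable::partition_segments may distribute differently):

// Round-robin partitioning: each (index, segment) pair lands in exactly one
// partition, so sorting the flattened result by index recovers the original
// sequence, which is what the test above asserts.
fn partition_segments<T: Clone>(segments: &[T], num_partition: usize) -> Vec<Vec<(usize, T)>> {
    let mut partitions = vec![Vec::new(); num_partition];
    for (idx, seg) in segments.iter().enumerate() {
        partitions[idx % num_partition].push((idx, seg.clone()));
    }
    partitions
}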

src/query/storages/fuse/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -50,6 +50,7 @@ futures-util = "0.3.24"
 metrics = "0.20.1"
 opendal = { workspace = true }
 parquet-format-safe = "0.2"
+rand = "0.8.5"
 serde = { workspace = true }
 serde_json = { workspace = true }
 sha2 = "0.10.6"
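
The new rand dependency backs the "shuffle segments" bullet in the commit message: randomizing segment order before the replace-into pipeline is built spreads the per-partition work more evenly. A minimal sketch with rand 0.8, assuming a stand-in Location type for segment locations:

use rand::seq::SliceRandom;
use rand::thread_rng;

// Shuffle segment locations in place so that runs of adjacent (often
// similarly sized) segments do not all land in the same partition.
fn shuffle_segments<Location>(segments: &mut [Location]) {
    segments.shuffle(&mut thread_rng());
}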

src/query/storages/fuse/src/io/read/block/block_reader_deserialize.rs

Lines changed: 36 additions & 3 deletions
@@ -29,6 +29,7 @@ use super::BlockReader;
 use crate::io::read::block::block_reader_merge_io::DataItem;
 use crate::io::ReadSettings;
 use crate::io::UncompressedBuffer;
+use crate::FusePartInfo;
 use crate::FuseStorageFormat;
 
 pub enum DeserializedArray<'a> {
@@ -47,15 +48,47 @@ pub struct FieldDeserializationContext<'a> {
 
 impl BlockReader {
     /// Deserialize column chunks data from parquet format to DataBlock.
-    pub fn deserialize_chunks(
+    pub fn deserialize_chunks_with_part_info(
         &self,
         part: PartInfoPtr,
         chunks: HashMap<ColumnId, DataItem>,
         storage_format: &FuseStorageFormat,
+    ) -> Result<DataBlock> {
+        let part = FusePartInfo::from_part(&part)?;
+        self.deserialize_chunks(
+            &part.location,
+            part.nums_rows,
+            &part.compression,
+            &part.columns_meta,
+            chunks,
+            storage_format,
+        )
+    }
+
+    pub fn deserialize_chunks(
+        &self,
+        block_path: &str,
+        num_rows: usize,
+        compression: &Compression,
+        column_metas: &HashMap<ColumnId, ColumnMeta>,
+        column_chunks: HashMap<ColumnId, DataItem>,
+        storage_format: &FuseStorageFormat,
     ) -> Result<DataBlock> {
         match storage_format {
-            FuseStorageFormat::Parquet => self.deserialize_parquet_chunks(part, chunks),
-            FuseStorageFormat::Native => self.deserialize_native_chunks(part, chunks),
+            FuseStorageFormat::Parquet => self.deserialize_parquet_chunks(
+                block_path,
+                num_rows,
+                compression,
+                column_metas,
+                column_chunks,
+            ),
+            FuseStorageFormat::Native => self.deserialize_native_chunks(
+                block_path,
+                num_rows,
+                compression,
+                column_metas,
+                column_chunks,
+            ),
         }
     }
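
With this split, callers holding a PartInfoPtr keep a one-call wrapper, while callers that already have raw block metadata, as MergeIntoOperationAggregator does when it walks segments directly, can invoke deserialize_chunks without constructing a part at all. A sketch of such a caller inside the fuse crate, with every parameter type taken from the signatures in this diff:

use std::collections::HashMap;

fn deserialize_one_block(
    reader: &BlockReader,
    block_path: &str,
    num_rows: usize,
    compression: &Compression,
    column_metas: &HashMap<ColumnId, ColumnMeta>,
    column_chunks: HashMap<ColumnId, DataItem>,
    storage_format: &FuseStorageFormat,
) -> Result<DataBlock> {
    // No PartInfoPtr needed: pass the block's path and metadata directly.
    reader.deserialize_chunks(
        block_path,
        num_rows,
        compression,
        column_metas,
        column_chunks,
        storage_format,
    )
}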

src/query/storages/fuse/src/io/read/block/block_reader_merge_io_async.rs

Lines changed: 1 addition & 1 deletion
@@ -174,7 +174,7 @@ impl BlockReader {
 
         let mut merge_io_read_res =
             Self::merge_io_read(settings, self.operator.clone(), location, ranges).await?;
-        // TODO set
+
         merge_io_read_res.cached_column_data = cached_column_data;
         merge_io_read_res.cached_column_array = cached_column_array;
         Ok(merge_io_read_res)

src/query/storages/fuse/src/io/read/block/block_reader_native_deserialize.rs

Lines changed: 12 additions & 12 deletions
@@ -21,7 +21,6 @@ use common_arrow::arrow::chunk::Chunk;
 use common_arrow::arrow::datatypes::Field;
 use common_arrow::native::read::batch_read::batch_read_array;
 use common_arrow::parquet::metadata::ColumnDescriptor;
-use common_catalog::plan::PartInfoPtr;
 use common_exception::ErrorCode;
 use common_exception::Result;
 use common_expression::ColumnId;
@@ -35,7 +34,6 @@ use storages_common_table_meta::meta::Compression;
 
 use super::block_reader_deserialize::DeserializedArray;
 use super::block_reader_deserialize::FieldDeserializationContext;
-use crate::fuse_part::FusePartInfo;
 use crate::io::read::block::block_reader_merge_io::DataItem;
 use crate::io::BlockReader;
 use crate::io::UncompressedBuffer;
@@ -45,22 +43,24 @@ impl BlockReader {
     /// Deserialize column chunks data from native format to DataBlock.
     pub(super) fn deserialize_native_chunks(
         &self,
-        part: PartInfoPtr,
-        chunks: HashMap<ColumnId, DataItem>,
+        block_path: &str,
+        num_rows: usize,
+        compression: &Compression,
+        column_metas: &HashMap<ColumnId, ColumnMeta>,
+        column_chunks: HashMap<ColumnId, DataItem>,
     ) -> Result<DataBlock> {
-        let part = FusePartInfo::from_part(&part)?;
         let start = Instant::now();
 
-        if chunks.is_empty() {
-            return self.build_default_values_block(part.nums_rows);
+        if column_chunks.is_empty() {
+            return self.build_default_values_block(num_rows);
         }
 
         let deserialized_res = self.deserialize_native_chunks_with_buffer(
-            &part.location,
-            part.nums_rows,
-            &part.compression,
-            &part.columns_meta,
-            chunks,
+            block_path,
+            num_rows,
+            compression,
+            column_metas,
+            column_chunks,
             None,
         );

src/query/storages/fuse/src/io/read/block/block_reader_parquet_deserialize.rs

Lines changed: 14 additions & 14 deletions
@@ -24,7 +24,6 @@ use common_arrow::parquet::compression::Compression as ParquetCompression;
 use common_arrow::parquet::metadata::ColumnDescriptor;
 use common_arrow::parquet::read::PageMetaData;
 use common_arrow::parquet::read::PageReader;
-use common_catalog::plan::PartInfoPtr;
 use common_exception::ErrorCode;
 use common_exception::Result;
 use common_expression::ColumnId;
@@ -38,7 +37,6 @@ use storages_common_table_meta::meta::Compression;
 
 use super::block_reader_deserialize::DeserializedArray;
 use super::block_reader_deserialize::FieldDeserializationContext;
-use crate::fuse_part::FusePartInfo;
 use crate::io::read::block::block_reader_merge_io::DataItem;
 use crate::io::read::block::decompressor::BuffedBasicDecompressor;
 use crate::io::BlockReader;
@@ -49,22 +47,24 @@ impl BlockReader {
     /// Deserialize column chunks data from parquet format to DataBlock.
     pub(super) fn deserialize_parquet_chunks(
         &self,
-        part: PartInfoPtr,
-        chunks: HashMap<ColumnId, DataItem>,
+        block_path: &str,
+        num_rows: usize,
+        compression: &Compression,
+        column_metas: &HashMap<ColumnId, ColumnMeta>,
+        column_chunks: HashMap<ColumnId, DataItem>,
     ) -> Result<DataBlock> {
-        let part = FusePartInfo::from_part(&part)?;
-        let start = Instant::now();
-
-        if chunks.is_empty() {
-            return self.build_default_values_block(part.nums_rows);
+        if column_chunks.is_empty() {
+            return self.build_default_values_block(num_rows);
         }
 
+        let start = Instant::now();
+
         let deserialized_res = self.deserialize_parquet_chunks_with_buffer(
-            &part.location,
-            part.nums_rows,
-            &part.compression,
-            &part.columns_meta,
-            chunks,
+            block_path,
+            num_rows,
+            compression,
+            column_metas,
+            column_chunks,
             None,
         );
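
Note that let start = Instant::now(); moved below the empty-chunks early return, so the build-default-values fast path no longer counts toward deserialization timing, matching the "adjust deserialization path so that metrics can be recorded" bullet. A sketch of that ordering, assuming the crate's Result and DataBlock types and an illustrative metric name:

use std::time::Instant;

fn timed_deserialize(work: impl FnOnce() -> Result<DataBlock>) -> Result<DataBlock> {
    // Start the timer only once real deserialization work begins, so
    // fast-path returns do not skew the recorded durations.
    let start = Instant::now();
    let block = work()?;
    // metrics 0.20 histogram macro; the metric name here is illustrative.
    metrics::histogram!("fuse_block_deserialize_ms", start.elapsed().as_millis() as f64);
    Ok(block)
}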

src/query/storages/fuse/src/io/read/read_settings.rs

Lines changed: 0 additions & 9 deletions
@@ -35,12 +35,3 @@ impl ReadSettings {
         })
     }
 }
-
-impl Default for ReadSettings {
-    fn default() -> Self {
-        ReadSettings {
-            storage_io_min_bytes_for_seek: 1024,
-            storage_io_max_page_bytes_for_read: 1024 * 1024,
-        }
-    }
-}
