Commit 479329f

Merge pull request #9341 from BohuTANG/dev-refactor-fuse
refactor(fuse): remove unused codes after fuse-result purged
2 parents: 9abc8f2 + 695455b

98 files changed (+53, -125 lines)

Cargo.toml (+1 -1)

@@ -45,7 +45,7 @@ members = [
     "src/query/sql",
     "src/query/storages/cache",
     "src/query/storages/factory",
-    "src/query/storages/fuse/fuse",
+    "src/query/storages/fuse",
     "src/query/storages/hive/hive",
     "src/query/storages/hive/hive-meta-store",
     "src/query/storages/index",

src/query/service/Cargo.toml (+1 -1)

@@ -58,7 +58,7 @@ common-sql = { path = "../sql" }
 common-storage = { path = "../../common/storage" }
 common-storages-cache = { path = "../storages/cache" }
 common-storages-factory = { path = "../storages/factory" }
-common-storages-fuse = { path = "../storages/fuse/fuse" }
+common-storages-fuse = { path = "../storages/fuse" }
 common-storages-hive = { path = "../storages/hive/hive", optional = true }
 common-storages-index = { path = "../storages/index" }
 common-storages-information-schema = { path = "../storages/information-schema" }

src/query/storages/factory/Cargo.toml (+1 -1)

@@ -16,7 +16,7 @@ common-catalog = { path = "../../catalog" }
 common-config = { path = "../../config" }
 common-exception = { path = "../../../common/exception" }
 common-meta-app = { path = "../../../meta/app" }
-common-storages-fuse = { path = "../fuse/fuse" }
+common-storages-fuse = { path = "../fuse" }
 common-storages-index = { path = "../index" }
 common-storages-memory = { path = "../memory" }
 common-storages-null = { path = "../null" }

src/query/storages/fuse/Cargo.toml (new file, +50)

@@ -0,0 +1,50 @@
+[package]
+name = "common-storages-fuse"
+version = { workspace = true }
+authors = { workspace = true }
+license = { workspace = true }
+publish = { workspace = true }
+edition = { workspace = true }
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+[lib]
+doctest = false
+test = false
+
+[dependencies]
+common-arrow = { path = "../../../common/arrow" }
+common-base = { path = "../../../common/base" }
+common-cache = { path = "../../../common/cache" }
+common-catalog = { path = "../../catalog" }
+common-config = { path = "../../config" }
+common-datablocks = { path = "../../datablocks" }
+common-datavalues = { path = "../../datavalues" }
+common-exception = { path = "../../../common/exception" }
+common-functions = { path = "../../functions" }
+common-meta-app = { path = "../../../meta/app" }
+common-meta-types = { path = "../../../meta/types" }
+common-pipeline-core = { path = "../../pipeline/core" }
+common-pipeline-sources = { path = "../../pipeline/sources" }
+common-pipeline-transforms = { path = "../../pipeline/transforms" }
+common-sharing = { path = "../../sharing" }
+common-sql = { path = "../../sql" }
+common-storage = { path = "../../../common/storage" }
+common-storages-cache = { path = "../cache" }
+common-storages-index = { path = "../index" }
+common-storages-pruner = { path = "../pruner" }
+common-storages-table-meta = { path = "../table-meta" }
+
+async-trait = { version = "0.1.57", package = "async-trait-fn" }
+backoff = { version = "0.4.0", features = ["futures", "tokio"] }
+backon = "0.2"
+chrono = { workspace = true }
+futures = "0.3.24"
+futures-util = "0.3.24"
+itertools = "0.10.5"
+metrics = "0.20.1"
+opendal = "0.22"
+serde = { workspace = true }
+serde_json = { workspace = true }
+tracing = "0.1.36"
+typetag = "0.2.3"
+uuid = { version = "1.1.2", features = ["serde", "v4"] }
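
The `{ workspace = true }` entries in this new manifest are Cargo workspace inheritance (Cargo 1.64+): the concrete values are resolved from the `[workspace.package]` and `[workspace.dependencies]` tables in the repository's root Cargo.toml. A minimal sketch of the root-manifest shape this relies on; every value below is a placeholder for illustration, not taken from this commit:

[workspace.package]
version = "0.1.0"                  # placeholder value
authors = ["Example Authors"]      # placeholder value
license = "Apache-2.0"             # placeholder value
publish = false                    # placeholder value
edition = "2021"                   # placeholder value

[workspace.dependencies]
# Any dependency referenced above as `{ workspace = true }` must be declared here.
chrono = "0.4"                                        # placeholder version
serde = { version = "1.0", features = ["derive"] }    # placeholder version
serde_json = "1.0"                                    # placeholder version

Inheriting these keys keeps version, edition, and license pinned in one place for every workspace member, so moving the crate directory does not require touching its metadata.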

src/query/storages/fuse/fuse/Cargo.toml (-50)

This file was deleted.

src/query/storages/fuse/fuse/src/io/read/block_reader_parquet.rs renamed to src/query/storages/fuse/src/io/read/block_reader_parquet.rs (-72)

@@ -205,71 +205,6 @@ impl BlockReader {
         Ok((num_rows, columns_array_iter))
     }
 
-    async fn read_columns(&self, part: PartInfoPtr) -> Result<(usize, Vec<ArrayIter<'static>>)> {
-        let part = FusePartInfo::from_part(&part)?;
-
-        // TODO: add prefetch column data.
-        let num_rows = part.nums_rows;
-        let num_cols = self.projection.len();
-        let mut column_chunk_futs = Vec::with_capacity(num_cols);
-
-        let columns = self.projection.project_column_leaves(&self.column_leaves)?;
-        let indices = Self::build_projection_indices(&columns);
-        for (index, _) in indices {
-            let column_meta = &part.columns_meta[&index];
-            let column_reader = self.operator.object(&part.location);
-            let fut = async move {
-                let (idx, column_chunk) =
-                    Self::read_column(column_reader, index, column_meta.offset, column_meta.len)
-                        .await?;
-                Ok::<_, ErrorCode>((idx, column_chunk))
-            }
-            .instrument(debug_span!("read_col_chunk"));
-            column_chunk_futs.push(fut);
-        }
-
-        let num_cols = column_chunk_futs.len();
-        let chunks = futures::stream::iter(column_chunk_futs)
-            .buffered(std::cmp::min(10, num_cols))
-            .try_collect::<Vec<_>>()
-            .await?;
-
-        let mut chunk_map: HashMap<usize, Vec<u8>> = chunks.into_iter().collect();
-        let mut cnt_map = Self::build_projection_count_map(&columns);
-        let mut columns_array_iter = Vec::with_capacity(num_cols);
-        for column in &columns {
-            let field = column.field.clone();
-            let indices = &column.leaf_ids;
-            let mut column_metas = Vec::with_capacity(indices.len());
-            let mut column_chunks = Vec::with_capacity(indices.len());
-            let mut column_descriptors = Vec::with_capacity(indices.len());
-            for index in indices {
-                let column_meta = &part.columns_meta[index];
-                let cnt = cnt_map.get_mut(index).unwrap();
-                *cnt -= 1;
-                let column_chunk = if cnt > &mut 0 {
-                    chunk_map.get(index).unwrap().clone()
-                } else {
-                    chunk_map.remove(index).unwrap()
-                };
-                let column_descriptor = &self.parquet_schema_descriptor.columns()[*index];
-                column_metas.push(column_meta);
-                column_chunks.push(column_chunk);
-                column_descriptors.push(column_descriptor);
-            }
-            columns_array_iter.push(Self::to_array_iter(
-                column_metas,
-                column_chunks,
-                num_rows,
-                column_descriptors,
-                field,
-                &part.compression,
-            )?);
-        }
-
-        Ok((num_rows, columns_array_iter))
-    }
-
     pub fn build_block(&self, chunks: Vec<(usize, Box<dyn Array>)>) -> Result<DataBlock> {
         let mut results = Vec::with_capacity(chunks.len());
         let mut chunk_map: HashMap<usize, Box<dyn Array>> = chunks.into_iter().collect();

@@ -402,13 +337,6 @@ impl BlockReader {
         Ok((index, chunk))
     }
 
-    #[tracing::instrument(level = "debug", skip_all)]
-    pub async fn read(&self, part: PartInfoPtr) -> Result<DataBlock> {
-        let (num_rows, columns_array_iter) = self.read_columns(part).await?;
-        let mut deserializer = RowGroupDeserializer::new(columns_array_iter, num_rows, None);
-        self.try_next_block(&mut deserializer)
-    }
-
     fn try_next_block(&self, deserializer: &mut RowGroupDeserializer) -> Result<DataBlock> {
         match deserializer.next() {
             None => Err(ErrorCode::Internal(
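
The deleted `read_columns` fanned out one future per projected column chunk and drained them through `futures::stream::iter(...).buffered(...).try_collect()`, capping concurrency at `min(10, num_cols)`. For reference, here is a minimal, self-contained sketch of that bounded-concurrency pattern; the placeholder chunk fetch and the tokio runtime are illustrative assumptions, not databend code:

use futures::stream::{self, StreamExt, TryStreamExt};

/// Read `keys` with at most `limit` requests in flight, returning
/// (key, bytes) pairs in submission order.
async fn read_chunks_buffered(
    keys: Vec<usize>,
    limit: usize,
) -> Result<Vec<(usize, Vec<u8>)>, std::io::Error> {
    // One future per key, mirroring the per-column futures built above.
    let futs = keys.into_iter().map(|key| async move {
        // Placeholder for a real ranged read (e.g. an object-store GET).
        let bytes = vec![0u8; 16];
        Ok::<_, std::io::Error>((key, bytes))
    });

    // `buffered(limit)` polls at most `limit` futures concurrently and
    // yields their results in order; `try_collect` stops at the first error.
    stream::iter(futs).buffered(limit).try_collect().await
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let chunks = read_chunks_buffered((0..32).collect(), 10).await?;
    assert_eq!(chunks.len(), 32);
    Ok(())
}

`buffered` keeps the number of in-flight reads bounded while still yielding results in submission order, which is why the removed code could collect the chunks and then zip them back to their column metadata by index.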
