Skip to content

Commit 14a7ab5

Browse files
committed
change to try spawn batch
1 parent 084b6cd commit 14a7ab5

File tree

1 file changed

+39
-12
lines changed

1 file changed

+39
-12
lines changed

src/query/storages/fuse/src/io/read/block_reader.rs

+39-12
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,16 @@ use std::ops::Range;
1616
use std::sync::Arc;
1717

1818
use common_arrow::parquet::metadata::SchemaDescriptor;
19+
use common_base::base::tokio::sync::Semaphore;
1920
use common_base::rangemap::RangeMerger;
20-
use common_base::runtime::UnlimitedFuture;
21+
use common_base::runtime::Runtime;
2122
use common_catalog::plan::Projection;
2223
use common_catalog::table_context::TableContext;
2324
use common_datavalues::DataSchemaRef;
2425
use common_exception::ErrorCode;
2526
use common_exception::Result;
2627
use common_storage::ColumnLeaves;
28+
use futures_util::future;
2729
use opendal::Object;
2830
use opendal::Operator;
2931

@@ -61,17 +63,42 @@ impl BlockReader {
6163
let range_merger = RangeMerger::from_iter(ranges, max_gap_size, max_range_size);
6264
let merged_ranges = range_merger.ranges();
6365

64-
// Read merged range data.
65-
let mut read_handlers = Vec::with_capacity(merged_ranges.len());
66-
for (idx, range) in merged_ranges.iter().enumerate() {
67-
read_handlers.push(UnlimitedFuture::create(Self::read_range(
68-
object.clone(),
69-
idx,
70-
range.start,
71-
range.end,
72-
)));
73-
}
74-
let merged_range_data_results = futures::future::try_join_all(read_handlers).await?;
66+
// new joint,.
67+
let max_runtime_threads = ctx.get_settings().get_max_threads()? as usize;
68+
69+
// 1.1 combine all the tasks.
70+
let mut iter = merged_ranges.iter().enumerate();
71+
let tasks = std::iter::from_fn(move || {
72+
if let Some((idx, range)) = iter.next() {
73+
Some(Self::read_range(
74+
object.clone(),
75+
idx,
76+
range.start,
77+
range.end,
78+
))
79+
} else {
80+
None
81+
}
82+
});
83+
84+
// 1.2 build the runtime.
85+
let semaphore = Semaphore::new(merged_ranges.len());
86+
let io_runtime = Arc::new(Runtime::with_worker_threads(
87+
max_runtime_threads,
88+
Some("io-read-worker".to_owned()),
89+
)?);
90+
91+
// 1.3 spawn all the tasks to the runtime.
92+
let join_handlers = io_runtime.try_spawn_batch(semaphore, tasks).await?;
93+
94+
// 1.4 get all the result.
95+
let merged_range_data_results = future::try_join_all(join_handlers)
96+
.await
97+
.map_err(|e| {
98+
ErrorCode::StorageOther(format!("try io read join futures failure, {}", e))
99+
})?
100+
.into_iter()
101+
.collect::<Result<Vec<_>>>()?;
75102

76103
// Build raw range data from merged range data.
77104
let mut final_result = Vec::with_capacity(raw_ranges.len());

0 commit comments

Comments
 (0)