Skip to content

Commit 5b96327

Browse files
committed
fixup ut (unit tests)
1 parent b20744a commit 5b96327

File tree

3 files changed

+94
-33
lines changed

3 files changed

+94
-33
lines changed

src/query/service/tests/it/storages/fuse/operations/internal_column.rs

+81-20
Original file line numberDiff line numberDiff line change
@@ -12,24 +12,31 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::BTreeMap;
1516
use std::collections::HashSet;
1617

1718
use databend_common_base::base::tokio;
1819
use databend_common_catalog::plan::InternalColumn;
1920
use databend_common_catalog::plan::InternalColumnMeta;
20-
use databend_common_catalog::plan::Partitions;
21+
use databend_common_catalog::plan::PartInfoPtr;
2122
use databend_common_exception::Result;
23+
use databend_common_expression::block_debug::pretty_format_blocks;
2224
use databend_common_expression::DataBlock;
25+
use databend_common_expression::FieldIndex;
2326
use databend_common_expression::BLOCK_NAME_COL_NAME;
2427
use databend_common_expression::ROW_ID_COL_NAME;
2528
use databend_common_expression::SEGMENT_NAME_COL_NAME;
2629
use databend_common_expression::SNAPSHOT_NAME_COL_NAME;
30+
use databend_common_pipeline_core::Pipeline;
2731
use databend_common_sql::binder::INTERNAL_COLUMN_FACTORY;
32+
use databend_common_sql::executor::table_read_plan::ToReadDataSourcePlan;
2833
use databend_common_sql::Planner;
2934
use databend_common_storages_fuse::io::MetaReaders;
3035
use databend_common_storages_fuse::FuseBlockPartInfo;
3136
use databend_common_storages_fuse::FuseTable;
3237
use databend_query::interpreters::InterpreterFactory;
38+
use databend_query::pipelines::executor::ExecutorSettings;
39+
use databend_query::pipelines::executor::QueryPipelineExecutor;
3340
use databend_query::test_kits::*;
3441
use databend_storages_common_cache::LoadParams;
3542
use databend_storages_common_table_meta::meta::SegmentInfo;
@@ -39,11 +46,11 @@ use databend_storages_common_table_meta::table::OPT_KEY_SNAPSHOT_LOCATION;
3946
use futures::TryStreamExt;
4047

4148
fn expected_data_block(
42-
parts: &Partitions,
49+
parts: &Vec<PartInfoPtr>,
4350
internal_columns: &Vec<InternalColumn>,
4451
) -> Result<Vec<DataBlock>> {
45-
let mut data_blocks = Vec::with_capacity(parts.partitions.len());
46-
for part in &parts.partitions {
52+
let mut data_blocks = Vec::with_capacity(parts.len());
53+
for part in parts {
4754
let fuse_part = FuseBlockPartInfo::from_part(part)?;
4855
let num_rows = fuse_part.nums_rows;
4956
let block_meta = fuse_part.block_meta_index.as_ref().unwrap();
@@ -65,28 +72,22 @@ fn expected_data_block(
6572
}
6673
data_blocks.push(DataBlock::new(columns, num_rows));
6774
}
68-
data_blocks.reverse();
6975

7076
Ok(data_blocks)
7177
}
7278

7379
fn check_data_block(expected: Vec<DataBlock>, blocks: Vec<DataBlock>) -> Result<()> {
74-
let expected_data_block = DataBlock::concat(&expected)?.consume_convert_to_full();
75-
let data_block = DataBlock::concat(&blocks)?.consume_convert_to_full();
76-
77-
for (expected_column, column) in expected_data_block
78-
.columns()
79-
.iter()
80-
.zip(data_block.columns())
81-
{
82-
assert_eq!(expected_column.data_type, column.data_type);
83-
assert_eq!(expected_column.value, column.value);
84-
}
80+
let expected_blocks = pretty_format_blocks(&expected)?;
81+
let expected_str: Vec<&str> = expected_blocks.split('\n').collect();
82+
databend_common_expression::block_debug::assert_blocks_sorted_eq(
83+
expected_str,
84+
blocks.as_slice(),
85+
);
8586

8687
Ok(())
8788
}
8889

89-
async fn check_partitions(parts: &Partitions, fixture: &TestFixture) -> Result<()> {
90+
async fn check_partitions(parts: &Vec<PartInfoPtr>, fixture: &TestFixture) -> Result<()> {
9091
let mut segment_name = HashSet::new();
9192
let mut block_name = HashSet::new();
9293

@@ -129,7 +130,7 @@ async fn check_partitions(parts: &Partitions, fixture: &TestFixture) -> Result<(
129130
}
130131
}
131132

132-
for part in &parts.partitions {
133+
for part in parts {
133134
let fuse_part = FuseBlockPartInfo::from_part(part)?;
134135
let block_meta = fuse_part.block_meta_index.as_ref().unwrap();
135136
assert_eq!(
@@ -166,6 +167,11 @@ async fn test_internal_column() -> Result<()> {
166167
.get_internal_column(BLOCK_NAME_COL_NAME)
167168
.unwrap(),
168169
];
170+
let internal_columns_map = internal_columns
171+
.iter()
172+
.enumerate()
173+
.map(|(i, col)| (i, col.clone()))
174+
.collect::<BTreeMap<FieldIndex, InternalColumn>>();
169175

170176
// insert 5 times
171177
let n = 5;
@@ -188,7 +194,34 @@ async fn test_internal_column() -> Result<()> {
188194
let blocks = res.try_collect::<Vec<DataBlock>>().await?;
189195

190196
let table = fixture.latest_default_table().await?;
191-
let (_, parts) = table.read_partitions(ctx.clone(), None, true).await?;
197+
let data_source_plan = table
198+
.read_plan(
199+
ctx.clone(),
200+
None,
201+
Some(internal_columns_map.clone()),
202+
false,
203+
false,
204+
)
205+
.await?;
206+
207+
let mut dummy_pipeline = Pipeline::create();
208+
let parts = if let Some(mut prune_pipeline) =
209+
table.build_prune_pipeline(ctx.clone(), &data_source_plan, &mut dummy_pipeline)?
210+
{
211+
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
212+
let rx = fuse_table.pruned_result_receiver.lock().clone().unwrap();
213+
prune_pipeline.set_max_threads(1);
214+
let settings = ExecutorSettings::try_create(ctx.clone())?;
215+
let executor = QueryPipelineExecutor::create(prune_pipeline, settings)?;
216+
executor.execute()?;
217+
let mut parts = Vec::new();
218+
while let Ok(Ok(segment)) = rx.recv().await {
219+
parts.push(segment);
220+
}
221+
parts
222+
} else {
223+
data_source_plan.parts.partitions.clone()
224+
};
192225
let expected = expected_data_block(&parts, &internal_columns)?;
193226
check_partitions(&parts, &fixture).await?;
194227
check_data_block(expected, blocks)?;
@@ -213,7 +246,35 @@ async fn test_internal_column() -> Result<()> {
213246
let blocks = res.try_collect::<Vec<DataBlock>>().await?;
214247

215248
let table = fixture.latest_default_table().await?;
216-
let (_, parts) = table.read_partitions(ctx.clone(), None, true).await?;
249+
let data_source_plan = table
250+
.read_plan(
251+
ctx.clone(),
252+
None,
253+
Some(internal_columns_map.clone()),
254+
false,
255+
false,
256+
)
257+
.await?;
258+
259+
let mut dummy_pipeline = Pipeline::create();
260+
let parts = if let Some(mut prune_pipeline) =
261+
table.build_prune_pipeline(ctx.clone(), &data_source_plan, &mut dummy_pipeline)?
262+
{
263+
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
264+
let rx = fuse_table.pruned_result_receiver.lock().clone().unwrap();
265+
prune_pipeline.set_max_threads(1);
266+
let settings = ExecutorSettings::try_create(ctx.clone())?;
267+
let executor = QueryPipelineExecutor::create(prune_pipeline, settings)?;
268+
executor.execute()?;
269+
let mut parts = Vec::new();
270+
while let Ok(Ok(segment)) = rx.recv().await {
271+
parts.push(segment);
272+
}
273+
parts
274+
} else {
275+
data_source_plan.parts.partitions.clone()
276+
};
277+
217278
let expected = expected_data_block(&parts, &internal_columns)?;
218279
check_partitions(&parts, &fixture).await?;
219280
check_data_block(expected, blocks)?;

src/query/storages/fuse/src/fuse_table.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ pub struct FuseTable {
149149
// If this is set, reading from fuse_table should only return the increment blocks
150150
pub(crate) changes_desc: Option<ChangesDesc>,
151151

152-
pub(crate) pruned_result_receiver: Arc<Mutex<PartInfoReceiver>>,
152+
pub pruned_result_receiver: Arc<Mutex<PartInfoReceiver>>,
153153
}
154154

155155
type PartInfoReceiver = Option<Receiver<Result<PartInfoPtr>>>;

src/query/storages/fuse/src/operations/read_partitions.rs

+12-12
Original file line numberDiff line numberDiff line change
@@ -147,9 +147,7 @@ impl FuseTable {
147147
nodes_num = cluster.nodes.len();
148148
}
149149

150-
if self.is_column_oriented()
151-
|| (segment_len > nodes_num && distributed_pruning)
152-
{
150+
if self.is_column_oriented() || (segment_len > nodes_num && distributed_pruning) {
153151
let mut segments = Vec::with_capacity(segment_locs.len());
154152
for (idx, segment_location) in segment_locs.into_iter().enumerate() {
155153
segments.push(FuseLazyPartInfo::create(idx, segment_location))
@@ -270,15 +268,17 @@ impl FuseTable {
270268
let (segment_tx, segment_rx) = async_channel::bounded(max_io_requests);
271269

272270
match segment_format {
273-
FuseSegmentFormat::Row => {self.prune_segments_with_pipeline(
274-
pruner.clone(),
275-
&mut prune_pipeline,
276-
ctx.clone(),
277-
segment_rx,
278-
part_info_tx,
279-
derterministic_cache_key.clone(),
280-
lazy_init_segments.len(),
281-
)?;}
271+
FuseSegmentFormat::Row => {
272+
self.prune_segments_with_pipeline(
273+
pruner.clone(),
274+
&mut prune_pipeline,
275+
ctx.clone(),
276+
segment_rx,
277+
part_info_tx,
278+
derterministic_cache_key.clone(),
279+
lazy_init_segments.len(),
280+
)?;
281+
}
282282
FuseSegmentFormat::Column => {
283283
self.prune_column_oriented_segments_with_pipeline(
284284
pruner.clone(),

0 commit comments

Comments
 (0)