Skip to content

Commit c57f022

Browse files
dqhl76 and zhang2014
authored
fix: align EXPLAIN output with actual partition pruning statistics (#17491)
* fix: apply pruning pipeline in EXPLAIN to avoid OOM * fix: fix some stats missing * fixup * fixup ut * fix possible explain hang * fix possible explain hang --------- Co-authored-by: Winter Zhang <[email protected]>
1 parent e62d113 commit c57f022

File tree

10 files changed

+316
-41
lines changed

10 files changed

+316
-41
lines changed

src/query/catalog/src/table_context.rs

+8
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ use crate::lock::LockTableOption;
7171
use crate::merge_into_join::MergeIntoJoin;
7272
use crate::plan::DataSourcePlan;
7373
use crate::plan::PartInfoPtr;
74+
use crate::plan::PartStatistics;
7475
use crate::plan::Partitions;
7576
use crate::query_kind::QueryKind;
7677
use crate::runtime_filter_info::RuntimeFilterInfo;
@@ -408,6 +409,13 @@ pub trait TableContext: Send + Sync {
408409
fn get_consume_streams(&self, _query: bool) -> Result<Vec<Arc<dyn Table>>> {
409410
unimplemented!()
410411
}
412+
413+
fn get_pruned_partitions_stats(&self) -> Option<PartStatistics> {
414+
unimplemented!()
415+
}
416+
fn set_pruned_partitions_stats(&self, _partitions: PartStatistics) {
417+
unimplemented!()
418+
}
411419
}
412420

413421
pub type AbortChecker = Arc<dyn CheckAbort + Send + Sync>;

src/query/service/src/interpreters/interpreter_explain.rs

+60-2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ use databend_common_ast::ast::FormatTreeNode;
2121
use databend_common_base::runtime::profile::get_statistics_desc;
2222
use databend_common_base::runtime::profile::ProfileDesc;
2323
use databend_common_base::runtime::profile::ProfileStatisticsName;
24+
use databend_common_catalog::plan::DataSourcePlan;
25+
use databend_common_catalog::plan::PartStatistics;
2426
use databend_common_catalog::table_context::TableContext;
2527
use databend_common_exception::ErrorCode;
2628
use databend_common_exception::Result;
@@ -31,6 +33,7 @@ use databend_common_expression::FromData;
3133
use databend_common_pipeline_core::always_callback;
3234
use databend_common_pipeline_core::processors::PlanProfile;
3335
use databend_common_pipeline_core::ExecutionInfo;
36+
use databend_common_pipeline_core::Pipeline;
3437
use databend_common_sql::binder::ExplainConfig;
3538
use databend_common_sql::executor::format_partial_tree;
3639
use databend_common_sql::executor::MutationBuildInfo;
@@ -39,6 +42,8 @@ use databend_common_sql::BindContext;
3942
use databend_common_sql::ColumnSet;
4043
use databend_common_sql::FormatOptions;
4144
use databend_common_sql::MetadataRef;
45+
use databend_common_storages_fuse::FuseLazyPartInfo;
46+
use databend_common_storages_fuse::FuseTable;
4247
use databend_common_storages_result_cache::gen_result_cache_key;
4348
use databend_common_storages_result_cache::ResultCacheReader;
4449
use databend_common_users::UserApiProvider;
@@ -54,6 +59,7 @@ use crate::interpreters::Interpreter;
5459
use crate::pipelines::executor::ExecutorSettings;
5560
use crate::pipelines::executor::PipelineCompleteExecutor;
5661
use crate::pipelines::executor::PipelinePullingExecutor;
62+
use crate::pipelines::executor::QueryPipelineExecutor;
5763
use crate::pipelines::PipelineBuildResult;
5864
use crate::schedulers::build_query_pipeline;
5965
use crate::schedulers::Fragmenter;
@@ -152,7 +158,8 @@ impl Interpreter for ExplainInterpreter {
152158
schema.clone(),
153159
metadata.clone(),
154160
)?;
155-
let plan = interpreter.build_physical_plan(&mutation, true).await?;
161+
let mut plan = interpreter.build_physical_plan(&mutation, true).await?;
162+
self.inject_pruned_partitions_stats(&mut plan, metadata)?;
156163
self.explain_physical_plan(&plan, metadata, &None).await?
157164
}
158165
_ => self.explain_plan(&self.plan)?,
@@ -541,7 +548,8 @@ impl ExplainInterpreter {
541548
// we should not use `dry_run` mode to build the physical plan.
542549
// It's because we need to get the same partitions as the original selecting plan.
543550
let mut builder = PhysicalPlanBuilder::new(metadata.clone(), ctx, formatted_ast.is_none());
544-
let plan = builder.build(s_expr, bind_context.column_set()).await?;
551+
let mut plan = builder.build(s_expr, bind_context.column_set()).await?;
552+
self.inject_pruned_partitions_stats(&mut plan, metadata)?;
545553
self.explain_physical_plan(&plan, metadata, formatted_ast)
546554
.await
547555
}
@@ -572,4 +580,54 @@ impl ExplainInterpreter {
572580
let formatted_plan = StringType::from_data(line_split_result);
573581
Ok(vec![DataBlock::new_from_columns(vec![formatted_plan])])
574582
}
583+
584+
fn inject_pruned_partitions_stats(
585+
&self,
586+
plan: &mut PhysicalPlan,
587+
metadata: &MetadataRef,
588+
) -> Result<()> {
589+
let mut sources = vec![];
590+
plan.get_all_data_source(&mut sources);
591+
for (id, source) in sources {
592+
if let Some(stat) = self.prune_lazy_parts(metadata, &source)? {
593+
plan.set_pruning_stats(id, stat);
594+
}
595+
}
596+
Ok(())
597+
}
598+
599+
fn prune_lazy_parts(
600+
&self,
601+
metadata: &MetadataRef,
602+
source: &DataSourcePlan,
603+
) -> Result<Option<PartStatistics>> {
604+
let partitions = source.parts.partitions.first();
605+
if partitions.is_none_or(|part| part.as_any().downcast_ref::<FuseLazyPartInfo>().is_none())
606+
{
607+
return Ok(None);
608+
}
609+
let meta = metadata.read();
610+
let max_threads = self.ctx.get_settings().get_max_threads()?;
611+
let table_entry = meta.table(source.scan_id);
612+
if let Some(fuse_table) = table_entry.table().as_any().downcast_ref::<FuseTable>() {
613+
let mut dummy_pipeline = Pipeline::create();
614+
let prune_pipeline = fuse_table.do_build_prune_pipeline(
615+
self.ctx.clone(),
616+
source,
617+
&mut dummy_pipeline,
618+
true,
619+
)?;
620+
if let Some(mut pipeline) = prune_pipeline {
621+
pipeline.set_max_threads(max_threads as usize);
622+
let settings = ExecutorSettings::try_create(self.ctx.clone())?;
623+
let executor = QueryPipelineExecutor::create(pipeline, settings)?;
624+
executor.execute()?;
625+
}
626+
let stats = self.ctx.get_pruned_partitions_stats();
627+
if stats.is_some() {
628+
return Ok(stats);
629+
}
630+
}
631+
Ok(None)
632+
}
575633
}

src/query/service/src/sessions/query_ctx.rs

+9
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ use databend_common_catalog::plan::DataSourceInfo;
4747
use databend_common_catalog::plan::DataSourcePlan;
4848
use databend_common_catalog::plan::ParquetReadOptions;
4949
use databend_common_catalog::plan::PartInfoPtr;
50+
use databend_common_catalog::plan::PartStatistics;
5051
use databend_common_catalog::plan::Partitions;
5152
use databend_common_catalog::plan::StageTableInfo;
5253
use databend_common_catalog::query_kind::QueryKind;
@@ -1826,6 +1827,14 @@ impl TableContext for QueryContext {
18261827
async fn get_warehouse_cluster(&self) -> Result<Arc<Cluster>> {
18271828
self.shared.get_warehouse_clusters().await
18281829
}
1830+
1831+
fn get_pruned_partitions_stats(&self) -> Option<PartStatistics> {
1832+
self.shared.get_pruned_partitions_stats()
1833+
}
1834+
1835+
fn set_pruned_partitions_stats(&self, partitions: PartStatistics) {
1836+
self.shared.set_pruned_partitions_stats(partitions);
1837+
}
18291838
}
18301839

18311840
impl TrySpawn for QueryContext {

src/query/service/src/sessions/query_ctx_shared.rs

+13
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use databend_common_base::runtime::Runtime;
3333
use databend_common_catalog::catalog::Catalog;
3434
use databend_common_catalog::catalog::CatalogManager;
3535
use databend_common_catalog::merge_into_join::MergeIntoJoin;
36+
use databend_common_catalog::plan::PartStatistics;
3637
use databend_common_catalog::query_kind::QueryKind;
3738
use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo;
3839
use databend_common_catalog::runtime_filter_info::RuntimeFilterReady;
@@ -167,6 +168,8 @@ pub struct QueryContextShared {
167168

168169
// Used by hilbert clustering when do recluster.
169170
pub(in crate::sessions) selected_segment_locs: Arc<RwLock<HashSet<Location>>>,
171+
172+
pub(in crate::sessions) pruned_partitions_stats: Arc<RwLock<Option<PartStatistics>>>,
170173
}
171174

172175
impl QueryContextShared {
@@ -232,6 +235,7 @@ impl QueryContextShared {
232235
mem_stat: Arc::new(RwLock::new(None)),
233236
node_memory_usage: Arc::new(RwLock::new(HashMap::new())),
234237
selected_segment_locs: Default::default(),
238+
pruned_partitions_stats: Arc::new(RwLock::new(None)),
235239
}))
236240
}
237241

@@ -765,6 +769,15 @@ impl QueryContextShared {
765769
pub fn get_table_meta_timestamps(&self) -> Arc<Mutex<HashMap<u64, TableMetaTimestamps>>> {
766770
self.table_meta_timestamps.clone()
767771
}
772+
773+
pub fn get_pruned_partitions_stats(&self) -> Option<PartStatistics> {
774+
self.pruned_partitions_stats.read().clone()
775+
}
776+
777+
pub fn set_pruned_partitions_stats(&self, stats: PartStatistics) {
778+
let mut guard = self.pruned_partitions_stats.write();
779+
*guard = Some(stats);
780+
}
768781
}
769782

770783
impl Drop for QueryContextShared {

src/query/service/tests/it/storages/fuse/operations/internal_column.rs

+81-20
Original file line numberDiff line numberDiff line change
@@ -12,24 +12,31 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::BTreeMap;
1516
use std::collections::HashSet;
1617

1718
use databend_common_base::base::tokio;
1819
use databend_common_catalog::plan::InternalColumn;
1920
use databend_common_catalog::plan::InternalColumnMeta;
20-
use databend_common_catalog::plan::Partitions;
21+
use databend_common_catalog::plan::PartInfoPtr;
2122
use databend_common_exception::Result;
23+
use databend_common_expression::block_debug::pretty_format_blocks;
2224
use databend_common_expression::DataBlock;
25+
use databend_common_expression::FieldIndex;
2326
use databend_common_expression::BLOCK_NAME_COL_NAME;
2427
use databend_common_expression::ROW_ID_COL_NAME;
2528
use databend_common_expression::SEGMENT_NAME_COL_NAME;
2629
use databend_common_expression::SNAPSHOT_NAME_COL_NAME;
30+
use databend_common_pipeline_core::Pipeline;
2731
use databend_common_sql::binder::INTERNAL_COLUMN_FACTORY;
32+
use databend_common_sql::executor::table_read_plan::ToReadDataSourcePlan;
2833
use databend_common_sql::Planner;
2934
use databend_common_storages_fuse::io::MetaReaders;
3035
use databend_common_storages_fuse::FuseBlockPartInfo;
3136
use databend_common_storages_fuse::FuseTable;
3237
use databend_query::interpreters::InterpreterFactory;
38+
use databend_query::pipelines::executor::ExecutorSettings;
39+
use databend_query::pipelines::executor::QueryPipelineExecutor;
3340
use databend_query::test_kits::*;
3441
use databend_storages_common_cache::LoadParams;
3542
use databend_storages_common_table_meta::meta::SegmentInfo;
@@ -39,11 +46,11 @@ use databend_storages_common_table_meta::table::OPT_KEY_SNAPSHOT_LOCATION;
3946
use futures::TryStreamExt;
4047

4148
fn expected_data_block(
42-
parts: &Partitions,
49+
parts: &Vec<PartInfoPtr>,
4350
internal_columns: &Vec<InternalColumn>,
4451
) -> Result<Vec<DataBlock>> {
45-
let mut data_blocks = Vec::with_capacity(parts.partitions.len());
46-
for part in &parts.partitions {
52+
let mut data_blocks = Vec::with_capacity(parts.len());
53+
for part in parts {
4754
let fuse_part = FuseBlockPartInfo::from_part(part)?;
4855
let num_rows = fuse_part.nums_rows;
4956
let block_meta = fuse_part.block_meta_index.as_ref().unwrap();
@@ -65,28 +72,22 @@ fn expected_data_block(
6572
}
6673
data_blocks.push(DataBlock::new(columns, num_rows));
6774
}
68-
data_blocks.reverse();
6975

7076
Ok(data_blocks)
7177
}
7278

7379
fn check_data_block(expected: Vec<DataBlock>, blocks: Vec<DataBlock>) -> Result<()> {
74-
let expected_data_block = DataBlock::concat(&expected)?.consume_convert_to_full();
75-
let data_block = DataBlock::concat(&blocks)?.consume_convert_to_full();
76-
77-
for (expected_column, column) in expected_data_block
78-
.columns()
79-
.iter()
80-
.zip(data_block.columns())
81-
{
82-
assert_eq!(expected_column.data_type, column.data_type);
83-
assert_eq!(expected_column.value, column.value);
84-
}
80+
let expected_blocks = pretty_format_blocks(&expected)?;
81+
let expected_str: Vec<&str> = expected_blocks.split('\n').collect();
82+
databend_common_expression::block_debug::assert_blocks_sorted_eq(
83+
expected_str,
84+
blocks.as_slice(),
85+
);
8586

8687
Ok(())
8788
}
8889

89-
async fn check_partitions(parts: &Partitions, fixture: &TestFixture) -> Result<()> {
90+
async fn check_partitions(parts: &Vec<PartInfoPtr>, fixture: &TestFixture) -> Result<()> {
9091
let mut segment_name = HashSet::new();
9192
let mut block_name = HashSet::new();
9293

@@ -129,7 +130,7 @@ async fn check_partitions(parts: &Partitions, fixture: &TestFixture) -> Result<(
129130
}
130131
}
131132

132-
for part in &parts.partitions {
133+
for part in parts {
133134
let fuse_part = FuseBlockPartInfo::from_part(part)?;
134135
let block_meta = fuse_part.block_meta_index.as_ref().unwrap();
135136
assert_eq!(
@@ -166,6 +167,11 @@ async fn test_internal_column() -> Result<()> {
166167
.get_internal_column(BLOCK_NAME_COL_NAME)
167168
.unwrap(),
168169
];
170+
let internal_columns_map = internal_columns
171+
.iter()
172+
.enumerate()
173+
.map(|(i, col)| (i, col.clone()))
174+
.collect::<BTreeMap<FieldIndex, InternalColumn>>();
169175

170176
// insert 5 times
171177
let n = 5;
@@ -188,7 +194,34 @@ async fn test_internal_column() -> Result<()> {
188194
let blocks = res.try_collect::<Vec<DataBlock>>().await?;
189195

190196
let table = fixture.latest_default_table().await?;
191-
let (_, parts) = table.read_partitions(ctx.clone(), None, true).await?;
197+
let data_source_plan = table
198+
.read_plan(
199+
ctx.clone(),
200+
None,
201+
Some(internal_columns_map.clone()),
202+
false,
203+
false,
204+
)
205+
.await?;
206+
207+
let mut dummy_pipeline = Pipeline::create();
208+
let parts = if let Some(mut prune_pipeline) =
209+
table.build_prune_pipeline(ctx.clone(), &data_source_plan, &mut dummy_pipeline)?
210+
{
211+
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
212+
let rx = fuse_table.pruned_result_receiver.lock().clone().unwrap();
213+
prune_pipeline.set_max_threads(1);
214+
let settings = ExecutorSettings::try_create(ctx.clone())?;
215+
let executor = QueryPipelineExecutor::create(prune_pipeline, settings)?;
216+
executor.execute()?;
217+
let mut parts = Vec::new();
218+
while let Ok(Ok(segment)) = rx.recv().await {
219+
parts.push(segment);
220+
}
221+
parts
222+
} else {
223+
data_source_plan.parts.partitions.clone()
224+
};
192225
let expected = expected_data_block(&parts, &internal_columns)?;
193226
check_partitions(&parts, &fixture).await?;
194227
check_data_block(expected, blocks)?;
@@ -213,7 +246,35 @@ async fn test_internal_column() -> Result<()> {
213246
let blocks = res.try_collect::<Vec<DataBlock>>().await?;
214247

215248
let table = fixture.latest_default_table().await?;
216-
let (_, parts) = table.read_partitions(ctx.clone(), None, true).await?;
249+
let data_source_plan = table
250+
.read_plan(
251+
ctx.clone(),
252+
None,
253+
Some(internal_columns_map.clone()),
254+
false,
255+
false,
256+
)
257+
.await?;
258+
259+
let mut dummy_pipeline = Pipeline::create();
260+
let parts = if let Some(mut prune_pipeline) =
261+
table.build_prune_pipeline(ctx.clone(), &data_source_plan, &mut dummy_pipeline)?
262+
{
263+
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
264+
let rx = fuse_table.pruned_result_receiver.lock().clone().unwrap();
265+
prune_pipeline.set_max_threads(1);
266+
let settings = ExecutorSettings::try_create(ctx.clone())?;
267+
let executor = QueryPipelineExecutor::create(prune_pipeline, settings)?;
268+
executor.execute()?;
269+
let mut parts = Vec::new();
270+
while let Ok(Ok(segment)) = rx.recv().await {
271+
parts.push(segment);
272+
}
273+
parts
274+
} else {
275+
data_source_plan.parts.partitions.clone()
276+
};
277+
217278
let expected = expected_data_block(&parts, &internal_columns)?;
218279
check_partitions(&parts, &fixture).await?;
219280
check_data_block(expected, blocks)?;

src/query/service/tests/it/storages/fuse/pruning_pipeline.rs

+2
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ async fn apply_snapshot_pruning(
9595
segment_rx,
9696
res_tx,
9797
cache_key,
98+
segment_locs.len(),
99+
false,
98100
)?;
99101
prune_pipeline.set_max_threads(1);
100102
prune_pipeline.set_on_init(move || {

0 commit comments

Comments (0)