Skip to content

feat(query): Support variant json path push down as virtual column #10729

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Apr 10, 2023
Merged
487 changes: 294 additions & 193 deletions Cargo.lock

Large diffs are not rendered by default.

28 changes: 27 additions & 1 deletion src/query/catalog/src/plan/pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,37 @@ use std::fmt::Debug;

use common_expression::types::DataType;
use common_expression::RemoteExpr;
use common_expression::Scalar;
use common_expression::TableDataType;
use common_expression::TableField;
use common_expression::TableSchema;

use crate::plan::Projection;

/// Information of Virtual Columns.
///
/// Generated from the source column by the paths.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct VirtualColumnInfo {
/// Source column name
pub source_name: String,
/// Virtual column name
pub name: String,
/// Paths to generate virtual column from source column
pub paths: Vec<Scalar>,
/// Virtual column data type
pub data_type: Box<TableDataType>,
}

/// Information about prewhere optimization.
///
/// Prewhere steps:
///
/// 1. Read columns by `prewhere_columns`.
/// 2. Filter data by `filter`.
/// 3. Read columns by `remain_columns`.
/// 4. Combine columns from step 1 and step 3, and prune columns to be `output_columns`.
/// 4. If virtual columns are required, generate them from the source columns.
/// 5. Combine columns from step 1 and step 4, and prune columns to be `output_columns`.
///
/// **NOTE: the [`Projection`] is to be applied for the [`TableSchema`] of the data source.**
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)]
Expand All @@ -44,6 +62,8 @@ pub struct PrewhereInfo {
/// filter for prewhere
/// Assumption: expression's data type must be `DataType::Boolean`.
pub filter: RemoteExpr<String>,
/// Optional prewhere virtual columns
pub virtual_columns: Option<Vec<VirtualColumnInfo>>,
}

/// Extras is a wrapper for push down items.
Expand All @@ -52,6 +72,10 @@ pub struct PushDownInfo {
/// Optional column indices to use as a projection.
/// It represents the columns to be read from the source.
pub projection: Option<Projection>,
/// Optional column indices as output by the scan, only used when having virtual columns.
/// The difference with `projection` is the removal of the source columns
/// which were only used to generate virtual columns.
pub output_columns: Option<Projection>,
/// Optional filter expression plan
/// Assumption: expression's data type must be `DataType::Boolean`.
pub filter: Option<RemoteExpr<String>>,
Expand All @@ -62,6 +86,8 @@ pub struct PushDownInfo {
pub limit: Option<usize>,
/// Optional order_by expression plan, asc, null_first
pub order_by: Vec<(RemoteExpr<String>, bool, bool)>,
/// Optional virtual columns
pub virtual_columns: Option<Vec<VirtualColumnInfo>>,
}

/// TopK is a wrapper for topk push down items.
Expand Down
5 changes: 5 additions & 0 deletions src/query/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ pub trait Table: Sync + Send {
false
}

/// Whether the table engine supports virtual columns optimization.
fn support_virtual_columns(&self) -> bool {
false
}

#[async_backtrace::framed]
async fn alter_table_cluster_keys(
&self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,12 @@ fn test_to_partitions() -> Result<()> {
// kick off
let push_down = Some(PushDownInfo {
projection: Some(proj),
output_columns: None,
filter: None,
limit: None,
order_by: vec![],
prewhere: None,
virtual_columns: None,
});

let (stats, parts) =
Expand Down Expand Up @@ -173,10 +175,12 @@ async fn test_fuse_table_exact_statistic() -> Result<()> {
let proj = Projection::Columns(vec![]);
let push_downs = PushDownInfo {
projection: Some(proj),
output_columns: None,
filter: None,
prewhere: None,
limit: None,
order_by: vec![],
virtual_columns: None,
};
let (stats, parts) = table.read_partitions(ctx.clone(), Some(push_downs)).await?;
assert_eq!(stats.read_rows, num_blocks * rows_per_block);
Expand Down
122 changes: 30 additions & 92 deletions src/query/sql/src/executor/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,6 @@ use crate::executor::RuntimeFilterSource;
use crate::executor::Window;
use crate::planner::MetadataRef;
use crate::planner::DUMMY_TABLE_INDEX;
use crate::BaseTableColumn;
use crate::ColumnEntry;
use crate::DerivedColumn;
use crate::TableInternalColumn;

impl PhysicalPlan {
pub fn format(
Expand Down Expand Up @@ -182,14 +178,30 @@ fn table_scan_to_format_tree(
.map_or("NONE".to_string(), |limit| limit.to_string())
});

let virtual_columns = plan.source.push_downs.as_ref().and_then(|extras| {
extras.virtual_columns.as_ref().map(|columns| {
let mut names = columns.iter().map(|c| c.name.clone()).collect::<Vec<_>>();
names.sort();
names.iter().join(", ")
})
});

let mut children = vec![FormatTreeNode::new(format!("table: {table_name}"))];

// Part stats.
children.extend(part_stats_info_to_format_tree(&plan.source.statistics));
// Push downs.
children.push(FormatTreeNode::new(format!(
"push downs: [filters: [{filters}], limit: {limit}]"
)));
let push_downs = match virtual_columns {
Some(virtual_columns) => {
format!(
"push downs: [filters: [{filters}], limit: {limit}, virtual_columns: [{virtual_columns}]]"
)
}
None => {
format!("push downs: [filters: [{filters}], limit: {limit}]")
}
};
children.push(FormatTreeNode::new(push_downs));

let output_columns = plan.source.output_schema.fields();

Expand Down Expand Up @@ -254,20 +266,7 @@ fn project_to_format_tree(
.columns
.iter()
.sorted()
.map(|column| {
format!(
"{} (#{})",
match metadata.read().column(*column) {
ColumnEntry::BaseTableColumn(BaseTableColumn { column_name, .. }) =>
column_name,
ColumnEntry::DerivedColumn(DerivedColumn { alias, .. }) => alias,
ColumnEntry::InternalColumn(TableInternalColumn {
internal_column, ..
}) => internal_column.column_name(),
},
column
)
})
.map(|&index| format!("{} (#{})", metadata.read().column(index).name(), index))
.collect::<Vec<_>>()
.join(", ");
let mut children = vec![FormatTreeNode::new(format!("columns: [{columns}]"))];
Expand Down Expand Up @@ -331,18 +330,7 @@ pub fn pretty_display_agg_desc(desc: &AggregateFunctionDesc, metadata: &Metadata
desc.sig.name,
desc.arg_indices
.iter()
.map(|&index| {
let column = metadata.read().column(index).clone();
match column {
ColumnEntry::BaseTableColumn(BaseTableColumn { column_name, .. }) => {
column_name
}
ColumnEntry::DerivedColumn(DerivedColumn { alias, .. }) => alias,
ColumnEntry::InternalColumn(TableInternalColumn {
internal_column, ..
}) => internal_column.column_name().to_string(),
}
})
.map(|&index| { metadata.read().column(index).name() })
.collect::<Vec<_>>()
.join(", ")
)
Expand All @@ -358,19 +346,7 @@ fn aggregate_expand_to_format_tree(
.iter()
.map(|set| {
set.iter()
.map(|column| {
let column = metadata.read().column(*column).clone();
match column {
ColumnEntry::BaseTableColumn(BaseTableColumn { column_name, .. }) => {
column_name
}
ColumnEntry::DerivedColumn(DerivedColumn { alias, .. }) => alias,
ColumnEntry::InternalColumn(TableInternalColumn {
internal_column,
..
}) => internal_column.column_name().to_string(),
}
})
.map(|&index| metadata.read().column(index).name())
.collect::<Vec<_>>()
.join(", ")
})
Expand Down Expand Up @@ -408,15 +384,8 @@ fn aggregate_partial_to_format_tree(
let group_by = plan
.group_by
.iter()
.map(|column| {
let column = metadata.read().column(*column).clone();
let name = match column {
ColumnEntry::BaseTableColumn(BaseTableColumn { column_name, .. }) => column_name,
ColumnEntry::DerivedColumn(DerivedColumn { alias, .. }) => alias,
ColumnEntry::InternalColumn(TableInternalColumn {
internal_column, ..
}) => internal_column.column_name().to_string(),
};
.map(|&index| {
let name = metadata.read().column(index).name();
Ok(name)
})
.collect::<Result<Vec<_>>>()?
Expand Down Expand Up @@ -461,15 +430,8 @@ fn aggregate_final_to_format_tree(
let group_by = plan
.group_by
.iter()
.map(|column| {
let column = metadata.read().column(*column).clone();
let name = match column {
ColumnEntry::BaseTableColumn(BaseTableColumn { column_name, .. }) => column_name,
ColumnEntry::DerivedColumn(DerivedColumn { alias, .. }) => alias,
ColumnEntry::InternalColumn(TableInternalColumn {
internal_column, ..
}) => internal_column.column_name().to_string(),
};
.map(|&index| {
let name = metadata.read().column(index).name();
Ok(name)
})
.collect::<Result<Vec<_>>>()?
Expand Down Expand Up @@ -520,15 +482,8 @@ fn window_to_format_tree(
let partition_by = plan
.partition_by
.iter()
.map(|col| {
let column = metadata.read().column(*col).clone();
let name = match column {
ColumnEntry::BaseTableColumn(BaseTableColumn { column_name, .. }) => column_name,
ColumnEntry::DerivedColumn(DerivedColumn { alias, .. }) => alias,
ColumnEntry::InternalColumn(TableInternalColumn {
internal_column, ..
}) => internal_column.column_name().to_string(),
};
.map(|&index| {
let name = metadata.read().column(index).name();
Ok(name)
})
.collect::<Result<Vec<_>>>()?
Expand All @@ -538,14 +493,7 @@ fn window_to_format_tree(
.order_by
.iter()
.map(|v| {
let column = metadata.read().column(v.order_by).clone();
let name = match column {
ColumnEntry::BaseTableColumn(BaseTableColumn { column_name, .. }) => column_name,
ColumnEntry::DerivedColumn(DerivedColumn { alias, .. }) => alias,
ColumnEntry::InternalColumn(TableInternalColumn {
internal_column, ..
}) => internal_column.column_name().to_string(),
};
let name = metadata.read().column(v.order_by).name();
Ok(name)
})
.collect::<Result<Vec<_>>>()?
Expand Down Expand Up @@ -590,19 +538,9 @@ fn sort_to_format_tree(
.iter()
.map(|sort_key| {
let index = sort_key.order_by;
let column = metadata.read().column(index).clone();
Ok(format!(
"{} {} {}",
match column {
ColumnEntry::BaseTableColumn(BaseTableColumn { column_name, .. }) =>
column_name,
ColumnEntry::DerivedColumn(DerivedColumn { alias, .. }) => alias,
ColumnEntry::InternalColumn(TableInternalColumn {
internal_column, ..
}) => {
internal_column.column_name().to_string()
}
},
metadata.read().column(index).name(),
if sort_key.asc { "ASC" } else { "DESC" },
if sort_key.nulls_first {
"NULLS FIRST"
Expand Down
Loading