Skip to content

Expand wildcard to actual expressions in prepare_select_exprs #15090

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Mar 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 1 addition & 39 deletions datafusion/optimizer/tests/optimizer_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,14 @@ use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};

use datafusion_common::config::ConfigOptions;
use datafusion_common::{assert_contains, plan_err, Result, TableReference};
use datafusion_common::{plan_err, Result, TableReference};
use datafusion_expr::planner::ExprPlanner;
use datafusion_expr::sqlparser::dialect::PostgreSqlDialect;
use datafusion_expr::test::function_stub::sum_udaf;
use datafusion_expr::{AggregateUDF, LogicalPlan, ScalarUDF, TableSource, WindowUDF};
use datafusion_functions_aggregate::average::avg_udaf;
use datafusion_functions_aggregate::count::count_udaf;
use datafusion_functions_aggregate::planner::AggregateFunctionPlanner;
use datafusion_functions_window::planner::WindowFunctionPlanner;
use datafusion_optimizer::analyzer::type_coercion::TypeCoercionRewriter;
use datafusion_optimizer::analyzer::Analyzer;
use datafusion_optimizer::optimizer::Optimizer;
use datafusion_optimizer::{OptimizerConfig, OptimizerContext, OptimizerRule};
Expand Down Expand Up @@ -344,16 +342,6 @@ fn test_propagate_empty_relation_inner_join_and_unions() {
assert_eq!(expected, format!("{plan}"));
}

#[test]
fn select_wildcard_with_repeated_column() {
let sql = "SELECT *, col_int32 FROM test";
let err = test_sql(sql).expect_err("query should have failed");
assert_eq!(
"Schema error: Schema contains duplicate qualified field name test.col_int32",
err.strip_backtrace()
);
}

#[test]
fn select_wildcard_with_repeated_column_but_is_aliased() {
let sql = "SELECT *, col_int32 as col_32 FROM test";
Expand Down Expand Up @@ -390,32 +378,6 @@ fn select_correlated_predicate_subquery_with_uppercase_ident() {
assert_eq!(expected, format!("{plan}"));
}

// The test should return an error
// because the wildcard didn't be expanded before type coercion
#[test]
fn test_union_coercion_with_wildcard() -> Result<()> {
let dialect = PostgreSqlDialect {};
let context_provider = MyContextProvider::default();
let sql = "select * from (SELECT col_int32, col_uint32 FROM test) union all select * from(SELECT col_uint32, col_int32 FROM test)";
let statements = Parser::parse_sql(&dialect, sql)?;
let sql_to_rel = SqlToRel::new(&context_provider);
let logical_plan = sql_to_rel.sql_statement_to_plan(statements[0].clone())?;

if let LogicalPlan::Union(union) = logical_plan {
let err = TypeCoercionRewriter::coerce_union(union)
.err()
.unwrap()
.to_string();
assert_contains!(
err,
"Error during planning: Wildcard should be expanded before type coercion"
);
} else {
panic!("Expected Union plan");
}
Ok(())
}

fn test_sql(sql: &str) -> Result<LogicalPlan> {
// parse the SQL
let dialect = GenericDialect {}; // or AnsiDialect, or your own dialect ...
Expand Down
78 changes: 66 additions & 12 deletions datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,19 @@ use crate::utils::{

use datafusion_common::error::DataFusionErrorBuilder;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{not_impl_err, plan_err, Result};
use datafusion_common::{not_impl_err, plan_err, Column, Result};
use datafusion_common::{RecursionUnnestOption, UnnestOptions};
use datafusion_expr::expr::{Alias, PlannedReplaceSelectItem, WildcardOptions};
use datafusion_expr::expr_rewriter::{
normalize_col, normalize_col_with_schemas_and_ambiguity_check, normalize_sorts,
};
use datafusion_expr::utils::{
expr_as_column_expr, expr_to_columns, find_aggregate_exprs, find_window_exprs,
expand_qualified_wildcard, expand_wildcard, expr_as_column_expr, expr_to_columns,
find_aggregate_exprs, find_window_exprs,
};
use datafusion_expr::{
qualified_wildcard_with_options, wildcard_with_options, Aggregate, Expr, Filter,
GroupingSet, LogicalPlan, LogicalPlanBuilder, LogicalPlanBuilderOptions,
Partitioning,
Aggregate, Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder,
LogicalPlanBuilderOptions, Partitioning,
};

use indexmap::IndexMap;
Expand Down Expand Up @@ -92,6 +92,12 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
planner_context,
)?;

// TOOD: remove this after Expr::Wildcard is removed
#[allow(deprecated)]
for expr in &select_exprs {
debug_assert!(!matches!(expr, Expr::Wildcard { .. }));
}

// Having and group by clause may reference aliases defined in select projection
let projected_plan = self.project(base_plan.clone(), select_exprs.clone())?;

Expand Down Expand Up @@ -583,7 +589,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
let mut error_builder = DataFusionErrorBuilder::new();
for expr in projection {
match self.sql_select_to_rex(expr, plan, empty_from, planner_context) {
Ok(expr) => prepared_select_exprs.push(expr),
Ok(expr) => prepared_select_exprs.extend(expr),
Err(err) => error_builder.add_error(err),
}
}
Expand All @@ -597,7 +603,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
plan: &LogicalPlan,
empty_from: bool,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
) -> Result<Vec<Expr>> {
match sql {
SelectItem::UnnamedExpr(expr) => {
let expr = self.sql_to_expr(expr, plan.schema(), planner_context)?;
Expand All @@ -606,7 +612,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
&[&[plan.schema()]],
&plan.using_columns()?,
)?;
Ok(col)
Ok(vec![col])
}
SelectItem::ExprWithAlias { expr, alias } => {
let select_expr =
Expand All @@ -622,7 +628,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
Expr::Column(column) if column.name.eq(&name) => col,
_ => col.alias(name),
};
Ok(expr)
Ok(vec![expr])
}
SelectItem::Wildcard(options) => {
Self::check_wildcard_options(&options)?;
Expand All @@ -635,7 +641,17 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
planner_context,
options,
)?;
Ok(wildcard_with_options(planned_options))

let expanded =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There two parts are the real change, others are test adjustment

expand_wildcard(plan.schema(), plan, Some(&planned_options))?;

// If there is a REPLACE statement, replace that column with the given
// replace expression. Column name remains the same.
if let Some(replace) = planned_options.replace {
replace_columns(expanded, &replace)
} else {
Ok(expanded)
}
}
SelectItem::QualifiedWildcard(object_name, options) => {
Self::check_wildcard_options(&options)?;
Expand All @@ -646,7 +662,19 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
planner_context,
options,
)?;
Ok(qualified_wildcard_with_options(qualifier, planned_options))

let expanded = expand_qualified_wildcard(
&qualifier,
plan.schema(),
Some(&planned_options),
)?;
// If there is a REPLACE statement, replace that column with the given
// replace expression. Column name remains the same.
if let Some(replace) = planned_options.replace {
replace_columns(expanded, &replace)
} else {
Ok(expanded)
}
}
}
}
Expand Down Expand Up @@ -698,7 +726,10 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
planner_context,
)
})
.collect::<Result<Vec<_>>>()?;
.collect::<Result<Vec<_>>>()?
.into_iter()
.flatten()
.collect();
let planned_replace = PlannedReplaceSelectItem {
items: replace.items.into_iter().map(|i| *i).collect(),
planned_expressions: replace_expr,
Expand Down Expand Up @@ -884,3 +915,26 @@ fn match_window_definitions(
}
Ok(())
}

/// If there is a REPLACE statement in the projected expression in the form of
/// "REPLACE (some_column_within_an_expr AS some_column)", this function replaces
/// that column with the given replace expression. Column name remains the same.
/// Multiple REPLACEs are also possible with comma separations.
fn replace_columns(
mut exprs: Vec<Expr>,
replace: &PlannedReplaceSelectItem,
) -> Result<Vec<Expr>> {
for expr in exprs.iter_mut() {
if let Expr::Column(Column { name, .. }) = expr {
if let Some((_, new_expr)) = replace
.items()
.iter()
.zip(replace.expressions().iter())
.find(|(item, _)| item.column_name.value == *name)
{
*expr = new_expr.clone().alias(name.clone())
}
}
}
Ok(exprs)
}
Loading