Skip to content

Commit 359735a

Browse files
authored
Expand wildcard to actual expressions in prepare_select_exprs (#15090)
* first draft * fix tests * cleanup * assert * clippy * fix test * fix test
1 parent 4b8a2d8 commit 359735a

File tree

13 files changed

+673
-607
lines changed

13 files changed

+673
-607
lines changed

datafusion/optimizer/tests/optimizer_integration.rs

Lines changed: 1 addition & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,14 @@ use std::sync::Arc;
2222
use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
2323

2424
use datafusion_common::config::ConfigOptions;
25-
use datafusion_common::{assert_contains, plan_err, Result, TableReference};
25+
use datafusion_common::{plan_err, Result, TableReference};
2626
use datafusion_expr::planner::ExprPlanner;
27-
use datafusion_expr::sqlparser::dialect::PostgreSqlDialect;
2827
use datafusion_expr::test::function_stub::sum_udaf;
2928
use datafusion_expr::{AggregateUDF, LogicalPlan, ScalarUDF, TableSource, WindowUDF};
3029
use datafusion_functions_aggregate::average::avg_udaf;
3130
use datafusion_functions_aggregate::count::count_udaf;
3231
use datafusion_functions_aggregate::planner::AggregateFunctionPlanner;
3332
use datafusion_functions_window::planner::WindowFunctionPlanner;
34-
use datafusion_optimizer::analyzer::type_coercion::TypeCoercionRewriter;
3533
use datafusion_optimizer::analyzer::Analyzer;
3634
use datafusion_optimizer::optimizer::Optimizer;
3735
use datafusion_optimizer::{OptimizerConfig, OptimizerContext, OptimizerRule};
@@ -344,16 +342,6 @@ fn test_propagate_empty_relation_inner_join_and_unions() {
344342
assert_eq!(expected, format!("{plan}"));
345343
}
346344

347-
#[test]
348-
fn select_wildcard_with_repeated_column() {
349-
let sql = "SELECT *, col_int32 FROM test";
350-
let err = test_sql(sql).expect_err("query should have failed");
351-
assert_eq!(
352-
"Schema error: Schema contains duplicate qualified field name test.col_int32",
353-
err.strip_backtrace()
354-
);
355-
}
356-
357345
#[test]
358346
fn select_wildcard_with_repeated_column_but_is_aliased() {
359347
let sql = "SELECT *, col_int32 as col_32 FROM test";
@@ -390,32 +378,6 @@ fn select_correlated_predicate_subquery_with_uppercase_ident() {
390378
assert_eq!(expected, format!("{plan}"));
391379
}
392380

393-
// The test should return an error
394-
// because the wildcard was not expanded before type coercion
395-
#[test]
396-
fn test_union_coercion_with_wildcard() -> Result<()> {
397-
let dialect = PostgreSqlDialect {};
398-
let context_provider = MyContextProvider::default();
399-
let sql = "select * from (SELECT col_int32, col_uint32 FROM test) union all select * from(SELECT col_uint32, col_int32 FROM test)";
400-
let statements = Parser::parse_sql(&dialect, sql)?;
401-
let sql_to_rel = SqlToRel::new(&context_provider);
402-
let logical_plan = sql_to_rel.sql_statement_to_plan(statements[0].clone())?;
403-
404-
if let LogicalPlan::Union(union) = logical_plan {
405-
let err = TypeCoercionRewriter::coerce_union(union)
406-
.err()
407-
.unwrap()
408-
.to_string();
409-
assert_contains!(
410-
err,
411-
"Error during planning: Wildcard should be expanded before type coercion"
412-
);
413-
} else {
414-
panic!("Expected Union plan");
415-
}
416-
Ok(())
417-
}
418-
419381
fn test_sql(sql: &str) -> Result<LogicalPlan> {
420382
// parse the SQL
421383
let dialect = GenericDialect {}; // or AnsiDialect, or your own dialect ...

datafusion/sql/src/select.rs

Lines changed: 66 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,19 +27,19 @@ use crate::utils::{
2727

2828
use datafusion_common::error::DataFusionErrorBuilder;
2929
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
30-
use datafusion_common::{not_impl_err, plan_err, Result};
30+
use datafusion_common::{not_impl_err, plan_err, Column, Result};
3131
use datafusion_common::{RecursionUnnestOption, UnnestOptions};
3232
use datafusion_expr::expr::{Alias, PlannedReplaceSelectItem, WildcardOptions};
3333
use datafusion_expr::expr_rewriter::{
3434
normalize_col, normalize_col_with_schemas_and_ambiguity_check, normalize_sorts,
3535
};
3636
use datafusion_expr::utils::{
37-
expr_as_column_expr, expr_to_columns, find_aggregate_exprs, find_window_exprs,
37+
expand_qualified_wildcard, expand_wildcard, expr_as_column_expr, expr_to_columns,
38+
find_aggregate_exprs, find_window_exprs,
3839
};
3940
use datafusion_expr::{
40-
qualified_wildcard_with_options, wildcard_with_options, Aggregate, Expr, Filter,
41-
GroupingSet, LogicalPlan, LogicalPlanBuilder, LogicalPlanBuilderOptions,
42-
Partitioning,
41+
Aggregate, Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder,
42+
LogicalPlanBuilderOptions, Partitioning,
4343
};
4444

4545
use indexmap::IndexMap;
@@ -92,6 +92,12 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
9292
planner_context,
9393
)?;
9494

95+
// TODO: remove this after Expr::Wildcard is removed
96+
#[allow(deprecated)]
97+
for expr in &select_exprs {
98+
debug_assert!(!matches!(expr, Expr::Wildcard { .. }));
99+
}
100+
95101
// Having and group by clause may reference aliases defined in select projection
96102
let projected_plan = self.project(base_plan.clone(), select_exprs.clone())?;
97103

@@ -583,7 +589,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
583589
let mut error_builder = DataFusionErrorBuilder::new();
584590
for expr in projection {
585591
match self.sql_select_to_rex(expr, plan, empty_from, planner_context) {
586-
Ok(expr) => prepared_select_exprs.push(expr),
592+
Ok(expr) => prepared_select_exprs.extend(expr),
587593
Err(err) => error_builder.add_error(err),
588594
}
589595
}
@@ -597,7 +603,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
597603
plan: &LogicalPlan,
598604
empty_from: bool,
599605
planner_context: &mut PlannerContext,
600-
) -> Result<Expr> {
606+
) -> Result<Vec<Expr>> {
601607
match sql {
602608
SelectItem::UnnamedExpr(expr) => {
603609
let expr = self.sql_to_expr(expr, plan.schema(), planner_context)?;
@@ -606,7 +612,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
606612
&[&[plan.schema()]],
607613
&plan.using_columns()?,
608614
)?;
609-
Ok(col)
615+
Ok(vec![col])
610616
}
611617
SelectItem::ExprWithAlias { expr, alias } => {
612618
let select_expr =
@@ -622,7 +628,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
622628
Expr::Column(column) if column.name.eq(&name) => col,
623629
_ => col.alias(name),
624630
};
625-
Ok(expr)
631+
Ok(vec![expr])
626632
}
627633
SelectItem::Wildcard(options) => {
628634
Self::check_wildcard_options(&options)?;
@@ -635,7 +641,17 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
635641
planner_context,
636642
options,
637643
)?;
638-
Ok(wildcard_with_options(planned_options))
644+
645+
let expanded =
646+
expand_wildcard(plan.schema(), plan, Some(&planned_options))?;
647+
648+
// If there is a REPLACE statement, replace that column with the given
649+
// replace expression. Column name remains the same.
650+
if let Some(replace) = planned_options.replace {
651+
replace_columns(expanded, &replace)
652+
} else {
653+
Ok(expanded)
654+
}
639655
}
640656
SelectItem::QualifiedWildcard(object_name, options) => {
641657
Self::check_wildcard_options(&options)?;
@@ -646,7 +662,19 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
646662
planner_context,
647663
options,
648664
)?;
649-
Ok(qualified_wildcard_with_options(qualifier, planned_options))
665+
666+
let expanded = expand_qualified_wildcard(
667+
&qualifier,
668+
plan.schema(),
669+
Some(&planned_options),
670+
)?;
671+
// If there is a REPLACE statement, replace that column with the given
672+
// replace expression. Column name remains the same.
673+
if let Some(replace) = planned_options.replace {
674+
replace_columns(expanded, &replace)
675+
} else {
676+
Ok(expanded)
677+
}
650678
}
651679
}
652680
}
@@ -698,7 +726,10 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
698726
planner_context,
699727
)
700728
})
701-
.collect::<Result<Vec<_>>>()?;
729+
.collect::<Result<Vec<_>>>()?
730+
.into_iter()
731+
.flatten()
732+
.collect();
702733
let planned_replace = PlannedReplaceSelectItem {
703734
items: replace.items.into_iter().map(|i| *i).collect(),
704735
planned_expressions: replace_expr,
@@ -884,3 +915,26 @@ fn match_window_definitions(
884915
}
885916
Ok(())
886917
}
918+
919+
/// If there is a REPLACE statement in the projected expression in the form of
920+
/// "REPLACE (some_column_within_an_expr AS some_column)", this function replaces
921+
/// that column with the given replace expression. Column name remains the same.
922+
/// Multiple REPLACEs are also possible with comma separations.
923+
fn replace_columns(
924+
mut exprs: Vec<Expr>,
925+
replace: &PlannedReplaceSelectItem,
926+
) -> Result<Vec<Expr>> {
927+
for expr in exprs.iter_mut() {
928+
if let Expr::Column(Column { name, .. }) = expr {
929+
if let Some((_, new_expr)) = replace
930+
.items()
931+
.iter()
932+
.zip(replace.expressions().iter())
933+
.find(|(item, _)| item.column_name.value == *name)
934+
{
935+
*expr = new_expr.clone().alias(name.clone())
936+
}
937+
}
938+
}
939+
Ok(exprs)
940+
}

0 commit comments

Comments
 (0)