Skip to content

Commit b464529

Browse files
committed
Implement optimizer rule to eliminate double distinct
1 parent 7da66be commit b464529

File tree

2 files changed

+159
-0
lines changed

2 files changed

+159
-0
lines changed

dask_planner/src/sql/optimizer.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ use log::trace;
2929

3030
mod eliminate_agg_distinct;
3131
use eliminate_agg_distinct::EliminateAggDistinct;
32+
mod eliminate_double_distinct;
33+
use eliminate_double_distinct::EliminateDoubleDistinct;
3234

3335
/// Houses the optimization logic for Dask-SQL. This optimization controls the optimizations
3436
/// and their ordering in regards to their impact on the underlying `LogicalPlan` instance
@@ -66,6 +68,7 @@ impl DaskSqlOptimizer {
6668
Arc::new(LimitPushDown::new()),
6769
// Dask-SQL specific optimizations
6870
Arc::new(EliminateAggDistinct::new()),
71+
Arc::new(EliminateDoubleDistinct::new()),
6972
// The previous optimizations added expressions and projections,
7073
// that might benefit from the following rules
7174
Arc::new(SimplifyExpressions::new()),
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
use datafusion_common::Result;
2+
use datafusion_expr::{logical_plan::LogicalPlan, utils::from_plan};
3+
use datafusion_optimizer::{utils, OptimizerConfig, OptimizerRule};
4+
5+
/// Optimizer rule that removes a `Distinct` node sitting directly on top of
/// another `Distinct` node, leaving a single `Distinct` in the plan.
///
/// The rule holds no state, so every instance behaves the same.
#[derive(Debug, Default)]
pub struct EliminateDoubleDistinct {}

impl EliminateDoubleDistinct {
    /// Creates a new instance of the rule.
    pub fn new() -> Self {
        Self {}
    }
}
14+
15+
impl OptimizerRule for EliminateDoubleDistinct {
16+
fn optimize(
17+
&self,
18+
plan: &LogicalPlan,
19+
optimizer_config: &mut OptimizerConfig,
20+
) -> Result<LogicalPlan> {
21+
match plan {
22+
LogicalPlan::Distinct(distinct) => match distinct.input.as_ref() {
23+
LogicalPlan::Distinct(extra_distinct) => {
24+
let input =
25+
utils::optimize_children(self, &extra_distinct.input, optimizer_config)?;
26+
let new_plan = from_plan(plan, &plan.expressions(), &[input])?;
27+
Ok(new_plan)
28+
}
29+
_ => utils::optimize_children(self, plan, optimizer_config),
30+
},
31+
_ => utils::optimize_children(self, plan, optimizer_config),
32+
}
33+
}
34+
35+
fn name(&self) -> &str {
36+
"eliminate_double_distinct"
37+
}
38+
}
39+
40+
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_expr::logical_plan::{builder::LogicalTableSource, LogicalPlanBuilder};

    use super::*;

    /// Runs only the `eliminate_double_distinct` rule over `plan` and asserts
    /// that the indented rendering of the result equals `expected`.
    fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
        let rule = EliminateDoubleDistinct::new();
        let optimized_plan = rule
            .optimize(plan, &mut OptimizerConfig::new())
            .expect("failed to optimize plan");
        let formatted_plan = optimized_plan.display_indent().to_string();
        assert_eq!(expected, formatted_plan);
    }

    /// Creates a `LogicalPlanBuilder` that scans a table with the given name
    /// and schema. The table name falls back to `"test"` when `name` is `None`.
    pub fn table_scan(
        name: Option<&str>,
        table_schema: &Schema,
        projection: Option<Vec<usize>>,
    ) -> Result<LogicalPlanBuilder> {
        let tbl_schema = Arc::new(table_schema.clone());
        let table_source = Arc::new(LogicalTableSource::new(tbl_schema));
        LogicalPlanBuilder::scan(name.unwrap_or("test"), table_source, projection)
    }

    /// Builds a scan over a four-column (`a`..`d`, all non-null `UInt32`)
    /// table called `table_name`.
    fn test_table_scan(table_name: &str) -> LogicalPlan {
        let schema = Schema::new(vec![
            Field::new("a", DataType::UInt32, false),
            Field::new("b", DataType::UInt32, false),
            Field::new("c", DataType::UInt32, false),
            Field::new("d", DataType::UInt32, false),
        ]);
        table_scan(Some(table_name), &schema, None)
            .expect("creating scan")
            .build()
            .expect("building plan")
    }

    #[test]
    fn test_single_double_distinct() -> Result<()> {
        let plan = LogicalPlanBuilder::from(test_table_scan("a"))
            .distinct()?
            .distinct()?
            .build()?;

        let expected = "Distinct:\
        \n  TableScan: a";
        assert_optimized_plan_eq(&plan, expected);
        Ok(())
    }

    #[test]
    fn test_intersect_double_distinct() -> Result<()> {
        let left = LogicalPlanBuilder::from(test_table_scan("a"))
            .distinct()?
            .build()?;

        let right = LogicalPlanBuilder::from(test_table_scan("b"))
            .distinct()?
            .build()?;

        let plan = LogicalPlanBuilder::intersect(left, right, false)?;

        let expected = "LeftSemi Join: a.a = b.a, a.b = b.b, a.c = b.c, a.d = b.d\
        \n  Distinct:\
        \n    TableScan: a\
        \n  Distinct:\
        \n    TableScan: b";
        assert_optimized_plan_eq(&plan, expected);
        Ok(())
    }

    #[test]
    fn test_nested_intersect_double_distinct() -> Result<()> {
        let r1 = LogicalPlanBuilder::from(test_table_scan("a"))
            .distinct()?
            .build()?;

        let r2 = LogicalPlanBuilder::from(test_table_scan("b"))
            .distinct()?
            .build()?;

        let l1 = LogicalPlanBuilder::from(test_table_scan("c"))
            .distinct()?
            .build()?;

        let l2 = LogicalPlanBuilder::from(test_table_scan("d"))
            .distinct()?
            .build()?;

        let right = LogicalPlanBuilder::intersect(r1, r2, false)?;

        let left = LogicalPlanBuilder::intersect(l1, l2, false)?;

        let plan = LogicalPlanBuilder::intersect(left, right, false)?;

        // NOTE(review): expected plan reconstructed on the assumption that a
        // non-ALL intersect puts a `Distinct` over its left input and that the
        // semi join exposes only left-side columns -- confirm against the
        // DataFusion version in use.
        let expected = "LeftSemi Join: c.a = a.a, c.b = a.b, c.c = a.c, c.d = a.d\
        \n  Distinct:\
        \n    LeftSemi Join: c.a = d.a, c.b = d.b, c.c = d.c, c.d = d.d\
        \n      Distinct:\
        \n        TableScan: c\
        \n      Distinct:\
        \n        TableScan: d\
        \n  LeftSemi Join: a.a = b.a, a.b = b.b, a.c = b.c, a.d = b.d\
        \n    Distinct:\
        \n      TableScan: a\
        \n    Distinct:\
        \n      TableScan: b";
        assert_optimized_plan_eq(&plan, expected);
        Ok(())
    }
}

0 commit comments

Comments
 (0)