From e17e18427b48b09156d3330f188d84e6f1878eb0 Mon Sep 17 00:00:00 2001 From: xudong963 Date: Mon, 15 Aug 2022 23:08:35 +0800 Subject: [PATCH 01/14] feat(planner): support union all --- .../service/src/sql/planner/binder/select.rs | 14 ++++ .../service/src/sql/planner/plans/mod.rs | 2 + .../service/src/sql/planner/plans/operator.rs | 7 ++ .../service/src/sql/planner/plans/union.rs | 64 +++++++++++++++++++ 4 files changed, 87 insertions(+) create mode 100644 src/query/service/src/sql/planner/plans/union.rs diff --git a/src/query/service/src/sql/planner/binder/select.rs b/src/query/service/src/sql/planner/binder/select.rs index 89cfdc939ef0d..421b3e47b9838 100644 --- a/src/query/service/src/sql/planner/binder/select.rs +++ b/src/query/service/src/sql/planner/binder/select.rs @@ -38,6 +38,7 @@ use crate::sql::planner::binder::BindContext; use crate::sql::planner::binder::Binder; use crate::sql::plans::BoundColumnRef; use crate::sql::plans::Filter; +use crate::sql::plans::Union; use crate::sql::plans::JoinType; use crate::sql::plans::Scalar; @@ -312,12 +313,25 @@ impl<'a> Binder { // Transfer Except to Anti join self.bind_except(left_bind_context, right_bind_context, left_expr, right_expr) } + (SetOperator::Union, true) => { + self.bind_union(left_expr, right_expr) + } _ => Err(ErrorCode::UnImplement( "Unsupported query type, currently, databend only support intersect distinct and except distinct", )), } } + fn bind_union( + &mut self, + left_expr: SExpr, + right_expr: SExpr, + ) -> Result { + let union_plan = Union {}; + let new_expr = SExpr::create_binary(union_plan.into(), left_expr, right_expr); + Ok(new_expr) + } + fn bind_intersect( &mut self, left_context: BindContext, diff --git a/src/query/service/src/sql/planner/plans/mod.rs b/src/query/service/src/sql/planner/plans/mod.rs index 3cb0e622c4357..b5e8074ea2525 100644 --- a/src/query/service/src/sql/planner/plans/mod.rs +++ b/src/query/service/src/sql/planner/plans/mod.rs @@ -31,6 +31,7 @@ mod project; mod scalar; pub mod share; mod sort; +mod union; use std::fmt::Display; use std::sync::Arc; @@ -109,6 +110,7 @@ pub use scalar::*; pub use share::*; pub use sort::Sort; pub use sort::SortItem; +pub use union::Union; use super::BindContext; use super::MetadataRef; diff --git a/src/query/service/src/sql/planner/plans/operator.rs b/src/query/service/src/sql/planner/plans/operator.rs index e0d5a3718d747..c0c28a1850ffa 100644 --- a/src/query/service/src/sql/planner/plans/operator.rs +++ b/src/query/service/src/sql/planner/plans/operator.rs @@ -18,6 +18,7 @@ use common_exception::Result; use super::aggregate::Aggregate; use super::eval_scalar::EvalScalar; use super::filter::Filter; +use super::union::Union; use super::hash_join::PhysicalHashJoin; use super::limit::Limit; use super::logical_get::LogicalGet; @@ -103,6 +104,7 @@ pub enum RelOperator { Sort(Sort), Limit(Limit), Exchange(Exchange), + Union(Union), Pattern(PatternPlan), } @@ -122,6 +124,7 @@ impl Operator for RelOperator { RelOperator::Limit(rel_op) => rel_op.rel_op(), RelOperator::Pattern(rel_op) => rel_op.rel_op(), RelOperator::Exchange(rel_op) => rel_op.rel_op(), + RelOperator::Union(rel_op) => rel_op.rel_op(), } } @@ -139,6 +142,7 @@ impl Operator for RelOperator { RelOperator::Limit(rel_op) => rel_op.is_physical(), RelOperator::Pattern(rel_op) => rel_op.is_physical(), RelOperator::Exchange(rel_op) => rel_op.is_physical(), + RelOperator::Union(rel_op) => rel_op.is_physical(), } } @@ -156,6 +160,7 @@ impl Operator for RelOperator { RelOperator::Limit(rel_op) => rel_op.is_logical(), RelOperator::Pattern(rel_op) => rel_op.is_logical(), RelOperator::Exchange(rel_op) => rel_op.is_logical(), + RelOperator::Union(rel_op) => rel_op.is_logical(), } } @@ -173,6 +178,7 @@ impl Operator for RelOperator { RelOperator::Limit(rel_op) => rel_op.as_logical(), RelOperator::Pattern(rel_op) => rel_op.as_logical(), RelOperator::Exchange(rel_op) => rel_op.as_logical(), + RelOperator::Union(rel_op) => rel_op.as_logical(), } } @@ -190,6 +196,7 @@ impl Operator for RelOperator { RelOperator::Limit(rel_op) => rel_op.as_physical(), RelOperator::Pattern(rel_op) => rel_op.as_physical(), RelOperator::Exchange(rel_op) => rel_op.as_physical(), + RelOperator::Union(rel_op) => rel_op.as_physical(), } } } diff --git a/src/query/service/src/sql/planner/plans/union.rs b/src/query/service/src/sql/planner/plans/union.rs new file mode 100644 index 0000000000000..e283b37c587b8 --- /dev/null +++ b/src/query/service/src/sql/planner/plans/union.rs @@ -0,0 +1,64 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + + +use crate::sql::optimizer::{PhysicalProperty, RelationalProperty, RelExpr, RequiredProperty}; +use crate::sql::plans::{LogicalOperator, Operator, PhysicalOperator, RelOp}; +use common_exception::Result; + + +pub struct Union; + +impl Operator for Union { + fn rel_op(&self) -> RelOp { + RelOp::Union + } + + fn is_physical(&self) -> bool { + true + } + + fn is_logical(&self) -> bool { + true + } + + fn as_physical(&self) -> Option<&dyn PhysicalOperator> { + Some(self) + } + + fn as_logical(&self) -> Option<&dyn LogicalOperator> { + Some(self) + } +} + +impl PhysicalOperator for Union { + fn derive_physical_prop<'a>(&self, rel_expr: &RelExpr<'a>) -> Result { + todo!() + } + + fn compute_required_prop_child<'a>( + &self, + _rel_expr: &RelExpr<'a>, + _child_index: usize, + required: &RequiredProperty, + ) -> Result { + todo!() + } +} + +impl LogicalOperator for Union { + fn derive_relational_prop<'a>(&self, rel_expr: &RelExpr<'a>) -> Result { + todo!() + } +} \ No newline at end of file From 573fcfcafad70c3c89d3f4e6afee45568cd5c63c Mon Sep 17 00:00:00 2001 From: xudong963 Date: Wed, 17 Aug 2022 10:54:52 +0800 Subject: [PATCH 02/14] finish demo --- .../sinks/src/processors/sinks/mod.rs | 2 + .../processors/sinks/union_receive_sink.rs | 44 ++++++ .../service/src/pipelines/executor/mod.rs | 1 + .../executor/pipeline_pulling_executor.rs | 2 +- .../pipelines/processors/transforms/mod.rs | 2 + .../transforms/transform_merge_block.rs | 130 ++++++++++++++++++ .../service/src/sql/executor/physical_plan.rs | 18 +++ .../src/sql/executor/physical_plan_builder.rs | 11 +- .../src/sql/executor/physical_plan_display.rs | 8 ++ .../src/sql/executor/physical_plan_visitor.rs | 16 +++ .../src/sql/executor/pipeline_builder.rs | 61 ++++++++ .../sql/optimizer/heuristic/decorrelate.rs | 13 ++ .../optimizer/heuristic/subquery_rewriter.rs | 2 +- .../service/src/sql/planner/binder/select.rs | 10 +- .../planner/format/display_rel_operator.rs | 1 + .../service/src/sql/planner/plans/operator.rs | 22 ++- .../service/src/sql/planner/plans/union.rs | 14 +- 17 files changed, 342 insertions(+), 15 deletions(-) create mode 100644 src/common/pipeline/sinks/src/processors/sinks/union_receive_sink.rs create mode 100644 src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs diff --git a/src/common/pipeline/sinks/src/processors/sinks/mod.rs b/src/common/pipeline/sinks/src/processors/sinks/mod.rs index b741c95b00393..cdcd279fdb4cf 100644 --- a/src/common/pipeline/sinks/src/processors/sinks/mod.rs +++ b/src/common/pipeline/sinks/src/processors/sinks/mod.rs @@ -18,6 +18,7 @@ mod empty_sink; mod subquery_receive_sink; mod sync_sink; mod sync_sink_sender; +mod union_receive_sink; pub use async_sink::AsyncSink; pub use async_sink::AsyncSinker; @@ -27,3 +28,4 @@ pub use subquery_receive_sink::SubqueryReceiveSink; pub use sync_sink::Sink; pub use sync_sink::Sinker; pub use sync_sink_sender::SyncSenderSink; +pub use union_receive_sink::UnionReceiveSink; diff --git a/src/common/pipeline/sinks/src/processors/sinks/union_receive_sink.rs b/src/common/pipeline/sinks/src/processors/sinks/union_receive_sink.rs new file mode 100644 index 0000000000000..4ca5b23b7b7f5 --- /dev/null +++ b/src/common/pipeline/sinks/src/processors/sinks/union_receive_sink.rs @@ -0,0 +1,44 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_base::base::tokio::sync::broadcast::Sender; +use common_datablocks::DataBlock; +use common_exception::Result; +use common_pipeline_core::processors::port::InputPort; +use common_pipeline_core::processors::processor::ProcessorPtr; + +use crate::processors::sinks::Sink; +use crate::processors::sinks::Sinker; + +pub struct UnionReceiveSink { + sender: Sender, +} + +impl UnionReceiveSink { + pub fn create(sender: Sender, input: Arc) -> ProcessorPtr { + Sinker::create(input, UnionReceiveSink { sender }) + } +} + +#[async_trait::async_trait] +impl Sink for UnionReceiveSink { + const NAME: &'static str = "UnionReceiveSink"; + + fn consume(&mut self, data_block: DataBlock) -> Result<()> { + self.sender.send(data_block).ok(); + Ok(()) + } +} diff --git a/src/query/service/src/pipelines/executor/mod.rs b/src/query/service/src/pipelines/executor/mod.rs index db476f2dfa3be..5e95546e836c0 100644 --- a/src/query/service/src/pipelines/executor/mod.rs +++ b/src/query/service/src/pipelines/executor/mod.rs @@ -28,4 +28,5 @@ pub use pipeline_complete_executor::PipelineCompleteExecutor; pub use pipeline_executor::FinishedCallback; pub use pipeline_executor::PipelineExecutor; pub use pipeline_pulling_executor::PipelinePullingExecutor; +pub use pipeline_pulling_executor::PullingSink; pub use pipeline_pushing_executor::PipelinePushingExecutor; diff --git a/src/query/service/src/pipelines/executor/pipeline_pulling_executor.rs b/src/query/service/src/pipelines/executor/pipeline_pulling_executor.rs index f51092e9efeb6..55d9371f5ae45 100644 --- a/src/query/service/src/pipelines/executor/pipeline_pulling_executor.rs +++ b/src/query/service/src/pipelines/executor/pipeline_pulling_executor.rs @@ -184,7 +184,7 @@ impl Drop for PipelinePullingExecutor { } } -struct PullingSink { +pub struct PullingSink { sender: SyncSender>>, } diff --git a/src/query/service/src/pipelines/processors/transforms/mod.rs b/src/query/service/src/pipelines/processors/transforms/mod.rs index 36cc59d5255c3..8fb7aa3e81b00 100644 --- a/src/query/service/src/pipelines/processors/transforms/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/mod.rs @@ -37,6 +37,7 @@ mod transform_rename; mod transform_window_func; pub mod group_by; +mod transform_merge_block; pub use aggregator::AggregatorParams; pub use aggregator::AggregatorTransformParams; @@ -75,6 +76,7 @@ pub use transform_limit::TransformLimit; pub use transform_limit_by::TransformLimitBy; pub use transform_mark_join::MarkJoinCompactor; pub use transform_mark_join::TransformMarkJoin; +pub use transform_merge_block::TransformMergeBlock; pub use transform_project::TransformProject; pub use transform_rename::TransformRename; pub use transform_sort_merge::SortMergeCompactor; diff --git a/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs b/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs new file mode 100644 index 0000000000000..0a6c03bb308a2 --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs @@ -0,0 +1,130 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::sync::Arc; + +use common_base::base::tokio::sync::broadcast::Receiver; +use common_datablocks::DataBlock; +use common_exception::Result; +use common_pipeline_core::processors::port::InputPort; +use common_pipeline_core::processors::port::OutputPort; +use common_pipeline_core::processors::processor::Event; +use common_pipeline_core::processors::processor::ProcessorPtr; +use common_pipeline_core::processors::Processor; +use poem::web::Data; + +pub struct TransformMergeBlock { + initialized: bool, + input: Arc, + output: Arc, + + input_data: Option, + output_data: Option, + + receiver: Receiver, + receiver_result: Option, +} + +impl TransformMergeBlock { + pub fn create( + input: Arc, + output: Arc, + receiver: Receiver, + ) -> ProcessorPtr { + ProcessorPtr::create(Box::new(TransformMergeBlock { + initialized: false, + input, + output, + input_data: None, + output_data: None, + receiver, + receiver_result: None, + })) + } +} + +#[async_trait::async_trait] +impl Processor for TransformMergeBlock { + fn name(&self) -> &'static str { + "TransformMergeBlock" + } + + fn as_any(&mut self) -> &mut dyn Any { + self + } + + fn event(&mut self) -> Result { + if !self.initialized { + return Ok(Event::Async); + } + + if self.output.is_finished() { + self.input.finish(); + return Ok(Event::Finished); + } + + if !self.output.can_push() { + self.input.set_not_need_data(); + return Ok(Event::NeedConsume); + } + + if let Some(output_data) = self.output_data.take() { + self.output.push_data(Ok(output_data)); + return Ok(Event::NeedConsume); + } + + if self.input_data.is_some() { + return Ok(Event::Sync); + } + + if self.input.is_finished() { + self.output.finish(); + return Ok(Event::Finished); + } + + if self.input.has_data() { + self.input_data = Some(self.input.pull_data().unwrap()?); + return Ok(Event::Sync); + } + + self.input.set_need_data(); + Ok(Event::NeedData) + } + + fn process(&mut self) -> Result<()> { + if let Some(input_data) = self.input_data.take() { + if let Some(receiver_result) = self.receiver_result.take() { + dbg!(receiver_result.clone()); + let data_block = DataBlock::create( + input_data.schema().clone(), + receiver_result.columns().to_vec(), + ); + self.output_data = Some(DataBlock::concat_blocks(&vec![input_data, data_block])?); + } else { + self.output_data = Some(input_data); + } + } + + Ok(()) + } + + async fn async_process(&mut self) -> Result<()> { + if !self.initialized { + self.initialized = true; + self.receiver_result = self.receiver.recv().await.ok(); + } + Ok(()) + } +} diff --git a/src/query/service/src/sql/executor/physical_plan.rs b/src/query/service/src/sql/executor/physical_plan.rs index 2d20a7ea73535..f98fd84a3ae3e 100644 --- a/src/query/service/src/sql/executor/physical_plan.rs +++ b/src/query/service/src/sql/executor/physical_plan.rs @@ -281,6 +281,19 @@ impl ExchangeSink { } } +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub struct Union { + pub left: Box, + pub right: Box, + pub schema: DataSchemaRef, +} + +impl Union { + pub fn output_schema(&self) -> Result { + Ok(self.schema.clone()) + } +} + #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub enum PhysicalPlan { TableScan(TableScan), @@ -293,6 +306,7 @@ pub enum PhysicalPlan { Limit(Limit), HashJoin(HashJoin), Exchange(Exchange), + Union(Union), /// Synthesized by fragmenter ExchangeSource(ExchangeSource), @@ -322,6 +336,7 @@ impl PhysicalPlan { PhysicalPlan::Exchange(plan) => plan.output_schema(), PhysicalPlan::ExchangeSource(plan) => plan.output_schema(), PhysicalPlan::ExchangeSink(plan) => plan.output_schema(), + PhysicalPlan::Union(plan) => plan.output_schema(), } } @@ -341,6 +356,9 @@ impl PhysicalPlan { PhysicalPlan::Exchange(plan) => Box::new(std::iter::once(plan.input.as_ref())), PhysicalPlan::ExchangeSource(_) => Box::new(std::iter::empty()), PhysicalPlan::ExchangeSink(plan) => Box::new(std::iter::once(plan.input.as_ref())), + PhysicalPlan::Union(plan) => Box::new( + std::iter::once(plan.left.as_ref()).chain(std::iter::once(plan.right.as_ref())), + ), } } } diff --git a/src/query/service/src/sql/executor/physical_plan_builder.rs b/src/query/service/src/sql/executor/physical_plan_builder.rs index 76d05aaa2bbc6..fd31fb6039fd7 100644 --- a/src/query/service/src/sql/executor/physical_plan_builder.rs +++ b/src/query/service/src/sql/executor/physical_plan_builder.rs @@ -41,6 +41,7 @@ use crate::sql::executor::ExpressionBuilderWithoutRenaming; use crate::sql::executor::PhysicalPlan; use crate::sql::executor::PhysicalScalar; use crate::sql::executor::SortDesc; +use crate::sql::executor::Union; use crate::sql::optimizer::SExpr; use crate::sql::plans::AggregateMode; use crate::sql::plans::Exchange; @@ -323,7 +324,15 @@ impl PhysicalPlanBuilder { keys, })) } - + RelOperator::Union(_) => { + let left = self.build(s_expr.child(0)?).await?; + let schema = left.output_schema()?; + Ok(PhysicalPlan::Union(Union { + left: Box::new(left), + right: Box::new(self.build(s_expr.child(1)?).await?), + schema, + })) + } _ => Err(ErrorCode::LogicalError(format!( "Unsupported physical plan: {:?}", s_expr.plan() diff --git a/src/query/service/src/sql/executor/physical_plan_display.rs b/src/query/service/src/sql/executor/physical_plan_display.rs index 88c8cad72e3e3..d1c517ddf2cd0 100644 --- a/src/query/service/src/sql/executor/physical_plan_display.rs +++ b/src/query/service/src/sql/executor/physical_plan_display.rs @@ -32,6 +32,7 @@ use crate::sql::executor::PhysicalScalar; use crate::sql::executor::Project; use crate::sql::executor::Sort; use crate::sql::executor::TableScan; +use crate::sql::executor::Union; use crate::sql::plans::JoinType; impl PhysicalPlan { @@ -62,6 +63,7 @@ impl<'a> Display for PhysicalPlanIndentFormatDisplay<'a> { PhysicalPlan::Exchange(exchange) => write!(f, "{}", exchange)?, PhysicalPlan::ExchangeSource(source) => write!(f, "{}", source)?, PhysicalPlan::ExchangeSink(sink) => write!(f, "{}", sink)?, + PhysicalPlan::Union(union) => write!(f, "{}", union)?, } for node in self.node.children() { @@ -305,3 +307,9 @@ impl Display for ExchangeSink { ) } } + +impl Display for Union { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "Union") + } +} diff --git a/src/query/service/src/sql/executor/physical_plan_visitor.rs b/src/query/service/src/sql/executor/physical_plan_visitor.rs index 6e77db4511f56..7ff5250a1825f 100644 --- a/src/query/service/src/sql/executor/physical_plan_visitor.rs +++ b/src/query/service/src/sql/executor/physical_plan_visitor.rs @@ -27,6 +27,7 @@ use super::PhysicalPlan; use super::Project; use super::Sort; use super::TableScan; +use crate::sql::executor::Union; pub trait PhysicalPlanReplacer { fn replace(&mut self, plan: &PhysicalPlan) -> Result { @@ -43,6 +44,7 @@ pub trait PhysicalPlanReplacer { PhysicalPlan::Exchange(plan) => self.replace_exchange(plan), PhysicalPlan::ExchangeSource(plan) => self.replace_exchange_source(plan), PhysicalPlan::ExchangeSink(plan) => self.replace_exchange_sink(plan), + PhysicalPlan::Union(plan) => self.replace_union(plan), } } @@ -160,6 +162,16 @@ pub trait PhysicalPlanReplacer { query_id: plan.query_id.clone(), })) } + + fn replace_union(&mut self, plan: &Union) -> Result { + let left = self.replace(&plan.left)?; + let right = self.replace(&plan.right)?; + Ok(PhysicalPlan::Union(Union { + left: Box::new(left), + right: Box::new(right), + schema: plan.schema.clone(), + })) + } } impl PhysicalPlan { @@ -205,6 +217,10 @@ impl PhysicalPlan { PhysicalPlan::ExchangeSink(plan) => { Self::traverse(&plan.input, pre_visit, visit, post_visit); } + PhysicalPlan::Union(plan) => { + Self::traverse(&plan.left, pre_visit, visit, post_visit); + Self::traverse(&plan.right, pre_visit, visit, post_visit); + } } post_visit(plan); } diff --git a/src/query/service/src/sql/executor/pipeline_builder.rs b/src/query/service/src/sql/executor/pipeline_builder.rs index 1f4a8c217a2da..82a935cb2af00 100644 --- a/src/query/service/src/sql/executor/pipeline_builder.rs +++ b/src/query/service/src/sql/executor/pipeline_builder.rs @@ -14,15 +14,23 @@ use std::sync::Arc; +use common_base::base::tokio::sync::broadcast::channel; +use common_base::base::tokio::sync::broadcast::Receiver; +use common_datablocks::DataBlock; use common_datablocks::SortColumnDescription; use common_datavalues::DataField; use common_datavalues::DataSchemaRef; use common_datavalues::DataSchemaRefExt; +use common_datavalues::DataValue; use common_exception::ErrorCode; use common_exception::Result; use common_functions::aggregates::AggregateFunctionFactory; use common_functions::aggregates::AggregateFunctionRef; use common_functions::scalars::FunctionFactory; +use common_pipeline_core::processors::port::OutputPort; +use common_pipeline_core::Pipe; +use common_pipeline_sinks::processors::sinks::SyncSenderSink; +use common_pipeline_sinks::processors::sinks::UnionReceiveSink; use super::AggregateFinal; use super::AggregatePartial; @@ -40,8 +48,10 @@ use crate::evaluator::Evaluator; use crate::pipelines::processors::port::InputPort; use crate::pipelines::processors::transforms::ExpressionTransformV2; use crate::pipelines::processors::transforms::HashJoinDesc; +use crate::pipelines::processors::transforms::SubqueryReceiver; use crate::pipelines::processors::transforms::TransformFilterV2; use crate::pipelines::processors::transforms::TransformMarkJoin; +use crate::pipelines::processors::transforms::TransformMergeBlock; use crate::pipelines::processors::transforms::TransformProject; use crate::pipelines::processors::transforms::TransformRename; use crate::pipelines::processors::AggregatorParams; @@ -52,6 +62,7 @@ use crate::pipelines::processors::SinkBuildHashTable; use crate::pipelines::processors::Sinker; use crate::pipelines::processors::SortMergeCompactor; use crate::pipelines::processors::TransformAggregator; +use crate::pipelines::processors::TransformCreateSets; use crate::pipelines::processors::TransformHashJoinProbe; use crate::pipelines::processors::TransformLimit; use crate::pipelines::processors::TransformSortMerge; @@ -65,6 +76,7 @@ use crate::sql::executor::physical_plan::ColumnID; use crate::sql::executor::physical_plan::PhysicalPlan; use crate::sql::executor::AggregateFunctionDesc; use crate::sql::executor::PhysicalScalar; +use crate::sql::executor::Union; use crate::sql::plans::JoinType; use crate::sql::ColumnBinding; @@ -113,6 +125,7 @@ impl PipelineBuilder { PhysicalPlan::HashJoin(join) => self.build_join(join), PhysicalPlan::ExchangeSink(sink) => self.build_exchange_sink(sink), PhysicalPlan::ExchangeSource(source) => self.build_exchange_source(source), + PhysicalPlan::Union(union) => self.build_union(union), PhysicalPlan::Exchange(_) => Err(ErrorCode::LogicalError( "Invalid physical plan with PhysicalPlan::Exchange", )), @@ -472,4 +485,52 @@ impl PipelineBuilder { // ExchangeSink will be appended by `ExchangeManager::execute_pipeline` self.build_pipeline(&exchange_sink.input) } + + fn expand_union(&mut self, plan: &PhysicalPlan) -> Result> { + let union_ctx = QueryContext::create_from(self.ctx.clone()); + let pipeline_builder = PipelineBuilder::create(union_ctx); + let mut build_res = pipeline_builder.finalize(plan)?; + + assert!(build_res.main_pipeline.is_pulling_pipeline()?); + + build_res.main_pipeline.resize(1)?; + let (tx, rx) = channel(1); + let input = InputPort::create(); + build_res.main_pipeline.add_pipe(Pipe::SimplePipe { + outputs_port: vec![], + inputs_port: vec![input.clone()], + processors: vec![UnionReceiveSink::create(tx, input)], + }); + self.pipelines.push(build_res.main_pipeline); + self.pipelines + .extend(build_res.sources_pipelines.into_iter()); + Ok(rx) + } + + pub fn build_union(&mut self, union: &Union) -> Result<()> { + self.build_pipeline(&union.left)?; + let union_receiver = self.expand_union(&union.right)?; + let mut inputs_port = Vec::with_capacity(self.main_pipeline.output_len()); + let mut outputs_port = Vec::with_capacity(self.main_pipeline.output_len()); + let mut processors = Vec::with_capacity(self.main_pipeline.output_len()); + for _ in 0..self.main_pipeline.output_len() { + let transform_input_port = InputPort::create(); + let transform_output_port = OutputPort::create(); + + inputs_port.push(transform_input_port.clone()); + outputs_port.push(transform_output_port.clone()); + processors.push(TransformMergeBlock::create( + transform_input_port, + transform_output_port, + union_receiver.resubscribe(), + )); + } + + self.main_pipeline.add_pipe(Pipe::SimplePipe { + processors, + inputs_port, + outputs_port, + }); + Ok(()) + } } diff --git a/src/query/service/src/sql/optimizer/heuristic/decorrelate.rs b/src/query/service/src/sql/optimizer/heuristic/decorrelate.rs index 9617d2f4438cd..1ab3ac72cc680 100644 --- a/src/query/service/src/sql/optimizer/heuristic/decorrelate.rs +++ b/src/query/service/src/sql/optimizer/heuristic/decorrelate.rs @@ -51,6 +51,7 @@ use crate::sql::plans::Scalar; use crate::sql::plans::ScalarItem; use crate::sql::plans::SubqueryExpr; use crate::sql::plans::SubqueryType; +use crate::sql::plans::Union; use crate::sql::ColumnBinding; use crate::sql::IndexType; use crate::sql::MetadataRef; @@ -631,6 +632,18 @@ impl SubqueryRewriter { Ok(SExpr::create_unary(plan.plan().clone(), flatten_plan)) } + RelOperator::Union(_) => { + let left_flatten_plan = + self.flatten(plan.child(0)?, correlated_columns, flatten_info)?; + let right_flatten_plan = + self.flatten(plan.child(1)?, correlated_columns, flatten_info)?; + Ok(SExpr::create_binary( + Union {}.into(), + left_flatten_plan, + right_flatten_plan, + )) + } + RelOperator::Exchange(_) | RelOperator::Pattern(_) | RelOperator::LogicalGet(_) diff --git a/src/query/service/src/sql/optimizer/heuristic/subquery_rewriter.rs b/src/query/service/src/sql/optimizer/heuristic/subquery_rewriter.rs index bd9f77c6c0770..fb838bca0afe4 100644 --- a/src/query/service/src/sql/optimizer/heuristic/subquery_rewriter.rs +++ b/src/query/service/src/sql/optimizer/heuristic/subquery_rewriter.rs @@ -119,7 +119,7 @@ impl SubqueryRewriter { Ok(SExpr::create_unary(plan.into(), input)) } - RelOperator::LogicalInnerJoin(_) => Ok(SExpr::create_binary( + RelOperator::LogicalInnerJoin(_) | RelOperator::Union(_) => Ok(SExpr::create_binary( s_expr.plan().clone(), self.rewrite(s_expr.child(0)?)?, self.rewrite(s_expr.child(1)?)?, diff --git a/src/query/service/src/sql/planner/binder/select.rs b/src/query/service/src/sql/planner/binder/select.rs index 421b3e47b9838..1d1aff339a6d2 100644 --- a/src/query/service/src/sql/planner/binder/select.rs +++ b/src/query/service/src/sql/planner/binder/select.rs @@ -38,9 +38,9 @@ use crate::sql::planner::binder::BindContext; use crate::sql::planner::binder::Binder; use crate::sql::plans::BoundColumnRef; use crate::sql::plans::Filter; -use crate::sql::plans::Union; use crate::sql::plans::JoinType; use crate::sql::plans::Scalar; +use crate::sql::plans::Union; // A normalized IR for `SELECT` clause. #[derive(Debug, Default)] @@ -314,7 +314,7 @@ impl<'a> Binder { self.bind_except(left_bind_context, right_bind_context, left_expr, right_expr) } (SetOperator::Union, true) => { - self.bind_union(left_expr, right_expr) + Ok((self.bind_union(left_expr, right_expr)?, left_bind_context)) } _ => Err(ErrorCode::UnImplement( "Unsupported query type, currently, databend only support intersect distinct and except distinct", @@ -322,11 +322,7 @@ impl<'a> Binder { } } - fn bind_union( - &mut self, - left_expr: SExpr, - right_expr: SExpr, - ) -> Result { + fn bind_union(&mut self, left_expr: SExpr, right_expr: SExpr) -> Result { let union_plan = Union {}; let new_expr = SExpr::create_binary(union_plan.into(), left_expr, right_expr); Ok(new_expr) diff --git a/src/query/service/src/sql/planner/format/display_rel_operator.rs b/src/query/service/src/sql/planner/format/display_rel_operator.rs index 768f0807c2f96..9bd5d61acc924 100644 --- a/src/query/service/src/sql/planner/format/display_rel_operator.rs +++ b/src/query/service/src/sql/planner/format/display_rel_operator.rs @@ -78,6 +78,7 @@ impl Display for FormatContext { RelOperator::Sort(op) => format_sort(f, &self.metadata, op), RelOperator::Limit(op) => format_limit(f, &self.metadata, op), RelOperator::Exchange(op) => format_exchange(f, &self.metadata, op), + RelOperator::Union(_) => write!(f, "UNION"), RelOperator::Pattern(_) => write!(f, "Pattern"), } } diff --git a/src/query/service/src/sql/planner/plans/operator.rs b/src/query/service/src/sql/planner/plans/operator.rs index c0c28a1850ffa..ad1544080dc8c 100644 --- a/src/query/service/src/sql/planner/plans/operator.rs +++ b/src/query/service/src/sql/planner/plans/operator.rs @@ -18,7 +18,6 @@ use common_exception::Result; use super::aggregate::Aggregate; use super::eval_scalar::EvalScalar; use super::filter::Filter; -use super::union::Union; use super::hash_join::PhysicalHashJoin; use super::limit::Limit; use super::logical_get::LogicalGet; @@ -27,6 +26,7 @@ use super::pattern::PatternPlan; use super::physical_scan::PhysicalScan; use super::project::Project; use super::sort::Sort; +use super::union::Union; use crate::sql::optimizer::PhysicalProperty; use crate::sql::optimizer::RelExpr; use crate::sql::optimizer::RelationalProperty; @@ -83,6 +83,7 @@ pub enum RelOp { Sort, Limit, Exchange, + Union, // Pattern Pattern, @@ -429,3 +430,22 @@ impl TryFrom for Exchange { } } } + +impl From for RelOperator { + fn from(v: Union) -> Self { + Self::Union(v) + } +} + +impl TryFrom for Union { + type Error = ErrorCode; + fn try_from(value: RelOperator) -> Result { + if let RelOperator::Union(value) = value { + Ok(value) + } else { + Err(ErrorCode::LogicalError( + "Cannot downcast RelOperator to Union", + )) + } + } +} diff --git a/src/query/service/src/sql/planner/plans/union.rs b/src/query/service/src/sql/planner/plans/union.rs index e283b37c587b8..5c1a0a46bd719 100644 --- a/src/query/service/src/sql/planner/plans/union.rs +++ b/src/query/service/src/sql/planner/plans/union.rs @@ -12,12 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. - -use crate::sql::optimizer::{PhysicalProperty, RelationalProperty, RelExpr, RequiredProperty}; -use crate::sql::plans::{LogicalOperator, Operator, PhysicalOperator, RelOp}; use common_exception::Result; +use crate::sql::optimizer::PhysicalProperty; +use crate::sql::optimizer::RelExpr; +use crate::sql::optimizer::RelationalProperty; +use crate::sql::optimizer::RequiredProperty; +use crate::sql::plans::LogicalOperator; +use crate::sql::plans::Operator; +use crate::sql::plans::PhysicalOperator; +use crate::sql::plans::RelOp; +#[derive(Clone, Debug)] pub struct Union; impl Operator for Union { @@ -61,4 +67,4 @@ impl LogicalOperator for Union { fn derive_relational_prop<'a>(&self, rel_expr: &RelExpr<'a>) -> Result { todo!() } -} \ No newline at end of file +} From a0a682eececd21c2e8f33206e175d1e76402f80d Mon Sep 17 00:00:00 2001 From: xudong963 Date: Wed, 17 Aug 2022 13:55:48 +0800 Subject: [PATCH 03/14] cargo fix --- .../processors/transforms/transform_merge_block.rs | 2 +- src/query/service/src/sql/executor/pipeline_builder.rs | 8 ++++---- src/query/service/src/sql/planner/plans/union.rs | 6 +++--- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs b/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs index 0a6c03bb308a2..f7a81d6d0297c 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs @@ -23,7 +23,7 @@ use common_pipeline_core::processors::port::OutputPort; use common_pipeline_core::processors::processor::Event; use common_pipeline_core::processors::processor::ProcessorPtr; use common_pipeline_core::processors::Processor; -use poem::web::Data; + pub struct TransformMergeBlock { initialized: bool, diff --git a/src/query/service/src/sql/executor/pipeline_builder.rs b/src/query/service/src/sql/executor/pipeline_builder.rs index 82a935cb2af00..e6227f26392f9 100644 --- a/src/query/service/src/sql/executor/pipeline_builder.rs +++ b/src/query/service/src/sql/executor/pipeline_builder.rs @@ -21,7 +21,7 @@ use common_datablocks::SortColumnDescription; use common_datavalues::DataField; use common_datavalues::DataSchemaRef; use common_datavalues::DataSchemaRefExt; -use common_datavalues::DataValue; + use common_exception::ErrorCode; use common_exception::Result; use common_functions::aggregates::AggregateFunctionFactory; @@ -29,7 +29,7 @@ use common_functions::aggregates::AggregateFunctionRef; use common_functions::scalars::FunctionFactory; use common_pipeline_core::processors::port::OutputPort; use common_pipeline_core::Pipe; -use common_pipeline_sinks::processors::sinks::SyncSenderSink; + use common_pipeline_sinks::processors::sinks::UnionReceiveSink; use super::AggregateFinal; @@ -48,7 +48,7 @@ use crate::evaluator::Evaluator; use crate::pipelines::processors::port::InputPort; use crate::pipelines::processors::transforms::ExpressionTransformV2; use crate::pipelines::processors::transforms::HashJoinDesc; -use crate::pipelines::processors::transforms::SubqueryReceiver; + use crate::pipelines::processors::transforms::TransformFilterV2; use crate::pipelines::processors::transforms::TransformMarkJoin; use crate::pipelines::processors::transforms::TransformMergeBlock; @@ -62,7 +62,7 @@ use crate::pipelines::processors::SinkBuildHashTable; use crate::pipelines::processors::Sinker; use crate::pipelines::processors::SortMergeCompactor; use crate::pipelines::processors::TransformAggregator; -use crate::pipelines::processors::TransformCreateSets; + use crate::pipelines::processors::TransformHashJoinProbe; use crate::pipelines::processors::TransformLimit; use crate::pipelines::processors::TransformSortMerge; diff --git a/src/query/service/src/sql/planner/plans/union.rs b/src/query/service/src/sql/planner/plans/union.rs index 5c1a0a46bd719..c64c5caef168a 100644 --- a/src/query/service/src/sql/planner/plans/union.rs +++ b/src/query/service/src/sql/planner/plans/union.rs @@ -49,7 +49,7 @@ impl Operator for Union { } impl PhysicalOperator for Union { - fn derive_physical_prop<'a>(&self, rel_expr: &RelExpr<'a>) -> Result { + fn derive_physical_prop<'a>(&self, _rel_expr: &RelExpr<'a>) -> Result { todo!() } @@ -57,14 +57,14 @@ impl PhysicalOperator for Union { &self, _rel_expr: &RelExpr<'a>, _child_index: usize, - required: &RequiredProperty, + _required: &RequiredProperty, ) -> Result { todo!() } } impl LogicalOperator for Union { - fn derive_relational_prop<'a>(&self, rel_expr: &RelExpr<'a>) -> Result { + fn derive_relational_prop<'a>(&self, _rel_expr: &RelExpr<'a>) -> Result { todo!() } } From afcc9422a299f55f2a9192308fba8712b6d0cb20 Mon Sep 17 00:00:00 2001 From: xudong963 Date: Wed, 17 Aug 2022 17:14:08 +0800 Subject: [PATCH 04/14] union all now works --- .../src/processors/sinks/union_receive_sink.rs | 15 +++++++++++++-- .../transforms/transform_merge_block.rs | 2 +- .../service/src/sql/executor/pipeline_builder.rs | 5 +---- 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/src/common/pipeline/sinks/src/processors/sinks/union_receive_sink.rs b/src/common/pipeline/sinks/src/processors/sinks/union_receive_sink.rs index 4ca5b23b7b7f5..588a363ca3e18 100644 --- a/src/common/pipeline/sinks/src/processors/sinks/union_receive_sink.rs +++ b/src/common/pipeline/sinks/src/processors/sinks/union_receive_sink.rs @@ -24,12 +24,16 @@ use crate::processors::sinks::Sink; use crate::processors::sinks::Sinker; pub struct UnionReceiveSink { + input_blocks: Vec, sender: Sender, } impl UnionReceiveSink { pub fn create(sender: Sender, input: Arc) -> ProcessorPtr { - Sinker::create(input, UnionReceiveSink { sender }) + Sinker::create(input, UnionReceiveSink { + input_blocks: vec![], + sender, + }) } } @@ -37,8 +41,15 @@ impl UnionReceiveSink { impl Sink for UnionReceiveSink { const NAME: &'static str = "UnionReceiveSink"; + fn on_finish(&mut self) -> Result<()> { + self.sender + .send(DataBlock::concat_blocks(&self.input_blocks)?) + .ok(); + Ok(()) + } + fn consume(&mut self, data_block: DataBlock) -> Result<()> { - self.sender.send(data_block).ok(); + self.input_blocks.push(data_block); Ok(()) } } diff --git a/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs b/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs index f7a81d6d0297c..e5753610bfa72 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs @@ -24,7 +24,6 @@ use common_pipeline_core::processors::processor::Event; use common_pipeline_core::processors::processor::ProcessorPtr; use common_pipeline_core::processors::Processor; - pub struct TransformMergeBlock { initialized: bool, input: Arc, @@ -105,6 +104,7 @@ impl Processor for TransformMergeBlock { fn process(&mut self) -> Result<()> { if let Some(input_data) = self.input_data.take() { + dbg!(input_data.clone()); if let Some(receiver_result) = self.receiver_result.take() { dbg!(receiver_result.clone()); let data_block = DataBlock::create( diff --git a/src/query/service/src/sql/executor/pipeline_builder.rs b/src/query/service/src/sql/executor/pipeline_builder.rs index e6227f26392f9..da371ae2a6d46 100644 --- a/src/query/service/src/sql/executor/pipeline_builder.rs +++ b/src/query/service/src/sql/executor/pipeline_builder.rs @@ -21,7 +21,6 @@ use common_datablocks::SortColumnDescription; use common_datavalues::DataField; use common_datavalues::DataSchemaRef; use common_datavalues::DataSchemaRefExt; - use common_exception::ErrorCode; use common_exception::Result; use common_functions::aggregates::AggregateFunctionFactory; @@ -29,7 +28,6 @@ use common_functions::aggregates::AggregateFunctionRef; use common_functions::scalars::FunctionFactory; use common_pipeline_core::processors::port::OutputPort; use common_pipeline_core::Pipe; - use common_pipeline_sinks::processors::sinks::UnionReceiveSink; use super::AggregateFinal; @@ -48,7 +46,6 @@ use crate::evaluator::Evaluator; use crate::pipelines::processors::port::InputPort; use crate::pipelines::processors::transforms::ExpressionTransformV2; use crate::pipelines::processors::transforms::HashJoinDesc; - use crate::pipelines::processors::transforms::TransformFilterV2; use crate::pipelines::processors::transforms::TransformMarkJoin; use crate::pipelines::processors::transforms::TransformMergeBlock; @@ -62,7 +59,6 @@ use crate::pipelines::processors::SinkBuildHashTable; use crate::pipelines::processors::Sinker; use crate::pipelines::processors::SortMergeCompactor; use crate::pipelines::processors::TransformAggregator; - use crate::pipelines::processors::TransformHashJoinProbe; use crate::pipelines::processors::TransformLimit; use crate::pipelines::processors::TransformSortMerge; @@ -513,6 +509,7 @@ impl PipelineBuilder { let mut inputs_port = Vec::with_capacity(self.main_pipeline.output_len()); let mut outputs_port = Vec::with_capacity(self.main_pipeline.output_len()); let mut processors = Vec::with_capacity(self.main_pipeline.output_len()); + self.main_pipeline.resize(1)?; for _ in 0..self.main_pipeline.output_len() { let transform_input_port = InputPort::create(); let transform_output_port = OutputPort::create(); From 5f156ac77f346556aab1b35d4be7f0ff56acfb8d Mon Sep 17 00:00:00 2001 From: xudong963 Date: Wed, 17 Aug 2022 18:46:06 +0800 Subject: [PATCH 05/14] support union empty --- .../processors/sinks/union_receive_sink.rs | 19 +++++++---- .../transforms/transform_merge_block.rs | 15 ++++---- .../src/sql/executor/pipeline_builder.rs | 34 +++++++------------ 3 files changed, 34 insertions(+), 34 deletions(-) diff --git a/src/common/pipeline/sinks/src/processors/sinks/union_receive_sink.rs b/src/common/pipeline/sinks/src/processors/sinks/union_receive_sink.rs index 588a363ca3e18..a9969bc45b808 100644 --- a/src/common/pipeline/sinks/src/processors/sinks/union_receive_sink.rs +++ b/src/common/pipeline/sinks/src/processors/sinks/union_receive_sink.rs @@ -14,8 +14,9 @@ use std::sync::Arc; -use common_base::base::tokio::sync::broadcast::Sender; +use common_base::base::tokio::sync::mpsc::Sender; use common_datablocks::DataBlock; +use common_exception::ErrorCode; use common_exception::Result; use common_pipeline_core::processors::port::InputPort; use common_pipeline_core::processors::processor::ProcessorPtr; @@ -25,11 +26,11 @@ use crate::processors::sinks::Sinker; pub struct UnionReceiveSink { input_blocks: Vec, - sender: Sender, + sender: Sender>, } impl UnionReceiveSink { - pub fn create(sender: Sender, input: Arc) -> ProcessorPtr { + pub fn create(sender: Sender>, input: Arc) -> ProcessorPtr { Sinker::create(input, UnionReceiveSink { input_blocks: vec![], sender, @@ -42,9 +43,15 @@ impl Sink for UnionReceiveSink { const NAME: &'static str = "UnionReceiveSink"; fn on_finish(&mut self) -> Result<()> { - self.sender - .send(DataBlock::concat_blocks(&self.input_blocks)?) - .ok(); + let send_blocks = if self.input_blocks.is_empty() { + None + } else { + Some(DataBlock::concat_blocks(&self.input_blocks)?) + }; + if let Err(_) = self.sender.try_send(send_blocks) { + return Err(ErrorCode::UnexpectedError("UnionReceiveSink sender failed")); + }; + Ok(()) } diff --git a/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs b/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs index e5753610bfa72..59db16b7c12fe 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs @@ -15,7 +15,7 @@ use std::any::Any; use std::sync::Arc; -use common_base::base::tokio::sync::broadcast::Receiver; +use common_base::base::tokio::sync::mpsc::Receiver; use common_datablocks::DataBlock; use common_exception::Result; use common_pipeline_core::processors::port::InputPort; @@ -32,7 +32,7 @@ pub struct TransformMergeBlock { input_data: Option, output_data: Option, - receiver: Receiver, + receiver: Receiver>, receiver_result: Option, } @@ -40,7 +40,7 @@ impl TransformMergeBlock { pub fn create( input: Arc, output: Arc, - receiver: Receiver, + receiver: Receiver>, ) -> ProcessorPtr { ProcessorPtr::create(Box::new(TransformMergeBlock { initialized: false, @@ -103,10 +103,10 @@ impl Processor for TransformMergeBlock { } fn process(&mut self) -> Result<()> { + dbg!("come here"); if let Some(input_data) = self.input_data.take() { - dbg!(input_data.clone()); + dbg!(&input_data); if let Some(receiver_result) = self.receiver_result.take() { - dbg!(receiver_result.clone()); let data_block = DataBlock::create( input_data.schema().clone(), receiver_result.columns().to_vec(), @@ -115,6 +115,9 @@ impl Processor for TransformMergeBlock { } else { self.output_data = Some(input_data); } + } else if let Some(receiver_result) = self.receiver_result.take() { + dbg!(&receiver_result); + self.output_data = Some(receiver_result); } Ok(()) @@ -123,7 +126,7 @@ impl Processor for TransformMergeBlock { async fn async_process(&mut self) -> Result<()> { if !self.initialized { self.initialized = true; - self.receiver_result = self.receiver.recv().await.ok(); + self.receiver_result = self.receiver.recv().await.unwrap_or(None); } Ok(()) } diff --git a/src/query/service/src/sql/executor/pipeline_builder.rs b/src/query/service/src/sql/executor/pipeline_builder.rs index da371ae2a6d46..4d42823c06863 100644 --- a/src/query/service/src/sql/executor/pipeline_builder.rs +++ b/src/query/service/src/sql/executor/pipeline_builder.rs @@ -14,8 +14,8 @@ use std::sync::Arc; -use common_base::base::tokio::sync::broadcast::channel; -use common_base::base::tokio::sync::broadcast::Receiver; +use common_base::base::tokio::sync::mpsc::channel; +use common_base::base::tokio::sync::mpsc::Receiver; use common_datablocks::DataBlock; use common_datablocks::SortColumnDescription; use common_datavalues::DataField; @@ -482,7 +482,7 @@ impl PipelineBuilder { self.build_pipeline(&exchange_sink.input) } - fn expand_union(&mut self, plan: &PhysicalPlan) -> Result> { + fn expand_union(&mut self, plan: &PhysicalPlan) -> Result>> { let union_ctx = QueryContext::create_from(self.ctx.clone()); let pipeline_builder = PipelineBuilder::create(union_ctx); let mut build_res = pipeline_builder.finalize(plan)?; @@ -506,27 +506,17 @@ impl PipelineBuilder { pub fn build_union(&mut self, union: &Union) -> Result<()> { self.build_pipeline(&union.left)?; let union_receiver = self.expand_union(&union.right)?; - let mut inputs_port = Vec::with_capacity(self.main_pipeline.output_len()); - let mut outputs_port = Vec::with_capacity(self.main_pipeline.output_len()); - let mut processors = Vec::with_capacity(self.main_pipeline.output_len()); self.main_pipeline.resize(1)?; - for _ in 0..self.main_pipeline.output_len() { - let transform_input_port = InputPort::create(); - let transform_output_port = OutputPort::create(); - - inputs_port.push(transform_input_port.clone()); - outputs_port.push(transform_output_port.clone()); - processors.push(TransformMergeBlock::create( - transform_input_port, - transform_output_port, - union_receiver.resubscribe(), - )); - } - + let input_port = InputPort::create(); + let output_port = OutputPort::create(); self.main_pipeline.add_pipe(Pipe::SimplePipe { - processors, - inputs_port, - outputs_port, + processors: vec![TransformMergeBlock::create( + input_port.clone(), + output_port.clone(), + union_receiver, + )], + outputs_port: vec![output_port], + inputs_port: vec![input_port], }); Ok(()) } From e61254f56d4bc7a0e1de98c77330b800c77ad62d Mon Sep 17 00:00:00 2001 From: xudong963 Date: Wed, 17 Aug 2022 19:08:18 +0800 Subject: [PATCH 06/14] refine code --- .../pipelines/processors/transforms/transform_merge_block.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs b/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs index 59db16b7c12fe..b96f77a411fc4 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs @@ -84,7 +84,7 @@ impl Processor for TransformMergeBlock { return Ok(Event::NeedConsume); } - if self.input_data.is_some() { + if self.input_data.is_some() || self.receiver_result.is_some() { return Ok(Event::Sync); } @@ -103,9 +103,7 @@ impl Processor for TransformMergeBlock { } fn process(&mut self) -> Result<()> { - dbg!("come here"); if let Some(input_data) = self.input_data.take() { - dbg!(&input_data); if let Some(receiver_result) = self.receiver_result.take() { let data_block = DataBlock::create( input_data.schema().clone(), @@ -116,7 +114,6 @@ impl Processor for TransformMergeBlock { self.output_data = Some(input_data); } } else if let Some(receiver_result) = self.receiver_result.take() { - dbg!(&receiver_result); self.output_data = Some(receiver_result); } From da0750b34c06c0ddcabfb70bddaea494d537bf5c Mon Sep 17 00:00:00 2001 From: xudong963 Date: Wed, 17 Aug 2022 23:22:40 +0800 Subject: [PATCH 07/14] add logic test --- .../service/src/pipelines/executor/mod.rs | 1 - .../executor/pipeline_pulling_executor.rs | 2 +- .../transforms/transform_merge_block.rs | 14 +-- .../src/sql/executor/pipeline_builder.rs | 1 + .../sql/optimizer/heuristic/prune_columns.rs | 3 + .../service/src/sql/planner/plans/union.rs | 32 +++++-- .../logictest/suites/base/15_query/union.test | 88 +++++++++++++++++++ 7 files changed, 128 insertions(+), 13 deletions(-) create mode 100644 tests/logictest/suites/base/15_query/union.test diff --git a/src/query/service/src/pipelines/executor/mod.rs b/src/query/service/src/pipelines/executor/mod.rs index 5e95546e836c0..db476f2dfa3be 100644 --- a/src/query/service/src/pipelines/executor/mod.rs +++ b/src/query/service/src/pipelines/executor/mod.rs @@ -28,5 +28,4 @@ pub use pipeline_complete_executor::PipelineCompleteExecutor; pub use pipeline_executor::FinishedCallback; pub use pipeline_executor::PipelineExecutor; pub use pipeline_pulling_executor::PipelinePullingExecutor; -pub use pipeline_pulling_executor::PullingSink; pub use pipeline_pushing_executor::PipelinePushingExecutor; diff --git a/src/query/service/src/pipelines/executor/pipeline_pulling_executor.rs b/src/query/service/src/pipelines/executor/pipeline_pulling_executor.rs index 55d9371f5ae45..f51092e9efeb6 100644 --- a/src/query/service/src/pipelines/executor/pipeline_pulling_executor.rs +++ b/src/query/service/src/pipelines/executor/pipeline_pulling_executor.rs @@ -184,7 +184,7 @@ impl Drop for PipelinePullingExecutor { } } -pub struct PullingSink { +struct PullingSink { sender: SyncSender>>, } diff --git a/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs b/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs index b96f77a411fc4..97cc86de62e38 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use common_base::base::tokio::sync::mpsc::Receiver; use common_datablocks::DataBlock; +use common_datavalues::DataSchemaRef; use common_exception::Result; use common_pipeline_core::processors::port::InputPort; use common_pipeline_core::processors::port::OutputPort; @@ -31,6 +32,7 @@ pub struct TransformMergeBlock { input_data: Option, output_data: Option, + schema: DataSchemaRef, receiver: Receiver>, receiver_result: Option, @@ -40,6 +42,7 @@ impl TransformMergeBlock { pub fn create( input: Arc, output: Arc, + schema: DataSchemaRef, receiver: Receiver>, ) -> ProcessorPtr { ProcessorPtr::create(Box::new(TransformMergeBlock { @@ -48,6 +51,7 @@ impl TransformMergeBlock { output, input_data: None, output_data: None, + schema, receiver, receiver_result: None, })) @@ -105,16 +109,16 @@ impl Processor for TransformMergeBlock { fn process(&mut self) -> Result<()> { if let Some(input_data) = self.input_data.take() { if let Some(receiver_result) = self.receiver_result.take() { - let data_block = DataBlock::create( - input_data.schema().clone(), - receiver_result.columns().to_vec(), - ); + let data_block = + DataBlock::create(self.schema.clone(), receiver_result.columns().to_vec()); self.output_data = Some(DataBlock::concat_blocks(&vec![input_data, data_block])?); } else { self.output_data = Some(input_data); } } else if let Some(receiver_result) = self.receiver_result.take() { - self.output_data = Some(receiver_result); + let data_block = + DataBlock::create(self.schema.clone(), receiver_result.columns().to_vec()); + self.output_data = Some(data_block); } Ok(()) diff --git a/src/query/service/src/sql/executor/pipeline_builder.rs b/src/query/service/src/sql/executor/pipeline_builder.rs index 4d42823c06863..c8d004e5f9425 100644 --- a/src/query/service/src/sql/executor/pipeline_builder.rs +++ b/src/query/service/src/sql/executor/pipeline_builder.rs @@ -513,6 +513,7 @@ impl PipelineBuilder { processors: vec![TransformMergeBlock::create( input_port.clone(), output_port.clone(), + union.left.output_schema()?, union_receiver, )], outputs_port: vec![output_port], diff --git a/src/query/service/src/sql/optimizer/heuristic/prune_columns.rs b/src/query/service/src/sql/optimizer/heuristic/prune_columns.rs index ee81bdb1fea93..ad617447e5a3f 100644 --- a/src/query/service/src/sql/optimizer/heuristic/prune_columns.rs +++ b/src/query/service/src/sql/optimizer/heuristic/prune_columns.rs @@ -115,6 +115,7 @@ impl ColumnPruner { )?, )) } + RelOperator::Project(p) => { let mut used: ColumnSet = p.columns.intersection(&required).cloned().collect(); if used.is_empty() { @@ -224,6 +225,8 @@ impl ColumnPruner { self.keep_required_columns(expr.child(0)?, required)?, )), + RelOperator::Union(_) => Ok(expr.clone()), + _ => Err(ErrorCode::LogicalError( "Attempting to prune columns of a physical plan is not allowed", )), diff --git a/src/query/service/src/sql/planner/plans/union.rs b/src/query/service/src/sql/planner/plans/union.rs index c64c5caef168a..1672436f47ae4 100644 --- a/src/query/service/src/sql/planner/plans/union.rs +++ b/src/query/service/src/sql/planner/plans/union.rs @@ -48,6 +48,32 @@ impl Operator for Union { } } +impl LogicalOperator for Union { + fn derive_relational_prop<'a>(&self, rel_expr: &RelExpr<'a>) -> Result { + let left_prop = rel_expr.derive_relational_prop_child(0)?; + let right_prop = rel_expr.derive_relational_prop_child(1)?; + + // Derive output columns + let mut output_columns = left_prop.output_columns; + output_columns = output_columns + .union(&right_prop.output_columns) + .cloned() + .collect(); + + // Derive outer columns + let mut outer_columns = left_prop.outer_columns; + outer_columns = outer_columns + .union(&right_prop.outer_columns) + .cloned() + .collect(); + + Ok(RelationalProperty { + output_columns, + outer_columns, + }) + } +} + impl PhysicalOperator for Union { fn derive_physical_prop<'a>(&self, _rel_expr: &RelExpr<'a>) -> Result { todo!() @@ -62,9 +88,3 @@ impl PhysicalOperator for Union { todo!() } } - -impl LogicalOperator for Union { - fn derive_relational_prop<'a>(&self, _rel_expr: &RelExpr<'a>) -> Result { - todo!() - } -} diff --git a/tests/logictest/suites/base/15_query/union.test b/tests/logictest/suites/base/15_query/union.test new file mode 100644 index 0000000000000..088ac7312a47b --- /dev/null +++ b/tests/logictest/suites/base/15_query/union.test @@ -0,0 +1,88 @@ +statement ok +DROP TABLE IF EXISTS data2013; + +statement ok +DROP TABLE IF EXISTS data2014; + +statement ok +DROP TABLE IF EXISTS data2015; + +statement ok +CREATE TABLE data2013 (name String, value UInt32); + +statement ok +CREATE TABLE data2014 (name String, value UInt32); + +statement ok +CREATE TABLE data2015 (data_name String, data_value UInt32); + +statement ok +CREATE TABLE data2016 (name String, value UInt32); + +statement ok +INSERT INTO data2013(name,value) VALUES('Alice', 1000), ('Bob', 2000), ('Carol', 5000); + + +statement ok +INSERT INTO data2014(name,value) VALUES('Alice', 2000), ('Bob', 2000), ('Dennis', 35000); + +statement ok +INSERT INTO data2015(data_name, data_value) VALUES('Foo', 42), ('Bar', 1); + +statement query I +SELECT val FROM +(SELECT value AS val FROM data2013 WHERE name = 'Alice' +UNION ALL +SELECT value AS val FROM data2014 WHERE name = 'Alice') +ORDER BY val ASC; + +---- +1000 +2000 + +statement query IT +SELECT val, name FROM +(SELECT value AS val, value AS val_1, name FROM data2013 WHERE name = 'Alice' +UNION ALL +SELECT value AS val, value, name FROM data2014 WHERE name = 'Alice') +ORDER BY val ASC; + +---- +1000 Alice +2000 Alice + +statement query TI +select * from data2013 union all select * from data2015 order by value; + +---- +Bar 1 +Foo 42 +Alice 1000 +Bob 2000 +Carol 5000 + +statement query I +select value from data2016 union all select data_value from data2015 order by value; + +---- +1 +42 + +statement query I +select data_value from data2015 union all select value from data2016 order by data_value; + +---- +1 +42 + +statement ok +DROP TABLE data2013; + +statement ok +DROP TABLE data2014; + +statement ok +DROP TABLE data2015; + +statement ok +DROP TABLE data2016; From 15196d44541334e57dc9e3454e34998aa9c461b1 Mon Sep 17 00:00:00 2001 From: xudong963 Date: Wed, 17 Aug 2022 23:47:25 +0800 Subject: [PATCH 08/14] support union distinct --- .../service/src/sql/planner/binder/select.rs | 29 +++++++++++++++---- .../logictest/suites/base/15_query/union.test | 13 +++++++++ 2 files changed, 37 insertions(+), 5 deletions(-) diff --git a/src/query/service/src/sql/planner/binder/select.rs b/src/query/service/src/sql/planner/binder/select.rs index 1d1aff339a6d2..42c71d2fb3fff 100644 --- a/src/query/service/src/sql/planner/binder/select.rs +++ b/src/query/service/src/sql/planner/binder/select.rs @@ -313,18 +313,37 @@ impl<'a> Binder { // Transfer Except to Anti join self.bind_except(left_bind_context, right_bind_context, left_expr, right_expr) } - (SetOperator::Union, true) => { - Ok((self.bind_union(left_expr, right_expr)?, left_bind_context)) - } + (SetOperator::Union, true) => Ok(( + self.bind_union(&left_bind_context, left_expr, right_expr, false)?, + left_bind_context, + )), + (SetOperator::Union, false) => Ok(( + self.bind_union(&left_bind_context, left_expr, right_expr, true)?, + left_bind_context, + )), _ => Err(ErrorCode::UnImplement( "Unsupported query type, currently, databend only support intersect distinct and except distinct", )), } } - fn bind_union(&mut self, left_expr: SExpr, right_expr: SExpr) -> Result { + fn bind_union( + &mut self, + bind_context: &BindContext, + left_expr: SExpr, + right_expr: SExpr, + distinct: bool, + ) -> Result { let union_plan = Union {}; - let new_expr = SExpr::create_binary(union_plan.into(), left_expr, right_expr); + let mut new_expr = SExpr::create_binary(union_plan.into(), left_expr, right_expr); + if distinct { + new_expr = self.bind_distinct( + &bind_context, + bind_context.all_column_bindings(), + &mut HashMap::new(), + new_expr, + )?; + } Ok(new_expr) } diff --git a/tests/logictest/suites/base/15_query/union.test b/tests/logictest/suites/base/15_query/union.test index 088ac7312a47b..f3dc3328999f0 100644 --- a/tests/logictest/suites/base/15_query/union.test +++ b/tests/logictest/suites/base/15_query/union.test @@ -75,6 +75,19 @@ select data_value from data2015 union all select value from data2016 order by da 1 42 +statement ok +INSERT INTO data2013(name,value) VALUES('Alice', 1000); + +statement query I +select value from data2013 union select data_value from data2015 order by value; + +---- +1 +42 +1000 +2000 +5000 + statement ok DROP TABLE data2013; From 953b9666529bb8eb634029db270c35ea0a11d15a Mon Sep 17 00:00:00 2001 From: xudong963 Date: Thu, 18 Aug 2022 14:09:22 +0800 Subject: [PATCH 09/14] use async_channel --- Cargo.lock | 1 + src/common/pipeline/sinks/Cargo.toml | 2 ++ src/common/pipeline/sinks/src/lib.rs | 1 + .../processors/sinks/union_receive_sink.rs | 23 ++++++++------ .../transforms/transform_merge_block.rs | 2 +- .../src/sql/executor/pipeline_builder.rs | 31 ++++++++++++------- 6 files changed, 37 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fcdad11faa1c7..da8041d86d1d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1599,6 +1599,7 @@ dependencies = [ name = "common-pipeline-sinks" version = "0.1.0" dependencies = [ + "async-channel", "async-trait 0.1.56", "common-base", "common-catalog", diff --git a/src/common/pipeline/sinks/Cargo.toml b/src/common/pipeline/sinks/Cargo.toml index 5fed8c8df44d7..7145d763b3d76 100644 --- a/src/common/pipeline/sinks/Cargo.toml +++ b/src/common/pipeline/sinks/Cargo.toml @@ -17,3 +17,5 @@ common-exception = { path = "../../exception" } common-pipeline-core = { path = "../core" } async-trait = { git = "https://github.com/datafuse-extras/async-trait", rev = "f0b0fd5" } + +async-channel = "1.7.1" \ No newline at end of file diff --git a/src/common/pipeline/sinks/src/lib.rs b/src/common/pipeline/sinks/src/lib.rs index 6e4d18641fd7f..b80e54357796c 100644 --- a/src/common/pipeline/sinks/src/lib.rs +++ b/src/common/pipeline/sinks/src/lib.rs @@ -14,5 +14,6 @@ #![feature(generic_associated_types)] #![deny(unused_crate_dependencies)] +#![feature(type_alias_impl_trait)] pub mod processors; diff --git a/src/common/pipeline/sinks/src/processors/sinks/union_receive_sink.rs b/src/common/pipeline/sinks/src/processors/sinks/union_receive_sink.rs index a9969bc45b808..a6df0335be492 100644 --- a/src/common/pipeline/sinks/src/processors/sinks/union_receive_sink.rs +++ b/src/common/pipeline/sinks/src/processors/sinks/union_receive_sink.rs @@ -14,15 +14,17 @@ use std::sync::Arc; -use common_base::base::tokio::sync::mpsc::Sender; +use async_channel::Sender; +use async_trait::async_trait; +use async_trait::unboxed_simple; use common_datablocks::DataBlock; use common_exception::ErrorCode; use common_exception::Result; use common_pipeline_core::processors::port::InputPort; use common_pipeline_core::processors::processor::ProcessorPtr; -use crate::processors::sinks::Sink; -use crate::processors::sinks::Sinker; +use crate::processors::sinks::AsyncSink; +use crate::processors::sinks::AsyncSinker; pub struct UnionReceiveSink { input_blocks: Vec, @@ -31,31 +33,32 @@ pub struct UnionReceiveSink { impl UnionReceiveSink { pub fn create(sender: Sender>, input: Arc) -> ProcessorPtr { - Sinker::create(input, UnionReceiveSink { + AsyncSinker::create(input, UnionReceiveSink { input_blocks: vec![], sender, }) } } -#[async_trait::async_trait] -impl Sink for UnionReceiveSink { +#[async_trait] +impl AsyncSink for UnionReceiveSink { const NAME: &'static str = "UnionReceiveSink"; - fn on_finish(&mut self) -> Result<()> { + async fn on_finish(&mut self) -> Result<()> { let send_blocks = if self.input_blocks.is_empty() { None } else { Some(DataBlock::concat_blocks(&self.input_blocks)?) }; - if let Err(_) = self.sender.try_send(send_blocks) { + if let Err(_) = self.sender.send(send_blocks).await { return Err(ErrorCode::UnexpectedError("UnionReceiveSink sender failed")); }; - + self.sender.close(); Ok(()) } - fn consume(&mut self, data_block: DataBlock) -> Result<()> { + #[unboxed_simple] + async fn consume(&mut self, data_block: DataBlock) -> Result<()> { self.input_blocks.push(data_block); Ok(()) } diff --git a/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs b/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs index 97cc86de62e38..037fe1c364244 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs @@ -15,7 +15,7 @@ use std::any::Any; use std::sync::Arc; -use common_base::base::tokio::sync::mpsc::Receiver; +use async_channel::Receiver; use common_datablocks::DataBlock; use common_datavalues::DataSchemaRef; use common_exception::Result; diff --git a/src/query/service/src/sql/executor/pipeline_builder.rs b/src/query/service/src/sql/executor/pipeline_builder.rs index c8d004e5f9425..53f235c3cc536 100644 --- a/src/query/service/src/sql/executor/pipeline_builder.rs +++ b/src/query/service/src/sql/executor/pipeline_builder.rs @@ -14,8 +14,7 @@ use std::sync::Arc; -use common_base::base::tokio::sync::mpsc::channel; -use common_base::base::tokio::sync::mpsc::Receiver; +use async_channel::Receiver; use common_datablocks::DataBlock; use common_datablocks::SortColumnDescription; use common_datavalues::DataField; @@ -490,7 +489,7 @@ impl PipelineBuilder { assert!(build_res.main_pipeline.is_pulling_pipeline()?); build_res.main_pipeline.resize(1)?; - let (tx, rx) = channel(1); + let (tx, rx) = async_channel::bounded(1); let input = InputPort::create(); build_res.main_pipeline.add_pipe(Pipe::SimplePipe { outputs_port: vec![], @@ -506,18 +505,26 @@ impl PipelineBuilder { pub fn build_union(&mut self, union: &Union) -> Result<()> { self.build_pipeline(&union.left)?; let union_receiver = self.expand_union(&union.right)?; - self.main_pipeline.resize(1)?; - let input_port = InputPort::create(); - let output_port = OutputPort::create(); - self.main_pipeline.add_pipe(Pipe::SimplePipe { - processors: vec![TransformMergeBlock::create( + let mut inputs_port = Vec::with_capacity(self.main_pipeline.output_len()); + let mut outputs_port = Vec::with_capacity(self.main_pipeline.output_len()); + let mut processors = Vec::with_capacity(self.main_pipeline.output_len()); + for _ in 0..self.main_pipeline.output_len() { + let input_port = InputPort::create(); + let output_port = OutputPort::create(); + let processor = TransformMergeBlock::create( input_port.clone(), output_port.clone(), union.left.output_schema()?, - union_receiver, - )], - outputs_port: vec![output_port], - inputs_port: vec![input_port], + union_receiver.clone(), + ); + inputs_port.push(input_port); + outputs_port.push(output_port); + processors.push(processor); + } + self.main_pipeline.add_pipe(Pipe::SimplePipe { + processors, + outputs_port, + inputs_port, }); Ok(()) } From 31b090fb850259ba684579a7459d8af027a60125 Mon Sep 17 00:00:00 2001 From: xudong963 Date: Thu, 18 Aug 2022 14:51:32 +0800 Subject: [PATCH 10/14] address comments --- .../processors/sinks/union_receive_sink.rs | 23 ++++++------------- .../transforms/transform_merge_block.rs | 14 ++++++++--- .../src/sql/executor/pipeline_builder.rs | 4 ++-- 3 files changed, 20 insertions(+), 21 deletions(-) diff --git a/src/common/pipeline/sinks/src/processors/sinks/union_receive_sink.rs b/src/common/pipeline/sinks/src/processors/sinks/union_receive_sink.rs index a6df0335be492..fd24ee34cc2b6 100644 --- a/src/common/pipeline/sinks/src/processors/sinks/union_receive_sink.rs +++ b/src/common/pipeline/sinks/src/processors/sinks/union_receive_sink.rs @@ -27,16 +27,12 @@ use crate::processors::sinks::AsyncSink; use crate::processors::sinks::AsyncSinker; pub struct UnionReceiveSink { - input_blocks: Vec, - sender: Sender>, + sender: Sender, } impl UnionReceiveSink { - pub fn create(sender: Sender>, input: Arc) -> ProcessorPtr { - AsyncSinker::create(input, UnionReceiveSink { - input_blocks: vec![], - sender, - }) + pub fn create(sender: Sender, input: Arc) -> ProcessorPtr { + AsyncSinker::create(input, UnionReceiveSink { sender }) } } @@ -45,21 +41,16 @@ impl AsyncSink for UnionReceiveSink { const NAME: &'static str = "UnionReceiveSink"; async fn on_finish(&mut self) -> Result<()> { - let send_blocks = if self.input_blocks.is_empty() { - None - } else { - Some(DataBlock::concat_blocks(&self.input_blocks)?) - }; - if let Err(_) = self.sender.send(send_blocks).await { - return Err(ErrorCode::UnexpectedError("UnionReceiveSink sender failed")); - }; self.sender.close(); Ok(()) } #[unboxed_simple] async fn consume(&mut self, data_block: DataBlock) -> Result<()> { - self.input_blocks.push(data_block); + dbg!("come here"); + if self.sender.send(data_block).await.is_err() { + return Err(ErrorCode::UnexpectedError("UnionReceiveSink sender failed")); + }; Ok(()) } } diff --git a/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs b/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs index 037fe1c364244..f19c421e4cb03 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs @@ -34,7 +34,7 @@ pub struct TransformMergeBlock { output_data: Option, schema: DataSchemaRef, - receiver: Receiver>, + receiver: Receiver, receiver_result: Option, } @@ -43,7 +43,7 @@ impl TransformMergeBlock { input: Arc, output: Arc, schema: DataSchemaRef, - receiver: Receiver>, + receiver: Receiver, ) -> ProcessorPtr { ProcessorPtr::create(Box::new(TransformMergeBlock { initialized: false, @@ -127,7 +127,15 @@ impl Processor for TransformMergeBlock { async fn async_process(&mut self) -> Result<()> { if !self.initialized { self.initialized = true; - self.receiver_result = self.receiver.recv().await.unwrap_or(None); + let mut results = vec![]; + while let Some(result) = self.receiver.recv().await.ok() { + results.push(result); + } + if results.is_empty() { + self.receiver_result = None; + } else { + self.receiver_result = Some(DataBlock::concat_blocks(&results)?); + } } Ok(()) } diff --git a/src/query/service/src/sql/executor/pipeline_builder.rs b/src/query/service/src/sql/executor/pipeline_builder.rs index 53f235c3cc536..dd393f0c1e259 100644 --- a/src/query/service/src/sql/executor/pipeline_builder.rs +++ b/src/query/service/src/sql/executor/pipeline_builder.rs @@ -481,7 +481,7 @@ impl PipelineBuilder { self.build_pipeline(&exchange_sink.input) } - fn expand_union(&mut self, plan: &PhysicalPlan) -> Result>> { + fn expand_union(&mut self, plan: &PhysicalPlan) -> Result> { let union_ctx = QueryContext::create_from(self.ctx.clone()); let pipeline_builder = PipelineBuilder::create(union_ctx); let mut build_res = pipeline_builder.finalize(plan)?; @@ -489,7 +489,7 @@ impl PipelineBuilder { assert!(build_res.main_pipeline.is_pulling_pipeline()?); build_res.main_pipeline.resize(1)?; - let (tx, rx) = async_channel::bounded(1); + let (tx, rx) = async_channel::unbounded(); let input = InputPort::create(); build_res.main_pipeline.add_pipe(Pipe::SimplePipe { outputs_port: vec![], From 3da95373d4fc69e8f2a86ff77f628247dd56d363 Mon Sep 17 00:00:00 2001 From: xudong963 Date: Thu, 18 Aug 2022 16:49:36 +0800 Subject: [PATCH 11/14] address comments --- .../processors/sinks/union_receive_sink.rs | 15 +++--- .../transforms/transform_merge_block.rs | 39 ++++++++------- .../src/sql/executor/pipeline_builder.rs | 47 +++++++++---------- 3 files changed, 48 insertions(+), 53 deletions(-) diff --git a/src/common/pipeline/sinks/src/processors/sinks/union_receive_sink.rs b/src/common/pipeline/sinks/src/processors/sinks/union_receive_sink.rs index fd24ee34cc2b6..446aa5b566db4 100644 --- a/src/common/pipeline/sinks/src/processors/sinks/union_receive_sink.rs +++ b/src/common/pipeline/sinks/src/processors/sinks/union_receive_sink.rs @@ -27,11 +27,11 @@ use crate::processors::sinks::AsyncSink; use crate::processors::sinks::AsyncSinker; pub struct UnionReceiveSink { - sender: Sender, + sender: Option>, } impl UnionReceiveSink { - pub fn create(sender: Sender, input: Arc) -> ProcessorPtr { + pub fn create(sender: Option>, input: Arc) -> ProcessorPtr { AsyncSinker::create(input, UnionReceiveSink { sender }) } } @@ -41,16 +41,17 @@ impl AsyncSink for UnionReceiveSink { const NAME: &'static str = "UnionReceiveSink"; async fn on_finish(&mut self) -> Result<()> { - self.sender.close(); + drop(self.sender.take()); Ok(()) } #[unboxed_simple] async fn consume(&mut self, data_block: DataBlock) -> Result<()> { - dbg!("come here"); - if self.sender.send(data_block).await.is_err() { - return Err(ErrorCode::UnexpectedError("UnionReceiveSink sender failed")); - }; + if let Some(sender) = self.sender.as_ref() { + if sender.send(data_block).await.is_err() { + return Err(ErrorCode::UnexpectedError("UnionReceiveSink sender failed")); + }; + } Ok(()) } } diff --git a/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs b/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs index f19c421e4cb03..ffa6e8919835b 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs @@ -26,7 +26,7 @@ use common_pipeline_core::processors::processor::ProcessorPtr; use common_pipeline_core::processors::Processor; pub struct TransformMergeBlock { - initialized: bool, + finished: bool, input: Arc, output: Arc, @@ -39,14 +39,14 @@ pub struct TransformMergeBlock { } impl TransformMergeBlock { - pub fn create( + pub fn try_create( input: Arc, output: Arc, schema: DataSchemaRef, receiver: Receiver, - ) -> ProcessorPtr { - ProcessorPtr::create(Box::new(TransformMergeBlock { - initialized: false, + ) -> Result { + Ok(ProcessorPtr::create(Box::new(TransformMergeBlock { + finished: false, input, output, input_data: None, @@ -54,7 +54,7 @@ impl TransformMergeBlock { schema, receiver, receiver_result: None, - })) + }))) } } @@ -69,10 +69,6 @@ impl Processor for TransformMergeBlock { } fn event(&mut self) -> Result { - if !self.initialized { - return Ok(Event::Async); - } - if self.output.is_finished() { self.input.finish(); return Ok(Event::Finished); @@ -92,7 +88,15 @@ impl Processor for TransformMergeBlock { return Ok(Event::Sync); } + if let Ok(result) = self.receiver.try_recv() { + self.receiver_result = Some(result); + return Ok(Event::Sync); + } + if self.input.is_finished() { + if !self.finished { + return Ok(Event::Async); + } self.output.finish(); return Ok(Event::Finished); } @@ -125,17 +129,12 @@ impl Processor for TransformMergeBlock { } async fn async_process(&mut self) -> Result<()> { - if !self.initialized { - self.initialized = true; - let mut results = vec![]; - while let Some(result) = self.receiver.recv().await.ok() { - results.push(result); - } - if results.is_empty() { - self.receiver_result = None; - } else { - self.receiver_result = Some(DataBlock::concat_blocks(&results)?); + if !self.finished { + if let Ok(result) = self.receiver.recv().await { + self.receiver_result = Some(result); + return Ok(()); } + self.finished = true; } Ok(()) } diff --git a/src/query/service/src/sql/executor/pipeline_builder.rs b/src/query/service/src/sql/executor/pipeline_builder.rs index dd393f0c1e259..8f6152c1cd509 100644 --- a/src/query/service/src/sql/executor/pipeline_builder.rs +++ b/src/query/service/src/sql/executor/pipeline_builder.rs @@ -25,7 +25,6 @@ use common_exception::Result; use common_functions::aggregates::AggregateFunctionFactory; use common_functions::aggregates::AggregateFunctionRef; use common_functions::scalars::FunctionFactory; -use common_pipeline_core::processors::port::OutputPort; use common_pipeline_core::Pipe; use common_pipeline_sinks::processors::sinks::UnionReceiveSink; @@ -488,13 +487,21 @@ impl PipelineBuilder { assert!(build_res.main_pipeline.is_pulling_pipeline()?); - build_res.main_pipeline.resize(1)?; let (tx, rx) = async_channel::unbounded(); - let input = InputPort::create(); + let mut inputs_port = Vec::with_capacity(build_res.main_pipeline.output_len()); + let mut processors = Vec::with_capacity(build_res.main_pipeline.output_len()); + for _ in 0..build_res.main_pipeline.output_len() { + let input_port = InputPort::create(); + processors.push(UnionReceiveSink::create( + Some(tx.clone()), + input_port.clone(), + )); + inputs_port.push(input_port); + } build_res.main_pipeline.add_pipe(Pipe::SimplePipe { outputs_port: vec![], - inputs_port: vec![input.clone()], - processors: vec![UnionReceiveSink::create(tx, input)], + inputs_port, + processors, }); self.pipelines.push(build_res.main_pipeline); self.pipelines @@ -505,27 +512,15 @@ impl PipelineBuilder { pub fn build_union(&mut self, union: &Union) -> Result<()> { self.build_pipeline(&union.left)?; let union_receiver = self.expand_union(&union.right)?; - let mut inputs_port = Vec::with_capacity(self.main_pipeline.output_len()); - let mut outputs_port = Vec::with_capacity(self.main_pipeline.output_len()); - let mut processors = Vec::with_capacity(self.main_pipeline.output_len()); - for _ in 0..self.main_pipeline.output_len() { - let input_port = InputPort::create(); - let output_port = OutputPort::create(); - let processor = TransformMergeBlock::create( - input_port.clone(), - output_port.clone(), - union.left.output_schema()?, - union_receiver.clone(), - ); - inputs_port.push(input_port); - outputs_port.push(output_port); - processors.push(processor); - } - self.main_pipeline.add_pipe(Pipe::SimplePipe { - processors, - outputs_port, - inputs_port, - }); + self.main_pipeline + .add_transform(|transform_input_port, transform_output_port| { + TransformMergeBlock::try_create( + transform_input_port, + transform_output_port, + union.left.output_schema()?, + union_receiver.clone(), + ) + })?; Ok(()) } } From 513883eb23350f83ca4fd8a637d24f689c056ea8 Mon Sep 17 00:00:00 2001 From: xudong963 Date: Thu, 18 Aug 2022 17:34:38 +0800 Subject: [PATCH 12/14] fix cluster --- .../processors/transforms/transform_merge_block.rs | 2 +- src/query/service/src/sql/planner/binder/select.rs | 2 +- src/query/service/src/sql/planner/plans/union.rs | 9 ++++++--- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs b/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs index ffa6e8919835b..4295d87a5ed67 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs @@ -115,7 +115,7 @@ impl Processor for TransformMergeBlock { if let Some(receiver_result) = self.receiver_result.take() { let data_block = DataBlock::create(self.schema.clone(), receiver_result.columns().to_vec()); - self.output_data = Some(DataBlock::concat_blocks(&vec![input_data, data_block])?); + self.output_data = Some(DataBlock::concat_blocks(&[input_data, data_block])?); } else { self.output_data = Some(input_data); } diff --git a/src/query/service/src/sql/planner/binder/select.rs b/src/query/service/src/sql/planner/binder/select.rs index 42c71d2fb3fff..45cbad0fe734f 100644 --- a/src/query/service/src/sql/planner/binder/select.rs +++ b/src/query/service/src/sql/planner/binder/select.rs @@ -338,7 +338,7 @@ impl<'a> Binder { let mut new_expr = SExpr::create_binary(union_plan.into(), left_expr, right_expr); if distinct { new_expr = self.bind_distinct( - &bind_context, + bind_context, bind_context.all_column_bindings(), &mut HashMap::new(), new_expr, diff --git a/src/query/service/src/sql/planner/plans/union.rs b/src/query/service/src/sql/planner/plans/union.rs index 1672436f47ae4..f4198748315d2 100644 --- a/src/query/service/src/sql/planner/plans/union.rs +++ b/src/query/service/src/sql/planner/plans/union.rs @@ -14,6 +14,7 @@ use common_exception::Result; +use crate::sql::optimizer::Distribution; use crate::sql::optimizer::PhysicalProperty; use crate::sql::optimizer::RelExpr; use crate::sql::optimizer::RelationalProperty; @@ -76,15 +77,17 @@ impl LogicalOperator for Union { impl PhysicalOperator for Union { fn derive_physical_prop<'a>(&self, _rel_expr: &RelExpr<'a>) -> Result { - todo!() + Ok(PhysicalProperty { + distribution: Distribution::Serial, + }) } fn compute_required_prop_child<'a>( &self, _rel_expr: &RelExpr<'a>, _child_index: usize, - _required: &RequiredProperty, + required: &RequiredProperty, ) -> Result { - todo!() + Ok(required.clone()) } } From f2ec61bdb05c53adfd1f85110ae3f2ec73406df2 Mon Sep 17 00:00:00 2001 From: xudong963 Date: Thu, 18 Aug 2022 18:18:12 +0800 Subject: [PATCH 13/14] fix toml format --- src/common/pipeline/sinks/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/pipeline/sinks/Cargo.toml b/src/common/pipeline/sinks/Cargo.toml index 7145d763b3d76..f363372632de6 100644 --- a/src/common/pipeline/sinks/Cargo.toml +++ b/src/common/pipeline/sinks/Cargo.toml @@ -18,4 +18,4 @@ common-pipeline-core = { path = "../core" } async-trait = { git = "https://github.com/datafuse-extras/async-trait", rev = "f0b0fd5" } -async-channel = "1.7.1" \ No newline at end of file +async-channel = "1.7.1" From a90628dd77074a21f40467e0f9124193afa738ee Mon Sep 17 00:00:00 2001 From: xudong963 Date: Fri, 19 Aug 2022 08:46:27 +0800 Subject: [PATCH 14/14] rename union to union all --- .../service/src/sql/executor/physical_plan.rs | 10 +++---- .../src/sql/executor/physical_plan_builder.rs | 6 ++-- .../src/sql/executor/physical_plan_display.rs | 8 +++--- .../src/sql/executor/physical_plan_visitor.rs | 10 +++---- .../src/sql/executor/pipeline_builder.rs | 16 +++++------ .../sql/optimizer/heuristic/decorrelate.rs | 6 ++-- .../sql/optimizer/heuristic/prune_columns.rs | 2 +- .../optimizer/heuristic/subquery_rewriter.rs | 12 ++++---- .../service/src/sql/planner/binder/select.rs | 4 +-- .../planner/format/display_rel_operator.rs | 2 +- .../service/src/sql/planner/plans/mod.rs | 4 +-- .../service/src/sql/planner/plans/operator.rs | 28 +++++++++---------- .../planner/plans/{union.rs => union_all.rs} | 10 +++---- 13 files changed, 60 insertions(+), 58 deletions(-) rename src/query/service/src/sql/planner/plans/{union.rs => union_all.rs} (94%) diff --git a/src/query/service/src/sql/executor/physical_plan.rs b/src/query/service/src/sql/executor/physical_plan.rs index f98fd84a3ae3e..bae1bfe111a6d 100644 --- a/src/query/service/src/sql/executor/physical_plan.rs +++ b/src/query/service/src/sql/executor/physical_plan.rs @@ -282,13 +282,13 @@ impl ExchangeSink { } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -pub struct Union { +pub struct UnionAll { pub left: Box, pub right: Box, pub schema: DataSchemaRef, } -impl Union { +impl UnionAll { pub fn output_schema(&self) -> Result { Ok(self.schema.clone()) } @@ -306,7 +306,7 @@ pub enum PhysicalPlan { Limit(Limit), HashJoin(HashJoin), Exchange(Exchange), - Union(Union), + UnionAll(UnionAll), /// Synthesized by fragmenter ExchangeSource(ExchangeSource), @@ -336,7 +336,7 @@ impl PhysicalPlan { PhysicalPlan::Exchange(plan) => plan.output_schema(), PhysicalPlan::ExchangeSource(plan) => plan.output_schema(), PhysicalPlan::ExchangeSink(plan) => plan.output_schema(), - PhysicalPlan::Union(plan) => plan.output_schema(), + PhysicalPlan::UnionAll(plan) => plan.output_schema(), } } @@ -356,7 +356,7 @@ impl PhysicalPlan { PhysicalPlan::Exchange(plan) => Box::new(std::iter::once(plan.input.as_ref())), PhysicalPlan::ExchangeSource(_) => Box::new(std::iter::empty()), PhysicalPlan::ExchangeSink(plan) => Box::new(std::iter::once(plan.input.as_ref())), - PhysicalPlan::Union(plan) => Box::new( + PhysicalPlan::UnionAll(plan) => Box::new( std::iter::once(plan.left.as_ref()).chain(std::iter::once(plan.right.as_ref())), ), } diff --git a/src/query/service/src/sql/executor/physical_plan_builder.rs b/src/query/service/src/sql/executor/physical_plan_builder.rs index fd31fb6039fd7..ec45808b81855 100644 --- a/src/query/service/src/sql/executor/physical_plan_builder.rs +++ b/src/query/service/src/sql/executor/physical_plan_builder.rs @@ -41,7 +41,7 @@ use crate::sql::executor::ExpressionBuilderWithoutRenaming; use crate::sql::executor::PhysicalPlan; use crate::sql::executor::PhysicalScalar; use crate::sql::executor::SortDesc; -use crate::sql::executor::Union; +use crate::sql::executor::UnionAll; use crate::sql::optimizer::SExpr; use crate::sql::plans::AggregateMode; use crate::sql::plans::Exchange; @@ -324,10 +324,10 @@ impl PhysicalPlanBuilder { keys, })) } - RelOperator::Union(_) => { + RelOperator::UnionAll(_) => { let left = self.build(s_expr.child(0)?).await?; let schema = left.output_schema()?; - Ok(PhysicalPlan::Union(Union { + Ok(PhysicalPlan::UnionAll(UnionAll { left: Box::new(left), right: Box::new(self.build(s_expr.child(1)?).await?), schema, diff --git a/src/query/service/src/sql/executor/physical_plan_display.rs b/src/query/service/src/sql/executor/physical_plan_display.rs index d1c517ddf2cd0..ab39631acd7eb 100644 --- a/src/query/service/src/sql/executor/physical_plan_display.rs +++ b/src/query/service/src/sql/executor/physical_plan_display.rs @@ -32,7 +32,7 @@ use crate::sql::executor::PhysicalScalar; use crate::sql::executor::Project; use crate::sql::executor::Sort; use crate::sql::executor::TableScan; -use crate::sql::executor::Union; +use crate::sql::executor::UnionAll; use crate::sql::plans::JoinType; impl PhysicalPlan { @@ -63,7 +63,7 @@ impl<'a> Display for PhysicalPlanIndentFormatDisplay<'a> { PhysicalPlan::Exchange(exchange) => write!(f, "{}", exchange)?, PhysicalPlan::ExchangeSource(source) => write!(f, "{}", source)?, PhysicalPlan::ExchangeSink(sink) => write!(f, "{}", sink)?, - PhysicalPlan::Union(union) => write!(f, "{}", union)?, + PhysicalPlan::UnionAll(union_all) => write!(f, "{}", union_all)?, } for node in self.node.children() { @@ -308,8 +308,8 @@ impl Display for ExchangeSink { } } -impl Display for Union { +impl Display for UnionAll { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "Union") + write!(f, "UnionAll") } } diff --git a/src/query/service/src/sql/executor/physical_plan_visitor.rs b/src/query/service/src/sql/executor/physical_plan_visitor.rs index 7ff5250a1825f..66d4e1adb465b 100644 --- a/src/query/service/src/sql/executor/physical_plan_visitor.rs +++ b/src/query/service/src/sql/executor/physical_plan_visitor.rs @@ -27,7 +27,7 @@ use super::PhysicalPlan; use super::Project; use super::Sort; use super::TableScan; -use crate::sql::executor::Union; +use crate::sql::executor::UnionAll; pub trait PhysicalPlanReplacer { fn replace(&mut self, plan: &PhysicalPlan) -> Result { @@ -44,7 +44,7 @@ pub trait PhysicalPlanReplacer { PhysicalPlan::Exchange(plan) => self.replace_exchange(plan), PhysicalPlan::ExchangeSource(plan) => self.replace_exchange_source(plan), PhysicalPlan::ExchangeSink(plan) => self.replace_exchange_sink(plan), - PhysicalPlan::Union(plan) => self.replace_union(plan), + PhysicalPlan::UnionAll(plan) => self.replace_union(plan), } } @@ -163,10 +163,10 @@ pub trait PhysicalPlanReplacer { })) } - fn replace_union(&mut self, plan: &Union) -> Result { + fn replace_union(&mut self, plan: &UnionAll) -> Result { let left = self.replace(&plan.left)?; let right = self.replace(&plan.right)?; - Ok(PhysicalPlan::Union(Union { + Ok(PhysicalPlan::UnionAll(UnionAll { left: Box::new(left), right: Box::new(right), schema: plan.schema.clone(), @@ -217,7 +217,7 @@ impl PhysicalPlan { PhysicalPlan::ExchangeSink(plan) => { Self::traverse(&plan.input, pre_visit, visit, post_visit); } - PhysicalPlan::Union(plan) => { + PhysicalPlan::UnionAll(plan) => { Self::traverse(&plan.left, pre_visit, visit, post_visit); Self::traverse(&plan.right, pre_visit, visit, post_visit); } diff --git a/src/query/service/src/sql/executor/pipeline_builder.rs b/src/query/service/src/sql/executor/pipeline_builder.rs index 8f6152c1cd509..eab63a45fc81b 100644 --- a/src/query/service/src/sql/executor/pipeline_builder.rs +++ b/src/query/service/src/sql/executor/pipeline_builder.rs @@ -70,7 +70,7 @@ use crate::sql::executor::physical_plan::ColumnID; use crate::sql::executor::physical_plan::PhysicalPlan; use crate::sql::executor::AggregateFunctionDesc; use crate::sql::executor::PhysicalScalar; -use crate::sql::executor::Union; +use crate::sql::executor::UnionAll; use crate::sql::plans::JoinType; use crate::sql::ColumnBinding; @@ -119,7 +119,7 @@ impl PipelineBuilder { PhysicalPlan::HashJoin(join) => self.build_join(join), PhysicalPlan::ExchangeSink(sink) => self.build_exchange_sink(sink), PhysicalPlan::ExchangeSource(source) => self.build_exchange_source(source), - PhysicalPlan::Union(union) => self.build_union(union), + PhysicalPlan::UnionAll(union_all) => self.build_union_all(union_all), PhysicalPlan::Exchange(_) => Err(ErrorCode::LogicalError( "Invalid physical plan with PhysicalPlan::Exchange", )), @@ -480,7 +480,7 @@ impl PipelineBuilder { self.build_pipeline(&exchange_sink.input) } - fn expand_union(&mut self, plan: &PhysicalPlan) -> Result> { + fn expand_union_all(&mut self, plan: &PhysicalPlan) -> Result> { let union_ctx = QueryContext::create_from(self.ctx.clone()); let pipeline_builder = PipelineBuilder::create(union_ctx); let mut build_res = pipeline_builder.finalize(plan)?; @@ -509,16 +509,16 @@ impl PipelineBuilder { Ok(rx) } - pub fn build_union(&mut self, union: &Union) -> Result<()> { - self.build_pipeline(&union.left)?; - let union_receiver = self.expand_union(&union.right)?; + pub fn build_union_all(&mut self, union_all: &UnionAll) -> Result<()> { + self.build_pipeline(&union_all.left)?; + let union_all_receiver = self.expand_union_all(&union_all.right)?; self.main_pipeline .add_transform(|transform_input_port, transform_output_port| { TransformMergeBlock::try_create( transform_input_port, transform_output_port, - union.left.output_schema()?, - union_receiver.clone(), + union_all.left.output_schema()?, + union_all_receiver.clone(), ) })?; Ok(()) diff --git a/src/query/service/src/sql/optimizer/heuristic/decorrelate.rs b/src/query/service/src/sql/optimizer/heuristic/decorrelate.rs index 1ab3ac72cc680..7d217722a00cf 100644 --- a/src/query/service/src/sql/optimizer/heuristic/decorrelate.rs +++ b/src/query/service/src/sql/optimizer/heuristic/decorrelate.rs @@ -51,7 +51,7 @@ use crate::sql::plans::Scalar; use crate::sql::plans::ScalarItem; use crate::sql::plans::SubqueryExpr; use crate::sql::plans::SubqueryType; -use crate::sql::plans::Union; +use crate::sql::plans::UnionAll; use crate::sql::ColumnBinding; use crate::sql::IndexType; use crate::sql::MetadataRef; @@ -632,13 +632,13 @@ impl SubqueryRewriter { Ok(SExpr::create_unary(plan.plan().clone(), flatten_plan)) } - RelOperator::Union(_) => { + RelOperator::UnionAll(_) => { let left_flatten_plan = self.flatten(plan.child(0)?, correlated_columns, flatten_info)?; let right_flatten_plan = self.flatten(plan.child(1)?, correlated_columns, flatten_info)?; Ok(SExpr::create_binary( - Union {}.into(), + UnionAll {}.into(), left_flatten_plan, right_flatten_plan, )) diff --git a/src/query/service/src/sql/optimizer/heuristic/prune_columns.rs b/src/query/service/src/sql/optimizer/heuristic/prune_columns.rs index ad617447e5a3f..ec78aa593c17a 100644 --- a/src/query/service/src/sql/optimizer/heuristic/prune_columns.rs +++ b/src/query/service/src/sql/optimizer/heuristic/prune_columns.rs @@ -225,7 +225,7 @@ impl ColumnPruner { self.keep_required_columns(expr.child(0)?, required)?, )), - RelOperator::Union(_) => Ok(expr.clone()), + RelOperator::UnionAll(_) => Ok(expr.clone()), _ => Err(ErrorCode::LogicalError( "Attempting to prune columns of a physical plan is not allowed", diff --git a/src/query/service/src/sql/optimizer/heuristic/subquery_rewriter.rs b/src/query/service/src/sql/optimizer/heuristic/subquery_rewriter.rs index fb838bca0afe4..88203e4815194 100644 --- a/src/query/service/src/sql/optimizer/heuristic/subquery_rewriter.rs +++ b/src/query/service/src/sql/optimizer/heuristic/subquery_rewriter.rs @@ -119,11 +119,13 @@ impl SubqueryRewriter { Ok(SExpr::create_unary(plan.into(), input)) } - RelOperator::LogicalInnerJoin(_) | RelOperator::Union(_) => Ok(SExpr::create_binary( - s_expr.plan().clone(), - self.rewrite(s_expr.child(0)?)?, - self.rewrite(s_expr.child(1)?)?, - )), + RelOperator::LogicalInnerJoin(_) | RelOperator::UnionAll(_) => { + Ok(SExpr::create_binary( + s_expr.plan().clone(), + self.rewrite(s_expr.child(0)?)?, + self.rewrite(s_expr.child(1)?)?, + )) + } RelOperator::Project(_) | RelOperator::Limit(_) | RelOperator::Sort(_) => Ok( SExpr::create_unary(s_expr.plan().clone(), self.rewrite(s_expr.child(0)?)?), diff --git a/src/query/service/src/sql/planner/binder/select.rs b/src/query/service/src/sql/planner/binder/select.rs index 45cbad0fe734f..ee3bb4fa837a9 100644 --- a/src/query/service/src/sql/planner/binder/select.rs +++ b/src/query/service/src/sql/planner/binder/select.rs @@ -40,7 +40,7 @@ use crate::sql::plans::BoundColumnRef; use crate::sql::plans::Filter; use crate::sql::plans::JoinType; use crate::sql::plans::Scalar; -use crate::sql::plans::Union; +use crate::sql::plans::UnionAll; // A normalized IR for `SELECT` clause. #[derive(Debug, Default)] @@ -334,7 +334,7 @@ impl<'a> Binder { right_expr: SExpr, distinct: bool, ) -> Result { - let union_plan = Union {}; + let union_plan = UnionAll {}; let mut new_expr = SExpr::create_binary(union_plan.into(), left_expr, right_expr); if distinct { new_expr = self.bind_distinct( diff --git a/src/query/service/src/sql/planner/format/display_rel_operator.rs b/src/query/service/src/sql/planner/format/display_rel_operator.rs index 9bd5d61acc924..78398e1947525 100644 --- a/src/query/service/src/sql/planner/format/display_rel_operator.rs +++ b/src/query/service/src/sql/planner/format/display_rel_operator.rs @@ -78,7 +78,7 @@ impl Display for FormatContext { RelOperator::Sort(op) => format_sort(f, &self.metadata, op), RelOperator::Limit(op) => format_limit(f, &self.metadata, op), RelOperator::Exchange(op) => format_exchange(f, &self.metadata, op), - RelOperator::Union(_) => write!(f, "UNION"), + RelOperator::UnionAll(_) => write!(f, "UNION ALL"), RelOperator::Pattern(_) => write!(f, "Pattern"), } } diff --git a/src/query/service/src/sql/planner/plans/mod.rs b/src/query/service/src/sql/planner/plans/mod.rs index b5e8074ea2525..2d93d5a0e3ae1 100644 --- a/src/query/service/src/sql/planner/plans/mod.rs +++ b/src/query/service/src/sql/planner/plans/mod.rs @@ -31,7 +31,7 @@ mod project; mod scalar; pub mod share; mod sort; -mod union; +mod union_all; use std::fmt::Display; use std::sync::Arc; @@ -110,7 +110,7 @@ pub use scalar::*; pub use share::*; pub use sort::Sort; pub use sort::SortItem; -pub use union::Union; +pub use union_all::UnionAll; use super::BindContext; use super::MetadataRef; diff --git a/src/query/service/src/sql/planner/plans/operator.rs b/src/query/service/src/sql/planner/plans/operator.rs index ad1544080dc8c..587ccca875721 100644 --- a/src/query/service/src/sql/planner/plans/operator.rs +++ b/src/query/service/src/sql/planner/plans/operator.rs @@ -26,7 +26,7 @@ use super::pattern::PatternPlan; use super::physical_scan::PhysicalScan; use super::project::Project; use super::sort::Sort; -use super::union::Union; +use super::union_all::UnionAll; use crate::sql::optimizer::PhysicalProperty; use crate::sql::optimizer::RelExpr; use crate::sql::optimizer::RelationalProperty; @@ -83,7 +83,7 @@ pub enum RelOp { Sort, Limit, Exchange, - Union, + UnionAll, // Pattern Pattern, @@ -105,7 +105,7 @@ pub enum RelOperator { Sort(Sort), Limit(Limit), Exchange(Exchange), - Union(Union), + UnionAll(UnionAll), Pattern(PatternPlan), } @@ -125,7 +125,7 @@ impl Operator for RelOperator { RelOperator::Limit(rel_op) => rel_op.rel_op(), RelOperator::Pattern(rel_op) => rel_op.rel_op(), RelOperator::Exchange(rel_op) => rel_op.rel_op(), - RelOperator::Union(rel_op) => rel_op.rel_op(), + RelOperator::UnionAll(rel_op) => rel_op.rel_op(), } } @@ -143,7 +143,7 @@ impl Operator for RelOperator { RelOperator::Limit(rel_op) => rel_op.is_physical(), RelOperator::Pattern(rel_op) => rel_op.is_physical(), RelOperator::Exchange(rel_op) => rel_op.is_physical(), - RelOperator::Union(rel_op) => rel_op.is_physical(), + RelOperator::UnionAll(rel_op) => rel_op.is_physical(), } } @@ -161,7 +161,7 @@ impl Operator for RelOperator { RelOperator::Limit(rel_op) => rel_op.is_logical(), RelOperator::Pattern(rel_op) => rel_op.is_logical(), RelOperator::Exchange(rel_op) => rel_op.is_logical(), - RelOperator::Union(rel_op) => rel_op.is_logical(), + RelOperator::UnionAll(rel_op) => rel_op.is_logical(), } } @@ -179,7 +179,7 @@ impl Operator for RelOperator { RelOperator::Limit(rel_op) => rel_op.as_logical(), RelOperator::Pattern(rel_op) => rel_op.as_logical(), RelOperator::Exchange(rel_op) => rel_op.as_logical(), - RelOperator::Union(rel_op) => rel_op.as_logical(), + RelOperator::UnionAll(rel_op) => rel_op.as_logical(), } } @@ -197,7 +197,7 @@ impl Operator for RelOperator { RelOperator::Limit(rel_op) => rel_op.as_physical(), RelOperator::Pattern(rel_op) => rel_op.as_physical(), RelOperator::Exchange(rel_op) => rel_op.as_physical(), - RelOperator::Union(rel_op) => rel_op.as_physical(), + RelOperator::UnionAll(rel_op) => rel_op.as_physical(), } } } @@ -431,20 +431,20 @@ impl TryFrom for Exchange { } } -impl From for RelOperator { - fn from(v: Union) -> Self { - Self::Union(v) +impl From for RelOperator { + fn from(v: UnionAll) -> Self { + Self::UnionAll(v) } } -impl TryFrom for Union { +impl TryFrom for UnionAll { type Error = ErrorCode; fn try_from(value: RelOperator) -> Result { - if let RelOperator::Union(value) = value { + if let RelOperator::UnionAll(value) = value { Ok(value) } else { Err(ErrorCode::LogicalError( - "Cannot downcast RelOperator to Union", + "Cannot downcast RelOperator to UnionAll", )) } } diff --git a/src/query/service/src/sql/planner/plans/union.rs b/src/query/service/src/sql/planner/plans/union_all.rs similarity index 94% rename from src/query/service/src/sql/planner/plans/union.rs rename to src/query/service/src/sql/planner/plans/union_all.rs index f4198748315d2..ec45bcafc49e4 100644 --- a/src/query/service/src/sql/planner/plans/union.rs +++ b/src/query/service/src/sql/planner/plans/union_all.rs @@ -25,11 +25,11 @@ use crate::sql::plans::PhysicalOperator; use crate::sql::plans::RelOp; #[derive(Clone, Debug)] -pub struct Union; +pub struct UnionAll; -impl Operator for Union { +impl Operator for UnionAll { fn rel_op(&self) -> RelOp { - RelOp::Union + RelOp::UnionAll } fn is_physical(&self) -> bool { @@ -49,7 +49,7 @@ impl Operator for Union { } } -impl LogicalOperator for Union { +impl LogicalOperator for UnionAll { fn derive_relational_prop<'a>(&self, rel_expr: &RelExpr<'a>) -> Result { let left_prop = rel_expr.derive_relational_prop_child(0)?; let right_prop = rel_expr.derive_relational_prop_child(1)?; @@ -75,7 +75,7 @@ impl LogicalOperator for Union { } } -impl PhysicalOperator for Union { +impl PhysicalOperator for UnionAll { fn derive_physical_prop<'a>(&self, _rel_expr: &RelExpr<'a>) -> Result { Ok(PhysicalProperty { distribution: Distribution::Serial,