Skip to content

Commit 286fb36

Browse files
committed
fix output schema
1 parent e586a43 commit 286fb36

File tree

8 files changed

+72
-72
lines changed

8 files changed

+72
-72
lines changed

src/query/service/src/interpreters/access/privilege_access.rs

+17-22
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use databend_common_sql::plans::OptimizeCompactBlock;
4040
use databend_common_sql::plans::PresignAction;
4141
use databend_common_sql::plans::Recluster;
4242
use databend_common_sql::plans::RewriteKind;
43+
use databend_common_sql::BindContext;
4344
use databend_common_sql::Planner;
4445
use databend_common_users::RoleCacheManager;
4546
use databend_common_users::UserApiProvider;
@@ -1002,18 +1003,22 @@ impl AccessChecker for PrivilegeAccess {
10021003
self.validate_access(&GrantObject::Global, UserPrivilegeType::Super, false, false)
10031004
.await?;
10041005
}
1005-
// Others.
1006-
// Plan::Insert(plan) => {
1007-
// let target_table_privileges = if plan.overwrite {
1008-
// vec![UserPrivilegeType::Insert, UserPrivilegeType::Delete]
1009-
// } else {
1010-
// vec![UserPrivilegeType::Insert]
1011-
// };
1012-
// for privilege in target_table_privileges {
1013-
// self.validate_table_access(&plan.catalog, &plan.database, &plan.table, privilege, false, false).await?;
1014-
// }
1015-
// self.validate_insert_source(ctx, &plan.source).await?;
1016-
// }
1006+
Plan::Append { s_expr, target_table_index,metadata,overwrite,.. } => {
1007+
let target_table_privileges = if *overwrite {
1008+
vec![UserPrivilegeType::Insert, UserPrivilegeType::Delete]
1009+
} else {
1010+
vec![UserPrivilegeType::Insert]
1011+
};
1012+
let (catalog, database, table) = {
1013+
let metadata_guard = metadata.read();
1014+
let table_entry = metadata_guard.table(*target_table_index);
1015+
(table_entry.catalog().to_string(), table_entry.database().to_string(), table_entry.name().to_string())
1016+
};
1017+
for privilege in target_table_privileges {
1018+
self.validate_table_access(&catalog, &database, &table, privilege, false, false).await?;
1019+
}
1020+
self.check(ctx, &Plan::Query { s_expr:s_expr.clone(), metadata: metadata.clone(), bind_context: Box::new(BindContext::new()), rewrite_kind: None, formatted_ast: None, ignore_result: false }).await?;
1021+
}
10171022
Plan::InsertMultiTable(plan) => {
10181023
let target_table_privileges = if plan.overwrite {
10191024
vec![UserPrivilegeType::Insert, UserPrivilegeType::Delete]
@@ -1164,16 +1169,6 @@ impl AccessChecker for PrivilegeAccess {
11641169
self.validate_access(&GrantObject::Global, UserPrivilegeType::Alter, false, false)
11651170
.await?;
11661171
}
1167-
Plan::Append { .. } => {
1168-
// match &plan.source{
1169-
1170-
// }
1171-
// self.validate_stage_access(&plan.stage_table_info.stage_info, UserPrivilegeType::Read).await?;
1172-
// self.validate_table_access(&plan.catalog_name, &plan.database_name, &plan.table_name, UserPrivilegeType::Insert, false, false).await?;
1173-
// if let Some(query) = &plan.query {
1174-
// self.check(ctx, query).await?;
1175-
// }
1176-
}
11771172
Plan::CopyIntoLocation(plan) => {
11781173
self.validate_stage_access(&plan.stage, UserPrivilegeType::Write).await?;
11791174
let from = plan.from.clone();

src/query/service/src/interpreters/interpreter_append.rs

+13-39
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ use std::sync::Arc;
1717
use databend_common_catalog::lock::LockTableOption;
1818
use databend_common_catalog::plan::StageTableInfo;
1919
use databend_common_catalog::table::TableExt;
20-
use databend_common_exception::ErrorCode;
2120
use databend_common_exception::Result;
2221
use databend_common_expression::types::Int32Type;
2322
use databend_common_expression::types::StringType;
@@ -28,8 +27,7 @@ use databend_common_sql::executor::physical_plans::MutationKind;
2827
use databend_common_sql::executor::PhysicalPlanBuilder;
2928
use databend_common_sql::optimizer::SExpr;
3029
use databend_common_sql::plans::AppendType;
31-
use databend_common_sql::plans::RelOperator;
32-
use log::debug;
30+
use databend_common_sql::IndexType;
3331
use log::info;
3432

3533
use crate::interpreters::common::check_deduplicate_label;
@@ -41,17 +39,18 @@ use crate::pipelines::PipelineBuilder;
4139
use crate::schedulers::build_query_pipeline_without_render_result_set;
4240
use crate::sessions::QueryContext;
4341
use crate::sessions::TableContext;
44-
use crate::sql::plans::Append;
4542
use crate::sql::MetadataRef;
4643
use crate::stream::DataBlockStream;
4744

4845
pub struct AppendInterpreter {
4946
ctx: Arc<QueryContext>,
5047
s_expr: SExpr,
5148
metadata: MetadataRef,
49+
target_table_index: IndexType,
5250
stage_table_info: Option<Box<StageTableInfo>>,
5351
overwrite: bool,
54-
col_type_modified: bool,
52+
forbid_occ_retry: bool,
53+
append_type: AppendType,
5554
}
5655

5756
#[async_trait::async_trait]
@@ -67,44 +66,20 @@ impl Interpreter for AppendInterpreter {
6766
#[fastrace::trace]
6867
#[async_backtrace::framed]
6968
async fn execute2(&self) -> Result<PipelineBuildResult> {
70-
debug!("ctx.id" = self.ctx.get_id().as_str(); "append_interpreter_execute");
7169
if check_deduplicate_label(self.ctx.clone()).await? {
7270
return Ok(PipelineBuildResult::create());
7371
}
74-
75-
let append: Append = match &self.s_expr.plan() {
76-
RelOperator::Append(append) => append.clone(),
77-
RelOperator::Exchange(_) => self.s_expr.child(0).unwrap().plan().clone().try_into()?,
78-
plan => {
79-
return Err(ErrorCode::Internal(format!(
80-
"AppendInterpreter: unexpected plan type: {:?}",
81-
plan
82-
)));
83-
}
84-
};
8572
let (target_table, catalog, database, table) = {
8673
let metadata = self.metadata.read();
87-
let t = metadata.table(append.table_index);
74+
let t = metadata.table(self.target_table_index);
8875
(
8976
t.table(),
9077
t.catalog().to_string(),
9178
t.database().to_string(),
9279
t.name().to_string(),
9380
)
9481
};
95-
9682
target_table.check_mutable()?;
97-
if append
98-
.project_columns
99-
.as_ref()
100-
.is_some_and(|p| p.len() != append.required_source_schema.num_fields())
101-
{
102-
return Err(ErrorCode::BadArguments(format!(
103-
"Fields in select statement is not equal with expected, select fields: {}, insert fields: {}",
104-
append.project_columns.as_ref().unwrap().len(),
105-
append.required_source_schema.num_fields(),
106-
)));
107-
}
10883

10984
// 1. build source and append pipeline
11085
let mut build_res = {
@@ -137,7 +112,7 @@ impl Interpreter for AppendInterpreter {
137112
copied_files_meta_req,
138113
update_stream_meta,
139114
self.overwrite,
140-
self.col_type_modified,
115+
self.forbid_occ_retry,
141116
unsafe { self.ctx.get_settings().get_deduplicate_label()? },
142117
)?;
143118

@@ -184,12 +159,7 @@ impl Interpreter for AppendInterpreter {
184159
}
185160

186161
fn inject_result(&self) -> Result<SendableDataBlockStream> {
187-
let append: Append = match &self.s_expr.plan() {
188-
RelOperator::Append(append) => append.clone(),
189-
RelOperator::Exchange(_) => self.s_expr.child(0).unwrap().plan().clone().try_into()?,
190-
_ => unreachable!(),
191-
};
192-
match &append.append_type {
162+
match &self.append_type {
193163
AppendType::CopyInto => {
194164
let blocks = self.get_copy_into_table_result()?;
195165
Ok(Box::pin(DataBlockStream::create(None, blocks)))
@@ -206,15 +176,19 @@ impl AppendInterpreter {
206176
metadata: MetadataRef,
207177
stage_table_info: Option<Box<StageTableInfo>>,
208178
overwrite: bool,
209-
col_type_modified: bool,
179+
forbid_occ_retry: bool,
180+
append_type: AppendType,
181+
table_index: IndexType,
210182
) -> Result<Self> {
211183
Ok(AppendInterpreter {
212184
ctx,
213185
s_expr,
214186
metadata,
215187
stage_table_info,
216188
overwrite,
217-
col_type_modified,
189+
forbid_occ_retry,
190+
append_type,
191+
target_table_index: table_index,
218192
})
219193
}
220194

src/query/service/src/interpreters/interpreter_factory.rs

+6-2
Original file line numberDiff line numberDiff line change
@@ -161,14 +161,18 @@ impl InterpreterFactory {
161161
metadata,
162162
stage_table_info,
163163
overwrite,
164-
forbid_occ_retry: col_type_modified,
164+
forbid_occ_retry,
165+
append_type,
166+
target_table_index,
165167
} => Ok(Arc::new(AppendInterpreter::try_create(
166168
ctx,
167169
*s_expr.clone(),
168170
metadata.clone(),
169171
stage_table_info.clone(),
170172
*overwrite,
171-
*col_type_modified,
173+
*forbid_occ_retry,
174+
append_type.clone(),
175+
*target_table_index,
172176
)?)),
173177
Plan::CopyIntoLocation(copy_plan) => Ok(Arc::new(
174178
CopyIntoLocationInterpreter::try_create(ctx, copy_plan.clone())?,

src/query/sql/src/planner/binder/copy_into_table.rs

+8-2
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ impl<'a> Binder {
107107
bind_context,
108108
copy_into_table_plan,
109109
stage_table_info,
110+
AppendType::CopyInto,
110111
)
111112
.await
112113
}
@@ -255,7 +256,6 @@ impl<'a> Binder {
255256
required_source_schema: required_values_schema.clone(),
256257
required_values_schema: required_values_schema.clone(),
257258
project_columns: None,
258-
append_type: AppendType::CopyInto,
259259
};
260260
Ok((copy_into_plan, stage_table_info))
261261
}
@@ -267,6 +267,7 @@ impl<'a> Binder {
267267
bind_ctx: &BindContext,
268268
mut copy_into_table_plan: Append,
269269
stage_table_info: StageTableInfo,
270+
append_type: AppendType,
270271
) -> Result<Plan> {
271272
let use_query = matches!(&stage_table_info.stage_info.file_format_params,
272273
FileFormatParams::Parquet(fmt) if fmt.missing_field_as == NullAs::Error);
@@ -343,10 +344,12 @@ impl<'a> Binder {
343344
SExpr::create_unary(Arc::new(copy_into_table_plan.into()), Arc::new(scan));
344345
Ok(Plan::Append {
345346
s_expr: Box::new(copy_into),
347+
target_table_index: table_index,
346348
metadata: self.metadata.clone(),
347349
stage_table_info: Some(Box::new(stage_table_info)),
348350
overwrite: false,
349351
forbid_occ_retry: false,
352+
append_type,
350353
})
351354
}
352355
}
@@ -467,14 +470,14 @@ impl<'a> Binder {
467470
required_values_schema,
468471
values_consts: const_columns,
469472
required_source_schema: data_schema.clone(),
470-
append_type: AppendType::Insert,
471473
project_columns: None,
472474
};
473475

474476
self.bind_copy_into_table_from_location(
475477
bind_context,
476478
copy_into_table_plan,
477479
stage_table_info,
480+
AppendType::Insert,
478481
)
479482
.await
480483
}
@@ -564,15 +567,18 @@ impl<'a> Binder {
564567
}
565568
}
566569

570+
let target_table_index = copy_into_table_plan.table_index;
567571
let copy_into =
568572
SExpr::create_unary(Arc::new(copy_into_table_plan.into()), Arc::new(s_expr));
569573

570574
Ok(Plan::Append {
571575
s_expr: Box::new(copy_into),
576+
target_table_index,
572577
metadata: self.metadata.clone(),
573578
stage_table_info: Some(Box::new(stage_table_info)),
574579
overwrite: false,
575580
forbid_occ_retry: false,
581+
append_type: AppendType::CopyInto,
576582
})
577583
}
578584

src/query/sql/src/planner/binder/insert.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,6 @@ impl Binder {
182182
required_values_schema: schema.clone(),
183183
values_consts: vec![],
184184
required_source_schema: schema,
185-
append_type: AppendType::Insert,
186185
project_columns,
187186
};
188187

@@ -195,6 +194,8 @@ impl Binder {
195194
stage_table_info: None,
196195
overwrite: *overwrite,
197196
forbid_occ_retry: false,
197+
append_type: AppendType::Insert,
198+
target_table_index: table_index,
198199
})
199200
}
200201
}

src/query/sql/src/planner/optimizer/optimizer.rs

+4
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,8 @@ pub async fn optimize(mut opt_ctx: OptimizerContext, plan: Plan) -> Result<Plan>
294294
stage_table_info,
295295
overwrite,
296296
forbid_occ_retry,
297+
append_type,
298+
target_table_index,
297299
} => {
298300
let append: Append = s_expr.plan().clone().try_into()?;
299301
let source = s_expr.child(0)?.clone();
@@ -310,6 +312,8 @@ pub async fn optimize(mut opt_ctx: OptimizerContext, plan: Plan) -> Result<Plan>
310312
stage_table_info,
311313
overwrite,
312314
forbid_occ_retry,
315+
append_type,
316+
target_table_index,
313317
})
314318
}
315319
Plan::DataMutation { s_expr, .. } => optimize_mutation(opt_ctx, *s_expr).await,

src/query/sql/src/planner/plans/append.rs

+16-5
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,9 @@ pub struct Append {
5757
pub values_consts: Vec<Scalar>,
5858
pub required_source_schema: DataSchemaRef,
5959
pub project_columns: Option<Vec<ColumnBinding>>,
60-
pub append_type: AppendType,
6160
}
6261

63-
#[derive(Clone, PartialEq, Eq)]
62+
#[derive(Clone, Debug)]
6463
pub enum AppendType {
6564
Insert,
6665
CopyInto,
@@ -107,7 +106,6 @@ pub async fn create_append_plan_from_subquery(
107106
values_consts: vec![],
108107
required_source_schema: target_schema,
109108
project_columns,
110-
append_type: AppendType::Insert,
111109
};
112110

113111
let optimized_append = optimize_append(insert_plan, source, metadata.clone(), ctx.as_ref())?;
@@ -118,6 +116,8 @@ pub async fn create_append_plan_from_subquery(
118116
stage_table_info: None,
119117
overwrite,
120118
forbid_occ_retry,
119+
append_type: AppendType::Insert,
120+
target_table_index: table_index,
121121
})
122122
}
123123

@@ -283,8 +283,8 @@ impl Append {
283283
])
284284
}
285285

286-
pub fn schema(&self) -> DataSchemaRef {
287-
match self.append_type {
286+
pub fn schema(append_type: &AppendType) -> DataSchemaRef {
287+
match append_type {
288288
AppendType::CopyInto => Self::copy_into_table_schema(),
289289
AppendType::Insert => Arc::new(DataSchema::empty()),
290290
}
@@ -303,6 +303,17 @@ impl PhysicalPlanBuilder {
303303
s_expr: &SExpr,
304304
plan: &crate::plans::Append,
305305
) -> Result<PhysicalPlan> {
306+
if plan
307+
.project_columns
308+
.as_ref()
309+
.is_some_and(|p| p.len() != plan.required_source_schema.num_fields())
310+
{
311+
return Err(ErrorCode::BadArguments(format!(
312+
"Fields in select statement is not equal with expected, select fields: {}, insert fields: {}",
313+
plan.project_columns.as_ref().unwrap().len(),
314+
plan.required_source_schema.num_fields(),
315+
)));
316+
}
306317
let target_table = self.metadata.read().table(plan.table_index).table();
307318

308319
let column_set = plan

0 commit comments

Comments
 (0)