Skip to content

Commit 04a119e

Browse files
refactor: unify copy pipeline. (#11569)
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
1 parent 529d184 commit 04a119e

File tree

17 files changed

+855
-1108
lines changed

17 files changed

+855
-1108
lines changed

src/query/catalog/src/plan/datasource/datasource_info/stage.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@ use std::fmt::Debug;
1616
use std::fmt::Formatter;
1717
use std::sync::Arc;
1818

19+
use common_exception::Result;
1920
use common_expression::TableSchema;
2021
use common_expression::TableSchemaRef;
2122
use common_meta_app::principal::StageInfo;
23+
use common_storage::init_stage_operator;
2224
use common_storage::StageFileInfo;
2325
use common_storage::StageFilesInfo;
2426

@@ -38,6 +40,18 @@ impl StageTableInfo {
3840
pub fn desc(&self) -> String {
3941
self.stage_info.stage_name.clone()
4042
}
43+
44+
#[async_backtrace::framed]
45+
pub async fn list_files(&self, max_files: Option<usize>) -> Result<Vec<StageFileInfo>> {
46+
let op = init_stage_operator(&self.stage_info)?;
47+
let infos = self
48+
.files_info
49+
.list(&op, false, max_files)
50+
.await?
51+
.into_iter()
52+
.collect::<Vec<_>>();
53+
Ok(infos)
54+
}
4155
}
4256

4357
impl Debug for StageTableInfo {

src/query/catalog/src/table_context.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ pub struct StageAttachment {
6969
pub location: String,
7070
pub file_format_options: Option<BTreeMap<String, String>>,
7171
pub copy_options: Option<BTreeMap<String, String>>,
72-
pub values_str: String,
7372
}
7473

7574
#[async_trait::async_trait]

src/query/service/src/interpreters/access/privilege_access.rs

Lines changed: 4 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -413,35 +413,13 @@ impl AccessChecker for PrivilegeAccess {
413413
.await?;
414414
}
415415
Plan::Copy(plan) => match plan.as_ref() {
416-
CopyPlan::IntoTable {
417-
catalog_name,
418-
database_name,
419-
table_name,
420-
..
421-
} => {
416+
CopyPlan::IntoTable(plan) => {
422417
session
423418
.validate_privilege(
424419
&GrantObject::Table(
425-
catalog_name.to_string(),
426-
database_name.to_string(),
427-
table_name.to_string(),
428-
),
429-
vec![UserPrivilegeType::Insert],
430-
)
431-
.await?;
432-
}
433-
CopyPlan::IntoTableWithTransform {
434-
catalog_name,
435-
database_name,
436-
table_name,
437-
..
438-
} => {
439-
session
440-
.validate_privilege(
441-
&GrantObject::Table(
442-
catalog_name.to_string(),
443-
database_name.to_string(),
444-
table_name.to_string(),
420+
plan.catalog_name.to_string(),
421+
plan.database_name.to_string(),
422+
plan.table_name.to_string(),
445423
),
446424
vec![UserPrivilegeType::Insert],
447425
)

src/query/service/src/interpreters/common/mod.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,5 @@ mod grant;
1616
mod stage;
1717
mod table;
1818
pub use grant::validate_grant_object_exists;
19-
pub use stage::exprs_to_scalar;
20-
pub use stage::fill_default_value;
21-
pub use stage::prepared_values;
2219
pub use stage::try_purge_files;
2320
pub use table::append2table;

src/query/service/src/interpreters/common/stage.rs

Lines changed: 0 additions & 217 deletions
Original file line numberDiff line numberDiff line change
@@ -14,93 +14,15 @@
1414

1515
use std::sync::Arc;
1616

17-
use common_ast::ast::Expr as AExpr;
18-
use common_ast::parser::parse_expr;
19-
use common_ast::parser::parser_values_with_placeholder;
20-
use common_ast::parser::tokenize_sql;
21-
use common_ast::Dialect;
22-
use common_catalog::table_context::StageAttachment;
2317
use common_catalog::table_context::TableContext;
24-
use common_exception::ErrorCode;
25-
use common_exception::Result;
26-
use common_expression::types::DataType;
27-
use common_expression::types::NumberDataType;
28-
use common_expression::types::NumberScalar;
29-
use common_expression::BlockEntry;
30-
use common_expression::DataBlock;
31-
use common_expression::DataField;
32-
use common_expression::DataSchema;
33-
use common_expression::DataSchemaRef;
34-
use common_expression::Expr;
35-
use common_expression::Scalar;
36-
use common_expression::Value;
3718
use common_meta_app::principal::StageInfo;
38-
use common_pipeline_transforms::processors::transforms::Transform;
39-
use common_sql::binder::wrap_cast;
40-
use common_sql::evaluator::BlockOperator;
41-
use common_sql::evaluator::CompoundBlockOperator;
42-
use common_sql::plans::FunctionCall;
43-
use common_sql::BindContext;
44-
use common_sql::Metadata;
45-
use common_sql::MetadataRef;
46-
use common_sql::NameResolutionContext;
47-
use common_sql::ScalarBinder;
48-
use common_sql::ScalarExpr;
4919
use common_storage::StageFileInfo;
5020
use common_storages_fuse::io::Files;
5121
use common_storages_stage::StageTable;
52-
use parking_lot::RwLock;
5322
use tracing::error;
5423

5524
use crate::sessions::QueryContext;
5625

57-
#[async_backtrace::framed]
58-
pub async fn prepared_values(
59-
ctx: &Arc<QueryContext>,
60-
source_schema: &DataSchemaRef,
61-
attachment: &Arc<StageAttachment>,
62-
) -> Result<(DataSchemaRef, Vec<Scalar>)> {
63-
let settings = ctx.get_settings();
64-
let sql_dialect = settings.get_sql_dialect()?;
65-
let tokens = tokenize_sql(attachment.values_str.as_str())?;
66-
let expr_or_placeholders = parser_values_with_placeholder(&tokens, sql_dialect)?;
67-
68-
if source_schema.num_fields() != expr_or_placeholders.len() {
69-
return Err(ErrorCode::SemanticError(format!(
70-
"need {} fields in values, got only {}",
71-
source_schema.num_fields(),
72-
expr_or_placeholders.len()
73-
)));
74-
}
75-
76-
let mut attachment_fields = vec![];
77-
let mut const_fields = vec![];
78-
let mut exprs = vec![];
79-
for (i, eo) in expr_or_placeholders.into_iter().enumerate() {
80-
match eo {
81-
Some(e) => {
82-
exprs.push(e);
83-
const_fields.push(source_schema.fields()[i].clone());
84-
}
85-
None => attachment_fields.push(source_schema.fields()[i].clone()),
86-
}
87-
}
88-
let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?;
89-
let mut bind_context = BindContext::new();
90-
let metadata = Arc::new(RwLock::new(Metadata::default()));
91-
let const_schema = Arc::new(DataSchema::new(const_fields));
92-
let const_values = exprs_to_scalar(
93-
exprs,
94-
&const_schema,
95-
ctx.clone(),
96-
&name_resolution_ctx,
97-
&mut bind_context,
98-
metadata,
99-
)
100-
.await?;
101-
Ok((Arc::new(DataSchema::new(attachment_fields)), const_values))
102-
}
103-
10426
#[async_backtrace::framed]
10527
pub async fn try_purge_files(
10628
ctx: Arc<QueryContext>,
@@ -125,142 +47,3 @@ pub async fn try_purge_files(
12547
}
12648
}
12749
}
128-
129-
pub async fn exprs_to_scalar(
130-
exprs: Vec<AExpr>,
131-
schema: &DataSchemaRef,
132-
ctx: Arc<dyn TableContext>,
133-
name_resolution_ctx: &NameResolutionContext,
134-
bind_context: &mut BindContext,
135-
metadata: MetadataRef,
136-
) -> Result<Vec<Scalar>> {
137-
let schema_fields_len = schema.fields().len();
138-
if exprs.len() != schema_fields_len {
139-
return Err(ErrorCode::TableSchemaMismatch(format!(
140-
"Table columns count is not match, expect {schema_fields_len}, input: {}, expr: {:?}",
141-
exprs.len(),
142-
exprs
143-
)));
144-
}
145-
let mut scalar_binder = ScalarBinder::new(
146-
bind_context,
147-
ctx.clone(),
148-
name_resolution_ctx,
149-
metadata.clone(),
150-
&[],
151-
);
152-
153-
let mut map_exprs = Vec::with_capacity(exprs.len());
154-
for (i, expr) in exprs.iter().enumerate() {
155-
// `DEFAULT` in insert values will be parsed as `Expr::ColumnRef`.
156-
if let AExpr::ColumnRef { column, .. } = expr {
157-
if column.name.eq_ignore_ascii_case("default") {
158-
let field = schema.field(i);
159-
fill_default_value(&mut scalar_binder, &mut map_exprs, field, schema).await?;
160-
continue;
161-
}
162-
}
163-
164-
let (mut scalar, data_type) = scalar_binder.bind(expr).await?;
165-
let field_data_type = schema.field(i).data_type();
166-
scalar = if field_data_type.remove_nullable() == DataType::Variant {
167-
match data_type.remove_nullable() {
168-
DataType::Boolean
169-
| DataType::Number(_)
170-
| DataType::Decimal(_)
171-
| DataType::Timestamp
172-
| DataType::Date
173-
| DataType::Bitmap
174-
| DataType::Variant => wrap_cast(&scalar, field_data_type),
175-
DataType::String => {
176-
// parse string to JSON value
177-
ScalarExpr::FunctionCall(FunctionCall {
178-
span: None,
179-
func_name: "parse_json".to_string(),
180-
params: vec![],
181-
arguments: vec![scalar],
182-
})
183-
}
184-
_ => {
185-
if data_type == DataType::Null && field_data_type.is_nullable() {
186-
scalar
187-
} else {
188-
return Err(ErrorCode::BadBytes(format!(
189-
"unable to cast type `{}` to type `{}`",
190-
data_type, field_data_type
191-
)));
192-
}
193-
}
194-
}
195-
} else {
196-
wrap_cast(&scalar, field_data_type)
197-
};
198-
let expr = scalar
199-
.as_expr()?
200-
.project_column_ref(|col| schema.index_of(&col.index.to_string()).unwrap());
201-
map_exprs.push(expr);
202-
}
203-
204-
let mut operators = Vec::with_capacity(schema_fields_len);
205-
operators.push(BlockOperator::Map { exprs: map_exprs });
206-
207-
let one_row_chunk = DataBlock::new(
208-
vec![BlockEntry {
209-
data_type: DataType::Number(NumberDataType::UInt8),
210-
value: Value::Scalar(Scalar::Number(NumberScalar::UInt8(1))),
211-
}],
212-
1,
213-
);
214-
let func_ctx = ctx.get_function_context()?;
215-
let mut expression_transform = CompoundBlockOperator {
216-
operators,
217-
ctx: func_ctx,
218-
};
219-
let res = expression_transform.transform(one_row_chunk)?;
220-
let scalars: Vec<Scalar> = res
221-
.columns()
222-
.iter()
223-
.skip(1)
224-
.map(|col| unsafe { col.value.as_ref().index_unchecked(0).to_owned() })
225-
.collect();
226-
Ok(scalars)
227-
}
228-
229-
pub async fn fill_default_value(
230-
binder: &mut ScalarBinder<'_>,
231-
map_exprs: &mut Vec<Expr>,
232-
field: &DataField,
233-
schema: &DataSchema,
234-
) -> Result<()> {
235-
if let Some(default_expr) = field.default_expr() {
236-
let tokens = tokenize_sql(default_expr)?;
237-
let ast = parse_expr(&tokens, Dialect::PostgreSQL)?;
238-
let (mut scalar, _) = binder.bind(&ast).await?;
239-
scalar = wrap_cast(&scalar, field.data_type());
240-
241-
let expr = scalar
242-
.as_expr()?
243-
.project_column_ref(|col| schema.index_of(&col.index.to_string()).unwrap());
244-
map_exprs.push(expr);
245-
} else {
246-
// If field data type is nullable, then we'll fill it with null.
247-
if field.data_type().is_nullable() {
248-
let expr = Expr::Constant {
249-
span: None,
250-
scalar: Scalar::Null,
251-
data_type: field.data_type().clone(),
252-
};
253-
map_exprs.push(expr);
254-
} else {
255-
let data_type = field.data_type().clone();
256-
let default_value = Scalar::default_value(&data_type);
257-
let expr = Expr::Constant {
258-
span: None,
259-
scalar: default_value,
260-
data_type,
261-
};
262-
map_exprs.push(expr);
263-
}
264-
}
265-
Ok(())
266-
}

0 commit comments

Comments
 (0)