Skip to content

Commit 8b3cd7b

Browse files
alamb and blaginin authored
Do not swap with projection when file is partitioned (#14956) (#14964)
* Do not swap with projection when file is partitioned
* Narrow the case when not swapping
* Add test

Co-authored-by: Dmitrii Blaginin <[email protected]>
1 parent 0867086 commit 8b3cd7b

File tree

2 files changed

+78
-16
lines changed

2 files changed

+78
-16
lines changed

datafusion/core/tests/physical_optimizer/projection_pushdown.rs

+51
Original file line numberDiff line numberDiff line change
@@ -1382,3 +1382,54 @@ fn test_union_after_projection() -> Result<()> {
13821382

13831383
Ok(())
13841384
}
1385+
1386+
/// Checks that `ProjectionPushdown` keeps the `ProjectionExec` above the data
/// source when the projection references a table partition column.
#[test]
1387+
fn test_partition_col_projection_pushdown() -> Result<()> {
1388+
// Schema of the columns declared for the file itself: no partition column here.
let file_schema = Arc::new(Schema::new(vec![
1389+
Field::new("int_col", DataType::Int32, true),
1390+
Field::new("string_col", DataType::Utf8, true),
1391+
]));
1392+
1393+
// Same two columns plus `partition_col`; used below to resolve column
// expressions for the projection.
let partitioned_schema = Arc::new(Schema::new(vec![
1394+
Field::new("int_col", DataType::Int32, true),
1395+
Field::new("string_col", DataType::Utf8, true),
1396+
Field::new("partition_col", DataType::Utf8, true),
1397+
]));
1398+
1399+
// CSV-backed scan over a single file `x`, with `partition_col` registered as a
// table partition column and a scan projection of indices 0, 1 and 2.
let source = FileScanConfig::new(
1400+
ObjectStoreUrl::parse("test:///").unwrap(),
1401+
file_schema.clone(),
1402+
Arc::new(CsvSource::default()),
1403+
)
1404+
.with_file(PartitionedFile::new("x".to_string(), 100))
1405+
.with_table_partition_cols(vec![Field::new("partition_col", DataType::Utf8, true)])
1406+
.with_projection(Some(vec![0, 1, 2]))
1407+
.build();
1408+
1409+
// Alias-free column projection that reorders the output to
// (string_col, partition_col, int_col).
let projection = Arc::new(ProjectionExec::try_new(
1410+
vec![
1411+
(
1412+
col("string_col", partitioned_schema.as_ref())?,
1413+
"string_col".to_string(),
1414+
),
1415+
(
1416+
col("partition_col", partitioned_schema.as_ref())?,
1417+
"partition_col".to_string(),
1418+
),
1419+
(
1420+
col("int_col", partitioned_schema.as_ref())?,
1421+
"int_col".to_string(),
1422+
),
1423+
],
1424+
source,
1425+
)?);
1426+
1427+
let after_optimize =
1428+
ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
1429+
1430+
// Expected plan: the ProjectionExec is still present on top of the scan, and the
// scan keeps its original projection order (int_col, string_col, partition_col).
let expected = ["ProjectionExec: expr=[string_col@1 as string_col, partition_col@2 as partition_col, int_col@0 as int_col]",
1431+
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[int_col, string_col, partition_col], file_type=csv, has_header=false"];
1432+
assert_eq!(get_plan_string(&after_optimize), expected);
1433+
1434+
Ok(())
1435+
}

datafusion/datasource/src/file_scan_config.rs

+27-16
Original file line numberDiff line numberDiff line change
@@ -266,22 +266,33 @@ impl DataSource for FileScanConfig {
266266
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
267267
// If there is any non-column or alias-carrier expression, Projection should not be removed.
268268
// This process can be moved into CsvExec, but it would be an overlap of their responsibility.
269-
Ok(all_alias_free_columns(projection.expr()).then(|| {
270-
let file_scan = self.clone();
271-
let source = Arc::clone(&file_scan.file_source);
272-
let new_projections = new_projections_for_columns(
273-
projection,
274-
&file_scan
275-
.projection
276-
.clone()
277-
.unwrap_or((0..self.file_schema.fields().len()).collect()),
278-
);
279-
file_scan
280-
// Assign projected statistics to source
281-
.with_projection(Some(new_projections))
282-
.with_source(source)
283-
.build() as _
284-
}))
269+
270+
let partitioned_columns_in_proj = projection.expr().iter().any(|(expr, _)| {
271+
expr.as_any()
272+
.downcast_ref::<Column>()
273+
.map(|expr| expr.index() >= self.file_schema.fields().len())
274+
.unwrap_or(false)
275+
});
276+
277+
Ok(
278+
(all_alias_free_columns(projection.expr()) && !partitioned_columns_in_proj)
279+
.then(|| {
280+
let file_scan = self.clone();
281+
let source = Arc::clone(&file_scan.file_source);
282+
let new_projections = new_projections_for_columns(
283+
projection,
284+
&file_scan
285+
.projection
286+
.clone()
287+
.unwrap_or((0..self.file_schema.fields().len()).collect()),
288+
);
289+
file_scan
290+
// Assign projected statistics to source
291+
.with_projection(Some(new_projections))
292+
.with_source(source)
293+
.build() as _
294+
}),
295+
)
285296
}
286297
}
287298

0 commit comments

Comments
 (0)