TableScan filters to PyArrow DNF format #1130

Merged: 13 commits, May 8, 2023

Changes from 3 commits

2 changes: 2 additions & 0 deletions dask_planner/src/error.rs
@@ -12,6 +12,7 @@ pub enum DaskPlannerError {
ParserError(ParserError),
TokenizerError(TokenizerError),
Internal(String),
InvalidIOFilter(String),
}

impl Display for DaskPlannerError {
@@ -21,6 +22,7 @@ impl Display for DaskPlannerError {
Self::ParserError(e) => write!(f, "SQL Parser Error: {e}"),
Self::TokenizerError(e) => write!(f, "SQL Tokenizer Error: {e}"),
Self::Internal(e) => write!(f, "Internal Error: {e}"),
Self::InvalidIOFilter(e) => write!(f, "Invalid pyarrow filter encountered: {e}. Defaulting to a Dask CPU/GPU-bound task operation"),
}
}
}
118 changes: 117 additions & 1 deletion dask_planner/src/sql/logical/table_scan.rs
@@ -1,10 +1,11 @@
use std::sync::Arc;

use datafusion_common::DFSchema;
use datafusion_expr::{logical_plan::TableScan, LogicalPlan};
use datafusion_expr::{logical_plan::TableScan, Expr, LogicalPlan};
use pyo3::prelude::*;

use crate::{
error::DaskPlannerError,
expression::{py_expr_list, PyExpr},
sql::exceptions::py_type_err,
};
@@ -16,6 +17,73 @@ pub struct PyTableScan {
input: Arc<LogicalPlan>,
}

#[pyclass(name = "FilteredResult", module = "dask_planner", subclass)]
#[derive(Debug, Clone)]
pub struct PyFilteredResult {
// Certain Expr(s) have no supporting logic in pyarrow for IO filtering
// at read time. Those Expr(s) cannot simply be ignored, however. This field
// stores them so the Python side can create Dask operations that handle the
// filtering as extra tasks in the graph.
#[pyo3(get)]
pub io_unfilterable_exprs: Vec<PyExpr>,
// Expr(s) whose filtering can be performed by pyarrow at IO time are stored
// here in the DNF format that pyarrow expects.
#[pyo3(get)]
pub filtered_exprs: Vec<(String, String, Vec<String>)>,
}
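
For reference, each `(String, String, Vec<String>)` tuple mirrors the `(column, op, value)` triples that PyArrow's Parquet reader accepts for predicate pushdown. A minimal sketch of the consuming side, assuming an illustrative file path and already-decoded values:

import pyarrow.parquet as pq

# A flat list of (column, op, value) tuples is implicitly AND-ed together;
# a list of such lists expresses an OR of AND groups (DNF).
table = pq.read_table(
    "data.parquet",                     # illustrative path
    filters=[("id", "in", [1, 2, 3])],  # same shape as filtered_exprs entries
)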

impl PyTableScan {
/// Transform a single Expr instance into its DNF form, serialized in a Vec,
/// recursively expanding it if needed.
pub fn _expand_dnf_filter(
filter: &Expr,
) -> Result<Vec<(String, String, Vec<String>)>, DaskPlannerError> {
let mut filter_tuple: Vec<(String, String, Vec<String>)> = Vec::new();

match filter {
Expr::InList {
expr,
list,
negated,
} => {
let il: Vec<String> = list.iter().map(|f| f.canonical_name()).collect();
// Map NOT IN to pyarrow's "not in" operator rather than silently
// dropping the negation.
let op = if *negated { "not in" } else { "in" };
filter_tuple.push((expr.canonical_name(), op.to_string(), il));
Ok(filter_tuple)
}
_ => {
let er = DaskPlannerError::InvalidIOFilter(format!(
"Unable to apply filter: `{}` to IO reader, applying it in Dask instead",
filter
));
Err(er)
}
}
}

/// Consume the `TableScan` filters (Expr(s)) and convert them into a DNF format that PyArrow
/// understands and that can be passed directly to PyArrow IO readers for predicate pushdown.
/// Expr(s) that cannot be converted to corresponding PyArrow IO calls are returned as-is so the
/// Python logic can form Dask tasks that perform the filtering computationally in the graph.
pub fn _expand_dnf_filters(input: &Arc<LogicalPlan>, filters: &[Expr]) -> PyFilteredResult {
let mut filtered_exprs: Vec<(String, String, Vec<String>)> = Vec::new();
let mut unfiltered_exprs: Vec<PyExpr> = Vec::new();

filters
.iter()
.for_each(|f| match PyTableScan::_expand_dnf_filter(f) {
Ok(mut expanded_dnf_filter) => filtered_exprs.append(&mut expanded_dnf_filter),
Err(_e) => {
unfiltered_exprs.push(PyExpr::from(f.clone(), Some(vec![input.clone()])))
}
});

PyFilteredResult {
io_unfilterable_exprs: unfiltered_exprs,
filtered_exprs,
}
}
}

#[pymethods]
impl PyTableScan {
#[pyo3(name = "getTableScanProjects")]
@@ -43,6 +111,12 @@ impl PyTableScan {
fn scan_filters(&self) -> PyResult<Vec<PyExpr>> {
py_expr_list(&self.input, &self.table_scan.filters)
}

#[pyo3(name = "getDNFFilters")]
fn dnf_io_filters(&self) -> PyResult<PyFilteredResult> {
let results = PyTableScan::_expand_dnf_filters(&self.input, &self.table_scan.filters);
Ok(results)
}
}
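
On the Python side, the new binding can be consumed roughly as follows. This is a sketch: `table_scan` stands in for whatever PyTableScan handle the caller holds, while `getDNFFilters`, `filtered_exprs`, and `io_unfilterable_exprs` come from the bindings above.

# Hypothetical handle to a PyTableScan instance
result = table_scan.getDNFFilters()
pyarrow_filters = result.filtered_exprs    # tuples destined for the IO reader
remaining = result.io_unfilterable_exprs   # filtered later as Dask graph tasks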

impl TryFrom<LogicalPlan> for PyTableScan {
@@ -68,3 +142,45 @@ impl TryFrom<LogicalPlan> for PyTableScan {
}
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::{Result, TableReference};
use datafusion_expr::{col, in_list, lit, logical_plan::table_scan, Expr};

use super::PyTableScan;

#[test]
fn dnf_inlist() -> Result<()> {
let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);

// Dummy logical plan
let plan = Arc::new(
table_scan(TableReference::none(), &schema, None)
.unwrap()
.filter(col("id").eq(Expr::Placeholder {
id: "".into(),
data_type: Some(DataType::Int32),
}))
.unwrap()
.build()
.unwrap(),
);

let il = in_list(col("id"), vec![lit(1), lit(2), lit(3)], false);
let results = PyTableScan::_expand_dnf_filters(&plan, &[il]);

assert_eq!(results.io_unfilterable_exprs.len(), 0);
assert_eq!(results.filtered_exprs.len(), 1);
assert_eq!(results.filtered_exprs[0].0, "id");
assert_eq!(results.filtered_exprs[0].1, "in");
assert_eq!(results.filtered_exprs[0].2[0], "Int32(1)");
assert_eq!(results.filtered_exprs[0].2[1], "Int32(2)");
assert_eq!(results.filtered_exprs[0].2[2], "Int32(3)");

Ok(())
}
}
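
End to end, a predicate like `id IN (1, 2, 3)` is the case this test covers. A hedged dask-sql sketch of a query that would exercise this path (table name and data are illustrative):

import dask.dataframe as dd
import pandas as pd
from dask_sql import Context

c = Context()
df = dd.from_pandas(pd.DataFrame({"id": [1, 2, 3, 4]}), npartitions=1)
c.create_table("tbl", df)

# The IN list below is what _expand_dnf_filters would turn into ("id", "in", [...])
result = c.sql("SELECT * FROM tbl WHERE id IN (1, 2, 3)")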