
Commit 8df5361

Merge branch 'main' into fea-values-query

2 parents fe72445 + ab7340b · commit 8df5361

15 files changed: +240 −52 lines changed

.github/workflows/test-upstream.yml

Lines changed: 6 additions & 2 deletions

@@ -112,6 +112,11 @@ jobs:
           use-mamba: true
           python-version: "3.8"
           channel-priority: strict
+      - name: Install Protoc
+        uses: arduino/setup-protoc@v1
+        with:
+          version: '3.x'
+          repo-token: ${{ secrets.GITHUB_TOKEN }}
       - name: Optionally update upstream cargo dependencies
         if: env.which_upstream == 'DataFusion'
         env:
@@ -138,12 +143,11 @@ jobs:

   report-failures:
     name: Open issue for upstream dev failures
-    needs: [test-dev, cluster-dev, import-dev]
+    needs: [test-dev, import-dev]
     if: |
       always()
       && (
         needs.test-dev.result == 'failure'
-        || needs.cluster-dev.result == 'failure'
         || needs.import-dev.result == 'failure'
       )
       && github.repository == 'dask-contrib/dask-sql'

conftest.py

Lines changed: 2 additions & 0 deletions

@@ -15,6 +15,8 @@ def pytest_runtest_setup(item):
             pytest.skip("need --rungpu option to run")
         # FIXME: P2P shuffle isn't fully supported on GPU, so we must explicitly disable it
         dask.config.set({"dataframe.shuffle.algorithm": "tasks"})
+        # manually enable cudf decimal support
+        dask.config.set({"sql.mappings.decimal_support": "cudf"})
     else:
         dask.config.set({"dataframe.shuffle.algorithm": None})
     if "queries" in item.keywords and not item.config.getoption("--runqueries"):

continuous_integration/environment-3.10-dev.yaml

Lines changed: 3 additions & 0 deletions

@@ -13,6 +13,9 @@ dependencies:
   - maturin>=0.12.8
   - mlflow
   - mock
+  # tpot imports fail with numpy >=1.24.0
+  # https://github.com/EpistasisLab/tpot/issues/1281
+  - numpy<1.24.0
   - pandas>=1.4.0
   - pre-commit
   - prompt_toolkit>=3.0.8

continuous_integration/environment-3.8-dev.yaml

Lines changed: 3 additions & 0 deletions

@@ -12,6 +12,9 @@ dependencies:
   - maturin=0.12.8
   - mlflow
   - mock
+  # tpot imports fail with numpy >=1.24.0
+  # https://github.com/EpistasisLab/tpot/issues/1281
+  - numpy<1.24.0
   - pandas=1.4.0
   - pre-commit
   - prompt_toolkit=3.0.8

continuous_integration/environment-3.9-dev.yaml

Lines changed: 3 additions & 0 deletions

@@ -13,6 +13,9 @@ dependencies:
   - maturin>=0.12.8
   - mlflow
   - mock
+  # tpot imports fail with numpy >=1.24.0
+  # https://github.com/EpistasisLab/tpot/issues/1281
+  - numpy<1.24.0
   - pandas>=1.4.0
   - pre-commit
   - prompt_toolkit>=3.0.8

continuous_integration/gpuci/environment-3.10.yaml

Lines changed: 3 additions & 1 deletion

@@ -41,7 +41,9 @@ dependencies:
   - cuml=23.06
   - dask-cudf=23.06
   - dask-cuda=23.06
-  - numpy>=1.20.1
+  # tpot imports fail with numpy >=1.24.0
+  # https://github.com/EpistasisLab/tpot/issues/1281
+  - numpy>=1.20.1, <1.24.0
   - ucx-proc=*=gpu
   - ucx-py=0.32
   - xgboost=*rapidsai23.06

continuous_integration/gpuci/environment-3.9.yaml

Lines changed: 3 additions & 1 deletion

@@ -41,7 +41,9 @@ dependencies:
   - cuml=23.06
   - dask-cudf=23.06
   - dask-cuda=23.06
-  - numpy>=1.20.1
+  # tpot imports fail with numpy >=1.24.0
+  # https://github.com/EpistasisLab/tpot/issues/1281
+  - numpy>=1.20.1, <1.24.0
   - ucx-proc=*=gpu
   - ucx-py=0.32
   - xgboost=*rapidsai23.06

dask_planner/Cargo.lock

Lines changed: 6 additions & 6 deletions
Some generated files are not rendered by default.

dask_planner/src/error.rs

Lines changed: 2 additions & 0 deletions

@@ -14,6 +14,7 @@ pub enum DaskPlannerError {
     ParserError(ParserError),
     TokenizerError(TokenizerError),
     Internal(String),
+    InvalidIOFilter(String),
 }
 
 impl Display for DaskPlannerError {
@@ -23,6 +24,7 @@ impl Display for DaskPlannerError {
             Self::ParserError(e) => write!(f, "SQL Parser Error: {e}"),
             Self::TokenizerError(e) => write!(f, "SQL Tokenizer Error: {e}"),
             Self::Internal(e) => write!(f, "Internal Error: {e}"),
+            Self::InvalidIOFilter(e) => write!(f, "Invalid pyarrow filter: {e} encountered. Defaulting to Dask CPU/GPU bound task operation"),
         }
     }
 }

dask_planner/src/sql/logical/table_scan.rs

Lines changed: 141 additions & 2 deletions

@@ -1,12 +1,13 @@
 use std::sync::Arc;
 
 use datafusion_python::{
-    datafusion_common::DFSchema,
-    datafusion_expr::{logical_plan::TableScan, LogicalPlan},
+    datafusion_common::{DFSchema, ScalarValue},
+    datafusion_expr::{logical_plan::TableScan, Expr, LogicalPlan},
 };
 use pyo3::prelude::*;
 
 use crate::{
+    error::DaskPlannerError,
     expression::{py_expr_list, PyExpr},
     sql::exceptions::py_type_err,
 };
@@ -18,6 +19,138 @@ pub struct PyTableScan {
     input: Arc<LogicalPlan>,
 }
 
+#[pyclass(name = "FilteredResult", module = "dask_planner", subclass)]
+#[derive(Debug, Clone)]
+pub struct PyFilteredResult {
+    // Certain Expr(s) do not have supporting logic in pyarrow for IO filtering
+    // at read time. Those Expr(s) cannot be ignored however. This field stores
+    // those Expr(s) so that they can be used on the Python side to create
+    // Dask operations that handle that filtering as an extra task in the graph.
+    #[pyo3(get)]
+    pub io_unfilterable_exprs: Vec<PyExpr>,
+    // Expr(s) that can have their filtering logic performed in the pyarrow IO logic
+    // are stored here in a DNF format that is expected by pyarrow.
+    #[pyo3(get)]
+    pub filtered_exprs: Vec<(String, String, Vec<PyObject>)>,
+}
+
+impl PyTableScan {
+    /// Ensures that a valid Expr variant type is present
+    fn _valid_expr_type(expr: &[Expr]) -> bool {
+        expr.iter()
+            .all(|f| matches!(f, Expr::Column(_) | Expr::Literal(_)))
+    }
+
+    /// Transform the singular Expr instance into its DNF form serialized in a Vec instance. Possibly recursively expanding
+    /// it as well if needed.
+    pub fn _expand_dnf_filter(
+        filter: &Expr,
+        py: Python,
+    ) -> Result<Vec<(String, String, Vec<PyObject>)>, DaskPlannerError> {
+        let mut filter_tuple: Vec<(String, String, Vec<PyObject>)> = Vec::new();
+
+        match filter {
+            Expr::InList {
+                expr,
+                list,
+                negated,
+            } => {
+                // Only handle simple Expr(s) for InList operations for now
+                if PyTableScan::_valid_expr_type(list) {
+                    // While ANSI SQL would not allow for anything other than a Column or Literal
+                    // value in this "identifying" `expr` we explicitly check that here just to be sure.
+                    // IF it is something else it is returned to Dask to handle
+                    let ident = match *expr.clone() {
+                        Expr::Column(col) => Ok(col.name),
+                        Expr::Alias(_, name) => Ok(name),
+                        Expr::Literal(val) => Ok(format!("{}", val)),
+                        _ => Err(DaskPlannerError::InvalidIOFilter(format!(
+                            "Invalid InList Expr type `{}`. using in Dask instead",
+                            filter
+                        ))),
+                    };
+
+                    let op = if *negated { "not in" } else { "in" };
+                    let il: Result<Vec<PyObject>, DaskPlannerError> = list
+                        .iter()
+                        .map(|f| match f {
+                            Expr::Column(col) => Ok(col.name.clone().into_py(py)),
+                            Expr::Alias(_, name) => Ok(name.clone().into_py(py)),
+                            Expr::Literal(val) => match val {
+                                ScalarValue::Boolean(val) => Ok(val.unwrap().into_py(py)),
+                                ScalarValue::Float32(val) => Ok(val.unwrap().into_py(py)),
+                                ScalarValue::Float64(val) => Ok(val.unwrap().into_py(py)),
+                                ScalarValue::Int8(val) => Ok(val.unwrap().into_py(py)),
+                                ScalarValue::Int16(val) => Ok(val.unwrap().into_py(py)),
+                                ScalarValue::Int32(val) => Ok(val.unwrap().into_py(py)),
+                                ScalarValue::Int64(val) => Ok(val.unwrap().into_py(py)),
+                                ScalarValue::UInt8(val) => Ok(val.unwrap().into_py(py)),
+                                ScalarValue::UInt16(val) => Ok(val.unwrap().into_py(py)),
+                                ScalarValue::UInt32(val) => Ok(val.unwrap().into_py(py)),
+                                ScalarValue::UInt64(val) => Ok(val.unwrap().into_py(py)),
+                                ScalarValue::Utf8(val) => Ok(val.clone().unwrap().into_py(py)),
+                                ScalarValue::LargeUtf8(val) => Ok(val.clone().unwrap().into_py(py)),
+                                _ => Err(DaskPlannerError::InvalidIOFilter(format!(
+                                    "Unsupported ScalarValue `{}` encountered. using in Dask instead",
+                                    filter
+                                ))),
+                            },
+                            _ => Ok(f.canonical_name().into_py(py)),
+                        })
+                        .collect();
+
+                    filter_tuple.push((
+                        ident.unwrap_or(expr.canonical_name()),
+                        op.to_string(),
+                        il?,
+                    ));
+                    Ok(filter_tuple)
+                } else {
+                    let er = DaskPlannerError::InvalidIOFilter(format!(
+                        "Invalid identifying column Expr instance `{}`. using in Dask instead",
+                        filter
+                    ));
+                    Err::<Vec<(String, String, Vec<PyObject>)>, DaskPlannerError>(er)
+                }
+            }
+            _ => {
+                let er = DaskPlannerError::InvalidIOFilter(format!(
+                    "Unable to apply filter: `{}` to IO reader, using in Dask instead",
+                    filter
+                ));
+                Err::<Vec<(String, String, Vec<PyObject>)>, DaskPlannerError>(er)
+            }
+        }
+    }
+
+    /// Consume the `TableScan` filters (Expr(s)) and convert them into a PyArrow understandable
+    /// DNF format that can be directly passed to PyArrow IO readers for Predicate Pushdown. Expr(s)
+    /// that cannot be converted to correlating PyArrow IO calls will be returned as is and can be
+    /// used in the Python logic to form Dask tasks for the graph to do computational filtering.
+    pub fn _expand_dnf_filters(
+        input: &Arc<LogicalPlan>,
+        filters: &[Expr],
+        py: Python,
+    ) -> PyFilteredResult {
+        let mut filtered_exprs: Vec<(String, String, Vec<PyObject>)> = Vec::new();
+        let mut unfiltered_exprs: Vec<PyExpr> = Vec::new();
+
+        filters
+            .iter()
+            .for_each(|f| match PyTableScan::_expand_dnf_filter(f, py) {
+                Ok(mut expanded_dnf_filter) => filtered_exprs.append(&mut expanded_dnf_filter),
+                Err(_e) => {
+                    unfiltered_exprs.push(PyExpr::from(f.clone(), Some(vec![input.clone()])))
+                }
+            });
+
+        PyFilteredResult {
+            io_unfilterable_exprs: unfiltered_exprs,
+            filtered_exprs,
+        }
+    }
+}
+
 #[pymethods]
 impl PyTableScan {
     #[pyo3(name = "getTableScanProjects")]
@@ -45,6 +178,12 @@ impl PyTableScan {
     fn scan_filters(&self) -> PyResult<Vec<PyExpr>> {
         py_expr_list(&self.input, &self.table_scan.filters)
     }
+
+    #[pyo3(name = "getDNFFilters")]
+    fn dnf_io_filters(&self, py: Python) -> PyResult<PyFilteredResult> {
+        let results = PyTableScan::_expand_dnf_filters(&self.input, &self.table_scan.filters, py);
+        Ok(results)
+    }
 }
 
 impl TryFrom<LogicalPlan> for PyTableScan {
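
For context, each tuple in `filtered_exprs` follows the `(column, operator, values)` DNF convention that PyArrow-backed readers accept for predicate pushdown. A minimal sketch of how such a filter is consumed on the Python side (the column and file names here are hypothetical, not part of this commit):

    import dask.dataframe as dd

    # A predicate such as `WHERE id IN (1, 2, 3)` would come back from
    # getDNFFilters() as the DNF tuple ("id", "in", [1, 2, 3]).
    filters = [("id", "in", [1, 2, 3])]

    # Dask's parquet reader accepts DNF filter tuples and pushes them into the
    # pyarrow scan, so rows are dropped at IO time rather than in a later task.
    df = dd.read_parquet("data.parquet", filters=filters)

Expressions that cannot be expressed this way are returned in `io_unfilterable_exprs` and applied as ordinary Dask filtering tasks instead.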

dask_sql/mappings.py

Lines changed: 21 additions & 10 deletions

@@ -1,19 +1,14 @@
 import logging
-from decimal import Decimal
 from typing import Any
 
 import dask.array as da
+import dask.config as dask_config
 import dask.dataframe as dd
 import numpy as np
 import pandas as pd
 
 from dask_planner.rust import DaskTypeMap, SqlTypeName
 
-try:
-    import cudf
-except ImportError:
-    cudf = None
-
 logger = logging.getLogger(__name__)
 
 
@@ -54,7 +49,7 @@
 _SQL_TO_PYTHON_SCALARS = {
     "SqlTypeName.DOUBLE": np.float64,
     "SqlTypeName.FLOAT": np.float32,
-    "SqlTypeName.DECIMAL": Decimal,
+    "SqlTypeName.DECIMAL": np.float32,
     "SqlTypeName.BIGINT": np.int64,
     "SqlTypeName.INTEGER": np.int32,
     "SqlTypeName.SMALLINT": np.int16,
@@ -71,8 +66,7 @@
 _SQL_TO_PYTHON_FRAMES = {
     "SqlTypeName.DOUBLE": np.float64,
     "SqlTypeName.FLOAT": np.float32,
-    # a column of Decimals in pandas is `object`, but cuDF has a dedicated dtype
-    "SqlTypeName.DECIMAL": object if not cudf else cudf.Decimal128Dtype(38, 10),
+    "SqlTypeName.DECIMAL": np.float64,  # We use np.float64 always, even though we might be able to use a smaller type
     "SqlTypeName.BIGINT": pd.Int64Dtype(),
     "SqlTypeName.INTEGER": pd.Int32Dtype(),
     "SqlTypeName.SMALLINT": pd.Int16Dtype(),
@@ -151,6 +145,14 @@ def sql_to_python_value(sql_type: "SqlTypeName", literal_value: Any) -> Any:
 
         return literal_value
 
+    elif (
+        sql_type == SqlTypeName.DECIMAL
+        and dask_config.get("sql.mappings.decimal_support") == "cudf"
+    ):
+        from decimal import Decimal
+
+        python_type = Decimal
+
     elif sql_type == SqlTypeName.INTERVAL_DAY:
         return np.timedelta64(literal_value[0], "D") + np.timedelta64(
             literal_value[1], "ms"
@@ -219,7 +221,16 @@ def sql_to_python_value(sql_type: "SqlTypeName", literal_value: Any) -> Any:
 def sql_to_python_type(sql_type: "SqlTypeName", *args) -> type:
     """Turn an SQL type into a dataframe dtype"""
     try:
-        if str(sql_type) == "SqlTypeName.DECIMAL":
+        if (
+            sql_type == SqlTypeName.DECIMAL
+            and dask_config.get("sql.mappings.decimal_support") == "cudf"
+        ):
+            try:
+                import cudf
+            except ImportError:
+                raise ModuleNotFoundError(
+                    "Setting `sql.mappings.decimal_support=cudf` requires cudf"
+                )
             return cudf.Decimal128Dtype(*args)
         return _SQL_TO_PYTHON_FRAMES[str(sql_type)]
     except KeyError:  # pragma: no cover
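
A rough sketch of how the new `sql.mappings.decimal_support` setting changes `sql_to_python_type` (assuming a dask-sql installation with the Rust planner built; cudf is only needed for the second call):

    import dask.config
    from dask_planner.rust import SqlTypeName
    from dask_sql.mappings import sql_to_python_type

    # Default ("pandas") handling: DECIMAL maps to plain numpy.float64.
    print(sql_to_python_type(SqlTypeName.DECIMAL, 38, 10))

    # Opting in to cudf handling returns cudf.Decimal128Dtype(38, 10) instead,
    # and raises ModuleNotFoundError if cudf is not installed.
    with dask.config.set({"sql.mappings.decimal_support": "cudf"}):
        print(sql_to_python_type(SqlTypeName.DECIMAL, 38, 10))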

dask_sql/sql-schema.yaml

Lines changed: 9 additions & 0 deletions

@@ -75,3 +75,12 @@ properties:
       optimization (when possible). ``nelem`` is defined as the limit or ``k`` value times the
       number of columns. Default is 1000000, corresponding to a LIMIT clause of 1 million in a
       1 column table.
+
+  mappings:
+    type: object
+    properties:
+
+      decimal_support:
+        type: string
+        description:
+          Decides how to handle decimal scalars/columns. ``"pandas"`` handling will treat decimal scalars and columns as floats and float64 columns, respectively, while ``"cudf"`` handling treats decimal scalars as ``decimal.Decimal`` objects and decimal columns as ``cudf.Decimal128Dtype`` columns, handling precision/scale accordingly. Default is ``"pandas"``, but ``"cudf"`` should be used if attempting to work with decimal columns on GPU.

dask_sql/sql.yaml

Lines changed: 3 additions & 0 deletions

@@ -18,3 +18,6 @@ sql:
 
   sort:
     topk-nelem-limit: 1000000
+
+  mappings:
+    decimal_support: "pandas"
