Skip to content

Commit 9377349

Browse files
committed
Add parquet layer to attempt predicate pushdown, update table_scan logic
1 parent 7ab6ee3 commit 9377349

File tree

2 files changed

+51
-19
lines changed

2 files changed

+51
-19
lines changed

dask_sql/physical/rel/logical/table_scan.py

Lines changed: 37 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,13 @@
33
from functools import reduce
44
from typing import TYPE_CHECKING
55

6+
from dask.utils_test import hlg_layer
7+
68
from dask_sql.datacontainer import DataContainer
79
from dask_sql.physical.rel.base import BaseRelPlugin
810
from dask_sql.physical.rel.logical.filter import filter_or_scalar
911
from dask_sql.physical.rex import RexConverter
12+
from dask_sql.physical.utils.filter import attempt_predicate_pushdown
1013

1114
if TYPE_CHECKING:
1215
import dask_sql
@@ -81,25 +84,45 @@ def _apply_filters(self, table_scan, rel, dc, context):
8184
conjunctive_dnf_filters = table_scan.getDNFFilters().filtered_exprs
8285
non_dnf_filters = table_scan.getDNFFilters().io_unfilterable_exprs
8386
# All filters here are applied in conjunction (&)
84-
if non_dnf_filters or conjunctive_dnf_filters:
85-
df_condition = reduce(
86-
operator.and_,
87-
[
88-
RexConverter.convert(rel, rex, dc, context=context)
89-
for rex in non_dnf_filters
90-
],
91-
)
92-
df = filter_or_scalar(
93-
df, df_condition, conjunctive_filters=conjunctive_dnf_filters
94-
)
95-
if conjunctive_dnf_filters:
87+
if conjunctive_dnf_filters:
88+
if non_dnf_filters:
9689
df_condition = reduce(
9790
operator.and_,
9891
[
9992
RexConverter.convert(rel, rex, dc, context=context)
100-
for rex in all_filters
93+
for rex in non_dnf_filters
10194
],
10295
)
103-
df = filter_or_scalar(df, df_condition)
96+
df = filter_or_scalar(
97+
df, df_condition, conjunctive_filters=conjunctive_dnf_filters
98+
)
99+
else:
100+
df = attempt_predicate_pushdown(
101+
df, conjunctive_filters=conjunctive_dnf_filters
102+
)
103+
104+
df_condition = reduce(
105+
operator.and_,
106+
[
107+
RexConverter.convert(
108+
rel, rex, DataContainer(df, cc), context=context
109+
)
110+
for rex in all_filters
111+
],
112+
)
113+
df = filter_or_scalar(df, df_condition)
114+
elif all_filters:
115+
df_condition = reduce(
116+
operator.and_,
117+
[
118+
RexConverter.convert(rel, rex, dc, context=context)
119+
for rex in all_filters
120+
],
121+
)
122+
df = filter_or_scalar(df, df_condition)
123+
try:
124+
logger.debug(hlg_layer(df.dask, "read-parquet").creation_info)
125+
except KeyError:
126+
pass
104127

105128
return DataContainer(df, cc)

dask_sql/physical/utils/filter.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,13 @@ def attempt_predicate_pushdown(
8383
try:
8484
filters = dsk.layers[name]._dnf_filter_expression(dsk)
8585
if not isinstance(filters, frozenset):
86-
# No filters encountered
87-
return ddf
88-
filters = filters.to_list_tuple()
86+
if conjunctive_filters or disjunctive_filters:
87+
filters = []
88+
else:
89+
# No filters encountered
90+
return ddf
91+
else:
92+
filters = filters.to_list_tuple()
8993
except ValueError:
9094
# DNF dispatching failed for 1+ layers
9195
logger.warning(
@@ -97,8 +101,11 @@ def attempt_predicate_pushdown(
97101
# Expand the filter set with provided conjunctive and disjunctive filters
98102
filters.extend([f] for f in disjunctive_filters or [])
99103
# Add conjunctive filters to each disjunctive filter
100-
for f in filters:
101-
f.extend(conjunctive_filters or [])
104+
if filters:
105+
for f in filters:
106+
f.extend(conjunctive_filters or [])
107+
else:
108+
filters = [conjunctive_filters or []]
102109
# Regenerate collection with filtered IO layer
103110
try:
104111
return dsk.layers[name]._regenerate_collection(
@@ -275,6 +282,8 @@ def _dnf_filter_expression(self, dsk):
275282
func = _blockwise_getitem_dnf
276283
elif op == dd._Frame.fillna:
277284
func = _blockwise_fillna_dnf
285+
elif op == dd.io.read_parquet:
286+
return []
278287
else:
279288
raise ValueError(f"No DNF expression for {op}")
280289

0 commit comments

Comments
 (0)