|
8 | 8 | package org.elasticsearch.xpack.esql.optimizer;
|
9 | 9 |
|
10 | 10 | import org.elasticsearch.action.ActionListener;
|
| 11 | +import org.elasticsearch.xpack.esql.plan.logical.Eval; |
11 | 12 | import org.elasticsearch.xpack.esql.plan.logical.LocalRelation;
|
12 | 13 | import org.elasticsearch.xpack.esql.session.EsqlSession;
|
13 | 14 | import org.elasticsearch.xpack.esql.session.LocalExecutable;
|
|
20 | 21 | import org.elasticsearch.xpack.ql.expression.Literal;
|
21 | 22 | import org.elasticsearch.xpack.ql.expression.NamedExpression;
|
22 | 23 | import org.elasticsearch.xpack.ql.expression.Nullability;
|
| 24 | +import org.elasticsearch.xpack.ql.expression.function.aggregate.AggregateFunction; |
| 25 | +import org.elasticsearch.xpack.ql.expression.predicate.Predicates; |
23 | 26 | import org.elasticsearch.xpack.ql.optimizer.OptimizerRules;
|
24 | 27 | import org.elasticsearch.xpack.ql.optimizer.OptimizerRules.BinaryComparisonSimplification;
|
25 | 28 | import org.elasticsearch.xpack.ql.optimizer.OptimizerRules.BooleanFunctionEqualsElimination;
|
26 | 29 | import org.elasticsearch.xpack.ql.optimizer.OptimizerRules.CombineDisjunctionsToIn;
|
27 | 30 | import org.elasticsearch.xpack.ql.optimizer.OptimizerRules.ConstantFolding;
|
28 | 31 | import org.elasticsearch.xpack.ql.optimizer.OptimizerRules.LiteralsOnTheRight;
|
29 | 32 | import org.elasticsearch.xpack.ql.optimizer.OptimizerRules.PruneLiteralsInOrderBy;
|
30 |
| -import org.elasticsearch.xpack.ql.optimizer.OptimizerRules.PushDownAndCombineFilters; |
31 | 33 | import org.elasticsearch.xpack.ql.optimizer.OptimizerRules.SetAsOptimized;
|
32 | 34 | import org.elasticsearch.xpack.ql.optimizer.OptimizerRules.SimplifyComparisonsArithmetics;
|
33 | 35 | import org.elasticsearch.xpack.ql.plan.logical.Aggregate;
|
34 | 36 | import org.elasticsearch.xpack.ql.plan.logical.Filter;
|
35 | 37 | import org.elasticsearch.xpack.ql.plan.logical.Limit;
|
36 | 38 | import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
|
| 39 | +import org.elasticsearch.xpack.ql.plan.logical.OrderBy; |
37 | 40 | import org.elasticsearch.xpack.ql.plan.logical.Project;
|
38 | 41 | import org.elasticsearch.xpack.ql.plan.logical.UnaryPlan;
|
39 | 42 | import org.elasticsearch.xpack.ql.rule.RuleExecutor;
|
40 | 43 | import org.elasticsearch.xpack.ql.type.DataTypes;
|
41 | 44 |
|
42 | 45 | import java.util.ArrayList;
|
43 | 46 | import java.util.List;
|
| 47 | +import java.util.function.Predicate; |
44 | 48 |
|
45 | 49 | import static java.util.Arrays.asList;
|
46 | 50 |
|
@@ -217,4 +221,64 @@ public void execute(EsqlSession session, ActionListener<Result> listener) {
|
217 | 221 | }
|
218 | 222 | });
|
219 | 223 | }
|
| 224 | + |
| 225 | + protected static class PushDownAndCombineFilters extends OptimizerRules.OptimizerRule<Filter> { |
| 226 | + @Override |
| 227 | + protected LogicalPlan rule(Filter filter) { |
| 228 | + LogicalPlan plan = filter; |
| 229 | + LogicalPlan child = filter.child(); |
| 230 | + Expression condition = filter.condition(); |
| 231 | + |
| 232 | + if (child instanceof Filter f) { |
| 233 | + // combine nodes into a single Filter with updated ANDed condition |
| 234 | + plan = f.with(Predicates.combineAnd(List.of(f.condition(), condition))); |
| 235 | + } else if (child instanceof UnaryPlan unary) { |
| 236 | + if (unary instanceof Aggregate agg) { // TODO: re-evaluate along with multi-value support |
| 237 | + // Only push [parts of] a filter past an agg if these/it operates on agg's grouping[s], not output. |
| 238 | + plan = maybePushDownPastUnary( |
| 239 | + filter, |
| 240 | + agg, |
| 241 | + e -> e instanceof Attribute && agg.output().contains(e) && agg.groupings().contains(e) == false |
| 242 | + || e instanceof AggregateFunction |
| 243 | + ); |
| 244 | + } else if (unary instanceof Eval eval) { |
| 245 | + // Don't push if Filter (still) contains references of Eval's fields. |
| 246 | + List<Attribute> attributes = new ArrayList<>(eval.fields().size()); |
| 247 | + for (NamedExpression ne : eval.fields()) { |
| 248 | + attributes.add(ne.toAttribute()); |
| 249 | + } |
| 250 | + plan = maybePushDownPastUnary(filter, eval, e -> e instanceof Attribute && attributes.contains(e)); |
| 251 | + } else { // Project, OrderBy, Limit |
| 252 | + if (unary instanceof Project || unary instanceof OrderBy) { |
| 253 | + // swap the filter with its child |
| 254 | + plan = unary.replaceChild(filter.with(unary.child(), condition)); |
| 255 | + } |
| 256 | + // cannot push past a Limit, this could change the tailing result set returned |
| 257 | + } |
| 258 | + } |
| 259 | + return plan; |
| 260 | + } |
| 261 | + |
| 262 | + private static LogicalPlan maybePushDownPastUnary(Filter filter, UnaryPlan unary, Predicate<Expression> cannotPush) { |
| 263 | + LogicalPlan plan; |
| 264 | + List<Expression> pushable = new ArrayList<>(); |
| 265 | + List<Expression> nonPushable = new ArrayList<>(); |
| 266 | + for (Expression exp : Predicates.splitAnd(filter.condition())) { |
| 267 | + (exp.anyMatch(cannotPush) ? nonPushable : pushable).add(exp); |
| 268 | + } |
| 269 | + // Push the filter down even if it might not be pushable all the way to ES eventually: eval'ing it closer to the source, |
| 270 | + // potentially still in the Exec Engine, distributes the computation. |
| 271 | + if (pushable.size() > 0) { |
| 272 | + if (nonPushable.size() > 0) { |
| 273 | + Filter pushed = new Filter(filter.source(), unary.child(), Predicates.combineAnd(pushable)); |
| 274 | + plan = filter.with(unary.replaceChild(pushed), Predicates.combineAnd(nonPushable)); |
| 275 | + } else { |
| 276 | + plan = unary.replaceChild(filter.with(unary.child(), filter.condition())); |
| 277 | + } |
| 278 | + } else { |
| 279 | + plan = filter; |
| 280 | + } |
| 281 | + return plan; |
| 282 | + } |
| 283 | + } |
220 | 284 | }
|
0 commit comments