Skip to content

ESQL: Add optimization to purge join on null merge key #127583

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
6 changes: 6 additions & 0 deletions docs/changelog/127583.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 127583
summary: Add optimization to purge join on null merge key
area: ES|QL
type: enhancement
issues:
- 125577
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public String toString() {

@Override
public String nodeString() {
return child.nodeString() + " AS " + name();
return child.nodeString() + " AS " + name() + "#" + id();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not strictly related, but not sure why we wouldn't include the id, it's easier to track which exactly reference points to an alias.

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1742,3 +1742,22 @@ max:long
3450233
8268153
;

nullifiedJoinKeyToPurgeTheJoin
required_capability: join_lookup_v12

FROM employees
| RENAME languages AS language_code
| SORT emp_no, language_code
| LIMIT 4
| EVAL language_code = TO_INTEGER(NULL)
| LOOKUP JOIN languages_lookup ON language_code
| KEEP emp_no, language_code, language_name
;

emp_no:integer | language_code:integer | language_name:keyword
10001 |null |null
10002 |null |null
10003 |null |null
10004 |null |null
;
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.elasticsearch.xpack.esql.optimizer.rules.logical.SubstituteSurrogateExpressions;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.SubstituteSurrogatePlans;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.TranslateTimeSeriesAggregate;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.PruneLeftJoinOnNullMatchingField;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.rule.ParameterizedRuleExecutor;
import org.elasticsearch.xpack.esql.rule.RuleExecutor;
Expand Down Expand Up @@ -201,7 +202,8 @@ protected static Batch<LogicalPlan> operators() {
new PushDownEnrich(),
new PushDownAndCombineOrderBy(),
new PruneRedundantOrderBy(),
new PruneRedundantSortClauses()
new PruneRedundantSortClauses(),
new PruneLeftJoinOnNullMatchingField()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,37 +26,41 @@ public final class PropagateEvalFoldables extends ParameterizedRule<LogicalPlan,

@Override
public LogicalPlan apply(LogicalPlan plan, LogicalOptimizerContext ctx) {
AttributeMap.Builder<Expression> collectRefsBuilder = AttributeMap.builder();
AttributeMap<Expression> collectRefs = foldableReferences(plan, ctx);
if (collectRefs.isEmpty()) {
return plan;
}

plan = plan.transformUp(p -> {
// Apply the replacement inside Filter and Eval (which shouldn't make a difference)
// TODO: also allow aggregates once aggs on constants are supported.
// C.f. https://github.com/elastic/elasticsearch/issues/100634
if (p instanceof Filter || p instanceof Eval) {
p = p.transformExpressionsOnly(ReferenceAttribute.class, r -> collectRefs.resolve(r, r));
}
return p;
});

return plan;
}

java.util.function.Function<ReferenceAttribute, Expression> replaceReference = r -> collectRefsBuilder.build().resolve(r, r);
public static AttributeMap<Expression> foldableReferences(LogicalPlan plan, LogicalOptimizerContext ctx) {
AttributeMap.Builder<Expression> collectRefsBuilder = AttributeMap.builder();

// collect aliases bottom-up
plan.forEachExpressionUp(Alias.class, a -> {
var c = a.child();
boolean shouldCollect = c.foldable();
// try to resolve the expression based on an existing foldables
if (shouldCollect == false) {
c = c.transformUp(ReferenceAttribute.class, replaceReference);
c = c.transformUp(ReferenceAttribute.class, r -> collectRefsBuilder.build().resolve(r, r));
shouldCollect = c.foldable();
}
if (shouldCollect) {
collectRefsBuilder.put(a.toAttribute(), Literal.of(ctx.foldCtx(), c));
}
});
if (collectRefsBuilder.isEmpty()) {
return plan;
}

plan = plan.transformUp(p -> {
// Apply the replacement inside Filter and Eval (which shouldn't make a difference)
// TODO: also allow aggregates once aggs on constants are supported.
// C.f. https://github.com/elastic/elasticsearch/issues/100634
if (p instanceof Filter || p instanceof Eval) {
p = p.transformExpressionsOnly(ReferenceAttribute.class, replaceReference);
}
return p;
});

return plan;
return collectRefsBuilder.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.optimizer.rules.logical.local;

import org.elasticsearch.xpack.esql.core.expression.AttributeMap;
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.Expressions;
import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.OptimizerRules;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PropagateEvalFoldables;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.Project;
import org.elasticsearch.xpack.esql.plan.logical.join.Join;

import static org.elasticsearch.xpack.esql.core.expression.Expressions.isGuaranteedNull;
import static org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes.LEFT;

/**
* The rule checks if the join's performed on a field which is aliased to null (in type or value); if that's the case, it prunes the join,
* replacing it with an Eval - returning aliases to null for all the fields added in by the right side of the Join - plus a Project on top
* of it. The rule can apply on the coordinator already, but it's more likely to be effective on the data nodes, where null aliasing is
* inserted due to locally missing fields. This rule relies on that behavior -- see {@link ReplaceMissingFieldWithNull}.
*/
public class PruneLeftJoinOnNullMatchingField extends OptimizerRules.ParameterizedOptimizerRule<Join, LogicalOptimizerContext> {

public PruneLeftJoinOnNullMatchingField() {
super(OptimizerRules.TransformDirection.DOWN);
}

@Override
protected LogicalPlan rule(Join join, LogicalOptimizerContext ctx) {
LogicalPlan plan = join;
if (join.config().type() == LEFT) { // other types will have different replacement logic
AttributeMap<Expression> attributeMap = PropagateEvalFoldables.foldableReferences(join, ctx);

for (var attr : AttributeSet.of(join.config().matchFields())) {
var resolved = attributeMap.resolve(attr);
if (resolved != null && isGuaranteedNull(resolved)) {
plan = replaceJoin(join);
break;
}
}
}
return plan;
}

private static LogicalPlan replaceJoin(Join join) {
var joinRightOutput = join.rightOutputFields();
// can be empty when the join key is null and the rest of the right side entries pruned (such as by an agg)
if (joinRightOutput.isEmpty()) {
return join.left();
}
var aliasedNulls = ReplaceMissingFieldWithNull.aliasedNulls(joinRightOutput, a -> true);
var eval = new Eval(join.source(), join.left(), aliasedNulls.v1());
return new Project(join.source(), eval, join.computeOutput(join.left().output(), Expressions.asAttributes(aliasedNulls.v2())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.xpack.esql.optimizer.rules.logical.local;

import org.elasticsearch.common.util.Maps;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.xpack.esql.core.expression.Alias;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
Expand Down Expand Up @@ -73,42 +74,15 @@ private LogicalPlan missingToNull(LogicalPlan plan, Predicate<FieldAttribute> sh
// \_Eval[field1 = null, field3 = null]
// \_EsRelation[field1, field2, field3]
List<Attribute> relationOutput = relation.output();
Map<DataType, Alias> nullLiterals = Maps.newLinkedHashMapWithExpectedSize(DataType.types().size());
List<NamedExpression> newProjections = new ArrayList<>(relationOutput.size());
for (int i = 0, size = relationOutput.size(); i < size; i++) {
Attribute attr = relationOutput.get(i);
NamedExpression projection;
if (attr instanceof FieldAttribute f && shouldBeRetained.test(f) == false) {
DataType dt = f.dataType();
Alias nullAlias = nullLiterals.get(dt);
// save the first field as null (per datatype)
if (nullAlias == null) {
// Keep the same id so downstream query plans don't need updating
// NOTE: THIS IS BRITTLE AND CAN LEAD TO BUGS.
// In case some optimizer rule or so inserts a plan node that requires the field BEFORE the Eval that we're adding
// on top of the EsRelation, this can trigger a field extraction in the physical optimizer phase, causing wrong
// layouts due to a duplicate name id.
// If someone reaches here AGAIN when debugging e.g. ClassCastExceptions NPEs from wrong layouts, we should probably
// give up on this approach and instead insert EvalExecs in InsertFieldExtraction.
Alias alias = new Alias(f.source(), f.name(), Literal.of(f, null), f.id());
nullLiterals.put(dt, alias);
projection = alias.toAttribute();
}
// otherwise point to it since this avoids creating field copies
else {
projection = new Alias(f.source(), f.name(), nullAlias.toAttribute(), f.id());
}
} else {
projection = attr;
}
newProjections.add(projection);
}
var aliasedNulls = aliasedNulls(relationOutput, attr -> attr instanceof FieldAttribute f && shouldBeRetained.test(f) == false);
var nullLiterals = aliasedNulls.v1();
var newProjections = aliasedNulls.v2();

if (nullLiterals.size() == 0) {
return plan;
}

Eval eval = new Eval(plan.source(), relation, new ArrayList<>(nullLiterals.values()));
Eval eval = new Eval(plan.source(), relation, nullLiterals);
// This projection is redundant if there's another projection downstream (and no commands depend on the order until we hit it).
return new Project(plan.source(), eval, newProjections);
}
Expand All @@ -123,4 +97,41 @@ private LogicalPlan missingToNull(LogicalPlan plan, Predicate<FieldAttribute> sh

return plan;
}

public static Tuple<List<Alias>, List<NamedExpression>> aliasedNulls(
List<Attribute> outputAttributes,
Predicate<Attribute> shouldBeReplaced
) {
Map<DataType, Alias> nullLiterals = Maps.newLinkedHashMapWithExpectedSize(DataType.types().size());
List<NamedExpression> newProjections = new ArrayList<>(outputAttributes.size());
for (Attribute attr : outputAttributes) {
NamedExpression projection;
if (shouldBeReplaced.test(attr)) {
DataType dt = attr.dataType();
Alias nullAlias = nullLiterals.get(dt);
// save the first field as null (per datatype)
if (nullAlias == null) {
// Keep the same id so downstream query plans don't need updating
// NOTE: THIS IS BRITTLE AND CAN LEAD TO BUGS.
// In case some optimizer rule or so inserts a plan node that requires the field BEFORE the Eval that we're adding
// on top of the EsRelation, this can trigger a field extraction in the physical optimizer phase, causing wrong
// layouts due to a duplicate name id.
// If someone reaches here AGAIN when debugging e.g. ClassCastExceptions NPEs from wrong layouts, we should probably
// give up on this approach and instead insert EvalExecs in InsertFieldExtraction.
Alias alias = new Alias(attr.source(), attr.name(), Literal.of(attr, null), attr.id());
nullLiterals.put(dt, alias);
projection = alias.toAttribute();
}
// otherwise point to it since this avoids creating field copies
else {
projection = new Alias(attr.source(), attr.name(), nullAlias.toAttribute(), attr.id());
}
} else {
projection = attr;
}
newProjections.add(projection);
}

return new Tuple<>(new ArrayList<>(nullLiterals.values()), newProjections);
}
}
Loading
Loading