Skip to content

[ES|QL] Assign new id to alias created by ReplaceMissingFieldWithNull when there is lookup join #125462

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 11 commits into from
Closed
6 changes: 6 additions & 0 deletions docs/changelog/125462.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 125462
summary: Assign new id to alias created by `ReplaceMissingFieldWithNull` when there
is lookup join
area: ES|QL
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -986,6 +986,32 @@ public void testDoubleParamsWithLookupJoin() throws IOException {
);
}

/**
 * Runs {@code FROM * | LOOKUP JOIN ...} (with and without a trailing sort) against many small indices and
 * checks the shape of the result.
 * <p>
 * The extra indices are removed in a {@code finally} block so that a failing assertion or a failing query does
 * not leave {@code idx1..idx20} behind for other tests that query {@code from *}.
 *
 * @throws IOException if any REST call made by the test fails
 */
public void testMultipleBatchesWithLookupJoin() throws IOException {
    assumeTrue(
        "Requires new null alias ids for join with multiple batches",
        EsqlCapabilities.Cap.REPLACE_MISSING_FIELD_WITH_NULL_NEW_ALIAS_ID_FOR_JOIN_AND_MULTIPLE_BATCHES.isEnabled()
    );
    final int indexCount = 20;
    // Highest index suffix that was successfully created; only these are deleted during clean up.
    int created = 0;
    try {
        // Create more than 10 indices to trigger multiple batches of data node execution.
        // The sort field is missing on these indices, which reproduces the NullPointerException caused by
        // duplicated items in the layout.
        for (int i = 1; i <= indexCount; i++) {
            createIndex("idx" + i, randomBoolean(), "\"mappings\": {\"properties\" : {\"a\" : {\"type\" : \"keyword\"}}}");
            created = i;
        }
        bulkLoadTestDataLookupMode(10);
        // lookup join with and without sort
        for (String sort : List.of("", "| sort integer")) {
            var query = requestObjectBuilder().query(format(null, "from * | lookup join {} on integer {}", testIndexName(), sort));
            Map<String, Object> result = runEsql(query);
            var columns = as(result.get("columns"), List.class);
            assertEquals(21, columns.size());
            var values = as(result.get("values"), List.class);
            assertEquals(10, values.size());
        }
    } finally {
        // clean up, even when an assertion above failed
        for (int i = 1; i <= created; i++) {
            assertThat(deleteIndex("idx" + i).isAcknowledged(), is(true));
        }
    }
}

private void validateResultsOfDoubleParametersForIdentifiers(RequestObjectBuilder query) throws IOException {
Map<String, Object> result = runEsql(query);
Map<String, String> colA = Map.of("name", "boolean", "type", "boolean");
Expand Down Expand Up @@ -1668,6 +1694,13 @@ private static String repeatValueAsMV(Object value) {
return "[" + value + ", " + value + "]";
}

/**
 * Creates an index by sending {@code PUT /<indexName>} and asserts that the response status is 200.
 *
 * @param indexName  name of the index to create
 * @param lookupMode when {@code true}, a settings entry selecting the {@code lookup} mode is placed before the mapping
 * @param mapping    JSON fragment (without the enclosing braces) appended inside the request body
 * @throws IOException if performing the request fails
 */
private static void createIndex(String indexName, boolean lookupMode, String mapping) throws IOException {
    Request createRequest = new Request("PUT", "/" + indexName);

    // Assemble "{" [settings ", "] mapping "}" piece by piece.
    StringBuilder body = new StringBuilder("{");
    if (lookupMode) {
        body.append("\"settings\" : {\"mode\" : \"lookup\"}, ");
    }
    body.append(mapping).append("}");
    createRequest.setJsonEntity(body.toString());

    int status = client().performRequest(createRequest).getStatusLine().getStatusCode();
    assertEquals(200, status);
}

/**
 * Factory for a fresh {@link RequestObjectBuilder}.
 *
 * @return a newly constructed builder
 * @throws IOException declared by the signature; presumably propagated from the builder's constructor
 */
public static RequestObjectBuilder requestObjectBuilder() throws IOException {
    RequestObjectBuilder builder = new RequestObjectBuilder();
    return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1452,3 +1452,90 @@ emp_no:integer | language_code:integer | language_name:keyword
10092 | 1 | English
10093 | 3 | Spanish
;

multipleBatchesWithSort
required_capability: join_lookup_v12
required_capability: remove_redundant_sort
required_capability: replace_missing_field_with_null_new_alias_id_for_join_and_multiple_batches

from *
| rename city.country.continent.planet.name as message
| lookup join message_types_lookup on message
| sort language_code, birth_date
| keep language_code
| limit 1
;

language_code:integer
1
;

multipleBatchesWithMvExpand
required_capability: join_lookup_v12
required_capability: remove_redundant_sort
required_capability: replace_missing_field_with_null_new_alias_id_for_join_and_multiple_batches

from *
| rename city.country.continent.planet.name as message
| lookup join message_types_lookup on message
| keep birth_date, language_code
| mv_expand birth_date
| sort birth_date, language_code
| limit 1
;

birth_date:datetime |language_code:integer
1952-02-27T00:00:00.000Z |null
;

multipleBatchesWithAggregate1
required_capability: join_lookup_v12
required_capability: remove_redundant_sort
required_capability: replace_missing_field_with_null_new_alias_id_for_join_and_multiple_batches

from *
| rename city.country.continent.planet.name as message
| lookup join message_types_lookup on message
| keep birth_date, language_code
| stats x=max(birth_date), y=min(language_code)
;

x:datetime |y:integer
1965-01-03T00:00:00.000Z |1
;

multipleBatchesWithAggregate2
required_capability: join_lookup_v12
required_capability: remove_redundant_sort
required_capability: replace_missing_field_with_null_new_alias_id_for_join_and_multiple_batches

from *
| rename city.country.continent.planet.name as message
| lookup join message_types_lookup on message
| keep birth_date, language_code
| stats m=min(birth_date) by language_code
| sort language_code
| limit 1
;

m:datetime |language_code:integer
null |1
;

multipleBatchesWithAggregate3
required_capability: join_lookup_v12
required_capability: remove_redundant_sort
required_capability: replace_missing_field_with_null_new_alias_id_for_join_and_multiple_batches

from *
| rename city.country.continent.planet.name as message
| lookup join message_types_lookup on message
| keep birth_date, language_code
| stats m=min(language_code) by birth_date
| sort birth_date
| limit 1
;

m:integer |birth_date:datetime
null |1952-02-27T00:00:00.000Z
;
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,12 @@ public enum Cap {
/**
* Index component selector syntax (my-data-stream-name::failures)
*/
INDEX_COMPONENT_SELECTORS(DataStream.isFailureStoreFeatureFlagEnabled());
INDEX_COMPONENT_SELECTORS(DataStream.isFailureStoreFeatureFlagEnabled()),

/**
* Create null alias with new id in ReplaceMissingFieldWithNull when there is lookup join with multiple batches.
*/
REPLACE_MISSING_FIELD_WITH_NULL_NEW_ALIAS_ID_FOR_JOIN_AND_MULTIPLE_BATCHES;

private final boolean enabled;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.xpack.esql.core.expression.Alias;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.core.type.PotentiallyUnmappedKeywordEsField;
import org.elasticsearch.xpack.esql.optimizer.LocalLogicalOptimizerContext;
Expand All @@ -22,6 +24,7 @@
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.Filter;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.MvExpand;
import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
import org.elasticsearch.xpack.esql.plan.logical.Project;
import org.elasticsearch.xpack.esql.plan.logical.RegexExtract;
Expand Down Expand Up @@ -81,7 +84,11 @@ else if (plan instanceof Project project) {
Alias nullAlias = nullLiteral.get(f.dataType());
// save the first field as null (per datatype)
if (nullAlias == null) {
Alias alias = new Alias(f.source(), f.name(), Literal.of(f, null), f.id());
// In case of batch executions on data nodes and join exists, SearchStats may not always be available for all
// fields, creating a new alias for null with the same id as the field id can potentially cause planEval to add a
// duplicated ChannelSet to a layout, and Layout.builder().build() could throw a NullPointerException.
// As a workaround, assign a new alias id to the null alias when join exists and SearchStats is not available.
Alias alias = new Alias(f.source(), f.name(), Literal.of(f, null), joinAttributes.isEmpty() ? f.id() : null);
Copy link
Contributor

@alex-spies alex-spies Mar 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks a lot @fang-xing-esql for getting to the bottom of this!

I think this approach unfortunately still has problems. There could be downstream commands that still reference the old name id, but now the attribute with the old id is gone for good (projected away). For instance, there might be another LOOKUP JOIN command after the projection, or an MV_EXPAND, and I think this one will just break due to the name id being completely absent from the previous commands' layouts. The only subsequent commands that are OK to use downstream are those taken care of in the next else if branch below, namely Eval, Filter, OrderBy, RegexExtract and TopN.

I guess there are 2 approaches to eliminating missing fields: either define a new attribute pointing to a null literal with its own id - and update all downstream references to point to the new attribute; OR update the attribute in place by defining a null literal with the same name id + make sure we never extract the original attribute in the first place; the latter is sketchy because, as we can see here, it can lead to re-defining the same name id multiple times.

Now, I think a part of the correct long-term solution is that we avoid the lookup join when the left hand side join key is missing. But that's not sufficient: I can see that we're introducing multiple Projects for the same missing attribute even if it is not a join key, like so (taken from your csv test):

Project[[birth_date{f}#15, language_code{f}#9]]
\_TopN[[Order[language_code{f}#9,ASC,LAST], Order[birth_date{f}#15,ASC,LAST]],1[INTEGER]]                      
  \_Join[LEFT,[message{r}#3],[message{r}#3],[message{f}#17]]                                                   
    |_EsqlProject[[birth_date{f}#15, city.country.continent.planet.name{f}#14 AS message, language_code{f}#9]]
    | \_EsRelation[*][birth_date{f}#15, city.country.continent.planet.nam..]                                   
    \_EsRelation[message_types_lookup][LOOKUP][message{f}#17]                                                  

The problem is that there are 2 Projects for birth_date, which is locally missing, and ReplaceMissingFieldWithNull will lead to the addition of the null attribute birth_date with the id 15 twice. Your fix gives that a new id each time, but if birth_date{f}#15 is used anywhere after the second Project (in a more complex query), it will be missing.

Therefore, I think the second, more crucial, part of a correct solution is that we avoid replacing attributes by other attributes with the same name id, at all. Instead, I think we should update ReplaceMissingFieldWithNull so that it always assigns new ids - and updates any downstream references to the missing field attribute with a reference attribute to the newly defined null attribute.

nullLiteral.put(dt, alias);
projection = alias.toAttribute();
}
Expand Down Expand Up @@ -113,14 +120,40 @@ else if (plan instanceof Project project) {
? f
: Literal.of(f, null)
);
} else if (plan instanceof MvExpand m) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There can also be other downstream commands that will have the same problem, though, like CHANGE_POINT.

NamedExpression target = m.target();
AttributeSet joinAttributes = joinAttributes(m);
if (joinAttributes.isEmpty() == false // rewrite only when there is join, TODO do we want to rewrite when there is no join?
&& target instanceof FieldAttribute f
&& stats.exists(f.fieldName()) == false
&& joinAttributes.contains(f) == false
&& f.field() instanceof PotentiallyUnmappedKeywordEsField == false) {
// Replace missing target field with null.
Alias alias = new Alias(f.source(), f.name(), Literal.of(f, null));
NamedExpression nullTarget = alias.toAttribute();
plan = new Eval(m.source(), m.child(), List.of(alias));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will change the position of the expanded field in the output because EVAL always places new attributes on the right. This may be fine when there's a projection some time later, but it may also create another bug.

// The expanded reference is built on top of the target field and has the same name, and the parent plans all refer to the
// expanded reference rather than to the target field. Keep the expanded reference's id unchanged, otherwise the parent
// plans cannot find it.
Attribute nullExpanded = new ReferenceAttribute(
nullTarget.source(),
nullTarget.name(),
nullTarget.dataType(),
nullTarget.nullable(),
m.expanded().id(),
false
);
plan = new MvExpand(m.source(), plan, nullTarget, nullExpanded);
}
}

return plan;
}

private AttributeSet joinAttributes(Project project) {
private AttributeSet joinAttributes(LogicalPlan plan) {
var attributes = new AttributeSet();
project.forEachDown(Join.class, j -> j.right().forEachDown(EsRelation.class, p -> attributes.addAll(p.output())));
if (plan instanceof Project || plan instanceof MvExpand) {
plan.forEachDown(Join.class, j -> j.right().forEachDown(EsRelation.class, p -> attributes.addAll(p.output())));
}
return attributes;
}
}
Loading