diff --git a/docs/changelog/126770.yaml b/docs/changelog/126770.yaml new file mode 100644 index 0000000000000..cc4bc2d1d842f --- /dev/null +++ b/docs/changelog/126770.yaml @@ -0,0 +1,6 @@ +pr: 126770 +summary: Remove empty results before merging +area: Search +type: bug +issues: + - 126742 diff --git a/muted-tests.yml b/muted-tests.yml index df121ec649579..fb639fe4af334 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -315,9 +315,6 @@ tests: - class: org.elasticsearch.search.CCSDuelIT method: testTerminateAfter issue: https://github.com/elastic/elasticsearch/issues/126085 -- class: org.elasticsearch.search.sort.GeoDistanceIT - method: testDistanceSortingWithUnmappedField - issue: https://github.com/elastic/elasticsearch/issues/126118 - class: org.elasticsearch.search.basic.SearchWithRandomDisconnectsIT method: testSearchWithRandomDisconnects issue: https://github.com/elastic/elasticsearch/issues/122707 diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index ecdb1daa337c9..a6fd243a30df8 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -225,6 +225,7 @@ static TransportVersion def(int id) { public static final TransportVersion ESQL_QUERY_PLANNING_DURATION = def(9_051_0_00); public static final TransportVersion ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED = def(9_052_0_00); public static final TransportVersion BATCHED_QUERY_EXECUTION_DELAYABLE_WRITABLE = def(9_053_0_00); + public static final TransportVersion SEARCH_INCREMENTAL_TOP_DOCS_NULL = def(9_054_0_00); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java index beb1acde66746..3bde579a823f6 100644 --- a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java +++ b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java @@ -303,13 +303,19 @@ private static void consumePartialMergeResult( Collection> aggsList ) { if (topDocsList != null) { - topDocsList.add(partialResult.reducedTopDocs); + addTopDocsToList(partialResult, topDocsList); } if (aggsList != null) { addAggsToList(partialResult, aggsList); } } + private static void addTopDocsToList(MergeResult partialResult, List topDocsList) { + if (partialResult.reducedTopDocs != null) { + topDocsList.add(partialResult.reducedTopDocs); + } + } + private static void addAggsToList(MergeResult partialResult, Collection> aggsList) { var aggs = partialResult.reducedAggs; if (aggs != null) { @@ -340,7 +346,7 @@ private MergeResult partialReduce( if (hasTopDocs) { topDocsList = new ArrayList<>(resultSetSize); if (lastMerge != null) { - topDocsList.add(lastMerge.reducedTopDocs); + addTopDocsToList(lastMerge, topDocsList); } } else { topDocsList = null; @@ -358,7 +364,7 @@ private MergeResult partialReduce( } } // we have to merge here in the same way we collect on a shard - newTopDocs = topDocsList == null ? Lucene.EMPTY_TOP_DOCS : mergeTopDocs(topDocsList, topNSize, 0); + newTopDocs = topDocsList == null ? null : mergeTopDocs(topDocsList, topNSize, 0); newAggs = hasAggs ? aggregate( toConsume.iterator(), @@ -636,7 +642,7 @@ private static void releaseAggs(List toConsume) { record MergeResult( List processedShards, - TopDocs reducedTopDocs, + @Nullable TopDocs reducedTopDocs, @Nullable DelayableWriteable reducedAggs, long estimatedSize ) implements Writeable { diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index 4f028bfb85764..3f066e83a59d4 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -60,6 +60,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.Executor; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -140,24 +141,26 @@ static SortedTopDocs sortDocs( } static TopDocs mergeTopDocs(List results, int topN, int from) { - if (results.isEmpty()) { + List topDocsList = results.stream().filter(Objects::nonNull).toList(); + if (topDocsList.isEmpty()) { return null; } - final TopDocs topDocs = results.getFirst(); - final TopDocs mergedTopDocs; - final int numShards = results.size(); + final TopDocs topDocs = topDocsList.getFirst(); + final int numShards = topDocsList.size(); if (numShards == 1 && from == 0) { // only one shard and no pagination we can just return the topDocs as we got them. return topDocs; - } else if (topDocs instanceof TopFieldGroups firstTopDocs) { + } + final TopDocs mergedTopDocs; + if (topDocs instanceof TopFieldGroups firstTopDocs) { final Sort sort = new Sort(firstTopDocs.fields); - final TopFieldGroups[] shardTopDocs = results.stream().filter(td -> td != Lucene.EMPTY_TOP_DOCS).toArray(TopFieldGroups[]::new); + TopFieldGroups[] shardTopDocs = topDocsList.toArray(new TopFieldGroups[0]); mergedTopDocs = TopFieldGroups.merge(sort, from, topN, shardTopDocs, false); } else if (topDocs instanceof TopFieldDocs firstTopDocs) { - final Sort sort = checkSameSortTypes(results, firstTopDocs.fields); - final TopFieldDocs[] shardTopDocs = results.stream().filter((td -> td != Lucene.EMPTY_TOP_DOCS)).toArray(TopFieldDocs[]::new); + TopFieldDocs[] shardTopDocs = topDocsList.toArray(new TopFieldDocs[0]); + final Sort sort = checkSameSortTypes(topDocsList, firstTopDocs.fields); mergedTopDocs = TopDocs.merge(sort, from, topN, shardTopDocs); } else { - final TopDocs[] shardTopDocs = results.toArray(new TopDocs[numShards]); + final TopDocs[] shardTopDocs = topDocsList.toArray(new TopDocs[0]); mergedTopDocs = TopDocs.merge(from, topN, shardTopDocs); } return mergedTopDocs; diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 0db16c2960dd7..e552d9c9606c8 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -27,7 +27,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ListenableFuture; @@ -722,7 +721,7 @@ private static final class QueryPerNodeState { private static final QueryPhaseResultConsumer.MergeResult EMPTY_PARTIAL_MERGE_RESULT = new QueryPhaseResultConsumer.MergeResult( List.of(), - Lucene.EMPTY_TOP_DOCS, + null, null, 0L ); @@ -782,10 +781,12 @@ void onShardDone() { // also collect the set of indices that may be part of a subsequent fetch operation here so that we can release all other // indices without a roundtrip to the coordinating node final BitSet relevantShardIndices = new BitSet(searchRequest.shards.size()); - for (ScoreDoc scoreDoc : mergeResult.reducedTopDocs().scoreDocs) { - final int localIndex = scoreDoc.shardIndex; - scoreDoc.shardIndex = searchRequest.shards.get(localIndex).shardIndex; - relevantShardIndices.set(localIndex); + if (mergeResult.reducedTopDocs() != null) { + for (ScoreDoc scoreDoc : mergeResult.reducedTopDocs().scoreDocs) { + final int localIndex = scoreDoc.shardIndex; + scoreDoc.shardIndex = searchRequest.shards.get(localIndex).shardIndex; + relevantShardIndices.set(localIndex); + } } final Object[] results = new Object[queryPhaseResultConsumer.getNumShards()]; for (int i = 0; i < results.length; i++) { diff --git a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java index 9b3a27c0e578e..fcfac38f8ae22 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java @@ -64,6 +64,7 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.Version; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.TransportVersions; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -384,6 +385,14 @@ public static void writeTotalHits(StreamOutput out, TotalHits totalHits) throws * by shard for sorting purposes. */ public static void writeTopDocsIncludingShardIndex(StreamOutput out, TopDocs topDocs) throws IOException { + if (topDocs == null) { + if (out.getTransportVersion().onOrAfter(TransportVersions.SEARCH_INCREMENTAL_TOP_DOCS_NULL)) { + out.writeByte((byte) -1); + return; + } else { + topDocs = Lucene.EMPTY_TOP_DOCS; + } + } if (topDocs instanceof TopFieldGroups topFieldGroups) { out.writeByte((byte) 2); writeTotalHits(out, topDocs.totalHits); @@ -424,7 +433,10 @@ public static void writeSortFieldArray(StreamOutput out, SortField[] sortFields) */ public static TopDocs readTopDocsIncludingShardIndex(StreamInput in) throws IOException { byte type = in.readByte(); - if (type == 0) { + if (type == -1) { + assert in.getTransportVersion().onOrAfter(TransportVersions.SEARCH_INCREMENTAL_TOP_DOCS_NULL); + return null; + } else if (type == 0) { TotalHits totalHits = readTotalHits(in); final int scoreDocCount = in.readVInt();