Skip to content

Commit 7dc3d3b

Browse files
authored
Add sort and collapse info to SearchHits transport serialization (#36555)
In order for the CCS alternate execution mode (see #32125) to be able to do the final reduction step on the CCS coordinating node, we need to serialize additional info in the transport layer as part of the `SearchHits`, specifically:
- the Lucene `SortField[]`, which contains info about the fields that sorting was performed on and their type, which depends on mappings (that the CCS node does not know about)
- the collapse field (`String`) that field collapsing was executed on, if requested
- the collapse values (`Object[]`) that field collapsing was based on, if requested

This info is needed to be able to reconstruct the `TopFieldDocs` or `CollapseTopFieldDocs` in the CCS coordinating node, to feed the `mergeTopDocs` method and reduce the multiple search responses received (one per cluster) into one. This commit adds such information to the `SearchHits` class. It is nullable info that is not serialized through the REST layer. `SearchPhaseController` sets such info at the end of the hits reduction phase.
1 parent c5b3ac5 commit 7dc3d3b

File tree

12 files changed

+583
-153
lines changed

12 files changed

+583
-153
lines changed

server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,9 @@ private void innerRun() throws IOException {
109109
// query AND fetch optimization
110110
finishPhase.run();
111111
} else {
112-
final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, reducedQueryPhase.scoreDocs);
113-
if (reducedQueryPhase.scoreDocs.length == 0) { // no docs to fetch -- sidestep everything and return
112+
ScoreDoc[] scoreDocs = reducedQueryPhase.sortedTopDocs.scoreDocs;
113+
final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, scoreDocs);
114+
if (scoreDocs.length == 0) { // no docs to fetch -- sidestep everything and return
114115
phaseResults.stream()
115116
.map(SearchPhaseResult::queryResult)
116117
.forEach(this::releaseIrrelevantSearchContext); // we have to release contexts here to free up resources

server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java

Lines changed: 47 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -211,18 +211,23 @@ static SortedTopDocs sortDocs(boolean ignoreFrom, Collection<? extends SearchPha
211211
}
212212
}
213213
}
214-
final boolean isSortedByField;
215-
final SortField[] sortFields;
214+
boolean isSortedByField = false;
215+
SortField[] sortFields = null;
216+
String collapseField = null;
217+
Object[] collapseValues = null;
216218
if (mergedTopDocs instanceof TopFieldDocs) {
217219
TopFieldDocs fieldDocs = (TopFieldDocs) mergedTopDocs;
218-
isSortedByField = (fieldDocs instanceof CollapseTopFieldDocs &&
219-
fieldDocs.fields.length == 1 && fieldDocs.fields[0].getType() == SortField.Type.SCORE) == false;
220220
sortFields = fieldDocs.fields;
221-
} else {
222-
isSortedByField = false;
223-
sortFields = null;
221+
if (fieldDocs instanceof CollapseTopFieldDocs) {
222+
isSortedByField = (fieldDocs.fields.length == 1 && fieldDocs.fields[0].getType() == SortField.Type.SCORE) == false;
223+
CollapseTopFieldDocs collapseTopFieldDocs = (CollapseTopFieldDocs) fieldDocs;
224+
collapseField = collapseTopFieldDocs.field;
225+
collapseValues = collapseTopFieldDocs.collapseValues;
226+
} else {
227+
isSortedByField = true;
228+
}
224229
}
225-
return new SortedTopDocs(scoreDocs, isSortedByField, sortFields);
230+
return new SortedTopDocs(scoreDocs, isSortedByField, sortFields, collapseField, collapseValues);
226231
} else {
227232
// no relevant docs
228233
return SortedTopDocs.EMPTY;
@@ -266,7 +271,7 @@ private static void setShardIndex(TopDocs topDocs, int shardIndex) {
266271
public ScoreDoc[] getLastEmittedDocPerShard(ReducedQueryPhase reducedQueryPhase, int numShards) {
267272
final ScoreDoc[] lastEmittedDocPerShard = new ScoreDoc[numShards];
268273
if (reducedQueryPhase.isEmptyResult == false) {
269-
final ScoreDoc[] sortedScoreDocs = reducedQueryPhase.scoreDocs;
274+
final ScoreDoc[] sortedScoreDocs = reducedQueryPhase.sortedTopDocs.scoreDocs;
270275
// from is always zero as when we use scroll, we ignore from
271276
long size = Math.min(reducedQueryPhase.fetchHits, reducedQueryPhase.size);
272277
// with collapsing we can have more hits than sorted docs
@@ -307,7 +312,7 @@ public InternalSearchResponse merge(boolean ignoreFrom, ReducedQueryPhase reduce
307312
if (reducedQueryPhase.isEmptyResult) {
308313
return InternalSearchResponse.empty();
309314
}
310-
ScoreDoc[] sortedDocs = reducedQueryPhase.scoreDocs;
315+
ScoreDoc[] sortedDocs = reducedQueryPhase.sortedTopDocs.scoreDocs;
311316
SearchHits hits = getHits(reducedQueryPhase, ignoreFrom, fetchResults, resultsLookup);
312317
if (reducedQueryPhase.suggest != null) {
313318
if (!fetchResults.isEmpty()) {
@@ -345,12 +350,12 @@ public InternalSearchResponse merge(boolean ignoreFrom, ReducedQueryPhase reduce
345350

346351
private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFrom,
347352
Collection<? extends SearchPhaseResult> fetchResults, IntFunction<SearchPhaseResult> resultsLookup) {
348-
final boolean sorted = reducedQueryPhase.isSortedByField;
349-
ScoreDoc[] sortedDocs = reducedQueryPhase.scoreDocs;
353+
SortedTopDocs sortedTopDocs = reducedQueryPhase.sortedTopDocs;
350354
int sortScoreIndex = -1;
351-
if (sorted) {
352-
for (int i = 0; i < reducedQueryPhase.sortField.length; i++) {
353-
if (reducedQueryPhase.sortField[i].getType() == SortField.Type.SCORE) {
355+
if (sortedTopDocs.isSortedByField) {
356+
SortField[] sortFields = sortedTopDocs.sortFields;
357+
for (int i = 0; i < sortFields.length; i++) {
358+
if (sortFields[i].getType() == SortField.Type.SCORE) {
354359
sortScoreIndex = i;
355360
}
356361
}
@@ -362,12 +367,12 @@ private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFr
362367
int from = ignoreFrom ? 0 : reducedQueryPhase.from;
363368
int numSearchHits = (int) Math.min(reducedQueryPhase.fetchHits - from, reducedQueryPhase.size);
364369
// with collapsing we can have more fetch hits than sorted docs
365-
numSearchHits = Math.min(sortedDocs.length, numSearchHits);
370+
numSearchHits = Math.min(sortedTopDocs.scoreDocs.length, numSearchHits);
366371
// merge hits
367372
List<SearchHit> hits = new ArrayList<>();
368373
if (!fetchResults.isEmpty()) {
369374
for (int i = 0; i < numSearchHits; i++) {
370-
ScoreDoc shardDoc = sortedDocs[i];
375+
ScoreDoc shardDoc = sortedTopDocs.scoreDocs[i];
371376
SearchPhaseResult fetchResultProvider = resultsLookup.apply(shardDoc.shardIndex);
372377
if (fetchResultProvider == null) {
373378
// this can happen if we are hitting a shard failure during the fetch phase
@@ -381,21 +386,21 @@ private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFr
381386
assert index < fetchResult.hits().getHits().length : "not enough hits fetched. index [" + index + "] length: "
382387
+ fetchResult.hits().getHits().length;
383388
SearchHit searchHit = fetchResult.hits().getHits()[index];
384-
if (sorted == false) {
385-
searchHit.score(shardDoc.score);
386-
}
387389
searchHit.shard(fetchResult.getSearchShardTarget());
388-
if (sorted) {
390+
if (sortedTopDocs.isSortedByField) {
389391
FieldDoc fieldDoc = (FieldDoc) shardDoc;
390392
searchHit.sortValues(fieldDoc.fields, reducedQueryPhase.sortValueFormats);
391393
if (sortScoreIndex != -1) {
392394
searchHit.score(((Number) fieldDoc.fields[sortScoreIndex]).floatValue());
393395
}
396+
} else {
397+
searchHit.score(shardDoc.score);
394398
}
395399
hits.add(searchHit);
396400
}
397401
}
398-
return new SearchHits(hits.toArray(new SearchHit[0]), reducedQueryPhase.totalHits, reducedQueryPhase.maxScore);
402+
return new SearchHits(hits.toArray(new SearchHit[0]), reducedQueryPhase.totalHits,
403+
reducedQueryPhase.maxScore, sortedTopDocs.sortFields, sortedTopDocs.collapseField, sortedTopDocs.collapseValues);
399404
}
400405

401406
/**
@@ -436,8 +441,7 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
436441
if (queryResults.isEmpty()) { // early terminate we have nothing to reduce
437442
final TotalHits totalHits = topDocsStats.getTotalHits();
438443
return new ReducedQueryPhase(totalHits, topDocsStats.fetchHits, topDocsStats.maxScore,
439-
timedOut, terminatedEarly, null, null, null, EMPTY_DOCS, null,
440-
null, numReducePhases, false, 0, 0, true);
444+
timedOut, terminatedEarly, null, null, null, SortedTopDocs.EMPTY, null, numReducePhases, 0, 0, true);
441445
}
442446
final QuerySearchResult firstResult = queryResults.stream().findFirst().get().queryResult();
443447
final boolean hasSuggest = firstResult.suggest() != null;
@@ -499,11 +503,11 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
499503
final InternalAggregations aggregations = aggregationsList.isEmpty() ? null : reduceAggs(aggregationsList,
500504
firstResult.pipelineAggregators(), reduceContext);
501505
final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
502-
final SortedTopDocs scoreDocs = sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size);
506+
final SortedTopDocs sortedTopDocs = sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size);
503507
final TotalHits totalHits = topDocsStats.getTotalHits();
504508
return new ReducedQueryPhase(totalHits, topDocsStats.fetchHits, topDocsStats.maxScore,
505-
timedOut, terminatedEarly, suggest, aggregations, shardResults, scoreDocs.scoreDocs, scoreDocs.sortFields,
506-
firstResult.sortValueFormats(), numReducePhases, scoreDocs.isSortedByField, size, from, false);
509+
timedOut, terminatedEarly, suggest, aggregations, shardResults, sortedTopDocs,
510+
firstResult.sortValueFormats(), numReducePhases, size, from, firstResult == null);
507511
}
508512

509513
/**
@@ -551,12 +555,8 @@ public static final class ReducedQueryPhase {
551555
final SearchProfileShardResults shardResults;
552556
// the number of reduces phases
553557
final int numReducePhases;
554-
// the searches merged top docs
555-
final ScoreDoc[] scoreDocs;
556-
// the top docs sort fields used to sort the score docs, <code>null</code> if the results are not sorted
557-
final SortField[] sortField;
558-
// <code>true</code> iff the result score docs is sorted by a field (not score), this implies that <code>sortField</code> is set.
559-
final boolean isSortedByField;
558+
//encloses info about the merged top docs, the sort fields used to sort the score docs etc.
559+
final SortedTopDocs sortedTopDocs;
560560
// the size of the top hits to return
561561
final int size;
562562
// <code>true</code> iff the query phase had no results. Otherwise <code>false</code>
@@ -567,9 +567,8 @@ public static final class ReducedQueryPhase {
567567
final DocValueFormat[] sortValueFormats;
568568

569569
ReducedQueryPhase(TotalHits totalHits, long fetchHits, float maxScore, boolean timedOut, Boolean terminatedEarly, Suggest suggest,
570-
InternalAggregations aggregations, SearchProfileShardResults shardResults, ScoreDoc[] scoreDocs,
571-
SortField[] sortFields, DocValueFormat[] sortValueFormats, int numReducePhases, boolean isSortedByField, int size,
572-
int from, boolean isEmptyResult) {
570+
InternalAggregations aggregations, SearchProfileShardResults shardResults, SortedTopDocs sortedTopDocs,
571+
DocValueFormat[] sortValueFormats, int numReducePhases, int size, int from, boolean isEmptyResult) {
573572
if (numReducePhases <= 0) {
574573
throw new IllegalArgumentException("at least one reduce phase must have been applied but was: " + numReducePhases);
575574
}
@@ -586,9 +585,7 @@ public static final class ReducedQueryPhase {
586585
this.aggregations = aggregations;
587586
this.shardResults = shardResults;
588587
this.numReducePhases = numReducePhases;
589-
this.scoreDocs = scoreDocs;
590-
this.sortField = sortFields;
591-
this.isSortedByField = isSortedByField;
588+
this.sortedTopDocs = sortedTopDocs;
592589
this.size = size;
593590
this.from = from;
594591
this.isEmptyResult = isEmptyResult;
@@ -728,7 +725,7 @@ InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> newSearchPhaseResu
728725
}
729726
return new InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult>(numShards) {
730727
@Override
731-
public ReducedQueryPhase reduce() {
728+
ReducedQueryPhase reduce() {
732729
return reducedQueryPhase(results.asList(), isScrollRequest, trackTotalHits);
733730
}
734731
};
@@ -770,15 +767,23 @@ void add(TopDocsAndMaxScore topDocs) {
770767
}
771768

772769
static final class SortedTopDocs {
773-
static final SortedTopDocs EMPTY = new SortedTopDocs(EMPTY_DOCS, false, null);
770+
static final SortedTopDocs EMPTY = new SortedTopDocs(EMPTY_DOCS, false, null, null, null);
771+
// the searches merged top docs
774772
final ScoreDoc[] scoreDocs;
773+
// <code>true</code> iff the result score docs is sorted by a field (not score), this implies that <code>sortField</code> is set.
775774
final boolean isSortedByField;
775+
// the top docs sort fields used to sort the score docs, <code>null</code> if the results are not sorted
776776
final SortField[] sortFields;
777+
final String collapseField;
778+
final Object[] collapseValues;
777779

778-
SortedTopDocs(ScoreDoc[] scoreDocs, boolean isSortedByField, SortField[] sortFields) {
780+
SortedTopDocs(ScoreDoc[] scoreDocs, boolean isSortedByField, SortField[] sortFields,
781+
String collapseField, Object[] collapseValues) {
779782
this.scoreDocs = scoreDocs;
780783
this.isSortedByField = isSortedByField;
781784
this.sortFields = sortFields;
785+
this.collapseField = collapseField;
786+
this.collapseValues = collapseValues;
782787
}
783788
}
784789
}

server/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import org.elasticsearch.search.query.ScrollQuerySearchResult;
3636
import org.elasticsearch.transport.Transport;
3737

38-
import java.io.IOException;
3938
import java.util.function.BiFunction;
4039

4140
final class SearchScrollQueryThenFetchAsyncAction extends SearchScrollAsyncAction<ScrollQuerySearchResult> {
@@ -68,16 +67,16 @@ protected void executeInitialPhase(Transport.Connection connection, InternalScro
6867
protected SearchPhase moveToNextPhase(BiFunction<String, String, DiscoveryNode> clusterNodeLookup) {
6968
return new SearchPhase("fetch") {
7069
@Override
71-
public void run() throws IOException {
70+
public void run() {
7271
final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedScrollQueryPhase(
7372
queryResults.asList());
74-
if (reducedQueryPhase.scoreDocs.length == 0) {
73+
ScoreDoc[] scoreDocs = reducedQueryPhase.sortedTopDocs.scoreDocs;
74+
if (scoreDocs.length == 0) {
7575
sendResponse(reducedQueryPhase, fetchResults);
7676
return;
7777
}
7878

79-
final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(queryResults.length(),
80-
reducedQueryPhase.scoreDocs);
79+
final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(queryResults.length(), scoreDocs);
8180
final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase,
8281
queryResults.length());
8382
final CountDown counter = new CountDown(docIdsToLoad.length);

server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -785,22 +785,36 @@ public <T> void writeArray(final Writer<T> writer, final T[] array) throws IOExc
785785
}
786786
}
787787

788-
public <T extends Writeable> void writeArray(T[] array) throws IOException {
789-
writeVInt(array.length);
790-
for (T value: array) {
791-
value.writeTo(this);
792-
}
793-
}
794-
795-
public <T extends Writeable> void writeOptionalArray(@Nullable T[] array) throws IOException {
788+
/**
789+
* Same as {@link #writeArray(Writer, Object[])} but the provided array may be null. An additional boolean value is
790+
* serialized to indicate whether the array was null or not.
791+
*/
792+
public <T> void writeOptionalArray(final Writer<T> writer, final @Nullable T[] array) throws IOException {
796793
if (array == null) {
797794
writeBoolean(false);
798795
} else {
799796
writeBoolean(true);
800-
writeArray(array);
797+
writeArray(writer, array);
801798
}
802799
}
803800

801+
/**
802+
* Writes the specified array of {@link Writeable}s. This method can be seen as
803+
* writer version of {@link StreamInput#readArray(Writeable.Reader, IntFunction)}. The length of array encoded as a variable-length
804+
* integer is first written to the stream, and then the elements of the array are written to the stream.
805+
*/
806+
public <T extends Writeable> void writeArray(T[] array) throws IOException {
807+
writeArray((out, value) -> value.writeTo(out), array);
808+
}
809+
810+
/**
811+
* Same as {@link #writeArray(Writeable[])} but the provided array may be null. An additional boolean value is
812+
* serialized to indicate whether the array was null or not.
813+
*/
814+
public <T extends Writeable> void writeOptionalArray(@Nullable T[] array) throws IOException {
815+
writeOptionalArray((out, value) -> value.writeTo(out), array);
816+
}
817+
804818
/**
805819
* Serializes a potential null value.
806820
*/

0 commit comments

Comments
 (0)