Skip to content

Commit 09a6ba5

Browse files
authored
Add support for merging multiple search responses into one (#37566)
This will be used in cross-cluster search when reduction will be performed locally on each cluster. The CCS coordinating node will send one search request per remote cluster involved and will get one search response back from each one of them. Such responses contain all the info to be able to perform an additional reduction and return results back to the user. Relates to #32125
1 parent 14d74eb commit 09a6ba5

File tree

8 files changed

+899
-49
lines changed

8 files changed

+899
-49
lines changed

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.ExceptionsHelper;
2727
import org.elasticsearch.action.ActionListener;
2828
import org.elasticsearch.action.ShardOperationFailedException;
29+
import org.elasticsearch.action.search.TransportSearchAction.SearchTimeProvider;
2930
import org.elasticsearch.action.support.TransportActions;
3031
import org.elasticsearch.cluster.routing.GroupShardsIterator;
3132
import org.elasticsearch.common.Nullable;
@@ -43,7 +44,6 @@
4344
import java.util.Map;
4445
import java.util.Set;
4546
import java.util.concurrent.Executor;
46-
import java.util.concurrent.TimeUnit;
4747
import java.util.concurrent.atomic.AtomicInteger;
4848
import java.util.function.BiFunction;
4949
import java.util.stream.Collectors;
@@ -70,7 +70,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
7070
private final Object shardFailuresMutex = new Object();
7171
private final AtomicInteger successfulOps = new AtomicInteger();
7272
private final AtomicInteger skippedOps = new AtomicInteger();
73-
private final TransportSearchAction.SearchTimeProvider timeProvider;
73+
private final SearchTimeProvider timeProvider;
7474
private final SearchResponse.Clusters clusters;
7575

7676
AbstractSearchAsyncAction(String name, Logger logger, SearchTransportService searchTransportService,
@@ -79,7 +79,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
7979
Map<String, Set<String>> indexRoutings,
8080
Executor executor, SearchRequest request,
8181
ActionListener<SearchResponse> listener, GroupShardsIterator<SearchShardIterator> shardsIts,
82-
TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion,
82+
SearchTimeProvider timeProvider, long clusterStateVersion,
8383
SearchTask task, SearchPhaseResults<Result> resultConsumer, int maxConcurrentRequestsPerNode,
8484
SearchResponse.Clusters clusters) {
8585
super(name, request, shardsIts, logger, maxConcurrentRequestsPerNode, executor);
@@ -103,8 +103,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
103103
* Builds how long it took to execute the search.
104104
*/
105105
long buildTookInMillis() {
106-
return TimeUnit.NANOSECONDS.toMillis(
107-
timeProvider.getRelativeCurrentNanos() - timeProvider.getRelativeStartNanos());
106+
return timeProvider.buildTookInMillis();
108107
}
109108

110109
/**

server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java

Lines changed: 26 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ static SortedTopDocs sortDocs(boolean ignoreFrom, Collection<? extends SearchPha
170170
if (queryResult.hasConsumedTopDocs() == false) { // already consumed?
171171
final TopDocsAndMaxScore td = queryResult.consumeTopDocs();
172172
assert td != null;
173-
topDocsStats.add(td);
173+
topDocsStats.add(td, queryResult.searchTimedOut(), queryResult.terminatedEarly());
174174
// make sure we set the shard index before we add it - the consumer didn't do that yet
175175
if (td.topDocs.scoreDocs.length > 0) {
176176
setShardIndex(td.topDocs, queryResult.getShardIndex());
@@ -439,12 +439,10 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
439439
boolean performFinalReduce) {
440440
assert numReducePhases >= 0 : "num reduce phases must be >= 0 but was: " + numReducePhases;
441441
numReducePhases++; // increment for this phase
442-
boolean timedOut = false;
443-
Boolean terminatedEarly = null;
444442
if (queryResults.isEmpty()) { // early terminate we have nothing to reduce
445443
final TotalHits totalHits = topDocsStats.getTotalHits();
446-
return new ReducedQueryPhase(totalHits, topDocsStats.fetchHits, topDocsStats.maxScore,
447-
timedOut, terminatedEarly, null, null, null, SortedTopDocs.EMPTY, null, numReducePhases, 0, 0, true);
444+
return new ReducedQueryPhase(totalHits, topDocsStats.fetchHits, topDocsStats.getMaxScore(),
445+
false, null, null, null, null, SortedTopDocs.EMPTY, null, numReducePhases, 0, 0, true);
448446
}
449447
final QuerySearchResult firstResult = queryResults.stream().findFirst().get().queryResult();
450448
final boolean hasSuggest = firstResult.suggest() != null;
@@ -476,16 +474,6 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
476474
QuerySearchResult result = entry.queryResult();
477475
from = result.from();
478476
size = result.size();
479-
if (result.searchTimedOut()) {
480-
timedOut = true;
481-
}
482-
if (result.terminatedEarly() != null) {
483-
if (terminatedEarly == null) {
484-
terminatedEarly = result.terminatedEarly();
485-
} else if (result.terminatedEarly()) {
486-
terminatedEarly = true;
487-
}
488-
}
489477
if (hasSuggest) {
490478
assert result.suggest() != null;
491479
for (Suggestion<? extends Suggestion.Entry<? extends Suggestion.Entry.Option>> suggestion : result.suggest()) {
@@ -508,8 +496,8 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
508496
final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
509497
final SortedTopDocs sortedTopDocs = sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size);
510498
final TotalHits totalHits = topDocsStats.getTotalHits();
511-
return new ReducedQueryPhase(totalHits, topDocsStats.fetchHits, topDocsStats.maxScore,
512-
timedOut, terminatedEarly, suggest, aggregations, shardResults, sortedTopDocs,
499+
return new ReducedQueryPhase(totalHits, topDocsStats.fetchHits, topDocsStats.getMaxScore(),
500+
topDocsStats.timedOut, topDocsStats.terminatedEarly, suggest, aggregations, shardResults, sortedTopDocs,
513501
firstResult.sortValueFormats(), numReducePhases, size, from, false);
514502
}
515503

@@ -577,11 +565,7 @@ public static final class ReducedQueryPhase {
577565
}
578566
this.totalHits = totalHits;
579567
this.fetchHits = fetchHits;
580-
if (Float.isInfinite(maxScore)) {
581-
this.maxScore = Float.NaN;
582-
} else {
583-
this.maxScore = maxScore;
584-
}
568+
this.maxScore = maxScore;
585569
this.timedOut = timedOut;
586570
this.terminatedEarly = terminatedEarly;
587571
this.suggest = suggest;
@@ -682,7 +666,7 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
682666
}
683667
if (hasTopDocs) {
684668
final TopDocsAndMaxScore topDocs = querySearchResult.consumeTopDocs(); // can't be null
685-
topDocsStats.add(topDocs);
669+
topDocsStats.add(topDocs, querySearchResult.searchTimedOut(), querySearchResult.terminatedEarly());
686670
setShardIndex(topDocs.topDocs, querySearchResult.getShardIndex());
687671
topDocsBuffer[i] = topDocs.topDocs;
688672
}
@@ -744,18 +728,20 @@ static final class TopDocsStats {
744728
private long totalHits;
745729
private TotalHits.Relation totalHitsRelation;
746730
long fetchHits;
747-
float maxScore = Float.NEGATIVE_INFINITY;
748-
749-
TopDocsStats() {
750-
this(SearchContext.TRACK_TOTAL_HITS_ACCURATE);
751-
}
731+
private float maxScore = Float.NEGATIVE_INFINITY;
732+
boolean timedOut;
733+
Boolean terminatedEarly;
752734

753735
TopDocsStats(int trackTotalHitsUpTo) {
754736
this.trackTotalHitsUpTo = trackTotalHitsUpTo;
755737
this.totalHits = 0;
756738
this.totalHitsRelation = Relation.EQUAL_TO;
757739
}
758740

741+
float getMaxScore() {
742+
return Float.isInfinite(maxScore) ? Float.NaN : maxScore;
743+
}
744+
759745
TotalHits getTotalHits() {
760746
if (trackTotalHitsUpTo == SearchContext.TRACK_TOTAL_HITS_DISABLED) {
761747
return null;
@@ -766,7 +752,7 @@ TotalHits getTotalHits() {
766752
if (totalHits < trackTotalHitsUpTo) {
767753
return new TotalHits(totalHits, totalHitsRelation);
768754
} else {
769-
/**
755+
/*
770756
* The user requested to count the total hits up to <code>trackTotalHitsUpTo</code>
771757
* so we return this lower bound when the total hits is greater than this value.
772758
* This can happen when multiple shards are merged since the limit to track total hits
@@ -777,7 +763,7 @@ TotalHits getTotalHits() {
777763
}
778764
}
779765

780-
void add(TopDocsAndMaxScore topDocs) {
766+
void add(TopDocsAndMaxScore topDocs, boolean timedOut, Boolean terminatedEarly) {
781767
if (trackTotalHitsUpTo != SearchContext.TRACK_TOTAL_HITS_DISABLED) {
782768
totalHits += topDocs.topDocs.totalHits.value;
783769
if (topDocs.topDocs.totalHits.relation == Relation.GREATER_THAN_OR_EQUAL_TO) {
@@ -788,6 +774,16 @@ void add(TopDocsAndMaxScore topDocs) {
788774
if (!Float.isNaN(topDocs.maxScore)) {
789775
maxScore = Math.max(maxScore, topDocs.maxScore);
790776
}
777+
if (timedOut) {
778+
this.timedOut = true;
779+
}
780+
if (terminatedEarly != null) {
781+
if (this.terminatedEarly == null) {
782+
this.terminatedEarly = terminatedEarly;
783+
} else if (terminatedEarly) {
784+
this.terminatedEarly = true;
785+
}
786+
}
791787
}
792788
}
793789

0 commit comments

Comments
 (0)