Skip to content

Commit ba0ff6b

Browse files
committed
Minor search controller changes (#36479)
This commit contains a few minor changes to our search code: - adjust the visibility of a couple of methods in our search code to package private from public or protected. - make some of the `SearchPhaseController` methods static where possible - rename one of the `SearchPhaseController#reducedQueryPhase` methods (used only for scroll requests) to `reducedScrollQueryPhase` without the `isScrollRequest` argument which was always set to `true` - replace leniency in `SearchPhaseController#setShardIndex` with an assert to make sure that we never set the shard index twice - remove two null checks where the checked field can never be null - resolve an unchecked warning - replace `List#toArray` invocation that creates an array providing the true size with array creation of length 0 - correct a couple of typos in comments
1 parent 3852401 commit ba0ff6b

File tree

8 files changed

+27
-42
lines changed

8 files changed

+27
-42
lines changed

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
7272
private final TransportSearchAction.SearchTimeProvider timeProvider;
7373
private final SearchResponse.Clusters clusters;
7474

75-
protected AbstractSearchAsyncAction(String name, Logger logger, SearchTransportService searchTransportService,
75+
AbstractSearchAsyncAction(String name, Logger logger, SearchTransportService searchTransportService,
7676
BiFunction<String, String, Transport.Connection> nodeIdToConnection,
7777
Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
7878
Map<String, Set<String>> indexRoutings,

server/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
* and collect the results. If a shard request returns a failure this class handles the advance to the next replica of the shard until
4343
* the shards replica iterator is exhausted. Each shard is referenced by position in the {@link GroupShardsIterator} which is later
4444
* referred to as the {@code shardIndex}.
45-
* The fan out and collect algorithm is traditionally used as the initial phase which can either be a query execution or collection
45+
* The fan out and collect algorithm is traditionally used as the initial phase which can either be a query execution or collection of
4646
* distributed frequencies
4747
*/
4848
abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends SearchPhase {
@@ -261,7 +261,6 @@ private void successfulShardExecution(SearchShardIterator shardsIt) {
261261
}
262262
}
263263

264-
265264
/**
266265
* Executed once all shard results have been received and processed
267266
* @see #onShardFailure(int, SearchShardTarget, Exception)
@@ -301,7 +300,7 @@ protected abstract void executePhaseOnShard(SearchShardIterator shardIt, ShardRo
301300
abstract static class SearchPhaseResults<Result extends SearchPhaseResult> {
302301
private final int numShards;
303302

304-
protected SearchPhaseResults(int numShards) {
303+
SearchPhaseResults(int numShards) {
305304
this.numShards = numShards;
306305
}
307306
/**

server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java

+14-27
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import com.carrotsearch.hppc.IntArrayList;
2323
import com.carrotsearch.hppc.ObjectObjectHashMap;
24-
2524
import org.apache.lucene.index.Term;
2625
import org.apache.lucene.search.CollectionStatistics;
2726
import org.apache.lucene.search.FieldDoc;
@@ -151,8 +150,8 @@ private static long optionalSum(long left, long right) {
151150
* @param from the offset into the search results top docs
152151
* @param size the number of hits to return from the merged top docs
153152
*/
154-
public SortedTopDocs sortDocs(boolean ignoreFrom, Collection<? extends SearchPhaseResult> results,
155-
final Collection<TopDocs> bufferedTopDocs, final TopDocsStats topDocsStats, int from, int size) {
153+
static SortedTopDocs sortDocs(boolean ignoreFrom, Collection<? extends SearchPhaseResult> results,
154+
final Collection<TopDocs> bufferedTopDocs, final TopDocsStats topDocsStats, int from, int size) {
156155
if (results.isEmpty()) {
157156
return SortedTopDocs.EMPTY;
158157
}
@@ -210,7 +209,7 @@ public SortedTopDocs sortDocs(boolean ignoreFrom, Collection<? extends SearchPha
210209
}
211210
final boolean isSortedByField;
212211
final SortField[] sortFields;
213-
if (mergedTopDocs != null && mergedTopDocs instanceof TopFieldDocs) {
212+
if (mergedTopDocs instanceof TopFieldDocs) {
214213
TopFieldDocs fieldDocs = (TopFieldDocs) mergedTopDocs;
215214
isSortedByField = (fieldDocs instanceof CollapseTopFieldDocs &&
216215
fieldDocs.fields.length == 1 && fieldDocs.fields[0].getType() == SortField.Type.SCORE) == false;
@@ -226,11 +225,10 @@ public SortedTopDocs sortDocs(boolean ignoreFrom, Collection<? extends SearchPha
226225
}
227226
}
228227

229-
TopDocs mergeTopDocs(Collection<TopDocs> results, int topN, int from) {
228+
static TopDocs mergeTopDocs(Collection<TopDocs> results, int topN, int from) {
230229
if (results.isEmpty()) {
231230
return null;
232231
}
233-
assert results.isEmpty() == false;
234232
final boolean setShardIndex = false;
235233
final TopDocs topDocs = results.stream().findFirst().get();
236234
final TopDocs mergedTopDocs;
@@ -255,12 +253,8 @@ TopDocs mergeTopDocs(Collection<TopDocs> results, int topN, int from) {
255253
}
256254

257255
private static void setShardIndex(TopDocs topDocs, int shardIndex) {
256+
assert topDocs.scoreDocs.length == 0 || topDocs.scoreDocs[0].shardIndex == -1 : "shardIndex is already set";
258257
for (ScoreDoc doc : topDocs.scoreDocs) {
259-
if (doc.shardIndex != -1) {
260-
// once there is a single shard index initialized all others will be initialized too
261-
// there are many asserts down in lucene land that this is actually true. we can shortcut it here.
262-
return;
263-
}
264258
doc.shardIndex = shardIndex;
265259
}
266260
}
@@ -279,7 +273,6 @@ public ScoreDoc[] getLastEmittedDocPerShard(ReducedQueryPhase reducedQueryPhase,
279273
}
280274
}
281275
return lastEmittedDocPerShard;
282-
283276
}
284277

285278
/**
@@ -396,16 +389,15 @@ private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFr
396389
hits.add(searchHit);
397390
}
398391
}
399-
return new SearchHits(hits.toArray(new SearchHit[hits.size()]), reducedQueryPhase.totalHits,
400-
reducedQueryPhase.maxScore);
392+
return new SearchHits(hits.toArray(new SearchHit[0]), reducedQueryPhase.totalHits, reducedQueryPhase.maxScore);
401393
}
402394

403395
/**
404396
* Reduces the given query results and consumes all aggregations and profile results.
405397
* @param queryResults a list of non-null query shard results
406398
*/
407-
public ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResult> queryResults, boolean isScrollRequest) {
408-
return reducedQueryPhase(queryResults, isScrollRequest, true);
399+
public ReducedQueryPhase reducedScrollQueryPhase(Collection<? extends SearchPhaseResult> queryResults) {
400+
return reducedQueryPhase(queryResults, true, true);
409401
}
410402

411403
/**
@@ -417,7 +409,6 @@ public ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResul
417409
return reducedQueryPhase(queryResults, null, new ArrayList<>(), new TopDocsStats(trackTotalHits), 0, isScrollRequest);
418410
}
419411

420-
421412
/**
422413
* Reduces the given query results and consumes all aggregations and profile results.
423414
* @param queryResults a list of non-null query shard results
@@ -500,14 +491,12 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
500491
final InternalAggregations aggregations = aggregationsList.isEmpty() ? null : reduceAggs(aggregationsList,
501492
firstResult.pipelineAggregators(), reduceContext);
502493
final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
503-
final SortedTopDocs scoreDocs = this.sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size);
494+
final SortedTopDocs scoreDocs = sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size);
504495
return new ReducedQueryPhase(topDocsStats.totalHits, topDocsStats.fetchHits, topDocsStats.maxScore,
505496
timedOut, terminatedEarly, suggest, aggregations, shardResults, scoreDocs.scoreDocs, scoreDocs.sortFields,
506-
firstResult != null ? firstResult.sortValueFormats() : null,
507-
numReducePhases, scoreDocs.isSortedByField, size, from, firstResult == null);
497+
firstResult.sortValueFormats(), numReducePhases, scoreDocs.isSortedByField, size, from, false);
508498
}
509499

510-
511500
/**
512501
* Performs an intermediate reduce phase on the aggregations. For instance with this reduce phase never prune information
513502
* that relevant for the final reduce step. For final reduce see {@link #reduceAggs(List, List, ReduceContext)}
@@ -518,7 +507,7 @@ private InternalAggregations reduceAggsIncrementally(List<InternalAggregations>
518507
null, reduceContext);
519508
}
520509

521-
private InternalAggregations reduceAggs(List<InternalAggregations> aggregationsList,
510+
private static InternalAggregations reduceAggs(List<InternalAggregations> aggregationsList,
522511
List<SiblingPipelineAggregator> pipelineAggregators, ReduceContext reduceContext) {
523512
InternalAggregations aggregations = InternalAggregations.reduce(aggregationsList, reduceContext);
524513
if (pipelineAggregators != null) {
@@ -649,7 +638,6 @@ private QueryPhaseResultConsumer(SearchPhaseController controller, int expectedR
649638
this.hasTopDocs = hasTopDocs;
650639
this.hasAggs = hasAggs;
651640
this.bufferSize = bufferSize;
652-
653641
}
654642

655643
@Override
@@ -667,7 +655,7 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
667655
aggsBuffer[0] = reducedAggs;
668656
}
669657
if (hasTopDocs) {
670-
TopDocs reducedTopDocs = controller.mergeTopDocs(Arrays.asList(topDocsBuffer),
658+
TopDocs reducedTopDocs = mergeTopDocs(Arrays.asList(topDocsBuffer),
671659
// we have to merge here in the same way we collect on a shard
672660
querySearchResult.from() + querySearchResult.size(), 0);
673661
Arrays.fill(topDocsBuffer, null);
@@ -683,7 +671,7 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
683671
if (hasTopDocs) {
684672
final TopDocs topDocs = querySearchResult.consumeTopDocs(); // can't be null
685673
topDocsStats.add(topDocs);
686-
SearchPhaseController.setShardIndex(topDocs, querySearchResult.getShardIndex());
674+
setShardIndex(topDocs, querySearchResult.getShardIndex());
687675
topDocsBuffer[i] = topDocs;
688676
}
689677
}
@@ -696,7 +684,6 @@ private synchronized List<TopDocs> getRemainingTopDocs() {
696684
return hasTopDocs ? Arrays.asList(topDocsBuffer).subList(0, index) : null;
697685
}
698686

699-
700687
@Override
701688
public ReducedQueryPhase reduce() {
702689
return controller.reducedQueryPhase(results.asList(), getRemainingAggs(), getRemainingTopDocs(), topDocsStats,
@@ -730,7 +717,7 @@ InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> newSearchPhaseResu
730717
return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs);
731718
}
732719
}
733-
return new InitialSearchPhase.ArraySearchPhaseResults(numShards) {
720+
return new InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult>(numShards) {
734721
@Override
735722
public ReducedQueryPhase reduce() {
736723
return reducedQueryPhase(results.asList(), isScrollRequest, trackTotalHits);

server/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ protected void executeInitialPhase(Transport.Connection connection, InternalScro
5252

5353
@Override
5454
protected SearchPhase moveToNextPhase(BiFunction<String, String, DiscoveryNode> clusterNodeLookup) {
55-
return sendResponsePhase(searchPhaseController.reducedQueryPhase(queryFetchResults.asList(), true), queryFetchResults);
55+
return sendResponsePhase(searchPhaseController.reducedScrollQueryPhase(queryFetchResults.asList()), queryFetchResults);
5656
}
5757

5858
@Override

server/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,8 @@ protected SearchPhase moveToNextPhase(BiFunction<String, String, DiscoveryNode>
6969
return new SearchPhase("fetch") {
7070
@Override
7171
public void run() throws IOException {
72-
final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedQueryPhase(
73-
queryResults.asList(), true);
72+
final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedScrollQueryPhase(
73+
queryResults.asList());
7474
if (reducedQueryPhase.scoreDocs.length == 0) {
7575
sendResponse(reducedQueryPhase, fetchResults);
7676
return;

server/src/main/java/org/elasticsearch/search/SearchPhaseResult.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import org.elasticsearch.transport.TransportResponse;
2626

2727
/**
28-
* This class is a base class for all search releated results. It contains the shard target it
28+
* This class is a base class for all search related results. It contains the shard target it
2929
* was executed against, a shard index used to reference the result on the coordinating node
3030
* and a request ID that is used to reference the request context on the executing node. The
3131
* request ID is particularly important since it is used to reference and maintain a context

server/src/main/java/org/elasticsearch/search/aggregations/metrics/tophits/InternalTopHits.java

-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ public InternalTopHits(StreamInput in) throws IOException {
6363
from = in.readVInt();
6464
size = in.readVInt();
6565
topDocs = Lucene.readTopDocs(in);
66-
assert topDocs != null;
6766
searchHits = SearchHits.readSearchHits(in);
6867
}
6968

server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import org.elasticsearch.common.util.concurrent.AtomicArray;
2828
import org.elasticsearch.index.Index;
2929
import org.elasticsearch.search.DocValueFormat;
30+
import org.elasticsearch.search.SearchHit;
31+
import org.elasticsearch.search.SearchHits;
3032
import org.elasticsearch.search.SearchPhaseResult;
3133
import org.elasticsearch.search.SearchShardTarget;
3234
import org.elasticsearch.search.aggregations.AggregationBuilders;
@@ -35,8 +37,6 @@
3537
import org.elasticsearch.search.aggregations.metrics.max.InternalMax;
3638
import org.elasticsearch.search.builder.SearchSourceBuilder;
3739
import org.elasticsearch.search.fetch.FetchSearchResult;
38-
import org.elasticsearch.search.SearchHit;
39-
import org.elasticsearch.search.SearchHits;
4040
import org.elasticsearch.search.internal.InternalSearchResponse;
4141
import org.elasticsearch.search.query.QuerySearchResult;
4242
import org.elasticsearch.search.suggest.Suggest;
@@ -70,7 +70,7 @@ public void setup() {
7070
(b) -> new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, b));
7171
}
7272

73-
public void testSort() throws Exception {
73+
public void testSort() {
7474
List<CompletionSuggestion> suggestions = new ArrayList<>();
7575
for (int i = 0; i < randomIntBetween(1, 5); i++) {
7676
suggestions.add(new CompletionSuggestion(randomAlphaOfLength(randomIntBetween(1, 5)), randomIntBetween(1, 20), false));
@@ -85,7 +85,7 @@ public void testSort() throws Exception {
8585
size = first.get().queryResult().size();
8686
}
8787
int accumulatedLength = Math.min(queryResultSize, getTotalQueryHits(results));
88-
ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(true, results.asList(), null, new SearchPhaseController.TopDocsStats(),
88+
ScoreDoc[] sortedDocs = SearchPhaseController.sortDocs(true, results.asList(), null, new SearchPhaseController.TopDocsStats(),
8989
from, size)
9090
.scoreDocs;
9191
for (Suggest.Suggestion<?> suggestion : reducedSuggest(results)) {
@@ -110,12 +110,12 @@ public void testSortIsIdempotent() throws Exception {
110110
size = first.get().queryResult().size();
111111
}
112112
SearchPhaseController.TopDocsStats topDocsStats = new SearchPhaseController.TopDocsStats();
113-
ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(ignoreFrom, results.asList(), null, topDocsStats, from, size).scoreDocs;
113+
ScoreDoc[] sortedDocs = SearchPhaseController.sortDocs(ignoreFrom, results.asList(), null, topDocsStats, from, size).scoreDocs;
114114

115115
results = generateSeededQueryResults(randomSeed, nShards, Collections.emptyList(), queryResultSize,
116116
useConstantScore);
117117
SearchPhaseController.TopDocsStats topDocsStats2 = new SearchPhaseController.TopDocsStats();
118-
ScoreDoc[] sortedDocs2 = searchPhaseController.sortDocs(ignoreFrom, results.asList(), null, topDocsStats2, from, size).scoreDocs;
118+
ScoreDoc[] sortedDocs2 = SearchPhaseController.sortDocs(ignoreFrom, results.asList(), null, topDocsStats2, from, size).scoreDocs;
119119
assertEquals(sortedDocs.length, sortedDocs2.length);
120120
for (int i = 0; i < sortedDocs.length; i++) {
121121
assertEquals(sortedDocs[i].doc, sortedDocs2[i].doc);

0 commit comments

Comments
 (0)