21
21
22
22
import com .carrotsearch .hppc .IntArrayList ;
23
23
import com .carrotsearch .hppc .ObjectObjectHashMap ;
24
-
25
24
import org .apache .lucene .index .Term ;
26
25
import org .apache .lucene .search .CollectionStatistics ;
27
26
import org .apache .lucene .search .FieldDoc ;
@@ -151,8 +150,8 @@ private static long optionalSum(long left, long right) {
151
150
* @param from the offset into the search results top docs
152
151
* @param size the number of hits to return from the merged top docs
153
152
*/
154
- public SortedTopDocs sortDocs (boolean ignoreFrom , Collection <? extends SearchPhaseResult > results ,
155
- final Collection <TopDocs > bufferedTopDocs , final TopDocsStats topDocsStats , int from , int size ) {
153
+ static SortedTopDocs sortDocs (boolean ignoreFrom , Collection <? extends SearchPhaseResult > results ,
154
+ final Collection <TopDocs > bufferedTopDocs , final TopDocsStats topDocsStats , int from , int size ) {
156
155
if (results .isEmpty ()) {
157
156
return SortedTopDocs .EMPTY ;
158
157
}
@@ -210,7 +209,7 @@ public SortedTopDocs sortDocs(boolean ignoreFrom, Collection<? extends SearchPha
210
209
}
211
210
final boolean isSortedByField ;
212
211
final SortField [] sortFields ;
213
- if (mergedTopDocs != null && mergedTopDocs instanceof TopFieldDocs ) {
212
+ if (mergedTopDocs instanceof TopFieldDocs ) {
214
213
TopFieldDocs fieldDocs = (TopFieldDocs ) mergedTopDocs ;
215
214
isSortedByField = (fieldDocs instanceof CollapseTopFieldDocs &&
216
215
fieldDocs .fields .length == 1 && fieldDocs .fields [0 ].getType () == SortField .Type .SCORE ) == false ;
@@ -226,11 +225,10 @@ public SortedTopDocs sortDocs(boolean ignoreFrom, Collection<? extends SearchPha
226
225
}
227
226
}
228
227
229
- TopDocs mergeTopDocs (Collection <TopDocs > results , int topN , int from ) {
228
+ static TopDocs mergeTopDocs (Collection <TopDocs > results , int topN , int from ) {
230
229
if (results .isEmpty ()) {
231
230
return null ;
232
231
}
233
- assert results .isEmpty () == false ;
234
232
final boolean setShardIndex = false ;
235
233
final TopDocs topDocs = results .stream ().findFirst ().get ();
236
234
final TopDocs mergedTopDocs ;
@@ -255,12 +253,8 @@ TopDocs mergeTopDocs(Collection<TopDocs> results, int topN, int from) {
255
253
}
256
254
257
255
private static void setShardIndex (TopDocs topDocs , int shardIndex ) {
256
+ assert topDocs .scoreDocs .length == 0 || topDocs .scoreDocs [0 ].shardIndex == -1 : "shardIndex is already set" ;
258
257
for (ScoreDoc doc : topDocs .scoreDocs ) {
259
- if (doc .shardIndex != -1 ) {
260
- // once there is a single shard index initialized all others will be initialized too
261
- // there are many asserts down in lucene land that this is actually true. we can shortcut it here.
262
- return ;
263
- }
264
258
doc .shardIndex = shardIndex ;
265
259
}
266
260
}
@@ -279,7 +273,6 @@ public ScoreDoc[] getLastEmittedDocPerShard(ReducedQueryPhase reducedQueryPhase,
279
273
}
280
274
}
281
275
return lastEmittedDocPerShard ;
282
-
283
276
}
284
277
285
278
/**
@@ -396,16 +389,15 @@ private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFr
396
389
hits .add (searchHit );
397
390
}
398
391
}
399
- return new SearchHits (hits .toArray (new SearchHit [hits .size ()]), reducedQueryPhase .totalHits ,
400
- reducedQueryPhase .maxScore );
392
+ return new SearchHits (hits .toArray (new SearchHit [0 ]), reducedQueryPhase .totalHits , reducedQueryPhase .maxScore );
401
393
}
402
394
403
395
/**
404
396
* Reduces the given query results and consumes all aggregations and profile results.
405
397
* @param queryResults a list of non-null query shard results
406
398
*/
407
- public ReducedQueryPhase reducedQueryPhase (Collection <? extends SearchPhaseResult > queryResults , boolean isScrollRequest ) {
408
- return reducedQueryPhase (queryResults , isScrollRequest , true );
399
+ public ReducedQueryPhase reducedScrollQueryPhase (Collection <? extends SearchPhaseResult > queryResults ) {
400
+ return reducedQueryPhase (queryResults , true , true );
409
401
}
410
402
411
403
/**
@@ -417,7 +409,6 @@ public ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResul
417
409
return reducedQueryPhase (queryResults , null , new ArrayList <>(), new TopDocsStats (trackTotalHits ), 0 , isScrollRequest );
418
410
}
419
411
420
-
421
412
/**
422
413
* Reduces the given query results and consumes all aggregations and profile results.
423
414
* @param queryResults a list of non-null query shard results
@@ -500,14 +491,12 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
500
491
final InternalAggregations aggregations = aggregationsList .isEmpty () ? null : reduceAggs (aggregationsList ,
501
492
firstResult .pipelineAggregators (), reduceContext );
502
493
final SearchProfileShardResults shardResults = profileResults .isEmpty () ? null : new SearchProfileShardResults (profileResults );
503
- final SortedTopDocs scoreDocs = this . sortDocs (isScrollRequest , queryResults , bufferedTopDocs , topDocsStats , from , size );
494
+ final SortedTopDocs scoreDocs = sortDocs (isScrollRequest , queryResults , bufferedTopDocs , topDocsStats , from , size );
504
495
return new ReducedQueryPhase (topDocsStats .totalHits , topDocsStats .fetchHits , topDocsStats .maxScore ,
505
496
timedOut , terminatedEarly , suggest , aggregations , shardResults , scoreDocs .scoreDocs , scoreDocs .sortFields ,
506
- firstResult != null ? firstResult .sortValueFormats () : null ,
507
- numReducePhases , scoreDocs .isSortedByField , size , from , firstResult == null );
497
+ firstResult .sortValueFormats (), numReducePhases , scoreDocs .isSortedByField , size , from , false );
508
498
}
509
499
510
-
511
500
/**
512
501
* Performs an intermediate reduce phase on the aggregations. For instance with this reduce phase never prune information
513
502
* that relevant for the final reduce step. For final reduce see {@link #reduceAggs(List, List, ReduceContext)}
@@ -518,7 +507,7 @@ private InternalAggregations reduceAggsIncrementally(List<InternalAggregations>
518
507
null , reduceContext );
519
508
}
520
509
521
- private InternalAggregations reduceAggs (List <InternalAggregations > aggregationsList ,
510
+ private static InternalAggregations reduceAggs (List <InternalAggregations > aggregationsList ,
522
511
List <SiblingPipelineAggregator > pipelineAggregators , ReduceContext reduceContext ) {
523
512
InternalAggregations aggregations = InternalAggregations .reduce (aggregationsList , reduceContext );
524
513
if (pipelineAggregators != null ) {
@@ -649,7 +638,6 @@ private QueryPhaseResultConsumer(SearchPhaseController controller, int expectedR
649
638
this .hasTopDocs = hasTopDocs ;
650
639
this .hasAggs = hasAggs ;
651
640
this .bufferSize = bufferSize ;
652
-
653
641
}
654
642
655
643
@ Override
@@ -667,7 +655,7 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
667
655
aggsBuffer [0 ] = reducedAggs ;
668
656
}
669
657
if (hasTopDocs ) {
670
- TopDocs reducedTopDocs = controller . mergeTopDocs (Arrays .asList (topDocsBuffer ),
658
+ TopDocs reducedTopDocs = mergeTopDocs (Arrays .asList (topDocsBuffer ),
671
659
// we have to merge here in the same way we collect on a shard
672
660
querySearchResult .from () + querySearchResult .size (), 0 );
673
661
Arrays .fill (topDocsBuffer , null );
@@ -683,7 +671,7 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
683
671
if (hasTopDocs ) {
684
672
final TopDocs topDocs = querySearchResult .consumeTopDocs (); // can't be null
685
673
topDocsStats .add (topDocs );
686
- SearchPhaseController . setShardIndex (topDocs , querySearchResult .getShardIndex ());
674
+ setShardIndex (topDocs , querySearchResult .getShardIndex ());
687
675
topDocsBuffer [i ] = topDocs ;
688
676
}
689
677
}
@@ -696,7 +684,6 @@ private synchronized List<TopDocs> getRemainingTopDocs() {
696
684
return hasTopDocs ? Arrays .asList (topDocsBuffer ).subList (0 , index ) : null ;
697
685
}
698
686
699
-
700
687
@ Override
701
688
public ReducedQueryPhase reduce () {
702
689
return controller .reducedQueryPhase (results .asList (), getRemainingAggs (), getRemainingTopDocs (), topDocsStats ,
@@ -730,7 +717,7 @@ InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> newSearchPhaseResu
730
717
return new QueryPhaseResultConsumer (this , numShards , request .getBatchedReduceSize (), hasTopDocs , hasAggs );
731
718
}
732
719
}
733
- return new InitialSearchPhase .ArraySearchPhaseResults (numShards ) {
720
+ return new InitialSearchPhase .ArraySearchPhaseResults < SearchPhaseResult > (numShards ) {
734
721
@ Override
735
722
public ReducedQueryPhase reduce () {
736
723
return reducedQueryPhase (results .asList (), isScrollRequest , trackTotalHits );
0 commit comments