Skip to content

Commit fee013c

Browse files
committed
Add support for returning documents with completion suggester
This commit enables completion suggester to return documents associated with suggestions. Now the document source is returned with every suggestion, which respects source filtering options. In case of suggest queries spanning more than one shard, the suggest is executed in two phases, where the last phase fetches the relevant documents from shards, implying executing suggest requests against a single shard is more performant due to the document fetch overhead when the suggest spans multiple shards.
1 parent 3be1e7e commit fee013c

20 files changed

+918
-206
lines changed

core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
4747
import org.elasticsearch.search.query.QuerySearchResult;
4848
import org.elasticsearch.search.query.QuerySearchResultProvider;
49+
import org.elasticsearch.search.suggest.Suggest;
4950
import org.elasticsearch.threadpool.ThreadPool;
5051

5152
import java.util.List;
@@ -74,7 +75,7 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
7475
protected final AtomicArray<FirstResult> firstResults;
7576
private volatile AtomicArray<ShardSearchFailure> shardFailures;
7677
private final Object shardFailuresMutex = new Object();
77-
protected volatile ScoreDoc[] sortedShardList;
78+
protected volatile ScoreDoc[] sortedShardDocs;
7879

7980
protected AbstractSearchAsyncAction(ESLogger logger, SearchTransportService searchTransportService, ClusterService clusterService,
8081
IndexNameExpressionResolver indexNameExpressionResolver,
@@ -321,8 +322,11 @@ protected void releaseIrrelevantSearchContexts(AtomicArray<? extends QuerySearch
321322
// we only release search context that we did not fetch from if we are not scrolling
322323
if (request.scroll() == null) {
323324
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults.asList()) {
324-
final TopDocs topDocs = entry.value.queryResult().queryResult().topDocs();
325-
if (topDocs != null && topDocs.scoreDocs.length > 0 // the shard had matches
325+
QuerySearchResult queryResult = entry.value.queryResult().queryResult();
326+
final TopDocs topDocs = queryResult.topDocs();
327+
final Suggest suggest = queryResult.suggest();
328+
if (((topDocs != null && topDocs.scoreDocs.length > 0) // the shard had matches
329+
||suggest != null && suggest.hasScoreDocs()) // or had suggest docs
326330
&& docIdsToLoad.get(entry.index) == null) { // but none of them made it to the global top docs
327331
try {
328332
DiscoveryNode node = nodes.get(entry.value.queryResult().shardTarget().nodeId());
@@ -343,12 +347,8 @@ protected void sendReleaseSearchContext(long contextId, DiscoveryNode node) {
343347

344348
protected ShardFetchSearchRequest createFetchRequest(QuerySearchResult queryResult, AtomicArray.Entry<IntArrayList> entry,
345349
ScoreDoc[] lastEmittedDocPerShard) {
346-
if (lastEmittedDocPerShard != null) {
347-
ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index];
348-
return new ShardFetchSearchRequest(request, queryResult.id(), entry.value, lastEmittedDoc);
349-
} else {
350-
return new ShardFetchSearchRequest(request, queryResult.id(), entry.value);
351-
}
350+
final ScoreDoc lastEmittedDoc = (lastEmittedDocPerShard != null) ? lastEmittedDocPerShard[entry.index] : null;
351+
return new ShardFetchSearchRequest(request, queryResult.id(), entry.value, lastEmittedDoc);
352352
}
353353

354354
protected abstract void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,

core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,8 @@ private void finishHim() {
118118
threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
119119
@Override
120120
public void doRun() throws IOException {
121-
sortedShardList = searchPhaseController.sortDocs(true, queryFetchResults);
122-
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults,
121+
sortedShardDocs = searchPhaseController.sortDocs(true, queryFetchResults);
122+
final InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs, queryFetchResults,
123123
queryFetchResults);
124124
String scrollId = null;
125125
if (request.scroll() != null) {

core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -135,18 +135,17 @@ void executeFetchPhase() {
135135
}
136136

137137
void innerExecuteFetchPhase() throws Exception {
138-
boolean useScroll = request.scroll() != null;
139-
sortedShardList = searchPhaseController.sortDocs(useScroll, queryResults);
140-
searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
138+
final boolean isScrollRequest = request.scroll() != null;
139+
sortedShardDocs = searchPhaseController.sortDocs(isScrollRequest, queryResults);
140+
searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardDocs);
141141

142142
if (docIdsToLoad.asList().isEmpty()) {
143143
finishHim();
144144
return;
145145
}
146146

147-
final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(
148-
request, sortedShardList, firstResults.length()
149-
);
147+
final ScoreDoc[] lastEmittedDocPerShard = (request.scroll() != null) ?
148+
searchPhaseController.getLastEmittedDocPerShard(queryResults.asList(), sortedShardDocs, firstResults.length()) : null;
150149
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
151150
for (final AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
152151
QuerySearchResult queryResult = queryResults.get(entry.index);
@@ -196,12 +195,10 @@ private void finishHim() {
196195
threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
197196
@Override
198197
public void doRun() throws IOException {
199-
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults,
198+
final boolean isScrollRequest = request.scroll() != null;
199+
final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedShardDocs, queryResults,
200200
fetchResults);
201-
String scrollId = null;
202-
if (request.scroll() != null) {
203-
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults);
204-
}
201+
String scrollId = isScrollRequest ? TransportSearchHelper.buildScrollId(request.searchType(), firstResults) : null;
205202
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(),
206203
buildTookInMillis(), buildShardFailures()));
207204
releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);

core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -60,14 +60,11 @@ protected void moveToSecondPhase() throws Exception {
6060
threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
6161
@Override
6262
public void doRun() throws IOException {
63-
boolean useScroll = request.scroll() != null;
64-
sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
65-
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults,
63+
final boolean isScrollRequest = request.scroll() != null;
64+
sortedShardDocs = searchPhaseController.sortDocs(isScrollRequest, firstResults);
65+
final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedShardDocs, firstResults,
6666
firstResults);
67-
String scrollId = null;
68-
if (request.scroll() != null) {
69-
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults);
70-
}
67+
String scrollId = isScrollRequest ? TransportSearchHelper.buildScrollId(request.searchType(), firstResults) : null;
7168
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(),
7269
buildTookInMillis(), buildShardFailures()));
7370
}

core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -68,18 +68,17 @@ protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportReq
6868

6969
@Override
7070
protected void moveToSecondPhase() throws Exception {
71-
boolean useScroll = request.scroll() != null;
72-
sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
73-
searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
71+
final boolean isScrollRequest = request.scroll() != null;
72+
sortedShardDocs = searchPhaseController.sortDocs(isScrollRequest, firstResults);
73+
searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardDocs);
7474

7575
if (docIdsToLoad.asList().isEmpty()) {
7676
finishHim();
7777
return;
7878
}
7979

80-
final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(
81-
request, sortedShardList, firstResults.length()
82-
);
80+
final ScoreDoc[] lastEmittedDocPerShard = isScrollRequest ?
81+
searchPhaseController.getLastEmittedDocPerShard(firstResults.asList(), sortedShardDocs, firstResults.length()) : null;
8382
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
8483
for (AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
8584
QuerySearchResultProvider queryResult = firstResults.get(entry.index);
@@ -129,12 +128,10 @@ private void finishHim() {
129128
threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
130129
@Override
131130
public void doRun() throws IOException {
132-
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults,
131+
final boolean isScrollRequest = request.scroll() != null;
132+
final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedShardDocs, firstResults,
133133
fetchResults);
134-
String scrollId = null;
135-
if (request.scroll() != null) {
136-
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults);
137-
}
134+
String scrollId = isScrollRequest ? TransportSearchHelper.buildScrollId(request.searchType(), firstResults) : null;
138135
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps,
139136
successfulOps.get(), buildTookInMillis(), buildShardFailures()));
140137
releaseIrrelevantSearchContexts(firstResults, docIdsToLoad);

core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,8 +168,8 @@ private void finishHim() {
168168
}
169169

170170
private void innerFinishHim() throws Exception {
171-
ScoreDoc[] sortedShardList = searchPhaseController.sortDocs(true, queryFetchResults);
172-
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults,
171+
ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(true, queryFetchResults);
172+
final InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs, queryFetchResults,
173173
queryFetchResults);
174174
String scrollId = null;
175175
if (request.scroll() != null) {

core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
5353
private volatile AtomicArray<ShardSearchFailure> shardFailures;
5454
final AtomicArray<QuerySearchResult> queryResults;
5555
final AtomicArray<FetchSearchResult> fetchResults;
56-
private volatile ScoreDoc[] sortedShardList;
56+
private volatile ScoreDoc[] sortedShardDocs;
5757
private final AtomicInteger successfulOps;
5858

5959
SearchScrollQueryThenFetchAsyncAction(ESLogger logger, ClusterService clusterService,
@@ -165,17 +165,18 @@ void onQueryPhaseFailure(final int shardIndex, final AtomicInteger counter, fina
165165
}
166166

167167
private void executeFetchPhase() throws Exception {
168-
sortedShardList = searchPhaseController.sortDocs(true, queryResults);
168+
sortedShardDocs = searchPhaseController.sortDocs(true, queryResults);
169169
AtomicArray<IntArrayList> docIdsToLoad = new AtomicArray<>(queryResults.length());
170-
searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
170+
searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardDocs);
171171

172172
if (docIdsToLoad.asList().isEmpty()) {
173173
finishHim();
174174
return;
175175
}
176176

177177

178-
final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(sortedShardList, queryResults.length());
178+
final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(queryResults.asList(),
179+
sortedShardDocs, queryResults.length());
179180
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
180181
for (final AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
181182
IntArrayList docIds = entry.value;
@@ -216,7 +217,7 @@ private void finishHim() {
216217
}
217218

218219
private void innerFinishHim() {
219-
InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults);
220+
InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs, queryResults, fetchResults);
220221
String scrollId = null;
221222
if (request.scroll() != null) {
222223
scrollId = request.scrollId();

core/src/main/java/org/elasticsearch/search/SearchService.java

Lines changed: 39 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import com.carrotsearch.hppc.ObjectFloatHashMap;
2323
import org.apache.lucene.search.FieldDoc;
24+
import org.apache.lucene.search.ScoreDoc;
2425
import org.apache.lucene.search.TopDocs;
2526
import org.elasticsearch.ElasticsearchException;
2627
import org.elasticsearch.ExceptionsHelper;
@@ -87,13 +88,16 @@
8788
import org.elasticsearch.search.searchafter.SearchAfterBuilder;
8889
import org.elasticsearch.search.sort.SortAndFormats;
8990
import org.elasticsearch.search.sort.SortBuilder;
91+
import org.elasticsearch.search.suggest.Suggest;
92+
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
9093
import org.elasticsearch.threadpool.ThreadPool;
9194
import org.elasticsearch.threadpool.ThreadPool.Cancellable;
9295
import org.elasticsearch.threadpool.ThreadPool.Names;
9396

9497
import java.io.IOException;
9598
import java.util.Collections;
9699
import java.util.HashMap;
100+
import java.util.List;
97101
import java.util.Map;
98102
import java.util.Optional;
99103
import java.util.concurrent.ExecutionException;
@@ -265,7 +269,7 @@ public QuerySearchResultProvider executeQueryPhase(ShardSearchRequest request) t
265269

266270
loadOrExecuteQueryPhase(request, context);
267271

268-
if (context.queryResult().topDocs().scoreDocs.length == 0 && context.scrollContext() == null) {
272+
if (hasHits(context.queryResult()) == false && context.scrollContext() == null) {
269273
freeContext(context.id());
270274
} else {
271275
contextProcessedSuccessfully(context);
@@ -320,7 +324,7 @@ public QuerySearchResult executeQueryPhase(QuerySearchRequest request) {
320324
operationListener.onPreQueryPhase(context);
321325
long time = System.nanoTime();
322326
queryPhase.execute(context);
323-
if (context.queryResult().topDocs().scoreDocs.length == 0 && context.scrollContext() == null) {
327+
if (hasHits(context.queryResult()) == false && context.scrollContext() == null) {
324328
// no hits, we can release the context since there will be no fetch phase
325329
freeContext(context.id());
326330
} else {
@@ -811,40 +815,55 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc
811815
}
812816
}
813817

814-
private static final int[] EMPTY_DOC_IDS = new int[0];
815-
816818
/**
817819
* Shortcut ids to load, we load only "from" and up to "size". The phase controller
818820
* handles this as well since the result is always size * shards for Q_A_F
819821
*/
820822
private void shortcutDocIdsToLoad(SearchContext context) {
823+
final int[] docIdsToLoad;
824+
int docsOffset = 0;
825+
final Suggest suggest = context.queryResult().suggest();
826+
int numSuggestDocs = 0;
827+
final List<CompletionSuggestion> completionSuggestions;
828+
if (suggest != null && suggest.hasScoreDocs()) {
829+
completionSuggestions = suggest.filter(CompletionSuggestion.class);
830+
for (CompletionSuggestion completionSuggestion : completionSuggestions) {
831+
numSuggestDocs += completionSuggestion.getOptions().size();
832+
}
833+
} else {
834+
completionSuggestions = Collections.emptyList();
835+
}
821836
if (context.request().scroll() != null) {
822837
TopDocs topDocs = context.queryResult().topDocs();
823-
int[] docIdsToLoad = new int[topDocs.scoreDocs.length];
838+
docIdsToLoad = new int[topDocs.scoreDocs.length + numSuggestDocs];
824839
for (int i = 0; i < topDocs.scoreDocs.length; i++) {
825-
docIdsToLoad[i] = topDocs.scoreDocs[i].doc;
840+
docIdsToLoad[docsOffset++] = topDocs.scoreDocs[i].doc;
826841
}
827-
context.docIdsToLoad(docIdsToLoad, 0, docIdsToLoad.length);
828842
} else {
829843
TopDocs topDocs = context.queryResult().topDocs();
830844
if (topDocs.scoreDocs.length < context.from()) {
831845
// no more docs...
832-
context.docIdsToLoad(EMPTY_DOC_IDS, 0, 0);
833-
return;
834-
}
835-
int totalSize = context.from() + context.size();
836-
int[] docIdsToLoad = new int[Math.min(topDocs.scoreDocs.length - context.from(), context.size())];
837-
int counter = 0;
838-
for (int i = context.from(); i < totalSize; i++) {
839-
if (i < topDocs.scoreDocs.length) {
840-
docIdsToLoad[counter] = topDocs.scoreDocs[i].doc;
841-
} else {
842-
break;
846+
docIdsToLoad = new int[numSuggestDocs];
847+
} else {
848+
int totalSize = context.from() + context.size();
849+
docIdsToLoad = new int[Math.min(topDocs.scoreDocs.length - context.from(), context.size()) +
850+
numSuggestDocs];
851+
for (int i = context.from(); i < Math.min(totalSize, topDocs.scoreDocs.length); i++) {
852+
docIdsToLoad[docsOffset++] = topDocs.scoreDocs[i].doc;
843853
}
844-
counter++;
845854
}
846-
context.docIdsToLoad(docIdsToLoad, 0, counter);
847855
}
856+
for (CompletionSuggestion completionSuggestion : completionSuggestions) {
857+
for (CompletionSuggestion.Entry.Option option : completionSuggestion.getOptions()) {
858+
docIdsToLoad[docsOffset++] = option.getDoc().doc;
859+
}
860+
}
861+
context.docIdsToLoad(docIdsToLoad, 0, docIdsToLoad.length);
862+
}
863+
864+
private static boolean hasHits(final QuerySearchResult searchResult) {
865+
return searchResult.topDocs().scoreDocs.length > 0 ||
866+
(searchResult.suggest() != null && searchResult.suggest().hasScoreDocs());
848867
}
849868

850869
private void processScroll(InternalScrollSearchRequest request, SearchContext context) {

0 commit comments

Comments
 (0)