Skip to content

Commit b87d01f

Browse files
committed
Improved regular scroll api by using IndexSearch#searchAfter instead of regular search methods which rely on from for pagination.
This prevents the creation of priority queues of `from + size`, instead the size of the priority queue will always be equal to `size`. Closes #4940
1 parent 40bf265 commit b87d01f

33 files changed

+1035
-157
lines changed

src/main/java/org/elasticsearch/action/search/type/ParsedScrollId.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.action.search.type;
2121

22+
import org.elasticsearch.Version;
2223
import org.elasticsearch.common.collect.Tuple;
2324

2425
import java.util.Map;
@@ -34,6 +35,8 @@ public class ParsedScrollId {
3435

3536
public static final String SCAN = "scan";
3637

38+
public static final Version SCROLL_SEARCH_AFTER_MINIMUM_VERSION = Version.V_1_2_0;
39+
3740
private final String source;
3841

3942
private final String type;

src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ void finishHim() {
179179
}
180180

181181
void innerFinishHim() throws Exception {
182-
sortedShardList = searchPhaseController.sortDocs(queryFetchResults);
182+
sortedShardList = searchPhaseController.sortDocs(request, useSlowScroll, queryFetchResults);
183183
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults);
184184
String scrollId = null;
185185
if (request.scroll() != null) {

src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.elasticsearch.action.search.type;
2121

2222
import com.carrotsearch.hppc.IntArrayList;
23+
import org.apache.lucene.search.ScoreDoc;
2324
import org.elasticsearch.action.ActionListener;
2425
import org.elasticsearch.action.search.ReduceSearchPhaseException;
2526
import org.elasticsearch.action.search.SearchOperationThreading;
@@ -181,15 +182,18 @@ void executeFetchPhase() {
181182
}
182183
}
183184

184-
void innerExecuteFetchPhase() {
185-
sortedShardList = searchPhaseController.sortDocs(queryResults);
185+
void innerExecuteFetchPhase() throws Exception {
186+
sortedShardList = searchPhaseController.sortDocs(request, useSlowScroll, queryResults);
186187
searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
187188

188189
if (docIdsToLoad.asList().isEmpty()) {
189190
finishHim();
190191
return;
191192
}
192193

194+
final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(
195+
request, sortedShardList, firstResults.length()
196+
);
193197
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
194198
int localOperations = 0;
195199
for (final AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
@@ -198,7 +202,7 @@ void innerExecuteFetchPhase() {
198202
if (node.id().equals(nodes.localNodeId())) {
199203
localOperations++;
200204
} else {
201-
FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResult.id(), entry.value);
205+
FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard);
202206
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
203207
}
204208
}
@@ -212,7 +216,7 @@ public void run() {
212216
QuerySearchResult queryResult = queryResults.get(entry.index);
213217
DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
214218
if (node.id().equals(nodes.localNodeId())) {
215-
FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResult.id(), entry.value);
219+
FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard);
216220
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
217221
}
218222
}
@@ -224,7 +228,7 @@ public void run() {
224228
final QuerySearchResult queryResult = queryResults.get(entry.index);
225229
final DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
226230
if (node.id().equals(nodes.localNodeId())) {
227-
final FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResult.id(), entry.value);
231+
final FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard);
228232
try {
229233
if (localAsync) {
230234
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {

src/main/java/org/elasticsearch/action/search/type/TransportSearchHelper.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@
4747
*/
4848
public abstract class TransportSearchHelper {
4949

50-
public static ShardSearchRequest internalSearchRequest(ShardRouting shardRouting, int numberOfShards, SearchRequest request, String[] filteringAliases, long nowInMillis) {
51-
ShardSearchRequest shardRequest = new ShardSearchRequest(request, shardRouting, numberOfShards);
50+
public static ShardSearchRequest internalSearchRequest(ShardRouting shardRouting, int numberOfShards, SearchRequest request, String[] filteringAliases, long nowInMillis, boolean useSlowScroll) {
51+
ShardSearchRequest shardRequest = new ShardSearchRequest(request, shardRouting, numberOfShards, useSlowScroll);
5252
shardRequest.filteringAliases(filteringAliases);
5353
shardRequest.nowInMillis(nowInMillis);
5454
return shardRequest;

src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryAndFetchAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ protected void moveToSecondPhase() throws Exception {
8585
}
8686

8787
private void innerFinishHim() throws IOException {
88-
sortedShardList = searchPhaseController.sortDocs(firstResults);
88+
sortedShardList = searchPhaseController.sortDocs(request, useSlowScroll, firstResults);
8989
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, firstResults);
9090
String scrollId = null;
9191
if (request.scroll() != null) {

src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.elasticsearch.action.search.type;
2121

2222
import com.carrotsearch.hppc.IntArrayList;
23+
import org.apache.lucene.search.ScoreDoc;
2324
import org.elasticsearch.action.ActionListener;
2425
import org.elasticsearch.action.search.ReduceSearchPhaseException;
2526
import org.elasticsearch.action.search.SearchOperationThreading;
@@ -81,25 +82,27 @@ protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest requ
8182
}
8283

8384
@Override
84-
protected void moveToSecondPhase() {
85-
sortedShardList = searchPhaseController.sortDocs(firstResults);
85+
protected void moveToSecondPhase() throws Exception {
86+
sortedShardList = searchPhaseController.sortDocs(request, useSlowScroll, firstResults);
8687
searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
8788

8889
if (docIdsToLoad.asList().isEmpty()) {
8990
finishHim();
9091
return;
9192
}
9293

94+
final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(
95+
request, sortedShardList, firstResults.length()
96+
);
9397
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
94-
9598
int localOperations = 0;
9699
for (AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
97100
QuerySearchResult queryResult = firstResults.get(entry.index);
98101
DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
99102
if (node.id().equals(nodes.localNodeId())) {
100103
localOperations++;
101104
} else {
102-
FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResult.id(), entry.value);
105+
FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard);
103106
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
104107
}
105108
}
@@ -113,7 +116,7 @@ public void run() {
113116
QuerySearchResult queryResult = firstResults.get(entry.index);
114117
DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
115118
if (node.id().equals(nodes.localNodeId())) {
116-
FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResult.id(), entry.value);
119+
FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard);
117120
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
118121
}
119122
}
@@ -125,7 +128,7 @@ public void run() {
125128
final QuerySearchResult queryResult = firstResults.get(entry.index);
126129
final DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
127130
if (node.id().equals(nodes.localNodeId())) {
128-
final FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResult.id(), entry.value);
131+
final FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard);
129132
try {
130133
if (localAsync) {
131134
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {

src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.elasticsearch.action.search.type;
2121

2222
import org.apache.lucene.search.ScoreDoc;
23+
import org.elasticsearch.Version;
2324
import org.elasticsearch.action.ActionListener;
2425
import org.elasticsearch.action.search.*;
2526
import org.elasticsearch.cluster.ClusterService;
@@ -34,6 +35,7 @@
3435
import org.elasticsearch.search.action.SearchServiceTransportAction;
3536
import org.elasticsearch.search.controller.SearchPhaseController;
3637
import org.elasticsearch.search.fetch.QueryFetchSearchResult;
38+
import org.elasticsearch.search.internal.InternalScrollSearchRequest;
3739
import org.elasticsearch.search.internal.InternalSearchResponse;
3840
import org.elasticsearch.threadpool.ThreadPool;
3941

@@ -72,6 +74,7 @@ public void execute(SearchScrollRequest request, ParsedScrollId scrollId, Action
7274
private class AsyncAction {
7375

7476
private final SearchScrollRequest request;
77+
private volatile boolean useSlowScroll;
7578

7679
private final ActionListener<SearchResponse> listener;
7780

@@ -131,6 +134,9 @@ public void start() {
131134
Tuple<String, Long> target = context[i];
132135
DiscoveryNode node = nodes.get(target.v1());
133136
if (node != null) {
137+
if (node.getVersion().before(ParsedScrollId.SCROLL_SEARCH_AFTER_MINIMUM_VERSION)) {
138+
useSlowScroll = true;
139+
}
134140
if (nodes.localNodeId().equals(node.id())) {
135141
localOperations++;
136142
} else {
@@ -205,7 +211,8 @@ public void run() {
205211
}
206212

207213
void executePhase(final int shardIndex, DiscoveryNode node, final long searchId) {
208-
searchService.sendExecuteFetch(node, internalScrollSearchRequest(searchId, request), new SearchServiceListener<QueryFetchSearchResult>() {
214+
InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request);
215+
searchService.sendExecuteFetch(node, internalRequest, new SearchServiceListener<QueryFetchSearchResult>() {
209216
@Override
210217
public void onResult(QueryFetchSearchResult result) {
211218
queryFetchResults.set(shardIndex, result);
@@ -240,8 +247,13 @@ private void finishHim() {
240247
}
241248
}
242249

243-
private void innerFinishHim() {
244-
ScoreDoc[] sortedShardList = searchPhaseController.sortDocs(queryFetchResults);
250+
private void innerFinishHim() throws Exception {
251+
ScoreDoc[] sortedShardList;
252+
if (useSlowScroll) {
253+
sortedShardList = searchPhaseController.sortDocs(queryFetchResults);
254+
} else {
255+
sortedShardList = searchPhaseController.sortDocsForScroll(queryFetchResults);
256+
}
245257
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults);
246258
String scrollId = null;
247259
if (request.scroll() != null) {

src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.elasticsearch.search.controller.SearchPhaseController;
3737
import org.elasticsearch.search.fetch.FetchSearchRequest;
3838
import org.elasticsearch.search.fetch.FetchSearchResult;
39+
import org.elasticsearch.search.internal.InternalScrollSearchRequest;
3940
import org.elasticsearch.search.internal.InternalSearchResponse;
4041
import org.elasticsearch.search.query.QuerySearchResult;
4142
import org.elasticsearch.threadpool.ThreadPool;
@@ -92,6 +93,8 @@ private class AsyncAction {
9293

9394
private final long startTime = System.currentTimeMillis();
9495

96+
private volatile boolean useSlowScroll;
97+
9598
private AsyncAction(SearchScrollRequest request, ParsedScrollId scrollId, ActionListener<SearchResponse> listener) {
9699
this.request = request;
97100
this.listener = listener;
@@ -137,6 +140,9 @@ public void start() {
137140
Tuple<String, Long> target = context[i];
138141
DiscoveryNode node = nodes.get(target.v1());
139142
if (node != null) {
143+
if (node.getVersion().before(ParsedScrollId.SCROLL_SEARCH_AFTER_MINIMUM_VERSION)) {
144+
useSlowScroll = true;
145+
}
140146
if (nodes.localNodeId().equals(node.id())) {
141147
localOperations++;
142148
} else {
@@ -148,7 +154,12 @@ public void start() {
148154
}
149155
successfulOps.decrementAndGet();
150156
if (counter.decrementAndGet() == 0) {
151-
executeFetchPhase();
157+
try {
158+
executeFetchPhase();
159+
} catch (Throwable e) {
160+
listener.onFailure(new SearchPhaseExecutionException("query", "Fetch failed", e, null));
161+
return;
162+
}
152163
}
153164
}
154165
}
@@ -197,12 +208,17 @@ public void run() {
197208
}
198209

199210
private void executeQueryPhase(final int shardIndex, final AtomicInteger counter, DiscoveryNode node, final long searchId) {
200-
searchService.sendExecuteQuery(node, internalScrollSearchRequest(searchId, request), new SearchServiceListener<QuerySearchResult>() {
211+
InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request);
212+
searchService.sendExecuteQuery(node, internalRequest, new SearchServiceListener<QuerySearchResult>() {
201213
@Override
202214
public void onResult(QuerySearchResult result) {
203215
queryResults.set(shardIndex, result);
204216
if (counter.decrementAndGet() == 0) {
205-
executeFetchPhase();
217+
try {
218+
executeFetchPhase();
219+
} catch (Throwable e) {
220+
onFailure(e);
221+
}
206222
}
207223
}
208224

@@ -220,25 +236,41 @@ void onQueryPhaseFailure(final int shardIndex, final AtomicInteger counter, fina
220236
addShardFailure(shardIndex, new ShardSearchFailure(t));
221237
successfulOps.decrementAndGet();
222238
if (counter.decrementAndGet() == 0) {
223-
executeFetchPhase();
239+
try {
240+
executeFetchPhase();
241+
} catch (Throwable e) {
242+
listener.onFailure(new SearchPhaseExecutionException("query", "Fetch failed", e, null));
243+
}
224244
}
225245
}
226246

227-
private void executeFetchPhase() {
228-
sortedShardList = searchPhaseController.sortDocs(queryResults);
247+
private void executeFetchPhase() throws Exception {
248+
if (useSlowScroll) {
249+
sortedShardList = searchPhaseController.sortDocs(queryResults);
250+
} else {
251+
sortedShardList = searchPhaseController.sortDocsForScroll(queryResults);
252+
}
229253
AtomicArray<IntArrayList> docIdsToLoad = new AtomicArray<IntArrayList>(queryResults.length());
230254
searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
231255

232256
if (docIdsToLoad.asList().isEmpty()) {
233257
finishHim();
258+
return;
234259
}
235260

236-
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
237261

262+
final ScoreDoc[] lastEmittedDocPerShard;
263+
if (useSlowScroll) {
264+
lastEmittedDocPerShard = new ScoreDoc[queryResults.length()];
265+
} else {
266+
lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(sortedShardList, queryResults.length());
267+
}
268+
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
238269
for (final AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
239270
IntArrayList docIds = entry.value;
240271
final QuerySearchResult querySearchResult = queryResults.get(entry.index);
241-
FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, querySearchResult.id(), docIds);
272+
ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index];
273+
FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, querySearchResult.id(), docIds, lastEmittedDoc);
242274
DiscoveryNode node = nodes.get(querySearchResult.shardTarget().nodeId());
243275
searchService.sendExecuteFetch(node, fetchSearchRequest, new SearchServiceListener<FetchSearchResult>() {
244276
@Override

0 commit comments

Comments
 (0)