Commit e6680be

jimczi and javanna authored
Add new x-pack endpoints to track the progress of a search asynchronously (#49931) (#53591)
This change introduces a new API in x-pack basic that allows users to track the progress of a search. Users can submit an asynchronous search through a new endpoint called `_async_search`, which works exactly like the `_search` endpoint, except that instead of blocking until the final response is available, it returns a response after a provided `wait_for_completion` time.

````
GET my_index_pattern*/_async_search?wait_for_completion=100ms
{
  "aggs": {
    "date_histogram": {
      "field": "@timestamp",
      "fixed_interval": "1h"
    }
  }
}
````

If after 100ms the final response is not available, a `partial_response` is included in the body:

````
{
  "id": "9N3J1m4BgyzUDzqgC15b",
  "version": 1,
  "is_running": true,
  "is_partial": true,
  "response": {
    "_shards": {
      "total": 100,
      "successful": 5,
      "failed": 0
    },
    "total_hits": {
      "value": 1653433,
      "relation": "eq"
    },
    "aggs": {
      ...
    }
  }
}
````

The partial response contains the total number of requested shards, the number of shards that returned successfully, and the number of shards that failed. It also contains the total hits as well as partial aggregations computed from the successful shards. To continue monitoring the progress of the search, users can call the get `_async_search` API as follows:

````
GET _async_search/9N3J1m4BgyzUDzqgC15b/?wait_for_completion=100ms
````

This returns a new response, which can contain the same partial response as the previous call if the search didn't progress; in that case the returned `version` should be the same. If new partial results are available, the version is incremented and the `partial_response` contains the updated progress. Finally, if the response becomes fully available while or after waiting for completion, the `partial_response` is replaced by a `response` section that contains the usual `_search` response:

````
{
  "id": "9N3J1m4BgyzUDzqgC15b",
  "version": 10,
  "is_running": false,
  "response": {
    "is_partial": false,
    ...
  }
}
````

Asynchronous searches are stored in a restricted index called `.async-search` if they survive (are still running) after the initial submit. Each request has a keep alive that defaults to 5 days, but this value can be changed or updated at any time:

````
GET my_index_pattern*/_async_search?wait_for_completion=100ms&keep_alive=10d
````

The default can be changed when submitting the search; the example above raises the keep alive for the search to `10d`.

````
GET _async_search/9N3J1m4BgyzUDzqgC15b/?wait_for_completion=100ms&keep_alive=10d
````

The time to live for a specific search can be extended when getting the progress or result. In the example above we extend the keep alive by 10 more days. A background service that runs only on the node that holds the first primary shard of the `.async-search` index is responsible for deleting the expired results. It runs every hour, but the expiration is also checked by running queries (if they take longer than the `keep_alive`) and when getting a result.

Like a normal `_search`, if the HTTP channel that is used to submit a request is closed before getting a response, the search is automatically cancelled. Note that this behavior applies only to the submit API; subsequent GET requests will not cancel the search if they are closed.

Asynchronous searches are not persistent: if the coordinator node crashes or is restarted during the search, the asynchronous search will stop. To tell whether the search is still running, the response contains a field called `is_running` that indicates whether the task is up. It is the responsibility of the user to resume an asynchronous search that didn't reach a final response by re-submitting the query. However, final responses and failures are persisted in a system index, which makes it possible to retrieve a response even after the task finishes.

A stored search can be deleted through the same API:

````
DELETE _async_search/9N3J1m4BgyzUDzqgC15b
````

The response is also not stored if the initial submit action returns a final response. This avoids adding any overhead to queries that complete within the initial `wait_for_completion`.

The `.async-search` index is a restricted index (it should be migrated to a system index in 8.0+) that is accessible only through the async search APIs. These APIs also ensure that only the user that submitted the initial query can retrieve or delete the running search. Note that admins/superusers would still be able to cancel the search task through the task manager like any other task.

Relates #49091

Co-authored-by: Luca Cavanna <[email protected]>
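The submit-then-poll cycle described in the commit message can be sketched from the client side as follows. This is a minimal illustration under stated assumptions, not code from the commit: `AsyncSearchStatus`, `StatusFetcher`, and `pollUntilDone` are hypothetical names, and the fetcher stands in for an HTTP call to `GET _async_search/<id>?wait_for_completion=...`. The sketch relies only on the `version` and `is_running` fields documented above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical client-side view of the response fields described in the commit message.
final class AsyncSearchStatus {
    final String id;
    final int version;
    final boolean isRunning;

    AsyncSearchStatus(String id, int version, boolean isRunning) {
        this.id = id;
        this.version = version;
        this.isRunning = isRunning;
    }
}

// Stand-in for "GET _async_search/<id>?wait_for_completion=..."; an assumption, not a real client API.
interface StatusFetcher {
    AsyncSearchStatus fetch(String id);
}

final class AsyncSearchPollingSketch {
    // Polls until is_running is false (or maxPolls is reached) and returns the versions at which
    // new results became visible. An unchanged version means the search made no progress.
    static List<Integer> pollUntilDone(AsyncSearchStatus initial, StatusFetcher fetcher, int maxPolls) {
        List<Integer> observedVersions = new ArrayList<>();
        observedVersions.add(initial.version);
        AsyncSearchStatus current = initial;
        for (int i = 0; i < maxPolls && current.isRunning; i++) {
            AsyncSearchStatus next = fetcher.fetch(current.id);
            if (next.version != current.version) {
                observedVersions.add(next.version);
            }
            current = next;
        }
        return observedVersions;
    }

    public static void main(String[] args) {
        // Canned responses simulating a search that stalls once, progresses, then completes.
        Iterator<AsyncSearchStatus> canned = List.of(
            new AsyncSearchStatus("abc", 1, true),
            new AsyncSearchStatus("abc", 2, true),
            new AsyncSearchStatus("abc", 3, false)).iterator();
        List<Integer> versions = pollUntilDone(new AsyncSearchStatus("abc", 1, true), id -> canned.next(), 10);
        System.out.println(versions);
    }
}
```

Comparing `version` between calls lets a client skip re-rendering when nothing changed, and the `maxPolls` bound is an illustrative safeguard against polling a search whose coordinator has gone away.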
1 parent 7230340 commit e6680be

62 files changed, +4627 -118 lines changed

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

+10 -8
@@ -90,7 +90,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
     private final SearchTimeProvider timeProvider;
     private final SearchResponse.Clusters clusters;

-    private final GroupShardsIterator<SearchShardIterator> toSkipShardsIts;
+    protected final GroupShardsIterator<SearchShardIterator> toSkipShardsIts;
     protected final GroupShardsIterator<SearchShardIterator> shardsIts;
     private final int expectedTotalOps;
     private final AtomicInteger totalOps = new AtomicInteger();
@@ -375,6 +375,11 @@ private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard,
         // we do make sure to clean it on a successful response from a shard
         SearchShardTarget shardTarget = shardIt.newSearchShardTarget(nodeId);
         onShardFailure(shardIndex, shardTarget, e);
+        final ShardRouting nextShard = shardIt.nextOrNull();
+        final boolean lastShard = nextShard == null;
+        if (lastShard) {
+            onShardGroupFailure(shardIndex, shardTarget, e);
+        }

         if (totalOps.incrementAndGet() == expectedTotalOps) {
             if (logger.isDebugEnabled()) {
@@ -385,11 +390,8 @@ private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard,
                     logger.trace(new ParameterizedMessage("{}: Failed to execute [{}]", shard, request), e);
                 }
             }
-            onShardGroupFailure(shardIndex, e);
             onPhaseDone();
         } else {
-            final ShardRouting nextShard = shardIt.nextOrNull();
-            final boolean lastShard = nextShard == null;
             // trace log this exception
             logger.trace(() -> new ParameterizedMessage(
                 "{}: Failed to execute [{}] lastShard [{}]",
@@ -405,18 +407,18 @@ private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard,
                                 shard != null ? shard.shortSummary() : shardIt.shardId(), request, lastShard), e);
                     }
                 }
-                onShardGroupFailure(shardIndex, e);
             }
         }
     }

     /**
      * Executed once for every {@link ShardId} that failed on all available shard routing.
      *
-     * @param shardIndex the shard target that failed
-     * @param exc the final failure reason
+     * @param shardIndex the shard index that failed
+     * @param shardTarget the last shard target for this failure
+     * @param exc the last failure reason
      */
-    protected void onShardGroupFailure(int shardIndex, Exception exc) {}
+    protected void onShardGroupFailure(int shardIndex, SearchShardTarget shardTarget, Exception exc) {}

     /**
      * Executed once for every failed shard level request. This method is invoked before the next replica is tried for the given
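The reordered failure handling in this file (per-copy hook first, then the group hook only once no copy is left to try) can be illustrated with a small standalone simulation. This is a sketch under stated assumptions rather than the Elasticsearch implementation: `ShardGroupFailureSketch`, `failAllCopies`, and the string-based shard copies are hypothetical, and every copy is assumed to fail.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simplified simulation of the reordered failure handling; not the Elasticsearch class.
final class ShardGroupFailureSketch {
    // Records every failed copy, mirroring the per-copy onShardFailure hook.
    final List<String> copyFailures = new ArrayList<>();
    // Records one entry per shard whose copies all failed, mirroring onShardGroupFailure.
    final List<String> groupFailures = new ArrayList<>();

    // Walks the copies of one shard, assuming (for this simulation) that each of them fails.
    void failAllCopies(int shardIndex, List<String> copies) {
        Iterator<String> shardIt = copies.iterator();
        String current = shardIt.hasNext() ? shardIt.next() : null;
        while (current != null) {
            copyFailures.add(shardIndex + ":" + current);
            // Equivalent of shardIt.nextOrNull(): null means no copy is left to try.
            String nextShard = shardIt.hasNext() ? shardIt.next() : null;
            boolean lastShard = nextShard == null;
            if (lastShard) {
                // The group hook also receives the last target that failed.
                groupFailures.add(shardIndex + ":" + current);
            }
            current = nextShard;
        }
    }

    public static void main(String[] args) {
        ShardGroupFailureSketch sketch = new ShardGroupFailureSketch();
        sketch.failAllCopies(0, List.of("node1", "node2", "node3"));
        System.out.println(sketch.copyFailures);
        System.out.println(sketch.groupFailures);
    }
}
```

With three failing copies, the per-copy list receives three entries while the group list receives a single entry for the last copy, which matches the "executed once for every `ShardId`" contract in the Javadoc.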

server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java

+1 -4
@@ -22,7 +22,6 @@
 import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.search.SearchPhaseResult;
 import org.elasticsearch.search.SearchShardTarget;
-import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.dfs.AggregatedDfs;
 import org.elasticsearch.search.dfs.DfsSearchResult;
 import org.elasticsearch.search.query.QuerySearchRequest;
@@ -72,8 +71,6 @@ public void run() throws IOException {
         final CountedCollector<SearchPhaseResult> counter = new CountedCollector<>(queryResult::consumeResult,
             resultList.size(),
             () -> context.executeNextPhase(this, nextPhaseFactory.apply(queryResult)), context);
-        final SearchSourceBuilder sourceBuilder = context.getRequest().source();
-        progressListener.notifyListShards(progressListener.searchShards(resultList), sourceBuilder == null || sourceBuilder.size() != 0);
         for (final DfsSearchResult dfsResult : resultList) {
             final SearchShardTarget searchShardTarget = dfsResult.getSearchShardTarget();
             Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId());
@@ -97,7 +94,7 @@ public void onFailure(Exception exception) {
                         try {
                             context.getLogger().debug(() -> new ParameterizedMessage("[{}] Failed to execute query phase",
                                 querySearchRequest.contextId()), exception);
-                            progressListener.notifyQueryFailure(shardIndex, exception);
+                            progressListener.notifyQueryFailure(shardIndex, searchShardTarget, exception);
                             counter.onFailure(shardIndex, searchShardTarget, exception);
                         } finally {
                             // the query might not have been executed at all (for example because thread pool rejected

server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java

+5
@@ -24,6 +24,7 @@
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.routing.GroupShardsIterator;
 import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.dfs.DfsSearchResult;
 import org.elasticsearch.search.internal.AliasFilter;
 import org.elasticsearch.transport.Transport;
@@ -51,6 +52,10 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
                 shardsIts, timeProvider, clusterState, task, new ArraySearchPhaseResults<>(shardsIts.size()),
                 request.getMaxConcurrentShardRequests(), clusters);
         this.searchPhaseController = searchPhaseController;
+        SearchProgressListener progressListener = task.getProgressListener();
+        SearchSourceBuilder sourceBuilder = request.source();
+        progressListener.notifyListShards(progressListener.searchShards(this.shardsIts),
+            progressListener.searchShards(toSkipShardsIts), clusters, sourceBuilder == null || sourceBuilder.size() != 0);
     }

     @Override

server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java

+5 -4
@@ -664,9 +664,9 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
                 }
                 numReducePhases++;
                 index = 1;
-                if (hasAggs) {
+                if (hasAggs || hasTopDocs) {
                     progressListener.notifyPartialReduce(progressListener.searchShards(processedShards),
-                        topDocsStats.getTotalHits(), aggsBuffer[0], numReducePhases);
+                        topDocsStats.getTotalHits(), hasAggs ? aggsBuffer[0] : null, numReducePhases);
                 }
             }
             final int i = index++;
@@ -696,7 +696,7 @@ public ReducedQueryPhase reduce() {
             ReducedQueryPhase reducePhase = controller.reducedQueryPhase(results.asList(),
                 getRemainingAggs(), getRemainingTopDocs(), topDocsStats, numReducePhases, false, performFinalReduce);
             progressListener.notifyReduce(progressListener.searchShards(results.asList()),
-                reducePhase.totalHits, reducePhase.aggregations);
+                reducePhase.totalHits, reducePhase.aggregations, reducePhase.numReducePhases);
             return reducePhase;
         }

@@ -751,7 +751,8 @@ ReducedQueryPhase reduce() {
                 List<SearchPhaseResult> resultList = results.asList();
                 final ReducedQueryPhase reducePhase =
                     reducedQueryPhase(resultList, isScrollRequest, trackTotalHitsUpTo, request.isFinalReduce());
-                listener.notifyReduce(listener.searchShards(resultList), reducePhase.totalHits, reducePhase.aggregations);
+                listener.notifyReduce(listener.searchShards(resultList), reducePhase.totalHits,
+                    reducePhase.aggregations, reducePhase.numReducePhases);
                 return reducePhase;
             }
         };

server/src/main/java/org/elasticsearch/action/search/SearchProgressListener.java

+22 -17
@@ -23,6 +23,7 @@
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.search.TotalHits;
+import org.elasticsearch.action.search.SearchResponse.Clusters;
 import org.elasticsearch.cluster.routing.GroupShardsIterator;
 import org.elasticsearch.search.SearchPhaseResult;
 import org.elasticsearch.search.SearchShardTarget;
@@ -49,24 +50,27 @@ abstract class SearchProgressListener {
      * Executed when shards are ready to be queried.
      *
      * @param shards The list of shards to query.
+     * @param skippedShards The list of skipped shards.
+     * @param clusters The statistics for remote clusters included in the search.
      * @param fetchPhase <code>true</code> if the search needs a fetch phase, <code>false</code> otherwise.
      **/
-    public void onListShards(List<SearchShard> shards, boolean fetchPhase) {}
+    public void onListShards(List<SearchShard> shards, List<SearchShard> skippedShards, Clusters clusters, boolean fetchPhase) {}

     /**
      * Executed when a shard returns a query result.
      *
-     * @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards(List, boolean)} )}.
+     * @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards} )}.
      */
     public void onQueryResult(int shardIndex) {}

     /**
      * Executed when a shard reports a query failure.
      *
-     * @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards(List, boolean)})}.
+     * @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards})}.
+     * @param shardTarget The last shard target that thrown an exception.
      * @param exc The cause of the failure.
      */
-    public void onQueryFailure(int shardIndex, Exception exc) {}
+    public void onQueryFailure(int shardIndex, SearchShardTarget shardTarget, Exception exc) {}

     /**
      * Executed when a partial reduce is created. The number of partial reduce can be controlled via
@@ -75,38 +79,39 @@ public void onQueryFailure(int shardIndex, Exception exc) {}
      * @param shards The list of shards that are part of this reduce.
      * @param totalHits The total number of hits in this reduce.
      * @param aggs The partial result for aggregations.
-     * @param version The version number for this reduce.
+     * @param reducePhase The version number for this reduce.
      */
-    public void onPartialReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int version) {}
+    public void onPartialReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {}

     /**
      * Executed once when the final reduce is created.
      *
      * @param shards The list of shards that are part of this reduce.
      * @param totalHits The total number of hits in this reduce.
      * @param aggs The final result for aggregations.
+     * @param reducePhase The version number for this reduce.
      */
-    public void onReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs) {}
+    public void onReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {}

     /**
      * Executed when a shard returns a fetch result.
      *
-     * @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards(List, boolean)})}.
+     * @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards})}.
      */
     public void onFetchResult(int shardIndex) {}

     /**
      * Executed when a shard reports a fetch failure.
      *
-     * @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards(List, boolean)})}.
+     * @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards})}.
      * @param exc The cause of the failure.
      */
     public void onFetchFailure(int shardIndex, Exception exc) {}

-    final void notifyListShards(List<SearchShard> shards, boolean fetchPhase) {
+    final void notifyListShards(List<SearchShard> shards, List<SearchShard> skippedShards, Clusters clusters, boolean fetchPhase) {
         this.shards = shards;
         try {
-            onListShards(shards, fetchPhase);
+            onListShards(shards, skippedShards, clusters, fetchPhase);
         } catch (Exception e) {
             logger.warn(() -> new ParameterizedMessage("Failed to execute progress listener on list shards"), e);
         }
@@ -121,26 +126,26 @@ final void notifyQueryResult(int shardIndex) {
         }
     }

-    final void notifyQueryFailure(int shardIndex, Exception exc) {
+    final void notifyQueryFailure(int shardIndex, SearchShardTarget shardTarget, Exception exc) {
         try {
-            onQueryFailure(shardIndex, exc);
+            onQueryFailure(shardIndex, shardTarget, exc);
         } catch (Exception e) {
             logger.warn(() -> new ParameterizedMessage("[{}] Failed to execute progress listener on query failure",
                 shards.get(shardIndex)), e);
         }
     }

-    final void notifyPartialReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int version) {
+    final void notifyPartialReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {
         try {
-            onPartialReduce(shards, totalHits, aggs, version);
+            onPartialReduce(shards, totalHits, aggs, reducePhase);
         } catch (Exception e) {
             logger.warn(() -> new ParameterizedMessage("Failed to execute progress listener on partial reduce"), e);
         }
     }

-    final void notifyReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs) {
+    final void notifyReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {
         try {
-            onReduce(shards, totalHits, aggs);
+            onReduce(shards, totalHits, aggs, reducePhase);
         } catch (Exception e) {
             logger.warn(() -> new ParameterizedMessage("Failed to execute progress listener on reduce"), e);
         }
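The listener in this file pairs each overridable `on...` hook with a `final` `notify...` wrapper that catches and logs exceptions, so a faulty listener cannot fail the search. The following is a minimal standalone sketch of that pattern, not the Elasticsearch class: `ProgressListenerSketch`, the simplified `(long, int)` signature, and the in-memory `warnings` list (standing in for the logger) are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the listener shown in the diff; it is not the Elasticsearch class.
abstract class ProgressListenerSketch {
    // Collects warnings in memory instead of writing to a logger (an assumption for this sketch).
    final List<String> warnings = new ArrayList<>();

    // Overridable hook, analogous to onReduce(...) with the added reducePhase argument.
    protected void onReduce(long totalHits, int reducePhase) {}

    // Final wrapper, analogous to notifyReduce(...): a failing hook is recorded, not rethrown.
    final void notifyReduce(long totalHits, int reducePhase) {
        try {
            onReduce(totalHits, reducePhase);
        } catch (Exception e) {
            warnings.add("Failed to execute progress listener on reduce: " + e.getMessage());
        }
    }
}

final class ProgressListenerDemo {
    public static void main(String[] args) {
        ProgressListenerSketch failing = new ProgressListenerSketch() {
            @Override
            protected void onReduce(long totalHits, int reducePhase) {
                throw new IllegalStateException("boom");
            }
        };
        // The caller is shielded from the listener's exception.
        failing.notifyReduce(42L, 3);
        System.out.println(failing.warnings);
    }
}
```

Keeping the wrappers `final` means subclasses can only customize the hooks, so the exception isolation cannot be bypassed by an override.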

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

+4 -3
@@ -25,6 +25,7 @@
 import org.elasticsearch.cluster.routing.GroupShardsIterator;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.search.SearchPhaseResult;
+import org.elasticsearch.search.SearchShardTarget;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.internal.AliasFilter;
 import org.elasticsearch.transport.Transport;
@@ -57,7 +58,7 @@ final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<Se
         final SearchProgressListener progressListener = task.getProgressListener();
         final SearchSourceBuilder sourceBuilder = request.source();
         progressListener.notifyListShards(progressListener.searchShards(this.shardsIts),
-            sourceBuilder == null || sourceBuilder.size() != 0);
+            progressListener.searchShards(toSkipShardsIts), clusters, sourceBuilder == null || sourceBuilder.size() != 0);
     }

     protected void executePhaseOnShard(final SearchShardIterator shardIt, final ShardRouting shard,
@@ -67,8 +68,8 @@ protected void executePhaseOnShard(final SearchShardIterator shardIt, final Shar
     }

     @Override
-    protected void onShardGroupFailure(int shardIndex, Exception exc) {
-        progressListener.notifyQueryFailure(shardIndex, exc);
+    protected void onShardGroupFailure(int shardIndex, SearchShardTarget shardTarget, Exception exc) {
+        progressListener.notifyQueryFailure(shardIndex, shardTarget, exc);
     }

     @Override

server/src/main/java/org/elasticsearch/action/search/SearchRequest.java

+3 -4
@@ -33,7 +33,6 @@
 import org.elasticsearch.search.Scroll;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.internal.SearchContext;
-import org.elasticsearch.tasks.Task;
 import org.elasticsearch.tasks.TaskId;

 import java.io.IOException;
@@ -56,9 +55,9 @@
  * @see org.elasticsearch.client.Client#search(SearchRequest)
  * @see SearchResponse
  */
-public final class SearchRequest extends ActionRequest implements IndicesRequest.Replaceable {
+public class SearchRequest extends ActionRequest implements IndicesRequest.Replaceable {

-    private static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false"));
+    public static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false"));

     public static final int DEFAULT_PRE_FILTER_SHARD_SIZE = 128;
     public static final int DEFAULT_BATCHED_REDUCE_SIZE = 512;
@@ -597,7 +596,7 @@ public boolean isSuggestOnly() {
     }

     @Override
-    public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
+    public SearchTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
         // generating description in a lazy way since source can be quite big
         return new SearchTask(id, type, action, null, parentTaskId, headers) {
             @Override

server/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java

+1 -1
@@ -224,7 +224,7 @@ public SearchRequestBuilder setVersion(boolean version) {
         sourceBuilder().version(version);
         return this;
     }
-
+
     /**
      * Should each {@link org.elasticsearch.search.SearchHit} be returned with the
      * sequence number and primary term of the last modification of the document.

server/src/main/java/org/elasticsearch/action/search/SearchResponse.java

+4
@@ -118,6 +118,10 @@ public RestStatus status() {
         return RestStatus.status(successfulShards, totalShards, shardFailures);
     }

+    public SearchResponseSections getInternalResponse() {
+        return internalResponse;
+    }
+
     /**
      * The search hits.
      */

server/src/main/java/org/elasticsearch/action/search/SearchShard.java

+1 -1
@@ -34,7 +34,7 @@ public class SearchShard implements Comparable<SearchShard> {
     private final String clusterAlias;
     private final ShardId shardId;

-    SearchShard(@Nullable String clusterAlias, ShardId shardId) {
+    public SearchShard(@Nullable String clusterAlias, ShardId shardId) {
         this.clusterAlias = clusterAlias;
         this.shardId = shardId;
     }
