
Add new x-pack endpoints to track the progress of a search asynchronously #49931


Merged
merged 81 commits into from
Mar 10, 2020
Changes from all commits
81 commits
750a81d
Add new x-pack endpoints to asynchronously track the progress of a se…
jimczi Dec 4, 2019
c28766c
notify partial reduce even on top-docs query
jimczi Dec 6, 2019
4bfafb4
fix npe
jimczi Dec 6, 2019
3ace355
use a single object named response in the xcontent serialization of t…
jimczi Dec 11, 2019
5d000ca
delete frozen search response automatically
jimczi Dec 11, 2019
1cd2a9a
add integration tests
jimczi Dec 12, 2019
52085bb
simplify schema for the .async-search index
jimczi Dec 12, 2019
da7329d
switch to true random uuids
jimczi Dec 12, 2019
20c24a0
iter
jimczi Dec 13, 2019
438a7b0
replace waitUntil with assertBusy
jimczi Dec 13, 2019
5a907b9
add a security layer that restricts the usage of the async search API…
jimczi Dec 16, 2019
a1eac14
Merge branch 'master' into async_search
jimczi Dec 16, 2019
2180dde
fix checkstyle
jimczi Dec 16, 2019
be0ba05
fix double clean
jimczi Dec 16, 2019
3c1ffdd
iter
jimczi Dec 17, 2019
8434d4d
Merge branch 'master' into async_search
jimczi Dec 18, 2019
b1d2414
add more logging
jimczi Dec 18, 2019
542fff2
replace routing with preference in get requests
jimczi Dec 18, 2019
177e303
iter
jimczi Dec 18, 2019
baa498d
validate the index name when receiving a search id
jimczi Dec 18, 2019
10a0d66
Merge branch 'master' into async_search
jimczi Dec 18, 2019
fe5c150
add authentication test
jimczi Dec 19, 2019
ec8cdf2
Merge branch 'master' into async_search
jimczi Dec 19, 2019
b4fa0de
fix block plugin to throw exceptions in createWeight rather than toQuery
jimczi Dec 19, 2019
17c0602
remove clean_on_completion option after the initial submit
jimczi Dec 20, 2019
8e5229c
Merge branch 'master' into async_search
jimczi Dec 20, 2019
93189d3
move shard stats in a _shards section for partial response
jimczi Dec 20, 2019
d03b940
address review
jimczi Dec 22, 2019
96a29d9
Merge branch 'master' into async_search
jimczi Jan 6, 2020
7ffb1dd
address review
jimczi Jan 6, 2020
9930ccd
unused import
jimczi Jan 8, 2020
28fd67c
Merge branch 'master' into async_search
jimczi Jan 8, 2020
9d98706
line len
jimczi Jan 8, 2020
6c5288a
Merge branch 'master' into async_search
jimczi Jan 15, 2020
9056b87
address review
jimczi Jan 15, 2020
8a69a21
add more tests
jimczi Jan 15, 2020
78c2ca4
remove unrelated change
jimczi Jan 15, 2020
5eb7148
fix nocommit
jimczi Jan 15, 2020
fb3364f
add more statistics to the search progress listener (number of skippe…
jimczi Jan 22, 2020
1e102fb
Replace generic thread pool execution with asynchronous listener that…
jimczi Jan 22, 2020
139d56a
Merge branch 'master' into async_search
jimczi Jan 22, 2020
dc6daa8
fix checkstyle
jimczi Jan 22, 2020
5e410d3
fix wrong assert
jimczi Jan 22, 2020
eb7ba8a
Expose the logic to cancel task when the rest channel is closed
jimczi Jan 24, 2020
6ae7903
address review
jimczi Jan 24, 2020
159ed81
plug the automatic cancel of search if the rest channel is closed whe…
jimczi Jan 24, 2020
a529db0
Merge branch 'master' into async_search
jimczi Jan 24, 2020
19d89f9
handle expiration time on a per request basis
jimczi Jan 29, 2020
373af3a
use a single replica for the hidden index
jimczi Jan 31, 2020
75601ca
Merge branch 'master' into async_search
jimczi Jan 31, 2020
936cc97
Merge branch 'master' into async_search
jimczi Feb 4, 2020
4237122
iter
jimczi Feb 4, 2020
4b53aad
do not set id twice
jimczi Feb 4, 2020
434b491
change expectation in test now that we use one replica by default
jimczi Feb 4, 2020
15de81a
Merge branch 'master' into async_search
jimczi Feb 4, 2020
0f86649
fix x-pack user to allow restricted indices
jimczi Feb 4, 2020
3b9b89e
cleanup
jimczi Feb 4, 2020
b52a85a
iter
jimczi Feb 4, 2020
6a1c98d
Allow x-pack user to manage async-search indices
jimczi Feb 4, 2020
c068667
Merge branch 'master' into async_search
jimczi Feb 5, 2020
1df46f8
another big iter
jimczi Feb 5, 2020
aa85e2d
Merge branch 'master' into async_search
jimczi Feb 6, 2020
05d9638
small fix after merging master
jimczi Feb 6, 2020
e0e2072
checkstyle
jimczi Feb 6, 2020
8bce32f
Merge branch 'master' into async_search
jimczi Feb 6, 2020
33f921e
restore test
jimczi Feb 6, 2020
b903d0e
fix more test
jimczi Feb 6, 2020
7612748
iter
jimczi Feb 6, 2020
2721348
Merge branch 'master' into async_search
jimczi Mar 5, 2020
e9b7add
address review and plug the new async_search origin
jimczi Mar 5, 2020
3e8ba58
unused import
jimczi Mar 5, 2020
cd6f3f2
ensure that the task is unregistered before returning the final respo…
jimczi Mar 5, 2020
20aa0fe
checkstyle
jimczi Mar 5, 2020
05e2209
Merge branch 'master' into async_search
jimczi Mar 9, 2020
415c910
address review
jimczi Mar 9, 2020
91699d5
unused import
jimczi Mar 10, 2020
20d3f66
Merge branch 'master' into async_search
jimczi Mar 10, 2020
e3bea16
fix rest API reference to the outdated 304 response status
jimczi Mar 10, 2020
8727385
remove last_version parameter
jimczi Mar 10, 2020
315bb49
rephrase rest option after review
jimczi Mar 10, 2020
806ae8f
Merge branch 'master' into async_search
jimczi Mar 10, 2020
@@ -90,7 +90,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final SearchTimeProvider timeProvider;
private final SearchResponse.Clusters clusters;

private final GroupShardsIterator<SearchShardIterator> toSkipShardsIts;
protected final GroupShardsIterator<SearchShardIterator> toSkipShardsIts;
protected final GroupShardsIterator<SearchShardIterator> shardsIts;
private final int expectedTotalOps;
private final AtomicInteger totalOps = new AtomicInteger();
@@ -385,7 +385,7 @@ private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard,
logger.trace(new ParameterizedMessage("{}: Failed to execute [{}]", shard, request), e);
}
}
onShardGroupFailure(shardIndex, e);
onShardGroupFailure(shardIndex, shardTarget, e);
onPhaseDone();
} else {
final ShardRouting nextShard = shardIt.nextOrNull();
@@ -405,18 +405,19 @@ private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard,
shard != null ? shard.shortSummary() : shardIt.shardId(), request, lastShard), e);
}
}
onShardGroupFailure(shardIndex, e);
onShardGroupFailure(shardIndex, shardTarget, e);
}
}
}

/**
* Executed once for every {@link ShardId} that failed on all available shard routing.
*
* @param shardIndex the shard target that failed
* @param exc the final failure reason
* @param shardIndex the shard index that failed
* @param shardTarget the last shard target for this failure
* @param exc the last failure reason
*/
protected void onShardGroupFailure(int shardIndex, Exception exc) {}
protected void onShardGroupFailure(int shardIndex, SearchShardTarget shardTarget, Exception exc) {}

/**
* Executed once for every failed shard level request. This method is invoked before the next replica is tried for the given
@@ -22,7 +22,6 @@
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.dfs.AggregatedDfs;
import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.search.query.QuerySearchRequest;
@@ -72,8 +71,6 @@ public void run() throws IOException {
final CountedCollector<SearchPhaseResult> counter = new CountedCollector<>(queryResult::consumeResult,
resultList.size(),
() -> context.executeNextPhase(this, nextPhaseFactory.apply(queryResult)), context);
final SearchSourceBuilder sourceBuilder = context.getRequest().source();
progressListener.notifyListShards(progressListener.searchShards(resultList), sourceBuilder == null || sourceBuilder.size() != 0);
for (final DfsSearchResult dfsResult : resultList) {
final SearchShardTarget searchShardTarget = dfsResult.getSearchShardTarget();
Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId());
@@ -97,7 +94,7 @@ public void onFailure(Exception exception) {
try {
context.getLogger().debug(() -> new ParameterizedMessage("[{}] Failed to execute query phase",
querySearchRequest.contextId()), exception);
progressListener.notifyQueryFailure(shardIndex, exception);
progressListener.notifyQueryFailure(shardIndex, searchShardTarget, exception);
counter.onFailure(shardIndex, searchShardTarget, exception);
} finally {
// the query might not have been executed at all (for example because thread pool rejected
@@ -24,6 +24,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.transport.Transport;
@@ -51,6 +52,10 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
shardsIts, timeProvider, clusterState, task, new ArraySearchPhaseResults<>(shardsIts.size()),
request.getMaxConcurrentShardRequests(), clusters);
this.searchPhaseController = searchPhaseController;
SearchProgressListener progressListener = task.getProgressListener();
SearchSourceBuilder sourceBuilder = request.source();
progressListener.notifyListShards(progressListener.searchShards(this.shardsIts),
progressListener.searchShards(toSkipShardsIts), clusters, sourceBuilder == null || sourceBuilder.size() != 0);
}

@Override
@@ -664,9 +664,9 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
}
numReducePhases++;
index = 1;
if (hasAggs) {
if (hasAggs || hasTopDocs) {
progressListener.notifyPartialReduce(progressListener.searchShards(processedShards),
topDocsStats.getTotalHits(), aggsBuffer[0], numReducePhases);
topDocsStats.getTotalHits(), hasAggs ? aggsBuffer[0] : null, numReducePhases);
}
}
final int i = index++;
@@ -696,7 +696,7 @@ public ReducedQueryPhase reduce() {
ReducedQueryPhase reducePhase = controller.reducedQueryPhase(results.asList(),
getRemainingAggs(), getRemainingTopDocs(), topDocsStats, numReducePhases, false, performFinalReduce);
progressListener.notifyReduce(progressListener.searchShards(results.asList()),
reducePhase.totalHits, reducePhase.aggregations);
reducePhase.totalHits, reducePhase.aggregations, reducePhase.numReducePhases);
return reducePhase;
}

@@ -751,7 +751,8 @@ ReducedQueryPhase reduce() {
List<SearchPhaseResult> resultList = results.asList();
final ReducedQueryPhase reducePhase =
reducedQueryPhase(resultList, isScrollRequest, trackTotalHitsUpTo, request.isFinalReduce());
listener.notifyReduce(listener.searchShards(resultList), reducePhase.totalHits, reducePhase.aggregations);
listener.notifyReduce(listener.searchShards(resultList), reducePhase.totalHits,
reducePhase.aggregations, reducePhase.numReducePhases);
return reducePhase;
}
};
@@ -23,6 +23,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.search.SearchResponse.Clusters;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
@@ -48,24 +49,27 @@ abstract class SearchProgressListener {
* Executed when shards are ready to be queried.
*
* @param shards The list of shards to query.
* @param skippedShards The list of skipped shards.
* @param clusters The statistics for remote clusters included in the search.
* @param fetchPhase <code>true</code> if the search needs a fetch phase, <code>false</code> otherwise.
**/
public void onListShards(List<SearchShard> shards, boolean fetchPhase) {}
public void onListShards(List<SearchShard> shards, List<SearchShard> skippedShards, Clusters clusters, boolean fetchPhase) {}

/**
* Executed when a shard returns a query result.
*
* @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards(List, boolean)} )}.
* @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards} )}.
*/
public void onQueryResult(int shardIndex) {}

/**
* Executed when a shard reports a query failure.
*
* @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards(List, boolean)})}.
* @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards})}.
* @param shardTarget The last shard target that thrown an exception.
* @param exc The cause of the failure.
*/
public void onQueryFailure(int shardIndex, Exception exc) {}
public void onQueryFailure(int shardIndex, SearchShardTarget shardTarget, Exception exc) {}

/**
* Executed when a partial reduce is created. The number of partial reduce can be controlled via
@@ -74,38 +78,39 @@ public void onQueryFailure(int shardIndex, Exception exc) {}
* @param shards The list of shards that are part of this reduce.
* @param totalHits The total number of hits in this reduce.
* @param aggs The partial result for aggregations.
* @param version The version number for this reduce.
* @param reducePhase The version number for this reduce.
*/
public void onPartialReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int version) {}
public void onPartialReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {}

/**
* Executed once when the final reduce is created.
*
* @param shards The list of shards that are part of this reduce.
* @param totalHits The total number of hits in this reduce.
* @param aggs The final result for aggregations.
* @param reducePhase The version number for this reduce.
*/
public void onReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs) {}
public void onReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {}

/**
* Executed when a shard returns a fetch result.
*
* @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards(List, boolean)})}.
* @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards})}.
*/
public void onFetchResult(int shardIndex) {}

/**
* Executed when a shard reports a fetch failure.
*
* @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards(List, boolean)})}.
* @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards})}.
* @param exc The cause of the failure.
*/
public void onFetchFailure(int shardIndex, Exception exc) {}

final void notifyListShards(List<SearchShard> shards, boolean fetchPhase) {
final void notifyListShards(List<SearchShard> shards, List<SearchShard> skippedShards, Clusters clusters, boolean fetchPhase) {
this.shards = shards;
try {
onListShards(shards, fetchPhase);
onListShards(shards, skippedShards, clusters, fetchPhase);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("Failed to execute progress listener on list shards"), e);
}
@@ -120,26 +125,26 @@ final void notifyQueryResult(int shardIndex) {
}
}

final void notifyQueryFailure(int shardIndex, Exception exc) {
final void notifyQueryFailure(int shardIndex, SearchShardTarget shardTarget, Exception exc) {
try {
onQueryFailure(shardIndex, exc);
onQueryFailure(shardIndex, shardTarget, exc);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("[{}] Failed to execute progress listener on query failure",
shards.get(shardIndex)), e);
}
}

final void notifyPartialReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int version) {
final void notifyPartialReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {
try {
onPartialReduce(shards, totalHits, aggs, version);
onPartialReduce(shards, totalHits, aggs, reducePhase);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("Failed to execute progress listener on partial reduce"), e);
}
}

final void notifyReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs) {
final void notifyReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {
try {
onReduce(shards, totalHits, aggs);
onReduce(shards, totalHits, aggs, reducePhase);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("Failed to execute progress listener on reduce"), e);
}
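For reference, a minimal sketch of a listener consuming the widened callbacks; the class below is hypothetical and not part of this change. Skipped shards and cluster statistics now arrive with the shard list, query failures carry the last failing SearchShardTarget, and both reduce callbacks report the reduce-phase counter. Note that the partial-reduce aggregations can be null when only top docs were reduced.

package org.elasticsearch.action.search; // SearchProgressListener is package-private

import java.util.List;

import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.search.SearchResponse.Clusters;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.InternalAggregations;

// Hypothetical example listener; class name and field are illustrative only.
class ShardCountingProgressListener extends SearchProgressListener {
    private volatile int totalShards;

    @Override
    public void onListShards(List<SearchShard> shards, List<SearchShard> skippedShards, Clusters clusters, boolean fetchPhase) {
        // Skipped shards are now reported separately from the shards that will actually be queried.
        totalShards = shards.size() + skippedShards.size();
    }

    @Override
    public void onQueryFailure(int shardIndex, SearchShardTarget shardTarget, Exception exc) {
        // The last shard target that failed is available in addition to the shard index.
    }

    @Override
    public void onPartialReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {
        // aggs can be null when the partial reduce only covers top docs (hasAggs || hasTopDocs above).
    }

    @Override
    public void onReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {
        // Final reduce; reducePhase counts the partial reduces that ran before it.
    }
}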
@@ -25,6 +25,7 @@
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.transport.Transport;
@@ -57,7 +58,7 @@ final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<Se
final SearchProgressListener progressListener = task.getProgressListener();
final SearchSourceBuilder sourceBuilder = request.source();
progressListener.notifyListShards(progressListener.searchShards(this.shardsIts),
sourceBuilder == null || sourceBuilder.size() != 0);
progressListener.searchShards(toSkipShardsIts), clusters, sourceBuilder == null || sourceBuilder.size() != 0);
}

protected void executePhaseOnShard(final SearchShardIterator shardIt, final ShardRouting shard,
@@ -67,8 +68,8 @@ protected void executePhaseOnShard(final SearchShardIterator shardIt, final Shar
}

@Override
protected void onShardGroupFailure(int shardIndex, Exception exc) {
progressListener.notifyQueryFailure(shardIndex, exc);
protected void onShardGroupFailure(int shardIndex, SearchShardTarget shardTarget, Exception exc) {
progressListener.notifyQueryFailure(shardIndex, shardTarget, exc);
}

@Override
@@ -33,7 +33,6 @@
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;
@@ -56,9 +55,9 @@
* @see org.elasticsearch.client.Client#search(SearchRequest)
* @see SearchResponse
*/
public final class SearchRequest extends ActionRequest implements IndicesRequest.Replaceable {
public class SearchRequest extends ActionRequest implements IndicesRequest.Replaceable {

private static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false"));
public static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false"));

public static final int DEFAULT_PRE_FILTER_SHARD_SIZE = 128;
public static final int DEFAULT_BATCHED_REDUCE_SIZE = 512;
@@ -560,7 +559,7 @@ public boolean isSuggestOnly() {
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
public SearchTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
// generating description in a lazy way since source can be quite big
return new SearchTask(id, type, action, null, parentTaskId, headers) {
@Override
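A hedged sketch of why SearchRequest is no longer final and why createTask now returns SearchTask: another module (presumably the new async search code, though the wrapper below is purely illustrative and not from this PR) can subclass the request and get its own SearchTask back without casting.

import java.util.Map;

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchTask;
import org.elasticsearch.tasks.TaskId;

// Hypothetical wrapper, illustration only: with `final` removed and the covariant
// SearchTask return type, a module can subclass SearchRequest and hand back its own task.
public class TrackedSearchRequest extends SearchRequest {

    @Override
    public SearchTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
        // A real subclass could return a task that carries extra state, e.g. a progress listener.
        return super.createTask(id, type, action, parentTaskId, headers);
    }
}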
@@ -213,7 +213,7 @@ public SearchRequestBuilder setVersion(boolean version) {
sourceBuilder().version(version);
return this;
}

/**
* Should each {@link org.elasticsearch.search.SearchHit} be returned with the
* sequence number and primary term of the last modification of the document.
@@ -113,6 +113,10 @@ public RestStatus status() {
return RestStatus.status(successfulShards, totalShards, shardFailures);
}

public SearchResponseSections getInternalResponse() {
return internalResponse;
}

/**
* The search hits.
*/
@@ -34,7 +34,7 @@ public class SearchShard implements Comparable<SearchShard> {
private final String clusterAlias;
private final ShardId shardId;

SearchShard(@Nullable String clusterAlias, ShardId shardId) {
public SearchShard(@Nullable String clusterAlias, ShardId shardId) {
this.clusterAlias = clusterAlias;
this.shardId = shardId;
}
38 changes: 0 additions & 38 deletions server/src/main/java/org/elasticsearch/client/node/NodeClient.java
@@ -23,12 +23,6 @@
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchTask;
import org.elasticsearch.action.search.SearchProgressActionListener;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.support.AbstractClient;
@@ -108,38 +102,6 @@ > Task executeLocally(ActionType<Response> action, Request request, TaskListener
listener::onResponse, listener::onFailure);
}

/**
* Execute a {@link SearchRequest} locally and track the progress of the request through
* a {@link SearchProgressActionListener}.
*/
public SearchTask executeSearchLocally(SearchRequest request, SearchProgressActionListener listener) {
// we cannot track the progress if remote cluster requests are splitted.
request.setCcsMinimizeRoundtrips(false);
TransportSearchAction action = (TransportSearchAction) actions.get(SearchAction.INSTANCE);
SearchTask task = (SearchTask) taskManager.register("transport", action.actionName, request);
task.setProgressListener(listener);
action.execute(task, request, new ActionListener<>() {
@Override
public void onResponse(SearchResponse response) {
try {
taskManager.unregister(task);
} finally {
listener.onResponse(response);
}
}

@Override
public void onFailure(Exception e) {
try {
taskManager.unregister(task);
} finally {
listener.onFailure(e);
}
}
});
return task;
}

/**
* The id of the local {@link DiscoveryNode}. Useful for generating task ids from tasks returned by
* {@link #executeLocally(ActionType, ActionRequest, TaskListener)}.
@@ -164,7 +164,7 @@ public static void parseSearchRequest(SearchRequest searchRequest, RestRequest r
searchRequest.routing(request.param("routing"));
searchRequest.preference(request.param("preference"));
searchRequest.indicesOptions(IndicesOptions.fromRequest(request, searchRequest.indicesOptions()));
searchRequest.setCcsMinimizeRoundtrips(request.paramAsBoolean("ccs_minimize_roundtrips", true));
searchRequest.setCcsMinimizeRoundtrips(request.paramAsBoolean("ccs_minimize_roundtrips", searchRequest.isCcsMinimizeRoundtrips()));

checkRestTotalHits(request, searchRequest);
}
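The new fallback matters for callers that pre-configure the request before REST parsing. A minimal illustration using a hypothetical helper (only the parameter key and the setter come from the diff; everything else is assumed):

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.rest.RestRequest;

// Illustration only: with the new fallback, a value already set on the SearchRequest
// survives whenever the URL carries no "ccs_minimize_roundtrips" parameter.
final class CcsParamExample {
    static void applyCcsMinimizeRoundtrips(RestRequest restRequest, SearchRequest searchRequest) {
        // Previously the fallback was hard-coded to true, silently resetting callers that had
        // disabled round-trip minimization (as a progress-tracking search has to).
        searchRequest.setCcsMinimizeRoundtrips(
            restRequest.paramAsBoolean("ccs_minimize_roundtrips", searchRequest.isCcsMinimizeRoundtrips()));
    }
}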