Skip to content

Introduce a search_throttled threadpool #33732

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Sep 20, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ protected ShardValidateQueryResponse shardOperation(ShardValidateQueryRequest re
} catch (QueryShardException|ParsingException e) {
valid = false;
error = e.getDetailedMessage();
} catch (AssertionError|IOException e) {
} catch (AssertionError e) {
valid = false;
error = e.getMessage();
} finally {
Expand All @@ -210,7 +210,7 @@ protected ShardValidateQueryResponse shardOperation(ShardValidateQueryRequest re
return new ShardValidateQueryResponse(request.shardId(), valid, explanation, error);
}

private String explain(SearchContext context, boolean rewritten) throws IOException {
private String explain(SearchContext context, boolean rewritten) {
Query query = context.query();
if (rewritten && query instanceof MatchNoDocsQuery) {
return context.parsedQuery().query().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,4 +152,11 @@ protected ShardIterator shards(ClusterState state, InternalRequest request) {
clusterService.state(), request.concreteIndex(), request.request().id(), request.request().routing(), request.request().preference()
);
}

@Override
protected String getExecutor(ExplainRequest request, ShardId shardId) {
IndexService indexService = searchService.getIndicesService().indexServiceSafe(shardId.getIndex());
return indexService.getIndexSettings().isSearchThrottled() ? ThreadPool.Names.SEARCH_THROTTLED : super.getExecutor(request,
shardId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,11 @@ protected GetResponse shardOperation(GetRequest request, ShardId shardId) {
protected GetResponse newResponse() {
return new GetResponse();
}

@Override
protected String getExecutor(GetRequest request, ShardId shardId) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
return indexService.getIndexSettings().isSearchThrottled() ? ThreadPool.Names.SEARCH_THROTTLED : super.getExecutor(request,
shardId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,11 @@ protected MultiGetShardResponse shardOperation(MultiGetShardRequest request, Sha

return response;
}

@Override
protected String getExecutor(MultiGetShardRequest request, ShardId shardId) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
return indexService.getIndexSettings().isSearchThrottled() ? ThreadPool.Names.SEARCH_THROTTLED : super.getExecutor(request,
shardId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.transport.Transport;

Expand All @@ -40,7 +41,7 @@
* which allows to fan out to more shards at the same time without running into rejections even if we are hitting a
* large portion of the clusters indices.
*/
final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<SearchTransportService.CanMatchResponse> {
final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<SearchService.CanMatchResponse> {

private final Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory;
private final GroupShardsIterator<SearchShardIterator> shardsIts;
Expand All @@ -67,13 +68,13 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<Searc

@Override
protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard,
SearchActionListener<SearchTransportService.CanMatchResponse> listener) {
SearchActionListener<SearchService.CanMatchResponse> listener) {
getSearchTransport().sendCanMatch(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()),
buildShardSearchRequest(shardIt), getTask(), listener);
}

@Override
protected SearchPhase getNextPhase(SearchPhaseResults<SearchTransportService.CanMatchResponse> results,
protected SearchPhase getNextPhase(SearchPhaseResults<SearchService.CanMatchResponse> results,
SearchPhaseContext context) {

return phaseFactory.apply(getIterator((BitSetSearchPhaseResults) results, shardsIts));
Expand All @@ -100,7 +101,7 @@ private GroupShardsIterator<SearchShardIterator> getIterator(BitSetSearchPhaseRe
}

private static final class BitSetSearchPhaseResults extends InitialSearchPhase.
SearchPhaseResults<SearchTransportService.CanMatchResponse> {
SearchPhaseResults<SearchService.CanMatchResponse> {

private final FixedBitSet possibleMatches;
private int numPossibleMatches;
Expand All @@ -111,7 +112,7 @@ private static final class BitSetSearchPhaseResults extends InitialSearchPhase.
}

@Override
void consumeResult(SearchTransportService.CanMatchResponse result) {
void consumeResult(SearchService.CanMatchResponse result) {
if (result.canMatch()) {
consumeShardFailure(result.getShardIndex());
}
Expand Down Expand Up @@ -139,7 +140,7 @@ synchronized FixedBitSet getPossibleMatches() {
}

@Override
Stream<SearchTransportService.CanMatchResponse> getSuccessfulResults() {
Stream<SearchService.CanMatchResponse> getSuccessfulResults() {
return Stream.empty();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.HandledTransportAction.ChannelActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
Expand Down Expand Up @@ -112,9 +112,9 @@ public void sendFreeContext(Transport.Connection connection, long contextId, fin
}

public void sendCanMatch(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task, final
ActionListener<CanMatchResponse> listener) {
ActionListener<SearchService.CanMatchResponse> listener) {
transportService.sendChildRequest(connection, QUERY_CAN_MATCH_NAME, request, task,
TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, CanMatchResponse::new));
TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, SearchService.CanMatchResponse::new));
}

public void sendClearAllScrollContexts(Transport.Connection connection, final ActionListener<TransportResponse> listener) {
Expand Down Expand Up @@ -349,83 +349,54 @@ public void onFailure(Exception e) {

transportService.registerRequestHandler(QUERY_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new,
(request, channel, task) -> {
searchService.executeQueryPhase(request, (SearchTask) task, new HandledTransportAction.ChannelActionListener<>(
searchService.executeQueryPhase(request, (SearchTask) task, new ChannelActionListener<>(
channel, QUERY_ACTION_NAME, request));
});
TransportActionProxy.registerProxyAction(transportService, QUERY_ACTION_NAME,
(request) -> ((ShardSearchRequest)request).numberOfShards() == 1 ? QueryFetchSearchResult::new : QuerySearchResult::new);

transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, ThreadPool.Names.SEARCH, QuerySearchRequest::new,
transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, ThreadPool.Names.SAME, QuerySearchRequest::new,
(request, channel, task) -> {
QuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task);
channel.sendResponse(result);
searchService.executeQueryPhase(request, (SearchTask)task, new ChannelActionListener<>(channel, QUERY_ID_ACTION_NAME,
request));
});
TransportActionProxy.registerProxyAction(transportService, QUERY_ID_ACTION_NAME, QuerySearchResult::new);

transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, InternalScrollSearchRequest::new,
transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, ThreadPool.Names.SAME, InternalScrollSearchRequest::new,
(request, channel, task) -> {
ScrollQuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task);
channel.sendResponse(result);
searchService.executeQueryPhase(request, (SearchTask)task, new ChannelActionListener<>(channel, QUERY_SCROLL_ACTION_NAME,
request));
});
TransportActionProxy.registerProxyAction(transportService, QUERY_SCROLL_ACTION_NAME, ScrollQuerySearchResult::new);

transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, InternalScrollSearchRequest::new,
transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, ThreadPool.Names.SAME, InternalScrollSearchRequest::new,
(request, channel, task) -> {
ScrollQueryFetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
channel.sendResponse(result);
searchService.executeFetchPhase(request, (SearchTask)task, new ChannelActionListener<>(channel,
QUERY_FETCH_SCROLL_ACTION_NAME, request));
});
TransportActionProxy.registerProxyAction(transportService, QUERY_FETCH_SCROLL_ACTION_NAME, ScrollQueryFetchSearchResult::new);

transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, ShardFetchRequest::new,
transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ThreadPool.Names.SAME, ShardFetchRequest::new,
(request, channel, task) -> {
FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
channel.sendResponse(result);
searchService.executeFetchPhase(request, (SearchTask)task, new ChannelActionListener<>(channel,
FETCH_ID_SCROLL_ACTION_NAME, request));
});
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_SCROLL_ACTION_NAME, FetchSearchResult::new);

transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ThreadPool.Names.SEARCH, true, true, ShardFetchSearchRequest::new,
transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ThreadPool.Names.SAME, true, true, ShardFetchSearchRequest::new,
(request, channel, task) -> {
FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
channel.sendResponse(result);
searchService.executeFetchPhase(request, (SearchTask)task, new ChannelActionListener<>(channel, FETCH_ID_ACTION_NAME,
request));
});
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_ACTION_NAME, FetchSearchResult::new);

// this is cheap, it does not fetch during the rewrite phase, so we can let it quickly execute on a networking thread
transportService.registerRequestHandler(QUERY_CAN_MATCH_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new,
(request, channel, task) -> {
boolean canMatch = searchService.canMatch(request);
channel.sendResponse(new CanMatchResponse(canMatch));
searchService.canMatch(request, new ChannelActionListener<>(channel, QUERY_CAN_MATCH_NAME, request));
});
TransportActionProxy.registerProxyAction(transportService, QUERY_CAN_MATCH_NAME,
(Supplier<TransportResponse>) CanMatchResponse::new);
}

public static final class CanMatchResponse extends SearchPhaseResult {
private boolean canMatch;

public CanMatchResponse() {
}

public CanMatchResponse(boolean canMatch) {
this.canMatch = canMatch;
}


@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
canMatch = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(canMatch);
}

public boolean canMatch() {
return canMatch;
}
(Supplier<TransportResponse>) SearchService.CanMatchResponse::new);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportChannel;
Expand All @@ -57,6 +58,7 @@ public abstract class TransportBroadcastAction<Request extends BroadcastRequest<
protected final IndexNameExpressionResolver indexNameExpressionResolver;

final String transportShardAction;
private final String shardExecutor;

protected TransportBroadcastAction(Settings settings, String actionName, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
Expand All @@ -66,8 +68,9 @@ protected TransportBroadcastAction(Settings settings, String actionName, Cluster
this.transportService = transportService;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.transportShardAction = actionName + "[s]";
this.shardExecutor = shardExecutor;

transportService.registerRequestHandler(transportShardAction, shardRequest, shardExecutor, new ShardTransportHandler());
transportService.registerRequestHandler(transportShardAction, shardRequest, ThreadPool.Names.SAME, new ShardTransportHandler());
}

@Override
Expand Down Expand Up @@ -276,7 +279,45 @@ class ShardTransportHandler implements TransportRequestHandler<ShardRequest> {

@Override
public void messageReceived(ShardRequest request, TransportChannel channel, Task task) throws Exception {
channel.sendResponse(shardOperation(request, task));
asyncShardOperation(request, task, new ActionListener<ShardResponse>() {
@Override
public void onResponse(ShardResponse response) {
try {
channel.sendResponse(response);
} catch (Exception e) {
onFailure(e);
}
}

@Override
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (Exception e1) {
logger.warn(() -> new ParameterizedMessage(
"Failed to send error response for action [{}] and request [{}]", actionName, request), e1);
}
}
});
}
}

protected void asyncShardOperation(ShardRequest request, Task task, ActionListener<ShardResponse> listener) {
transportService.getThreadPool().executor(getExecutor(request)).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}

@Override
protected void doRun() throws Exception {
listener.onResponse(shardOperation(request, task));
}
});
}

protected String getExecutor(ShardRequest request) {
return shardExecutor;
}

}
Loading