Skip to content

Commit 61c2f94

Browse files
s1monwkcm
authored andcommitted
Introduce a search_throttled threadpool (#33732)
Today all searches happen on the search threadpool which is the correct behavior in almost any case. Yet, there are exceptions where for instance searches searches should be passed through a single-thread thread-pool to reduce impact on a node. This change adds a index-private setting that allows to mark an index as throttled for searches and forks off all non-stats searcher access to this thread-pool for indices that are marked as `index.search.throttled`
1 parent f592659 commit 61c2f94

17 files changed

+446
-246
lines changed

server/src/main/java/org/elasticsearch/action/admin/indices/validate/query/TransportValidateQueryAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ protected ShardValidateQueryResponse shardOperation(ShardValidateQueryRequest re
200200
} catch (QueryShardException|ParsingException e) {
201201
valid = false;
202202
error = e.getDetailedMessage();
203-
} catch (AssertionError|IOException e) {
203+
} catch (AssertionError e) {
204204
valid = false;
205205
error = e.getMessage();
206206
} finally {
@@ -210,7 +210,7 @@ protected ShardValidateQueryResponse shardOperation(ShardValidateQueryRequest re
210210
return new ShardValidateQueryResponse(request.shardId(), valid, explanation, error);
211211
}
212212

213-
private String explain(SearchContext context, boolean rewritten) throws IOException {
213+
private String explain(SearchContext context, boolean rewritten) {
214214
Query query = context.query();
215215
if (rewritten && query instanceof MatchNoDocsQuery) {
216216
return context.parsedQuery().query().toString();

server/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,4 +152,11 @@ protected ShardIterator shards(ClusterState state, InternalRequest request) {
152152
clusterService.state(), request.concreteIndex(), request.request().id(), request.request().routing(), request.request().preference()
153153
);
154154
}
155+
156+
@Override
157+
protected String getExecutor(ExplainRequest request, ShardId shardId) {
158+
IndexService indexService = searchService.getIndicesService().indexServiceSafe(shardId.getIndex());
159+
return indexService.getIndexSettings().isSearchThrottled() ? ThreadPool.Names.SEARCH_THROTTLED : super.getExecutor(request,
160+
shardId);
161+
}
155162
}

server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,4 +111,11 @@ protected GetResponse shardOperation(GetRequest request, ShardId shardId) {
111111
protected GetResponse newResponse() {
112112
return new GetResponse();
113113
}
114+
115+
@Override
116+
protected String getExecutor(GetRequest request, ShardId shardId) {
117+
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
118+
return indexService.getIndexSettings().isSearchThrottled() ? ThreadPool.Names.SEARCH_THROTTLED : super.getExecutor(request,
119+
shardId);
120+
}
114121
}

server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,4 +102,11 @@ protected MultiGetShardResponse shardOperation(MultiGetShardRequest request, Sha
102102

103103
return response;
104104
}
105+
106+
@Override
107+
protected String getExecutor(MultiGetShardRequest request, ShardId shardId) {
108+
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
109+
return indexService.getIndexSettings().isSearchThrottled() ? ThreadPool.Names.SEARCH_THROTTLED : super.getExecutor(request,
110+
shardId);
111+
}
105112
}

server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.action.ActionListener;
2424
import org.elasticsearch.cluster.routing.GroupShardsIterator;
2525
import org.elasticsearch.cluster.routing.ShardRouting;
26+
import org.elasticsearch.search.SearchService;
2627
import org.elasticsearch.search.internal.AliasFilter;
2728
import org.elasticsearch.transport.Transport;
2829

@@ -40,7 +41,7 @@
4041
* which allows to fan out to more shards at the same time without running into rejections even if we are hitting a
4142
* large portion of the clusters indices.
4243
*/
43-
final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<SearchTransportService.CanMatchResponse> {
44+
final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<SearchService.CanMatchResponse> {
4445

4546
private final Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory;
4647
private final GroupShardsIterator<SearchShardIterator> shardsIts;
@@ -67,13 +68,13 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<Searc
6768

6869
@Override
6970
protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard,
70-
SearchActionListener<SearchTransportService.CanMatchResponse> listener) {
71+
SearchActionListener<SearchService.CanMatchResponse> listener) {
7172
getSearchTransport().sendCanMatch(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()),
7273
buildShardSearchRequest(shardIt), getTask(), listener);
7374
}
7475

7576
@Override
76-
protected SearchPhase getNextPhase(SearchPhaseResults<SearchTransportService.CanMatchResponse> results,
77+
protected SearchPhase getNextPhase(SearchPhaseResults<SearchService.CanMatchResponse> results,
7778
SearchPhaseContext context) {
7879

7980
return phaseFactory.apply(getIterator((BitSetSearchPhaseResults) results, shardsIts));
@@ -100,7 +101,7 @@ private GroupShardsIterator<SearchShardIterator> getIterator(BitSetSearchPhaseRe
100101
}
101102

102103
private static final class BitSetSearchPhaseResults extends InitialSearchPhase.
103-
SearchPhaseResults<SearchTransportService.CanMatchResponse> {
104+
SearchPhaseResults<SearchService.CanMatchResponse> {
104105

105106
private final FixedBitSet possibleMatches;
106107
private int numPossibleMatches;
@@ -111,7 +112,7 @@ private static final class BitSetSearchPhaseResults extends InitialSearchPhase.
111112
}
112113

113114
@Override
114-
void consumeResult(SearchTransportService.CanMatchResponse result) {
115+
void consumeResult(SearchService.CanMatchResponse result) {
115116
if (result.canMatch()) {
116117
consumeShardFailure(result.getShardIndex());
117118
}
@@ -139,7 +140,7 @@ synchronized FixedBitSet getPossibleMatches() {
139140
}
140141

141142
@Override
142-
Stream<SearchTransportService.CanMatchResponse> getSuccessfulResults() {
143+
Stream<SearchService.CanMatchResponse> getSuccessfulResults() {
143144
return Stream.empty();
144145
}
145146
}

server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java

Lines changed: 21 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import org.elasticsearch.action.ActionListenerResponseHandler;
2424
import org.elasticsearch.action.IndicesRequest;
2525
import org.elasticsearch.action.OriginalIndices;
26-
import org.elasticsearch.action.support.HandledTransportAction;
26+
import org.elasticsearch.action.support.HandledTransportAction.ChannelActionListener;
2727
import org.elasticsearch.action.support.IndicesOptions;
2828
import org.elasticsearch.cluster.node.DiscoveryNode;
2929
import org.elasticsearch.common.component.AbstractComponent;
@@ -112,9 +112,9 @@ public void sendFreeContext(Transport.Connection connection, long contextId, fin
112112
}
113113

114114
public void sendCanMatch(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task, final
115-
ActionListener<CanMatchResponse> listener) {
115+
ActionListener<SearchService.CanMatchResponse> listener) {
116116
transportService.sendChildRequest(connection, QUERY_CAN_MATCH_NAME, request, task,
117-
TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, CanMatchResponse::new));
117+
TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, SearchService.CanMatchResponse::new));
118118
}
119119

120120
public void sendClearAllScrollContexts(Transport.Connection connection, final ActionListener<TransportResponse> listener) {
@@ -349,83 +349,54 @@ public void onFailure(Exception e) {
349349

350350
transportService.registerRequestHandler(QUERY_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new,
351351
(request, channel, task) -> {
352-
searchService.executeQueryPhase(request, (SearchTask) task, new HandledTransportAction.ChannelActionListener<>(
352+
searchService.executeQueryPhase(request, (SearchTask) task, new ChannelActionListener<>(
353353
channel, QUERY_ACTION_NAME, request));
354354
});
355355
TransportActionProxy.registerProxyAction(transportService, QUERY_ACTION_NAME,
356356
(request) -> ((ShardSearchRequest)request).numberOfShards() == 1 ? QueryFetchSearchResult::new : QuerySearchResult::new);
357357

358-
transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, ThreadPool.Names.SEARCH, QuerySearchRequest::new,
358+
transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, ThreadPool.Names.SAME, QuerySearchRequest::new,
359359
(request, channel, task) -> {
360-
QuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task);
361-
channel.sendResponse(result);
360+
searchService.executeQueryPhase(request, (SearchTask)task, new ChannelActionListener<>(channel, QUERY_ID_ACTION_NAME,
361+
request));
362362
});
363363
TransportActionProxy.registerProxyAction(transportService, QUERY_ID_ACTION_NAME, QuerySearchResult::new);
364364

365-
transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, InternalScrollSearchRequest::new,
365+
transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, ThreadPool.Names.SAME, InternalScrollSearchRequest::new,
366366
(request, channel, task) -> {
367-
ScrollQuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task);
368-
channel.sendResponse(result);
367+
searchService.executeQueryPhase(request, (SearchTask)task, new ChannelActionListener<>(channel, QUERY_SCROLL_ACTION_NAME,
368+
request));
369369
});
370370
TransportActionProxy.registerProxyAction(transportService, QUERY_SCROLL_ACTION_NAME, ScrollQuerySearchResult::new);
371371

372-
transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, InternalScrollSearchRequest::new,
372+
transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, ThreadPool.Names.SAME, InternalScrollSearchRequest::new,
373373
(request, channel, task) -> {
374-
ScrollQueryFetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
375-
channel.sendResponse(result);
374+
searchService.executeFetchPhase(request, (SearchTask)task, new ChannelActionListener<>(channel,
375+
QUERY_FETCH_SCROLL_ACTION_NAME, request));
376376
});
377377
TransportActionProxy.registerProxyAction(transportService, QUERY_FETCH_SCROLL_ACTION_NAME, ScrollQueryFetchSearchResult::new);
378378

379-
transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, ShardFetchRequest::new,
379+
transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ThreadPool.Names.SAME, ShardFetchRequest::new,
380380
(request, channel, task) -> {
381-
FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
382-
channel.sendResponse(result);
381+
searchService.executeFetchPhase(request, (SearchTask)task, new ChannelActionListener<>(channel,
382+
FETCH_ID_SCROLL_ACTION_NAME, request));
383383
});
384384
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_SCROLL_ACTION_NAME, FetchSearchResult::new);
385385

386-
transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ThreadPool.Names.SEARCH, true, true, ShardFetchSearchRequest::new,
386+
transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ThreadPool.Names.SAME, true, true, ShardFetchSearchRequest::new,
387387
(request, channel, task) -> {
388-
FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
389-
channel.sendResponse(result);
388+
searchService.executeFetchPhase(request, (SearchTask)task, new ChannelActionListener<>(channel, FETCH_ID_ACTION_NAME,
389+
request));
390390
});
391391
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_ACTION_NAME, FetchSearchResult::new);
392392

393393
// this is cheap, it does not fetch during the rewrite phase, so we can let it quickly execute on a networking thread
394394
transportService.registerRequestHandler(QUERY_CAN_MATCH_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new,
395395
(request, channel, task) -> {
396-
boolean canMatch = searchService.canMatch(request);
397-
channel.sendResponse(new CanMatchResponse(canMatch));
396+
searchService.canMatch(request, new ChannelActionListener<>(channel, QUERY_CAN_MATCH_NAME, request));
398397
});
399398
TransportActionProxy.registerProxyAction(transportService, QUERY_CAN_MATCH_NAME,
400-
(Supplier<TransportResponse>) CanMatchResponse::new);
401-
}
402-
403-
public static final class CanMatchResponse extends SearchPhaseResult {
404-
private boolean canMatch;
405-
406-
public CanMatchResponse() {
407-
}
408-
409-
public CanMatchResponse(boolean canMatch) {
410-
this.canMatch = canMatch;
411-
}
412-
413-
414-
@Override
415-
public void readFrom(StreamInput in) throws IOException {
416-
super.readFrom(in);
417-
canMatch = in.readBoolean();
418-
}
419-
420-
@Override
421-
public void writeTo(StreamOutput out) throws IOException {
422-
super.writeTo(out);
423-
out.writeBoolean(canMatch);
424-
}
425-
426-
public boolean canMatch() {
427-
return canMatch;
428-
}
399+
(Supplier<TransportResponse>) SearchService.CanMatchResponse::new);
429400
}
430401

431402

server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.elasticsearch.cluster.service.ClusterService;
3737
import org.elasticsearch.common.Nullable;
3838
import org.elasticsearch.common.settings.Settings;
39+
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
3940
import org.elasticsearch.tasks.Task;
4041
import org.elasticsearch.threadpool.ThreadPool;
4142
import org.elasticsearch.transport.TransportChannel;
@@ -57,6 +58,7 @@ public abstract class TransportBroadcastAction<Request extends BroadcastRequest<
5758
protected final IndexNameExpressionResolver indexNameExpressionResolver;
5859

5960
final String transportShardAction;
61+
private final String shardExecutor;
6062

6163
protected TransportBroadcastAction(Settings settings, String actionName, ClusterService clusterService,
6264
TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
@@ -66,8 +68,9 @@ protected TransportBroadcastAction(Settings settings, String actionName, Cluster
6668
this.transportService = transportService;
6769
this.indexNameExpressionResolver = indexNameExpressionResolver;
6870
this.transportShardAction = actionName + "[s]";
71+
this.shardExecutor = shardExecutor;
6972

70-
transportService.registerRequestHandler(transportShardAction, shardRequest, shardExecutor, new ShardTransportHandler());
73+
transportService.registerRequestHandler(transportShardAction, shardRequest, ThreadPool.Names.SAME, new ShardTransportHandler());
7174
}
7275

7376
@Override
@@ -276,7 +279,45 @@ class ShardTransportHandler implements TransportRequestHandler<ShardRequest> {
276279

277280
@Override
278281
public void messageReceived(ShardRequest request, TransportChannel channel, Task task) throws Exception {
279-
channel.sendResponse(shardOperation(request, task));
282+
asyncShardOperation(request, task, new ActionListener<ShardResponse>() {
283+
@Override
284+
public void onResponse(ShardResponse response) {
285+
try {
286+
channel.sendResponse(response);
287+
} catch (Exception e) {
288+
onFailure(e);
289+
}
290+
}
291+
292+
@Override
293+
public void onFailure(Exception e) {
294+
try {
295+
channel.sendResponse(e);
296+
} catch (Exception e1) {
297+
logger.warn(() -> new ParameterizedMessage(
298+
"Failed to send error response for action [{}] and request [{}]", actionName, request), e1);
299+
}
300+
}
301+
});
280302
}
281303
}
304+
305+
protected void asyncShardOperation(ShardRequest request, Task task, ActionListener<ShardResponse> listener) {
306+
transportService.getThreadPool().executor(getExecutor(request)).execute(new AbstractRunnable() {
307+
@Override
308+
public void onFailure(Exception e) {
309+
listener.onFailure(e);
310+
}
311+
312+
@Override
313+
protected void doRun() throws Exception {
314+
listener.onResponse(shardOperation(request, task));
315+
}
316+
});
317+
}
318+
319+
protected String getExecutor(ShardRequest request) {
320+
return shardExecutor;
321+
}
322+
282323
}

0 commit comments

Comments
 (0)