Skip to content

Commit 761f4c1

Browse files
committed
Introduce a search_throttled threadpool (#33732)
Today all searches happen on the search threadpool which is the correct behavior in almost any case. Yet, there are exceptions where for instance searches searches should be passed through a single-thread thread-pool to reduce impact on a node. This change adds a index-private setting that allows to mark an index as throttled for searches and forks off all non-stats searcher access to this thread-pool for indices that are marked as `index.search.throttled`
1 parent 78c2600 commit 761f4c1

17 files changed

+441
-247
lines changed

server/src/main/java/org/elasticsearch/action/admin/indices/validate/query/TransportValidateQueryAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ protected ShardValidateQueryResponse shardOperation(ShardValidateQueryRequest re
200200
} catch (QueryShardException|ParsingException e) {
201201
valid = false;
202202
error = e.getDetailedMessage();
203-
} catch (AssertionError|IOException e) {
203+
} catch (AssertionError e) {
204204
valid = false;
205205
error = e.getMessage();
206206
} finally {
@@ -210,7 +210,7 @@ protected ShardValidateQueryResponse shardOperation(ShardValidateQueryRequest re
210210
return new ShardValidateQueryResponse(request.shardId(), valid, explanation, error);
211211
}
212212

213-
private String explain(SearchContext context, boolean rewritten) throws IOException {
213+
private String explain(SearchContext context, boolean rewritten) {
214214
Query query = context.query();
215215
if (rewritten && query instanceof MatchNoDocsQuery) {
216216
return context.parsedQuery().query().toString();

server/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.common.inject.Inject;
3434
import org.elasticsearch.common.lease.Releasables;
3535
import org.elasticsearch.common.settings.Settings;
36+
import org.elasticsearch.index.IndexService;
3637
import org.elasticsearch.index.engine.Engine;
3738
import org.elasticsearch.index.get.GetResult;
3839
import org.elasticsearch.index.shard.ShardId;
@@ -136,4 +137,11 @@ protected ShardIterator shards(ClusterState state, InternalRequest request) {
136137
clusterService.state(), request.concreteIndex(), request.request().id(), request.request().routing(), request.request().preference()
137138
);
138139
}
140+
141+
@Override
142+
protected String getExecutor(ExplainRequest request, ShardId shardId) {
143+
IndexService indexService = searchService.getIndicesService().indexServiceSafe(shardId.getIndex());
144+
return indexService.getIndexSettings().isSearchThrottled() ? ThreadPool.Names.SEARCH_THROTTLED : super.getExecutor(request,
145+
shardId);
146+
}
139147
}

server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,4 +93,11 @@ protected GetResponse shardOperation(GetRequest request, ShardId shardId) {
9393
protected GetResponse newResponse() {
9494
return new GetResponse();
9595
}
96+
97+
@Override
98+
protected String getExecutor(GetRequest request, ShardId shardId) {
99+
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
100+
return indexService.getIndexSettings().isSearchThrottled() ? ThreadPool.Names.SEARCH_THROTTLED : super.getExecutor(request,
101+
shardId);
102+
}
96103
}

server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,4 +102,11 @@ protected MultiGetShardResponse shardOperation(MultiGetShardRequest request, Sha
102102

103103
return response;
104104
}
105+
106+
@Override
107+
protected String getExecutor(MultiGetShardRequest request, ShardId shardId) {
108+
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
109+
return indexService.getIndexSettings().isSearchThrottled() ? ThreadPool.Names.SEARCH_THROTTLED : super.getExecutor(request,
110+
shardId);
111+
}
105112
}

server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.action.ActionListener;
2424
import org.elasticsearch.cluster.routing.GroupShardsIterator;
2525
import org.elasticsearch.cluster.routing.ShardRouting;
26+
import org.elasticsearch.search.SearchService;
2627
import org.elasticsearch.search.internal.AliasFilter;
2728
import org.elasticsearch.transport.Transport;
2829

@@ -40,7 +41,7 @@
4041
* which allows to fan out to more shards at the same time without running into rejections even if we are hitting a
4142
* large portion of the clusters indices.
4243
*/
43-
final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<SearchTransportService.CanMatchResponse> {
44+
final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<SearchService.CanMatchResponse> {
4445

4546
private final Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory;
4647
private final GroupShardsIterator<SearchShardIterator> shardsIts;
@@ -67,13 +68,13 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<Searc
6768

6869
@Override
6970
protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard,
70-
SearchActionListener<SearchTransportService.CanMatchResponse> listener) {
71+
SearchActionListener<SearchService.CanMatchResponse> listener) {
7172
getSearchTransport().sendCanMatch(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()),
7273
buildShardSearchRequest(shardIt), getTask(), listener);
7374
}
7475

7576
@Override
76-
protected SearchPhase getNextPhase(SearchPhaseResults<SearchTransportService.CanMatchResponse> results,
77+
protected SearchPhase getNextPhase(SearchPhaseResults<SearchService.CanMatchResponse> results,
7778
SearchPhaseContext context) {
7879

7980
return phaseFactory.apply(getIterator((BitSetSearchPhaseResults) results, shardsIts));
@@ -100,7 +101,7 @@ private GroupShardsIterator<SearchShardIterator> getIterator(BitSetSearchPhaseRe
100101
}
101102

102103
private static final class BitSetSearchPhaseResults extends InitialSearchPhase.
103-
SearchPhaseResults<SearchTransportService.CanMatchResponse> {
104+
SearchPhaseResults<SearchService.CanMatchResponse> {
104105

105106
private final FixedBitSet possibleMatches;
106107
private int numPossibleMatches;
@@ -111,7 +112,7 @@ private static final class BitSetSearchPhaseResults extends InitialSearchPhase.
111112
}
112113

113114
@Override
114-
void consumeResult(SearchTransportService.CanMatchResponse result) {
115+
void consumeResult(SearchService.CanMatchResponse result) {
115116
if (result.canMatch()) {
116117
consumeShardFailure(result.getShardIndex());
117118
}
@@ -139,7 +140,7 @@ synchronized FixedBitSet getPossibleMatches() {
139140
}
140141

141142
@Override
142-
Stream<SearchTransportService.CanMatchResponse> getSuccessfulResults() {
143+
Stream<SearchService.CanMatchResponse> getSuccessfulResults() {
143144
return Stream.empty();
144145
}
145146
}

server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java

Lines changed: 22 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import org.elasticsearch.action.ActionListenerResponseHandler;
2525
import org.elasticsearch.action.IndicesRequest;
2626
import org.elasticsearch.action.OriginalIndices;
27-
import org.elasticsearch.action.support.HandledTransportAction;
27+
import org.elasticsearch.action.support.HandledTransportAction.ChannelActionListener;
2828
import org.elasticsearch.action.support.IndicesOptions;
2929
import org.elasticsearch.cluster.node.DiscoveryNode;
3030
import org.elasticsearch.common.component.AbstractComponent;
@@ -34,6 +34,7 @@
3434
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
3535
import org.elasticsearch.search.SearchPhaseResult;
3636
import org.elasticsearch.search.SearchService;
37+
import org.elasticsearch.search.SearchService.CanMatchResponse;
3738
import org.elasticsearch.search.dfs.DfsSearchResult;
3839
import org.elasticsearch.search.fetch.FetchSearchResult;
3940
import org.elasticsearch.search.fetch.QueryFetchSearchResult;
@@ -60,7 +61,6 @@
6061
import org.elasticsearch.transport.TransportService;
6162

6263
import java.io.IOException;
63-
import java.io.UncheckedIOException;
6464
import java.util.HashMap;
6565
import java.util.Map;
6666
import java.util.function.BiFunction;
@@ -340,26 +340,9 @@ public void messageReceived(TransportRequest.Empty request, TransportChannel cha
340340
transportService.registerRequestHandler(DFS_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME,
341341
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
342342
@Override
343-
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
344-
searchService.executeDfsPhase(request, (SearchTask) task, new ActionListener<SearchPhaseResult>() {
345-
@Override
346-
public void onResponse(SearchPhaseResult searchPhaseResult) {
347-
try {
348-
channel.sendResponse(searchPhaseResult);
349-
} catch (IOException e) {
350-
throw new UncheckedIOException(e);
351-
}
352-
}
353-
354-
@Override
355-
public void onFailure(Exception e) {
356-
try {
357-
channel.sendResponse(e);
358-
} catch (IOException e1) {
359-
throw new UncheckedIOException(e1);
360-
}
361-
}
362-
});
343+
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) {
344+
searchService.executeDfsPhase(request, (SearchTask) task,
345+
new ChannelActionListener<>(channel, DFS_ACTION_NAME, request));
363346

364347
}
365348
});
@@ -369,8 +352,8 @@ public void onFailure(Exception e) {
369352
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
370353
@Override
371354
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) {
372-
searchService.executeQueryPhase(request, (SearchTask) task, new HandledTransportAction.ChannelActionListener<>
373-
(channel, QUERY_ACTION_NAME, request));
355+
searchService.executeQueryPhase(request, (SearchTask) task,
356+
new ChannelActionListener<>(channel, QUERY_ACTION_NAME, request));
374357
}
375358
});
376359
TransportActionProxy.registerProxyAction(transportService, QUERY_ACTION_NAME,
@@ -379,49 +362,49 @@ public void messageReceived(ShardSearchTransportRequest request, TransportChanne
379362
transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH,
380363
new TaskAwareTransportRequestHandler<QuerySearchRequest>() {
381364
@Override
382-
public void messageReceived(QuerySearchRequest request, TransportChannel channel, Task task) throws Exception {
383-
QuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task);
384-
channel.sendResponse(result);
365+
public void messageReceived(QuerySearchRequest request, TransportChannel channel, Task task) {
366+
searchService.executeQueryPhase(request, (SearchTask)task,
367+
new ChannelActionListener<>(channel, QUERY_ID_ACTION_NAME, request));
385368
}
386369
});
387370
TransportActionProxy.registerProxyAction(transportService, QUERY_ID_ACTION_NAME, QuerySearchResult::new);
388371

389372
transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH,
390373
new TaskAwareTransportRequestHandler<InternalScrollSearchRequest>() {
391374
@Override
392-
public void messageReceived(InternalScrollSearchRequest request, TransportChannel channel, Task task) throws Exception {
393-
ScrollQuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task);
394-
channel.sendResponse(result);
375+
public void messageReceived(InternalScrollSearchRequest request, TransportChannel channel, Task task) {
376+
searchService.executeQueryPhase(request, (SearchTask)task,
377+
new ChannelActionListener<>(channel, QUERY_SCROLL_ACTION_NAME, request));
395378
}
396379
});
397380
TransportActionProxy.registerProxyAction(transportService, QUERY_SCROLL_ACTION_NAME, ScrollQuerySearchResult::new);
398381

399382
transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH,
400383
new TaskAwareTransportRequestHandler<InternalScrollSearchRequest>() {
401384
@Override
402-
public void messageReceived(InternalScrollSearchRequest request, TransportChannel channel, Task task) throws Exception {
403-
ScrollQueryFetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
404-
channel.sendResponse(result);
385+
public void messageReceived(InternalScrollSearchRequest request, TransportChannel channel, Task task) {
386+
searchService.executeFetchPhase(request, (SearchTask)task,
387+
new ChannelActionListener<>(channel, QUERY_FETCH_SCROLL_ACTION_NAME, request));
405388
}
406389
});
407390
TransportActionProxy.registerProxyAction(transportService, QUERY_FETCH_SCROLL_ACTION_NAME, ScrollQueryFetchSearchResult::new);
408391

409392
transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ShardFetchRequest::new, ThreadPool.Names.SEARCH,
410393
new TaskAwareTransportRequestHandler<ShardFetchRequest>() {
411394
@Override
412-
public void messageReceived(ShardFetchRequest request, TransportChannel channel, Task task) throws Exception {
413-
FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
414-
channel.sendResponse(result);
395+
public void messageReceived(ShardFetchRequest request, TransportChannel channel, Task task){
396+
searchService.executeFetchPhase(request, (SearchTask)task,
397+
new ChannelActionListener<>(channel, FETCH_ID_SCROLL_ACTION_NAME, request));
415398
}
416399
});
417400
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_SCROLL_ACTION_NAME, FetchSearchResult::new);
418401

419402
transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ShardFetchSearchRequest::new, ThreadPool.Names.SEARCH, true, true,
420403
new TaskAwareTransportRequestHandler<ShardFetchSearchRequest>() {
421404
@Override
422-
public void messageReceived(ShardFetchSearchRequest request, TransportChannel channel, Task task) throws Exception {
423-
FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
424-
channel.sendResponse(result);
405+
public void messageReceived(ShardFetchSearchRequest request, TransportChannel channel, Task task) {
406+
searchService.executeFetchPhase(request, (SearchTask)task,
407+
new ChannelActionListener<>(channel, FETCH_ID_ACTION_NAME, request));
425408
}
426409
});
427410
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_ACTION_NAME, FetchSearchResult::new);
@@ -439,35 +422,6 @@ public void messageReceived(ShardSearchTransportRequest request, TransportChanne
439422
(Supplier<TransportResponse>) CanMatchResponse::new);
440423
}
441424

442-
public static final class CanMatchResponse extends SearchPhaseResult {
443-
private boolean canMatch;
444-
445-
public CanMatchResponse() {
446-
}
447-
448-
public CanMatchResponse(boolean canMatch) {
449-
this.canMatch = canMatch;
450-
}
451-
452-
453-
@Override
454-
public void readFrom(StreamInput in) throws IOException {
455-
super.readFrom(in);
456-
canMatch = in.readBoolean();
457-
}
458-
459-
@Override
460-
public void writeTo(StreamOutput out) throws IOException {
461-
super.writeTo(out);
462-
out.writeBoolean(canMatch);
463-
}
464-
465-
public boolean canMatch() {
466-
return canMatch;
467-
}
468-
}
469-
470-
471425
/**
472426
* Returns a connection to the given node on the provided cluster. If the cluster alias is <code>null</code> the node will be resolved
473427
* against the local cluster.

server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.elasticsearch.cluster.service.ClusterService;
3737
import org.elasticsearch.common.Nullable;
3838
import org.elasticsearch.common.settings.Settings;
39+
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
3940
import org.elasticsearch.tasks.Task;
4041
import org.elasticsearch.threadpool.ThreadPool;
4142
import org.elasticsearch.transport.TransportChannel;
@@ -56,6 +57,7 @@ public abstract class TransportBroadcastAction<Request extends BroadcastRequest<
5657
protected final TransportService transportService;
5758

5859
final String transportShardAction;
60+
private final String shardExecutor;
5961

6062
protected TransportBroadcastAction(Settings settings, String actionName, ThreadPool threadPool, ClusterService clusterService,
6163
TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
@@ -64,8 +66,9 @@ protected TransportBroadcastAction(Settings settings, String actionName, ThreadP
6466
this.clusterService = clusterService;
6567
this.transportService = transportService;
6668
this.transportShardAction = actionName + "[s]";
69+
this.shardExecutor = shardExecutor;
6770

68-
transportService.registerRequestHandler(transportShardAction, shardRequest, shardExecutor, new ShardTransportHandler());
71+
transportService.registerRequestHandler(transportShardAction, shardRequest, ThreadPool.Names.SAME, new ShardTransportHandler());
6972
}
7073

7174
@Override
@@ -280,12 +283,50 @@ class ShardTransportHandler implements TransportRequestHandler<ShardRequest> {
280283

281284
@Override
282285
public void messageReceived(ShardRequest request, TransportChannel channel, Task task) throws Exception {
283-
channel.sendResponse(shardOperation(request, task));
286+
asyncShardOperation(request, task, new ActionListener<ShardResponse>() {
287+
@Override
288+
public void onResponse(ShardResponse response) {
289+
try {
290+
channel.sendResponse(response);
291+
} catch (Exception e) {
292+
onFailure(e);
293+
}
294+
}
295+
296+
@Override
297+
public void onFailure(Exception e) {
298+
try {
299+
channel.sendResponse(e);
300+
} catch (Exception e1) {
301+
logger.warn(() -> new ParameterizedMessage(
302+
"Failed to send error response for action [{}] and request [{}]", actionName, request), e1);
303+
}
304+
}
305+
});
284306
}
285307

286308
@Override
287309
public final void messageReceived(final ShardRequest request, final TransportChannel channel) throws Exception {
288310
throw new UnsupportedOperationException("the task parameter is required");
289311
}
290312
}
313+
314+
protected void asyncShardOperation(ShardRequest request, Task task, ActionListener<ShardResponse> listener) {
315+
transportService.getThreadPool().executor(getExecutor(request)).execute(new AbstractRunnable() {
316+
@Override
317+
public void onFailure(Exception e) {
318+
listener.onFailure(e);
319+
}
320+
321+
@Override
322+
protected void doRun() throws Exception {
323+
listener.onResponse(shardOperation(request, task));
324+
}
325+
});
326+
}
327+
328+
protected String getExecutor(ShardRequest request) {
329+
return shardExecutor;
330+
}
331+
291332
}

0 commit comments

Comments
 (0)