Skip to content

Commit 711ccbe

Browse files
committed
Allow for pluggable search threadpool
Today all searches happen on the search threadpool which is the correct behavior in almost any case. Yet, there are exceptions where for instance searches are required to succeed ie. in the case of a .security index. These searches should not be rejected if a node is under load. There are other more specialized usecases were searches should be passed through a single-thread threadpool to reduce impact on a node. Relates to elastic#33205
1 parent a192785 commit 711ccbe

File tree

7 files changed

+369
-197
lines changed

7 files changed

+369
-197
lines changed

server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.action.ActionListener;
2424
import org.elasticsearch.cluster.routing.GroupShardsIterator;
2525
import org.elasticsearch.cluster.routing.ShardRouting;
26+
import org.elasticsearch.search.SearchService;
2627
import org.elasticsearch.search.internal.AliasFilter;
2728
import org.elasticsearch.transport.Transport;
2829

@@ -40,7 +41,7 @@
4041
* which allows to fan out to more shards at the same time without running into rejections even if we are hitting a
4142
* large portion of the clusters indices.
4243
*/
43-
final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<SearchTransportService.CanMatchResponse> {
44+
final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<SearchService.CanMatchResponse> {
4445

4546
private final Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory;
4647
private final GroupShardsIterator<SearchShardIterator> shardsIts;
@@ -67,13 +68,13 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<Searc
6768

6869
@Override
6970
protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard,
70-
SearchActionListener<SearchTransportService.CanMatchResponse> listener) {
71+
SearchActionListener<SearchService.CanMatchResponse> listener) {
7172
getSearchTransport().sendCanMatch(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()),
7273
buildShardSearchRequest(shardIt), getTask(), listener);
7374
}
7475

7576
@Override
76-
protected SearchPhase getNextPhase(SearchPhaseResults<SearchTransportService.CanMatchResponse> results,
77+
protected SearchPhase getNextPhase(SearchPhaseResults<SearchService.CanMatchResponse> results,
7778
SearchPhaseContext context) {
7879

7980
return phaseFactory.apply(getIterator((BitSetSearchPhaseResults) results, shardsIts));
@@ -100,7 +101,7 @@ private GroupShardsIterator<SearchShardIterator> getIterator(BitSetSearchPhaseRe
100101
}
101102

102103
private static final class BitSetSearchPhaseResults extends InitialSearchPhase.
103-
SearchPhaseResults<SearchTransportService.CanMatchResponse> {
104+
SearchPhaseResults<SearchService.CanMatchResponse> {
104105

105106
private final FixedBitSet possibleMatches;
106107
private int numPossibleMatches;
@@ -111,7 +112,7 @@ private static final class BitSetSearchPhaseResults extends InitialSearchPhase.
111112
}
112113

113114
@Override
114-
void consumeResult(SearchTransportService.CanMatchResponse result) {
115+
void consumeResult(SearchService.CanMatchResponse result) {
115116
if (result.canMatch()) {
116117
consumeShardFailure(result.getShardIndex());
117118
}
@@ -139,7 +140,7 @@ synchronized FixedBitSet getPossibleMatches() {
139140
}
140141

141142
@Override
142-
Stream<SearchTransportService.CanMatchResponse> getSuccessfulResults() {
143+
Stream<SearchService.CanMatchResponse> getSuccessfulResults() {
143144
return Stream.empty();
144145
}
145146
}

server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java

Lines changed: 32 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.elasticsearch.transport.RemoteClusterService;
4949
import org.elasticsearch.transport.Transport;
5050
import org.elasticsearch.transport.TransportActionProxy;
51+
import org.elasticsearch.transport.TransportChannel;
5152
import org.elasticsearch.transport.TransportException;
5253
import org.elasticsearch.transport.TransportRequest;
5354
import org.elasticsearch.transport.TransportRequestOptions;
@@ -111,9 +112,9 @@ public void sendFreeContext(Transport.Connection connection, long contextId, fin
111112
}
112113

113114
public void sendCanMatch(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task, final
114-
ActionListener<CanMatchResponse> listener) {
115+
ActionListener<SearchService.CanMatchResponse> listener) {
115116
transportService.sendChildRequest(connection, QUERY_CAN_MATCH_NAME, request, task,
116-
TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, CanMatchResponse::new));
117+
TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, SearchService.CanMatchResponse::new));
117118
}
118119

119120
public void sendClearAllScrollContexts(Transport.Connection connection, final ActionListener<TransportResponse> listener) {
@@ -348,99 +349,74 @@ public void onFailure(Exception e) {
348349

349350
transportService.registerRequestHandler(QUERY_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new,
350351
(request, channel, task) -> {
351-
searchService.executeQueryPhase(request, (SearchTask) task, new ActionListener<SearchPhaseResult>() {
352-
@Override
353-
public void onResponse(SearchPhaseResult searchPhaseResult) {
354-
try {
355-
channel.sendResponse(searchPhaseResult);
356-
} catch (IOException e) {
357-
throw new UncheckedIOException(e);
358-
}
359-
}
360-
361-
@Override
362-
public void onFailure(Exception e) {
363-
try {
364-
channel.sendResponse(e);
365-
} catch (IOException e1) {
366-
throw new UncheckedIOException(e1);
367-
}
368-
}
369-
});
352+
searchService.executeQueryPhase(request, (SearchTask) task, new ChannelActionListener<>(channel));
370353
});
371354
TransportActionProxy.registerProxyAction(transportService, QUERY_ACTION_NAME,
372355
(request) -> ((ShardSearchRequest)request).numberOfShards() == 1 ? QueryFetchSearchResult::new : QuerySearchResult::new);
373356

374-
transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, ThreadPool.Names.SEARCH, QuerySearchRequest::new,
357+
transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, ThreadPool.Names.SAME, QuerySearchRequest::new,
375358
(request, channel, task) -> {
376-
QuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task);
377-
channel.sendResponse(result);
359+
searchService.executeQueryPhase(request, (SearchTask)task, new ChannelActionListener<>(channel));
378360
});
379361
TransportActionProxy.registerProxyAction(transportService, QUERY_ID_ACTION_NAME, QuerySearchResult::new);
380362

381-
transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, InternalScrollSearchRequest::new,
363+
transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, ThreadPool.Names.SAME, InternalScrollSearchRequest::new,
382364
(request, channel, task) -> {
383-
ScrollQuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task);
384-
channel.sendResponse(result);
365+
searchService.executeQueryPhase(request, (SearchTask)task, new ChannelActionListener<>(channel));
385366
});
386367
TransportActionProxy.registerProxyAction(transportService, QUERY_SCROLL_ACTION_NAME, ScrollQuerySearchResult::new);
387368

388-
transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, InternalScrollSearchRequest::new,
369+
transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, ThreadPool.Names.SAME, InternalScrollSearchRequest::new,
389370
(request, channel, task) -> {
390-
ScrollQueryFetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
391-
channel.sendResponse(result);
371+
searchService.executeFetchPhase(request, (SearchTask)task, new ChannelActionListener<>(channel));
392372
});
393373
TransportActionProxy.registerProxyAction(transportService, QUERY_FETCH_SCROLL_ACTION_NAME, ScrollQueryFetchSearchResult::new);
394374

395-
transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, ShardFetchRequest::new,
375+
transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ThreadPool.Names.SAME, ShardFetchRequest::new,
396376
(request, channel, task) -> {
397-
FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
398-
channel.sendResponse(result);
377+
searchService.executeFetchPhase(request, (SearchTask)task, new ChannelActionListener<>(channel));
399378
});
400379
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_SCROLL_ACTION_NAME, FetchSearchResult::new);
401380

402-
transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ThreadPool.Names.SEARCH, true, true, ShardFetchSearchRequest::new,
381+
transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ThreadPool.Names.SAME, true, true, ShardFetchSearchRequest::new,
403382
(request, channel, task) -> {
404-
FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
405-
channel.sendResponse(result);
383+
searchService.executeFetchPhase(request, (SearchTask)task, new ChannelActionListener<>(channel));
406384
});
407385
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_ACTION_NAME, FetchSearchResult::new);
408386

409387
// this is cheap, it does not fetch during the rewrite phase, so we can let it quickly execute on a networking thread
410388
transportService.registerRequestHandler(QUERY_CAN_MATCH_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new,
411389
(request, channel, task) -> {
412-
boolean canMatch = searchService.canMatch(request);
413-
channel.sendResponse(new CanMatchResponse(canMatch));
390+
searchService.canMatch(request, new ChannelActionListener<>(channel));
414391
});
415392
TransportActionProxy.registerProxyAction(transportService, QUERY_CAN_MATCH_NAME,
416-
(Supplier<TransportResponse>) CanMatchResponse::new);
393+
(Supplier<TransportResponse>) SearchService.CanMatchResponse::new);
417394
}
418395

419-
public static final class CanMatchResponse extends SearchPhaseResult {
420-
private boolean canMatch;
396+
private static class ChannelActionListener<T extends TransportResponse> implements ActionListener<T>{
421397

422-
public CanMatchResponse() {
423-
}
398+
private final TransportChannel channel;
424399

425-
public CanMatchResponse(boolean canMatch) {
426-
this.canMatch = canMatch;
400+
private ChannelActionListener(TransportChannel channel) {
401+
this.channel = channel;
427402
}
428403

429-
430404
@Override
431-
public void readFrom(StreamInput in) throws IOException {
432-
super.readFrom(in);
433-
canMatch = in.readBoolean();
405+
public void onResponse(T result) {
406+
try {
407+
channel.sendResponse(result);
408+
} catch (IOException e) {
409+
throw new UncheckedIOException(e);
410+
}
434411
}
435412

436413
@Override
437-
public void writeTo(StreamOutput out) throws IOException {
438-
super.writeTo(out);
439-
out.writeBoolean(canMatch);
440-
}
441-
442-
public boolean canMatch() {
443-
return canMatch;
414+
public void onFailure(Exception e) {
415+
try {
416+
channel.sendResponse(e);
417+
} catch (IOException e1) {
418+
throw new UncheckedIOException(e1);
419+
}
444420
}
445421
}
446422

server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
142142
IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING,
143143
IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING,
144144
IndexSettings.INDEX_SEARCH_IDLE_AFTER,
145+
IndexSettings.INDEX_SEARCH_THREAD_POOL,
145146
IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY,
146147
FieldMapper.IGNORE_MALFORMED_SETTING,
147148
FieldMapper.COERCE_SETTING,

server/src/main/java/org/elasticsearch/index/IndexSettings.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.index.translog.Translog;
3434
import org.elasticsearch.ingest.IngestService;
3535
import org.elasticsearch.node.Node;
36+
import org.elasticsearch.threadpool.ThreadPool;
3637

3738
import java.util.Collections;
3839
import java.util.List;
@@ -277,6 +278,26 @@ public final class IndexSettings {
277278
return s;
278279
}, Property.Dynamic, Property.IndexScope);
279280

281+
/**
282+
* Allows to specify a dedicated threadpool to execute searches. This is an expert setting and should be used with care.
283+
* Indices that for instance contain metadata information that need to be always accessible and should not be rejected
284+
* can be served through a different threadpool. Or searches that have a lower priority can go through a threadpool with a
285+
* single thread to prevent larger impact on other searches with a higher priority. This setting allows for custom threadpools
286+
* or search and generic. Other build-in threadpools are disallowed.
287+
*/
288+
public static final Setting<String> INDEX_SEARCH_THREAD_POOL =
289+
new Setting<>("index.search.threadpool", ThreadPool.Names.SEARCH, s -> {
290+
if (s == null || s.isEmpty()) {
291+
throw new IllegalArgumentException("Value for [index.search.threadpool] must be a non-empty string.");
292+
}
293+
if (ThreadPool.Names.SEARCH.equals(s) == false && ThreadPool.Names.GENERIC.equals(s) == false &&
294+
ThreadPool.THREAD_POOL_TYPES.containsKey(s)) {
295+
throw new IllegalArgumentException("Invalid valid for [index.search.threadpool] - " + s + " is a reserved built-in " +
296+
"threadpool");
297+
}
298+
return s;
299+
}, Property.IndexScope, Property.Dynamic);
300+
280301
private final Index index;
281302
private final Version version;
282303
private final Logger logger;
@@ -319,6 +340,7 @@ public final class IndexSettings {
319340
private volatile int maxAnalyzedOffset;
320341
private volatile int maxTermsCount;
321342
private volatile String defaultPipeline;
343+
private volatile String searchThreadPool;
322344

323345
/**
324346
* The maximum number of refresh listeners allows on this shard.
@@ -402,6 +424,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
402424
this.indexMetaData = indexMetaData;
403425
numberOfShards = settings.getAsInt(IndexMetaData.SETTING_NUMBER_OF_SHARDS, null);
404426

427+
this.searchThreadPool = INDEX_SEARCH_THREAD_POOL.get(settings);
405428
this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings);
406429
this.queryStringAnalyzeWildcard = QUERY_STRING_ANALYZE_WILDCARD.get(nodeSettings);
407430
this.queryStringAllowLeadingWildcard = QUERY_STRING_ALLOW_LEADING_WILDCARD.get(nodeSettings);
@@ -478,6 +501,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
478501
scopedSettings.addSettingsUpdateConsumer(MAX_REGEX_LENGTH_SETTING, this::setMaxRegexLength);
479502
scopedSettings.addSettingsUpdateConsumer(DEFAULT_PIPELINE, this::setDefaultPipeline);
480503
scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING, this::setSoftDeleteRetentionOperations);
504+
scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_THREAD_POOL, this::setSearchThreadPool);
481505
}
482506

483507
private void setSearchIdleAfter(TimeValue searchIdleAfter) { this.searchIdleAfter = searchIdleAfter; }
@@ -879,4 +903,15 @@ private void setSoftDeleteRetentionOperations(long ops) {
879903
public long getSoftDeleteRetentionOperations() {
880904
return this.softDeleteRetentionOperations;
881905
}
906+
907+
/**
908+
* Returns the thread-pool name to execute search requests on for this index.
909+
*/
910+
public String getSearchThreadPool() {
911+
return searchThreadPool;
912+
}
913+
914+
private void setSearchThreadPool(String searchThreadPool) {
915+
this.searchThreadPool = searchThreadPool;
916+
}
882917
}

0 commit comments

Comments
 (0)