Skip to content

Introduce a search_throttled threadpool #33732

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Sep 20, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.transport.Transport;

Expand All @@ -40,7 +41,7 @@
* which allows to fan out to more shards at the same time without running into rejections even if we are hitting a
* large portion of the clusters indices.
*/
final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<SearchTransportService.CanMatchResponse> {
final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<SearchService.CanMatchResponse> {

private final Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory;
private final GroupShardsIterator<SearchShardIterator> shardsIts;
Expand All @@ -67,13 +68,13 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<Searc

@Override
protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard,
SearchActionListener<SearchTransportService.CanMatchResponse> listener) {
SearchActionListener<SearchService.CanMatchResponse> listener) {
getSearchTransport().sendCanMatch(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()),
buildShardSearchRequest(shardIt), getTask(), listener);
}

@Override
protected SearchPhase getNextPhase(SearchPhaseResults<SearchTransportService.CanMatchResponse> results,
protected SearchPhase getNextPhase(SearchPhaseResults<SearchService.CanMatchResponse> results,
SearchPhaseContext context) {

return phaseFactory.apply(getIterator((BitSetSearchPhaseResults) results, shardsIts));
Expand All @@ -100,7 +101,7 @@ private GroupShardsIterator<SearchShardIterator> getIterator(BitSetSearchPhaseRe
}

private static final class BitSetSearchPhaseResults extends InitialSearchPhase.
SearchPhaseResults<SearchTransportService.CanMatchResponse> {
SearchPhaseResults<SearchService.CanMatchResponse> {

private final FixedBitSet possibleMatches;
private int numPossibleMatches;
Expand All @@ -111,7 +112,7 @@ private static final class BitSetSearchPhaseResults extends InitialSearchPhase.
}

@Override
void consumeResult(SearchTransportService.CanMatchResponse result) {
void consumeResult(SearchService.CanMatchResponse result) {
if (result.canMatch()) {
consumeShardFailure(result.getShardIndex());
}
Expand Down Expand Up @@ -139,7 +140,7 @@ synchronized FixedBitSet getPossibleMatches() {
}

@Override
Stream<SearchTransportService.CanMatchResponse> getSuccessfulResults() {
Stream<SearchService.CanMatchResponse> getSuccessfulResults() {
return Stream.empty();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportActionProxy;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
Expand Down Expand Up @@ -111,9 +112,9 @@ public void sendFreeContext(Transport.Connection connection, long contextId, fin
}

public void sendCanMatch(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task, final
ActionListener<CanMatchResponse> listener) {
ActionListener<SearchService.CanMatchResponse> listener) {
transportService.sendChildRequest(connection, QUERY_CAN_MATCH_NAME, request, task,
TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, CanMatchResponse::new));
TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, SearchService.CanMatchResponse::new));
}

public void sendClearAllScrollContexts(Transport.Connection connection, final ActionListener<TransportResponse> listener) {
Expand Down Expand Up @@ -348,99 +349,74 @@ public void onFailure(Exception e) {

transportService.registerRequestHandler(QUERY_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new,
(request, channel, task) -> {
searchService.executeQueryPhase(request, (SearchTask) task, new ActionListener<SearchPhaseResult>() {
@Override
public void onResponse(SearchPhaseResult searchPhaseResult) {
try {
channel.sendResponse(searchPhaseResult);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

@Override
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (IOException e1) {
throw new UncheckedIOException(e1);
}
}
});
searchService.executeQueryPhase(request, (SearchTask) task, new ChannelActionListener<>(channel));
});
TransportActionProxy.registerProxyAction(transportService, QUERY_ACTION_NAME,
(request) -> ((ShardSearchRequest)request).numberOfShards() == 1 ? QueryFetchSearchResult::new : QuerySearchResult::new);

transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, ThreadPool.Names.SEARCH, QuerySearchRequest::new,
transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, ThreadPool.Names.SAME, QuerySearchRequest::new,
(request, channel, task) -> {
QuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task);
channel.sendResponse(result);
searchService.executeQueryPhase(request, (SearchTask)task, new ChannelActionListener<>(channel));
});
TransportActionProxy.registerProxyAction(transportService, QUERY_ID_ACTION_NAME, QuerySearchResult::new);

transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, InternalScrollSearchRequest::new,
transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, ThreadPool.Names.SAME, InternalScrollSearchRequest::new,
(request, channel, task) -> {
ScrollQuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task);
channel.sendResponse(result);
searchService.executeQueryPhase(request, (SearchTask)task, new ChannelActionListener<>(channel));
});
TransportActionProxy.registerProxyAction(transportService, QUERY_SCROLL_ACTION_NAME, ScrollQuerySearchResult::new);

transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, InternalScrollSearchRequest::new,
transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, ThreadPool.Names.SAME, InternalScrollSearchRequest::new,
(request, channel, task) -> {
ScrollQueryFetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
channel.sendResponse(result);
searchService.executeFetchPhase(request, (SearchTask)task, new ChannelActionListener<>(channel));
});
TransportActionProxy.registerProxyAction(transportService, QUERY_FETCH_SCROLL_ACTION_NAME, ScrollQueryFetchSearchResult::new);

transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, ShardFetchRequest::new,
transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ThreadPool.Names.SAME, ShardFetchRequest::new,
(request, channel, task) -> {
FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
channel.sendResponse(result);
searchService.executeFetchPhase(request, (SearchTask)task, new ChannelActionListener<>(channel));
});
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_SCROLL_ACTION_NAME, FetchSearchResult::new);

transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ThreadPool.Names.SEARCH, true, true, ShardFetchSearchRequest::new,
transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ThreadPool.Names.SAME, true, true, ShardFetchSearchRequest::new,
(request, channel, task) -> {
FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
channel.sendResponse(result);
searchService.executeFetchPhase(request, (SearchTask)task, new ChannelActionListener<>(channel));
});
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_ACTION_NAME, FetchSearchResult::new);

// this is cheap, it does not fetch during the rewrite phase, so we can let it quickly execute on a networking thread
transportService.registerRequestHandler(QUERY_CAN_MATCH_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new,
(request, channel, task) -> {
boolean canMatch = searchService.canMatch(request);
channel.sendResponse(new CanMatchResponse(canMatch));
searchService.canMatch(request, new ChannelActionListener<>(channel));
});
TransportActionProxy.registerProxyAction(transportService, QUERY_CAN_MATCH_NAME,
(Supplier<TransportResponse>) CanMatchResponse::new);
(Supplier<TransportResponse>) SearchService.CanMatchResponse::new);
}

public static final class CanMatchResponse extends SearchPhaseResult {
private boolean canMatch;
private static class ChannelActionListener<T extends TransportResponse> implements ActionListener<T>{

public CanMatchResponse() {
}
private final TransportChannel channel;

public CanMatchResponse(boolean canMatch) {
this.canMatch = canMatch;
private ChannelActionListener(TransportChannel channel) {
this.channel = channel;
}


@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
canMatch = in.readBoolean();
public void onResponse(T result) {
try {
channel.sendResponse(result);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(canMatch);
}

public boolean canMatch() {
return canMatch;
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (IOException e1) {
throw new UncheckedIOException(e1);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING,
IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING,
IndexSettings.INDEX_SEARCH_IDLE_AFTER,
IndexSettings.INDEX_SEARCH_THREAD_POOL,
IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY,
FieldMapper.IGNORE_MALFORMED_SETTING,
FieldMapper.COERCE_SETTING,
Expand Down
35 changes: 35 additions & 0 deletions server/src/main/java/org/elasticsearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.node.Node;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -277,6 +278,26 @@ public final class IndexSettings {
return s;
}, Property.Dynamic, Property.IndexScope);

/**
* Allows to specify a dedicated threadpool to execute searches. This is an expert setting and should be used with care.
* Indices that for instance contain metadata information that need to be always accessible and should not be rejected
* can be served through a different threadpool. Or searches that have a lower priority can go through a threadpool with a
* single thread to prevent larger impact on other searches with a higher priority. This setting allows for custom threadpools
* or search and generic. Other build-in threadpools are disallowed.
*/
public static final Setting<String> INDEX_SEARCH_THREAD_POOL =
new Setting<>("index.search.threadpool", ThreadPool.Names.SEARCH, s -> {
if (s == null || s.isEmpty()) {
throw new IllegalArgumentException("Value for [index.search.threadpool] must be a non-empty string.");
}
if (ThreadPool.Names.SEARCH.equals(s) == false && ThreadPool.Names.GENERIC.equals(s) == false &&
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really do not like that these can go on the generic thread pool. This thread pool can be quite large (we only recently bound it but it's a really large bound) and it has no queue. Especially since the need for this only arises if the node is under load, now that we are going unbounded it might only add to the pain the node is already experiencing. I understand the entire point is so that certain search requests never get rejected but I think we should find a different solution than using the generic thread pool for that. For example, could we force put them? Could we enable force putting them at the top of the queue?

ThreadPool.THREAD_POOL_TYPES.containsKey(s)) {
throw new IllegalArgumentException("Invalid valid for [index.search.threadpool] - " + s + " is a reserved built-in " +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

valid -> value

"threadpool");
}
return s;
}, Property.IndexScope, Property.Dynamic);

private final Index index;
private final Version version;
private final Logger logger;
Expand Down Expand Up @@ -319,6 +340,7 @@ public final class IndexSettings {
private volatile int maxAnalyzedOffset;
private volatile int maxTermsCount;
private volatile String defaultPipeline;
private volatile String searchThreadPool;

/**
* The maximum number of refresh listeners allows on this shard.
Expand Down Expand Up @@ -402,6 +424,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
this.indexMetaData = indexMetaData;
numberOfShards = settings.getAsInt(IndexMetaData.SETTING_NUMBER_OF_SHARDS, null);

this.searchThreadPool = INDEX_SEARCH_THREAD_POOL.get(settings);
this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings);
this.queryStringAnalyzeWildcard = QUERY_STRING_ANALYZE_WILDCARD.get(nodeSettings);
this.queryStringAllowLeadingWildcard = QUERY_STRING_ALLOW_LEADING_WILDCARD.get(nodeSettings);
Expand Down Expand Up @@ -478,6 +501,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(MAX_REGEX_LENGTH_SETTING, this::setMaxRegexLength);
scopedSettings.addSettingsUpdateConsumer(DEFAULT_PIPELINE, this::setDefaultPipeline);
scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING, this::setSoftDeleteRetentionOperations);
scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_THREAD_POOL, this::setSearchThreadPool);
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) { this.searchIdleAfter = searchIdleAfter; }
Expand Down Expand Up @@ -879,4 +903,15 @@ private void setSoftDeleteRetentionOperations(long ops) {
public long getSoftDeleteRetentionOperations() {
return this.softDeleteRetentionOperations;
}

/**
* Returns the thread-pool name to execute search requests on for this index.
*/
public String getSearchThreadPool() {
return searchThreadPool;
}

private void setSearchThreadPool(String searchThreadPool) {
this.searchThreadPool = searchThreadPool;
}
}
Loading