From 711ccbeb9ad07587293ffcbbff9830f3d3adec72 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Sat, 15 Sep 2018 09:15:01 +0200 Subject: [PATCH 1/9] Allow for pluggable search threadpool Today all searches happen on the search threadpool which is the correct behavior in almost any case. Yet, there are exceptions where for instance searches are required to succeed ie. in the case of a .security index. These searches should not be rejected if a node is under load. There are other more specialized usecases were searches should be passed through a single-thread threadpool to reduce impact on a node. Relates to #33205 --- .../search/CanMatchPreFilterSearchPhase.java | 13 +- .../action/search/SearchTransportService.java | 88 ++--- .../common/settings/IndexScopedSettings.java | 1 + .../elasticsearch/index/IndexSettings.java | 35 ++ .../elasticsearch/search/SearchService.java | 318 +++++++++++------- .../CanMatchPreFilterSearchPhaseTests.java | 13 +- .../search/SearchServiceTests.java | 98 +++++- 7 files changed, 369 insertions(+), 197 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java index 0873ff40f7500..82c8f6b815ab4 100644 --- a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.search.SearchService; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.transport.Transport; @@ -40,7 +41,7 @@ * which allows to fan out to more shards at the same time without running into rejections even if we are hitting a * large portion of the clusters indices. */ -final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction { +final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction { private final Function, SearchPhase> phaseFactory; private final GroupShardsIterator shardsIts; @@ -67,13 +68,13 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction listener) { + SearchActionListener listener) { getSearchTransport().sendCanMatch(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()), buildShardSearchRequest(shardIt), getTask(), listener); } @Override - protected SearchPhase getNextPhase(SearchPhaseResults results, + protected SearchPhase getNextPhase(SearchPhaseResults results, SearchPhaseContext context) { return phaseFactory.apply(getIterator((BitSetSearchPhaseResults) results, shardsIts)); @@ -100,7 +101,7 @@ private GroupShardsIterator getIterator(BitSetSearchPhaseRe } private static final class BitSetSearchPhaseResults extends InitialSearchPhase. - SearchPhaseResults { + SearchPhaseResults { private final FixedBitSet possibleMatches; private int numPossibleMatches; @@ -111,7 +112,7 @@ private static final class BitSetSearchPhaseResults extends InitialSearchPhase. } @Override - void consumeResult(SearchTransportService.CanMatchResponse result) { + void consumeResult(SearchService.CanMatchResponse result) { if (result.canMatch()) { consumeShardFailure(result.getShardIndex()); } @@ -139,7 +140,7 @@ synchronized FixedBitSet getPossibleMatches() { } @Override - Stream getSuccessfulResults() { + Stream getSuccessfulResults() { return Stream.empty(); } } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index a4ea2616e0a21..a94ec214d0ea4 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -48,6 +48,7 @@ import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportActionProxy; +import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; @@ -111,9 +112,9 @@ public void sendFreeContext(Transport.Connection connection, long contextId, fin } public void sendCanMatch(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task, final - ActionListener listener) { + ActionListener listener) { transportService.sendChildRequest(connection, QUERY_CAN_MATCH_NAME, request, task, - TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, CanMatchResponse::new)); + TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, SearchService.CanMatchResponse::new)); } public void sendClearAllScrollContexts(Transport.Connection connection, final ActionListener listener) { @@ -348,99 +349,74 @@ public void onFailure(Exception e) { transportService.registerRequestHandler(QUERY_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new, (request, channel, task) -> { - searchService.executeQueryPhase(request, (SearchTask) task, new ActionListener() { - @Override - public void onResponse(SearchPhaseResult searchPhaseResult) { - try { - channel.sendResponse(searchPhaseResult); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - - @Override - public void onFailure(Exception e) { - try { - channel.sendResponse(e); - } catch (IOException e1) { - throw new UncheckedIOException(e1); - } - } - }); + searchService.executeQueryPhase(request, (SearchTask) task, new ChannelActionListener<>(channel)); }); TransportActionProxy.registerProxyAction(transportService, QUERY_ACTION_NAME, (request) -> ((ShardSearchRequest)request).numberOfShards() == 1 ? QueryFetchSearchResult::new : QuerySearchResult::new); - transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, ThreadPool.Names.SEARCH, QuerySearchRequest::new, + transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, ThreadPool.Names.SAME, QuerySearchRequest::new, (request, channel, task) -> { - QuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task); - channel.sendResponse(result); + searchService.executeQueryPhase(request, (SearchTask)task, new ChannelActionListener<>(channel)); }); TransportActionProxy.registerProxyAction(transportService, QUERY_ID_ACTION_NAME, QuerySearchResult::new); - transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, InternalScrollSearchRequest::new, + transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, ThreadPool.Names.SAME, InternalScrollSearchRequest::new, (request, channel, task) -> { - ScrollQuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task); - channel.sendResponse(result); + searchService.executeQueryPhase(request, (SearchTask)task, new ChannelActionListener<>(channel)); }); TransportActionProxy.registerProxyAction(transportService, QUERY_SCROLL_ACTION_NAME, ScrollQuerySearchResult::new); - transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, InternalScrollSearchRequest::new, + transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, ThreadPool.Names.SAME, InternalScrollSearchRequest::new, (request, channel, task) -> { - ScrollQueryFetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task); - channel.sendResponse(result); + searchService.executeFetchPhase(request, (SearchTask)task, new ChannelActionListener<>(channel)); }); TransportActionProxy.registerProxyAction(transportService, QUERY_FETCH_SCROLL_ACTION_NAME, ScrollQueryFetchSearchResult::new); - transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, ShardFetchRequest::new, + transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ThreadPool.Names.SAME, ShardFetchRequest::new, (request, channel, task) -> { - FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task); - channel.sendResponse(result); + searchService.executeFetchPhase(request, (SearchTask)task, new ChannelActionListener<>(channel)); }); TransportActionProxy.registerProxyAction(transportService, FETCH_ID_SCROLL_ACTION_NAME, FetchSearchResult::new); - transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ThreadPool.Names.SEARCH, true, true, ShardFetchSearchRequest::new, + transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ThreadPool.Names.SAME, true, true, ShardFetchSearchRequest::new, (request, channel, task) -> { - FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task); - channel.sendResponse(result); + searchService.executeFetchPhase(request, (SearchTask)task, new ChannelActionListener<>(channel)); }); TransportActionProxy.registerProxyAction(transportService, FETCH_ID_ACTION_NAME, FetchSearchResult::new); // this is cheap, it does not fetch during the rewrite phase, so we can let it quickly execute on a networking thread transportService.registerRequestHandler(QUERY_CAN_MATCH_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new, (request, channel, task) -> { - boolean canMatch = searchService.canMatch(request); - channel.sendResponse(new CanMatchResponse(canMatch)); + searchService.canMatch(request, new ChannelActionListener<>(channel)); }); TransportActionProxy.registerProxyAction(transportService, QUERY_CAN_MATCH_NAME, - (Supplier) CanMatchResponse::new); + (Supplier) SearchService.CanMatchResponse::new); } - public static final class CanMatchResponse extends SearchPhaseResult { - private boolean canMatch; + private static class ChannelActionListener implements ActionListener{ - public CanMatchResponse() { - } + private final TransportChannel channel; - public CanMatchResponse(boolean canMatch) { - this.canMatch = canMatch; + private ChannelActionListener(TransportChannel channel) { + this.channel = channel; } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - canMatch = in.readBoolean(); + public void onResponse(T result) { + try { + channel.sendResponse(result); + } catch (IOException e) { + throw new UncheckedIOException(e); + } } @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeBoolean(canMatch); - } - - public boolean canMatch() { - return canMatch; + public void onFailure(Exception e) { + try { + channel.sendResponse(e); + } catch (IOException e1) { + throw new UncheckedIOException(e1); + } } } diff --git a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java index ae8529af5b53e..712ae876f3cc4 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java @@ -142,6 +142,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING, IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING, IndexSettings.INDEX_SEARCH_IDLE_AFTER, + IndexSettings.INDEX_SEARCH_THREAD_POOL, IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY, FieldMapper.IGNORE_MALFORMED_SETTING, FieldMapper.COERCE_SETTING, diff --git a/server/src/main/java/org/elasticsearch/index/IndexSettings.java b/server/src/main/java/org/elasticsearch/index/IndexSettings.java index 9801cc3e26bb1..294fec14ab6f5 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/server/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -33,6 +33,7 @@ import org.elasticsearch.index.translog.Translog; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.node.Node; +import org.elasticsearch.threadpool.ThreadPool; import java.util.Collections; import java.util.List; @@ -277,6 +278,26 @@ public final class IndexSettings { return s; }, Property.Dynamic, Property.IndexScope); + /** + * Allows to specify a dedicated threadpool to execute searches. This is an expert setting and should be used with care. + * Indices that for instance contain metadata information that need to be always accessible and should not be rejected + * can be served through a different threadpool. Or searches that have a lower priority can go through a threadpool with a + * single thread to prevent larger impact on other searches with a higher priority. This setting allows for custom threadpools + * or search and generic. Other build-in threadpools are disallowed. + */ + public static final Setting INDEX_SEARCH_THREAD_POOL = + new Setting<>("index.search.threadpool", ThreadPool.Names.SEARCH, s -> { + if (s == null || s.isEmpty()) { + throw new IllegalArgumentException("Value for [index.search.threadpool] must be a non-empty string."); + } + if (ThreadPool.Names.SEARCH.equals(s) == false && ThreadPool.Names.GENERIC.equals(s) == false && + ThreadPool.THREAD_POOL_TYPES.containsKey(s)) { + throw new IllegalArgumentException("Invalid valid for [index.search.threadpool] - " + s + " is a reserved built-in " + + "threadpool"); + } + return s; + }, Property.IndexScope, Property.Dynamic); + private final Index index; private final Version version; private final Logger logger; @@ -319,6 +340,7 @@ public final class IndexSettings { private volatile int maxAnalyzedOffset; private volatile int maxTermsCount; private volatile String defaultPipeline; + private volatile String searchThreadPool; /** * The maximum number of refresh listeners allows on this shard. @@ -402,6 +424,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti this.indexMetaData = indexMetaData; numberOfShards = settings.getAsInt(IndexMetaData.SETTING_NUMBER_OF_SHARDS, null); + this.searchThreadPool = INDEX_SEARCH_THREAD_POOL.get(settings); this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings); this.queryStringAnalyzeWildcard = QUERY_STRING_ANALYZE_WILDCARD.get(nodeSettings); this.queryStringAllowLeadingWildcard = QUERY_STRING_ALLOW_LEADING_WILDCARD.get(nodeSettings); @@ -478,6 +501,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti scopedSettings.addSettingsUpdateConsumer(MAX_REGEX_LENGTH_SETTING, this::setMaxRegexLength); scopedSettings.addSettingsUpdateConsumer(DEFAULT_PIPELINE, this::setDefaultPipeline); scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING, this::setSoftDeleteRetentionOperations); + scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_THREAD_POOL, this::setSearchThreadPool); } private void setSearchIdleAfter(TimeValue searchIdleAfter) { this.searchIdleAfter = searchIdleAfter; } @@ -879,4 +903,15 @@ private void setSoftDeleteRetentionOperations(long ops) { public long getSoftDeleteRetentionOperations() { return this.softDeleteRetentionOperations; } + + /** + * Returns the thread-pool name to execute search requests on for this index. + */ + public String getSearchThreadPool() { + return searchThreadPool; + } + + private void setSearchThreadPool(String searchThreadPool) { + this.searchThreadPool = searchThreadPool; + } } diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 5cb9f81626c94..ddd0ff480ad35 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -30,6 +30,8 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; @@ -106,8 +108,10 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; import java.util.function.LongSupplier; +import java.util.function.Supplier; import static org.elasticsearch.common.unit.TimeValue.timeValueHours; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; @@ -344,7 +348,21 @@ public void onFailure(Exception e) { }); } - SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws IOException { + private void runAsync(long id, Supplier executable, ActionListener listener) { + getExecutor(id).execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + + @Override + protected void doRun() { + listener.onResponse(executable.get()); + } + }); + } + + private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws IOException { final SearchContext context = createAndPutContext(request); final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener(); context.incRef(); @@ -405,59 +423,64 @@ private QueryFetchSearchResult executeFetchPhase(SearchContext context, SearchOp return new QueryFetchSearchResult(context.queryResult(), context.fetchResult()); } - public ScrollQuerySearchResult executeQueryPhase(InternalScrollSearchRequest request, SearchTask task) { - final SearchContext context = findContext(request.id(), request); - SearchOperationListener operationListener = context.indexShard().getSearchOperationListener(); - context.incRef(); - try { - context.setTask(task); - operationListener.onPreQueryPhase(context); - long time = System.nanoTime(); - contextProcessing(context); - processScroll(request, context); - queryPhase.execute(context); - contextProcessedSuccessfully(context); - operationListener.onQueryPhase(context, System.nanoTime() - time); - return new ScrollQuerySearchResult(context.queryResult(), context.shardTarget()); - } catch (Exception e) { - operationListener.onFailedQueryPhase(context); - logger.trace("Query phase failed", e); - processFailure(context, e); - throw ExceptionsHelper.convertToRuntime(e); - } finally { - cleanContext(context); - } + public void executeQueryPhase(InternalScrollSearchRequest request, SearchTask task, + ActionListener listener) { + runAsync(request.id(), () -> { + final SearchContext context = findContext(request.id(), request); + SearchOperationListener operationListener = context.indexShard().getSearchOperationListener(); + context.incRef(); + try { + context.setTask(task); + operationListener.onPreQueryPhase(context); + long time = System.nanoTime(); + contextProcessing(context); + processScroll(request, context); + queryPhase.execute(context); + contextProcessedSuccessfully(context); + operationListener.onQueryPhase(context, System.nanoTime() - time); + return new ScrollQuerySearchResult(context.queryResult(), context.shardTarget()); + } catch (Exception e) { + operationListener.onFailedQueryPhase(context); + logger.trace("Query phase failed", e); + processFailure(context, e); + throw ExceptionsHelper.convertToRuntime(e); + } finally { + cleanContext(context); + } + }, listener); } - public QuerySearchResult executeQueryPhase(QuerySearchRequest request, SearchTask task) { - final SearchContext context = findContext(request.id(), request); - context.setTask(task); - IndexShard indexShard = context.indexShard(); - SearchOperationListener operationListener = indexShard.getSearchOperationListener(); - context.incRef(); - try { - contextProcessing(context); - context.searcher().setAggregatedDfs(request.dfs()); + public void executeQueryPhase(QuerySearchRequest request, SearchTask task, ActionListener listener) { + runAsync(request.id(), () -> { + final SearchContext context = findContext(request.id(), request); + context.setTask(task); + IndexShard indexShard = context.indexShard(); + SearchOperationListener operationListener = indexShard.getSearchOperationListener(); + context.incRef(); + try { + contextProcessing(context); + context.searcher().setAggregatedDfs(request.dfs()); - operationListener.onPreQueryPhase(context); - long time = System.nanoTime(); - queryPhase.execute(context); - if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) { - // no hits, we can release the context since there will be no fetch phase - freeContext(context.id()); - } else { - contextProcessedSuccessfully(context); + operationListener.onPreQueryPhase(context); + long time = System.nanoTime(); + queryPhase.execute(context); + if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) { + // no hits, we can release the context since there will be no fetch phase + freeContext(context.id()); + } else { + contextProcessedSuccessfully(context); + } + operationListener.onQueryPhase(context, System.nanoTime() - time); + return context.queryResult(); + } catch (Exception e) { + operationListener.onFailedQueryPhase(context); + logger.trace("Query phase failed", e); + processFailure(context, e); + throw ExceptionsHelper.convertToRuntime(e); + } finally { + cleanContext(context); } - operationListener.onQueryPhase(context, System.nanoTime() - time); - return context.queryResult(); - } catch (Exception e) { - operationListener.onFailedQueryPhase(context); - logger.trace("Query phase failed", e); - processFailure(context, e); - throw ExceptionsHelper.convertToRuntime(e); - } finally { - cleanContext(context); - } + }, listener); } private boolean fetchPhaseShouldFreeContext(SearchContext context) { @@ -470,66 +493,78 @@ private boolean fetchPhaseShouldFreeContext(SearchContext context) { } } - public ScrollQueryFetchSearchResult executeFetchPhase(InternalScrollSearchRequest request, SearchTask task) { - final SearchContext context = findContext(request.id(), request); - context.incRef(); - try { - context.setTask(task); - contextProcessing(context); - SearchOperationListener operationListener = context.indexShard().getSearchOperationListener(); - processScroll(request, context); - operationListener.onPreQueryPhase(context); - final long time = System.nanoTime(); + final Executor getExecutor(long id) { + SearchContext context = activeContexts.get(id); + if (context == null) { + throw new SearchContextMissingException(id); + } + return threadPool.executor(context.indexShard().indexSettings().getSearchThreadPool()); + } + + public void executeFetchPhase(InternalScrollSearchRequest request, SearchTask task, + ActionListener listener) { + runAsync(request.id(), () -> { + final SearchContext context = findContext(request.id(), request); + context.incRef(); try { - queryPhase.execute(context); + context.setTask(task); + contextProcessing(context); + SearchOperationListener operationListener = context.indexShard().getSearchOperationListener(); + processScroll(request, context); + operationListener.onPreQueryPhase(context); + final long time = System.nanoTime(); + try { + queryPhase.execute(context); + } catch (Exception e) { + operationListener.onFailedQueryPhase(context); + throw ExceptionsHelper.convertToRuntime(e); + } + long afterQueryTime = System.nanoTime(); + operationListener.onQueryPhase(context, afterQueryTime - time); + QueryFetchSearchResult fetchSearchResult = executeFetchPhase(context, operationListener, afterQueryTime); + return new ScrollQueryFetchSearchResult(fetchSearchResult, + context.shardTarget()); } catch (Exception e) { - operationListener.onFailedQueryPhase(context); + logger.trace("Fetch phase failed", e); + processFailure(context, e); throw ExceptionsHelper.convertToRuntime(e); + } finally { + cleanContext(context); } - long afterQueryTime = System.nanoTime(); - operationListener.onQueryPhase(context, afterQueryTime - time); - QueryFetchSearchResult fetchSearchResult = executeFetchPhase(context, operationListener, afterQueryTime); - - return new ScrollQueryFetchSearchResult(fetchSearchResult, - context.shardTarget()); - } catch (Exception e) { - logger.trace("Fetch phase failed", e); - processFailure(context, e); - throw ExceptionsHelper.convertToRuntime(e); - } finally { - cleanContext(context); - } + }, listener); } - public FetchSearchResult executeFetchPhase(ShardFetchRequest request, SearchTask task) { - final SearchContext context = findContext(request.id(), request); - final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener(); - context.incRef(); - try { - context.setTask(task); - contextProcessing(context); - if (request.lastEmittedDoc() != null) { - context.scrollContext().lastEmittedDoc = request.lastEmittedDoc(); - } - context.docIdsToLoad(request.docIds(), 0, request.docIdsSize()); - operationListener.onPreFetchPhase(context); - long time = System.nanoTime(); - fetchPhase.execute(context); - if (fetchPhaseShouldFreeContext(context)) { - freeContext(request.id()); - } else { - contextProcessedSuccessfully(context); + public void executeFetchPhase(ShardFetchRequest request, SearchTask task, ActionListener listener) { + runAsync(request.id(), () -> { + final SearchContext context = findContext(request.id(), request); + final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener(); + context.incRef(); + try { + context.setTask(task); + contextProcessing(context); + if (request.lastEmittedDoc() != null) { + context.scrollContext().lastEmittedDoc = request.lastEmittedDoc(); + } + context.docIdsToLoad(request.docIds(), 0, request.docIdsSize()); + operationListener.onPreFetchPhase(context); + long time = System.nanoTime(); + fetchPhase.execute(context); + if (fetchPhaseShouldFreeContext(context)) { + freeContext(request.id()); + } else { + contextProcessedSuccessfully(context); + } + operationListener.onFetchPhase(context, System.nanoTime() - time); + return context.fetchResult(); + } catch (Exception e) { + operationListener.onFailedFetchPhase(context); + logger.trace("Fetch phase failed", e); + processFailure(context, e); + throw ExceptionsHelper.convertToRuntime(e); + } finally { + cleanContext(context); } - operationListener.onFetchPhase(context, System.nanoTime() - time); - return context.fetchResult(); - } catch (Exception e) { - operationListener.onFailedFetchPhase(context); - logger.trace("Fetch phase failed", e); - processFailure(context, e); - throw ExceptionsHelper.convertToRuntime(e); - } finally { - cleanContext(context); - } + }, listener); } private SearchContext findContext(long id, TransportRequest request) throws SearchContextMissingException { @@ -968,12 +1003,7 @@ public AliasFilter buildAliasFilter(ClusterState state, String index, String... return indicesService.buildAliasFilter(state, index, expressions); } - /** - * This method does a very quick rewrite of the query and returns true if the query can potentially match any documents. - * This method can have false positives while if it returns false the query won't match any documents on the current - * shard. - */ - public boolean canMatch(ShardSearchRequest request) throws IOException { + boolean canMatch(ShardSearchRequest request) throws IOException { assert request.searchType() == SearchType.QUERY_THEN_FETCH : "unexpected search type: " + request.searchType(); try (DefaultSearchContext context = createSearchContext(request, defaultSearchTimeout, false)) { SearchSourceBuilder source = context.request().source(); @@ -985,6 +1015,32 @@ public boolean canMatch(ShardSearchRequest request) throws IOException { } } + /** + * This method does a very quick rewrite of the query and returns true if the query can potentially match any documents. + * This method can have false positives while if it returns false the query won't match any documents on the current + * shard. + */ + public void canMatch(ShardSearchRequest request, ActionListener listener) { + IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); + String searchThreadPool = indexService.getIndexSettings().getSearchThreadPool(); + if (Names.SEARCH.equals(searchThreadPool)) { + // special case - if we use the default threadpool here we rewrite it on the incoming thread to make sure it's not subject to + // rejections. + searchThreadPool = Names.SAME; + } + threadPool.executor(searchThreadPool).execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + + @Override + protected void doRun() throws IOException { + listener.onResponse(new CanMatchResponse(canMatch(request))); + } + }); + } + /** * Returns true iff the given search source builder can be early terminated by rewriting to a match none query. Or in other words * if the execution of a the search request can be early terminated without executing it. This is for instance not possible if @@ -1009,31 +1065,31 @@ public static boolean canRewriteToMatchNone(SearchSourceBuilder source) { * The action listener is guaranteed to be executed on the search thread-pool */ private void rewriteShardRequest(ShardSearchRequest request, ActionListener listener) { + IndexShard shardOrNull = indicesService.getShardOrNull(request.shardId()); + String threadPoolName = shardOrNull == null ? Names.SEARCH : shardOrNull.indexSettings().getSearchThreadPool(); + Executor executor = threadPool.executor(threadPoolName); ActionListener actionListener = ActionListener.wrap(r -> - threadPool.executor(Names.SEARCH).execute(new AbstractRunnable() { + executor.execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { listener.onFailure(e); } @Override - protected void doRun() throws Exception { + protected void doRun() { listener.onResponse(request); } }), listener::onFailure); - IndexShard shardOrNull = indicesService.getShardOrNull(request.shardId()); if (shardOrNull != null) { // now we need to check if there is a pending refresh and register - ActionListener finalListener = actionListener; - actionListener = ActionListener.wrap(r -> - shardOrNull.awaitShardSearchActive(b -> finalListener.onResponse(r)), finalListener::onFailure); + ActionListener finalListener = actionListener; + actionListener = ActionListener.wrap(r -> + shardOrNull.awaitShardSearchActive(b -> finalListener.onResponse(r)), finalListener::onFailure); } // we also do rewrite on the coordinating node (TransportSearchService) but we also need to do it here for BWC as well as // AliasFilters that might need to be rewritten. These are edge-cases but we are every efficient doing the rewrite here so it's not // adding a lot of overhead Rewriteable.rewriteAndFetch(request.getRewriteable(), indicesService.getRewriteContext(request::nowInMillis), actionListener); - - } /** @@ -1050,4 +1106,32 @@ public IndicesService getIndicesService() { public InternalAggregation.ReduceContext createReduceContext(boolean finalReduce) { return new InternalAggregation.ReduceContext(bigArrays, scriptService, multiBucketConsumerService.create(), finalReduce); } + + public static final class CanMatchResponse extends SearchPhaseResult { + private boolean canMatch; + + public CanMatchResponse() { + } + + public CanMatchResponse(boolean canMatch) { + this.canMatch = canMatch; + } + + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + canMatch = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBoolean(canMatch); + } + + public boolean canMatch() { + return canMatch; + } + } } diff --git a/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java b/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java index 2a0fa6c7ce134..2798d66160044 100644 --- a/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.search.SearchPhaseResult; +import org.elasticsearch.search.SearchService; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.ShardSearchTransportRequest; @@ -64,8 +65,8 @@ public void testFilterShards() throws InterruptedException { @Override public void sendCanMatch(Transport.Connection connection, ShardSearchTransportRequest request, SearchTask task, - ActionListener listener) { - new Thread(() -> listener.onResponse(new CanMatchResponse(request.shardId().id() == 0 ? shard1 : + ActionListener listener) { + new Thread(() -> listener.onResponse(new SearchService.CanMatchResponse(request.shardId().id() == 0 ? shard1 : shard2))).start(); } }; @@ -123,14 +124,14 @@ public void testFilterWithFailure() throws InterruptedException { @Override public void sendCanMatch(Transport.Connection connection, ShardSearchTransportRequest request, SearchTask task, - ActionListener listener) { + ActionListener listener) { boolean throwException = request.shardId().id() != 0; if (throwException && randomBoolean()) { throw new IllegalArgumentException("boom"); } else { new Thread(() -> { if (throwException == false) { - listener.onResponse(new CanMatchResponse(shard1)); + listener.onResponse(new SearchService.CanMatchResponse(shard1)); } else { listener.onFailure(new NullPointerException()); } @@ -192,8 +193,8 @@ public void sendCanMatch( Transport.Connection connection, ShardSearchTransportRequest request, SearchTask task, - ActionListener listener) { - listener.onResponse(new CanMatchResponse(randomBoolean())); + ActionListener listener) { + listener.onResponse(new SearchService.CanMatchResponse(randomBoolean())); } }; diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index 2562683466a8c..1257770430299 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchTask; import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; @@ -35,7 +36,10 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.query.AbstractQueryBuilder; import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.query.MatchNoneQueryBuilder; @@ -44,6 +48,8 @@ import org.elasticsearch.index.query.QueryShardContext; import org.elasticsearch.index.query.TermQueryBuilder; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.SearchOperationListener; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.SearchPlugin; @@ -55,6 +61,7 @@ import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.elasticsearch.search.aggregations.support.ValueType; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.fetch.FetchSearchResult; import org.elasticsearch.search.fetch.ShardFetchRequest; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.SearchContext; @@ -77,9 +84,12 @@ import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; import static org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.DELETED; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHits; import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.startsWith; public class SearchServiceTests extends ESSingleNodeTestCase { @@ -97,12 +107,44 @@ public static class CustomScriptPlugin extends MockScriptPlugin { static final String DUMMY_SCRIPT = "dummyScript"; + @Override protected Map, Object>> pluginScripts() { - return Collections.singletonMap(DUMMY_SCRIPT, vars -> { - return "dummy"; + return Collections.singletonMap(DUMMY_SCRIPT, vars -> "dummy"); + } + + @Override + public void onIndexModule(IndexModule indexModule) { + indexModule.addSearchOperationListener(new SearchOperationListener() { + @Override + public void onNewContext(SearchContext context) { + if ("generic_theadpool_index".equals(context.indexShard().shardId().getIndex().getName())) { + assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][generic]")); + } else { + assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search]")); + } + } + + @Override + public void onFetchPhase(SearchContext context, long tookInNanos) { + if ("generic_theadpool_index".equals(context.indexShard().shardId().getIndex().getName())) { + assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][generic]")); + } else { + assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search]")); + } + } + + @Override + public void onQueryPhase(SearchContext context, long tookInNanos) { + if ("generic_theadpool_index".equals(context.indexShard().shardId().getIndex().getName())) { + assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][generic]")); + } else { + assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search]")); + } + } }); } + } @Override @@ -210,15 +252,24 @@ public void onFailure(Exception e) { final int rounds = scaledRandomIntBetween(100, 10000); for (int i = 0; i < rounds; i++) { try { - SearchPhaseResult searchPhaseResult = service.executeQueryPhase( + try { + PlainActionFuture result = new PlainActionFuture<>(); + service.executeQueryPhase( new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.DEFAULT, - new SearchSourceBuilder(), new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, - true, null, null), - new SearchTask(123L, "", "", "", null, Collections.emptyMap())); - IntArrayList intCursors = new IntArrayList(1); - intCursors.add(0); - ShardFetchRequest req = new ShardFetchRequest(searchPhaseResult.getRequestId(), intCursors, null /* not a scroll */); - service.executeFetchPhase(req, new SearchTask(123L, "", "", "", null, Collections.emptyMap())); + new SearchSourceBuilder(), new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, + true, null, null), + new SearchTask(123L, "", "", "", null, Collections.emptyMap()), result); + SearchPhaseResult searchPhaseResult = result.get(); + IntArrayList intCursors = new IntArrayList(1); + intCursors.add(0); + ShardFetchRequest req = new ShardFetchRequest(searchPhaseResult.getRequestId(), intCursors, null/* not a scroll */); + PlainActionFuture listener = new PlainActionFuture<>(); + service.executeFetchPhase(req, new SearchTask(123L, "", "", "", null, Collections.emptyMap()), listener); + listener.get(); + } catch (ExecutionException ex) { + assertThat(ex.getCause(), instanceOf(RuntimeException.class)); + throw ((RuntimeException)ex.getCause()); + } } catch (AlreadyClosedException ex) { throw ex; } catch (IllegalStateException ex) { @@ -467,4 +518,27 @@ public void testCanRewriteToMatchNone() { .suggest(new SuggestBuilder()))); } + + public void testSetSearchThreadPool() { + createIndex("generic_theadpool_index"); + client().admin().indices().prepareUpdateSettings("generic_theadpool_index").setSettings(Settings.builder().put(IndexSettings + .INDEX_SEARCH_THREAD_POOL.getKey(), "generic")).get(); + final SearchService service = getInstanceFromNode(SearchService.class); + Index index = resolveIndex("generic_theadpool_index"); + assertEquals("generic", service.getIndicesService().indexServiceSafe(index).getIndexSettings().getSearchThreadPool()); + client().prepareIndex("generic_theadpool_index", "_doc", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + SearchResponse searchResponse = client().prepareSearch("generic_theadpool_index").setSize(1).get(); + assertSearchHits(searchResponse, "1"); + ShardSearchLocalRequest req = new ShardSearchLocalRequest(new ShardId(index, 0), 1, SearchType.QUERY_THEN_FETCH, null, + Strings.EMPTY_ARRAY, false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, false, null, null); + service.canMatch(req, ActionListener.wrap(r -> assertThat(Thread.currentThread().getName(), + startsWith("elasticsearch[node_s_0][generic]")), e -> fail("unexpected"))); + // we add a search action listener in a plugin above to assert that this is actually used + client().admin().indices().prepareUpdateSettings("generic_theadpool_index").setSettings(Settings.builder().put(IndexSettings + .INDEX_SEARCH_THREAD_POOL.getKey(), "search")).get(); + assertEquals("search", service.getIndicesService().indexServiceSafe(index).getIndexSettings().getSearchThreadPool()); + + Thread currentThread = Thread.currentThread(); + service.canMatch(req, ActionListener.wrap(r -> assertSame(Thread.currentThread(), currentThread), e -> fail("unexpected"))); + } } From 352e1b61d3fab35506ccd4a7739c9f7a03449287 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 18 Sep 2018 14:49:30 +0200 Subject: [PATCH 2/9] cut over to a boolean setting --- .../common/settings/IndexScopedSettings.java | 2 +- .../elasticsearch/index/IndexSettings.java | 40 +++++---------- .../elasticsearch/search/SearchService.java | 51 ++++++++++--------- .../elasticsearch/threadpool/ThreadPool.java | 4 ++ .../search/SearchServiceTests.java | 35 +++++++------ 5 files changed, 61 insertions(+), 71 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java index 712ae876f3cc4..4819f4dafadd3 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java @@ -142,7 +142,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING, IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING, IndexSettings.INDEX_SEARCH_IDLE_AFTER, - IndexSettings.INDEX_SEARCH_THREAD_POOL, + IndexSettings.INDEX_SEARCH_SEQUENTIAL, IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY, FieldMapper.IGNORE_MALFORMED_SETTING, FieldMapper.COERCE_SETTING, diff --git a/server/src/main/java/org/elasticsearch/index/IndexSettings.java b/server/src/main/java/org/elasticsearch/index/IndexSettings.java index 294fec14ab6f5..20817fd33861a 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/server/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -33,7 +33,6 @@ import org.elasticsearch.index.translog.Translog; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.node.Node; -import org.elasticsearch.threadpool.ThreadPool; import java.util.Collections; import java.util.List; @@ -279,24 +278,10 @@ public final class IndexSettings { }, Property.Dynamic, Property.IndexScope); /** - * Allows to specify a dedicated threadpool to execute searches. This is an expert setting and should be used with care. - * Indices that for instance contain metadata information that need to be always accessible and should not be rejected - * can be served through a different threadpool. Or searches that have a lower priority can go through a threadpool with a - * single thread to prevent larger impact on other searches with a higher priority. This setting allows for custom threadpools - * or search and generic. Other build-in threadpools are disallowed. - */ - public static final Setting INDEX_SEARCH_THREAD_POOL = - new Setting<>("index.search.threadpool", ThreadPool.Names.SEARCH, s -> { - if (s == null || s.isEmpty()) { - throw new IllegalArgumentException("Value for [index.search.threadpool] must be a non-empty string."); - } - if (ThreadPool.Names.SEARCH.equals(s) == false && ThreadPool.Names.GENERIC.equals(s) == false && - ThreadPool.THREAD_POOL_TYPES.containsKey(s)) { - throw new IllegalArgumentException("Invalid valid for [index.search.threadpool] - " + s + " is a reserved built-in " + - "threadpool"); - } - return s; - }, Property.IndexScope, Property.Dynamic); + * Marks an index to be searched sequentially. This means that never more than one shard of such an index will be searched concurrently + */ + public static final Setting INDEX_SEARCH_SEQUENTIAL = Setting.boolSetting("index.search.sequential", false, Property + .IndexScope, Property.Dynamic); private final Index index; private final Version version; @@ -340,7 +325,7 @@ public final class IndexSettings { private volatile int maxAnalyzedOffset; private volatile int maxTermsCount; private volatile String defaultPipeline; - private volatile String searchThreadPool; + private volatile boolean searchSequential; /** * The maximum number of refresh listeners allows on this shard. @@ -424,7 +409,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti this.indexMetaData = indexMetaData; numberOfShards = settings.getAsInt(IndexMetaData.SETTING_NUMBER_OF_SHARDS, null); - this.searchThreadPool = INDEX_SEARCH_THREAD_POOL.get(settings); + this.searchSequential = INDEX_SEARCH_SEQUENTIAL.get(settings); this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings); this.queryStringAnalyzeWildcard = QUERY_STRING_ANALYZE_WILDCARD.get(nodeSettings); this.queryStringAllowLeadingWildcard = QUERY_STRING_ALLOW_LEADING_WILDCARD.get(nodeSettings); @@ -501,7 +486,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti scopedSettings.addSettingsUpdateConsumer(MAX_REGEX_LENGTH_SETTING, this::setMaxRegexLength); scopedSettings.addSettingsUpdateConsumer(DEFAULT_PIPELINE, this::setDefaultPipeline); scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING, this::setSoftDeleteRetentionOperations); - scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_THREAD_POOL, this::setSearchThreadPool); + scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_SEQUENTIAL, this::setSearchSequential); } private void setSearchIdleAfter(TimeValue searchIdleAfter) { this.searchIdleAfter = searchIdleAfter; } @@ -905,13 +890,14 @@ public long getSoftDeleteRetentionOperations() { } /** - * Returns the thread-pool name to execute search requests on for this index. + * Returns true if the this index should be searched sequentially ie. using the + * {@link org.elasticsearch.threadpool.ThreadPool.Names#SEARCH_SEQUENTIAL} threadpool */ - public String getSearchThreadPool() { - return searchThreadPool; + public boolean getSearchSequential() { + return searchSequential; } - private void setSearchThreadPool(String searchThreadPool) { - this.searchThreadPool = searchThreadPool; + private void setSearchSequential(boolean searchSequential) { + this.searchSequential = searchSequential; } } diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index ddd0ff480ad35..bd2333bdd2b92 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -498,7 +498,12 @@ final Executor getExecutor(long id) { if (context == null) { throw new SearchContextMissingException(id); } - return threadPool.executor(context.indexShard().indexSettings().getSearchThreadPool()); + return getExecutor(context.indexShard()); + } + + private Executor getExecutor(IndexShard indexShard) { + assert indexShard != null; + return threadPool.executor(indexShard.indexSettings().getSearchSequential() ? Names.SEARCH_SEQUENTIAL : Names.SEARCH); } public void executeFetchPhase(InternalScrollSearchRequest request, SearchTask task, @@ -1022,11 +1027,11 @@ boolean canMatch(ShardSearchRequest request) throws IOException { */ public void canMatch(ShardSearchRequest request, ActionListener listener) { IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); - String searchThreadPool = indexService.getIndexSettings().getSearchThreadPool(); - if (Names.SEARCH.equals(searchThreadPool)) { - // special case - if we use the default threadpool here we rewrite it on the incoming thread to make sure it's not subject to - // rejections. - searchThreadPool = Names.SAME; + String searchThreadPool = Names.SAME; + if (indexService.getIndexSettings().getSearchSequential()) { + // special case - if we are marked as search sequential we fork off to the seq threadpool to guarantee we never concurrently + // search such an index on the same node. + searchThreadPool = Names.SEARCH_SEQUENTIAL; } threadPool.executor(searchThreadPool).execute(new AbstractRunnable() { @Override @@ -1065,27 +1070,23 @@ public static boolean canRewriteToMatchNone(SearchSourceBuilder source) { * The action listener is guaranteed to be executed on the search thread-pool */ private void rewriteShardRequest(ShardSearchRequest request, ActionListener listener) { - IndexShard shardOrNull = indicesService.getShardOrNull(request.shardId()); - String threadPoolName = shardOrNull == null ? Names.SEARCH : shardOrNull.indexSettings().getSearchThreadPool(); - Executor executor = threadPool.executor(threadPoolName); + IndexShard shard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id()); + Executor executor = getExecutor(shard); ActionListener actionListener = ActionListener.wrap(r -> - executor.execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - - @Override - protected void doRun() { - listener.onResponse(request); - } - }), listener::onFailure); - if (shardOrNull != null) { // now we need to check if there is a pending refresh and register - ActionListener finalListener = actionListener; - actionListener = ActionListener.wrap(r -> - shardOrNull.awaitShardSearchActive(b -> finalListener.onResponse(r)), finalListener::onFailure); - } + shard.awaitShardSearchActive(b -> + executor.execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + + @Override + protected void doRun() { + listener.onResponse(request); + } + }) + ), listener::onFailure); // we also do rewrite on the coordinating node (TransportSearchService) but we also need to do it here for BWC as well as // AliasFilters that might need to be rewritten. These are edge-cases but we are every efficient doing the rewrite here so it's not // adding a lot of overhead diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 2d3be2435b401..69501cef07ef8 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -71,6 +71,7 @@ public static class Names { public static final String ANALYZE = "analyze"; public static final String WRITE = "write"; public static final String SEARCH = "search"; + public static final String SEARCH_SEQUENTIAL = "search_sequential"; public static final String MANAGEMENT = "management"; public static final String FLUSH = "flush"; public static final String REFRESH = "refresh"; @@ -135,6 +136,7 @@ public static ThreadPoolType fromType(String type) { map.put(Names.FORCE_MERGE, ThreadPoolType.FIXED); map.put(Names.FETCH_SHARD_STARTED, ThreadPoolType.SCALING); map.put(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING); + map.put(Names.SEARCH_SEQUENTIAL, ThreadPoolType.FIXED_AUTO_QUEUE_SIZE); THREAD_POOL_TYPES = Collections.unmodifiableMap(map); } @@ -175,6 +177,8 @@ public ThreadPool(final Settings settings, final ExecutorBuilder... customBui builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16)); builders.put(Names.SEARCH, new AutoQueueAdjustingExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000, 1000, 1000, 2000)); + builders.put(Names.SEARCH_SEQUENTIAL, new AutoQueueAdjustingExecutorBuilder(settings, + Names.SEARCH_SEQUENTIAL, 1, 100, 100, 100, 200)); builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5))); // no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded // the assumption here is that the listeners should be very lightweight on the listeners side diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index 1257770430299..0e31b71c0fa9f 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -118,8 +118,8 @@ public void onIndexModule(IndexModule indexModule) { indexModule.addSearchOperationListener(new SearchOperationListener() { @Override public void onNewContext(SearchContext context) { - if ("generic_theadpool_index".equals(context.indexShard().shardId().getIndex().getName())) { - assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][generic]")); + if ("sequential_theadpool_index".equals(context.indexShard().shardId().getIndex().getName())) { + assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search_sequential]")); } else { assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search]")); } @@ -127,8 +127,8 @@ public void onNewContext(SearchContext context) { @Override public void onFetchPhase(SearchContext context, long tookInNanos) { - if ("generic_theadpool_index".equals(context.indexShard().shardId().getIndex().getName())) { - assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][generic]")); + if ("sequential_theadpool_index".equals(context.indexShard().shardId().getIndex().getName())) { + assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search_sequential]")); } else { assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search]")); } @@ -136,8 +136,8 @@ public void onFetchPhase(SearchContext context, long tookInNanos) { @Override public void onQueryPhase(SearchContext context, long tookInNanos) { - if ("generic_theadpool_index".equals(context.indexShard().shardId().getIndex().getName())) { - assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][generic]")); + if ("sequential_theadpool_index".equals(context.indexShard().shardId().getIndex().getName())) { + assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search_sequential]")); } else { assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search]")); } @@ -520,24 +520,23 @@ public void testCanRewriteToMatchNone() { } public void testSetSearchThreadPool() { - createIndex("generic_theadpool_index"); - client().admin().indices().prepareUpdateSettings("generic_theadpool_index").setSettings(Settings.builder().put(IndexSettings - .INDEX_SEARCH_THREAD_POOL.getKey(), "generic")).get(); + createIndex("sequential_theadpool_index"); + client().admin().indices().prepareUpdateSettings("sequential_theadpool_index").setSettings(Settings.builder().put(IndexSettings + .INDEX_SEARCH_SEQUENTIAL.getKey(), true)).get(); final SearchService service = getInstanceFromNode(SearchService.class); - Index index = resolveIndex("generic_theadpool_index"); - assertEquals("generic", service.getIndicesService().indexServiceSafe(index).getIndexSettings().getSearchThreadPool()); - client().prepareIndex("generic_theadpool_index", "_doc", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); - SearchResponse searchResponse = client().prepareSearch("generic_theadpool_index").setSize(1).get(); + Index index = resolveIndex("sequential_theadpool_index"); + assertTrue(service.getIndicesService().indexServiceSafe(index).getIndexSettings().getSearchSequential()); + client().prepareIndex("sequential_theadpool_index", "_doc", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + SearchResponse searchResponse = client().prepareSearch("sequential_theadpool_index").setSize(1).get(); assertSearchHits(searchResponse, "1"); ShardSearchLocalRequest req = new ShardSearchLocalRequest(new ShardId(index, 0), 1, SearchType.QUERY_THEN_FETCH, null, Strings.EMPTY_ARRAY, false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, false, null, null); service.canMatch(req, ActionListener.wrap(r -> assertThat(Thread.currentThread().getName(), - startsWith("elasticsearch[node_s_0][generic]")), e -> fail("unexpected"))); + startsWith("elasticsearch[node_s_0][search_sequential]")), e -> fail("unexpected"))); // we add a search action listener in a plugin above to assert that this is actually used - client().admin().indices().prepareUpdateSettings("generic_theadpool_index").setSettings(Settings.builder().put(IndexSettings - .INDEX_SEARCH_THREAD_POOL.getKey(), "search")).get(); - assertEquals("search", service.getIndicesService().indexServiceSafe(index).getIndexSettings().getSearchThreadPool()); - + client().admin().indices().prepareUpdateSettings("sequential_theadpool_index").setSettings(Settings.builder().put(IndexSettings + .INDEX_SEARCH_SEQUENTIAL.getKey(), false)).get(); + assertFalse(service.getIndicesService().indexServiceSafe(index).getIndexSettings().getSearchSequential()); Thread currentThread = Thread.currentThread(); service.canMatch(req, ActionListener.wrap(r -> assertSame(Thread.currentThread(), currentThread), e -> fail("unexpected"))); } From 0b974f822ea2faaa026a5638f3f7c29f0200e4bd Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 18 Sep 2018 14:56:36 +0200 Subject: [PATCH 3/9] cut over to PrivateIndex settings --- .../elasticsearch/index/IndexSettings.java | 4 ++-- .../InternalOrPrivateSettingsPlugin.java | 6 ++--- .../search/SearchServiceTests.java | 23 +++++++++++++++---- 3 files changed, 23 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/IndexSettings.java b/server/src/main/java/org/elasticsearch/index/IndexSettings.java index 20817fd33861a..a486a593a6d8a 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/server/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -280,8 +280,8 @@ public final class IndexSettings { /** * Marks an index to be searched sequentially. This means that never more than one shard of such an index will be searched concurrently */ - public static final Setting INDEX_SEARCH_SEQUENTIAL = Setting.boolSetting("index.search.sequential", false, Property - .IndexScope, Property.Dynamic); + public static final Setting INDEX_SEARCH_SEQUENTIAL = Setting.boolSetting("index.search.sequential", false, + Property.IndexScope, Property.PrivateIndex, Property.Dynamic); private final Index index; private final Version version; diff --git a/server/src/test/java/org/elasticsearch/indices/settings/InternalOrPrivateSettingsPlugin.java b/server/src/test/java/org/elasticsearch/indices/settings/InternalOrPrivateSettingsPlugin.java index 8792232b381db..e2592ec41ccac 100644 --- a/server/src/test/java/org/elasticsearch/indices/settings/InternalOrPrivateSettingsPlugin.java +++ b/server/src/test/java/org/elasticsearch/indices/settings/InternalOrPrivateSettingsPlugin.java @@ -64,14 +64,14 @@ public List> getSettings() { public static class UpdateInternalOrPrivateAction extends Action { - static final UpdateInternalOrPrivateAction INSTANCE = new UpdateInternalOrPrivateAction(); + public static final UpdateInternalOrPrivateAction INSTANCE = new UpdateInternalOrPrivateAction(); private static final String NAME = "indices:admin/settings/update-internal-or-private-index"; public UpdateInternalOrPrivateAction() { super(NAME); } - static class Request extends MasterNodeRequest { + public static class Request extends MasterNodeRequest { private String index; private String key; @@ -81,7 +81,7 @@ static class Request extends MasterNodeRequest { } - Request(final String index, final String key, final String value) { + public Request(final String index, final String key, final String value) { this.index = index; this.key = key; this.value = value; diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index 0e31b71c0fa9f..56563a71e0b09 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -51,6 +51,7 @@ import org.elasticsearch.index.shard.SearchOperationListener; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.settings.InternalOrPrivateSettingsPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.SearchPlugin; import org.elasticsearch.script.MockScriptEngine; @@ -100,7 +101,7 @@ protected boolean resetNodeAfterTest() { @Override protected Collection> getPlugins() { - return pluginList(FailOnRewriteQueryPlugin.class, CustomScriptPlugin.class); + return pluginList(FailOnRewriteQueryPlugin.class, CustomScriptPlugin.class, InternalOrPrivateSettingsPlugin.class); } public static class CustomScriptPlugin extends MockScriptPlugin { @@ -521,8 +522,11 @@ public void testCanRewriteToMatchNone() { public void testSetSearchThreadPool() { createIndex("sequential_theadpool_index"); - client().admin().indices().prepareUpdateSettings("sequential_theadpool_index").setSettings(Settings.builder().put(IndexSettings - .INDEX_SEARCH_SEQUENTIAL.getKey(), true)).get(); + client().execute( + InternalOrPrivateSettingsPlugin.UpdateInternalOrPrivateAction.INSTANCE, + new InternalOrPrivateSettingsPlugin.UpdateInternalOrPrivateAction.Request("sequential_theadpool_index", + IndexSettings.INDEX_SEARCH_SEQUENTIAL.getKey(), "true")) + .actionGet(); final SearchService service = getInstanceFromNode(SearchService.class); Index index = resolveIndex("sequential_theadpool_index"); assertTrue(service.getIndicesService().indexServiceSafe(index).getIndexSettings().getSearchSequential()); @@ -534,8 +538,17 @@ public void testSetSearchThreadPool() { service.canMatch(req, ActionListener.wrap(r -> assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search_sequential]")), e -> fail("unexpected"))); // we add a search action listener in a plugin above to assert that this is actually used - client().admin().indices().prepareUpdateSettings("sequential_theadpool_index").setSettings(Settings.builder().put(IndexSettings - .INDEX_SEARCH_SEQUENTIAL.getKey(), false)).get(); + client().execute( + InternalOrPrivateSettingsPlugin.UpdateInternalOrPrivateAction.INSTANCE, + new InternalOrPrivateSettingsPlugin.UpdateInternalOrPrivateAction.Request("sequential_theadpool_index", + IndexSettings.INDEX_SEARCH_SEQUENTIAL.getKey(), "false")) + .actionGet(); + + IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> + client().admin().indices().prepareUpdateSettings("sequential_theadpool_index").setSettings(Settings.builder().put(IndexSettings + .INDEX_SEARCH_SEQUENTIAL.getKey(), false)).get()); + assertEquals("can not update private setting [index.search.sequential]; this setting is managed by Elasticsearch", + iae.getMessage()); assertFalse(service.getIndicesService().indexServiceSafe(index).getIndexSettings().getSearchSequential()); Thread currentThread = Thread.currentThread(); service.canMatch(req, ActionListener.wrap(r -> assertSame(Thread.currentThread(), currentThread), e -> fail("unexpected"))); From a166ed293ff427e7b074c844e7e09844cb15844f Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 18 Sep 2018 15:52:59 +0200 Subject: [PATCH 4/9] prepare validate action --- .../query/TransportValidateQueryAction.java | 4 +- .../action/search/SearchTransportService.java | 2 +- .../broadcast/TransportBroadcastAction.java | 40 ++++++++++++++++++- 3 files changed, 41 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/validate/query/TransportValidateQueryAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/validate/query/TransportValidateQueryAction.java index 3601947950335..2b3c8a7bbcc33 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/validate/query/TransportValidateQueryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/validate/query/TransportValidateQueryAction.java @@ -200,7 +200,7 @@ protected ShardValidateQueryResponse shardOperation(ShardValidateQueryRequest re } catch (QueryShardException|ParsingException e) { valid = false; error = e.getDetailedMessage(); - } catch (AssertionError|IOException e) { + } catch (AssertionError e) { valid = false; error = e.getMessage(); } finally { @@ -210,7 +210,7 @@ protected ShardValidateQueryResponse shardOperation(ShardValidateQueryRequest re return new ShardValidateQueryResponse(request.shardId(), valid, explanation, error); } - private String explain(SearchContext context, boolean rewritten) throws IOException { + private String explain(SearchContext context, boolean rewritten) { Query query = context.query(); if (rewritten && query instanceof MatchNoDocsQuery) { return context.parsedQuery().query().toString(); diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index a94ec214d0ea4..cf92313f7d8fb 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -393,7 +393,7 @@ public void onFailure(Exception e) { (Supplier) SearchService.CanMatchResponse::new); } - private static class ChannelActionListener implements ActionListener{ + public static class ChannelActionListener implements ActionListener{ private final TransportChannel channel; diff --git a/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java b/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java index 3045f6ea43aa1..cea898d4a2165 100644 --- a/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java @@ -36,6 +36,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportChannel; @@ -57,6 +58,7 @@ public abstract class TransportBroadcastAction { @Override public void messageReceived(ShardRequest request, TransportChannel channel, Task task) throws Exception { - channel.sendResponse(shardOperation(request, task)); + asyncShardOperation(request, task, new ActionListener() { + @Override + public void onResponse(ShardResponse response) { + try { + channel.sendResponse(response); + } catch (Exception e) { + onFailure(e); + } + } + + @Override + public void onFailure(Exception e) { + try { + channel.sendResponse(e); + } catch (Exception e1) { + logger.warn(() -> new ParameterizedMessage( + "Failed to send error response for action [{}] and request [{}]", actionName, request), e1); + } + } + }); } } + + protected void asyncShardOperation(ShardRequest request, Task task, ActionListener listener) { + transportService.getThreadPool().executor(shardExecutor).execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + + @Override + protected void doRun() throws Exception { + listener.onResponse(shardOperation(request, task)); + } + }); + } } From 3f2fa47596b2465895358829acbfb5098be909ef Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 18 Sep 2018 20:16:19 +0200 Subject: [PATCH 5/9] Ensure all access to getSearcher executes on a search_sequential thread if marked as such --- .../explain/TransportExplainAction.java | 7 +++ .../action/get/TransportGetAction.java | 7 +++ .../get/TransportShardMultiGetAction.java | 7 +++ .../broadcast/TransportBroadcastAction.java | 7 ++- .../shard/TransportSingleShardAction.java | 52 ++++--------------- .../TransportShardMultiTermsVectorAction.java | 7 +++ .../TransportTermVectorsAction.java | 7 +++ 7 files changed, 52 insertions(+), 42 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java b/server/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java index 5ea178f595acf..fcdc780b5be42 100644 --- a/server/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java +++ b/server/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java @@ -152,4 +152,11 @@ protected ShardIterator shards(ClusterState state, InternalRequest request) { clusterService.state(), request.concreteIndex(), request.request().id(), request.request().routing(), request.request().preference() ); } + + @Override + protected String getExecutor(ExplainRequest request, ShardId shardId) { + IndexService indexService = searchService.getIndicesService().indexServiceSafe(shardId.getIndex()); + return indexService.getIndexSettings().getSearchSequential() ? ThreadPool.Names.SEARCH_SEQUENTIAL : super.getExecutor(request, + shardId); + } } diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java index 63d3d30e1e27f..f9975e7b579d3 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java @@ -111,4 +111,11 @@ protected GetResponse shardOperation(GetRequest request, ShardId shardId) { protected GetResponse newResponse() { return new GetResponse(); } + + @Override + protected String getExecutor(GetRequest request, ShardId shardId) { + IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + return indexService.getIndexSettings().getSearchSequential() ? ThreadPool.Names.SEARCH_SEQUENTIAL : super.getExecutor(request, + shardId); + } } diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java index e0a6cd827863a..24838d165e6d0 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java @@ -102,4 +102,11 @@ protected MultiGetShardResponse shardOperation(MultiGetShardRequest request, Sha return response; } + + @Override + protected String getExecutor(MultiGetShardRequest request, ShardId shardId) { + IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + return indexService.getIndexSettings().getSearchSequential() ? ThreadPool.Names.SEARCH_SEQUENTIAL : super.getExecutor(request, + shardId); + } } diff --git a/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java b/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java index cea898d4a2165..22c4a70b0ea55 100644 --- a/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java @@ -303,7 +303,7 @@ public void onFailure(Exception e) { } protected void asyncShardOperation(ShardRequest request, Task task, ActionListener listener) { - transportService.getThreadPool().executor(shardExecutor).execute(new AbstractRunnable() { + transportService.getThreadPool().executor(getExecutor(request)).execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { listener.onFailure(e); @@ -315,4 +315,9 @@ protected void doRun() throws Exception { } }); } + + protected String getExecutor(ShardRequest request) { + return shardExecutor; + } + } diff --git a/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java b/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java index 7a83b0c455da4..83676d8972b18 100644 --- a/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.cluster.ClusterState; @@ -66,8 +67,8 @@ public abstract class TransportSingleShardAction li protected abstract Response shardOperation(Request request, ShardId shardId) throws IOException; protected void asyncShardOperation(Request request, ShardId shardId, ActionListener listener) throws IOException { - threadPool.executor(this.executor).execute(new AbstractRunnable() { + threadPool.executor(getExecutor(request, shardId)).execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { listener.onFailure(e); @@ -274,25 +275,7 @@ private class TransportHandler implements TransportRequestHandler { @Override public void messageReceived(Request request, final TransportChannel channel, Task task) throws Exception { // if we have a local operation, execute it on a thread since we don't spawn - execute(request, new ActionListener() { - @Override - public void onResponse(Response result) { - try { - channel.sendResponse(result); - } catch (Exception e) { - onFailure(e); - } - } - - @Override - public void onFailure(Exception e) { - try { - channel.sendResponse(e); - } catch (Exception e1) { - logger.warn("failed to send response for get", e1); - } - } - }); + execute(request, new HandledTransportAction.ChannelActionListener<>(channel, actionName, request)); } } @@ -303,25 +286,8 @@ public void messageReceived(final Request request, final TransportChannel channe if (logger.isTraceEnabled()) { logger.trace("executing [{}] on shard [{}]", request, request.internalShardId); } - asyncShardOperation(request, request.internalShardId, new ActionListener() { - @Override - public void onResponse(Response response) { - try { - channel.sendResponse(response); - } catch (IOException e) { - onFailure(e); - } - } - - @Override - public void onFailure(Exception e) { - try { - channel.sendResponse(e); - } catch (IOException e1) { - throw new UncheckedIOException(e1); - } - } - }); + asyncShardOperation(request, request.internalShardId, new HandledTransportAction.ChannelActionListener<>(channel, + transportShardAction, request)); } } /** @@ -344,4 +310,8 @@ public String concreteIndex() { return concreteIndex; } } + + protected String getExecutor(Request request, ShardId shardId) { + return executor; + } } diff --git a/server/src/main/java/org/elasticsearch/action/termvectors/TransportShardMultiTermsVectorAction.java b/server/src/main/java/org/elasticsearch/action/termvectors/TransportShardMultiTermsVectorAction.java index f1641fdd25c98..17cae6b6db02b 100644 --- a/server/src/main/java/org/elasticsearch/action/termvectors/TransportShardMultiTermsVectorAction.java +++ b/server/src/main/java/org/elasticsearch/action/termvectors/TransportShardMultiTermsVectorAction.java @@ -96,4 +96,11 @@ protected MultiTermVectorsShardResponse shardOperation(MultiTermVectorsShardRequ return response; } + + @Override + protected String getExecutor(MultiTermVectorsShardRequest request, ShardId shardId) { + IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + return indexService.getIndexSettings().getSearchSequential() ? ThreadPool.Names.SEARCH_SEQUENTIAL : super.getExecutor(request, + shardId); + } } diff --git a/server/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java b/server/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java index d2a6055bbe75a..b2bfe1d76b968 100644 --- a/server/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java +++ b/server/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java @@ -113,4 +113,11 @@ protected TermVectorsResponse shardOperation(TermVectorsRequest request, ShardId protected TermVectorsResponse newResponse() { return new TermVectorsResponse(); } + + @Override + protected String getExecutor(TermVectorsRequest request, ShardId shardId) { + IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + return indexService.getIndexSettings().getSearchSequential() ? ThreadPool.Names.SEARCH_SEQUENTIAL : super.getExecutor(request, + shardId); + } } From e447a2bfa29641d582fcfa7201cac6ef47cabfc6 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 18 Sep 2018 20:30:41 +0200 Subject: [PATCH 6/9] Fix imports --- .../action/support/single/shard/TransportSingleShardAction.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java b/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java index 83676d8972b18..436089ab3be73 100644 --- a/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java @@ -50,7 +50,6 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.io.UncheckedIOException; import java.util.function.Supplier; import static org.elasticsearch.action.support.TransportActions.isShardNotAvailableException; From 98a2aca219b362bee05dc0524229fd08653ff5c7 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 19 Sep 2018 10:15:50 +0200 Subject: [PATCH 7/9] Rename search_sequentical to search_throttled and don't fork in canMatch --- .../explain/TransportExplainAction.java | 2 +- .../action/get/TransportGetAction.java | 2 +- .../get/TransportShardMultiGetAction.java | 2 +- .../TransportShardMultiTermsVectorAction.java | 2 +- .../TransportTermVectorsAction.java | 2 +- .../common/settings/IndexScopedSettings.java | 2 +- .../elasticsearch/index/IndexSettings.java | 22 +++++------ .../elasticsearch/search/SearchService.java | 36 ++++++----------- .../elasticsearch/threadpool/ThreadPool.java | 8 ++-- .../search/SearchServiceTests.java | 39 +++++++++---------- 10 files changed, 52 insertions(+), 65 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java b/server/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java index fcdc780b5be42..c1695e13864cb 100644 --- a/server/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java +++ b/server/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java @@ -156,7 +156,7 @@ protected ShardIterator shards(ClusterState state, InternalRequest request) { @Override protected String getExecutor(ExplainRequest request, ShardId shardId) { IndexService indexService = searchService.getIndicesService().indexServiceSafe(shardId.getIndex()); - return indexService.getIndexSettings().getSearchSequential() ? ThreadPool.Names.SEARCH_SEQUENTIAL : super.getExecutor(request, + return indexService.getIndexSettings().isSearchThrottled() ? ThreadPool.Names.SEARCH_THROTTLED : super.getExecutor(request, shardId); } } diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java index f9975e7b579d3..69753bdd9795e 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java @@ -115,7 +115,7 @@ protected GetResponse newResponse() { @Override protected String getExecutor(GetRequest request, ShardId shardId) { IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); - return indexService.getIndexSettings().getSearchSequential() ? ThreadPool.Names.SEARCH_SEQUENTIAL : super.getExecutor(request, + return indexService.getIndexSettings().isSearchThrottled() ? ThreadPool.Names.SEARCH_THROTTLED : super.getExecutor(request, shardId); } } diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java index 24838d165e6d0..7a7c02ad476e7 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java @@ -106,7 +106,7 @@ protected MultiGetShardResponse shardOperation(MultiGetShardRequest request, Sha @Override protected String getExecutor(MultiGetShardRequest request, ShardId shardId) { IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); - return indexService.getIndexSettings().getSearchSequential() ? ThreadPool.Names.SEARCH_SEQUENTIAL : super.getExecutor(request, + return indexService.getIndexSettings().isSearchThrottled() ? ThreadPool.Names.SEARCH_THROTTLED : super.getExecutor(request, shardId); } } diff --git a/server/src/main/java/org/elasticsearch/action/termvectors/TransportShardMultiTermsVectorAction.java b/server/src/main/java/org/elasticsearch/action/termvectors/TransportShardMultiTermsVectorAction.java index 17cae6b6db02b..10fd954354ba8 100644 --- a/server/src/main/java/org/elasticsearch/action/termvectors/TransportShardMultiTermsVectorAction.java +++ b/server/src/main/java/org/elasticsearch/action/termvectors/TransportShardMultiTermsVectorAction.java @@ -100,7 +100,7 @@ protected MultiTermVectorsShardResponse shardOperation(MultiTermVectorsShardRequ @Override protected String getExecutor(MultiTermVectorsShardRequest request, ShardId shardId) { IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); - return indexService.getIndexSettings().getSearchSequential() ? ThreadPool.Names.SEARCH_SEQUENTIAL : super.getExecutor(request, + return indexService.getIndexSettings().isSearchThrottled() ? ThreadPool.Names.SEARCH_THROTTLED : super.getExecutor(request, shardId); } } diff --git a/server/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java b/server/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java index b2bfe1d76b968..e0babdb6c4359 100644 --- a/server/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java +++ b/server/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java @@ -117,7 +117,7 @@ protected TermVectorsResponse newResponse() { @Override protected String getExecutor(TermVectorsRequest request, ShardId shardId) { IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); - return indexService.getIndexSettings().getSearchSequential() ? ThreadPool.Names.SEARCH_SEQUENTIAL : super.getExecutor(request, + return indexService.getIndexSettings().isSearchThrottled() ? ThreadPool.Names.SEARCH_THROTTLED : super.getExecutor(request, shardId); } } diff --git a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java index 4819f4dafadd3..c4e0257c95395 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java @@ -142,7 +142,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING, IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING, IndexSettings.INDEX_SEARCH_IDLE_AFTER, - IndexSettings.INDEX_SEARCH_SEQUENTIAL, + IndexSettings.INDEX_SEARCH_THROTTLED, IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY, FieldMapper.IGNORE_MALFORMED_SETTING, FieldMapper.COERCE_SETTING, diff --git a/server/src/main/java/org/elasticsearch/index/IndexSettings.java b/server/src/main/java/org/elasticsearch/index/IndexSettings.java index a486a593a6d8a..3a55eb8961172 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/server/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -278,9 +278,9 @@ public final class IndexSettings { }, Property.Dynamic, Property.IndexScope); /** - * Marks an index to be searched sequentially. This means that never more than one shard of such an index will be searched concurrently + * Marks an index to be searched throttled. This means that never more than one shard of such an index will be searched concurrently */ - public static final Setting INDEX_SEARCH_SEQUENTIAL = Setting.boolSetting("index.search.sequential", false, + public static final Setting INDEX_SEARCH_THROTTLED = Setting.boolSetting("index.search.throttled", false, Property.IndexScope, Property.PrivateIndex, Property.Dynamic); private final Index index; @@ -325,7 +325,7 @@ public final class IndexSettings { private volatile int maxAnalyzedOffset; private volatile int maxTermsCount; private volatile String defaultPipeline; - private volatile boolean searchSequential; + private volatile boolean searchThrottled; /** * The maximum number of refresh listeners allows on this shard. @@ -409,7 +409,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti this.indexMetaData = indexMetaData; numberOfShards = settings.getAsInt(IndexMetaData.SETTING_NUMBER_OF_SHARDS, null); - this.searchSequential = INDEX_SEARCH_SEQUENTIAL.get(settings); + this.searchThrottled = INDEX_SEARCH_THROTTLED.get(settings); this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings); this.queryStringAnalyzeWildcard = QUERY_STRING_ANALYZE_WILDCARD.get(nodeSettings); this.queryStringAllowLeadingWildcard = QUERY_STRING_ALLOW_LEADING_WILDCARD.get(nodeSettings); @@ -486,7 +486,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti scopedSettings.addSettingsUpdateConsumer(MAX_REGEX_LENGTH_SETTING, this::setMaxRegexLength); scopedSettings.addSettingsUpdateConsumer(DEFAULT_PIPELINE, this::setDefaultPipeline); scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING, this::setSoftDeleteRetentionOperations); - scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_SEQUENTIAL, this::setSearchSequential); + scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_THROTTLED, this::setSearchThrottled); } private void setSearchIdleAfter(TimeValue searchIdleAfter) { this.searchIdleAfter = searchIdleAfter; } @@ -890,14 +890,14 @@ public long getSoftDeleteRetentionOperations() { } /** - * Returns true if the this index should be searched sequentially ie. using the - * {@link org.elasticsearch.threadpool.ThreadPool.Names#SEARCH_SEQUENTIAL} threadpool + * Returns true if the this index should be searched throttled ie. using the + * {@link org.elasticsearch.threadpool.ThreadPool.Names#SEARCH_THROTTLED} thread-pool */ - public boolean getSearchSequential() { - return searchSequential; + public boolean isSearchThrottled() { + return searchThrottled; } - private void setSearchSequential(boolean searchSequential) { - this.searchSequential = searchSequential; + private void setSearchThrottled(boolean searchThrottled) { + this.searchThrottled = searchThrottled; } } diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index bd2333bdd2b92..1f408228c0b26 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -503,7 +503,7 @@ final Executor getExecutor(long id) { private Executor getExecutor(IndexShard indexShard) { assert indexShard != null; - return threadPool.executor(indexShard.indexSettings().getSearchSequential() ? Names.SEARCH_SEQUENTIAL : Names.SEARCH); + return threadPool.executor(indexShard.indexSettings().isSearchThrottled() ? Names.SEARCH_THROTTLED : Names.SEARCH); } public void executeFetchPhase(InternalScrollSearchRequest request, SearchTask task, @@ -1008,7 +1008,12 @@ public AliasFilter buildAliasFilter(ClusterState state, String index, String... return indicesService.buildAliasFilter(state, index, expressions); } - boolean canMatch(ShardSearchRequest request) throws IOException { + /** + * This method does a very quick rewrite of the query and returns true if the query can potentially match any documents. + * This method can have false positives while if it returns false the query won't match any documents on the current + * shard. + */ + public boolean canMatch(ShardSearchRequest request) throws IOException { assert request.searchType() == SearchType.QUERY_THEN_FETCH : "unexpected search type: " + request.searchType(); try (DefaultSearchContext context = createSearchContext(request, defaultSearchTimeout, false)) { SearchSourceBuilder source = context.request().source(); @@ -1020,30 +1025,13 @@ boolean canMatch(ShardSearchRequest request) throws IOException { } } - /** - * This method does a very quick rewrite of the query and returns true if the query can potentially match any documents. - * This method can have false positives while if it returns false the query won't match any documents on the current - * shard. - */ + public void canMatch(ShardSearchRequest request, ActionListener listener) { - IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); - String searchThreadPool = Names.SAME; - if (indexService.getIndexSettings().getSearchSequential()) { - // special case - if we are marked as search sequential we fork off to the seq threadpool to guarantee we never concurrently - // search such an index on the same node. - searchThreadPool = Names.SEARCH_SEQUENTIAL; + try { + listener.onResponse(new CanMatchResponse(canMatch(request))); + } catch (IOException e) { + listener.onFailure(e); } - threadPool.executor(searchThreadPool).execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - - @Override - protected void doRun() throws IOException { - listener.onResponse(new CanMatchResponse(canMatch(request))); - } - }); } /** diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 69501cef07ef8..ecf311bc4b91d 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -71,7 +71,7 @@ public static class Names { public static final String ANALYZE = "analyze"; public static final String WRITE = "write"; public static final String SEARCH = "search"; - public static final String SEARCH_SEQUENTIAL = "search_sequential"; + public static final String SEARCH_THROTTLED = "search_throttled"; public static final String MANAGEMENT = "management"; public static final String FLUSH = "flush"; public static final String REFRESH = "refresh"; @@ -136,7 +136,7 @@ public static ThreadPoolType fromType(String type) { map.put(Names.FORCE_MERGE, ThreadPoolType.FIXED); map.put(Names.FETCH_SHARD_STARTED, ThreadPoolType.SCALING); map.put(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING); - map.put(Names.SEARCH_SEQUENTIAL, ThreadPoolType.FIXED_AUTO_QUEUE_SIZE); + map.put(Names.SEARCH_THROTTLED, ThreadPoolType.FIXED_AUTO_QUEUE_SIZE); THREAD_POOL_TYPES = Collections.unmodifiableMap(map); } @@ -177,8 +177,8 @@ public ThreadPool(final Settings settings, final ExecutorBuilder... customBui builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16)); builders.put(Names.SEARCH, new AutoQueueAdjustingExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000, 1000, 1000, 2000)); - builders.put(Names.SEARCH_SEQUENTIAL, new AutoQueueAdjustingExecutorBuilder(settings, - Names.SEARCH_SEQUENTIAL, 1, 100, 100, 100, 200)); + builders.put(Names.SEARCH_THROTTLED, new AutoQueueAdjustingExecutorBuilder(settings, + Names.SEARCH_THROTTLED, 1, 100, 100, 100, 200)); builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5))); // no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded // the assumption here is that the listeners should be very lightweight on the listeners side diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index 56563a71e0b09..9c7734ca96464 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -119,7 +119,7 @@ public void onIndexModule(IndexModule indexModule) { indexModule.addSearchOperationListener(new SearchOperationListener() { @Override public void onNewContext(SearchContext context) { - if ("sequential_theadpool_index".equals(context.indexShard().shardId().getIndex().getName())) { + if ("throttled_threadpool_index".equals(context.indexShard().shardId().getIndex().getName())) { assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search_sequential]")); } else { assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search]")); @@ -128,7 +128,7 @@ public void onNewContext(SearchContext context) { @Override public void onFetchPhase(SearchContext context, long tookInNanos) { - if ("sequential_theadpool_index".equals(context.indexShard().shardId().getIndex().getName())) { + if ("throttled_threadpool_index".equals(context.indexShard().shardId().getIndex().getName())) { assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search_sequential]")); } else { assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search]")); @@ -137,7 +137,7 @@ public void onFetchPhase(SearchContext context, long tookInNanos) { @Override public void onQueryPhase(SearchContext context, long tookInNanos) { - if ("sequential_theadpool_index".equals(context.indexShard().shardId().getIndex().getName())) { + if ("throttled_threadpool_index".equals(context.indexShard().shardId().getIndex().getName())) { assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search_sequential]")); } else { assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search]")); @@ -520,37 +520,36 @@ public void testCanRewriteToMatchNone() { } - public void testSetSearchThreadPool() { - createIndex("sequential_theadpool_index"); + public void testSetSearchThrottled() { + createIndex("throttled_threadpool_index"); client().execute( InternalOrPrivateSettingsPlugin.UpdateInternalOrPrivateAction.INSTANCE, - new InternalOrPrivateSettingsPlugin.UpdateInternalOrPrivateAction.Request("sequential_theadpool_index", - IndexSettings.INDEX_SEARCH_SEQUENTIAL.getKey(), "true")) + new InternalOrPrivateSettingsPlugin.UpdateInternalOrPrivateAction.Request("throttled_threadpool_index", + IndexSettings.INDEX_SEARCH_THROTTLED.getKey(), "true")) .actionGet(); final SearchService service = getInstanceFromNode(SearchService.class); - Index index = resolveIndex("sequential_theadpool_index"); - assertTrue(service.getIndicesService().indexServiceSafe(index).getIndexSettings().getSearchSequential()); - client().prepareIndex("sequential_theadpool_index", "_doc", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); - SearchResponse searchResponse = client().prepareSearch("sequential_theadpool_index").setSize(1).get(); + Index index = resolveIndex("throttled_threadpool_index"); + assertTrue(service.getIndicesService().indexServiceSafe(index).getIndexSettings().isSearchThrottled()); + client().prepareIndex("throttled_threadpool_index", "_doc", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + SearchResponse searchResponse = client().prepareSearch("throttled_threadpool_index").setSize(1).get(); assertSearchHits(searchResponse, "1"); - ShardSearchLocalRequest req = new ShardSearchLocalRequest(new ShardId(index, 0), 1, SearchType.QUERY_THEN_FETCH, null, - Strings.EMPTY_ARRAY, false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, false, null, null); - service.canMatch(req, ActionListener.wrap(r -> assertThat(Thread.currentThread().getName(), - startsWith("elasticsearch[node_s_0][search_sequential]")), e -> fail("unexpected"))); // we add a search action listener in a plugin above to assert that this is actually used client().execute( InternalOrPrivateSettingsPlugin.UpdateInternalOrPrivateAction.INSTANCE, - new InternalOrPrivateSettingsPlugin.UpdateInternalOrPrivateAction.Request("sequential_theadpool_index", - IndexSettings.INDEX_SEARCH_SEQUENTIAL.getKey(), "false")) + new InternalOrPrivateSettingsPlugin.UpdateInternalOrPrivateAction.Request("throttled_threadpool_index", + IndexSettings.INDEX_SEARCH_THROTTLED.getKey(), "false")) .actionGet(); IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> - client().admin().indices().prepareUpdateSettings("sequential_theadpool_index").setSettings(Settings.builder().put(IndexSettings - .INDEX_SEARCH_SEQUENTIAL.getKey(), false)).get()); + client().admin().indices().prepareUpdateSettings("throttled_threadpool_index").setSettings(Settings.builder().put(IndexSettings + .INDEX_SEARCH_THROTTLED.getKey(), false)).get()); assertEquals("can not update private setting [index.search.sequential]; this setting is managed by Elasticsearch", iae.getMessage()); - assertFalse(service.getIndicesService().indexServiceSafe(index).getIndexSettings().getSearchSequential()); + assertFalse(service.getIndicesService().indexServiceSafe(index).getIndexSettings().isSearchThrottled()); + ShardSearchLocalRequest req = new ShardSearchLocalRequest(new ShardId(index, 0), 1, SearchType.QUERY_THEN_FETCH, null, + Strings.EMPTY_ARRAY, false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, false, null, null); Thread currentThread = Thread.currentThread(); + // we still make sure can match is executed on the network thread service.canMatch(req, ActionListener.wrap(r -> assertSame(Thread.currentThread(), currentThread), e -> fail("unexpected"))); } } From dd3172be04015bfc495685d9c9f449aeb6cf7d97 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 19 Sep 2018 10:17:44 +0200 Subject: [PATCH 8/9] fix nits --- .../src/main/java/org/elasticsearch/search/SearchService.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 1f408228c0b26..d8829bd11d386 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -423,8 +423,7 @@ private QueryFetchSearchResult executeFetchPhase(SearchContext context, SearchOp return new QueryFetchSearchResult(context.queryResult(), context.fetchResult()); } - public void executeQueryPhase(InternalScrollSearchRequest request, SearchTask task, - ActionListener listener) { + public void executeQueryPhase(InternalScrollSearchRequest request, SearchTask task, ActionListener listener) { runAsync(request.id(), () -> { final SearchContext context = findContext(request.id(), request); SearchOperationListener operationListener = context.indexShard().getSearchOperationListener(); @@ -1106,7 +1105,6 @@ public CanMatchResponse(boolean canMatch) { this.canMatch = canMatch; } - @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); From 5f8aa3b628cd644019f2b3f424bb976cbd8c6b19 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 19 Sep 2018 10:57:07 +0200 Subject: [PATCH 9/9] fix test --- .../java/org/elasticsearch/search/SearchServiceTests.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index 9c7734ca96464..ed9b8992577d8 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -120,7 +120,7 @@ public void onIndexModule(IndexModule indexModule) { @Override public void onNewContext(SearchContext context) { if ("throttled_threadpool_index".equals(context.indexShard().shardId().getIndex().getName())) { - assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search_sequential]")); + assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search_throttled]")); } else { assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search]")); } @@ -129,7 +129,7 @@ public void onNewContext(SearchContext context) { @Override public void onFetchPhase(SearchContext context, long tookInNanos) { if ("throttled_threadpool_index".equals(context.indexShard().shardId().getIndex().getName())) { - assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search_sequential]")); + assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search_throttled]")); } else { assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search]")); } @@ -138,7 +138,7 @@ public void onFetchPhase(SearchContext context, long tookInNanos) { @Override public void onQueryPhase(SearchContext context, long tookInNanos) { if ("throttled_threadpool_index".equals(context.indexShard().shardId().getIndex().getName())) { - assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search_sequential]")); + assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search_throttled]")); } else { assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search]")); } @@ -543,7 +543,7 @@ public void testSetSearchThrottled() { IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> client().admin().indices().prepareUpdateSettings("throttled_threadpool_index").setSettings(Settings.builder().put(IndexSettings .INDEX_SEARCH_THROTTLED.getKey(), false)).get()); - assertEquals("can not update private setting [index.search.sequential]; this setting is managed by Elasticsearch", + assertEquals("can not update private setting [index.search.throttled]; this setting is managed by Elasticsearch", iae.getMessage()); assertFalse(service.getIndicesService().indexServiceSafe(index).getIndexSettings().isSearchThrottled()); ShardSearchLocalRequest req = new ShardSearchLocalRequest(new ShardId(index, 0), 1, SearchType.QUERY_THEN_FETCH, null,