From fbdf230057f5d028937cde0e464c5a6147610e29 Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Wed, 5 Aug 2015 16:25:13 +0200 Subject: [PATCH 1/3] Share thread pools that have similar purposes. Because we have thread pools for almost everything, even if each of them has a reasonable size, the total number of threads that elasticsearch creates is high-ish. For instance, with 8 processors, elasticsearch creates between 58 (only fixed thread pools) and 111 threads (including fixed and scaling pools). With this change, the numbers go down to 33/59. Ideally the SEARCH and GET thread pools should be the same, but I couldn't do it now given that some SEARCH requests block on GET requests in order to retrieve indexed scripts or geo shapes. So they are still separate pools for now. However, the INDEX, BULK, REFRESH and FLUSH thread pools have been merged into a single WRITE thread pool, the SEARCH, PERCOLATE and SUGGEST have been merged into a single READ thread pool and FETCH_SHARD_STARTED and FETCH_SHARD_STORE have been merged into FETCH_SHARD. Also the WARMER pool has been removed: it was useful to parallelize fielddata loading but now that we have doc values by default, we can make things simpler by just loading them in the current thread. Close #12666 --- .../index/cache/bitset/BitsetFilterCache.java | 47 ++-- .../elasticsearch/indices/IndicesWarmer.java | 50 +---- .../rest/action/cat/RestThreadPoolAction.java | 23 +- .../elasticsearch/search/SearchService.java | 210 ++++++------------ .../elasticsearch/threadpool/ThreadPool.java | 82 ++++--- .../action/RejectionActionIT.java | 10 +- .../search/SearchWithRejectionsIT.java | 6 +- .../test/InternalTestCluster.java | 6 +- .../threadpool/SimpleThreadPoolIT.java | 8 +- .../ThreadPoolSerializationTests.java | 4 +- .../UpdateThreadPoolSettingsTests.java | 46 ++-- .../test/cat.thread_pool/10_basic.yaml | 137 +----------- 12 files changed, 201 insertions(+), 428 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/cache/bitset/BitsetFilterCache.java b/core/src/main/java/org/elasticsearch/index/cache/bitset/BitsetFilterCache.java index 2203c94bd5a74..336be368d38b7 100644 --- a/core/src/main/java/org/elasticsearch/index/cache/bitset/BitsetFilterCache.java +++ b/core/src/main/java/org/elasticsearch/index/cache/bitset/BitsetFilterCache.java @@ -49,8 +49,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardUtils; import org.elasticsearch.indices.IndicesWarmer; -import org.elasticsearch.indices.IndicesWarmer.TerminationHandle; -import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.indices.IndicesWarmer.WarmerContext; import java.io.Closeable; import java.io.IOException; @@ -58,9 +57,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; /** * This is a cache for {@link BitDocIdSet} based filters and is unbounded by size or time. @@ -241,9 +238,9 @@ public int hashCode() { final class BitDocIdSetFilterWarmer extends IndicesWarmer.Listener { @Override - public IndicesWarmer.TerminationHandle warmNewReaders(final IndexShard indexShard, IndexMetaData indexMetaData, IndicesWarmer.WarmerContext context, ThreadPool threadPool) { + public void warmNewReaders(final IndexShard indexShard, IndexMetaData indexMetaData, IndicesWarmer.WarmerContext context) { if (!loadRandomAccessFiltersEagerly) { - return TerminationHandle.NO_WAIT; + return; } boolean hasNested = false; @@ -267,43 +264,25 @@ public IndicesWarmer.TerminationHandle warmNewReaders(final IndexShard indexShar warmUp.add(Queries.newNonNestedFilter()); } - final Executor executor = threadPool.executor(executor()); - final CountDownLatch latch = new CountDownLatch(context.searcher().reader().leaves().size() * warmUp.size()); for (final LeafReaderContext ctx : context.searcher().reader().leaves()) { for (final Filter filterToWarm : warmUp) { - executor.execute(new Runnable() { - - @Override - public void run() { - try { - final long start = System.nanoTime(); - getAndLoadIfNotPresent(filterToWarm, ctx); - if (indexShard.warmerService().logger().isTraceEnabled()) { - indexShard.warmerService().logger().trace("warmed bitset for [{}], took [{}]", filterToWarm, TimeValue.timeValueNanos(System.nanoTime() - start)); - } - } catch (Throwable t) { - indexShard.warmerService().logger().warn("failed to load bitset for [{}]", t, filterToWarm); - } finally { - latch.countDown(); - } + try { + final long start = System.nanoTime(); + getAndLoadIfNotPresent(filterToWarm, ctx); + if (indexShard.warmerService().logger().isTraceEnabled()) { + indexShard.warmerService().logger().trace("warmed bitset for [{}], took [{}]", filterToWarm, TimeValue.timeValueNanos(System.nanoTime() - start)); } - - }); + } catch (Throwable t) { + indexShard.warmerService().logger().warn("failed to load bitset for [{}]", t, filterToWarm); + } } } - return new TerminationHandle() { - @Override - public void awaitTermination() throws InterruptedException { - latch.await(); - } - }; } @Override - public TerminationHandle warmTopReader(IndexShard indexShard, IndexMetaData indexMetaData, IndicesWarmer.WarmerContext context, ThreadPool threadPool) { - return TerminationHandle.NO_WAIT; + public void warmTopReader(IndexShard indexShard, IndexMetaData indexMetaData, WarmerContext context) { + // no-op } - } Cache> getLoadedFilters() { diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesWarmer.java b/core/src/main/java/org/elasticsearch/indices/IndicesWarmer.java index 2b28842698d56..413786c335f1f 100644 --- a/core/src/main/java/org/elasticsearch/indices/IndicesWarmer.java +++ b/core/src/main/java/org/elasticsearch/indices/IndicesWarmer.java @@ -19,7 +19,6 @@ package org.elasticsearch.indices; -import com.google.common.collect.Lists; import org.apache.lucene.index.IndexReader; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -31,9 +30,7 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.threadpool.ThreadPool; -import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; @@ -43,8 +40,6 @@ public final class IndicesWarmer extends AbstractComponent { public static final String INDEX_WARMER_ENABLED = "index.warmer.enabled"; - private final ThreadPool threadPool; - private final ClusterService clusterService; private final IndicesService indicesService; @@ -52,9 +47,8 @@ public final class IndicesWarmer extends AbstractComponent { private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList<>(); @Inject - public IndicesWarmer(Settings settings, ThreadPool threadPool, ClusterService clusterService, IndicesService indicesService) { + public IndicesWarmer(Settings settings, ClusterService clusterService, IndicesService indicesService) { super(settings); - this.threadPool = threadPool; this.clusterService = clusterService; this.indicesService = indicesService; } @@ -100,27 +94,11 @@ private void warmInternal(final WarmerContext context, boolean topReader) { } indexShard.warmerService().onPreWarm(); long time = System.nanoTime(); - final List terminationHandles = Lists.newArrayList(); - // get a handle on pending tasks for (final Listener listener : listeners) { if (topReader) { - terminationHandles.add(listener.warmTopReader(indexShard, indexMetaData, context, threadPool)); + listener.warmTopReader(indexShard, indexMetaData, context); } else { - terminationHandles.add(listener.warmNewReaders(indexShard, indexMetaData, context, threadPool)); - } - } - // wait for termination - for (TerminationHandle terminationHandle : terminationHandles) { - try { - terminationHandle.awaitTermination(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - if (topReader) { - logger.warn("top warming has been interrupted", e); - } else { - logger.warn("warming has been interrupted", e); - } - break; + listener.warmNewReaders(indexShard, indexMetaData, context); } } long took = System.nanoTime() - time; @@ -134,27 +112,13 @@ private void warmInternal(final WarmerContext context, boolean topReader) { } } - /** A handle on the execution of warm-up action. */ - public interface TerminationHandle { - - public static TerminationHandle NO_WAIT = new TerminationHandle() { - @Override - public void awaitTermination() {} - }; - - /** Wait until execution of the warm-up action completes. */ - void awaitTermination() throws InterruptedException; - } public static abstract class Listener { - public String executor() { - return ThreadPool.Names.WARMER; - } - - /** Queue tasks to warm-up the given segments and return handles that allow to wait for termination of the execution of those tasks. */ - public abstract TerminationHandle warmNewReaders(IndexShard indexShard, IndexMetaData indexMetaData, WarmerContext context, ThreadPool threadPool); + /** Warm new leaf readers in the current thread. */ + public abstract void warmNewReaders(IndexShard indexShard, IndexMetaData indexMetaData, WarmerContext context); - public abstract TerminationHandle warmTopReader(IndexShard indexShard, IndexMetaData indexMetaData, WarmerContext context, ThreadPool threadPool); + /** Warm the top reader in the current thread. */ + public abstract void warmTopReader(IndexShard indexShard, IndexMetaData indexMetaData, WarmerContext context); } public static final class WarmerContext { diff --git a/core/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java b/core/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java index c4d241f027f54..9314ab049b830 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java @@ -50,34 +50,24 @@ public class RestThreadPoolAction extends AbstractCatAction { private final static String[] SUPPORTED_NAMES = new String[]{ - ThreadPool.Names.BULK, - ThreadPool.Names.FLUSH, + ThreadPool.Names.FETCH_SHARD, ThreadPool.Names.GENERIC, ThreadPool.Names.GET, - ThreadPool.Names.INDEX, ThreadPool.Names.MANAGEMENT, ThreadPool.Names.OPTIMIZE, - ThreadPool.Names.PERCOLATE, - ThreadPool.Names.REFRESH, - ThreadPool.Names.SEARCH, + ThreadPool.Names.READ, ThreadPool.Names.SNAPSHOT, - ThreadPool.Names.SUGGEST, - ThreadPool.Names.WARMER + ThreadPool.Names.WRITE }; private final static String[] SUPPORTED_ALIASES = new String[]{ - "b", - "f", + "fs", "ge", "g", - "i", "ma", "o", - "p", "r", - "s", "sn", - "su", "w" }; @@ -86,9 +76,8 @@ public class RestThreadPoolAction extends AbstractCatAction { } private final static String[] DEFAULT_THREAD_POOLS = new String[]{ - ThreadPool.Names.BULK, - ThreadPool.Names.INDEX, - ThreadPool.Names.SEARCH, + ThreadPool.Names.WRITE, + ThreadPool.Names.READ }; private final static Map ALIAS_TO_THREAD_POOL; diff --git a/core/src/main/java/org/elasticsearch/search/SearchService.java b/core/src/main/java/org/elasticsearch/search/SearchService.java index 4beacda97f132..0ce66c90ffa56 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchService.java +++ b/core/src/main/java/org/elasticsearch/search/SearchService.java @@ -72,7 +72,6 @@ import org.elasticsearch.indices.IndicesLifecycle; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesWarmer; -import org.elasticsearch.indices.IndicesWarmer.TerminationHandle; import org.elasticsearch.indices.IndicesWarmer.WarmerContext; import org.elasticsearch.indices.cache.request.IndicesRequestCache; import org.elasticsearch.node.settings.NodeSettingsService; @@ -920,7 +919,7 @@ public int getActiveContexts() { static class NormsWarmer extends IndicesWarmer.Listener { @Override - public TerminationHandle warmNewReaders(final IndexShard indexShard, IndexMetaData indexMetaData, final WarmerContext context, ThreadPool threadPool) { + public void warmNewReaders(final IndexShard indexShard, IndexMetaData indexMetaData, final WarmerContext context) { final Loading defaultLoading = Loading.parse(indexMetaData.settings().get(NORMS_LOADING_KEY), Loading.LAZY); final MapperService mapperService = indexShard.mapperService(); final ObjectSet warmUp = new ObjectHashSet<>(); @@ -937,51 +936,35 @@ public TerminationHandle warmNewReaders(final IndexShard indexShard, IndexMetaDa } } - final CountDownLatch latch = new CountDownLatch(1); - // Norms loading may be I/O intensive but is not CPU intensive, so we execute it in a single task - threadPool.executor(executor()).execute(new Runnable() { - @Override - public void run() { - try { - for (ObjectCursor stringObjectCursor : warmUp) { - final String indexName = stringObjectCursor.value; - final long start = System.nanoTime(); - for (final LeafReaderContext ctx : context.searcher().reader().leaves()) { - final NumericDocValues values = ctx.reader().getNormValues(indexName); - if (values != null) { - values.get(0); - } - } - if (indexShard.warmerService().logger().isTraceEnabled()) { - indexShard.warmerService().logger().trace("warmed norms for [{}], took [{}]", indexName, TimeValue.timeValueNanos(System.nanoTime() - start)); - } + try { + for (ObjectCursor stringObjectCursor : warmUp) { + final String indexName = stringObjectCursor.value; + final long start = System.nanoTime(); + for (final LeafReaderContext ctx : context.searcher().reader().leaves()) { + final NumericDocValues values = ctx.reader().getNormValues(indexName); + if (values != null) { + values.get(0); } - } catch (Throwable t) { - indexShard.warmerService().logger().warn("failed to warm-up norms", t); - } finally { - latch.countDown(); + } + if (indexShard.warmerService().logger().isTraceEnabled()) { + indexShard.warmerService().logger().trace("warmed norms for [{}], took [{}]", indexName, TimeValue.timeValueNanos(System.nanoTime() - start)); } } - }); - - return new TerminationHandle() { - @Override - public void awaitTermination() throws InterruptedException { - latch.await(); - } - }; + } catch (Throwable t) { + indexShard.warmerService().logger().warn("failed to warm-up norms", t); + } } @Override - public TerminationHandle warmTopReader(IndexShard indexShard, IndexMetaData indexMetaData, WarmerContext context, ThreadPool threadPool) { - return TerminationHandle.NO_WAIT; + public void warmTopReader(IndexShard indexShard, IndexMetaData indexMetaData, WarmerContext context) { + // no-op } } static class FieldDataWarmer extends IndicesWarmer.Listener { @Override - public TerminationHandle warmNewReaders(final IndexShard indexShard, IndexMetaData indexMetaData, final WarmerContext context, ThreadPool threadPool) { + public void warmNewReaders(final IndexShard indexShard, IndexMetaData indexMetaData, final WarmerContext context) { final MapperService mapperService = indexShard.mapperService(); final Map warmUp = new HashMap<>(); for (DocumentMapper docMapper : mapperService.docMappers(false)) { @@ -1002,40 +985,23 @@ public TerminationHandle warmNewReaders(final IndexShard indexShard, IndexMetaDa } } final IndexFieldDataService indexFieldDataService = indexShard.indexFieldDataService(); - final Executor executor = threadPool.executor(executor()); - final CountDownLatch latch = new CountDownLatch(context.searcher().reader().leaves().size() * warmUp.size()); for (final LeafReaderContext ctx : context.searcher().reader().leaves()) { for (final MappedFieldType fieldType : warmUp.values()) { - executor.execute(new Runnable() { - - @Override - public void run() { - try { - final long start = System.nanoTime(); - indexFieldDataService.getForField(fieldType).load(ctx); - if (indexShard.warmerService().logger().isTraceEnabled()) { - indexShard.warmerService().logger().trace("warmed fielddata for [{}], took [{}]", fieldType.names().fullName(), TimeValue.timeValueNanos(System.nanoTime() - start)); - } - } catch (Throwable t) { - indexShard.warmerService().logger().warn("failed to warm-up fielddata for [{}]", t, fieldType.names().fullName()); - } finally { - latch.countDown(); - } + try { + final long start = System.nanoTime(); + indexFieldDataService.getForField(fieldType).load(ctx); + if (indexShard.warmerService().logger().isTraceEnabled()) { + indexShard.warmerService().logger().trace("warmed fielddata for [{}], took [{}]", fieldType.names().fullName(), TimeValue.timeValueNanos(System.nanoTime() - start)); } - - }); + } catch (Throwable t) { + indexShard.warmerService().logger().warn("failed to warm-up fielddata for [{}]", t, fieldType.names().fullName()); + } } } - return new TerminationHandle() { - @Override - public void awaitTermination() throws InterruptedException { - latch.await(); - } - }; } @Override - public TerminationHandle warmTopReader(final IndexShard indexShard, IndexMetaData indexMetaData, final WarmerContext context, ThreadPool threadPool) { + public void warmTopReader(final IndexShard indexShard, IndexMetaData indexMetaData, final WarmerContext context) { final MapperService mapperService = indexShard.mapperService(); final Map warmUpGlobalOrdinals = new HashMap<>(); for (DocumentMapper docMapper : mapperService.docMappers(false)) { @@ -1055,104 +1021,70 @@ public TerminationHandle warmTopReader(final IndexShard indexShard, IndexMetaDat } } final IndexFieldDataService indexFieldDataService = indexShard.indexFieldDataService(); - final Executor executor = threadPool.executor(executor()); - final CountDownLatch latch = new CountDownLatch(warmUpGlobalOrdinals.size()); for (final MappedFieldType fieldType : warmUpGlobalOrdinals.values()) { - executor.execute(new Runnable() { - @Override - public void run() { - try { - final long start = System.nanoTime(); - IndexFieldData.Global ifd = indexFieldDataService.getForField(fieldType); - ifd.loadGlobal(context.reader()); - if (indexShard.warmerService().logger().isTraceEnabled()) { - indexShard.warmerService().logger().trace("warmed global ordinals for [{}], took [{}]", fieldType.names().fullName(), TimeValue.timeValueNanos(System.nanoTime() - start)); - } - } catch (Throwable t) { - indexShard.warmerService().logger().warn("failed to warm-up global ordinals for [{}]", t, fieldType.names().fullName()); - } finally { - latch.countDown(); - } + try { + final long start = System.nanoTime(); + IndexFieldData.Global ifd = indexFieldDataService.getForField(fieldType); + ifd.loadGlobal(context.reader()); + if (indexShard.warmerService().logger().isTraceEnabled()) { + indexShard.warmerService().logger().trace("warmed global ordinals for [{}], took [{}]", fieldType.names().fullName(), TimeValue.timeValueNanos(System.nanoTime() - start)); } - }); - } - return new TerminationHandle() { - @Override - public void awaitTermination() throws InterruptedException { - latch.await(); + } catch (Throwable t) { + indexShard.warmerService().logger().warn("failed to warm-up global ordinals for [{}]", t, fieldType.names().fullName()); } - }; + } } } class SearchWarmer extends IndicesWarmer.Listener { @Override - public TerminationHandle warmNewReaders(IndexShard indexShard, IndexMetaData indexMetaData, WarmerContext context, ThreadPool threadPool) { - return internalWarm(indexShard, indexMetaData, context, threadPool, false); + public void warmNewReaders(IndexShard indexShard, IndexMetaData indexMetaData, WarmerContext context) { + internalWarm(indexShard, indexMetaData, context, threadPool, false); } @Override - public TerminationHandle warmTopReader(IndexShard indexShard, IndexMetaData indexMetaData, WarmerContext context, ThreadPool threadPool) { - return internalWarm(indexShard, indexMetaData, context, threadPool, true); + public void warmTopReader(IndexShard indexShard, IndexMetaData indexMetaData, WarmerContext context) { + internalWarm(indexShard, indexMetaData, context, threadPool, true); } - public TerminationHandle internalWarm(final IndexShard indexShard, final IndexMetaData indexMetaData, final IndicesWarmer.WarmerContext warmerContext, ThreadPool threadPool, final boolean top) { + public void internalWarm(final IndexShard indexShard, final IndexMetaData indexMetaData, final IndicesWarmer.WarmerContext warmerContext, ThreadPool threadPool, final boolean top) { IndexWarmersMetaData custom = indexMetaData.custom(IndexWarmersMetaData.TYPE); if (custom == null) { - return TerminationHandle.NO_WAIT; + return ; } - final Executor executor = threadPool.executor(executor()); - final CountDownLatch latch = new CountDownLatch(custom.entries().size()); for (final IndexWarmersMetaData.Entry entry : custom.entries()) { - executor.execute(new Runnable() { - - @Override - public void run() { - SearchContext context = null; - try { - long now = System.nanoTime(); - ShardSearchRequest request = new ShardSearchLocalRequest(indexShard.shardId(), indexMetaData.numberOfShards(), - SearchType.QUERY_THEN_FETCH, entry.source(), entry.types(), entry.requestCache()); - context = createContext(request, warmerContext.searcher()); - // if we use sort, we need to do query to sort on it and load relevant field data - // if not, we might as well set size=0 (and cache if needed) - if (context.sort() == null) { - context.size(0); - } - boolean canCache = indicesQueryCache.canCache(request, context); - // early terminate when we can cache, since we can only do proper caching on top level searcher - // also, if we can't cache, and its top, we don't need to execute it, since we already did when its not top - if (canCache != top) { - return; - } - loadOrExecuteQueryPhase(request, context, queryPhase); - long took = System.nanoTime() - now; - if (indexShard.warmerService().logger().isTraceEnabled()) { - indexShard.warmerService().logger().trace("warmed [{}], took [{}]", entry.name(), TimeValue.timeValueNanos(took)); - } - } catch (Throwable t) { - indexShard.warmerService().logger().warn("warmer [{}] failed", t, entry.name()); - } finally { - try { - if (context != null) { - freeContext(context.id()); - cleanContext(context); - } - } finally { - latch.countDown(); - } - } + SearchContext context = null; + try { + long now = System.nanoTime(); + ShardSearchRequest request = new ShardSearchLocalRequest(indexShard.shardId(), indexMetaData.numberOfShards(), + SearchType.QUERY_THEN_FETCH, entry.source(), entry.types(), entry.requestCache()); + context = createContext(request, warmerContext.searcher()); + // if we use sort, we need to do query to sort on it and load relevant field data + // if not, we might as well set size=0 (and cache if needed) + if (context.sort() == null) { + context.size(0); + } + boolean canCache = indicesQueryCache.canCache(request, context); + // early terminate when we can cache, since we can only do proper caching on top level searcher + // also, if we can't cache, and its top, we don't need to execute it, since we already did when its not top + if (canCache != top) { + return; + } + loadOrExecuteQueryPhase(request, context, queryPhase); + long took = System.nanoTime() - now; + if (indexShard.warmerService().logger().isTraceEnabled()) { + indexShard.warmerService().logger().trace("warmed [{}], took [{}]", entry.name(), TimeValue.timeValueNanos(took)); + } + } catch (Throwable t) { + indexShard.warmerService().logger().warn("warmer [{}] failed", t, entry.name()); + } finally { + if (context != null) { + freeContext(context.id()); + cleanContext(context); } - - }); - } - return new TerminationHandle() { - @Override - public void awaitTermination() throws InterruptedException { - latch.await(); } - }; + } } } diff --git a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 7c01367c0169f..5c3fdfda7f525 100644 --- a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -21,8 +21,10 @@ import com.google.common.base.Objects; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMap.Builder; import com.google.common.collect.Maps; import com.google.common.util.concurrent.MoreExecutors; + import org.apache.lucene.util.Counter; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; @@ -60,23 +62,48 @@ public class ThreadPool extends AbstractComponent { public static class Names { + // Actual threadpools public static final String SAME = "same"; public static final String GENERIC = "generic"; public static final String LISTENER = "listener"; - public static final String GET = "get"; - public static final String INDEX = "index"; - public static final String BULK = "bulk"; - public static final String SEARCH = "search"; - public static final String SUGGEST = "suggest"; - public static final String PERCOLATE = "percolate"; public static final String MANAGEMENT = "management"; - public static final String FLUSH = "flush"; - public static final String REFRESH = "refresh"; - public static final String WARMER = "warmer"; + public static final String READ = "read"; + public static final String WRITE = "write"; public static final String SNAPSHOT = "snapshot"; public static final String OPTIMIZE = "optimize"; - public static final String FETCH_SHARD_STARTED = "fetch_shard_started"; - public static final String FETCH_SHARD_STORE = "fetch_shard_store"; + public static final String FETCH_SHARD = "fetch_shard"; + // this would ideally be an alias to "read" but some search + // requests block on get requests to retrieve eg. indexed scripts + // or geo shapes, so we need to keep get in its own thread pool + // to avoid dead locks + public static final String GET = "get"; + + // Aliases + public static final String SEARCH = READ; + public static final String SUGGEST = READ; + public static final String PERCOLATE = READ; + public static final String INDEX = WRITE; + public static final String BULK = WRITE; + public static final String FLUSH = WRITE; + public static final String REFRESH = WRITE; + public static final String FETCH_SHARD_STARTED = FETCH_SHARD; + public static final String FETCH_SHARD_STORE = FETCH_SHARD; + + private static Map DEPRECATED_THREADPOOLS; + static { + Builder builder = ImmutableMap.builder(); + builder.put("search", SEARCH); + builder.put("suggest", SUGGEST); + builder.put("percolate", PERCOLATE); + builder.put("warmer", REFRESH); + builder.put("index", INDEX); + builder.put("bulk", BULK); + builder.put("flush", FLUSH); + builder.put("refresh", REFRESH); + builder.put("fetch_shard_started", FETCH_SHARD_STARTED); + builder.put("fetch_shard_store", FETCH_SHARD_STORE); + DEPRECATED_THREADPOOLS = builder.build(); + } } public static final String THREADPOOL_GROUP = "threadpool."; @@ -110,23 +137,16 @@ public ThreadPool(Settings settings) { int halfProcMaxAt10 = Math.min(((availableProcessors + 1) / 2), 10); defaultExecutorTypeSettings = ImmutableMap.builder() .put(Names.GENERIC, settingsBuilder().put("type", "cached").put("keep_alive", "30s").build()) - .put(Names.INDEX, settingsBuilder().put("type", "fixed").put("size", availableProcessors).put("queue_size", 200).build()) - .put(Names.BULK, settingsBuilder().put("type", "fixed").put("size", availableProcessors).put("queue_size", 50).build()) + .put(Names.WRITE, settingsBuilder().put("type", "fixed").put("size", availableProcessors).put("queue_size", 200).build()) + .put(Names.READ, settingsBuilder().put("type", "fixed").put("size", ((availableProcessors * 3) / 2) + 1).put("queue_size", 1000).build()) .put(Names.GET, settingsBuilder().put("type", "fixed").put("size", availableProcessors).put("queue_size", 1000).build()) - .put(Names.SEARCH, settingsBuilder().put("type", "fixed").put("size", ((availableProcessors * 3) / 2) + 1).put("queue_size", 1000).build()) - .put(Names.SUGGEST, settingsBuilder().put("type", "fixed").put("size", availableProcessors).put("queue_size", 1000).build()) - .put(Names.PERCOLATE, settingsBuilder().put("type", "fixed").put("size", availableProcessors).put("queue_size", 1000).build()) .put(Names.MANAGEMENT, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", 5).build()) // no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded // the assumption here is that the listeners should be very lightweight on the listeners side .put(Names.LISTENER, settingsBuilder().put("type", "fixed").put("size", halfProcMaxAt10).build()) - .put(Names.FLUSH, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build()) - .put(Names.REFRESH, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt10).build()) - .put(Names.WARMER, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build()) .put(Names.SNAPSHOT, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build()) - .put(Names.OPTIMIZE, settingsBuilder().put("type", "fixed").put("size", 1).build()) - .put(Names.FETCH_SHARD_STARTED, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", availableProcessors * 2).build()) - .put(Names.FETCH_SHARD_STORE, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", availableProcessors * 2).build()) + .put(Names.OPTIMIZE, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", 1).build()) + .put(Names.FETCH_SHARD, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", availableProcessors * 2).build()) .build(); Map executors = Maps.newHashMap(); @@ -136,7 +156,13 @@ public ThreadPool(Settings settings) { // Building custom thread pools for (Map.Entry entry : groupSettings.entrySet()) { - if (executors.containsKey(entry.getKey())) { + final String threadPoolName = entry.getKey(); + if (executors.containsKey(threadPoolName)) { + continue; + } + if (Names.DEPRECATED_THREADPOOLS.containsKey(threadPoolName)) { + final String replacement = Names.DEPRECATED_THREADPOOLS.get(threadPoolName); + deprecationLogger.deprecated("threadpool [{}] has been merged together with other threadpools into the [{}] threadpool", threadPoolName, replacement); continue; } executors.put(entry.getKey(), build(entry.getKey(), entry.getValue(), Settings.EMPTY)); @@ -446,11 +472,17 @@ public void updateSettings(Settings settings) { // Building custom thread pools for (Map.Entry entry : groupSettings.entrySet()) { - if (defaultExecutorTypeSettings.containsKey(entry.getKey())) { + final String threadPoolName = entry.getKey(); + if (defaultExecutorTypeSettings.containsKey(threadPoolName)) { + continue; + } + if (Names.DEPRECATED_THREADPOOLS.containsKey(threadPoolName)) { + final String replacement = Names.DEPRECATED_THREADPOOLS.get(threadPoolName); + deprecationLogger.deprecated("threadpool [{}] has been merged together with other threadpools into the [{}] threadpool", threadPoolName, replacement); continue; } - ExecutorHolder oldExecutorHolder = executors.get(entry.getKey()); + ExecutorHolder oldExecutorHolder = executors.get(threadPoolName); ExecutorHolder newExecutorHolder = rebuild(entry.getKey(), oldExecutorHolder, entry.getValue(), Settings.EMPTY); // Can't introduce new thread pools at runtime, because The oldExecutorHolder variable will be null in the // case the settings contains a thread pool not defined in the initial settings in the constructor. The if diff --git a/core/src/test/java/org/elasticsearch/action/RejectionActionIT.java b/core/src/test/java/org/elasticsearch/action/RejectionActionIT.java index ff094e09a4c95..63188782c0fd2 100644 --- a/core/src/test/java/org/elasticsearch/action/RejectionActionIT.java +++ b/core/src/test/java/org/elasticsearch/action/RejectionActionIT.java @@ -47,12 +47,10 @@ public class RejectionActionIT extends ESIntegTestCase { protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder() .put(super.nodeSettings(nodeOrdinal)) - .put("threadpool.search.size", 1) - .put("threadpool.search.queue_size", 1) - .put("threadpool.index.size", 1) - .put("threadpool.index.queue_size", 1) - .put("threadpool.get.size", 1) - .put("threadpool.get.queue_size", 1) + .put("threadpool.read.size", 1) + .put("threadpool.read.queue_size", 1) + .put("threadpool.write.size", 1) + .put("threadpool.write.queue_size", 1) .build(); } diff --git a/core/src/test/java/org/elasticsearch/search/SearchWithRejectionsIT.java b/core/src/test/java/org/elasticsearch/search/SearchWithRejectionsIT.java index b5b4b94a8a158..877554dfd7e7e 100644 --- a/core/src/test/java/org/elasticsearch/search/SearchWithRejectionsIT.java +++ b/core/src/test/java/org/elasticsearch/search/SearchWithRejectionsIT.java @@ -39,9 +39,9 @@ public class SearchWithRejectionsIT extends ESIntegTestCase { @Override public Settings nodeSettings(int nodeOrdinal) { return settingsBuilder().put(super.nodeSettings(nodeOrdinal)) - .put("threadpool.search.type", "fixed") - .put("threadpool.search.size", 1) - .put("threadpool.search.queue_size", 1) + .put("threadpool.read.type", "fixed") + .put("threadpool.read.size", 1) + .put("threadpool.read.queue_size", 1) .build(); } diff --git a/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java b/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java index 8ec6f89e44bc4..14f0da0449570 100644 --- a/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java +++ b/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java @@ -407,10 +407,8 @@ private static Settings getRandomNodeSettings(long seed) { } if (random.nextBoolean()) { // change threadpool types to make sure we don't have components that rely on the type of thread pools - for (String name : Arrays.asList(ThreadPool.Names.BULK, ThreadPool.Names.FLUSH, ThreadPool.Names.GET, - ThreadPool.Names.INDEX, ThreadPool.Names.MANAGEMENT, ThreadPool.Names.OPTIMIZE, - ThreadPool.Names.PERCOLATE, ThreadPool.Names.REFRESH, ThreadPool.Names.SEARCH, ThreadPool.Names.SNAPSHOT, - ThreadPool.Names.SUGGEST, ThreadPool.Names.WARMER)) { + for (String name : Arrays.asList(ThreadPool.Names.WRITE, ThreadPool.Names.READ, ThreadPool.Names.MANAGEMENT, + ThreadPool.Names.OPTIMIZE, ThreadPool.Names.SNAPSHOT, ThreadPool.Names.LISTENER, ThreadPool.Names.FETCH_SHARD)) { if (random.nextBoolean()) { final String type = RandomPicks.randomFrom(random, Arrays.asList("fixed", "cached", "scaling")); builder.put(ThreadPool.THREADPOOL_GROUP + name + ".type", type); diff --git a/core/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java b/core/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java index d0f1fa2027a94..da6bfb7af919e 100644 --- a/core/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java +++ b/core/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java @@ -63,7 +63,7 @@ public class SimpleThreadPoolIT extends ESIntegTestCase { @Override protected Settings nodeSettings(int nodeOrdinal) { - return Settings.settingsBuilder().put(super.nodeSettings(nodeOrdinal)).put("threadpool.search.type", "cached").build(); + return Settings.settingsBuilder().put(super.nodeSettings(nodeOrdinal)).put("threadpool.read.type", "cached").build(); } @Test @@ -131,7 +131,7 @@ public void testUpdatingThreadPoolSettings() throws Exception { ThreadPool threadPool = internalCluster().getDataNodeInstance(ThreadPool.class); // Check that settings are changed assertThat(((ThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(5L)); - client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.keep_alive", "10m").build()).execute().actionGet(); + client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.read.keep_alive", "10m").build()).execute().actionGet(); assertThat(((ThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L)); // Make sure that threads continue executing when executor is replaced @@ -149,7 +149,7 @@ public void run() { } } }); - client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.type", "fixed").build()).execute().actionGet(); + client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.read.type", "fixed").build()).execute().actionGet(); assertThat(threadPool.executor(Names.SEARCH), not(sameInstance(oldExecutor))); assertThat(((ThreadPoolExecutor) oldExecutor).isShutdown(), equalTo(true)); assertThat(((ThreadPoolExecutor) oldExecutor).isTerminating(), equalTo(true)); @@ -169,7 +169,7 @@ public void run() { } } }); - client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.type", "fixed").build()).execute().actionGet(); + client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.read.type", "fixed").build()).execute().actionGet(); barrier.await(); Thread.sleep(200); diff --git a/core/src/test/java/org/elasticsearch/threadpool/ThreadPoolSerializationTests.java b/core/src/test/java/org/elasticsearch/threadpool/ThreadPoolSerializationTests.java index 3226662f686b3..b38554b3140b9 100644 --- a/core/src/test/java/org/elasticsearch/threadpool/ThreadPoolSerializationTests.java +++ b/core/src/test/java/org/elasticsearch/threadpool/ThreadPoolSerializationTests.java @@ -94,9 +94,9 @@ public void testThatToXContentWritesOutUnboundedCorrectly() throws Exception { @Test public void testThatNegativeSettingAllowsToStart() throws InterruptedException { - Settings settings = settingsBuilder().put("name", "index").put("threadpool.index.queue_size", "-1").build(); + Settings settings = settingsBuilder().put("name", "index").put("threadpool.write.queue_size", "-1").build(); ThreadPool threadPool = new ThreadPool(settings); - assertThat(threadPool.info("index").getQueueSize(), is(nullValue())); + assertThat(threadPool.info(ThreadPool.Names.INDEX).getQueueSize(), is(nullValue())); terminate(threadPool); } } diff --git a/core/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java b/core/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java index 9330f71996d24..12bed597500dc 100644 --- a/core/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java +++ b/core/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java @@ -52,7 +52,7 @@ private ThreadPool.Info info(ThreadPool threadPool, String name) { public void testCachedExecutorType() throws InterruptedException { ThreadPool threadPool = new ThreadPool( Settings.settingsBuilder() - .put("threadpool.search.type", "cached") + .put("threadpool.read.type", "cached") .put("name","testCachedExecutorType").build()); assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached")); @@ -60,14 +60,14 @@ public void testCachedExecutorType() throws InterruptedException { assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); // Replace with different type - threadPool.updateSettings(settingsBuilder().put("threadpool.search.type", "same").build()); + threadPool.updateSettings(settingsBuilder().put("threadpool.read.type", "same").build()); assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("same")); assertThat(threadPool.executor(Names.SEARCH), instanceOf(MoreExecutors.directExecutor().getClass())); // Replace with different type again threadPool.updateSettings(settingsBuilder() - .put("threadpool.search.type", "scaling") - .put("threadpool.search.keep_alive", "10m") + .put("threadpool.read.type", "scaling") + .put("threadpool.read.keep_alive", "10m") .build()); assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("scaling")); assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); @@ -77,7 +77,7 @@ public void testCachedExecutorType() throws InterruptedException { assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L)); // Put old type back - threadPool.updateSettings(settingsBuilder().put("threadpool.search.type", "cached").build()); + threadPool.updateSettings(settingsBuilder().put("threadpool.read.type", "cached").build()); assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached")); // Make sure keep alive value reused assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(10L)); @@ -85,7 +85,7 @@ public void testCachedExecutorType() throws InterruptedException { // Change keep alive Executor oldExecutor = threadPool.executor(Names.SEARCH); - threadPool.updateSettings(settingsBuilder().put("threadpool.search.keep_alive", "1m").build()); + threadPool.updateSettings(settingsBuilder().put("threadpool.read.keep_alive", "1m").build()); // Make sure keep alive value changed assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(1L)); assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(1L)); @@ -94,7 +94,7 @@ public void testCachedExecutorType() throws InterruptedException { assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor)); // Set the same keep alive - threadPool.updateSettings(settingsBuilder().put("threadpool.search.keep_alive", "1m").build()); + threadPool.updateSettings(settingsBuilder().put("threadpool.read.keep_alive", "1m").build()); // Make sure keep alive value didn't change assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(1L)); assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(1L)); @@ -107,17 +107,17 @@ public void testCachedExecutorType() throws InterruptedException { @Test public void testFixedExecutorType() throws InterruptedException { ThreadPool threadPool = new ThreadPool(settingsBuilder() - .put("threadpool.search.type", "fixed") + .put("threadpool.read.type", "fixed") .put("name","testCachedExecutorType").build()); assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); // Replace with different type threadPool.updateSettings(settingsBuilder() - .put("threadpool.search.type", "scaling") - .put("threadpool.search.keep_alive", "10m") - .put("threadpool.search.min", "2") - .put("threadpool.search.size", "15") + .put("threadpool.read.type", "scaling") + .put("threadpool.read.keep_alive", "10m") + .put("threadpool.read.min", "2") + .put("threadpool.read.size", "15") .build()); assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("scaling")); assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); @@ -131,7 +131,7 @@ public void testFixedExecutorType() throws InterruptedException { // Put old type back threadPool.updateSettings(settingsBuilder() - .put("threadpool.search.type", "fixed") + .put("threadpool.read.type", "fixed") .build()); assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("fixed")); // Make sure keep alive value is not used @@ -145,7 +145,7 @@ public void testFixedExecutorType() throws InterruptedException { // Change size Executor oldExecutor = threadPool.executor(Names.SEARCH); - threadPool.updateSettings(settingsBuilder().put("threadpool.search.size", "10").build()); + threadPool.updateSettings(settingsBuilder().put("threadpool.read.size", "10").build()); // Make sure size values changed assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(10)); assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(10)); @@ -157,7 +157,7 @@ public void testFixedExecutorType() throws InterruptedException { // Change queue capacity threadPool.updateSettings(settingsBuilder() - .put("threadpool.search.queue", "500") + .put("threadpool.read.queue", "500") .build()); terminate(threadPool); @@ -167,8 +167,8 @@ public void testFixedExecutorType() throws InterruptedException { @Test public void testScalingExecutorType() throws InterruptedException { ThreadPool threadPool = new ThreadPool(settingsBuilder() - .put("threadpool.search.type", "scaling") - .put("threadpool.search.size", 10) + .put("threadpool.read.type", "scaling") + .put("threadpool.read.size", 10) .put("name","testCachedExecutorType").build()); assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(1)); @@ -180,10 +180,10 @@ public void testScalingExecutorType() throws InterruptedException { // Change settings that doesn't require pool replacement Executor oldExecutor = threadPool.executor(Names.SEARCH); threadPool.updateSettings(settingsBuilder() - .put("threadpool.search.type", "scaling") - .put("threadpool.search.keep_alive", "10m") - .put("threadpool.search.min", "2") - .put("threadpool.search.size", "15") + .put("threadpool.read.type", "scaling") + .put("threadpool.read.keep_alive", "10m") + .put("threadpool.read.min", "2") + .put("threadpool.read.size", "15") .build()); assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("scaling")); assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); @@ -202,7 +202,7 @@ public void testScalingExecutorType() throws InterruptedException { @Test(timeout = 10000) public void testShutdownDownNowDoesntBlock() throws Exception { ThreadPool threadPool = new ThreadPool(Settings.settingsBuilder() - .put("threadpool.search.type", "cached") + .put("threadpool.read.type", "cached") .put("name","testCachedExecutorType").build()); final CountDownLatch latch = new CountDownLatch(1); @@ -218,7 +218,7 @@ public void run() { } } }); - threadPool.updateSettings(settingsBuilder().put("threadpool.search.type", "fixed").build()); + threadPool.updateSettings(settingsBuilder().put("threadpool.read.type", "fixed").build()); assertThat(threadPool.executor(Names.SEARCH), not(sameInstance(oldExecutor))); assertThat(((ThreadPoolExecutor) oldExecutor).isShutdown(), equalTo(true)); assertThat(((ThreadPoolExecutor) oldExecutor).isTerminating(), equalTo(true)); diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.thread_pool/10_basic.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.thread_pool/10_basic.yaml index edb87ce27b9b1..8ffa133b0a952 100755 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.thread_pool/10_basic.yaml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.thread_pool/10_basic.yaml @@ -7,8 +7,8 @@ - match: $body: | - / #host ip bulk.active bulk.queue bulk.rejected index.active index.queue index.rejected search.active search.queue search.rejected - ^ (\S+ \s+ (\d{1,3}\.){3}\d{1,3} \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \n)+ $/ + / #host ip read.active read.queue read.rejected write.active write.queue write.rejected + ^ (\S+ \s+ (\d{1,3}\.){3}\d{1,3} \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \n)+ $/ - do: cat.thread_pool: @@ -16,8 +16,8 @@ - match: $body: | - /^ host \s+ ip \s+ bulk.active \s+ bulk.queue \s+ bulk.rejected \s+ index.active \s+ index.queue \s+ index.rejected \s+ search.active \s+ search.queue \s+ search.rejected \s+ \n - (\S+ \s+ (\d{1,3}\.){3}\d{1,3} \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \n)+ $/ + /^ host \s+ ip \s+ read.active \s+ read.queue \s+ read.rejected \s+ write.active \s+ write.queue \s+ write.rejected \s+ \n + (\S+ \s+ (\d{1,3}\.){3}\d{1,3} \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \n)+ $/ - do: cat.thread_pool: @@ -31,141 +31,22 @@ - do: cat.thread_pool: - h: id,ba,fa,gea,ga,ia,maa,ma,oa,pa + h: id,fsa,gea,ga,maa,oa,ra,sna,wa v: true full_id: true - match: $body: | - /^ id \s+ ba \s+ fa \s+ gea \s+ ga \s+ ia \s+ maa \s+ oa \s+ pa \s+ \n - (\S+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \n)+ $/ + /^ id \s+ fsa \s+ gea \s+ ga \s+ maa \s+ oa \s+ ra \s+ sna \s+ wa \s+ \n + (\S+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \n)+ $/ - do: cat.thread_pool: - h: id,bulk.type,bulk.active,bulk.size,bulk.queue,bulk.queueSize,bulk.rejected,bulk.largest,bulk.completed,bulk.min,bulk.max,bulk.keepAlive + h: id,write.type,write.active,write.size,write.queue,write.queueSize,write.rejected,write.largest,write.completed,write.min,write.max,write.keepAlive v: true - match: $body: | - /^ id \s+ bulk.type \s+ bulk.active \s+ bulk.size \s+ bulk.queue \s+ bulk.queueSize \s+ bulk.rejected \s+ bulk.largest \s+ bulk.completed \s+ bulk.min \s+ bulk.max \s+ bulk.keepAlive \s+ \n + /^ id \s+ write.type \s+ write.active \s+ write.size \s+ write.queue \s+ write.queueSize \s+ write.rejected \s+ write.largest \s+ write.completed \s+ write.min \s+ write.max \s+ write.keepAlive \s+ \n (\S+ \s+ (cached|fixed|scaling)? \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \S* \s+ \n)+ $/ - - do: - cat.thread_pool: - h: id,flush.type,flush.active,flush.size,flush.queue,flush.queueSize,flush.rejected,flush.largest,flush.completed,flush.min,flush.max,flush.keepAlive - v: true - - - match: - $body: | - /^ id \s+ flush.type \s+ flush.active \s+ flush.size \s+ flush.queue \s+ flush.queueSize \s+ flush.rejected \s+ flush.largest \s+ flush.completed \s+ flush.min \s+ flush.max \s+ flush.keepAlive \s+ \n - (\S+ \s+ (cached|fixed|scaling)? \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \S* \s+ \n)+ $/ - - - do: - cat.thread_pool: - h: id,generic.type,generic.active,generic.size,generic.queue,generic.queueSize,generic.rejected,generic.largest,generic.completed,generic.min,generic.max,generic.keepAlive - v: true - - - match: - $body: | - /^ id \s+ generic.type \s+ generic.active \s+ generic.size \s+ generic.queue \s+ generic.queueSize \s+ generic.rejected \s+ generic.largest \s+ generic.completed \s+ generic.min \s+ generic.max \s+ generic.keepAlive \s+ \n - (\S+ \s+ (cached|fixed|scaling)? \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \S* \s+ \n)+ $/ - - - do: - cat.thread_pool: - h: id,get.type,get.active,get.size,get.queue,get.queueSize,get.rejected,get.largest,get.completed,get.min,get.max,get.keepAlive - v: true - - - match: - $body: | - /^ id \s+ get.type \s+ get.active \s+ get.size \s+ get.queue \s+ get.queueSize \s+ get.rejected \s+ get.largest \s+ get.completed \s+ get.min \s+ get.max \s+ get.keepAlive \s+ \n - (\S+ \s+ (cached|fixed|scaling)? \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \S* \s+ \n)+ $/ - - - do: - cat.thread_pool: - h: id,index.type,index.active,index.size,index.queue,index.queueSize,index.rejected,index.largest,index.completed,index.min,index.max,index.keepAlive - v: true - - - match: - $body: | - /^ id \s+ index.type \s+ index.active \s+ index.size \s+ index.queue \s+ index.queueSize \s+ index.rejected \s+ index.largest \s+ index.completed \s+ index.min \s+ index.max \s+ index.keepAlive \s+ \n - (\S+ \s+ (cached|fixed|scaling)? \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \S* \s+ \n)+ $/ - - - do: - cat.thread_pool: - h: id,management.type,management.active,management.size,management.queue,management.queueSize,management.rejected,management.largest,management.completed,management.min,management.max,management.keepAlive - v: true - - - match: - $body: | - /^ id \s+ management.type \s+ management.active \s+ management.size \s+ management.queue \s+ management.queueSize \s+ management.rejected \s+ management.largest \s+ management.completed \s+ management.min \s+ management.max \s+ management.keepAlive \s+ \n - (\S+ \s+ (cached|fixed|scaling)? \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \S* \s+ \n)+ $/ - - - do: - cat.thread_pool: - h: id,optimize.type,optimize.active,optimize.size,optimize.queue,optimize.queueSize,optimize.rejected,optimize.largest,optimize.completed,optimize.min,optimize.max,optimize.keepAlive - v: true - - - match: - $body: | - /^ id \s+ optimize.type \s+ optimize.active \s+ optimize.size \s+ optimize.queue \s+ optimize.queueSize \s+ optimize.rejected \s+ optimize.largest \s+ optimize.completed \s+ optimize.min \s+ optimize.max \s+ optimize.keepAlive \s+ \n - (\S+ \s+ (cached|fixed|scaling)? \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \S* \s+ \n)+ $/ - - - do: - cat.thread_pool: - h: id,percolate.type,percolate.active,percolate.size,percolate.queue,percolate.queueSize,percolate.rejected,percolate.largest,percolate.completed,percolate.min,percolate.max,percolate.keepAlive - v: true - - - match: - $body: | - /^ id \s+ percolate.type \s+ percolate.active \s+ percolate.size \s+ percolate.queue \s+ percolate.queueSize \s+ percolate.rejected \s+ percolate.largest \s+ percolate.completed \s+ percolate.min \s+ percolate.max \s+ percolate.keepAlive \s+ \n - (\S+ \s+ (cached|fixed|scaling)? \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \S* \s+ \n)+ $/ - - - do: - cat.thread_pool: - h: id,refresh.type,refresh.active,refresh.size,refresh.queue,refresh.queueSize,refresh.rejected,refresh.largest,refresh.completed,refresh.min,refresh.max,refresh.keepAlive - v: true - - - match: - $body: | - /^ id \s+ refresh.type \s+ refresh.active \s+ refresh.size \s+ refresh.queue \s+ refresh.queueSize \s+ refresh.rejected \s+ refresh.largest \s+ refresh.completed \s+ refresh.min \s+ refresh.max \s+ refresh.keepAlive \s+ \n - (\S+ \s+ (cached|fixed|scaling)? \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \S* \s+ \n)+ $/ - - - do: - cat.thread_pool: - h: id,search.type,search.active,search.size,search.queue,search.queueSize,search.rejected,search.largest,search.completed,search.min,search.max,search.keepAlive - v: true - - - match: - $body: | - /^ id \s+ search.type \s+ search.active \s+ search.size \s+ search.queue \s+ search.queueSize \s+ search.rejected \s+ search.largest \s+ search.completed \s+ search.min \s+ search.max \s+ search.keepAlive \s+ \n - (\S+ \s+ (cached|fixed|scaling)? \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \S* \s+ \n)+ $/ - - - do: - cat.thread_pool: - h: id,snapshot.type,snapshot.active,snapshot.size,snapshot.queue,snapshot.queueSize,snapshot.rejected,snapshot.largest,snapshot.completed,snapshot.min,snapshot.max,snapshot.keepAlive - v: true - - - match: - $body: | - /^ id \s+ snapshot.type \s+ snapshot.active \s+ snapshot.size \s+ snapshot.queue \s+ snapshot.queueSize \s+ snapshot.rejected \s+ snapshot.largest \s+ snapshot.completed \s+ snapshot.min \s+ snapshot.max \s+ snapshot.keepAlive \s+ \n - (\S+ \s+ (cached|fixed|scaling)? \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \S* \s+ \n)+ $/ - - - do: - cat.thread_pool: - h: id,suggest.type,suggest.active,suggest.size,suggest.queue,suggest.queueSize,suggest.rejected,suggest.largest,suggest.completed,suggest.min,suggest.max,suggest.keepAlive - v: true - - - match: - $body: | - /^ id \s+ suggest.type \s+ suggest.active \s+ suggest.size \s+ suggest.queue \s+ suggest.queueSize \s+ suggest.rejected \s+ suggest.largest \s+ suggest.completed \s+ suggest.min \s+ suggest.max \s+ suggest.keepAlive \s+ \n - (\S+ \s+ (cached|fixed|scaling)? \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \S* \s+ \n)+ $/ - - - do: - cat.thread_pool: - h: id,warmer.type,warmer.active,warmer.size,warmer.queue,warmer.queueSize,warmer.rejected,warmer.largest,warmer.completed,warmer.min,warmer.max,warmer.keepAlive - v: true - - - match: - $body: | - /^ id \s+ warmer.type \s+ warmer.active \s+ warmer.size \s+ warmer.queue \s+ warmer.queueSize \s+ warmer.rejected \s+ warmer.largest \s+ warmer.completed \s+ warmer.min \s+ warmer.max \s+ warmer.keepAlive \s+ \n - (\S+ \s+ (cached|fixed|scaling)? \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \S* \s+ \n)+ $/ From b8bf16faf886f62dc1cf89fb59f12d90aee49615 Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Tue, 18 Aug 2015 13:11:05 +0200 Subject: [PATCH 2/3] Update documentation. --- docs/reference/modules/threadpool.asciidoc | 41 +++++----------------- 1 file changed, 9 insertions(+), 32 deletions(-) diff --git a/docs/reference/modules/threadpool.asciidoc b/docs/reference/modules/threadpool.asciidoc index cde176f342598..e216e533060ce 100644 --- a/docs/reference/modules/threadpool.asciidoc +++ b/docs/reference/modules/threadpool.asciidoc @@ -9,33 +9,18 @@ of discarded. There are several thread pools, but the important ones include: -`index`:: - For index/delete operations. Defaults to `fixed` +`write`:: + For index/delete/bulk/refresh/flush operations. Defaults to `fixed` with a size of `# of available processors`, queue_size of `200`. -`search`:: - For count/search operations. Defaults to `fixed` +`read`:: + For count/search/suggest/percolate operations. Defaults to `fixed` with a size of `int((# of available_processors * 3) / 2) + 1`, queue_size of `1000`. -`suggest`:: - For suggest operations. Defaults to `fixed` - with a size of `# of available processors`, - queue_size of `1000`. - `get`:: - For get operations. Defaults to `fixed` - with a size of `# of available processors`, - queue_size of `1000`. - -`bulk`:: - For bulk operations. Defaults to `fixed` - with a size of `# of available processors`, - queue_size of `50`. - -`percolate`:: - For percolate operations. Defaults to `fixed` + For get operations only. Defaults to `fixed` with a size of `# of available processors`, queue_size of `1000`. @@ -43,17 +28,9 @@ There are several thread pools, but the important ones include: For snapshot/restore operations. Defaults to `scaling` with a keep-alive of `5m` and a size of `min(5, (# of available processors)/2)`. -`warmer`:: - For segment warm-up operations. Defaults to `scaling` with a - keep-alive of `5m` and a size of `min(5, (# of available processors)/2)`. - -`refresh`:: - For refresh operations. Defaults to `scaling` with a - keep-alive of `5m` and a size of `min(10, (# of available processors)/2)`. - `listener`:: Mainly for java client executing of action when listener threaded is set to true. - Default size of `(# of available processors)/2`, max at 10. + Default size of `min(10, (# of available processors)/2)`. Changing a specific thread pool can be done by setting its type and specific type parameters, for example, changing the `index` thread pool @@ -62,7 +39,7 @@ to have more threads: [source,js] -------------------------------------------------- threadpool: - index: + write: type: fixed size: 30 -------------------------------------------------- @@ -110,7 +87,7 @@ full, it will abort the request. [source,js] -------------------------------------------------- threadpool: - index: + write: type: fixed size: 30 queue_size: 1000 @@ -129,7 +106,7 @@ around in the thread pool without it doing any work. [source,js] -------------------------------------------------- threadpool: - warmer: + snapshot: type: scaling size: 8 keep_alive: 2m From 0e6cd3053f58b3fc8b20cffbefe10999ee72820f Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Wed, 19 Aug 2015 14:11:12 +0200 Subject: [PATCH 3/3] Add INDEX thread pool back. --- .../rest/action/cat/RestThreadPoolAction.java | 2 ++ .../java/org/elasticsearch/threadpool/ThreadPool.java | 8 +++++--- .../threadpool/ThreadPoolSerializationTests.java | 2 +- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java b/core/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java index 9314ab049b830..662a05a4f6e00 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java @@ -53,6 +53,7 @@ public class RestThreadPoolAction extends AbstractCatAction { ThreadPool.Names.FETCH_SHARD, ThreadPool.Names.GENERIC, ThreadPool.Names.GET, + ThreadPool.Names.INDEX, ThreadPool.Names.MANAGEMENT, ThreadPool.Names.OPTIMIZE, ThreadPool.Names.READ, @@ -64,6 +65,7 @@ public class RestThreadPoolAction extends AbstractCatAction { "fs", "ge", "g", + "i", "ma", "o", "r", diff --git a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 5c3fdfda7f525..c64cd54dd7cb4 100644 --- a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -77,12 +77,14 @@ public static class Names { // or geo shapes, so we need to keep get in its own thread pool // to avoid dead locks public static final String GET = "get"; + // not an alias to WRITE in order to prevent heavy bulk requests from + // delaying lightweight create/index/delete operations + public static final String INDEX = "index"; // Aliases public static final String SEARCH = READ; public static final String SUGGEST = READ; public static final String PERCOLATE = READ; - public static final String INDEX = WRITE; public static final String BULK = WRITE; public static final String FLUSH = WRITE; public static final String REFRESH = WRITE; @@ -96,7 +98,6 @@ public static class Names { builder.put("suggest", SUGGEST); builder.put("percolate", PERCOLATE); builder.put("warmer", REFRESH); - builder.put("index", INDEX); builder.put("bulk", BULK); builder.put("flush", FLUSH); builder.put("refresh", REFRESH); @@ -137,7 +138,8 @@ public ThreadPool(Settings settings) { int halfProcMaxAt10 = Math.min(((availableProcessors + 1) / 2), 10); defaultExecutorTypeSettings = ImmutableMap.builder() .put(Names.GENERIC, settingsBuilder().put("type", "cached").put("keep_alive", "30s").build()) - .put(Names.WRITE, settingsBuilder().put("type", "fixed").put("size", availableProcessors).put("queue_size", 200).build()) + .put(Names.WRITE, settingsBuilder().put("type", "fixed").put("size", availableProcessors).put("queue_size", 50).build()) + .put(Names.INDEX, settingsBuilder().put("type", "fixed").put("size", availableProcessors).put("queue_size", 200).build()) .put(Names.READ, settingsBuilder().put("type", "fixed").put("size", ((availableProcessors * 3) / 2) + 1).put("queue_size", 1000).build()) .put(Names.GET, settingsBuilder().put("type", "fixed").put("size", availableProcessors).put("queue_size", 1000).build()) .put(Names.MANAGEMENT, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", 5).build()) diff --git a/core/src/test/java/org/elasticsearch/threadpool/ThreadPoolSerializationTests.java b/core/src/test/java/org/elasticsearch/threadpool/ThreadPoolSerializationTests.java index b38554b3140b9..73782382c162e 100644 --- a/core/src/test/java/org/elasticsearch/threadpool/ThreadPoolSerializationTests.java +++ b/core/src/test/java/org/elasticsearch/threadpool/ThreadPoolSerializationTests.java @@ -94,7 +94,7 @@ public void testThatToXContentWritesOutUnboundedCorrectly() throws Exception { @Test public void testThatNegativeSettingAllowsToStart() throws InterruptedException { - Settings settings = settingsBuilder().put("name", "index").put("threadpool.write.queue_size", "-1").build(); + Settings settings = settingsBuilder().put("name", "index").put("threadpool.index.queue_size", "-1").build(); ThreadPool threadPool = new ThreadPool(settings); assertThat(threadPool.info(ThreadPool.Names.INDEX).getQueueSize(), is(nullValue())); terminate(threadPool);