diff --git a/core/src/main/java/org/elasticsearch/index/cache/bitset/BitsetFilterCache.java b/core/src/main/java/org/elasticsearch/index/cache/bitset/BitsetFilterCache.java index 2203c94bd5a74..336be368d38b7 100644 --- a/core/src/main/java/org/elasticsearch/index/cache/bitset/BitsetFilterCache.java +++ b/core/src/main/java/org/elasticsearch/index/cache/bitset/BitsetFilterCache.java @@ -49,8 +49,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardUtils; import org.elasticsearch.indices.IndicesWarmer; -import org.elasticsearch.indices.IndicesWarmer.TerminationHandle; -import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.indices.IndicesWarmer.WarmerContext; import java.io.Closeable; import java.io.IOException; @@ -58,9 +57,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; /** * This is a cache for {@link BitDocIdSet} based filters and is unbounded by size or time. @@ -241,9 +238,9 @@ public int hashCode() { final class BitDocIdSetFilterWarmer extends IndicesWarmer.Listener { @Override - public IndicesWarmer.TerminationHandle warmNewReaders(final IndexShard indexShard, IndexMetaData indexMetaData, IndicesWarmer.WarmerContext context, ThreadPool threadPool) { + public void warmNewReaders(final IndexShard indexShard, IndexMetaData indexMetaData, IndicesWarmer.WarmerContext context) { if (!loadRandomAccessFiltersEagerly) { - return TerminationHandle.NO_WAIT; + return; } boolean hasNested = false; @@ -267,43 +264,25 @@ public IndicesWarmer.TerminationHandle warmNewReaders(final IndexShard indexShar warmUp.add(Queries.newNonNestedFilter()); } - final Executor executor = threadPool.executor(executor()); - final CountDownLatch latch = new CountDownLatch(context.searcher().reader().leaves().size() * warmUp.size()); for (final LeafReaderContext ctx : context.searcher().reader().leaves()) { for (final Filter filterToWarm : warmUp) { - executor.execute(new Runnable() { - - @Override - public void run() { - try { - final long start = System.nanoTime(); - getAndLoadIfNotPresent(filterToWarm, ctx); - if (indexShard.warmerService().logger().isTraceEnabled()) { - indexShard.warmerService().logger().trace("warmed bitset for [{}], took [{}]", filterToWarm, TimeValue.timeValueNanos(System.nanoTime() - start)); - } - } catch (Throwable t) { - indexShard.warmerService().logger().warn("failed to load bitset for [{}]", t, filterToWarm); - } finally { - latch.countDown(); - } + try { + final long start = System.nanoTime(); + getAndLoadIfNotPresent(filterToWarm, ctx); + if (indexShard.warmerService().logger().isTraceEnabled()) { + indexShard.warmerService().logger().trace("warmed bitset for [{}], took [{}]", filterToWarm, TimeValue.timeValueNanos(System.nanoTime() - start)); } - - }); + } catch (Throwable t) { + indexShard.warmerService().logger().warn("failed to load bitset for [{}]", t, filterToWarm); + } } } - return new TerminationHandle() { - @Override - public void awaitTermination() throws InterruptedException { - latch.await(); - } - }; } @Override - public TerminationHandle warmTopReader(IndexShard indexShard, IndexMetaData indexMetaData, IndicesWarmer.WarmerContext context, ThreadPool threadPool) { - return TerminationHandle.NO_WAIT; + public void warmTopReader(IndexShard indexShard, IndexMetaData indexMetaData, WarmerContext context) { + // no-op } - } Cache> getLoadedFilters() { diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesWarmer.java b/core/src/main/java/org/elasticsearch/indices/IndicesWarmer.java index 2b28842698d56..413786c335f1f 100644 --- a/core/src/main/java/org/elasticsearch/indices/IndicesWarmer.java +++ b/core/src/main/java/org/elasticsearch/indices/IndicesWarmer.java @@ -19,7 +19,6 @@ package org.elasticsearch.indices; -import com.google.common.collect.Lists; import org.apache.lucene.index.IndexReader; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -31,9 +30,7 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.threadpool.ThreadPool; -import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; @@ -43,8 +40,6 @@ public final class IndicesWarmer extends AbstractComponent { public static final String INDEX_WARMER_ENABLED = "index.warmer.enabled"; - private final ThreadPool threadPool; - private final ClusterService clusterService; private final IndicesService indicesService; @@ -52,9 +47,8 @@ public final class IndicesWarmer extends AbstractComponent { private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList<>(); @Inject - public IndicesWarmer(Settings settings, ThreadPool threadPool, ClusterService clusterService, IndicesService indicesService) { + public IndicesWarmer(Settings settings, ClusterService clusterService, IndicesService indicesService) { super(settings); - this.threadPool = threadPool; this.clusterService = clusterService; this.indicesService = indicesService; } @@ -100,27 +94,11 @@ private void warmInternal(final WarmerContext context, boolean topReader) { } indexShard.warmerService().onPreWarm(); long time = System.nanoTime(); - final List terminationHandles = Lists.newArrayList(); - // get a handle on pending tasks for (final Listener listener : listeners) { if (topReader) { - terminationHandles.add(listener.warmTopReader(indexShard, indexMetaData, context, threadPool)); + listener.warmTopReader(indexShard, indexMetaData, context); } else { - terminationHandles.add(listener.warmNewReaders(indexShard, indexMetaData, context, threadPool)); - } - } - // wait for termination - for (TerminationHandle terminationHandle : terminationHandles) { - try { - terminationHandle.awaitTermination(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - if (topReader) { - logger.warn("top warming has been interrupted", e); - } else { - logger.warn("warming has been interrupted", e); - } - break; + listener.warmNewReaders(indexShard, indexMetaData, context); } } long took = System.nanoTime() - time; @@ -134,27 +112,13 @@ private void warmInternal(final WarmerContext context, boolean topReader) { } } - /** A handle on the execution of warm-up action. */ - public interface TerminationHandle { - - public static TerminationHandle NO_WAIT = new TerminationHandle() { - @Override - public void awaitTermination() {} - }; - - /** Wait until execution of the warm-up action completes. */ - void awaitTermination() throws InterruptedException; - } public static abstract class Listener { - public String executor() { - return ThreadPool.Names.WARMER; - } - - /** Queue tasks to warm-up the given segments and return handles that allow to wait for termination of the execution of those tasks. */ - public abstract TerminationHandle warmNewReaders(IndexShard indexShard, IndexMetaData indexMetaData, WarmerContext context, ThreadPool threadPool); + /** Warm new leaf readers in the current thread. */ + public abstract void warmNewReaders(IndexShard indexShard, IndexMetaData indexMetaData, WarmerContext context); - public abstract TerminationHandle warmTopReader(IndexShard indexShard, IndexMetaData indexMetaData, WarmerContext context, ThreadPool threadPool); + /** Warm the top reader in the current thread. */ + public abstract void warmTopReader(IndexShard indexShard, IndexMetaData indexMetaData, WarmerContext context); } public static final class WarmerContext { diff --git a/core/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java b/core/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java index c4d241f027f54..662a05a4f6e00 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java @@ -50,34 +50,26 @@ public class RestThreadPoolAction extends AbstractCatAction { private final static String[] SUPPORTED_NAMES = new String[]{ - ThreadPool.Names.BULK, - ThreadPool.Names.FLUSH, + ThreadPool.Names.FETCH_SHARD, ThreadPool.Names.GENERIC, ThreadPool.Names.GET, ThreadPool.Names.INDEX, ThreadPool.Names.MANAGEMENT, ThreadPool.Names.OPTIMIZE, - ThreadPool.Names.PERCOLATE, - ThreadPool.Names.REFRESH, - ThreadPool.Names.SEARCH, + ThreadPool.Names.READ, ThreadPool.Names.SNAPSHOT, - ThreadPool.Names.SUGGEST, - ThreadPool.Names.WARMER + ThreadPool.Names.WRITE }; private final static String[] SUPPORTED_ALIASES = new String[]{ - "b", - "f", + "fs", "ge", "g", "i", "ma", "o", - "p", "r", - "s", "sn", - "su", "w" }; @@ -86,9 +78,8 @@ public class RestThreadPoolAction extends AbstractCatAction { } private final static String[] DEFAULT_THREAD_POOLS = new String[]{ - ThreadPool.Names.BULK, - ThreadPool.Names.INDEX, - ThreadPool.Names.SEARCH, + ThreadPool.Names.WRITE, + ThreadPool.Names.READ }; private final static Map ALIAS_TO_THREAD_POOL; diff --git a/core/src/main/java/org/elasticsearch/search/SearchService.java b/core/src/main/java/org/elasticsearch/search/SearchService.java index 4beacda97f132..0ce66c90ffa56 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchService.java +++ b/core/src/main/java/org/elasticsearch/search/SearchService.java @@ -72,7 +72,6 @@ import org.elasticsearch.indices.IndicesLifecycle; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesWarmer; -import org.elasticsearch.indices.IndicesWarmer.TerminationHandle; import org.elasticsearch.indices.IndicesWarmer.WarmerContext; import org.elasticsearch.indices.cache.request.IndicesRequestCache; import org.elasticsearch.node.settings.NodeSettingsService; @@ -920,7 +919,7 @@ public int getActiveContexts() { static class NormsWarmer extends IndicesWarmer.Listener { @Override - public TerminationHandle warmNewReaders(final IndexShard indexShard, IndexMetaData indexMetaData, final WarmerContext context, ThreadPool threadPool) { + public void warmNewReaders(final IndexShard indexShard, IndexMetaData indexMetaData, final WarmerContext context) { final Loading defaultLoading = Loading.parse(indexMetaData.settings().get(NORMS_LOADING_KEY), Loading.LAZY); final MapperService mapperService = indexShard.mapperService(); final ObjectSet warmUp = new ObjectHashSet<>(); @@ -937,51 +936,35 @@ public TerminationHandle warmNewReaders(final IndexShard indexShard, IndexMetaDa } } - final CountDownLatch latch = new CountDownLatch(1); - // Norms loading may be I/O intensive but is not CPU intensive, so we execute it in a single task - threadPool.executor(executor()).execute(new Runnable() { - @Override - public void run() { - try { - for (ObjectCursor stringObjectCursor : warmUp) { - final String indexName = stringObjectCursor.value; - final long start = System.nanoTime(); - for (final LeafReaderContext ctx : context.searcher().reader().leaves()) { - final NumericDocValues values = ctx.reader().getNormValues(indexName); - if (values != null) { - values.get(0); - } - } - if (indexShard.warmerService().logger().isTraceEnabled()) { - indexShard.warmerService().logger().trace("warmed norms for [{}], took [{}]", indexName, TimeValue.timeValueNanos(System.nanoTime() - start)); - } + try { + for (ObjectCursor stringObjectCursor : warmUp) { + final String indexName = stringObjectCursor.value; + final long start = System.nanoTime(); + for (final LeafReaderContext ctx : context.searcher().reader().leaves()) { + final NumericDocValues values = ctx.reader().getNormValues(indexName); + if (values != null) { + values.get(0); } - } catch (Throwable t) { - indexShard.warmerService().logger().warn("failed to warm-up norms", t); - } finally { - latch.countDown(); + } + if (indexShard.warmerService().logger().isTraceEnabled()) { + indexShard.warmerService().logger().trace("warmed norms for [{}], took [{}]", indexName, TimeValue.timeValueNanos(System.nanoTime() - start)); } } - }); - - return new TerminationHandle() { - @Override - public void awaitTermination() throws InterruptedException { - latch.await(); - } - }; + } catch (Throwable t) { + indexShard.warmerService().logger().warn("failed to warm-up norms", t); + } } @Override - public TerminationHandle warmTopReader(IndexShard indexShard, IndexMetaData indexMetaData, WarmerContext context, ThreadPool threadPool) { - return TerminationHandle.NO_WAIT; + public void warmTopReader(IndexShard indexShard, IndexMetaData indexMetaData, WarmerContext context) { + // no-op } } static class FieldDataWarmer extends IndicesWarmer.Listener { @Override - public TerminationHandle warmNewReaders(final IndexShard indexShard, IndexMetaData indexMetaData, final WarmerContext context, ThreadPool threadPool) { + public void warmNewReaders(final IndexShard indexShard, IndexMetaData indexMetaData, final WarmerContext context) { final MapperService mapperService = indexShard.mapperService(); final Map warmUp = new HashMap<>(); for (DocumentMapper docMapper : mapperService.docMappers(false)) { @@ -1002,40 +985,23 @@ public TerminationHandle warmNewReaders(final IndexShard indexShard, IndexMetaDa } } final IndexFieldDataService indexFieldDataService = indexShard.indexFieldDataService(); - final Executor executor = threadPool.executor(executor()); - final CountDownLatch latch = new CountDownLatch(context.searcher().reader().leaves().size() * warmUp.size()); for (final LeafReaderContext ctx : context.searcher().reader().leaves()) { for (final MappedFieldType fieldType : warmUp.values()) { - executor.execute(new Runnable() { - - @Override - public void run() { - try { - final long start = System.nanoTime(); - indexFieldDataService.getForField(fieldType).load(ctx); - if (indexShard.warmerService().logger().isTraceEnabled()) { - indexShard.warmerService().logger().trace("warmed fielddata for [{}], took [{}]", fieldType.names().fullName(), TimeValue.timeValueNanos(System.nanoTime() - start)); - } - } catch (Throwable t) { - indexShard.warmerService().logger().warn("failed to warm-up fielddata for [{}]", t, fieldType.names().fullName()); - } finally { - latch.countDown(); - } + try { + final long start = System.nanoTime(); + indexFieldDataService.getForField(fieldType).load(ctx); + if (indexShard.warmerService().logger().isTraceEnabled()) { + indexShard.warmerService().logger().trace("warmed fielddata for [{}], took [{}]", fieldType.names().fullName(), TimeValue.timeValueNanos(System.nanoTime() - start)); } - - }); + } catch (Throwable t) { + indexShard.warmerService().logger().warn("failed to warm-up fielddata for [{}]", t, fieldType.names().fullName()); + } } } - return new TerminationHandle() { - @Override - public void awaitTermination() throws InterruptedException { - latch.await(); - } - }; } @Override - public TerminationHandle warmTopReader(final IndexShard indexShard, IndexMetaData indexMetaData, final WarmerContext context, ThreadPool threadPool) { + public void warmTopReader(final IndexShard indexShard, IndexMetaData indexMetaData, final WarmerContext context) { final MapperService mapperService = indexShard.mapperService(); final Map warmUpGlobalOrdinals = new HashMap<>(); for (DocumentMapper docMapper : mapperService.docMappers(false)) { @@ -1055,104 +1021,70 @@ public TerminationHandle warmTopReader(final IndexShard indexShard, IndexMetaDat } } final IndexFieldDataService indexFieldDataService = indexShard.indexFieldDataService(); - final Executor executor = threadPool.executor(executor()); - final CountDownLatch latch = new CountDownLatch(warmUpGlobalOrdinals.size()); for (final MappedFieldType fieldType : warmUpGlobalOrdinals.values()) { - executor.execute(new Runnable() { - @Override - public void run() { - try { - final long start = System.nanoTime(); - IndexFieldData.Global ifd = indexFieldDataService.getForField(fieldType); - ifd.loadGlobal(context.reader()); - if (indexShard.warmerService().logger().isTraceEnabled()) { - indexShard.warmerService().logger().trace("warmed global ordinals for [{}], took [{}]", fieldType.names().fullName(), TimeValue.timeValueNanos(System.nanoTime() - start)); - } - } catch (Throwable t) { - indexShard.warmerService().logger().warn("failed to warm-up global ordinals for [{}]", t, fieldType.names().fullName()); - } finally { - latch.countDown(); - } + try { + final long start = System.nanoTime(); + IndexFieldData.Global ifd = indexFieldDataService.getForField(fieldType); + ifd.loadGlobal(context.reader()); + if (indexShard.warmerService().logger().isTraceEnabled()) { + indexShard.warmerService().logger().trace("warmed global ordinals for [{}], took [{}]", fieldType.names().fullName(), TimeValue.timeValueNanos(System.nanoTime() - start)); } - }); - } - return new TerminationHandle() { - @Override - public void awaitTermination() throws InterruptedException { - latch.await(); + } catch (Throwable t) { + indexShard.warmerService().logger().warn("failed to warm-up global ordinals for [{}]", t, fieldType.names().fullName()); } - }; + } } } class SearchWarmer extends IndicesWarmer.Listener { @Override - public TerminationHandle warmNewReaders(IndexShard indexShard, IndexMetaData indexMetaData, WarmerContext context, ThreadPool threadPool) { - return internalWarm(indexShard, indexMetaData, context, threadPool, false); + public void warmNewReaders(IndexShard indexShard, IndexMetaData indexMetaData, WarmerContext context) { + internalWarm(indexShard, indexMetaData, context, threadPool, false); } @Override - public TerminationHandle warmTopReader(IndexShard indexShard, IndexMetaData indexMetaData, WarmerContext context, ThreadPool threadPool) { - return internalWarm(indexShard, indexMetaData, context, threadPool, true); + public void warmTopReader(IndexShard indexShard, IndexMetaData indexMetaData, WarmerContext context) { + internalWarm(indexShard, indexMetaData, context, threadPool, true); } - public TerminationHandle internalWarm(final IndexShard indexShard, final IndexMetaData indexMetaData, final IndicesWarmer.WarmerContext warmerContext, ThreadPool threadPool, final boolean top) { + public void internalWarm(final IndexShard indexShard, final IndexMetaData indexMetaData, final IndicesWarmer.WarmerContext warmerContext, ThreadPool threadPool, final boolean top) { IndexWarmersMetaData custom = indexMetaData.custom(IndexWarmersMetaData.TYPE); if (custom == null) { - return TerminationHandle.NO_WAIT; + return ; } - final Executor executor = threadPool.executor(executor()); - final CountDownLatch latch = new CountDownLatch(custom.entries().size()); for (final IndexWarmersMetaData.Entry entry : custom.entries()) { - executor.execute(new Runnable() { - - @Override - public void run() { - SearchContext context = null; - try { - long now = System.nanoTime(); - ShardSearchRequest request = new ShardSearchLocalRequest(indexShard.shardId(), indexMetaData.numberOfShards(), - SearchType.QUERY_THEN_FETCH, entry.source(), entry.types(), entry.requestCache()); - context = createContext(request, warmerContext.searcher()); - // if we use sort, we need to do query to sort on it and load relevant field data - // if not, we might as well set size=0 (and cache if needed) - if (context.sort() == null) { - context.size(0); - } - boolean canCache = indicesQueryCache.canCache(request, context); - // early terminate when we can cache, since we can only do proper caching on top level searcher - // also, if we can't cache, and its top, we don't need to execute it, since we already did when its not top - if (canCache != top) { - return; - } - loadOrExecuteQueryPhase(request, context, queryPhase); - long took = System.nanoTime() - now; - if (indexShard.warmerService().logger().isTraceEnabled()) { - indexShard.warmerService().logger().trace("warmed [{}], took [{}]", entry.name(), TimeValue.timeValueNanos(took)); - } - } catch (Throwable t) { - indexShard.warmerService().logger().warn("warmer [{}] failed", t, entry.name()); - } finally { - try { - if (context != null) { - freeContext(context.id()); - cleanContext(context); - } - } finally { - latch.countDown(); - } - } + SearchContext context = null; + try { + long now = System.nanoTime(); + ShardSearchRequest request = new ShardSearchLocalRequest(indexShard.shardId(), indexMetaData.numberOfShards(), + SearchType.QUERY_THEN_FETCH, entry.source(), entry.types(), entry.requestCache()); + context = createContext(request, warmerContext.searcher()); + // if we use sort, we need to do query to sort on it and load relevant field data + // if not, we might as well set size=0 (and cache if needed) + if (context.sort() == null) { + context.size(0); + } + boolean canCache = indicesQueryCache.canCache(request, context); + // early terminate when we can cache, since we can only do proper caching on top level searcher + // also, if we can't cache, and its top, we don't need to execute it, since we already did when its not top + if (canCache != top) { + return; + } + loadOrExecuteQueryPhase(request, context, queryPhase); + long took = System.nanoTime() - now; + if (indexShard.warmerService().logger().isTraceEnabled()) { + indexShard.warmerService().logger().trace("warmed [{}], took [{}]", entry.name(), TimeValue.timeValueNanos(took)); + } + } catch (Throwable t) { + indexShard.warmerService().logger().warn("warmer [{}] failed", t, entry.name()); + } finally { + if (context != null) { + freeContext(context.id()); + cleanContext(context); } - - }); - } - return new TerminationHandle() { - @Override - public void awaitTermination() throws InterruptedException { - latch.await(); } - }; + } } } diff --git a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 7c01367c0169f..c64cd54dd7cb4 100644 --- a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -21,8 +21,10 @@ import com.google.common.base.Objects; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMap.Builder; import com.google.common.collect.Maps; import com.google.common.util.concurrent.MoreExecutors; + import org.apache.lucene.util.Counter; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; @@ -60,23 +62,49 @@ public class ThreadPool extends AbstractComponent { public static class Names { + // Actual threadpools public static final String SAME = "same"; public static final String GENERIC = "generic"; public static final String LISTENER = "listener"; - public static final String GET = "get"; - public static final String INDEX = "index"; - public static final String BULK = "bulk"; - public static final String SEARCH = "search"; - public static final String SUGGEST = "suggest"; - public static final String PERCOLATE = "percolate"; public static final String MANAGEMENT = "management"; - public static final String FLUSH = "flush"; - public static final String REFRESH = "refresh"; - public static final String WARMER = "warmer"; + public static final String READ = "read"; + public static final String WRITE = "write"; public static final String SNAPSHOT = "snapshot"; public static final String OPTIMIZE = "optimize"; - public static final String FETCH_SHARD_STARTED = "fetch_shard_started"; - public static final String FETCH_SHARD_STORE = "fetch_shard_store"; + public static final String FETCH_SHARD = "fetch_shard"; + // this would ideally be an alias to "read" but some search + // requests block on get requests to retrieve eg. indexed scripts + // or geo shapes, so we need to keep get in its own thread pool + // to avoid dead locks + public static final String GET = "get"; + // not an alias to WRITE in order to prevent heavy bulk requests from + // delaying lightweight create/index/delete operations + public static final String INDEX = "index"; + + // Aliases + public static final String SEARCH = READ; + public static final String SUGGEST = READ; + public static final String PERCOLATE = READ; + public static final String BULK = WRITE; + public static final String FLUSH = WRITE; + public static final String REFRESH = WRITE; + public static final String FETCH_SHARD_STARTED = FETCH_SHARD; + public static final String FETCH_SHARD_STORE = FETCH_SHARD; + + private static Map DEPRECATED_THREADPOOLS; + static { + Builder builder = ImmutableMap.builder(); + builder.put("search", SEARCH); + builder.put("suggest", SUGGEST); + builder.put("percolate", PERCOLATE); + builder.put("warmer", REFRESH); + builder.put("bulk", BULK); + builder.put("flush", FLUSH); + builder.put("refresh", REFRESH); + builder.put("fetch_shard_started", FETCH_SHARD_STARTED); + builder.put("fetch_shard_store", FETCH_SHARD_STORE); + DEPRECATED_THREADPOOLS = builder.build(); + } } public static final String THREADPOOL_GROUP = "threadpool."; @@ -110,23 +138,17 @@ public ThreadPool(Settings settings) { int halfProcMaxAt10 = Math.min(((availableProcessors + 1) / 2), 10); defaultExecutorTypeSettings = ImmutableMap.builder() .put(Names.GENERIC, settingsBuilder().put("type", "cached").put("keep_alive", "30s").build()) + .put(Names.WRITE, settingsBuilder().put("type", "fixed").put("size", availableProcessors).put("queue_size", 50).build()) .put(Names.INDEX, settingsBuilder().put("type", "fixed").put("size", availableProcessors).put("queue_size", 200).build()) - .put(Names.BULK, settingsBuilder().put("type", "fixed").put("size", availableProcessors).put("queue_size", 50).build()) + .put(Names.READ, settingsBuilder().put("type", "fixed").put("size", ((availableProcessors * 3) / 2) + 1).put("queue_size", 1000).build()) .put(Names.GET, settingsBuilder().put("type", "fixed").put("size", availableProcessors).put("queue_size", 1000).build()) - .put(Names.SEARCH, settingsBuilder().put("type", "fixed").put("size", ((availableProcessors * 3) / 2) + 1).put("queue_size", 1000).build()) - .put(Names.SUGGEST, settingsBuilder().put("type", "fixed").put("size", availableProcessors).put("queue_size", 1000).build()) - .put(Names.PERCOLATE, settingsBuilder().put("type", "fixed").put("size", availableProcessors).put("queue_size", 1000).build()) .put(Names.MANAGEMENT, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", 5).build()) // no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded // the assumption here is that the listeners should be very lightweight on the listeners side .put(Names.LISTENER, settingsBuilder().put("type", "fixed").put("size", halfProcMaxAt10).build()) - .put(Names.FLUSH, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build()) - .put(Names.REFRESH, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt10).build()) - .put(Names.WARMER, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build()) .put(Names.SNAPSHOT, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build()) - .put(Names.OPTIMIZE, settingsBuilder().put("type", "fixed").put("size", 1).build()) - .put(Names.FETCH_SHARD_STARTED, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", availableProcessors * 2).build()) - .put(Names.FETCH_SHARD_STORE, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", availableProcessors * 2).build()) + .put(Names.OPTIMIZE, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", 1).build()) + .put(Names.FETCH_SHARD, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", availableProcessors * 2).build()) .build(); Map executors = Maps.newHashMap(); @@ -136,7 +158,13 @@ public ThreadPool(Settings settings) { // Building custom thread pools for (Map.Entry entry : groupSettings.entrySet()) { - if (executors.containsKey(entry.getKey())) { + final String threadPoolName = entry.getKey(); + if (executors.containsKey(threadPoolName)) { + continue; + } + if (Names.DEPRECATED_THREADPOOLS.containsKey(threadPoolName)) { + final String replacement = Names.DEPRECATED_THREADPOOLS.get(threadPoolName); + deprecationLogger.deprecated("threadpool [{}] has been merged together with other threadpools into the [{}] threadpool", threadPoolName, replacement); continue; } executors.put(entry.getKey(), build(entry.getKey(), entry.getValue(), Settings.EMPTY)); @@ -446,11 +474,17 @@ public void updateSettings(Settings settings) { // Building custom thread pools for (Map.Entry entry : groupSettings.entrySet()) { - if (defaultExecutorTypeSettings.containsKey(entry.getKey())) { + final String threadPoolName = entry.getKey(); + if (defaultExecutorTypeSettings.containsKey(threadPoolName)) { + continue; + } + if (Names.DEPRECATED_THREADPOOLS.containsKey(threadPoolName)) { + final String replacement = Names.DEPRECATED_THREADPOOLS.get(threadPoolName); + deprecationLogger.deprecated("threadpool [{}] has been merged together with other threadpools into the [{}] threadpool", threadPoolName, replacement); continue; } - ExecutorHolder oldExecutorHolder = executors.get(entry.getKey()); + ExecutorHolder oldExecutorHolder = executors.get(threadPoolName); ExecutorHolder newExecutorHolder = rebuild(entry.getKey(), oldExecutorHolder, entry.getValue(), Settings.EMPTY); // Can't introduce new thread pools at runtime, because The oldExecutorHolder variable will be null in the // case the settings contains a thread pool not defined in the initial settings in the constructor. The if diff --git a/core/src/test/java/org/elasticsearch/action/RejectionActionIT.java b/core/src/test/java/org/elasticsearch/action/RejectionActionIT.java index ff094e09a4c95..63188782c0fd2 100644 --- a/core/src/test/java/org/elasticsearch/action/RejectionActionIT.java +++ b/core/src/test/java/org/elasticsearch/action/RejectionActionIT.java @@ -47,12 +47,10 @@ public class RejectionActionIT extends ESIntegTestCase { protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder() .put(super.nodeSettings(nodeOrdinal)) - .put("threadpool.search.size", 1) - .put("threadpool.search.queue_size", 1) - .put("threadpool.index.size", 1) - .put("threadpool.index.queue_size", 1) - .put("threadpool.get.size", 1) - .put("threadpool.get.queue_size", 1) + .put("threadpool.read.size", 1) + .put("threadpool.read.queue_size", 1) + .put("threadpool.write.size", 1) + .put("threadpool.write.queue_size", 1) .build(); } diff --git a/core/src/test/java/org/elasticsearch/search/SearchWithRejectionsIT.java b/core/src/test/java/org/elasticsearch/search/SearchWithRejectionsIT.java index b5b4b94a8a158..877554dfd7e7e 100644 --- a/core/src/test/java/org/elasticsearch/search/SearchWithRejectionsIT.java +++ b/core/src/test/java/org/elasticsearch/search/SearchWithRejectionsIT.java @@ -39,9 +39,9 @@ public class SearchWithRejectionsIT extends ESIntegTestCase { @Override public Settings nodeSettings(int nodeOrdinal) { return settingsBuilder().put(super.nodeSettings(nodeOrdinal)) - .put("threadpool.search.type", "fixed") - .put("threadpool.search.size", 1) - .put("threadpool.search.queue_size", 1) + .put("threadpool.read.type", "fixed") + .put("threadpool.read.size", 1) + .put("threadpool.read.queue_size", 1) .build(); } diff --git a/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java b/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java index 8ec6f89e44bc4..14f0da0449570 100644 --- a/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java +++ b/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java @@ -407,10 +407,8 @@ private static Settings getRandomNodeSettings(long seed) { } if (random.nextBoolean()) { // change threadpool types to make sure we don't have components that rely on the type of thread pools - for (String name : Arrays.asList(ThreadPool.Names.BULK, ThreadPool.Names.FLUSH, ThreadPool.Names.GET, - ThreadPool.Names.INDEX, ThreadPool.Names.MANAGEMENT, ThreadPool.Names.OPTIMIZE, - ThreadPool.Names.PERCOLATE, ThreadPool.Names.REFRESH, ThreadPool.Names.SEARCH, ThreadPool.Names.SNAPSHOT, - ThreadPool.Names.SUGGEST, ThreadPool.Names.WARMER)) { + for (String name : Arrays.asList(ThreadPool.Names.WRITE, ThreadPool.Names.READ, ThreadPool.Names.MANAGEMENT, + ThreadPool.Names.OPTIMIZE, ThreadPool.Names.SNAPSHOT, ThreadPool.Names.LISTENER, ThreadPool.Names.FETCH_SHARD)) { if (random.nextBoolean()) { final String type = RandomPicks.randomFrom(random, Arrays.asList("fixed", "cached", "scaling")); builder.put(ThreadPool.THREADPOOL_GROUP + name + ".type", type); diff --git a/core/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java b/core/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java index d0f1fa2027a94..da6bfb7af919e 100644 --- a/core/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java +++ b/core/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java @@ -63,7 +63,7 @@ public class SimpleThreadPoolIT extends ESIntegTestCase { @Override protected Settings nodeSettings(int nodeOrdinal) { - return Settings.settingsBuilder().put(super.nodeSettings(nodeOrdinal)).put("threadpool.search.type", "cached").build(); + return Settings.settingsBuilder().put(super.nodeSettings(nodeOrdinal)).put("threadpool.read.type", "cached").build(); } @Test @@ -131,7 +131,7 @@ public void testUpdatingThreadPoolSettings() throws Exception { ThreadPool threadPool = internalCluster().getDataNodeInstance(ThreadPool.class); // Check that settings are changed assertThat(((ThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(5L)); - client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.keep_alive", "10m").build()).execute().actionGet(); + client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.read.keep_alive", "10m").build()).execute().actionGet(); assertThat(((ThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L)); // Make sure that threads continue executing when executor is replaced @@ -149,7 +149,7 @@ public void run() { } } }); - client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.type", "fixed").build()).execute().actionGet(); + client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.read.type", "fixed").build()).execute().actionGet(); assertThat(threadPool.executor(Names.SEARCH), not(sameInstance(oldExecutor))); assertThat(((ThreadPoolExecutor) oldExecutor).isShutdown(), equalTo(true)); assertThat(((ThreadPoolExecutor) oldExecutor).isTerminating(), equalTo(true)); @@ -169,7 +169,7 @@ public void run() { } } }); - client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.type", "fixed").build()).execute().actionGet(); + client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.read.type", "fixed").build()).execute().actionGet(); barrier.await(); Thread.sleep(200); diff --git a/core/src/test/java/org/elasticsearch/threadpool/ThreadPoolSerializationTests.java b/core/src/test/java/org/elasticsearch/threadpool/ThreadPoolSerializationTests.java index 3226662f686b3..73782382c162e 100644 --- a/core/src/test/java/org/elasticsearch/threadpool/ThreadPoolSerializationTests.java +++ b/core/src/test/java/org/elasticsearch/threadpool/ThreadPoolSerializationTests.java @@ -96,7 +96,7 @@ public void testThatToXContentWritesOutUnboundedCorrectly() throws Exception { public void testThatNegativeSettingAllowsToStart() throws InterruptedException { Settings settings = settingsBuilder().put("name", "index").put("threadpool.index.queue_size", "-1").build(); ThreadPool threadPool = new ThreadPool(settings); - assertThat(threadPool.info("index").getQueueSize(), is(nullValue())); + assertThat(threadPool.info(ThreadPool.Names.INDEX).getQueueSize(), is(nullValue())); terminate(threadPool); } } diff --git a/core/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java b/core/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java index 9330f71996d24..12bed597500dc 100644 --- a/core/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java +++ b/core/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java @@ -52,7 +52,7 @@ private ThreadPool.Info info(ThreadPool threadPool, String name) { public void testCachedExecutorType() throws InterruptedException { ThreadPool threadPool = new ThreadPool( Settings.settingsBuilder() - .put("threadpool.search.type", "cached") + .put("threadpool.read.type", "cached") .put("name","testCachedExecutorType").build()); assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached")); @@ -60,14 +60,14 @@ public void testCachedExecutorType() throws InterruptedException { assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); // Replace with different type - threadPool.updateSettings(settingsBuilder().put("threadpool.search.type", "same").build()); + threadPool.updateSettings(settingsBuilder().put("threadpool.read.type", "same").build()); assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("same")); assertThat(threadPool.executor(Names.SEARCH), instanceOf(MoreExecutors.directExecutor().getClass())); // Replace with different type again threadPool.updateSettings(settingsBuilder() - .put("threadpool.search.type", "scaling") - .put("threadpool.search.keep_alive", "10m") + .put("threadpool.read.type", "scaling") + .put("threadpool.read.keep_alive", "10m") .build()); assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("scaling")); assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); @@ -77,7 +77,7 @@ public void testCachedExecutorType() throws InterruptedException { assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L)); // Put old type back - threadPool.updateSettings(settingsBuilder().put("threadpool.search.type", "cached").build()); + threadPool.updateSettings(settingsBuilder().put("threadpool.read.type", "cached").build()); assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached")); // Make sure keep alive value reused assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(10L)); @@ -85,7 +85,7 @@ public void testCachedExecutorType() throws InterruptedException { // Change keep alive Executor oldExecutor = threadPool.executor(Names.SEARCH); - threadPool.updateSettings(settingsBuilder().put("threadpool.search.keep_alive", "1m").build()); + threadPool.updateSettings(settingsBuilder().put("threadpool.read.keep_alive", "1m").build()); // Make sure keep alive value changed assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(1L)); assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(1L)); @@ -94,7 +94,7 @@ public void testCachedExecutorType() throws InterruptedException { assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor)); // Set the same keep alive - threadPool.updateSettings(settingsBuilder().put("threadpool.search.keep_alive", "1m").build()); + threadPool.updateSettings(settingsBuilder().put("threadpool.read.keep_alive", "1m").build()); // Make sure keep alive value didn't change assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(1L)); assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(1L)); @@ -107,17 +107,17 @@ public void testCachedExecutorType() throws InterruptedException { @Test public void testFixedExecutorType() throws InterruptedException { ThreadPool threadPool = new ThreadPool(settingsBuilder() - .put("threadpool.search.type", "fixed") + .put("threadpool.read.type", "fixed") .put("name","testCachedExecutorType").build()); assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); // Replace with different type threadPool.updateSettings(settingsBuilder() - .put("threadpool.search.type", "scaling") - .put("threadpool.search.keep_alive", "10m") - .put("threadpool.search.min", "2") - .put("threadpool.search.size", "15") + .put("threadpool.read.type", "scaling") + .put("threadpool.read.keep_alive", "10m") + .put("threadpool.read.min", "2") + .put("threadpool.read.size", "15") .build()); assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("scaling")); assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); @@ -131,7 +131,7 @@ public void testFixedExecutorType() throws InterruptedException { // Put old type back threadPool.updateSettings(settingsBuilder() - .put("threadpool.search.type", "fixed") + .put("threadpool.read.type", "fixed") .build()); assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("fixed")); // Make sure keep alive value is not used @@ -145,7 +145,7 @@ public void testFixedExecutorType() throws InterruptedException { // Change size Executor oldExecutor = threadPool.executor(Names.SEARCH); - threadPool.updateSettings(settingsBuilder().put("threadpool.search.size", "10").build()); + threadPool.updateSettings(settingsBuilder().put("threadpool.read.size", "10").build()); // Make sure size values changed assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(10)); assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(10)); @@ -157,7 +157,7 @@ public void testFixedExecutorType() throws InterruptedException { // Change queue capacity threadPool.updateSettings(settingsBuilder() - .put("threadpool.search.queue", "500") + .put("threadpool.read.queue", "500") .build()); terminate(threadPool); @@ -167,8 +167,8 @@ public void testFixedExecutorType() throws InterruptedException { @Test public void testScalingExecutorType() throws InterruptedException { ThreadPool threadPool = new ThreadPool(settingsBuilder() - .put("threadpool.search.type", "scaling") - .put("threadpool.search.size", 10) + .put("threadpool.read.type", "scaling") + .put("threadpool.read.size", 10) .put("name","testCachedExecutorType").build()); assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(1)); @@ -180,10 +180,10 @@ public void testScalingExecutorType() throws InterruptedException { // Change settings that doesn't require pool replacement Executor oldExecutor = threadPool.executor(Names.SEARCH); threadPool.updateSettings(settingsBuilder() - .put("threadpool.search.type", "scaling") - .put("threadpool.search.keep_alive", "10m") - .put("threadpool.search.min", "2") - .put("threadpool.search.size", "15") + .put("threadpool.read.type", "scaling") + .put("threadpool.read.keep_alive", "10m") + .put("threadpool.read.min", "2") + .put("threadpool.read.size", "15") .build()); assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("scaling")); assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); @@ -202,7 +202,7 @@ public void testScalingExecutorType() throws InterruptedException { @Test(timeout = 10000) public void testShutdownDownNowDoesntBlock() throws Exception { ThreadPool threadPool = new ThreadPool(Settings.settingsBuilder() - .put("threadpool.search.type", "cached") + .put("threadpool.read.type", "cached") .put("name","testCachedExecutorType").build()); final CountDownLatch latch = new CountDownLatch(1); @@ -218,7 +218,7 @@ public void run() { } } }); - threadPool.updateSettings(settingsBuilder().put("threadpool.search.type", "fixed").build()); + threadPool.updateSettings(settingsBuilder().put("threadpool.read.type", "fixed").build()); assertThat(threadPool.executor(Names.SEARCH), not(sameInstance(oldExecutor))); assertThat(((ThreadPoolExecutor) oldExecutor).isShutdown(), equalTo(true)); assertThat(((ThreadPoolExecutor) oldExecutor).isTerminating(), equalTo(true)); diff --git a/docs/reference/modules/threadpool.asciidoc b/docs/reference/modules/threadpool.asciidoc index cde176f342598..e216e533060ce 100644 --- a/docs/reference/modules/threadpool.asciidoc +++ b/docs/reference/modules/threadpool.asciidoc @@ -9,33 +9,18 @@ of discarded. There are several thread pools, but the important ones include: -`index`:: - For index/delete operations. Defaults to `fixed` +`write`:: + For index/delete/bulk/refresh/flush operations. Defaults to `fixed` with a size of `# of available processors`, queue_size of `200`. -`search`:: - For count/search operations. Defaults to `fixed` +`read`:: + For count/search/suggest/percolate operations. Defaults to `fixed` with a size of `int((# of available_processors * 3) / 2) + 1`, queue_size of `1000`. -`suggest`:: - For suggest operations. Defaults to `fixed` - with a size of `# of available processors`, - queue_size of `1000`. - `get`:: - For get operations. Defaults to `fixed` - with a size of `# of available processors`, - queue_size of `1000`. - -`bulk`:: - For bulk operations. Defaults to `fixed` - with a size of `# of available processors`, - queue_size of `50`. - -`percolate`:: - For percolate operations. Defaults to `fixed` + For get operations only. Defaults to `fixed` with a size of `# of available processors`, queue_size of `1000`. @@ -43,17 +28,9 @@ There are several thread pools, but the important ones include: For snapshot/restore operations. Defaults to `scaling` with a keep-alive of `5m` and a size of `min(5, (# of available processors)/2)`. -`warmer`:: - For segment warm-up operations. Defaults to `scaling` with a - keep-alive of `5m` and a size of `min(5, (# of available processors)/2)`. - -`refresh`:: - For refresh operations. Defaults to `scaling` with a - keep-alive of `5m` and a size of `min(10, (# of available processors)/2)`. - `listener`:: Mainly for java client executing of action when listener threaded is set to true. - Default size of `(# of available processors)/2`, max at 10. + Default size of `min(10, (# of available processors)/2)`. Changing a specific thread pool can be done by setting its type and specific type parameters, for example, changing the `index` thread pool @@ -62,7 +39,7 @@ to have more threads: [source,js] -------------------------------------------------- threadpool: - index: + write: type: fixed size: 30 -------------------------------------------------- @@ -110,7 +87,7 @@ full, it will abort the request. [source,js] -------------------------------------------------- threadpool: - index: + write: type: fixed size: 30 queue_size: 1000 @@ -129,7 +106,7 @@ around in the thread pool without it doing any work. [source,js] -------------------------------------------------- threadpool: - warmer: + snapshot: type: scaling size: 8 keep_alive: 2m diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.thread_pool/10_basic.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.thread_pool/10_basic.yaml index edb87ce27b9b1..8ffa133b0a952 100755 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.thread_pool/10_basic.yaml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.thread_pool/10_basic.yaml @@ -7,8 +7,8 @@ - match: $body: | - / #host ip bulk.active bulk.queue bulk.rejected index.active index.queue index.rejected search.active search.queue search.rejected - ^ (\S+ \s+ (\d{1,3}\.){3}\d{1,3} \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \n)+ $/ + / #host ip read.active read.queue read.rejected write.active write.queue write.rejected + ^ (\S+ \s+ (\d{1,3}\.){3}\d{1,3} \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \n)+ $/ - do: cat.thread_pool: @@ -16,8 +16,8 @@ - match: $body: | - /^ host \s+ ip \s+ bulk.active \s+ bulk.queue \s+ bulk.rejected \s+ index.active \s+ index.queue \s+ index.rejected \s+ search.active \s+ search.queue \s+ search.rejected \s+ \n - (\S+ \s+ (\d{1,3}\.){3}\d{1,3} \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \n)+ $/ + /^ host \s+ ip \s+ read.active \s+ read.queue \s+ read.rejected \s+ write.active \s+ write.queue \s+ write.rejected \s+ \n + (\S+ \s+ (\d{1,3}\.){3}\d{1,3} \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \n)+ $/ - do: cat.thread_pool: @@ -31,141 +31,22 @@ - do: cat.thread_pool: - h: id,ba,fa,gea,ga,ia,maa,ma,oa,pa + h: id,fsa,gea,ga,maa,oa,ra,sna,wa v: true full_id: true - match: $body: | - /^ id \s+ ba \s+ fa \s+ gea \s+ ga \s+ ia \s+ maa \s+ oa \s+ pa \s+ \n - (\S+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \n)+ $/ + /^ id \s+ fsa \s+ gea \s+ ga \s+ maa \s+ oa \s+ ra \s+ sna \s+ wa \s+ \n + (\S+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \n)+ $/ - do: cat.thread_pool: - h: id,bulk.type,bulk.active,bulk.size,bulk.queue,bulk.queueSize,bulk.rejected,bulk.largest,bulk.completed,bulk.min,bulk.max,bulk.keepAlive + h: id,write.type,write.active,write.size,write.queue,write.queueSize,write.rejected,write.largest,write.completed,write.min,write.max,write.keepAlive v: true - match: $body: | - /^ id \s+ bulk.type \s+ bulk.active \s+ bulk.size \s+ bulk.queue \s+ bulk.queueSize \s+ bulk.rejected \s+ bulk.largest \s+ bulk.completed \s+ bulk.min \s+ bulk.max \s+ bulk.keepAlive \s+ \n + /^ id \s+ write.type \s+ write.active \s+ write.size \s+ write.queue \s+ write.queueSize \s+ write.rejected \s+ write.largest \s+ write.completed \s+ write.min \s+ write.max \s+ write.keepAlive \s+ \n (\S+ \s+ (cached|fixed|scaling)? \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \S* \s+ \n)+ $/ - - do: - cat.thread_pool: - h: id,flush.type,flush.active,flush.size,flush.queue,flush.queueSize,flush.rejected,flush.largest,flush.completed,flush.min,flush.max,flush.keepAlive - v: true - - - match: - $body: | - /^ id \s+ flush.type \s+ flush.active \s+ flush.size \s+ flush.queue \s+ flush.queueSize \s+ flush.rejected \s+ flush.largest \s+ flush.completed \s+ flush.min \s+ flush.max \s+ flush.keepAlive \s+ \n - (\S+ \s+ (cached|fixed|scaling)? \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \S* \s+ \n)+ $/ - - - do: - cat.thread_pool: - h: id,generic.type,generic.active,generic.size,generic.queue,generic.queueSize,generic.rejected,generic.largest,generic.completed,generic.min,generic.max,generic.keepAlive - v: true - - - match: - $body: | - /^ id \s+ generic.type \s+ generic.active \s+ generic.size \s+ generic.queue \s+ generic.queueSize \s+ generic.rejected \s+ generic.largest \s+ generic.completed \s+ generic.min \s+ generic.max \s+ generic.keepAlive \s+ \n - (\S+ \s+ (cached|fixed|scaling)? \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \S* \s+ \n)+ $/ - - - do: - cat.thread_pool: - h: id,get.type,get.active,get.size,get.queue,get.queueSize,get.rejected,get.largest,get.completed,get.min,get.max,get.keepAlive - v: true - - - match: - $body: | - /^ id \s+ get.type \s+ get.active \s+ get.size \s+ get.queue \s+ get.queueSize \s+ get.rejected \s+ get.largest \s+ get.completed \s+ get.min \s+ get.max \s+ get.keepAlive \s+ \n - (\S+ \s+ (cached|fixed|scaling)? \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \S* \s+ \n)+ $/ - - - do: - cat.thread_pool: - h: id,index.type,index.active,index.size,index.queue,index.queueSize,index.rejected,index.largest,index.completed,index.min,index.max,index.keepAlive - v: true - - - match: - $body: | - /^ id \s+ index.type \s+ index.active \s+ index.size \s+ index.queue \s+ index.queueSize \s+ index.rejected \s+ index.largest \s+ index.completed \s+ index.min \s+ index.max \s+ index.keepAlive \s+ \n - (\S+ \s+ (cached|fixed|scaling)? \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \S* \s+ \n)+ $/ - - - do: - cat.thread_pool: - h: id,management.type,management.active,management.size,management.queue,management.queueSize,management.rejected,management.largest,management.completed,management.min,management.max,management.keepAlive - v: true - - - match: - $body: | - /^ id \s+ management.type \s+ management.active \s+ management.size \s+ management.queue \s+ management.queueSize \s+ management.rejected \s+ management.largest \s+ management.completed \s+ management.min \s+ management.max \s+ management.keepAlive \s+ \n - (\S+ \s+ (cached|fixed|scaling)? \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \S* \s+ \n)+ $/ - - - do: - cat.thread_pool: - h: id,optimize.type,optimize.active,optimize.size,optimize.queue,optimize.queueSize,optimize.rejected,optimize.largest,optimize.completed,optimize.min,optimize.max,optimize.keepAlive - v: true - - - match: - $body: | - /^ id \s+ optimize.type \s+ optimize.active \s+ optimize.size \s+ optimize.queue \s+ optimize.queueSize \s+ optimize.rejected \s+ optimize.largest \s+ optimize.completed \s+ optimize.min \s+ optimize.max \s+ optimize.keepAlive \s+ \n - (\S+ \s+ (cached|fixed|scaling)? \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \S* \s+ \n)+ $/ - - - do: - cat.thread_pool: - h: id,percolate.type,percolate.active,percolate.size,percolate.queue,percolate.queueSize,percolate.rejected,percolate.largest,percolate.completed,percolate.min,percolate.max,percolate.keepAlive - v: true - - - match: - $body: | - /^ id \s+ percolate.type \s+ percolate.active \s+ percolate.size \s+ percolate.queue \s+ percolate.queueSize \s+ percolate.rejected \s+ percolate.largest \s+ percolate.completed \s+ percolate.min \s+ percolate.max \s+ percolate.keepAlive \s+ \n - (\S+ \s+ (cached|fixed|scaling)? \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \S* \s+ \n)+ $/ - - - do: - cat.thread_pool: - h: id,refresh.type,refresh.active,refresh.size,refresh.queue,refresh.queueSize,refresh.rejected,refresh.largest,refresh.completed,refresh.min,refresh.max,refresh.keepAlive - v: true - - - match: - $body: | - /^ id \s+ refresh.type \s+ refresh.active \s+ refresh.size \s+ refresh.queue \s+ refresh.queueSize \s+ refresh.rejected \s+ refresh.largest \s+ refresh.completed \s+ refresh.min \s+ refresh.max \s+ refresh.keepAlive \s+ \n - (\S+ \s+ (cached|fixed|scaling)? \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \S* \s+ \n)+ $/ - - - do: - cat.thread_pool: - h: id,search.type,search.active,search.size,search.queue,search.queueSize,search.rejected,search.largest,search.completed,search.min,search.max,search.keepAlive - v: true - - - match: - $body: | - /^ id \s+ search.type \s+ search.active \s+ search.size \s+ search.queue \s+ search.queueSize \s+ search.rejected \s+ search.largest \s+ search.completed \s+ search.min \s+ search.max \s+ search.keepAlive \s+ \n - (\S+ \s+ (cached|fixed|scaling)? \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \S* \s+ \n)+ $/ - - - do: - cat.thread_pool: - h: id,snapshot.type,snapshot.active,snapshot.size,snapshot.queue,snapshot.queueSize,snapshot.rejected,snapshot.largest,snapshot.completed,snapshot.min,snapshot.max,snapshot.keepAlive - v: true - - - match: - $body: | - /^ id \s+ snapshot.type \s+ snapshot.active \s+ snapshot.size \s+ snapshot.queue \s+ snapshot.queueSize \s+ snapshot.rejected \s+ snapshot.largest \s+ snapshot.completed \s+ snapshot.min \s+ snapshot.max \s+ snapshot.keepAlive \s+ \n - (\S+ \s+ (cached|fixed|scaling)? \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \S* \s+ \n)+ $/ - - - do: - cat.thread_pool: - h: id,suggest.type,suggest.active,suggest.size,suggest.queue,suggest.queueSize,suggest.rejected,suggest.largest,suggest.completed,suggest.min,suggest.max,suggest.keepAlive - v: true - - - match: - $body: | - /^ id \s+ suggest.type \s+ suggest.active \s+ suggest.size \s+ suggest.queue \s+ suggest.queueSize \s+ suggest.rejected \s+ suggest.largest \s+ suggest.completed \s+ suggest.min \s+ suggest.max \s+ suggest.keepAlive \s+ \n - (\S+ \s+ (cached|fixed|scaling)? \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \S* \s+ \n)+ $/ - - - do: - cat.thread_pool: - h: id,warmer.type,warmer.active,warmer.size,warmer.queue,warmer.queueSize,warmer.rejected,warmer.largest,warmer.completed,warmer.min,warmer.max,warmer.keepAlive - v: true - - - match: - $body: | - /^ id \s+ warmer.type \s+ warmer.active \s+ warmer.size \s+ warmer.queue \s+ warmer.queueSize \s+ warmer.rejected \s+ warmer.largest \s+ warmer.completed \s+ warmer.min \s+ warmer.max \s+ warmer.keepAlive \s+ \n - (\S+ \s+ (cached|fixed|scaling)? \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \S* \s+ \n)+ $/