diff --git a/docs/reference/cat/thread_pool.asciidoc b/docs/reference/cat/thread_pool.asciidoc index 63c4a3939e763..a5c71ac271b8d 100644 --- a/docs/reference/cat/thread_pool.asciidoc +++ b/docs/reference/cat/thread_pool.asciidoc @@ -22,7 +22,6 @@ node-0 flush 0 0 0 node-0 force_merge 0 0 0 node-0 generic 0 0 0 node-0 get 0 0 0 -node-0 index 0 0 0 node-0 listener 0 0 0 node-0 management 1 0 0 node-0 refresh 0 0 0 @@ -52,7 +51,6 @@ flush force_merge generic get -index listener management refresh diff --git a/docs/reference/migration/migrate_7_0/settings.asciidoc b/docs/reference/migration/migrate_7_0/settings.asciidoc index 1035bc73393ac..1556056337b37 100644 --- a/docs/reference/migration/migrate_7_0/settings.asciidoc +++ b/docs/reference/migration/migrate_7_0/settings.asciidoc @@ -5,4 +5,12 @@ ==== Percolator * The deprecated `index.percolator.map_unmapped_fields_as_string` setting has been removed in favour of - the `index.percolator.map_unmapped_fields_as_text` setting. \ No newline at end of file + the `index.percolator.map_unmapped_fields_as_text` setting. + +==== Index thread pool + +* Internally, single-document index/delete/update requests are executed as bulk + requests with a single-document payload. This means that these requests are + executed on the bulk thread pool. As such, the indexing thread pool is no + longer needed and has been removed. As such, the settings + `thread_pool.index.size` and `thread_pool.index.queue_size` have been removed. \ No newline at end of file diff --git a/docs/reference/modules/threadpool.asciidoc b/docs/reference/modules/threadpool.asciidoc index fa8522ea1cbb0..b85cda1aa3685 100644 --- a/docs/reference/modules/threadpool.asciidoc +++ b/docs/reference/modules/threadpool.asciidoc @@ -13,12 +13,6 @@ There are several thread pools, but the important ones include: For generic operations (e.g., background node discovery). Thread pool type is `scaling`. -`index`:: - For index/delete operations. Thread pool type is `fixed` - with a size of `# of available processors`, - queue_size of `200`. The maximum size for this pool - is `1 + # of available processors`. - `search`:: For count/search/suggest operations. Thread pool type is `fixed_auto_queue_size` with a size of @@ -55,13 +49,13 @@ There are several thread pools, but the important ones include: Mainly for java client executing of action when listener threaded is set to true. Thread pool type is `scaling` with a default max of `min(10, (# of available processors)/2)`. -Changing a specific thread pool can be done by setting its type-specific parameters; for example, changing the `index` +Changing a specific thread pool can be done by setting its type-specific parameters; for example, changing the `bulk` thread pool to have more threads: [source,yaml] -------------------------------------------------- thread_pool: - index: + bulk: size: 30 -------------------------------------------------- @@ -89,7 +83,7 @@ full, it will abort the request. [source,yaml] -------------------------------------------------- thread_pool: - index: + bulk: size: 30 queue_size: 1000 -------------------------------------------------- diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.thread_pool/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.thread_pool/10_basic.yml index bb16ae391c46d..d7d33c15ec18a 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.thread_pool/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.thread_pool/10_basic.yml @@ -33,7 +33,7 @@ - do: cat.thread_pool: - thread_pool_patterns: bulk,management,flush,index,generic,force_merge + thread_pool_patterns: bulk,management,flush,generic,force_merge h: id,name,active v: true @@ -44,7 +44,6 @@ \S+\s+ flush \s+ \d+ \n \S+\s+ force_merge \s+ \d+ \n \S+\s+ generic \s+ \d+ \n - \S+\s+ index \s+ \d+ \n \S+\s+ management \s+ \d+ \n)+ $/ - do: @@ -72,12 +71,11 @@ - do: cat.thread_pool: - thread_pool_patterns: bulk,index,search + thread_pool_patterns: bulk,search size: "" - match: $body: | / #node_name name active queue rejected ^ (\S+ \s+ bulk \s+ \d+ \s+ \d+ \s+ \d+ \n - \S+ \s+ index \s+ \d+ \s+ \d+ \s+ \d+ \n \S+ \s+ search \s+ \d+ \s+ \d+ \s+ \d+ \n)+ $/ diff --git a/server/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java b/server/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java index 3aaf4a472facf..89ed24c573a25 100644 --- a/server/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java +++ b/server/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java @@ -46,7 +46,7 @@ public TransportDeleteAction(Settings settings, TransportService transportServic ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, TransportBulkAction bulkAction, TransportShardBulkAction shardBulkAction) { super(settings, DeleteAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, - actionFilters, indexNameExpressionResolver, DeleteRequest::new, DeleteRequest::new, ThreadPool.Names.INDEX, + actionFilters, indexNameExpressionResolver, DeleteRequest::new, DeleteRequest::new, ThreadPool.Names.BULK, bulkAction, shardBulkAction); } diff --git a/server/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/server/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index 88a210c718019..deeb179221994 100644 --- a/server/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/server/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -54,7 +54,7 @@ public TransportIndexAction(Settings settings, TransportService transportService ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, TransportBulkAction bulkAction, TransportShardBulkAction shardBulkAction) { super(settings, IndexAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, - actionFilters, indexNameExpressionResolver, IndexRequest::new, IndexRequest::new, ThreadPool.Names.INDEX, + actionFilters, indexNameExpressionResolver, IndexRequest::new, IndexRequest::new, ThreadPool.Names.BULK, bulkAction, shardBulkAction); } diff --git a/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java b/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java index 242dfe635ec91..8428c85cdb62f 100644 --- a/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java +++ b/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java @@ -86,7 +86,7 @@ public TransportUpdateAction(Settings settings, ThreadPool threadPool, ClusterSe @Override protected String executor() { - return ThreadPool.Names.INDEX; + return ThreadPool.Names.BULK; } @Override diff --git a/server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java b/server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java index 314eb1df71a4b..5404e7ac3defb 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java @@ -48,7 +48,7 @@ protected static String settingsKey(final String prefix, final String key) { } protected int applyHardSizeLimit(final Settings settings, final String name) { - if (name.equals(ThreadPool.Names.BULK) || name.equals(ThreadPool.Names.INDEX)) { + if (name.equals(ThreadPool.Names.BULK)) { return 1 + EsExecutors.numberOfProcessors(settings); } else { return Integer.MAX_VALUE; diff --git a/server/src/main/java/org/elasticsearch/threadpool/FixedExecutorBuilder.java b/server/src/main/java/org/elasticsearch/threadpool/FixedExecutorBuilder.java index 94db6cb64e2c8..43da1044c6bd0 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/FixedExecutorBuilder.java +++ b/server/src/main/java/org/elasticsearch/threadpool/FixedExecutorBuilder.java @@ -49,20 +49,7 @@ public final class FixedExecutorBuilder extends ExecutorBuilder( sizeKey, s -> Integer.toString(size), s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey), - properties); + Setting.Property.NodeScope); final String queueSizeKey = settingsKey(prefix, "queue_size"); - this.queueSizeSetting = Setting.intSetting(queueSizeKey, queueSize, properties); + this.queueSizeSetting = Setting.intSetting(queueSizeKey, queueSize, Setting.Property.NodeScope); } @Override diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 4a172e9926911..c238fb45ad54a 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -69,7 +69,6 @@ public static class Names { public static final String LISTENER = "listener"; public static final String GET = "get"; public static final String ANALYZE = "analyze"; - public static final String INDEX = "index"; public static final String BULK = "bulk"; public static final String SEARCH = "search"; public static final String MANAGEMENT = "management"; @@ -126,7 +125,6 @@ public static ThreadPoolType fromType(String type) { map.put(Names.LISTENER, ThreadPoolType.FIXED); map.put(Names.GET, ThreadPoolType.FIXED); map.put(Names.ANALYZE, ThreadPoolType.FIXED); - map.put(Names.INDEX, ThreadPoolType.FIXED); map.put(Names.BULK, ThreadPoolType.FIXED); map.put(Names.SEARCH, ThreadPoolType.FIXED_AUTO_QUEUE_SIZE); map.put(Names.MANAGEMENT, ThreadPoolType.SCALING); @@ -172,7 +170,6 @@ public ThreadPool(final Settings settings, final ExecutorBuilder... customBui final int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors); final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512); builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30))); - builders.put(Names.INDEX, new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200, true)); builders.put(Names.BULK, new FixedExecutorBuilder(settings, Names.BULK, availableProcessors, 200)); // now that we reuse bulk for index/delete ops builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000)); builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16)); diff --git a/server/src/test/java/org/elasticsearch/action/RejectionActionIT.java b/server/src/test/java/org/elasticsearch/action/RejectionActionIT.java index 0aa84ad806998..a4cdc3408fb27 100644 --- a/server/src/test/java/org/elasticsearch/action/RejectionActionIT.java +++ b/server/src/test/java/org/elasticsearch/action/RejectionActionIT.java @@ -45,8 +45,8 @@ protected Settings nodeSettings(int nodeOrdinal) { .put(super.nodeSettings(nodeOrdinal)) .put("thread_pool.search.size", 1) .put("thread_pool.search.queue_size", 1) - .put("thread_pool.index.size", 1) - .put("thread_pool.index.queue_size", 1) + .put("thread_pool.bulk.size", 1) + .put("thread_pool.bulk.queue_size", 1) .put("thread_pool.get.size", 1) .put("thread_pool.get.queue_size", 1) .build(); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index 32dfbe85d426e..5cd411c71b8c8 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -124,7 +124,7 @@ class TestSingleItemBulkWriteAction extends TransportSingleItemBulkWriteAction indexShard.acquireReplicaOperationPermit(indexShard.getPrimaryTerm() + randomIntBetween(1, 100), - SequenceNumbers.UNASSIGNED_SEQ_NO, null, ThreadPool.Names.INDEX, "")); + SequenceNumbers.UNASSIGNED_SEQ_NO, null, ThreadPool.Names.BULK, "")); closeShards(indexShard); } @@ -343,7 +343,7 @@ public void onFailure(Exception e) { throw new RuntimeException(e); } }, - ThreadPool.Names.INDEX, id); + ThreadPool.Names.BULK, id); }); thread.start(); threads.add(thread); @@ -394,7 +394,7 @@ public void onFailure(Exception e) { throw new RuntimeException(e); } }, - ThreadPool.Names.INDEX, id); + ThreadPool.Names.BULK, id); }); thread.start(); delayedThreads.add(thread); @@ -590,7 +590,7 @@ public void testOperationPermitsOnPrimaryShards() throws InterruptedException, E assertEquals(0, indexShard.getActiveOperationsCount()); if (indexShard.routingEntry().isRelocationTarget() == false) { try { - indexShard.acquireReplicaOperationPermit(primaryTerm, indexShard.getGlobalCheckpoint(), null, ThreadPool.Names.INDEX, ""); + indexShard.acquireReplicaOperationPermit(primaryTerm, indexShard.getGlobalCheckpoint(), null, ThreadPool.Names.BULK, ""); fail("shard shouldn't accept operations as replica"); } catch (IllegalStateException ignored) { @@ -609,14 +609,14 @@ public void testOperationPermitsOnPrimaryShards() throws InterruptedException, E private Releasable acquirePrimaryOperationPermitBlockingly(IndexShard indexShard) throws ExecutionException, InterruptedException { PlainActionFuture fut = new PlainActionFuture<>(); - indexShard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.INDEX, ""); + indexShard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.BULK, ""); return fut.get(); } private Releasable acquireReplicaOperationPermitBlockingly(IndexShard indexShard, long opPrimaryTerm) throws ExecutionException, InterruptedException { PlainActionFuture fut = new PlainActionFuture<>(); - indexShard.acquireReplicaOperationPermit(opPrimaryTerm, indexShard.getGlobalCheckpoint(), fut, ThreadPool.Names.INDEX, ""); + indexShard.acquireReplicaOperationPermit(opPrimaryTerm, indexShard.getGlobalCheckpoint(), fut, ThreadPool.Names.BULK, ""); return fut.get(); } @@ -664,7 +664,7 @@ public void testOperationPermitOnReplicaShards() throws Exception { if (shardRouting.primary() == false) { final IllegalStateException e = expectThrows(IllegalStateException.class, - () -> indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.INDEX, "")); + () -> indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.BULK, "")); assertThat(e, hasToString(containsString("shard " + shardRouting + " is not a primary"))); } @@ -701,7 +701,7 @@ public void onFailure(Exception e) { }; indexShard.acquireReplicaOperationPermit(primaryTerm - 1, SequenceNumbers.UNASSIGNED_SEQ_NO, onLockAcquired, - ThreadPool.Names.INDEX, ""); + ThreadPool.Names.BULK, ""); assertFalse(onResponse.get()); assertTrue(onFailure.get()); @@ -1021,7 +1021,7 @@ public void onFailure(Exception e) { latch.countDown(); } }, - ThreadPool.Names.INDEX, ""); + ThreadPool.Names.BULK, ""); }; final long firstIncrement = 1 + (randomBoolean() ? 0 : 1); @@ -1382,7 +1382,7 @@ public void onResponse(Releasable releasable) { super.onResponse(releasable); } }; - shard.acquirePrimaryOperationPermit(onLockAcquired, ThreadPool.Names.INDEX, "i_" + i); + shard.acquirePrimaryOperationPermit(onLockAcquired, ThreadPool.Names.BULK, "i_" + i); onLockAcquiredActions.add(onLockAcquired); } diff --git a/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java b/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java index 6561001ad7d86..c71ccdfba8c89 100644 --- a/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java +++ b/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java @@ -113,7 +113,7 @@ public void testSyncFailsIfOperationIsInFlight() throws InterruptedException, Ex SyncedFlushService flushService = getInstanceFromNode(SyncedFlushService.class); final ShardId shardId = shard.shardId(); PlainActionFuture fut = new PlainActionFuture<>(); - shard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.INDEX, ""); + shard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.BULK, ""); try (Releasable operationLock = fut.get()) { SyncedFlushUtil.LatchedListener listener = new SyncedFlushUtil.LatchedListener<>(); flushService.attemptSyncedFlush(shardId, listener); diff --git a/server/src/test/java/org/elasticsearch/threadpool/FixedThreadPoolTests.java b/server/src/test/java/org/elasticsearch/threadpool/FixedThreadPoolTests.java index 09019f37655d1..5ec0f30f520b9 100644 --- a/server/src/test/java/org/elasticsearch/threadpool/FixedThreadPoolTests.java +++ b/server/src/test/java/org/elasticsearch/threadpool/FixedThreadPoolTests.java @@ -85,10 +85,6 @@ public void testRejectedExecutionCounter() throws InterruptedException { assertThat(counter, equalTo(rejections)); assertThat(stats(threadPool, threadPoolName).getRejected(), equalTo(rejections)); - - if (threadPoolName.equals(ThreadPool.Names.INDEX)) { - assertSettingDeprecationsAndWarnings(new String[]{"thread_pool.index.queue_size", "thread_pool.index.size"}); - } } finally { terminateThreadPoolIfNeeded(threadPool); } diff --git a/server/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java b/server/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java index f0f8c70a3f9d4..31142fe9e45bc 100644 --- a/server/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java +++ b/server/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java @@ -60,8 +60,7 @@ public void testCorrectThreadPoolTypePermittedInSettings() throws InterruptedExc } } - public void testIndexingThreadPoolsMaxSize() throws InterruptedException { - final String name = randomFrom(Names.BULK, Names.INDEX); + public void testBulkThreadPoolsMaxSize() { final int maxSize = 1 + EsExecutors.numberOfProcessors(Settings.EMPTY); final int tooBig = randomIntBetween(1 + maxSize, Integer.MAX_VALUE); @@ -74,7 +73,7 @@ public void testIndexingThreadPoolsMaxSize() throws InterruptedException { try { tp = new ThreadPool(Settings.builder() .put("node.name", "testIndexingThreadPoolsMaxSize") - .put("thread_pool." + name + ".size", tooBig) + .put("thread_pool." + Names.BULK + ".size", tooBig) .build()); } finally { terminateThreadPoolIfNeeded(tp); @@ -84,15 +83,11 @@ public void testIndexingThreadPoolsMaxSize() throws InterruptedException { assertThat( initial, hasToString(containsString( - "Failed to parse value [" + tooBig + "] for setting [thread_pool." + name + ".size] must be "))); - - if (name.equals(Names.INDEX)) { - assertSettingDeprecationsAndWarnings(new String[] { "thread_pool.index.size" }); - } + "Failed to parse value [" + tooBig + "] for setting [thread_pool." + Names.BULK + ".size] must be "))); } private static int getExpectedThreadPoolSize(Settings settings, String name, int size) { - if (name.equals(ThreadPool.Names.BULK) || name.equals(ThreadPool.Names.INDEX)) { + if (name.equals(ThreadPool.Names.BULK)) { return Math.min(size, EsExecutors.numberOfProcessors(settings)); } else { return size; @@ -120,10 +115,6 @@ public void testFixedExecutorType() throws InterruptedException { assertThat(info(threadPool, threadPoolName).getMax(), equalTo(expectedSize)); // keep alive does not apply to fixed thread pools assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(0L)); - - if (threadPoolName.equals(Names.INDEX)) { - assertSettingDeprecationsAndWarnings(new String[] { "thread_pool.index.size" }); - } } finally { terminateThreadPoolIfNeeded(threadPool); } @@ -179,10 +170,6 @@ public void testShutdownNowInterrupts() throws Exception { latch.await(3, TimeUnit.SECONDS); // if this throws then ThreadPool#shutdownNow did not interrupt assertThat(oldExecutor.isShutdown(), equalTo(true)); assertThat(oldExecutor.isTerminating() || oldExecutor.isTerminated(), equalTo(true)); - - if (threadPoolName.equals(Names.INDEX)) { - assertSettingDeprecationsAndWarnings(new String[] { "thread_pool.index.queue_size" }); - } } finally { terminateThreadPoolIfNeeded(threadPool); }