diff --git a/qa/smoke-test-http/src/test/java/org/elasticsearch/http/IndexingPressureRestIT.java b/qa/smoke-test-http/src/test/java/org/elasticsearch/http/IndexingPressureRestIT.java new file mode 100644 index 0000000000000..b096107e0a327 --- /dev/null +++ b/qa/smoke-test-http/src/test/java/org/elasticsearch/http/IndexingPressureRestIT.java @@ -0,0 +1,129 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.http; + +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.index.IndexingPressure; +import org.elasticsearch.test.ESIntegTestCase.ClusterScope; +import org.elasticsearch.test.ESIntegTestCase.Scope; +import org.elasticsearch.test.XContentTestUtils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Map; + +import static org.elasticsearch.rest.RestStatus.CREATED; +import static org.elasticsearch.rest.RestStatus.OK; +import static org.elasticsearch.rest.RestStatus.TOO_MANY_REQUESTS; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.lessThan; + +/** + * Test Indexing Pressure Metrics and Statistics + */ +@ClusterScope(scope = Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0) +public class IndexingPressureRestIT extends HttpSmokeTestCase { + + private static final Settings unboundedWriteQueue = Settings.builder().put("thread_pool.write.queue_size", -1).build(); + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "1KB") + .put(unboundedWriteQueue) + .build(); + } + + @SuppressWarnings("unchecked") + public void testIndexingPressureStats() throws IOException { + Request createRequest = new Request("PUT", "/index_name"); + createRequest.setJsonEntity("{\"settings\": {\"index\": {\"number_of_shards\": 1, \"number_of_replicas\": 1, " + + "\"write.wait_for_active_shards\": 2}}}"); + final Response indexCreatedResponse = getRestClient().performRequest(createRequest); + assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus())); + + Request successfulIndexingRequest = new Request("POST", "/index_name/_doc/"); + successfulIndexingRequest.setJsonEntity("{\"x\": \"small text\"}"); + final Response indexSuccessFul = getRestClient().performRequest(successfulIndexingRequest); + assertThat(indexSuccessFul.getStatusLine().getStatusCode(), 
equalTo(CREATED.getStatus())); + + Request getNodeStats = new Request("GET", "/_nodes/stats/indexing_pressure"); + final Response nodeStats = getRestClient().performRequest(getNodeStats); + Map nodeStatsMap = XContentHelper.convertToMap(JsonXContent.jsonXContent, nodeStats.getEntity().getContent(), true); + ArrayList values = new ArrayList<>(((Map) nodeStatsMap.get("nodes")).values()); + assertThat(values.size(), equalTo(2)); + XContentTestUtils.JsonMapView node1 = new XContentTestUtils.JsonMapView((Map) values.get(0)); + Integer node1IndexingBytes = node1.get("indexing_pressure.total.coordinating_and_primary_bytes"); + Integer node1ReplicaBytes = node1.get("indexing_pressure.total.replica_bytes"); + Integer node1Rejections = node1.get("indexing_pressure.total.coordinating_and_primary_memory_limit_rejections"); + XContentTestUtils.JsonMapView node2 = new XContentTestUtils.JsonMapView((Map) values.get(1)); + Integer node2IndexingBytes = node2.get("indexing_pressure.total.coordinating_and_primary_bytes"); + Integer node2ReplicaBytes = node2.get("indexing_pressure.total.replica_bytes"); + Integer node2Rejections = node2.get("indexing_pressure.total.coordinating_and_primary_memory_limit_rejections"); + + if (node1IndexingBytes == 0) { + assertThat(node2IndexingBytes, greaterThan(0)); + assertThat(node2IndexingBytes, lessThan(1024)); + } else { + assertThat(node1IndexingBytes, greaterThan(0)); + assertThat(node1IndexingBytes, lessThan(1024)); + } + + if (node1ReplicaBytes == 0) { + assertThat(node2ReplicaBytes, greaterThan(0)); + assertThat(node2ReplicaBytes, lessThan(1024)); + } else { + assertThat(node2ReplicaBytes, equalTo(0)); + assertThat(node1ReplicaBytes, lessThan(1024)); + } + + assertThat(node1Rejections, equalTo(0)); + assertThat(node2Rejections, equalTo(0)); + + Request failedIndexingRequest = new Request("POST", "/index_name/_doc/"); + String largeString = randomAlphaOfLength(10000); + failedIndexingRequest.setJsonEntity("{\"x\": " + largeString + "}"); + ResponseException exception = expectThrows(ResponseException.class, () -> getRestClient().performRequest(failedIndexingRequest)); + assertThat(exception.getResponse().getStatusLine().getStatusCode(), equalTo(TOO_MANY_REQUESTS.getStatus())); + + Request getNodeStats2 = new Request("GET", "/_nodes/stats/indexing_pressure"); + final Response nodeStats2 = getRestClient().performRequest(getNodeStats2); + Map nodeStatsMap2 = XContentHelper.convertToMap(JsonXContent.jsonXContent, nodeStats2.getEntity().getContent(), + true); + ArrayList values2 = new ArrayList<>(((Map) nodeStatsMap2.get("nodes")).values()); + assertThat(values2.size(), equalTo(2)); + XContentTestUtils.JsonMapView node1AfterRejection = new XContentTestUtils.JsonMapView((Map) values2.get(0)); + node1Rejections = node1AfterRejection.get("indexing_pressure.total.coordinating_and_primary_memory_limit_rejections"); + XContentTestUtils.JsonMapView node2AfterRejection = new XContentTestUtils.JsonMapView((Map) values2.get(1)); + node2Rejections = node2AfterRejection.get("indexing_pressure.total.coordinating_and_primary_memory_limit_rejections"); + + if (node1Rejections == 0) { + assertThat(node2Rejections, equalTo(1)); + } else { + assertThat(node1Rejections, equalTo(1)); + } + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/nodes.stats.json b/rest-api-spec/src/main/resources/rest-api-spec/api/nodes.stats.json index 1aa57ee849c66..cc1a9e8185093 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/nodes.stats.json +++ 
b/rest-api-spec/src/main/resources/rest-api-spec/api/nodes.stats.json @@ -44,7 +44,8 @@ "process", "thread_pool", "transport", - "discovery" + "discovery", + "indexing_pressure" ], "description":"Limit the information returned to the specified metrics" } @@ -69,7 +70,8 @@ "process", "thread_pool", "transport", - "discovery" + "discovery", + "indexing_pressure" ], "description":"Limit the information returned to the specified metrics" }, @@ -98,7 +100,8 @@ "process", "thread_pool", "transport", - "discovery" + "discovery", + "indexing_pressure" ], "description":"Limit the information returned to the specified metrics" }, @@ -145,7 +148,8 @@ "process", "thread_pool", "transport", - "discovery" + "discovery", + "indexing_pressure" ], "description":"Limit the information returned to the specified metrics" }, diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/50_indexing_pressure.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/50_indexing_pressure.yml new file mode 100644 index 0000000000000..bf85c9e2ac1af --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/50_indexing_pressure.yml @@ -0,0 +1,28 @@ +--- +"Indexing pressure stats": + - skip: + version: " - 7.8.99" + reason: "indexing_pressure was added in 7.9" + features: [arbitrary_key] + + - do: + nodes.info: {} + - set: + nodes._arbitrary_key_: node_id + + - do: + nodes.stats: + metric: [ indexing_pressure ] + + - gte: { nodes.$node_id.indexing_pressure.total.coordinating_and_primary_bytes: 0 } + - gte: { nodes.$node_id.indexing_pressure.total.replica_bytes: 0 } + - gte: { nodes.$node_id.indexing_pressure.total.all_bytes: 0 } + - gte: { nodes.$node_id.indexing_pressure.total.coordinating_and_primary_memory_limit_rejections: 0 } + - gte: { nodes.$node_id.indexing_pressure.total.replica_memory_limit_rejections: 0 } + - gte: { nodes.$node_id.indexing_pressure.current.coordinating_and_primary_bytes: 0 } + - gte: { nodes.$node_id.indexing_pressure.current.replica_bytes: 0 } + - gte: { nodes.$node_id.indexing_pressure.current.all_bytes: 0 } + +# TODO: +# +# Change skipped version after backport diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/IndexingPressureIT.java similarity index 74% rename from server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java rename to server/src/internalClusterTest/java/org/elasticsearch/index/IndexingPressureIT.java index b569f3c772bd2..dca9c59fedcba 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/IndexingPressureIT.java @@ -16,11 +16,14 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.elasticsearch.action.bulk; +package org.elasticsearch.index; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.bulk.TransportShardBulkAction; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.cluster.metadata.IndexMetadata; @@ -51,7 +54,7 @@ import static org.hamcrest.Matchers.instanceOf; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 2, numClientNodes = 1, transportClientRatio = 0.0D) -public class WriteMemoryLimitsIT extends ESIntegTestCase { +public class IndexingPressureIT extends ESIntegTestCase { // TODO: Add additional REST tests when metrics are exposed @@ -63,7 +66,6 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase { protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder() .put(super.nodeSettings(nodeOrdinal)) - // Need at least two threads because we are going to block one .put(unboundedWriteQueue) .build(); } @@ -134,16 +136,16 @@ public void testWriteBytesAreIncremented() throws Exception { final ActionFuture successFuture = client(coordinatingOnlyNode).bulk(bulkRequest); replicationSendPointReached.await(); - WriteMemoryLimits primaryWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, primaryName); - WriteMemoryLimits replicaWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, replicaName); - WriteMemoryLimits coordinatingWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, coordinatingOnlyNode); + IndexingPressure primaryWriteLimits = internalCluster().getInstance(IndexingPressure.class, primaryName); + IndexingPressure replicaWriteLimits = internalCluster().getInstance(IndexingPressure.class, replicaName); + IndexingPressure coordinatingWriteLimits = internalCluster().getInstance(IndexingPressure.class, coordinatingOnlyNode); - assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize)); - assertEquals(0, primaryWriteLimits.getReplicaWriteBytes()); - assertEquals(0, replicaWriteLimits.getWriteBytes()); - assertEquals(0, replicaWriteLimits.getReplicaWriteBytes()); - assertEquals(bulkRequestSize, coordinatingWriteLimits.getWriteBytes()); - assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes()); + assertThat(primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize)); + assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes()); + assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, replicaWriteLimits.getCurrentReplicaBytes()); + assertEquals(bulkRequestSize, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes()); latchBlockingReplicationSend.countDown(); @@ -165,14 +167,15 @@ public void testWriteBytesAreIncremented() throws Exception { final long secondBulkShardRequestSize = request.ramBytesUsed(); if (usePrimaryAsCoordinatingNode) { - assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize + secondBulkRequestSize)); - assertEquals(0, replicaWriteLimits.getWriteBytes()); + assertThat(primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes(), + greaterThan(bulkShardRequestSize + 
secondBulkRequestSize)); + assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); } else { - assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize)); - assertEquals(secondBulkRequestSize, replicaWriteLimits.getWriteBytes()); + assertThat(primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize)); + assertEquals(secondBulkRequestSize, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); } - assertEquals(bulkRequestSize, coordinatingWriteLimits.getWriteBytes()); - assertBusy(() -> assertThat(replicaWriteLimits.getReplicaWriteBytes(), + assertEquals(bulkRequestSize, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertBusy(() -> assertThat(replicaWriteLimits.getCurrentReplicaBytes(), greaterThan(bulkShardRequestSize + secondBulkShardRequestSize))); replicaRelease.close(); @@ -180,12 +183,12 @@ public void testWriteBytesAreIncremented() throws Exception { successFuture.actionGet(); secondFuture.actionGet(); - assertEquals(0, primaryWriteLimits.getWriteBytes()); - assertEquals(0, primaryWriteLimits.getReplicaWriteBytes()); - assertEquals(0, replicaWriteLimits.getWriteBytes()); - assertEquals(0, replicaWriteLimits.getReplicaWriteBytes()); - assertEquals(0, coordinatingWriteLimits.getWriteBytes()); - assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes()); + assertEquals(0, primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes()); + assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, replicaWriteLimits.getCurrentReplicaBytes()); + assertEquals(0, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes()); } finally { if (replicationSendPointReached.getCount() > 0) { replicationSendPointReached.countDown(); @@ -212,8 +215,8 @@ public void testWriteCanBeRejectedAtCoordinatingLevel() throws Exception { final long bulkRequestSize = bulkRequest.ramBytesUsed(); final long bulkShardRequestSize = totalRequestSize; - restartNodesWithSettings(Settings.builder().put(WriteMemoryLimits.MAX_INDEXING_BYTES.getKey(), - (long)(bulkShardRequestSize * 1.5) + "B").build()); + restartNodesWithSettings(Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), + (long) (bulkShardRequestSize * 1.5) + "B").build()); assertAcked(prepareCreate(INDEX_NAME, Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) @@ -229,17 +232,17 @@ public void testWriteCanBeRejectedAtCoordinatingLevel() throws Exception { try (Releasable replicaRelease = blockReplicas(replicaThreadPool)) { final ActionFuture successFuture = client(coordinatingOnlyNode).bulk(bulkRequest); - WriteMemoryLimits primaryWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, primaryName); - WriteMemoryLimits replicaWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, replicaName); - WriteMemoryLimits coordinatingWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, coordinatingOnlyNode); + IndexingPressure primaryWriteLimits = internalCluster().getInstance(IndexingPressure.class, primaryName); + IndexingPressure replicaWriteLimits = internalCluster().getInstance(IndexingPressure.class, replicaName); + IndexingPressure coordinatingWriteLimits = internalCluster().getInstance(IndexingPressure.class, coordinatingOnlyNode); assertBusy(() -> { - assertThat(primaryWriteLimits.getWriteBytes(), 
greaterThan(bulkShardRequestSize)); - assertEquals(0, primaryWriteLimits.getReplicaWriteBytes()); - assertEquals(0, replicaWriteLimits.getWriteBytes()); - assertThat(replicaWriteLimits.getReplicaWriteBytes(), greaterThan(bulkShardRequestSize)); - assertEquals(bulkRequestSize, coordinatingWriteLimits.getWriteBytes()); - assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes()); + assertThat(primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize)); + assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes()); + assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertThat(replicaWriteLimits.getCurrentReplicaBytes(), greaterThan(bulkShardRequestSize)); + assertEquals(bulkRequestSize, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes()); }); expectThrows(EsRejectedExecutionException.class, () -> { @@ -256,12 +259,12 @@ public void testWriteCanBeRejectedAtCoordinatingLevel() throws Exception { successFuture.actionGet(); - assertEquals(0, primaryWriteLimits.getWriteBytes()); - assertEquals(0, primaryWriteLimits.getReplicaWriteBytes()); - assertEquals(0, replicaWriteLimits.getWriteBytes()); - assertEquals(0, replicaWriteLimits.getReplicaWriteBytes()); - assertEquals(0, coordinatingWriteLimits.getWriteBytes()); - assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes()); + assertEquals(0, primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes()); + assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, replicaWriteLimits.getCurrentReplicaBytes()); + assertEquals(0, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes()); } } @@ -276,7 +279,7 @@ public void testWriteCanBeRejectedAtPrimaryLevel() throws Exception { bulkRequest.add(request); } final long bulkShardRequestSize = totalRequestSize; - restartNodesWithSettings(Settings.builder().put(WriteMemoryLimits.MAX_INDEXING_BYTES.getKey(), + restartNodesWithSettings(Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), (long)(bulkShardRequestSize * 1.5) + "B").build()); assertAcked(prepareCreate(INDEX_NAME, Settings.builder() @@ -293,17 +296,17 @@ public void testWriteCanBeRejectedAtPrimaryLevel() throws Exception { try (Releasable replicaRelease = blockReplicas(replicaThreadPool)) { final ActionFuture successFuture = client(primaryName).bulk(bulkRequest); - WriteMemoryLimits primaryWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, primaryName); - WriteMemoryLimits replicaWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, replicaName); - WriteMemoryLimits coordinatingWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, coordinatingOnlyNode); + IndexingPressure primaryWriteLimits = internalCluster().getInstance(IndexingPressure.class, primaryName); + IndexingPressure replicaWriteLimits = internalCluster().getInstance(IndexingPressure.class, replicaName); + IndexingPressure coordinatingWriteLimits = internalCluster().getInstance(IndexingPressure.class, coordinatingOnlyNode); assertBusy(() -> { - assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize)); - assertEquals(0, primaryWriteLimits.getReplicaWriteBytes()); - assertEquals(0, replicaWriteLimits.getWriteBytes()); - assertThat(replicaWriteLimits.getReplicaWriteBytes(), 
greaterThan(bulkShardRequestSize)); - assertEquals(0, coordinatingWriteLimits.getWriteBytes()); - assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes()); + assertThat(primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize)); + assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes()); + assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertThat(replicaWriteLimits.getCurrentReplicaBytes(), greaterThan(bulkShardRequestSize)); + assertEquals(0, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes()); }); BulkResponse responses = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet(); @@ -314,17 +317,17 @@ public void testWriteCanBeRejectedAtPrimaryLevel() throws Exception { successFuture.actionGet(); - assertEquals(0, primaryWriteLimits.getWriteBytes()); - assertEquals(0, primaryWriteLimits.getReplicaWriteBytes()); - assertEquals(0, replicaWriteLimits.getWriteBytes()); - assertEquals(0, replicaWriteLimits.getReplicaWriteBytes()); - assertEquals(0, coordinatingWriteLimits.getWriteBytes()); - assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes()); + assertEquals(0, primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes()); + assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, replicaWriteLimits.getCurrentReplicaBytes()); + assertEquals(0, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes()); } } public void testWritesWillSucceedIfBelowThreshold() throws Exception { - restartNodesWithSettings(Settings.builder().put(WriteMemoryLimits.MAX_INDEXING_BYTES.getKey(), "1MB").build()); + restartNodesWithSettings(Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "1MB").build()); assertAcked(prepareCreate(INDEX_NAME, Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java index 56a177e496658..f5db990f5e3d6 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.discovery.DiscoveryStats; import org.elasticsearch.http.HttpStats; +import org.elasticsearch.index.stats.IndexingPressureStats; import org.elasticsearch.indices.NodeIndicesStats; import org.elasticsearch.indices.breaker.AllCircuitBreakerStats; import org.elasticsearch.ingest.IngestStats; @@ -95,6 +96,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { @Nullable private AdaptiveSelectionStats adaptiveSelectionStats; + @Nullable + private IndexingPressureStats indexingPressureStats; + public NodeStats(StreamInput in) throws IOException { super(in); timestamp = in.readVLong(); @@ -125,6 +129,11 @@ public NodeStats(StreamInput in) throws IOException { scriptCacheStats = scriptStats.toScriptCacheStats(); } } + if (in.getVersion().onOrAfter(Version.V_7_9_0)) { + indexingPressureStats = in.readOptionalWriteable(IndexingPressureStats::new); + } else { + 
indexingPressureStats = null; + } } public NodeStats(DiscoveryNode node, long timestamp, @Nullable NodeIndicesStats indices, @@ -135,7 +144,8 @@ public NodeStats(DiscoveryNode node, long timestamp, @Nullable NodeIndicesStats @Nullable DiscoveryStats discoveryStats, @Nullable IngestStats ingestStats, @Nullable AdaptiveSelectionStats adaptiveSelectionStats, - @Nullable ScriptCacheStats scriptCacheStats) { + @Nullable ScriptCacheStats scriptCacheStats, + @Nullable IndexingPressureStats indexingPressureStats) { super(node); this.timestamp = timestamp; this.indices = indices; @@ -152,6 +162,7 @@ public NodeStats(DiscoveryNode node, long timestamp, @Nullable NodeIndicesStats this.ingestStats = ingestStats; this.adaptiveSelectionStats = adaptiveSelectionStats; this.scriptCacheStats = scriptCacheStats; + this.indexingPressureStats = indexingPressureStats; } public long getTimestamp() { @@ -251,6 +262,11 @@ public ScriptCacheStats getScriptCacheStats() { return scriptCacheStats; } + @Nullable + public IndexingPressureStats getIndexingPressureStats() { + return indexingPressureStats; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -277,6 +293,9 @@ public void writeTo(StreamOutput out) throws IOException { } if (out.getVersion().onOrAfter(Version.V_7_8_0) && out.getVersion().before(Version.V_7_9_0)) { out.writeOptionalWriteable(scriptCacheStats); } + if (out.getVersion().onOrAfter(Version.V_7_9_0)) { + out.writeOptionalWriteable(indexingPressureStats); + } } @Override @@ -343,6 +362,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (getScriptCacheStats() != null) { getScriptCacheStats().toXContent(builder, params); } + if (getIndexingPressureStats() != null) { + getIndexingPressureStats().toXContent(builder, params); + } return builder; } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java index b92de67ce6c5c..44f07dd8243d6 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java @@ -224,7 +224,8 @@ public enum Metric { DISCOVERY("discovery"), INGEST("ingest"), ADAPTIVE_SELECTION("adaptive_selection"), - SCRIPT_CACHE("script_cache"); + SCRIPT_CACHE("script_cache"), + INDEXING_PRESSURE("indexing_pressure"),; private String metricName; diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java index 6b75dac8aa0d2..2e868686b77a8 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java @@ -83,7 +83,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) { NodesStatsRequest.Metric.DISCOVERY.containedIn(metrics), NodesStatsRequest.Metric.INGEST.containedIn(metrics), NodesStatsRequest.Metric.ADAPTIVE_SELECTION.containedIn(metrics), - NodesStatsRequest.Metric.SCRIPT_CACHE.containedIn(metrics)); + NodesStatsRequest.Metric.SCRIPT_CACHE.containedIn(metrics), + NodesStatsRequest.Metric.INDEXING_PRESSURE.containedIn(metrics)); } public static class NodeStatsRequest extends BaseNodeRequest { 
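
The hunks above register `indexing_pressure` as a new `NodesStatsRequest.Metric`, thread an optional `IndexingPressureStats` through `NodeStats` (version-gated on 7.9), and pass the corresponding flag down from `TransportNodesStatsAction`. As a minimal illustrative sketch only (the class name and helper below are hypothetical and not part of this change), the new section can be fetched with the same low-level REST client that `IndexingPressureRestIT` above uses:

```java
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;

import java.io.IOException;
import java.util.Map;

public final class IndexingPressureStatsFetcher {

    /**
     * Fetches only the new indexing_pressure section for every node. Each node entry
     * carries an "indexing_pressure.total" object (cumulative bytes plus memory-limit
     * rejection counters) and an "indexing_pressure.current" object (in-flight bytes),
     * matching IndexingPressureStats#toXContent further down in this diff.
     */
    @SuppressWarnings("unchecked")
    public static Map<String, Object> fetchPerNodeStats(RestClient client) throws IOException {
        Response response = client.performRequest(new Request("GET", "/_nodes/stats/indexing_pressure"));
        Map<String, Object> body = XContentHelper.convertToMap(
            JsonXContent.jsonXContent, response.getEntity().getContent(), true);
        return (Map<String, Object>) body.get("nodes");
    }
}
```

At the REST layer this corresponds to `GET /_nodes/stats/indexing_pressure`, which the rest-api-spec change and the `50_indexing_pressure.yml` test earlier in this diff make a first-class metric.
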
diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java index b3c3fd2e5bd7d..bd93d8b766148 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -97,7 +97,7 @@ protected ClusterStatsNodeResponse newNodeResponse(StreamInput in) throws IOExce protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeRequest) { NodeInfo nodeInfo = nodeService.info(true, true, false, true, false, true, false, true, false, false); NodeStats nodeStats = nodeService.stats(CommonStatsFlags.NONE, - true, true, true, false, true, false, false, false, false, false, true, false, false); + true, true, true, false, true, false, false, false, false, false, true, false, false, false); List shardsStats = new ArrayList<>(); for (IndexService indexService : indicesService) { for (IndexShard indexShard : indexService) { diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index fe780c00301db..fa95eeba9645c 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -68,6 +68,7 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndexClosedException; @@ -113,23 +114,23 @@ public class TransportBulkAction extends HandledTransportAction docWriteReque @Override protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener listener) { long indexingBytes = bulkRequest.ramBytesUsed(); - final Releasable releasable = writeMemoryLimits.markWriteOperationStarted(indexingBytes); + final Releasable releasable = indexingPressure.markIndexingOperationStarted(indexingBytes); final ActionListener releasingListener = ActionListener.runBefore(listener, releasable::close); try { doInternalExecute(task, bulkRequest, releasingListener); diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 53bd949dbe29f..0c88af3ca7cba 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -55,6 +55,7 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.get.GetResult; @@ -91,9 +92,9 @@ public class TransportShardBulkAction extends TransportWriteAction MAX_INDEXING_BYTES = - Setting.memorySizeSetting("indexing_limits.memory.limit", "10%", Setting.Property.NodeScope); - - private final AtomicLong writeBytes = new AtomicLong(0); - private final AtomicLong 
replicaWriteBytes = new AtomicLong(0); - private final long writeLimits; - - public WriteMemoryLimits(Settings settings) { - this.writeLimits = MAX_INDEXING_BYTES.get(settings).getBytes(); - } - - public WriteMemoryLimits(Settings settings, ClusterSettings clusterSettings) { - this.writeLimits = MAX_INDEXING_BYTES.get(settings).getBytes(); - } - - public Releasable markWriteOperationStarted(long bytes) { - return markWriteOperationStarted(bytes, false); - } - - public Releasable markWriteOperationStarted(long bytes, boolean forceExecution) { - long currentWriteLimits = this.writeLimits; - long writeBytes = this.writeBytes.addAndGet(bytes); - long replicaWriteBytes = this.replicaWriteBytes.get(); - long totalBytes = writeBytes + replicaWriteBytes; - if (forceExecution == false && totalBytes > currentWriteLimits) { - long bytesWithoutOperation = writeBytes - bytes; - long totalBytesWithoutOperation = totalBytes - bytes; - this.writeBytes.getAndAdd(-bytes); - throw new EsRejectedExecutionException("rejected execution of write operation [" + - "write_bytes=" + bytesWithoutOperation + ", " + - "replica_write_bytes=" + replicaWriteBytes + ", " + - "total_write_bytes=" + totalBytesWithoutOperation + ", " + - "current_operation_bytes=" + bytes + ", " + - "max_write_bytes=" + currentWriteLimits + "]", false); - } - return () -> this.writeBytes.getAndAdd(-bytes); - } - - public long getWriteBytes() { - return writeBytes.get(); - } - - public Releasable markReplicaWriteStarted(long bytes, boolean forceExecution) { - long currentReplicaWriteLimits = (long) (this.writeLimits * 1.5); - long replicaWriteBytes = this.replicaWriteBytes.getAndAdd(bytes); - if (forceExecution == false && replicaWriteBytes > currentReplicaWriteLimits) { - long replicaBytesWithoutOperation = replicaWriteBytes - bytes; - this.replicaWriteBytes.getAndAdd(-bytes); - throw new EsRejectedExecutionException("rejected execution of replica write operation [" + - "replica_write_bytes=" + replicaBytesWithoutOperation + ", " + - "current_replica_operation_bytes=" + bytes + ", " + - "max_replica_write_bytes=" + currentReplicaWriteLimits + "]", false); - } - return () -> this.replicaWriteBytes.getAndAdd(-bytes); - } - - public long getReplicaWriteBytes() { - return replicaWriteBytes.get(); - } -} diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index 74ddcf54b3212..47f287ba0133f 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -20,7 +20,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.bulk.WriteMemoryLimits; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.action.support.replication.ReplicationResponse; @@ -57,11 +57,11 @@ public class TransportResyncReplicationAction extends TransportWriteAction extends TransportReplicationAction { private final boolean forceExecution; - private final WriteMemoryLimits writeMemoryLimits; + private final IndexingPressure indexingPressure; private final String executor; protected TransportWriteAction(Settings settings, String actionName, TransportService 
transportService, ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, ActionFilters actionFilters, Writeable.Reader request, Writeable.Reader replicaRequest, String executor, boolean forceExecutionOnPrimary, - WriteMemoryLimits writeMemoryLimits) { + IndexingPressure indexingPressure) { // We pass ThreadPool.Names.SAME to the super class as we control the dispatching to the // ThreadPool.Names.WRITE thread pool in this class. super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, request, replicaRequest, ThreadPool.Names.SAME, true, forceExecutionOnPrimary); this.executor = executor; this.forceExecution = forceExecutionOnPrimary; - this.writeMemoryLimits = writeMemoryLimits; + this.indexingPressure = indexingPressure; } @Override protected Releasable checkOperationLimits(Request request) { - return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request), forceExecution); + return indexingPressure.markIndexingOperationStarted(primaryOperationSize(request), forceExecution); } @Override @@ -90,7 +90,7 @@ protected Releasable checkPrimaryLimits(Request request, boolean rerouteWasLocal if (rerouteWasLocal) { return () -> {}; } else { - return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request), forceExecution); + return indexingPressure.markIndexingOperationStarted(primaryOperationSize(request), forceExecution); } } @@ -100,7 +100,7 @@ protected long primaryOperationSize(Request request) { @Override protected Releasable checkReplicaLimits(ReplicaRequest request) { - return writeMemoryLimits.markReplicaWriteStarted(replicaOperationSize(request), forceExecution); + return indexingPressure.markReplicaOperationStarted(replicaOperationSize(request), forceExecution); } protected long replicaOperationSize(ReplicaRequest request) { diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 844fba08ad17e..1d091f56cc1cc 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -21,7 +21,7 @@ import org.apache.logging.log4j.LogManager; import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction; import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction; -import org.elasticsearch.action.bulk.WriteMemoryLimits; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.action.search.TransportSearchAction; import org.elasticsearch.action.support.AutoCreateIndex; import org.elasticsearch.action.support.DestructiveOperations; @@ -556,7 +556,7 @@ public void apply(Settings value, Settings current, Settings previous) { FsHealthService.ENABLED_SETTING, FsHealthService.REFRESH_INTERVAL_SETTING, FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING, - WriteMemoryLimits.MAX_INDEXING_BYTES))); + IndexingPressure.MAX_INDEXING_BYTES))); public static List> BUILT_IN_SETTING_UPGRADERS = Collections.unmodifiableList(Arrays.asList( SniffConnectionStrategy.SEARCH_REMOTE_CLUSTER_SEEDS_UPGRADER, diff --git a/server/src/main/java/org/elasticsearch/index/IndexingPressure.java b/server/src/main/java/org/elasticsearch/index/IndexingPressure.java new file mode 100644 index 0000000000000..9c8fb83fe4ffc --- /dev/null +++ 
b/server/src/main/java/org/elasticsearch/index/IndexingPressure.java @@ -0,0 +1,111 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index; + +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.index.stats.IndexingPressureStats; + +import java.util.concurrent.atomic.AtomicLong; + +public class IndexingPressure { + + public static final Setting MAX_INDEXING_BYTES = + Setting.memorySizeSetting("indexing_pressure.memory.limit", "10%", Setting.Property.NodeScope); + + private final AtomicLong currentCoordinatingAndPrimaryBytes = new AtomicLong(0); + private final AtomicLong currentReplicaBytes = new AtomicLong(0); + private final AtomicLong totalCoordinatingAndPrimaryBytes = new AtomicLong(0); + private final AtomicLong totalReplicaBytes = new AtomicLong(0); + private final AtomicLong coordinatingAndPrimaryRejections = new AtomicLong(0); + private final AtomicLong replicaRejections = new AtomicLong(0); + + private final long primaryAndCoordinatingLimits; + private final long replicaLimits; + + public IndexingPressure(Settings settings) { + this.primaryAndCoordinatingLimits = MAX_INDEXING_BYTES.get(settings).getBytes(); + this.replicaLimits = (long) (this.primaryAndCoordinatingLimits * 1.5); + } + + public Releasable markIndexingOperationStarted(long bytes) { + return markIndexingOperationStarted(bytes, false); + } + + public Releasable markIndexingOperationStarted(long bytes, boolean forceExecution) { + long writeBytes = this.currentCoordinatingAndPrimaryBytes.addAndGet(bytes); + long replicaWriteBytes = this.currentReplicaBytes.get(); + long totalBytes = writeBytes + replicaWriteBytes; + if (forceExecution == false && totalBytes > primaryAndCoordinatingLimits) { + long bytesWithoutOperation = writeBytes - bytes; + long totalBytesWithoutOperation = totalBytes - bytes; + this.currentCoordinatingAndPrimaryBytes.getAndAdd(-bytes); + this.coordinatingAndPrimaryRejections.getAndIncrement(); + throw new EsRejectedExecutionException("rejected execution of operation [" + + "coordinating_and_primary_bytes=" + bytesWithoutOperation + ", " + + "replica_bytes=" + replicaWriteBytes + ", " + + "all_bytes=" + totalBytesWithoutOperation + ", " + + "operation_bytes=" + bytes + ", " + + "max_coordinating_and_primary_bytes=" + primaryAndCoordinatingLimits + "]", false); + } + totalCoordinatingAndPrimaryBytes.getAndAdd(bytes); + return () -> this.currentCoordinatingAndPrimaryBytes.getAndAdd(-bytes); + } + + public Releasable markReplicaOperationStarted(long bytes, boolean forceExecution) 
{ + long replicaWriteBytes = this.currentReplicaBytes.getAndAdd(bytes); + if (forceExecution == false && replicaWriteBytes > replicaLimits) { + long replicaBytesWithoutOperation = replicaWriteBytes - bytes; + this.currentReplicaBytes.getAndAdd(-bytes); + this.replicaRejections.getAndIncrement(); + throw new EsRejectedExecutionException("rejected execution of replica operation [" + + "replica_bytes=" + replicaBytesWithoutOperation + ", " + + "replica_operation_bytes=" + bytes + ", " + + "max_replica_bytes=" + replicaLimits + "]", false); + } + totalReplicaBytes.getAndAdd(bytes); + return () -> this.currentReplicaBytes.getAndAdd(-bytes); + } + + public long getCurrentCoordinatingAndPrimaryBytes() { + return currentCoordinatingAndPrimaryBytes.get(); + } + + public long getCurrentReplicaBytes() { + return currentReplicaBytes.get(); + } + + public long getTotalCoordinatingAndPrimaryBytes() { + return totalCoordinatingAndPrimaryBytes.get(); + } + + public long getTotalReplicaBytes() { + return totalReplicaBytes.get(); + } + + public IndexingPressureStats stats() { + return new IndexingPressureStats(totalCoordinatingAndPrimaryBytes.get(), totalReplicaBytes.get(), + currentCoordinatingAndPrimaryBytes.get(), currentReplicaBytes.get(), coordinatingAndPrimaryRejections.get(), + replicaRejections.get()); + } +} diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java index 54a418fe673c7..dd08f8ff763ad 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java @@ -25,7 +25,7 @@ import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.bulk.WriteMemoryLimits; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.WriteResponse; @@ -80,7 +80,7 @@ public RetentionLeaseSyncAction( final ThreadPool threadPool, final ShardStateAction shardStateAction, final ActionFilters actionFilters, - final WriteMemoryLimits writeMemoryLimits) { + final IndexingPressure indexingPressure) { super( settings, ACTION_NAME, @@ -92,7 +92,7 @@ public RetentionLeaseSyncAction( actionFilters, RetentionLeaseSyncAction.Request::new, RetentionLeaseSyncAction.Request::new, - ThreadPool.Names.MANAGEMENT, false, writeMemoryLimits); + ThreadPool.Names.MANAGEMENT, false, indexingPressure); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/stats/IndexingPressureStats.java b/server/src/main/java/org/elasticsearch/index/stats/IndexingPressureStats.java new file mode 100644 index 0000000000000..309cf863b6324 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/stats/IndexingPressureStats.java @@ -0,0 +1,85 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.stats; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContentFragment; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; + +public class IndexingPressureStats implements Writeable, ToXContentFragment { + + private final long totalCoordinatingAndPrimaryBytes; + private final long totalReplicaBytes; + private final long currentCoordinatingAndPrimaryBytes; + private final long currentReplicaBytes; + private final long coordinatingAndPrimaryRejections; + private final long replicaRejections; + + public IndexingPressureStats(StreamInput in) throws IOException { + totalCoordinatingAndPrimaryBytes = in.readVLong(); + totalReplicaBytes = in.readVLong(); + currentCoordinatingAndPrimaryBytes = in.readVLong(); + currentReplicaBytes = in.readVLong(); + coordinatingAndPrimaryRejections = in.readVLong(); + replicaRejections = in.readVLong(); + } + + public IndexingPressureStats(long totalCoordinatingAndPrimaryBytes, long totalReplicaBytes, long currentCoordinatingAndPrimaryBytes, + long currentReplicaBytes, long coordinatingAndPrimaryRejections, long replicaRejections) { + this.totalCoordinatingAndPrimaryBytes = totalCoordinatingAndPrimaryBytes; + this.totalReplicaBytes = totalReplicaBytes; + this.currentCoordinatingAndPrimaryBytes = currentCoordinatingAndPrimaryBytes; + this.currentReplicaBytes = currentReplicaBytes; + this.coordinatingAndPrimaryRejections = coordinatingAndPrimaryRejections; + this.replicaRejections = replicaRejections; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(totalCoordinatingAndPrimaryBytes); + out.writeVLong(totalReplicaBytes); + out.writeVLong(currentCoordinatingAndPrimaryBytes); + out.writeVLong(currentReplicaBytes); + out.writeVLong(coordinatingAndPrimaryRejections); + out.writeVLong(replicaRejections); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject("indexing_pressure"); + builder.startObject("total"); + builder.field("coordinating_and_primary_bytes", totalCoordinatingAndPrimaryBytes); + builder.field("replica_bytes", totalReplicaBytes); + builder.field("all_bytes", totalReplicaBytes + totalCoordinatingAndPrimaryBytes); + builder.field("coordinating_and_primary_memory_limit_rejections", coordinatingAndPrimaryRejections); + builder.field("replica_memory_limit_rejections", replicaRejections); + builder.endObject(); + builder.startObject("current"); + builder.field("coordinating_and_primary_bytes", currentCoordinatingAndPrimaryBytes); + builder.field("replica_bytes", currentReplicaBytes); + builder.field("all_bytes", currentCoordinatingAndPrimaryBytes + currentReplicaBytes); + builder.endObject(); + return builder.endObject(); + } +} diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 
53f517ddfec61..2c4c025a91e68 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -31,7 +31,7 @@ import org.elasticsearch.action.ActionModule; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus; -import org.elasticsearch.action.bulk.WriteMemoryLimits; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.action.search.SearchExecutionStatsCollector; import org.elasticsearch.action.search.SearchPhaseController; import org.elasticsearch.action.search.SearchTransportService; @@ -547,6 +547,7 @@ protected Node(final Environment initialEnvironment, final SearchTransportService searchTransportService = new SearchTransportService(transportService, SearchExecutionStatsCollector.makeWrapper(responseCollectorService)); final HttpServerTransport httpServerTransport = newHttpTransport(networkModule); + final IndexingPressure indexingLimits = new IndexingPressure(settings); final RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings()); RepositoriesModule repositoriesModule = new RepositoriesModule(this.environment, @@ -577,7 +578,7 @@ protected Node(final Environment initialEnvironment, this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(), transportService, indicesService, pluginsService, circuitBreakerService, scriptService, httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService, - searchTransportService); + searchTransportService, indexingLimits); final SearchService searchService = newSearchService(clusterService, indicesService, threadPool, scriptService, bigArrays, searchModule.getFetchPhase(), @@ -595,7 +596,6 @@ protected Node(final Environment initialEnvironment, new PersistentTasksClusterService(settings, registry, clusterService, threadPool); resourcesToClose.add(persistentTasksClusterService); final PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, threadPool, client); - final WriteMemoryLimits bulkIndexingLimits = new WriteMemoryLimits(settings); modules.add(b -> { b.bind(Node.class).toInstance(this); @@ -614,7 +614,7 @@ protected Node(final Environment initialEnvironment, b.bind(ScriptService.class).toInstance(scriptService); b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry()); b.bind(IngestService.class).toInstance(ingestService); - b.bind(WriteMemoryLimits.class).toInstance(bulkIndexingLimits); + b.bind(IndexingPressure.class).toInstance(indexingLimits); b.bind(UsageService.class).toInstance(usageService); b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService()); b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry); diff --git a/server/src/main/java/org/elasticsearch/node/NodeService.java b/server/src/main/java/org/elasticsearch/node/NodeService.java index 98e1f18dff289..f72923dee9801 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeService.java +++ b/server/src/main/java/org/elasticsearch/node/NodeService.java @@ -19,6 +19,7 @@ package org.elasticsearch.node; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.Build; import org.elasticsearch.Version; @@ -59,6 +60,7 @@ public class NodeService implements Closeable { private final 
HttpServerTransport httpServerTransport; private final ResponseCollectorService responseCollectorService; private final SearchTransportService searchTransportService; + private final IndexingPressure indexingPressure; private final Discovery discovery; @@ -67,7 +69,7 @@ public class NodeService implements Closeable { CircuitBreakerService circuitBreakerService, ScriptService scriptService, @Nullable HttpServerTransport httpServerTransport, IngestService ingestService, ClusterService clusterService, SettingsFilter settingsFilter, ResponseCollectorService responseCollectorService, - SearchTransportService searchTransportService) { + SearchTransportService searchTransportService, IndexingPressure indexingPressure) { this.settings = settings; this.threadPool = threadPool; this.monitorService = monitorService; @@ -82,6 +84,7 @@ public class NodeService implements Closeable { this.scriptService = scriptService; this.responseCollectorService = responseCollectorService; this.searchTransportService = searchTransportService; + this.indexingPressure = indexingPressure; clusterService.addStateApplier(ingestService); } @@ -103,7 +106,8 @@ public NodeInfo info(boolean settings, boolean os, boolean process, boolean jvm, public NodeStats stats(CommonStatsFlags indices, boolean os, boolean process, boolean jvm, boolean threadPool, boolean fs, boolean transport, boolean http, boolean circuitBreaker, - boolean script, boolean discoveryStats, boolean ingest, boolean adaptiveSelection, boolean scriptCache) { + boolean script, boolean discoveryStats, boolean ingest, boolean adaptiveSelection, boolean scriptCache, + boolean indexingPressure) { // for indices stats we want to include previous allocated shards stats as well (it will // only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats) return new NodeStats(transportService.getLocalNode(), System.currentTimeMillis(), @@ -120,7 +124,8 @@ public NodeStats stats(CommonStatsFlags indices, boolean os, boolean process, bo discoveryStats ? discovery.stats() : null, ingest ? ingestService.stats() : null, adaptiveSelection ? responseCollectorService.getAdaptiveStats(searchTransportService.getPendingSearchRequests()) : null, - scriptCache ? scriptService.cacheStats() : null + scriptCache ? scriptService.cacheStats() : null, + indexingPressure ? 
this.indexingPressure.stats() : null ); } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java index 4c0dc158c8ec4..f53115f4f1085 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -518,7 +518,7 @@ public static NodeStats createNodeStats() { //TODO NodeIndicesStats are not tested here, way too complicated to create, also they need to be migrated to Writeable yet return new NodeStats(node, randomNonNegativeLong(), null, osStats, processStats, jvmStats, threadPoolStats, fsInfo, transportStats, httpStats, allCircuitBreakerStats, scriptStats, discoveryStats, - ingestStats, adaptiveSelectionStats, scriptCacheStats); + ingestStats, adaptiveSelectionStats, scriptCacheStats, null); } private IngestStats.Stats getPipelineStats(List pipelineStats, String id) { diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java index d4cb93eac6e1e..b29fb73bf2426 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java @@ -37,6 +37,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; @@ -121,7 +122,7 @@ private void indicesThatCannotBeCreatedTestCase(Set expected, final ExecutorService direct = EsExecutors.newDirectExecutorService(); when(threadPool.executor(anyString())).thenReturn(direct); TransportBulkAction action = new TransportBulkAction(threadPool, mock(TransportService.class), clusterService, - null, null, null, mock(ActionFilters.class), null, null, new WriteMemoryLimits(Settings.EMPTY)) { + null, null, null, mock(ActionFilters.class), null, null, new IndexingPressure(Settings.EMPTY)) { @Override void executeBulk(Task task, BulkRequest bulkRequest, long startTimeNanos, ActionListener listener, AtomicArray responses, Map indicesThatCannotBeCreated) { diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index 540e66603c5c3..d0b184afdf71b 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -51,6 +51,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; @@ -142,7 +143,7 @@ null, null, new ActionFilters(Collections.emptySet()), null, new AutoCreateIndex( SETTINGS, new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), 
new IndexNameExpressionResolver() - ), new WriteMemoryLimits(SETTINGS) + ), new IndexingPressure(SETTINGS) ); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java index 06e5786a6eb37..2b192e10a0a1c 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java @@ -45,6 +45,7 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; @@ -81,7 +82,7 @@ class TestTransportBulkAction extends TransportBulkAction { super(TransportBulkActionTests.this.threadPool, transportService, clusterService, null, null, null, new ActionFilters(Collections.emptySet()), new Resolver(), new AutoCreateIndex(Settings.EMPTY, clusterService.getClusterSettings(), new Resolver()), - new WriteMemoryLimits(Settings.EMPTY)); + new IndexingPressure(Settings.EMPTY)); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java index b249867060b50..26c46feb87152 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java @@ -42,6 +42,7 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.rest.action.document.RestBulkAction; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; @@ -240,7 +241,7 @@ static class TestTransportBulkAction extends TransportBulkAction { actionFilters, indexNameExpressionResolver, autoCreateIndex, - new WriteMemoryLimits(Settings.EMPTY), + new IndexingPressure(Settings.EMPTY), relativeTimeProvider); } diff --git a/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java index 8859651feed7f..e3b16b43fe33d 100644 --- a/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java @@ -20,7 +20,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.bulk.WriteMemoryLimits; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.ClusterState; @@ -145,7 +145,7 @@ public void testResyncDoesNotBlockOnPrimaryAction() throws Exception { final TransportResyncReplicationAction action = new TransportResyncReplicationAction(Settings.EMPTY, transportService, clusterService, indexServices, threadPool, shardStateAction, new ActionFilters(new HashSet<>()), - new WriteMemoryLimits(Settings.EMPTY)); + new IndexingPressure(Settings.EMPTY)); 
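The pattern repeated across the test fixtures above is a one-for-one constructor swap: every write-path action that previously received a WriteMemoryLimits now receives an IndexingPressure built from the same Settings. A minimal sketch of that wiring, assuming only the constructor visible in these hunks (the helper class below is illustrative and not part of this change):

    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.index.IndexingPressure;

    class IndexingPressureWiringSketch {
        // Builds the per-node accounting object the way the updated tests do:
        // defaults derived from (empty) node settings.
        static IndexingPressure newTestIndexingPressure() {
            return new IndexingPressure(Settings.EMPTY);
        }
        // The instance is then passed where WriteMemoryLimits used to go, e.g. as the
        // final constructor argument of TransportBulkAction, TransportResyncReplicationAction,
        // or RetentionLeaseSyncAction in the tests above.
    }
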
assertThat(action.globalBlockLevel(), nullValue()); assertThat(action.indexBlockLevel(), nullValue()); diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index 6535ab6d68f62..68ac15cba23ba 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -21,7 +21,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.bulk.WriteMemoryLimits; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.PlainActionFuture; @@ -367,7 +367,7 @@ protected TestAction(boolean withDocumentFailureOnPrimary, boolean withDocumentF new TransportService(Settings.EMPTY, mock(Transport.class), null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, Collections.emptySet()), TransportWriteActionTests.this.clusterService, null, null, null, new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false, - new WriteMemoryLimits(Settings.EMPTY)); + new IndexingPressure(Settings.EMPTY)); this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary; this.withDocumentFailureOnReplica = withDocumentFailureOnReplica; } @@ -377,7 +377,7 @@ protected TestAction(Settings settings, String actionName, TransportService tran super(settings, actionName, transportService, clusterService, mockIndicesService(clusterService), threadPool, shardStateAction, new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false, - new WriteMemoryLimits(settings)); + new IndexingPressure(settings)); this.withDocumentFailureOnPrimary = false; this.withDocumentFailureOnReplica = false; } diff --git a/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java b/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java index a5674994b61b0..de577364e4ded 100644 --- a/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java @@ -153,13 +153,13 @@ public void testFillDiskUsage() { List nodeStats = Arrays.asList( new NodeStats(new DiscoveryNode("node_1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0, null,null,null,null,null,new FsInfo(0, null, node1FSInfo), null,null,null,null,null, null, null, - null), + null, null), new NodeStats(new DiscoveryNode("node_2", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0, null,null,null,null,null, new FsInfo(0, null, node2FSInfo), null,null,null,null,null, null, null, - null), + null, null), new NodeStats(new DiscoveryNode("node_3", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0, null,null,null,null,null, new FsInfo(0, null, node3FSInfo), null,null,null,null,null, null, null, - null) + null, null) ); InternalClusterInfoService.fillDiskUsagePerNode(logger, nodeStats, newLeastAvaiableUsages, newMostAvaiableUsages); DiskUsage leastNode_1 = newLeastAvaiableUsages.get("node_1"); @@ -197,13 +197,13 @@ public void testFillDiskUsageSomeInvalidValues() { List nodeStats = Arrays.asList( new NodeStats(new DiscoveryNode("node_1", 
buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0, null,null,null,null,null,new FsInfo(0, null, node1FSInfo), null,null,null,null,null, null, null, - null), + null, null), new NodeStats(new DiscoveryNode("node_2", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0, null,null,null,null,null, new FsInfo(0, null, node2FSInfo), null,null,null,null,null, null, null, - null), + null, null), new NodeStats(new DiscoveryNode("node_3", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0, null,null,null,null,null, new FsInfo(0, null, node3FSInfo), null,null,null,null,null, null, null, - null) + null, null) ); InternalClusterInfoService.fillDiskUsagePerNode(logger, nodeStats, newLeastAvailableUsages, newMostAvailableUsages); DiskUsage leastNode_1 = newLeastAvailableUsages.get("node_1"); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java index a037f79957c69..1c7c24f429e6d 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java @@ -20,7 +20,7 @@ package org.elasticsearch.index.seqno; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.bulk.WriteMemoryLimits; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.PlainActionFuture; @@ -105,7 +105,7 @@ public void testRetentionLeaseSyncActionOnPrimary() { threadPool, shardStateAction, new ActionFilters(Collections.emptySet()), - new WriteMemoryLimits(Settings.EMPTY)); + new IndexingPressure(Settings.EMPTY)); final RetentionLeases retentionLeases = mock(RetentionLeases.class); final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases); action.dispatchedShardOperationOnPrimary(request, indexShard, @@ -142,7 +142,7 @@ public void testRetentionLeaseSyncActionOnReplica() throws Exception { threadPool, shardStateAction, new ActionFilters(Collections.emptySet()), - new WriteMemoryLimits(Settings.EMPTY)); + new IndexingPressure(Settings.EMPTY)); final RetentionLeases retentionLeases = mock(RetentionLeases.class); final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases); @@ -182,7 +182,7 @@ public void testBlocks() { threadPool, shardStateAction, new ActionFilters(Collections.emptySet()), - new WriteMemoryLimits(Settings.EMPTY)); + new IndexingPressure(Settings.EMPTY)); assertNull(action.indexBlockLevel()); } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index e570c35f1cea7..e002fc197b2aa 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -65,7 +65,7 @@ import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresAction; import org.elasticsearch.action.admin.indices.shards.TransportIndicesShardStoresAction; import org.elasticsearch.action.bulk.BulkAction; -import org.elasticsearch.action.bulk.WriteMemoryLimits; +import org.elasticsearch.index.IndexingPressure; 
import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.TransportBulkAction; @@ -1483,7 +1483,7 @@ public void onFailure(final Exception e) { threadPool, shardStateAction, actionFilters, - new WriteMemoryLimits(settings))), + new IndexingPressure(settings))), new GlobalCheckpointSyncAction( settings, transportService, @@ -1509,7 +1509,7 @@ allocationService, new AliasValidator(), shardLimitValidator, environment, index mappingUpdatedAction.setClient(client); final TransportShardBulkAction transportShardBulkAction = new TransportShardBulkAction(settings, transportService, clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedAction, new UpdateHelper(scriptService), - actionFilters, new WriteMemoryLimits(settings)); + actionFilters, new IndexingPressure(settings)); actions.put(BulkAction.INSTANCE, new TransportBulkAction(threadPool, transportService, clusterService, new IngestService( @@ -1517,7 +1517,7 @@ clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedActi new AnalysisModule(environment, Collections.emptyList()).getAnalysisRegistry(), Collections.emptyList(), client), transportShardBulkAction, client, actionFilters, indexNameExpressionResolver, - new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver), new WriteMemoryLimits(settings) + new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver), new IndexingPressure(settings) )); final RestoreService restoreService = new RestoreService( clusterService, repositoriesService, allocationService, diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java b/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java index 9b0181f355005..9ce0310edfff4 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java @@ -84,7 +84,8 @@ List adjustNodesStats(List nodesStats) { .map(fsInfoPath -> diskUsageFunction.apply(discoveryNode, fsInfoPath)) .toArray(FsInfo.Path[]::new)), nodeStats.getTransport(), nodeStats.getHttp(), nodeStats.getBreaker(), nodeStats.getScriptStats(), nodeStats.getDiscoveryStats(), - nodeStats.getIngestStats(), nodeStats.getAdaptiveSelectionStats(), nodeStats.getScriptCacheStats()); + nodeStats.getIngestStats(), nodeStats.getAdaptiveSelectionStats(), nodeStats.getScriptCacheStats(), + nodeStats.getIndexingPressureStats()); }).collect(Collectors.toList()); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index c3414ea624de9..006303004b2a8 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -37,7 +37,7 @@ import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags.Flag; -import org.elasticsearch.action.bulk.WriteMemoryLimits; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; @@ -1350,13 +1350,13 
@@ private void assertSameSyncIdSameDocs() { private void assertAllPendingWriteLimitsReleased() throws Exception { assertBusy(() -> { for (NodeAndClient nodeAndClient : nodes.values()) { - WriteMemoryLimits writeMemoryLimits = getInstance(WriteMemoryLimits.class, nodeAndClient.name); - final long writeBytes = writeMemoryLimits.getWriteBytes(); + IndexingPressure indexingPressure = getInstance(IndexingPressure.class, nodeAndClient.name); + final long writeBytes = indexingPressure.getCurrentCoordinatingAndPrimaryBytes(); if (writeBytes > 0) { throw new AssertionError("pending write bytes [" + writeBytes + "] bytes on node [" + nodeAndClient.name + "]."); } - final long replicaWriteBytes = writeMemoryLimits.getReplicaWriteBytes(); + final long replicaWriteBytes = indexingPressure.getCurrentReplicaBytes(); if (replicaWriteBytes > 0) { throw new AssertionError("pending replica write bytes [" + writeBytes + "] bytes on node [" + nodeAndClient.name + "]."); @@ -2497,7 +2497,7 @@ public void ensureEstimatedStats() { NodeService nodeService = getInstanceFromNode(NodeService.class, nodeAndClient.node); CommonStatsFlags flags = new CommonStatsFlags(Flag.FieldData, Flag.QueryCache, Flag.Segments); NodeStats stats = nodeService.stats(flags, - false, false, false, false, false, false, false, false, false, false, false, false, false); + false, false, false, false, false, false, false, false, false, false, false, false, false, false); assertThat("Fielddata size must be 0 on node: " + stats.getNode(), stats.getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0L)); assertThat("Query cache size must be 0 on node: " + stats.getNode(), diff --git a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java index ca5a2b268b79b..60fc3a1814cf8 100644 --- a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java @@ -6,7 +6,7 @@ package org.elasticsearch.xpack.ccr; -import org.elasticsearch.action.bulk.WriteMemoryLimits; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; @@ -136,12 +136,12 @@ public void testWriteLimitsIncremented() throws Exception { final PutFollowAction.Request followRequest = getPutFollowRequest("leader", "follower"); client().execute(PutFollowAction.INSTANCE, followRequest).get(); - WriteMemoryLimits memoryLimits = getInstanceFromNode(WriteMemoryLimits.class); + IndexingPressure memoryLimits = getInstanceFromNode(IndexingPressure.class); final long finalSourceSize = sourceSize; assertBusy(() -> { // The actual write bytes will be greater due to other request fields. However, this test is // just spot checking that the bytes are incremented at all. 
- assertTrue(memoryLimits.getWriteBytes() > finalSourceSize); + assertTrue(memoryLimits.getCurrentCoordinatingAndPrimaryBytes() > finalSourceSize); }); blocker.countDown(); assertBusy(() -> { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java index c0a2db2d3b482..fa206bcca26dd 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java @@ -9,7 +9,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.bulk.WriteMemoryLimits; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; @@ -38,7 +38,7 @@ public class TransportBulkShardOperationsAction extends TransportWriteAction { - private final WriteMemoryLimits writeMemoryLimits; + private final IndexingPressure indexingPressure; @Inject public TransportBulkShardOperationsAction( @@ -49,7 +49,7 @@ public TransportBulkShardOperationsAction( final ThreadPool threadPool, final ShardStateAction shardStateAction, final ActionFilters actionFilters, - final WriteMemoryLimits writeMemoryLimits) { + final IndexingPressure indexingPressure) { super( settings, BulkShardOperationsAction.NAME, @@ -61,14 +61,14 @@ public TransportBulkShardOperationsAction( actionFilters, BulkShardOperationsRequest::new, BulkShardOperationsRequest::new, - ThreadPool.Names.WRITE, false, writeMemoryLimits); - this.writeMemoryLimits = writeMemoryLimits; + ThreadPool.Names.WRITE, false, indexingPressure); + this.indexingPressure = indexingPressure; } @Override protected void doExecute(Task task, BulkShardOperationsRequest request, ActionListener listener) { // This is executed on the follower coordinator node and we need to mark the bytes. - Releasable releasable = writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request)); + Releasable releasable = indexingPressure.markIndexingOperationStarted(primaryOperationSize(request)); ActionListener releasingListener = ActionListener.runBefore(listener, releasable::close); try { super.doExecute(task, request, releasingListener); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java index a0ea1222d12f8..2d1f99e32b798 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java @@ -641,7 +641,7 @@ private static NodeStats buildNodeStats(List pipelineNames, List
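The coordinating-node accounting in TransportBulkShardOperationsAction#doExecute above follows one pattern: reserve the request's bytes with markIndexingOperationStarted, then hand the returned Releasable to the listener so the reservation is freed when the response or failure is delivered. A sketch of that pattern, assuming only the calls visible in the hunk; the wrapper class, the forward Runnable, and the catch block are illustrative placeholders, not the actual action code:

    import org.elasticsearch.action.ActionListener;
    import org.elasticsearch.common.lease.Releasable;
    import org.elasticsearch.index.IndexingPressure;

    class CoordinatorAccountingSketch {
        static <Response> void execute(IndexingPressure indexingPressure,
                                       long operationSizeInBytes,
                                       ActionListener<Response> listener,
                                       Runnable forward) { // stands in for super.doExecute(task, request, ...)
            // Reserve the bytes up front; the returned Releasable hands them back on close().
            final Releasable releasable = indexingPressure.markIndexingOperationStarted(operationSizeInBytes);
            // runBefore closes the reservation immediately before onResponse/onFailure is invoked.
            final ActionListener<Response> releasingListener = ActionListener.runBefore(listener, releasable::close);
            try {
                forward.run();
            } catch (Exception e) {
                // Synchronous failures still release the reservation via the wrapped listener.
                releasingListener.onFailure(e);
            }
        }
    }

These are the same counters the updated InternalTestCluster assertion expects to fall back to zero, via getCurrentCoordinatingAndPrimaryBytes() and getCurrentReplicaBytes(), once all pending writes complete.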