Skip to content

Commit 408a07f

Browse files
committed
Separate coordinating and primary bytes in stats (#59487)
Currently we combine coordinating and primary bytes into a single bucket for indexing pressure stats. This makes sense for rejection logic. However, for metrics it would be useful to separate them.
1 parent 70fe553 commit 408a07f

File tree

11 files changed

+275
-109
lines changed

11 files changed

+275
-109
lines changed

qa/smoke-test-http/src/test/java/org/elasticsearch/http/IndexingPressureRestIT.java

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -76,32 +76,44 @@ public void testIndexingPressureStats() throws IOException {
7676
ArrayList<Object> values = new ArrayList<>(((Map<Object, Object>) nodeStatsMap.get("nodes")).values());
7777
assertThat(values.size(), equalTo(2));
7878
XContentTestUtils.JsonMapView node1 = new XContentTestUtils.JsonMapView((Map<String, Object>) values.get(0));
79-
Integer node1IndexingBytes = node1.get("indexing_pressure.total.coordinating_and_primary_bytes");
79+
Integer node1CombinedBytes = node1.get("indexing_pressure.total.combined_coordinating_and_primary_bytes");
80+
Integer node1PrimaryBytes = node1.get("indexing_pressure.total.primary_bytes");
8081
Integer node1ReplicaBytes = node1.get("indexing_pressure.total.replica_bytes");
81-
Integer node1Rejections = node1.get("indexing_pressure.total.coordinating_and_primary_memory_limit_rejections");
82+
Integer node1CoordinatingRejections = node1.get("indexing_pressure.total.coordinating_rejections");
83+
Integer node1PrimaryRejections = node1.get("indexing_pressure.total.primary_rejections");
8284
XContentTestUtils.JsonMapView node2 = new XContentTestUtils.JsonMapView((Map<String, Object>) values.get(1));
83-
Integer node2IndexingBytes = node2.get("indexing_pressure.total.coordinating_and_primary_bytes");
85+
Integer node2IndexingBytes = node2.get("indexing_pressure.total.combined_coordinating_and_primary_bytes");
86+
Integer node2PrimaryBytes = node2.get("indexing_pressure.total.primary_bytes");
8487
Integer node2ReplicaBytes = node2.get("indexing_pressure.total.replica_bytes");
85-
Integer node2Rejections = node2.get("indexing_pressure.total.coordinating_and_primary_memory_limit_rejections");
88+
Integer node2CoordinatingRejections = node2.get("indexing_pressure.total.coordinating_rejections");
89+
Integer node2PrimaryRejections = node2.get("indexing_pressure.total.primary_rejections");
8690

87-
if (node1IndexingBytes == 0) {
91+
if (node1CombinedBytes == 0) {
8892
assertThat(node2IndexingBytes, greaterThan(0));
8993
assertThat(node2IndexingBytes, lessThan(1024));
9094
} else {
91-
assertThat(node1IndexingBytes, greaterThan(0));
92-
assertThat(node1IndexingBytes, lessThan(1024));
95+
assertThat(node1CombinedBytes, greaterThan(0));
96+
assertThat(node1CombinedBytes, lessThan(1024));
9397
}
9498

9599
if (node1ReplicaBytes == 0) {
100+
assertThat(node1PrimaryBytes, greaterThan(0));
101+
assertThat(node1PrimaryBytes, lessThan(1024));
102+
96103
assertThat(node2ReplicaBytes, greaterThan(0));
97104
assertThat(node2ReplicaBytes, lessThan(1024));
98105
} else {
106+
assertThat(node2PrimaryBytes, greaterThan(0));
107+
assertThat(node2PrimaryBytes, lessThan(1024));
108+
99109
assertThat(node2ReplicaBytes, equalTo(0));
100110
assertThat(node1ReplicaBytes, lessThan(1024));
101111
}
102112

103-
assertThat(node1Rejections, equalTo(0));
104-
assertThat(node2Rejections, equalTo(0));
113+
assertThat(node1CoordinatingRejections, equalTo(0));
114+
assertThat(node1PrimaryRejections, equalTo(0));
115+
assertThat(node2CoordinatingRejections, equalTo(0));
116+
assertThat(node2PrimaryRejections, equalTo(0));
105117

106118
Request failedIndexingRequest = new Request("POST", "/index_name/_doc/");
107119
String largeString = randomAlphaOfLength(10000);
@@ -116,14 +128,19 @@ public void testIndexingPressureStats() throws IOException {
116128
ArrayList<Object> values2 = new ArrayList<>(((Map<Object, Object>) nodeStatsMap2.get("nodes")).values());
117129
assertThat(values2.size(), equalTo(2));
118130
XContentTestUtils.JsonMapView node1AfterRejection = new XContentTestUtils.JsonMapView((Map<String, Object>) values2.get(0));
119-
node1Rejections = node1AfterRejection.get("indexing_pressure.total.coordinating_and_primary_memory_limit_rejections");
131+
node1CoordinatingRejections = node1AfterRejection.get("indexing_pressure.total.coordinating_rejections");
132+
node1PrimaryRejections = node1.get("indexing_pressure.total.primary_rejections");
120133
XContentTestUtils.JsonMapView node2AfterRejection = new XContentTestUtils.JsonMapView((Map<String, Object>) values2.get(1));
121-
node2Rejections = node2AfterRejection.get("indexing_pressure.total.coordinating_and_primary_memory_limit_rejections");
134+
node2CoordinatingRejections = node2AfterRejection.get("indexing_pressure.total.coordinating_rejections");
135+
node2PrimaryRejections = node2AfterRejection.get("indexing_pressure.total.primary_rejections");
122136

123-
if (node1Rejections == 0) {
124-
assertThat(node2Rejections, equalTo(1));
137+
if (node1CoordinatingRejections == 0) {
138+
assertThat(node2CoordinatingRejections, equalTo(1));
125139
} else {
126-
assertThat(node1Rejections, equalTo(1));
140+
assertThat(node1CoordinatingRejections, equalTo(1));
127141
}
142+
143+
assertThat(node1PrimaryRejections, equalTo(0));
144+
assertThat(node2PrimaryRejections, equalTo(0));
128145
}
129146
}

rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/50_indexing_pressure.yml

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,19 @@
1414
nodes.stats:
1515
metric: [ indexing_pressure ]
1616

17-
- gte: { nodes.$node_id.indexing_pressure.total.coordinating_and_primary_bytes: 0 }
17+
- gte: { nodes.$node_id.indexing_pressure.total.combined_coordinating_and_primary_bytes: 0 }
18+
- gte: { nodes.$node_id.indexing_pressure.total.coordinating_bytes: 0 }
19+
- gte: { nodes.$node_id.indexing_pressure.total.primary_bytes: 0 }
1820
- gte: { nodes.$node_id.indexing_pressure.total.replica_bytes: 0 }
1921
- gte: { nodes.$node_id.indexing_pressure.total.all_bytes: 0 }
20-
- gte: { nodes.$node_id.indexing_pressure.total.coordinating_and_primary_memory_limit_rejections: 0 }
21-
- gte: { nodes.$node_id.indexing_pressure.total.replica_memory_limit_rejections: 0 }
22-
- gte: { nodes.$node_id.indexing_pressure.current.coordinating_and_primary_bytes: 0 }
22+
23+
- gte: { nodes.$node_id.indexing_pressure.total.coordinating_rejections: 0 }
24+
- gte: { nodes.$node_id.indexing_pressure.total.primary_rejections: 0 }
25+
- gte: { nodes.$node_id.indexing_pressure.total.replica_rejections: 0 }
26+
27+
- gte: { nodes.$node_id.indexing_pressure.current.combined_coordinating_and_primary_bytes: 0 }
28+
- gte: { nodes.$node_id.indexing_pressure.current.coordinating_bytes: 0 }
29+
- gte: { nodes.$node_id.indexing_pressure.current.primary_bytes: 0 }
2330
- gte: { nodes.$node_id.indexing_pressure.current.replica_bytes: 0 }
2431
- gte: { nodes.$node_id.indexing_pressure.current.all_bytes: 0 }
2532

server/src/internalClusterTest/java/org/elasticsearch/index/IndexingPressureIT.java

Lines changed: 51 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,6 @@
5656
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 2, numClientNodes = 1, transportClientRatio = 0.0D)
5757
public class IndexingPressureIT extends ESIntegTestCase {
5858

59-
// TODO: Add additional REST tests when metrics are exposed
60-
6159
public static final String INDEX_NAME = "test";
6260

6361
private static final Settings unboundedWriteQueue = Settings.builder().put("thread_pool.write.queue_size", -1).build();
@@ -140,11 +138,19 @@ public void testWriteBytesAreIncremented() throws Exception {
140138
IndexingPressure replicaWriteLimits = internalCluster().getInstance(IndexingPressure.class, replicaName);
141139
IndexingPressure coordinatingWriteLimits = internalCluster().getInstance(IndexingPressure.class, coordinatingOnlyNode);
142140

143-
assertThat(primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize));
141+
assertThat(primaryWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize));
142+
assertThat(primaryWriteLimits.getCurrentPrimaryBytes(), greaterThan(bulkShardRequestSize));
143+
assertEquals(0, primaryWriteLimits.getCurrentCoordinatingBytes());
144144
assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes());
145-
assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
145+
146+
assertEquals(0, replicaWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes());
147+
assertEquals(0, replicaWriteLimits.getCurrentCoordinatingBytes());
148+
assertEquals(0, replicaWriteLimits.getCurrentPrimaryBytes());
146149
assertEquals(0, replicaWriteLimits.getCurrentReplicaBytes());
147-
assertEquals(bulkRequestSize, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
150+
151+
assertEquals(bulkRequestSize, coordinatingWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes());
152+
assertEquals(bulkRequestSize, coordinatingWriteLimits.getCurrentCoordinatingBytes());
153+
assertEquals(0, coordinatingWriteLimits.getCurrentPrimaryBytes());
148154
assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes());
149155

150156
latchBlockingReplicationSend.countDown();
@@ -167,14 +173,25 @@ public void testWriteBytesAreIncremented() throws Exception {
167173
final long secondBulkShardRequestSize = request.ramBytesUsed();
168174

169175
if (usePrimaryAsCoordinatingNode) {
170-
assertThat(primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes(),
171-
greaterThan(bulkShardRequestSize + secondBulkRequestSize));
172-
assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
176+
assertBusy(() -> {
177+
assertThat(primaryWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes(),
178+
greaterThan(bulkShardRequestSize + secondBulkRequestSize));
179+
assertEquals(secondBulkRequestSize, primaryWriteLimits.getCurrentCoordinatingBytes());
180+
assertThat(primaryWriteLimits.getCurrentPrimaryBytes(),
181+
greaterThan(bulkShardRequestSize + secondBulkRequestSize));
182+
183+
assertEquals(0, replicaWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes());
184+
assertEquals(0, replicaWriteLimits.getCurrentCoordinatingBytes());
185+
assertEquals(0, replicaWriteLimits.getCurrentPrimaryBytes());
186+
});
173187
} else {
174-
assertThat(primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize));
175-
assertEquals(secondBulkRequestSize, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
188+
assertThat(primaryWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize));
189+
190+
assertEquals(secondBulkRequestSize, replicaWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes());
191+
assertEquals(secondBulkRequestSize, replicaWriteLimits.getCurrentCoordinatingBytes());
192+
assertEquals(0, replicaWriteLimits.getCurrentPrimaryBytes());
176193
}
177-
assertEquals(bulkRequestSize, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
194+
assertEquals(bulkRequestSize, coordinatingWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes());
178195
assertBusy(() -> assertThat(replicaWriteLimits.getCurrentReplicaBytes(),
179196
greaterThan(bulkShardRequestSize + secondBulkShardRequestSize)));
180197

@@ -183,11 +200,19 @@ public void testWriteBytesAreIncremented() throws Exception {
183200
successFuture.actionGet();
184201
secondFuture.actionGet();
185202

186-
assertEquals(0, primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
203+
assertEquals(0, primaryWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes());
204+
assertEquals(0, primaryWriteLimits.getCurrentCoordinatingBytes());
205+
assertEquals(0, primaryWriteLimits.getCurrentPrimaryBytes());
187206
assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes());
188-
assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
207+
208+
assertEquals(0, replicaWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes());
209+
assertEquals(0, replicaWriteLimits.getCurrentCoordinatingBytes());
210+
assertEquals(0, replicaWriteLimits.getCurrentPrimaryBytes());
189211
assertEquals(0, replicaWriteLimits.getCurrentReplicaBytes());
190-
assertEquals(0, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
212+
213+
assertEquals(0, coordinatingWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes());
214+
assertEquals(0, coordinatingWriteLimits.getCurrentCoordinatingBytes());
215+
assertEquals(0, coordinatingWriteLimits.getCurrentPrimaryBytes());
191216
assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes());
192217
} finally {
193218
if (replicationSendPointReached.getCount() > 0) {
@@ -237,11 +262,11 @@ public void testWriteCanBeRejectedAtCoordinatingLevel() throws Exception {
237262
IndexingPressure coordinatingWriteLimits = internalCluster().getInstance(IndexingPressure.class, coordinatingOnlyNode);
238263

239264
assertBusy(() -> {
240-
assertThat(primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize));
265+
assertThat(primaryWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize));
241266
assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes());
242-
assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
267+
assertEquals(0, replicaWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes());
243268
assertThat(replicaWriteLimits.getCurrentReplicaBytes(), greaterThan(bulkShardRequestSize));
244-
assertEquals(bulkRequestSize, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
269+
assertEquals(bulkRequestSize, coordinatingWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes());
245270
assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes());
246271
});
247272

@@ -259,11 +284,11 @@ public void testWriteCanBeRejectedAtCoordinatingLevel() throws Exception {
259284

260285
successFuture.actionGet();
261286

262-
assertEquals(0, primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
287+
assertEquals(0, primaryWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes());
263288
assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes());
264-
assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
289+
assertEquals(0, replicaWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes());
265290
assertEquals(0, replicaWriteLimits.getCurrentReplicaBytes());
266-
assertEquals(0, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
291+
assertEquals(0, coordinatingWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes());
267292
assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes());
268293
}
269294
}
@@ -301,11 +326,11 @@ public void testWriteCanBeRejectedAtPrimaryLevel() throws Exception {
301326
IndexingPressure coordinatingWriteLimits = internalCluster().getInstance(IndexingPressure.class, coordinatingOnlyNode);
302327

303328
assertBusy(() -> {
304-
assertThat(primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize));
329+
assertThat(primaryWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize));
305330
assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes());
306-
assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
331+
assertEquals(0, replicaWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes());
307332
assertThat(replicaWriteLimits.getCurrentReplicaBytes(), greaterThan(bulkShardRequestSize));
308-
assertEquals(0, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
333+
assertEquals(0, coordinatingWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes());
309334
assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes());
310335
});
311336

@@ -317,11 +342,11 @@ public void testWriteCanBeRejectedAtPrimaryLevel() throws Exception {
317342

318343
successFuture.actionGet();
319344

320-
assertEquals(0, primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
345+
assertEquals(0, primaryWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes());
321346
assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes());
322-
assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
347+
assertEquals(0, replicaWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes());
323348
assertEquals(0, replicaWriteLimits.getCurrentReplicaBytes());
324-
assertEquals(0, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
349+
assertEquals(0, coordinatingWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes());
325350
assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes());
326351
}
327352
}

server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ public static IndexRequest getIndexWriteRequest(DocWriteRequest<?> docWriteReque
167167
@Override
168168
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
169169
long indexingBytes = bulkRequest.ramBytesUsed();
170-
final Releasable releasable = indexingPressure.markIndexingOperationStarted(indexingBytes);
170+
final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(indexingBytes);
171171
final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
172172
try {
173173
doInternalExecute(task, bulkRequest, releasingListener);

0 commit comments

Comments
 (0)