Skip to content

Commit f174f72

Browse files
Circuit-break based on real memory usage
With this commit we introduce a new circuit-breaking strategy to the parent circuit breaker. Contrary to the current implementation which only accounts for memory reserved via child circuit breakers, the new strategy measures real heap memory usage at the time of reservation. This allows us to be much more aggressive with the circuit breaker limit so we bump it to 95% by default. The new strategy is turned on by default and can be controlled with the new cluster setting `indices.breaker.total.use_real_memory`. Note that we turn it off for all integration tests with an internal test cluster because it leads to spurious test failures which are of no value (we cannot fully control heap memory usage in tests). All REST tests, however, will make use of the real memory circuit breaker. Relates #31767
1 parent d246164 commit f174f72

File tree

9 files changed

+225
-17
lines changed

9 files changed

+225
-17
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.benchmark.indices.breaker;
20+
21+
import org.openjdk.jmh.annotations.Benchmark;
22+
import org.openjdk.jmh.annotations.BenchmarkMode;
23+
import org.openjdk.jmh.annotations.Fork;
24+
import org.openjdk.jmh.annotations.Measurement;
25+
import org.openjdk.jmh.annotations.Mode;
26+
import org.openjdk.jmh.annotations.OutputTimeUnit;
27+
import org.openjdk.jmh.annotations.Param;
28+
import org.openjdk.jmh.annotations.Scope;
29+
import org.openjdk.jmh.annotations.State;
30+
import org.openjdk.jmh.annotations.Threads;
31+
import org.openjdk.jmh.annotations.Warmup;
32+
import org.openjdk.jmh.infra.Blackhole;
33+
34+
import java.lang.management.ManagementFactory;
35+
import java.lang.management.MemoryMXBean;
36+
import java.util.concurrent.TimeUnit;
37+
38+
/**
 * JMH benchmark for the cost of reading the used heap size via
 * {@code MemoryMXBean.getHeapMemoryUsage().getUsed()}.
 *
 * The same measurement is repeated in {@code getMemoryStats_01} .. {@code getMemoryStats_64};
 * the methods differ only in their {@code @Threads} value (1, 2, 4, 8, 16, 32, 64).
 * Every benchmark method first calls {@code Blackhole.consumeCPU(tokens)}, and {@code baseline}
 * does nothing else, so it presumably serves as the reference for that simulated work.
 * Results are reported as average time in microseconds (see the annotations below).
 */
@Fork(3)
39+
@Warmup(iterations = 10)
40+
@Measurement(iterations = 10)
41+
@BenchmarkMode(Mode.AverageTime)
42+
@OutputTimeUnit(TimeUnit.MICROSECONDS)
43+
@State(Scope.Benchmark)
44+
@SuppressWarnings("unused") //invoked by benchmarking framework
45+
public class MemoryStatsBenchmark {
46+
private static final MemoryMXBean MEMORY_MX_BEAN = ManagementFactory.getMemoryMXBean(); // single bean shared by all benchmark methods
47+
48+
// Argument handed to Blackhole.consumeCPU before each measured call; one run per listed value.
@Param({"0", "16", "256", "4096"})
49+
private int tokens;
50+
51+
// Performs only the consumeCPU call; carries no @Threads annotation, unlike the methods below.
@Benchmark
52+
public void baseline() {
53+
Blackhole.consumeCPU(tokens);
54+
}
55+
56+
// Reads the used heap size after the consumeCPU call; the value is returned to the framework.
@Benchmark
57+
@Threads(1)
58+
public long getMemoryStats_01() {
59+
Blackhole.consumeCPU(tokens);
60+
return MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed();
61+
}
62+
63+
@Benchmark
64+
@Threads(2)
65+
public long getMemoryStats_02() {
66+
Blackhole.consumeCPU(tokens);
67+
return MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed();
68+
}
69+
70+
@Benchmark
71+
@Threads(4)
72+
public long getMemoryStats_04() {
73+
Blackhole.consumeCPU(tokens);
74+
return MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed();
75+
}
76+
77+
@Benchmark
78+
@Threads(8)
79+
public long getMemoryStats_08() {
80+
Blackhole.consumeCPU(tokens);
81+
return MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed();
82+
}
83+
84+
@Benchmark
85+
@Threads(16)
86+
public long getMemoryStats_16() {
87+
Blackhole.consumeCPU(tokens);
88+
return MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed();
89+
}
90+
91+
@Benchmark
92+
@Threads(32)
93+
public long getMemoryStats_32() {
94+
Blackhole.consumeCPU(tokens);
95+
return MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed();
96+
}
97+
98+
@Benchmark
99+
@Threads(64)
100+
public long getMemoryStats_64() {
101+
Blackhole.consumeCPU(tokens);
102+
return MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed();
103+
}
104+
}
105+

docs/reference/migration/migrate_7_0/indices.asciidoc

+9-1
Original file line numberDiff line numberDiff line change
@@ -69,4 +69,12 @@ The following previously deprecated url parameter have been removed:
6969

7070
Previously the in flight requests circuit breaker considered only the raw byte representation.
7171
By bumping the value of `network.breaker.inflight_requests.overhead` from 1 to 2, this circuit
72-
breaker considers now also the memory overhead of representing the request as a structured object.
72+
breaker now also considers the memory overhead of representing the request as a structured object.
73+
74+
==== Parent circuit breaker changes
75+
76+
The parent circuit breaker defines a new setting `indices.breaker.total.use_real_memory` which is
77+
`true` by default. This means that the parent circuit breaker will trip based on currently used
78+
heap memory instead of only considering the reserved memory by child circuit breakers. When this
79+
setting is `true`, the default parent breaker limit also changes from 70% to 95% of the JVM heap size.
80+
The previous behavior can be restored by setting `indices.breaker.total.use_real_memory` to `false`.

docs/reference/modules/indices/circuit_breaker.asciidoc

+9-2
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,18 @@ These settings can be dynamically updated on a live cluster with the
1313
[float]
1414
==== Parent circuit breaker
1515

16-
The parent-level breaker can be configured with the following setting:
16+
The parent-level breaker can be configured with the following settings:
17+
18+
`indices.breaker.total.use_real_memory`::
19+
20+
Whether the parent breaker should take real memory usage into account (`true`) or only
21+
consider the amount that is reserved by child circuit breakers (`false`). Defaults to `true`.
1722

1823
`indices.breaker.total.limit`::
1924

20-
Starting limit for overall parent breaker, defaults to 70% of JVM heap.
25+
Starting limit for overall parent breaker, defaults to 70% of JVM heap if
26+
`indices.breaker.total.use_real_memory` is `false`. If `indices.breaker.total.use_real_memory`
27+
is `true`, defaults to 95% of the JVM heap.
2128

2229
[[fielddata-circuit-breaker]]
2330
[float]

server/src/main/java/org/elasticsearch/common/breaker/ChildMemoryCircuitBreaker.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws Cir
125125

126126
// Additionally, we need to check that we haven't exceeded the parent's limit
127127
try {
128-
parent.checkParentLimit(label);
128+
parent.checkParentLimit((long) (bytes * overheadConstant), label);
129129
} catch (CircuitBreakingException e) {
130130
// If the parent breaker is tripped, this breaker has to be
131131
// adjusted back down because the allocation is "blocked" but the

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

+1
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,7 @@ public void apply(Settings value, Settings current, Settings previous) {
254254
HttpTransportSettings.SETTING_HTTP_TCP_REUSE_ADDRESS,
255255
HttpTransportSettings.SETTING_HTTP_TCP_SEND_BUFFER_SIZE,
256256
HttpTransportSettings.SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE,
257+
HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING,
257258
HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING,
258259
HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING,
259260
HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING,

server/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java

+37-10
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import org.elasticsearch.common.settings.Settings;
3131
import org.elasticsearch.common.unit.ByteSizeValue;
3232

33+
import java.lang.management.ManagementFactory;
34+
import java.lang.management.MemoryMXBean;
3335
import java.util.ArrayList;
3436
import java.util.List;
3537
import java.util.concurrent.ConcurrentHashMap;
@@ -44,10 +46,21 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
4446

4547
private static final String CHILD_LOGGER_PREFIX = "org.elasticsearch.indices.breaker.";
4648

49+
private static final MemoryMXBean MEMORY_MX_BEAN = ManagementFactory.getMemoryMXBean();
50+
4751
private final ConcurrentMap<String, CircuitBreaker> breakers = new ConcurrentHashMap<>();
4852

53+
public static final Setting<Boolean> USE_REAL_MEMORY_USAGE_SETTING =
54+
Setting.boolSetting("indices.breaker.total.use_real_memory", true, Property.NodeScope);
55+
4956
public static final Setting<ByteSizeValue> TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING =
50-
Setting.memorySizeSetting("indices.breaker.total.limit", "70%", Property.Dynamic, Property.NodeScope);
57+
Setting.memorySizeSetting("indices.breaker.total.limit", settings -> {
58+
if (USE_REAL_MEMORY_USAGE_SETTING.get(settings)) {
59+
return "95%";
60+
} else {
61+
return "70%";
62+
}
63+
}, Property.Dynamic, Property.NodeScope);
5164

5265
public static final Setting<ByteSizeValue> FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING =
5366
Setting.memorySizeSetting("indices.breaker.fielddata.limit", "60%", Property.Dynamic, Property.NodeScope);
@@ -77,6 +90,7 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
7790
public static final Setting<CircuitBreaker.Type> IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_TYPE_SETTING =
7891
new Setting<>("network.breaker.inflight_requests.type", "memory", CircuitBreaker.Type::parseValue, Property.NodeScope);
7992

93+
private final boolean trackRealMemoryUsage;
8094
private volatile BreakerSettings parentSettings;
8195
private volatile BreakerSettings fielddataSettings;
8296
private volatile BreakerSettings inFlightRequestsSettings;
@@ -120,6 +134,8 @@ public HierarchyCircuitBreakerService(Settings settings, ClusterSettings cluster
120134
logger.trace("parent circuit breaker with settings {}", this.parentSettings);
121135
}
122136

137+
this.trackRealMemoryUsage = USE_REAL_MEMORY_USAGE_SETTING.get(settings);
138+
123139
registerBreaker(this.requestSettings);
124140
registerBreaker(this.fielddataSettings);
125141
registerBreaker(this.inFlightRequestsSettings);
@@ -191,17 +207,15 @@ public CircuitBreaker getBreaker(String name) {
191207

192208
@Override
193209
public AllCircuitBreakerStats stats() {
194-
long parentEstimated = 0;
195210
List<CircuitBreakerStats> allStats = new ArrayList<>(this.breakers.size());
196211
// Gather the "estimated" count for the parent breaker by adding the
197212
// estimations for each individual breaker
198213
for (CircuitBreaker breaker : this.breakers.values()) {
199214
allStats.add(stats(breaker.getName()));
200-
parentEstimated += breaker.getUsed();
201215
}
202216
// Manually add the parent breaker settings since they aren't part of the breaker map
203217
allStats.add(new CircuitBreakerStats(CircuitBreaker.PARENT, parentSettings.getLimit(),
204-
parentEstimated, 1.0, parentTripCount.get()));
218+
parentUsed(0L), 1.0, parentTripCount.get()));
205219
return new AllCircuitBreakerStats(allStats.toArray(new CircuitBreakerStats[allStats.size()]));
206220
}
207221

@@ -211,15 +225,28 @@ public CircuitBreakerStats stats(String name) {
211225
return new CircuitBreakerStats(breaker.getName(), breaker.getLimit(), breaker.getUsed(), breaker.getOverhead(), breaker.getTrippedCount());
212226
}
213227

228+
/**
 * Computes the usage figure that is compared against the parent breaker limit.
 *
 * When {@code trackRealMemoryUsage} is set, the result is the value of
 * {@link #currentMemoryUsage()} plus {@code newBytesReserved}. Otherwise it is the sum of
 * {@code getUsed() * getOverhead()} over all registered child breakers, and
 * {@code newBytesReserved} is not used.
 *
 * @param newBytesReserved bytes about to be reserved; only added in real-memory mode
 * @return the usage to compare against the parent limit
 */
private long parentUsed(long newBytesReserved) {
229+
if (this.trackRealMemoryUsage) {
230+
return currentMemoryUsage() + newBytesReserved;
231+
} else {
232+
long parentEstimated = 0;
233+
for (CircuitBreaker breaker : this.breakers.values()) {
234+
parentEstimated += breaker.getUsed() * breaker.getOverhead(); // NOTE(review): overhead is presumably a double, so += narrows the product back to long - confirm
235+
}
236+
return parentEstimated;
237+
}
238+
}
239+
240+
// Returns the used heap size as reported by MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed().
//package private to allow overriding it in tests
241+
long currentMemoryUsage() {
242+
return MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed();
243+
}
244+
214245
/**
215246
* Checks whether the parent breaker has been tripped
216247
*/
217-
public void checkParentLimit(String label) throws CircuitBreakingException {
218-
long totalUsed = 0;
219-
for (CircuitBreaker breaker : this.breakers.values()) {
220-
totalUsed += (breaker.getUsed() * breaker.getOverhead());
221-
}
222-
248+
public void checkParentLimit(long newBytesReserved, String label) throws CircuitBreakingException {
249+
long totalUsed = parentUsed(newBytesReserved);
223250
long parentLimit = this.parentSettings.getLimit();
224251
if (totalUsed > parentLimit) {
225252
this.parentTripCount.incrementAndGet();

server/src/test/java/org/elasticsearch/common/settings/MemorySizeSettingsTests.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.elasticsearch.common.settings;
2121

2222
import org.elasticsearch.common.settings.Setting.Property;
23+
import org.elasticsearch.common.unit.ByteSizeUnit;
2324
import org.elasticsearch.common.unit.ByteSizeValue;
2425
import org.elasticsearch.common.util.PageCacheRecycler;
2526
import org.elasticsearch.indices.IndexingMemoryController;
@@ -57,8 +58,15 @@ public void testIndicesRequestCacheSetting() {
5758
}
5859

5960
public void testCircuitBreakerSettings() {
61+
// default is chosen based on actual heap size
62+
double defaultTotalPercentage;
63+
if (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() < new ByteSizeValue(1, ByteSizeUnit.GB).getBytes()) {
64+
defaultTotalPercentage = 0.95d;
65+
} else {
66+
defaultTotalPercentage = 0.7d;
67+
}
6068
assertMemorySizeSetting(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, "indices.breaker.total.limit",
61-
new ByteSizeValue((long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.7)));
69+
new ByteSizeValue((long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * defaultTotalPercentage)));
6270
assertMemorySizeSetting(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, "indices.breaker.fielddata.limit",
6371
new ByteSizeValue((long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.6)));
6472
assertMemorySizeSetting(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, "indices.breaker.request.limit",

server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java

+50-2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
import java.util.concurrent.atomic.AtomicBoolean;
3434
import java.util.concurrent.atomic.AtomicInteger;
35+
import java.util.concurrent.atomic.AtomicLong;
3536
import java.util.concurrent.atomic.AtomicReference;
3637

3738
import static org.hamcrest.Matchers.containsString;
@@ -56,7 +57,7 @@ public CircuitBreaker getBreaker(String name) {
5657
}
5758

5859
@Override
59-
public void checkParentLimit(String label) throws CircuitBreakingException {
60+
public void checkParentLimit(long newBytesReserved, String label) throws CircuitBreakingException {
6061
// never trip
6162
}
6263
};
@@ -114,7 +115,7 @@ public CircuitBreaker getBreaker(String name) {
114115
}
115116

116117
@Override
117-
public void checkParentLimit(String label) throws CircuitBreakingException {
118+
public void checkParentLimit(long newBytesReserved, String label) throws CircuitBreakingException {
118119
// Parent will trip right before regular breaker would trip
119120
if (getBreaker(CircuitBreaker.REQUEST).getUsed() > parentLimit) {
120121
parentTripped.incrementAndGet();
@@ -170,6 +171,7 @@ public void checkParentLimit(String label) throws CircuitBreakingException {
170171
*/
171172
public void testBorrowingSiblingBreakerMemory() throws Exception {
172173
Settings clusterSettings = Settings.builder()
174+
.put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), false)
173175
.put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "200mb")
174176
.put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "150mb")
175177
.put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "150mb")
@@ -199,4 +201,50 @@ public void testBorrowingSiblingBreakerMemory() throws Exception {
199201
assertThat(exception.getMessage(), containsString("which is larger than the limit of [209715200/200mb]"));
200202
}
201203
}
204+
205+
/**
 * Verifies that the parent breaker trips on real memory usage plus the new reservation when
 * {@code indices.breaker.total.use_real_memory} is {@code true}.
 *
 * Memory usage is simulated by overriding {@code currentMemoryUsage()} so the test controls it
 * through an {@link AtomicLong}. The parent limit is 200 bytes and the request breaker uses an
 * overhead of 2.
 */
public void testParentBreaksOnRealMemoryUsage() throws Exception {
206+
Settings clusterSettings = Settings.builder()
207+
.put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), Boolean.TRUE)
208+
.put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "200b")
209+
// The request breaker can accumulate up to 99 + 24 + 50 = 173 bytes in this test, i.e. 346 bytes with the
// overhead of 2. Its limit must stay above that so that only the parent breaker can trip (this assumes the
// child breaker applies its overhead when checking its own limit).
.put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "350b")
210+
.put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING.getKey(), 2)
211+
.build();
212+
213+
AtomicLong memoryUsage = new AtomicLong();
214+
final CircuitBreakerService service = new HierarchyCircuitBreakerService(clusterSettings,
215+
new ClusterSettings(clusterSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)) {
216+
@Override
217+
long currentMemoryUsage() {
218+
return memoryUsage.get();
219+
}
220+
};
221+
final CircuitBreaker requestBreaker = service.getBreaker(CircuitBreaker.REQUEST);
222+
223+
// anything below 100 bytes should work (overhead) - current memory usage is zero
224+
requestBreaker.addEstimateBytesAndMaybeBreak(randomLongBetween(0, 99), "request");
225+
assertEquals(0, requestBreaker.getTrippedCount());
226+
// assume memory usage has increased to 150 bytes
227+
memoryUsage.set(150);
228+
229+
// a reservation that bumps memory usage to less than 200 (150 bytes used + reservation < 200)
230+
requestBreaker.addEstimateBytesAndMaybeBreak(randomLongBetween(0, 24), "request");
231+
assertEquals(0, requestBreaker.getTrippedCount());
232+
memoryUsage.set(181);
233+
234+
long reservationInBytes = randomLongBetween(10, 50);
235+
// anything >= 20 bytes (10 bytes * 2 overhead) reservation breaks the parent but it must be low enough to avoid
236+
// breaking the child breaker.
237+
CircuitBreakingException exception = expectThrows(CircuitBreakingException.class, () -> requestBreaker
238+
.addEstimateBytesAndMaybeBreak(reservationInBytes, "request"));
239+
// it was the parent that rejected the reservation
240+
assertThat(exception.getMessage(), containsString("[parent] Data too large, data for [request] would be"));
241+
assertThat(exception.getMessage(), containsString("which is larger than the limit of [200/200b]"));
242+
assertEquals(0, requestBreaker.getTrippedCount());
243+
assertEquals(1, service.stats().getStats(CircuitBreaker.PARENT).getTrippedCount());
244+
245+
// lower memory usage again - the same reservation should succeed
246+
memoryUsage.set(100);
247+
requestBreaker.addEstimateBytesAndMaybeBreak(reservationInBytes, "request");
248+
assertEquals(0, requestBreaker.getTrippedCount());
249+
}
202250
}

test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java

+4
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,10 @@ private Settings getRandomNodeSettings(long seed) {
394394
builder.put(MappingUpdatedAction.INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING.getKey(), new TimeValue(RandomNumbers.randomIntBetween(random, 10, 30), TimeUnit.SECONDS));
395395
}
396396

397+
// turning on the real memory circuit breaker leads to spurious test failures. As we have no full control over heap usage, we
398+
// turn it off for these tests.
399+
builder.put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), false);
400+
397401
if (random.nextInt(10) == 0) {
398402
builder.put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING.getKey(), "noop");
399403
builder.put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_TYPE_SETTING.getKey(), "noop");

0 commit comments

Comments
 (0)