22
22
import org .apache .logging .log4j .LogManager ;
23
23
import org .apache .logging .log4j .Logger ;
24
24
import org .apache .logging .log4j .message .ParameterizedMessage ;
25
+ import org .elasticsearch .common .Booleans ;
25
26
import org .elasticsearch .common .breaker .ChildMemoryCircuitBreaker ;
26
27
import org .elasticsearch .common .breaker .CircuitBreaker ;
27
28
import org .elasticsearch .common .breaker .CircuitBreakingException ;
30
31
import org .elasticsearch .common .settings .Setting ;
31
32
import org .elasticsearch .common .settings .Setting .Property ;
32
33
import org .elasticsearch .common .settings .Settings ;
34
+ import org .elasticsearch .common .unit .ByteSizeUnit ;
33
35
import org .elasticsearch .common .unit .ByteSizeValue ;
36
+ import org .elasticsearch .common .unit .TimeValue ;
37
+ import org .elasticsearch .common .util .concurrent .ReleasableLock ;
38
+ import org .elasticsearch .monitor .jvm .GcNames ;
39
+ import org .elasticsearch .monitor .jvm .JvmInfo ;
34
40
41
+ import java .lang .management .GarbageCollectorMXBean ;
35
42
import java .lang .management .ManagementFactory ;
36
43
import java .lang .management .MemoryMXBean ;
37
44
import java .util .ArrayList ;
40
47
import java .util .List ;
41
48
import java .util .Map ;
42
49
import java .util .concurrent .atomic .AtomicLong ;
50
+ import java .util .concurrent .locks .ReentrantLock ;
51
+ import java .util .function .Function ;
52
+ import java .util .function .LongSupplier ;
43
53
import java .util .stream .Collectors ;
44
54
45
55
import static org .elasticsearch .indices .breaker .BreakerSettings .CIRCUIT_BREAKER_LIMIT_SETTING ;
@@ -104,7 +114,14 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
104
114
// Tripped count for when redistribution was attempted but wasn't successful
105
115
private final AtomicLong parentTripCount = new AtomicLong (0 );
106
116
117
+ private final OverLimitStrategy overLimitStrategy ;
118
+
107
119
public HierarchyCircuitBreakerService (Settings settings , List <BreakerSettings > customBreakers , ClusterSettings clusterSettings ) {
120
+ this (settings , customBreakers , clusterSettings , HierarchyCircuitBreakerService ::createOverLimitStrategy );
121
+ }
122
+
123
+ HierarchyCircuitBreakerService (Settings settings , List <BreakerSettings > customBreakers , ClusterSettings clusterSettings ,
124
+ Function <Boolean , OverLimitStrategy > overLimitStrategyFactory ) {
108
125
super ();
109
126
HashMap <String , CircuitBreaker > childCircuitBreakers = new HashMap <>();
110
127
childCircuitBreakers .put (CircuitBreaker .FIELDDATA , validateAndCreateBreaker (
@@ -168,6 +185,8 @@ public HierarchyCircuitBreakerService(Settings settings, List<BreakerSettings> c
168
185
CIRCUIT_BREAKER_OVERHEAD_SETTING ,
169
186
(name , updatedValues ) -> updateCircuitBreakerSettings (name , updatedValues .v1 (), updatedValues .v2 ()),
170
187
(s , t ) -> {});
188
+
189
+ this .overLimitStrategy = overLimitStrategyFactory .apply (this .trackRealMemoryUsage );
171
190
}
172
191
173
192
private void updateCircuitBreakerSettings (String name , ByteSizeValue newLimit , Double newOverhead ) {
@@ -231,7 +250,7 @@ public CircuitBreakerStats stats(String name) {
231
250
breaker .getTrippedCount ());
232
251
}
233
252
234
- private static class MemoryUsage {
253
+ static class MemoryUsage {
235
254
final long baseUsage ;
236
255
final long totalUsage ;
237
256
final long transientChildUsage ;
@@ -268,6 +287,10 @@ private MemoryUsage memoryUsed(long newBytesReserved) {
268
287
269
288
//package private to allow overriding it in tests
270
289
long currentMemoryUsage () {
290
+ return realMemoryUsage ();
291
+ }
292
+
293
+ static long realMemoryUsage () {
271
294
try {
272
295
return MEMORY_MX_BEAN .getHeapMemoryUsage ().getUsed ();
273
296
} catch (IllegalArgumentException ex ) {
@@ -290,7 +313,7 @@ public long getParentLimit() {
290
313
public void checkParentLimit (long newBytesReserved , String label ) throws CircuitBreakingException {
291
314
final MemoryUsage memoryUsed = memoryUsed (newBytesReserved );
292
315
long parentLimit = this .parentSettings .getLimit ();
293
- if (memoryUsed .totalUsage > parentLimit ) {
316
+ if (memoryUsed .totalUsage > parentLimit && overLimitStrategy . overLimit ( memoryUsed ). totalUsage > parentLimit ) {
294
317
this .parentTripCount .incrementAndGet ();
295
318
final StringBuilder message = new StringBuilder ("[parent] Data too large, data for [" + label + "]" +
296
319
" would be [" + memoryUsed .totalUsage + "/" + new ByteSizeValue (memoryUsed .totalUsage ) + "]" +
@@ -334,4 +357,163 @@ private CircuitBreaker validateAndCreateBreaker(BreakerSettings breakerSettings)
334
357
this ,
335
358
breakerSettings .getName ());
336
359
}
360
+
361
+ static OverLimitStrategy createOverLimitStrategy (boolean trackRealMemoryUsage ) {
362
+ JvmInfo jvmInfo = JvmInfo .jvmInfo ();
363
+ if (trackRealMemoryUsage && jvmInfo .useG1GC ().equals ("true" )
364
+ // messing with GC is "dangerous" so we apply an escape hatch. Not intended to be used.
365
+ && Booleans .parseBoolean (System .getProperty ("es.real_memory_circuit_breaker.g1_over_limit_strategy.enabled" ), true )) {
366
+ TimeValue lockTimeout = TimeValue .timeValueMillis (
367
+ Integer .parseInt (System .getProperty ("es.real_memory_circuit_breaker.g1_over_limit_strategy.lock_timeout_ms" , "500" ))
368
+ );
369
+ // hardcode interval, do not want any tuning of it outside code changes.
370
+ return new G1OverLimitStrategy (jvmInfo , HierarchyCircuitBreakerService ::realMemoryUsage , createYoungGcCountSupplier (),
371
+ System ::currentTimeMillis , 5000 , lockTimeout );
372
+ } else {
373
+ return memoryUsed -> memoryUsed ;
374
+ }
375
+ }
376
+
377
+ static LongSupplier createYoungGcCountSupplier () {
378
+ List <GarbageCollectorMXBean > youngBeans =
379
+ ManagementFactory .getGarbageCollectorMXBeans ().stream ()
380
+ .filter (mxBean -> GcNames .getByGcName (mxBean .getName (), mxBean .getName ()).equals (GcNames .YOUNG ))
381
+ .collect (Collectors .toList ());
382
+ assert youngBeans .size () == 1 ;
383
+ assert youngBeans .get (0 ).getCollectionCount () != -1 : "G1 must support getting collection count" ;
384
+
385
+ if (youngBeans .size () == 1 ) {
386
+ return youngBeans .get (0 )::getCollectionCount ;
387
+ } else {
388
+ logger .warn ("Unable to find young generation collector, G1 over limit strategy might be impacted [{}]" , youngBeans );
389
+ return () -> -1 ;
390
+ }
391
+ }
392
+
393
+ interface OverLimitStrategy {
394
+ MemoryUsage overLimit (MemoryUsage memoryUsed );
395
+ }
396
+
397
+ static class G1OverLimitStrategy implements OverLimitStrategy {
398
+ private final long g1RegionSize ;
399
+ private final LongSupplier currentMemoryUsageSupplier ;
400
+ private final LongSupplier gcCountSupplier ;
401
+ private final LongSupplier timeSupplier ;
402
+ private final TimeValue lockTimeout ;
403
+ private final long maxHeap ;
404
+
405
+ private long lastCheckTime = Long .MIN_VALUE ;
406
+ private final long minimumInterval ;
407
+
408
+ private long blackHole ;
409
+ private final ReleasableLock lock = new ReleasableLock (new ReentrantLock ());
410
+
411
+ G1OverLimitStrategy (JvmInfo jvmInfo , LongSupplier currentMemoryUsageSupplier ,
412
+ LongSupplier gcCountSupplier ,
413
+ LongSupplier timeSupplier , long minimumInterval , TimeValue lockTimeout ) {
414
+ this .lockTimeout = lockTimeout ;
415
+ assert minimumInterval > 0 ;
416
+ this .currentMemoryUsageSupplier = currentMemoryUsageSupplier ;
417
+ this .gcCountSupplier = gcCountSupplier ;
418
+ this .timeSupplier = timeSupplier ;
419
+ this .minimumInterval = minimumInterval ;
420
+ this .maxHeap = jvmInfo .getMem ().getHeapMax ().getBytes ();
421
+ long g1RegionSize = jvmInfo .getG1RegionSize ();
422
+ if (g1RegionSize <= 0 ) {
423
+ this .g1RegionSize = fallbackRegionSize (jvmInfo );
424
+ } else {
425
+ this .g1RegionSize = g1RegionSize ;
426
+ }
427
+ }
428
+
429
+ static long fallbackRegionSize (JvmInfo jvmInfo ) {
430
+ // mimick JDK calculation based on JDK 14 source:
431
+ // https://hg.openjdk.java.net/jdk/jdk14/file/6c954123ee8d/src/hotspot/share/gc/g1/heapRegion.cpp#l65
432
+ // notice that newer JDKs will have a slight variant only considering max-heap:
433
+ // https://hg.openjdk.java.net/jdk/jdk/file/e7d0ec2d06e8/src/hotspot/share/gc/g1/heapRegion.cpp#l67
434
+ // based on this JDK "bug":
435
+ // https://bugs.openjdk.java.net/browse/JDK-8241670
436
+ long averageHeapSize =
437
+ (jvmInfo .getMem ().getHeapMax ().getBytes () + JvmInfo .jvmInfo ().getMem ().getHeapMax ().getBytes ()) / 2 ;
438
+ long regionSize = Long .highestOneBit (averageHeapSize / 2048 );
439
+ if (regionSize < ByteSizeUnit .MB .toBytes (1 )) {
440
+ regionSize = ByteSizeUnit .MB .toBytes (1 );
441
+ } else if (regionSize > ByteSizeUnit .MB .toBytes (32 )) {
442
+ regionSize = ByteSizeUnit .MB .toBytes (32 );
443
+ }
444
+ return regionSize ;
445
+ }
446
+
447
+ @ Override
448
+ public MemoryUsage overLimit (MemoryUsage memoryUsed ) {
449
+ boolean leader = false ;
450
+ int allocationIndex = 0 ;
451
+ long allocationDuration = 0 ;
452
+ try (ReleasableLock locked = lock .tryAcquire (lockTimeout )) {
453
+ if (locked != null ) {
454
+ long begin = timeSupplier .getAsLong ();
455
+ leader = begin >= lastCheckTime + minimumInterval ;
456
+ overLimitTriggered (leader );
457
+ if (leader ) {
458
+ long initialCollectionCount = gcCountSupplier .getAsLong ();
459
+ logger .info ("attempting to trigger G1GC due to high heap usage [{}]" , memoryUsed .baseUsage );
460
+ long localBlackHole = 0 ;
461
+ // number of allocations, corresponding to (approximately) number of free regions + 1
462
+ int allocationCount = Math .toIntExact ((maxHeap - memoryUsed .baseUsage ) / g1RegionSize + 1 );
463
+ // allocations of half-region size becomes single humongous alloc, thus taking up a full region.
464
+ int allocationSize = (int ) (g1RegionSize >> 1 );
465
+ long maxUsageObserved = memoryUsed .baseUsage ;
466
+ for (; allocationIndex < allocationCount ; ++allocationIndex ) {
467
+ long current = currentMemoryUsageSupplier .getAsLong ();
468
+ if (current >= maxUsageObserved ) {
469
+ maxUsageObserved = current ;
470
+ } else {
471
+ // we observed a memory drop, so some GC must have occurred
472
+ break ;
473
+ }
474
+ if (initialCollectionCount != gcCountSupplier .getAsLong ()) {
475
+ break ;
476
+ }
477
+ localBlackHole += new byte [allocationSize ].hashCode ();
478
+ }
479
+
480
+ blackHole += localBlackHole ;
481
+ logger .trace ("black hole [{}]" , blackHole );
482
+
483
+ long now = timeSupplier .getAsLong ();
484
+ this .lastCheckTime = now ;
485
+ allocationDuration = now - begin ;
486
+ }
487
+ }
488
+ } catch (InterruptedException e ) {
489
+ Thread .currentThread ().interrupt ();
490
+ // fallthrough
491
+ }
492
+
493
+ final long current = currentMemoryUsageSupplier .getAsLong ();
494
+ if (current < memoryUsed .baseUsage ) {
495
+ if (leader ) {
496
+ logger .info ("GC did bring memory usage down, before [{}], after [{}], allocations [{}], duration [{}]" ,
497
+ memoryUsed .baseUsage , current , allocationIndex , allocationDuration );
498
+ }
499
+ return new MemoryUsage (current , memoryUsed .totalUsage - memoryUsed .baseUsage + current ,
500
+ memoryUsed .transientChildUsage , memoryUsed .permanentChildUsage );
501
+ } else {
502
+ if (leader ) {
503
+ logger .info ("GC did not bring memory usage down, before [{}], after [{}], allocations [{}], duration [{}]" ,
504
+ memoryUsed .baseUsage , current , allocationIndex , allocationDuration );
505
+ }
506
+ // prefer original measurement when reporting if heap usage was not brought down.
507
+ return memoryUsed ;
508
+ }
509
+ }
510
+
511
+ void overLimitTriggered (boolean leader ) {
512
+ // for tests to override.
513
+ }
514
+
515
+ TimeValue getLockTimeout () {
516
+ return lockTimeout ;
517
+ }
518
+ }
337
519
}
0 commit comments