19
19
20
20
package org .elasticsearch .common .breaker ;
21
21
22
- import org .elasticsearch .common .settings .ClusterSettings ;
23
- import org .elasticsearch .common .settings .Settings ;
24
- import org .elasticsearch .common .unit .ByteSizeUnit ;
25
22
import org .elasticsearch .common .unit .ByteSizeValue ;
26
- import org .elasticsearch .indices .breaker .BreakerSettings ;
27
- import org .elasticsearch .indices .breaker .CircuitBreakerService ;
28
- import org .elasticsearch .indices .breaker .HierarchyCircuitBreakerService ;
29
23
import org .elasticsearch .test .ESTestCase ;
30
24
31
25
import java .util .concurrent .atomic .AtomicBoolean ;
32
- import java .util .concurrent .atomic .AtomicInteger ;
33
26
import java .util .concurrent .atomic .AtomicReference ;
34
27
35
- import static org .hamcrest .Matchers .containsString ;
36
28
import static org .hamcrest .Matchers .equalTo ;
37
29
import static org .hamcrest .Matchers .greaterThanOrEqualTo ;
38
30
@@ -50,21 +42,18 @@ public void testThreadedUpdatesToBreaker() throws Exception {
50
42
final MemoryCircuitBreaker breaker = new MemoryCircuitBreaker (new ByteSizeValue ((BYTES_PER_THREAD * NUM_THREADS ) - 1 ), 1.0 , logger );
51
43
52
44
for (int i = 0 ; i < NUM_THREADS ; i ++) {
53
- threads [i ] = new Thread (new Runnable () {
54
- @ Override
55
- public void run () {
56
- for (int j = 0 ; j < BYTES_PER_THREAD ; j ++) {
57
- try {
58
- breaker .addEstimateBytesAndMaybeBreak (1L , "test" );
59
- } catch (CircuitBreakingException e ) {
60
- if (tripped .get ()) {
61
- assertThat ("tripped too many times" , true , equalTo (false ));
62
- } else {
63
- assertThat (tripped .compareAndSet (false , true ), equalTo (true ));
64
- }
65
- } catch (Exception e ) {
66
- lastException .set (e );
45
+ threads [i ] = new Thread (() -> {
46
+ for (int j = 0 ; j < BYTES_PER_THREAD ; j ++) {
47
+ try {
48
+ breaker .addEstimateBytesAndMaybeBreak (1L , "test" );
49
+ } catch (CircuitBreakingException e ) {
50
+ if (tripped .get ()) {
51
+ assertThat ("tripped too many times" , true , equalTo (false ));
52
+ } else {
53
+ assertThat (tripped .compareAndSet (false , true ), equalTo (true ));
67
54
}
55
+ } catch (Exception e ) {
56
+ lastException .set (e );
68
57
}
69
58
}
70
59
});
@@ -81,134 +70,6 @@ public void run() {
81
70
assertThat ("breaker was tripped at least once" , breaker .getTrippedCount (), greaterThanOrEqualTo (1L ));
82
71
}
83
72
84
- public void testThreadedUpdatesToChildBreaker () throws Exception {
85
- final int NUM_THREADS = scaledRandomIntBetween (3 , 15 );
86
- final int BYTES_PER_THREAD = scaledRandomIntBetween (500 , 4500 );
87
- final Thread [] threads = new Thread [NUM_THREADS ];
88
- final AtomicBoolean tripped = new AtomicBoolean (false );
89
- final AtomicReference <Throwable > lastException = new AtomicReference <>(null );
90
-
91
- final AtomicReference <ChildMemoryCircuitBreaker > breakerRef = new AtomicReference <>(null );
92
- final CircuitBreakerService service = new HierarchyCircuitBreakerService (Settings .EMPTY , new ClusterSettings (Settings .EMPTY , ClusterSettings .BUILT_IN_CLUSTER_SETTINGS )) {
93
-
94
- @ Override
95
- public CircuitBreaker getBreaker (String name ) {
96
- return breakerRef .get ();
97
- }
98
-
99
- @ Override
100
- public void checkParentLimit (String label ) throws CircuitBreakingException {
101
- // never trip
102
- }
103
- };
104
- final BreakerSettings settings = new BreakerSettings (CircuitBreaker .REQUEST , (BYTES_PER_THREAD * NUM_THREADS ) - 1 , 1.0 );
105
- final ChildMemoryCircuitBreaker breaker = new ChildMemoryCircuitBreaker (settings , logger ,
106
- (HierarchyCircuitBreakerService )service , CircuitBreaker .REQUEST );
107
- breakerRef .set (breaker );
108
-
109
- for (int i = 0 ; i < NUM_THREADS ; i ++) {
110
- threads [i ] = new Thread (new Runnable () {
111
- @ Override
112
- public void run () {
113
- for (int j = 0 ; j < BYTES_PER_THREAD ; j ++) {
114
- try {
115
- breaker .addEstimateBytesAndMaybeBreak (1L , "test" );
116
- } catch (CircuitBreakingException e ) {
117
- if (tripped .get ()) {
118
- assertThat ("tripped too many times" , true , equalTo (false ));
119
- } else {
120
- assertThat (tripped .compareAndSet (false , true ), equalTo (true ));
121
- }
122
- } catch (Exception e ) {
123
- lastException .set (e );
124
- }
125
- }
126
- }
127
- });
128
-
129
- threads [i ].start ();
130
- }
131
-
132
- for (Thread t : threads ) {
133
- t .join ();
134
- }
135
-
136
- assertThat ("no other exceptions were thrown" , lastException .get (), equalTo (null ));
137
- assertThat ("breaker was tripped" , tripped .get (), equalTo (true ));
138
- assertThat ("breaker was tripped at least once" , breaker .getTrippedCount (), greaterThanOrEqualTo (1L ));
139
- }
140
-
141
- public void testThreadedUpdatesToChildBreakerWithParentLimit () throws Exception {
142
- final int NUM_THREADS = scaledRandomIntBetween (3 , 15 );
143
- final int BYTES_PER_THREAD = scaledRandomIntBetween (500 , 4500 );
144
- final int parentLimit = (BYTES_PER_THREAD * NUM_THREADS ) - 2 ;
145
- final int childLimit = parentLimit + 10 ;
146
- final Thread [] threads = new Thread [NUM_THREADS ];
147
- final AtomicInteger tripped = new AtomicInteger (0 );
148
- final AtomicReference <Throwable > lastException = new AtomicReference <>(null );
149
-
150
- final AtomicInteger parentTripped = new AtomicInteger (0 );
151
- final AtomicReference <ChildMemoryCircuitBreaker > breakerRef = new AtomicReference <>(null );
152
- final CircuitBreakerService service = new HierarchyCircuitBreakerService (Settings .EMPTY , new ClusterSettings (Settings .EMPTY , ClusterSettings .BUILT_IN_CLUSTER_SETTINGS )) {
153
-
154
- @ Override
155
- public CircuitBreaker getBreaker (String name ) {
156
- return breakerRef .get ();
157
- }
158
-
159
- @ Override
160
- public void checkParentLimit (String label ) throws CircuitBreakingException {
161
- // Parent will trip right before regular breaker would trip
162
- if (getBreaker (CircuitBreaker .REQUEST ).getUsed () > parentLimit ) {
163
- parentTripped .incrementAndGet ();
164
- logger .info ("--> parent tripped" );
165
- throw new CircuitBreakingException ("parent tripped" );
166
- }
167
- }
168
- };
169
- final BreakerSettings settings = new BreakerSettings (CircuitBreaker .REQUEST , childLimit , 1.0 );
170
- final ChildMemoryCircuitBreaker breaker = new ChildMemoryCircuitBreaker (settings , logger ,
171
- (HierarchyCircuitBreakerService )service , CircuitBreaker .REQUEST );
172
- breakerRef .set (breaker );
173
-
174
- for (int i = 0 ; i < NUM_THREADS ; i ++) {
175
- threads [i ] = new Thread (new Runnable () {
176
- @ Override
177
- public void run () {
178
- for (int j = 0 ; j < BYTES_PER_THREAD ; j ++) {
179
- try {
180
- breaker .addEstimateBytesAndMaybeBreak (1L , "test" );
181
- } catch (CircuitBreakingException e ) {
182
- tripped .incrementAndGet ();
183
- } catch (Exception e ) {
184
- lastException .set (e );
185
- }
186
- }
187
- }
188
- });
189
- }
190
-
191
- logger .info ("--> NUM_THREADS: [{}], BYTES_PER_THREAD: [{}], TOTAL_BYTES: [{}], PARENT_LIMIT: [{}], CHILD_LIMIT: [{}]" ,
192
- NUM_THREADS , BYTES_PER_THREAD , (BYTES_PER_THREAD * NUM_THREADS ), parentLimit , childLimit );
193
-
194
- logger .info ("--> starting threads..." );
195
- for (Thread t : threads ) {
196
- t .start ();
197
- }
198
-
199
- for (Thread t : threads ) {
200
- t .join ();
201
- }
202
-
203
- logger .info ("--> child breaker: used: {}, limit: {}" , breaker .getUsed (), breaker .getLimit ());
204
- logger .info ("--> parent tripped: {}, total trip count: {} (expecting 1-2 for each)" , parentTripped .get (), tripped .get ());
205
- assertThat ("no other exceptions were thrown" , lastException .get (), equalTo (null ));
206
- assertThat ("breaker should be reset back to the parent limit after parent breaker trips" ,
207
- breaker .getUsed (), greaterThanOrEqualTo ((long )parentLimit - NUM_THREADS ));
208
- assertThat ("parent breaker was tripped at least once" , parentTripped .get (), greaterThanOrEqualTo (1 ));
209
- assertThat ("total breaker was tripped at least once" , tripped .get (), greaterThanOrEqualTo (1 ));
210
- }
211
-
212
73
public void testConstantFactor () throws Exception {
213
74
final MemoryCircuitBreaker breaker = new MemoryCircuitBreaker (new ByteSizeValue (15 ), 1.6 , logger );
214
75
String field = "myfield" ;
@@ -243,40 +104,4 @@ public void testConstantFactor() throws Exception {
243
104
assertThat (cbe .getMessage ().contains ("field [" + field + "]" ), equalTo (true ));
244
105
}
245
106
}
246
-
247
- /**
248
- * Test that a breaker correctly redistributes to a different breaker, in
249
- * this case, the request breaker borrows space from the fielddata breaker
250
- */
251
- public void testBorrowingSiblingBreakerMemory () throws Exception {
252
- Settings clusterSettings = Settings .builder ()
253
- .put (HierarchyCircuitBreakerService .TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING .getKey (), "200mb" )
254
- .put (HierarchyCircuitBreakerService .REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING .getKey (), "150mb" )
255
- .put (HierarchyCircuitBreakerService .FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING .getKey (), "150mb" )
256
- .build ();
257
- try (CircuitBreakerService service = new HierarchyCircuitBreakerService (clusterSettings ,
258
- new ClusterSettings (clusterSettings , ClusterSettings .BUILT_IN_CLUSTER_SETTINGS ))) {
259
- CircuitBreaker requestCircuitBreaker = service .getBreaker (MemoryCircuitBreaker .REQUEST );
260
- CircuitBreaker fieldDataCircuitBreaker = service .getBreaker (MemoryCircuitBreaker .FIELDDATA );
261
-
262
- assertEquals (new ByteSizeValue (200 , ByteSizeUnit .MB ).getBytes (),
263
- service .stats ().getStats (MemoryCircuitBreaker .PARENT ).getLimit ());
264
- assertEquals (new ByteSizeValue (150 , ByteSizeUnit .MB ).getBytes (), requestCircuitBreaker .getLimit ());
265
- assertEquals (new ByteSizeValue (150 , ByteSizeUnit .MB ).getBytes (), fieldDataCircuitBreaker .getLimit ());
266
-
267
- double fieldDataUsedBytes = fieldDataCircuitBreaker
268
- .addEstimateBytesAndMaybeBreak (new ByteSizeValue (50 , ByteSizeUnit .MB ).getBytes (), "should not break" );
269
- assertEquals (new ByteSizeValue (50 , ByteSizeUnit .MB ).getBytes (), fieldDataUsedBytes , 0.0 );
270
- double requestUsedBytes = requestCircuitBreaker .addEstimateBytesAndMaybeBreak (new ByteSizeValue (50 , ByteSizeUnit .MB ).getBytes (),
271
- "should not break" );
272
- assertEquals (new ByteSizeValue (50 , ByteSizeUnit .MB ).getBytes (), requestUsedBytes , 0.0 );
273
- requestUsedBytes = requestCircuitBreaker .addEstimateBytesAndMaybeBreak (new ByteSizeValue (50 , ByteSizeUnit .MB ).getBytes (),
274
- "should not break" );
275
- assertEquals (new ByteSizeValue (100 , ByteSizeUnit .MB ).getBytes (), requestUsedBytes , 0.0 );
276
- CircuitBreakingException exception = expectThrows (CircuitBreakingException .class , () -> requestCircuitBreaker
277
- .addEstimateBytesAndMaybeBreak (new ByteSizeValue (50 , ByteSizeUnit .MB ).getBytes (), "should break" ));
278
- assertThat (exception .getMessage (), containsString ("[parent] Data too large, data for [should break] would be" ));
279
- assertThat (exception .getMessage (), containsString ("which is larger than the limit of [209715200/200mb]" ));
280
- }
281
- }
282
107
}
0 commit comments