38
38
import org .elasticsearch .index .engine .VersionConflictEngineException ;
39
39
import org .elasticsearch .test .ESIntegTestCase ;
40
40
import org .elasticsearch .test .disruption .ServiceDisruptionScheme ;
41
+ import org .elasticsearch .threadpool .Scheduler ;
42
+ import org .elasticsearch .threadpool .ThreadPool ;
41
43
42
44
import java .io .FileInputStream ;
43
45
import java .io .IOException ;
50
52
import java .util .Random ;
51
53
import java .util .concurrent .BrokenBarrierException ;
52
54
import java .util .concurrent .CyclicBarrier ;
55
+ import java .util .concurrent .ScheduledThreadPoolExecutor ;
53
56
import java .util .concurrent .TimeUnit ;
54
57
import java .util .concurrent .TimeoutException ;
58
+ import java .util .concurrent .atomic .AtomicBoolean ;
55
59
import java .util .concurrent .atomic .AtomicReference ;
56
60
import java .util .function .Consumer ;
57
61
import java .util .function .Function ;
@@ -437,13 +441,24 @@ private void consumeOutput(HistoryOutput output, int eventId) {
437
441
}
438
442
}
439
443
440
- public boolean isLinearizable () {
444
+ public void assertLinearizable () {
441
445
logger .info ("--> Linearizability checking history of size: {} for key: {} and initialVersion: {}: {}" , history .size (),
442
446
id , initialVersion , history );
443
447
LinearizabilityChecker .SequentialSpec spec = new CASSequentialSpec (initialVersion );
444
448
boolean linearizable = false ;
445
449
try {
446
- linearizable = new LinearizabilityChecker ().isLinearizable (spec , history , missingResponseGenerator ());
450
+ final ScheduledThreadPoolExecutor scheduler = Scheduler .initScheduler (Settings .EMPTY );
451
+ final AtomicBoolean abort = new AtomicBoolean ();
452
+ // Large histories can be problematic and have the linearizability checker run OOM
453
+ // Bound the time how long the checker can run on such histories (Values empirically determined)
454
+ if (history .size () > 300 ) {
455
+ scheduler .schedule (() -> abort .set (true ), 10 , TimeUnit .SECONDS );
456
+ }
457
+ linearizable = new LinearizabilityChecker ().isLinearizable (spec , history , missingResponseGenerator (), abort ::get );
458
+ ThreadPool .terminate (scheduler , 1 , TimeUnit .SECONDS );
459
+ if (abort .get () && linearizable == false ) {
460
+ linearizable = true ; // let the test pass
461
+ }
447
462
} finally {
448
463
// implicitly test that we can serialize all histories.
449
464
String serializedHistory = base64Serialize (history );
@@ -453,11 +468,7 @@ public boolean isLinearizable() {
453
468
spec , initialVersion , serializedHistory );
454
469
}
455
470
}
456
- return linearizable ;
457
- }
458
-
459
- public void assertLinearizable () {
460
- assertTrue ("Must be linearizable" , isLinearizable ());
471
+ assertTrue ("Must be linearizable" , linearizable );
461
472
}
462
473
}
463
474
0 commit comments