17
17
import org .elasticsearch .test .ESIntegTestCase ;
18
18
import org .elasticsearch .test .ESIntegTestCase .ClusterScope ;
19
19
import org .elasticsearch .test .ESIntegTestCase .Scope ;
20
+ import org .elasticsearch .threadpool .ThreadPool ;
20
21
21
22
import java .util .Arrays ;
22
23
import java .util .HashSet ;
25
26
import java .util .concurrent .CountDownLatch ;
26
27
import java .util .concurrent .TimeUnit ;
27
28
import java .util .concurrent .atomic .AtomicBoolean ;
29
+ import java .util .stream .StreamSupport ;
28
30
29
31
import static org .hamcrest .Matchers .equalTo ;
30
32
import static org .hamcrest .Matchers .greaterThan ;
@@ -414,11 +416,7 @@ public void onFailure(Exception e) {
414
416
});
415
417
}
416
418
417
- final var startNanoTime = System .nanoTime ();
418
- while (TimeUnit .MILLISECONDS .convert (System .nanoTime () - startNanoTime , TimeUnit .NANOSECONDS ) <= 0 ) {
419
- // noinspection BusyWait
420
- Thread .sleep (100 );
421
- }
419
+ waitForTimeToElapse ();
422
420
423
421
pendingClusterTasks = clusterService .getMasterService ().pendingTasks ();
424
422
assertThat (pendingClusterTasks .size (), greaterThanOrEqualTo (5 ));
@@ -441,4 +439,28 @@ public void onFailure(Exception e) {
441
439
block2 .countDown ();
442
440
}
443
441
}
442
+
443
+ private static void waitForTimeToElapse () throws InterruptedException {
444
+ final ThreadPool [] threadPools = StreamSupport .stream (internalCluster ().getInstances (ClusterService .class ).spliterator (), false )
445
+ .map (ClusterService ::threadPool )
446
+ .toArray (ThreadPool []::new );
447
+ final long [] startTimes = Arrays .stream (threadPools ).mapToLong (ThreadPool ::relativeTimeInMillis ).toArray ();
448
+
449
+ final var startNanoTime = System .nanoTime ();
450
+ while (TimeUnit .MILLISECONDS .convert (System .nanoTime () - startNanoTime , TimeUnit .NANOSECONDS ) <= 100 ) {
451
+ // noinspection BusyWait
452
+ Thread .sleep (100 );
453
+ }
454
+
455
+ outer : do {
456
+ for (int i = 0 ; i < threadPools .length ; i ++) {
457
+ if (threadPools [i ].relativeTimeInMillis () <= startTimes [i ]) {
458
+ // noinspection BusyWait
459
+ Thread .sleep (100 );
460
+ continue outer ;
461
+ }
462
+ }
463
+ return ;
464
+ } while (true );
465
+ }
444
466
}
0 commit comments