20
20
21
21
import java .io .IOException ;
22
22
import java .util .Collections ;
23
+ import java .util .concurrent .CountDownLatch ;
23
24
import java .util .concurrent .Executor ;
24
25
import java .util .concurrent .ExecutorService ;
25
26
import java .util .concurrent .Executors ;
27
+ import java .util .concurrent .TimeUnit ;
26
28
import java .util .concurrent .atomic .AtomicBoolean ;
27
29
import java .util .concurrent .atomic .AtomicReference ;
28
30
@@ -34,11 +36,14 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
34
36
35
37
private class MockIndexer extends AsyncTwoPhaseIndexer <Integer , MockJobStats > {
36
38
39
+ private final CountDownLatch latch ;
37
40
// test the execution order
38
41
private int step ;
39
42
40
- protected MockIndexer (Executor executor , AtomicReference <IndexerState > initialState , Integer initialPosition ) {
43
+ protected MockIndexer (Executor executor , AtomicReference <IndexerState > initialState , Integer initialPosition ,
44
+ CountDownLatch latch ) {
41
45
super (executor , initialState , initialPosition , new MockJobStats ());
46
+ this .latch = latch ;
42
47
}
43
48
44
49
@ Override
@@ -48,11 +53,20 @@ protected String getJobId() {
48
53
49
54
@ Override
50
55
protected IterationResult <Integer > doProcess (SearchResponse searchResponse ) {
56
+ awaitForLatch ();
51
57
assertThat (step , equalTo (3 ));
52
58
++step ;
53
59
return new IterationResult <Integer >(Collections .emptyList (), 3 , true );
54
60
}
55
61
62
+ private void awaitForLatch () {
63
+ try {
64
+ latch .await (10 , TimeUnit .SECONDS );
65
+ } catch (InterruptedException e ) {
66
+ throw new RuntimeException (e );
67
+ }
68
+ }
69
+
56
70
@ Override
57
71
protected SearchRequest buildSearchRequest () {
58
72
assertThat (step , equalTo (1 ));
@@ -195,12 +209,14 @@ public void testStateMachine() throws InterruptedException {
195
209
final ExecutorService executor = Executors .newFixedThreadPool (1 );
196
210
isFinished .set (false );
197
211
try {
198
-
199
- MockIndexer indexer = new MockIndexer (executor , state , 2 );
212
+ CountDownLatch countDownLatch = new CountDownLatch ( 1 );
213
+ MockIndexer indexer = new MockIndexer (executor , state , 2 , countDownLatch );
200
214
indexer .start ();
201
215
assertThat (indexer .getState (), equalTo (IndexerState .STARTED ));
202
216
assertTrue (indexer .maybeTriggerAsyncJob (System .currentTimeMillis ()));
203
217
assertThat (indexer .getState (), equalTo (IndexerState .INDEXING ));
218
+ countDownLatch .countDown ();
219
+
204
220
assertThat (indexer .getPosition (), equalTo (2 ));
205
221
ESTestCase .awaitBusy (() -> isFinished .get ());
206
222
assertThat (indexer .getStep (), equalTo (6 ));
0 commit comments