19
19
20
20
package org .elasticsearch .index .engine ;
21
21
22
- import java .nio .file .Path ;
23
22
import org .elasticsearch .common .settings .Settings ;
24
23
import org .elasticsearch .core .internal .io .IOUtils ;
25
24
import org .elasticsearch .index .IndexSettings ;
26
25
import org .elasticsearch .index .VersionType ;
27
26
import org .elasticsearch .index .mapper .MapperService ;
28
27
import org .elasticsearch .index .mapper .ParsedDocument ;
29
- import org .elasticsearch .index .store .Store ;
30
28
import org .elasticsearch .index .translog .SnapshotMatchers ;
31
29
import org .elasticsearch .index .translog .Translog ;
32
30
import org .elasticsearch .test .IndexSettingsModule ;
@@ -202,7 +200,7 @@ public void testUpdateAndReadChangesConcurrently() throws Exception {
202
200
CountDownLatch readyLatch = new CountDownLatch (followers .length + 1 );
203
201
AtomicBoolean isDone = new AtomicBoolean ();
204
202
for (int i = 0 ; i < followers .length ; i ++) {
205
- followers [i ] = new Follower (engine , isDone , readyLatch , createTempDir () );
203
+ followers [i ] = new Follower (engine , isDone , readyLatch );
206
204
followers [i ].start ();
207
205
}
208
206
boolean onPrimary = randomBoolean ();
@@ -228,28 +226,30 @@ public void testUpdateAndReadChangesConcurrently() throws Exception {
228
226
operations .add (op );
229
227
}
230
228
readyLatch .countDown ();
229
+ readyLatch .await ();
231
230
concurrentlyApplyOps (operations , engine );
232
231
assertThat (engine .getLocalCheckpointTracker ().getCheckpoint (), equalTo (operations .size () - 1L ));
233
232
isDone .set (true );
234
233
for (Follower follower : followers ) {
235
234
follower .join ();
235
+ IOUtils .close (follower .engine , follower .engine .store );
236
236
}
237
237
}
238
238
239
239
class Follower extends Thread {
240
240
private final Engine leader ;
241
+ private final InternalEngine engine ;
241
242
private final TranslogHandler translogHandler ;
242
243
private final AtomicBoolean isDone ;
243
244
private final CountDownLatch readLatch ;
244
- private final Path translogPath ;
245
245
246
- Follower (Engine leader , AtomicBoolean isDone , CountDownLatch readLatch , Path translogPath ) {
246
+ Follower (Engine leader , AtomicBoolean isDone , CountDownLatch readLatch ) throws IOException {
247
247
this .leader = leader ;
248
248
this .isDone = isDone ;
249
249
this .readLatch = readLatch ;
250
250
this .translogHandler = new TranslogHandler (xContentRegistry (), IndexSettingsModule .newIndexSettings (shardId .getIndexName (),
251
- engine .engineConfig .getIndexSettings ().getSettings ()));
252
- this .translogPath = translogPath ;
251
+ leader .engineConfig .getIndexSettings ().getSettings ()));
252
+ this .engine = createEngine ( createStore (), createTempDir ()) ;
253
253
}
254
254
255
255
void pullOperations (Engine follower ) throws IOException {
@@ -267,16 +267,15 @@ void pullOperations(Engine follower) throws IOException {
267
267
268
268
@ Override
269
269
public void run () {
270
- try (Store store = createStore ();
271
- InternalEngine follower = createEngine (store , translogPath )) {
270
+ try {
272
271
readLatch .countDown ();
273
272
readLatch .await ();
274
273
while (isDone .get () == false ||
275
- follower .getLocalCheckpointTracker ().getCheckpoint () < leader .getLocalCheckpoint ()) {
276
- pullOperations (follower );
274
+ engine .getLocalCheckpointTracker ().getCheckpoint () < leader .getLocalCheckpoint ()) {
275
+ pullOperations (engine );
277
276
}
278
- assertConsistentHistoryBetweenTranslogAndLuceneIndex (follower , mapperService );
279
- assertThat (getDocIds (follower , true ), equalTo (getDocIds (leader , true )));
277
+ assertConsistentHistoryBetweenTranslogAndLuceneIndex (engine , mapperService );
278
+ assertThat (getDocIds (engine , true ), equalTo (getDocIds (leader , true )));
280
279
} catch (Exception ex ) {
281
280
throw new AssertionError (ex );
282
281
}
0 commit comments