@@ -644,6 +644,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException {
644
644
InternalEngine engine = createEngine (store , translog );
645
645
engine .close ();
646
646
647
+ trimUnsafeCommits (engine .config ());
647
648
engine = new InternalEngine (engine .config ());
648
649
assertTrue (engine .isRecovering ());
649
650
engine .recoverFromTranslog ();
@@ -659,6 +660,7 @@ public void testFlushIsDisabledDuringTranslogRecovery() throws IOException {
659
660
engine .index (indexForDoc (doc ));
660
661
engine .close ();
661
662
663
+ trimUnsafeCommits (engine .config ());
662
664
engine = new InternalEngine (engine .config ());
663
665
expectThrows (IllegalStateException .class , () -> engine .flush (true , true ));
664
666
assertTrue (engine .isRecovering ());
@@ -690,18 +692,14 @@ public void testTranslogMultipleOperationsSameDocument() throws IOException {
690
692
} finally {
691
693
IOUtils .close (engine );
692
694
}
693
-
694
- Engine recoveringEngine = null ;
695
- try {
696
- recoveringEngine = new InternalEngine (engine .config ());
695
+ trimUnsafeCommits (engine .config ());
696
+ try (Engine recoveringEngine = new InternalEngine (engine .config ())){
697
697
recoveringEngine .recoverFromTranslog ();
698
698
try (Engine .Searcher searcher = recoveringEngine .acquireSearcher ("test" )) {
699
699
final TotalHitCountCollector collector = new TotalHitCountCollector ();
700
700
searcher .searcher ().search (new MatchAllDocsQuery (), collector );
701
701
assertThat (collector .getTotalHits (), equalTo (operations .get (operations .size () - 1 ) instanceof Engine .Delete ? 0 : 1 ));
702
702
}
703
- } finally {
704
- IOUtils .close (recoveringEngine );
705
703
}
706
704
}
707
705
@@ -722,6 +720,7 @@ public void testTranslogRecoveryDoesNotReplayIntoTranslog() throws IOException {
722
720
Engine recoveringEngine = null ;
723
721
try {
724
722
final AtomicBoolean committed = new AtomicBoolean ();
723
+ trimUnsafeCommits (initialEngine .config ());
725
724
recoveringEngine = new InternalEngine (initialEngine .config ()) {
726
725
727
726
@ Override
@@ -1151,6 +1150,7 @@ public void testSyncedFlushSurvivesEngineRestart() throws IOException {
1151
1150
SequenceNumbers .UNASSIGNED_SEQ_NO , shardId );
1152
1151
store .associateIndexWithNewTranslog (translogUUID );
1153
1152
}
1153
+ trimUnsafeCommits (config );
1154
1154
engine = new InternalEngine (config );
1155
1155
engine .recoverFromTranslog ();
1156
1156
assertEquals (engine .getLastCommittedSegmentInfos ().getUserData ().get (Engine .SYNC_COMMIT_ID ), syncId );
@@ -2054,9 +2054,8 @@ public void testSeqNoAndCheckpoints() throws IOException {
2054
2054
IOUtils .close (initialEngine );
2055
2055
}
2056
2056
2057
- InternalEngine recoveringEngine = null ;
2058
- try {
2059
- recoveringEngine = new InternalEngine (initialEngine .config ());
2057
+ trimUnsafeCommits (initialEngine .engineConfig );
2058
+ try (InternalEngine recoveringEngine = new InternalEngine (initialEngine .config ())){
2060
2059
recoveringEngine .recoverFromTranslog ();
2061
2060
2062
2061
assertEquals (primarySeqNo , recoveringEngine .getLocalCheckpointTracker ().getMaxSeqNo ());
@@ -2075,8 +2074,6 @@ public void testSeqNoAndCheckpoints() throws IOException {
2075
2074
assertThat (recoveringEngine .getLocalCheckpointTracker ().getCheckpoint (), equalTo (primarySeqNo ));
2076
2075
assertThat (recoveringEngine .getLocalCheckpointTracker ().getMaxSeqNo (), equalTo (primarySeqNo ));
2077
2076
assertThat (recoveringEngine .getLocalCheckpointTracker ().generateSeqNo (), equalTo (primarySeqNo + 1 ));
2078
- } finally {
2079
- IOUtils .close (recoveringEngine );
2080
2077
}
2081
2078
}
2082
2079
@@ -2389,6 +2386,7 @@ public void testCurrentTranslogIDisCommitted() throws IOException {
2389
2386
// open and recover tlog
2390
2387
{
2391
2388
for (int i = 0 ; i < 2 ; i ++) {
2389
+ trimUnsafeCommits (config );
2392
2390
try (InternalEngine engine = new InternalEngine (config )) {
2393
2391
assertTrue (engine .isRecovering ());
2394
2392
Map <String , String > userData = engine .getLastCommittedSegmentInfos ().getUserData ();
@@ -2413,6 +2411,7 @@ public void testCurrentTranslogIDisCommitted() throws IOException {
2413
2411
final String translogUUID =
2414
2412
Translog .createEmptyTranslog (config .getTranslogConfig ().getTranslogPath (), SequenceNumbers .NO_OPS_PERFORMED , shardId );
2415
2413
store .associateIndexWithNewTranslog (translogUUID );
2414
+ trimUnsafeCommits (config );
2416
2415
try (InternalEngine engine = new InternalEngine (config )) {
2417
2416
Map <String , String > userData = engine .getLastCommittedSegmentInfos ().getUserData ();
2418
2417
assertEquals ("1" , userData .get (Translog .TRANSLOG_GENERATION_KEY ));
@@ -2426,6 +2425,7 @@ public void testCurrentTranslogIDisCommitted() throws IOException {
2426
2425
// open and recover tlog with empty tlog
2427
2426
{
2428
2427
for (int i = 0 ; i < 2 ; i ++) {
2428
+ trimUnsafeCommits (config );
2429
2429
try (InternalEngine engine = new InternalEngine (config )) {
2430
2430
Map <String , String > userData = engine .getLastCommittedSegmentInfos ().getUserData ();
2431
2431
assertEquals ("1" , userData .get (Translog .TRANSLOG_GENERATION_KEY ));
@@ -2487,6 +2487,7 @@ public void testTranslogReplayWithFailure() throws IOException {
2487
2487
boolean started = false ;
2488
2488
InternalEngine engine = null ;
2489
2489
try {
2490
+ trimUnsafeCommits (config (defaultSettings , store , translogPath , NoMergePolicy .INSTANCE , null ));
2490
2491
engine = createEngine (store , translogPath );
2491
2492
started = true ;
2492
2493
} catch (EngineException | IOException e ) {
@@ -2567,6 +2568,7 @@ public void testSkipTranslogReplay() throws IOException {
2567
2568
}
2568
2569
assertVisibleCount (engine , numDocs );
2569
2570
engine .close ();
2571
+ trimUnsafeCommits (engine .config ());
2570
2572
engine = new InternalEngine (engine .config ());
2571
2573
engine .skipTranslogRecovery ();
2572
2574
try (Engine .Searcher searcher = engine .acquireSearcher ("test" )) {
@@ -2608,6 +2610,7 @@ public void testTranslogReplay() throws IOException {
2608
2610
parser .mappingUpdate = dynamicUpdate ();
2609
2611
2610
2612
engine .close ();
2613
+ trimUnsafeCommits (copy (engine .config (), inSyncGlobalCheckpointSupplier ));
2611
2614
engine = new InternalEngine (copy (engine .config (), inSyncGlobalCheckpointSupplier )); // we need to reuse the engine config unless the parser.mappingModified won't work
2612
2615
engine .recoverFromTranslog ();
2613
2616
@@ -3685,6 +3688,7 @@ public void testNoOps() throws IOException {
3685
3688
final BiFunction <Long , Long , LocalCheckpointTracker > supplier = (ms , lcp ) -> new LocalCheckpointTracker (
3686
3689
maxSeqNo ,
3687
3690
localCheckpoint );
3691
+ trimUnsafeCommits (engine .config ());
3688
3692
noOpEngine = new InternalEngine (engine .config (), supplier ) {
3689
3693
@ Override
3690
3694
protected long doGenerateSeqNoForOperation (Operation operation ) {
@@ -3832,6 +3836,7 @@ public void markSeqNoAsCompleted(long seqNo) {
3832
3836
completedSeqNos .add (seqNo );
3833
3837
}
3834
3838
};
3839
+ trimUnsafeCommits (engine .config ());
3835
3840
actualEngine = new InternalEngine (engine .config (), supplier );
3836
3841
final int operations = randomIntBetween (0 , 1024 );
3837
3842
final Set <Long > expectedCompletedSeqNos = new HashSet <>();
@@ -3902,6 +3907,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException {
3902
3907
assertEquals (docs - 1 , engine .getLocalCheckpointTracker ().getCheckpoint ());
3903
3908
assertEquals (maxSeqIDOnReplica , replicaEngine .getLocalCheckpointTracker ().getMaxSeqNo ());
3904
3909
assertEquals (checkpointOnReplica , replicaEngine .getLocalCheckpointTracker ().getCheckpoint ());
3910
+ trimUnsafeCommits (copy (replicaEngine .config (), globalCheckpoint ::get ));
3905
3911
recoveringEngine = new InternalEngine (copy (replicaEngine .config (), globalCheckpoint ::get ));
3906
3912
assertEquals (numDocsOnReplica , recoveringEngine .getTranslog ().stats ().getUncommittedOperations ());
3907
3913
recoveringEngine .recoverFromTranslog ();
0 commit comments