172
172
import java .util .concurrent .atomic .AtomicReference ;
173
173
import java .util .function .BiFunction ;
174
174
import java .util .function .Function ;
175
+ import java .util .function .IntSupplier ;
175
176
import java .util .function .LongSupplier ;
176
177
import java .util .function .Supplier ;
177
178
import java .util .function .ToLongBiFunction ;
@@ -4695,6 +4696,10 @@ public void testShouldPeriodicallyFlush() throws Exception {
4695
4696
assertThat ("Empty engine does not need flushing" , engine .shouldPeriodicallyFlush (), equalTo (false ));
4696
4697
// A new engine may have more than one empty translog files - the test should account this extra.
4697
4698
final Translog translog = engine .getTranslog ();
4699
+ final IntSupplier uncommittedTranslogOperationsSinceLastCommit = () -> {
4700
+ long localCheckpoint = Long .parseLong (engine .getLastCommittedSegmentInfos ().userData .get (SequenceNumbers .LOCAL_CHECKPOINT_KEY ));
4701
+ return translog .totalOperationsByMinGen (translog .getMinGenerationForSeqNo (localCheckpoint + 1 ).translogFileGeneration );
4702
+ };
4698
4703
final long extraTranslogSizeInNewEngine =
4699
4704
engine .getTranslog ().stats ().getUncommittedSizeInBytes () - Translog .DEFAULT_HEADER_SIZE_IN_BYTES ;
4700
4705
int numDocs = between (10 , 100 );
@@ -4712,10 +4717,10 @@ public void testShouldPeriodicallyFlush() throws Exception {
4712
4717
.put (IndexSettings .INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING .getKey (), flushThreshold + "b" )).build ();
4713
4718
indexSettings .updateIndexMetaData (indexMetaData );
4714
4719
engine .onSettingsChanged ();
4715
- assertThat (engine . getTranslog (). stats (). getUncommittedOperations (), equalTo (numDocs ));
4720
+ assertThat (uncommittedTranslogOperationsSinceLastCommit . getAsInt (), equalTo (numDocs ));
4716
4721
assertThat (engine .shouldPeriodicallyFlush (), equalTo (true ));
4717
4722
engine .flush ();
4718
- assertThat (engine . getTranslog (). stats (). getUncommittedOperations (), equalTo (0 ));
4723
+ assertThat (uncommittedTranslogOperationsSinceLastCommit . getAsInt (), equalTo (0 ));
4719
4724
// Stale operations skipped by Lucene but added to translog - still able to flush
4720
4725
for (int id = 0 ; id < numDocs ; id ++) {
4721
4726
final ParsedDocument doc =
@@ -4724,11 +4729,11 @@ public void testShouldPeriodicallyFlush() throws Exception {
4724
4729
assertThat (result .isCreated (), equalTo (false ));
4725
4730
}
4726
4731
SegmentInfos lastCommitInfo = engine .getLastCommittedSegmentInfos ();
4727
- assertThat (engine . getTranslog (). stats (). getUncommittedOperations (), equalTo (numDocs ));
4732
+ assertThat (uncommittedTranslogOperationsSinceLastCommit . getAsInt (), equalTo (numDocs ));
4728
4733
assertThat (engine .shouldPeriodicallyFlush (), equalTo (true ));
4729
4734
engine .flush (false , false );
4730
4735
assertThat (engine .getLastCommittedSegmentInfos (), not (sameInstance (lastCommitInfo )));
4731
- assertThat (engine . getTranslog (). stats (). getUncommittedOperations (), equalTo (0 ));
4736
+ assertThat (uncommittedTranslogOperationsSinceLastCommit . getAsInt (), equalTo (0 ));
4732
4737
// If the new index commit still points to the same translog generation as the current index commit,
4733
4738
// we should not enable the periodically flush condition; otherwise we can get into an infinite loop of flushes.
4734
4739
generateNewSeqNo (engine ); // create a gap here
0 commit comments