@@ -725,8 +725,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s
725
725
super .commitIndexWriter (writer , translog , syncId );
726
726
}
727
727
};
728
-
729
- assertThat (recoveringEngine .getTranslog ().uncommittedOperations (), equalTo (docs ));
728
+ assertThat (recoveringEngine .getTranslog ().stats ().getUncommittedOperations (), equalTo (docs ));
730
729
recoveringEngine .recoverFromTranslog ();
731
730
assertTrue (committed .get ());
732
731
} finally {
@@ -3614,7 +3613,7 @@ protected long doGenerateSeqNoForOperation(Operation operation) {
3614
3613
System .nanoTime (),
3615
3614
reason ));
3616
3615
assertThat (noOpEngine .getLocalCheckpointTracker ().getCheckpoint (), equalTo ((long ) (maxSeqNo + 1 )));
3617
- assertThat (noOpEngine .getTranslog ().uncommittedOperations (), equalTo (1 + gapsFilled ));
3616
+ assertThat (noOpEngine .getTranslog ().stats (). getUncommittedOperations (), equalTo (1 + gapsFilled ));
3618
3617
// skip to the op that we added to the translog
3619
3618
Translog .Operation op ;
3620
3619
Translog .Operation last = null ;
@@ -3814,7 +3813,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException {
3814
3813
assertEquals (maxSeqIDOnReplica , replicaEngine .getLocalCheckpointTracker ().getMaxSeqNo ());
3815
3814
assertEquals (checkpointOnReplica , replicaEngine .getLocalCheckpointTracker ().getCheckpoint ());
3816
3815
recoveringEngine = new InternalEngine (copy (replicaEngine .config (), globalCheckpoint ::get ));
3817
- assertEquals (numDocsOnReplica , recoveringEngine .getTranslog ().uncommittedOperations ());
3816
+ assertEquals (numDocsOnReplica , recoveringEngine .getTranslog ().stats (). getUncommittedOperations ());
3818
3817
recoveringEngine .recoverFromTranslog ();
3819
3818
assertEquals (maxSeqIDOnReplica , recoveringEngine .getLocalCheckpointTracker ().getMaxSeqNo ());
3820
3819
assertEquals (checkpointOnReplica , recoveringEngine .getLocalCheckpointTracker ().getCheckpoint ());
@@ -3848,7 +3847,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException {
3848
3847
try {
3849
3848
recoveringEngine = new InternalEngine (copy (replicaEngine .config (), globalCheckpoint ::get ));
3850
3849
if (flushed ) {
3851
- assertEquals ( 0 , recoveringEngine .getTranslog ().uncommittedOperations ( ));
3850
+ assertThat ( recoveringEngine .getTranslog ().stats (). getUncommittedOperations (), equalTo ( 0 ));
3852
3851
}
3853
3852
recoveringEngine .recoverFromTranslog ();
3854
3853
assertEquals (maxSeqIDOnReplica , recoveringEngine .getLocalCheckpointTracker ().getMaxSeqNo ());
@@ -4252,39 +4251,80 @@ public void testCleanupCommitsWhenReleaseSnapshot() throws Exception {
4252
4251
public void testShouldPeriodicallyFlush () throws Exception {
4253
4252
assertThat ("Empty engine does not need flushing" , engine .shouldPeriodicallyFlush (), equalTo (false ));
4254
4253
// A new engine may have more than one empty translog files - the test should account this extra.
4255
- final long extraTranslogSizeInNewEngine = engine .getTranslog ().uncommittedSizeInBytes () - Translog .DEFAULT_HEADER_SIZE_IN_BYTES ;
4254
+ final Translog translog = engine .getTranslog ();
4255
+ final long extraTranslogSizeInNewEngine = engine .getTranslog ().stats ().getUncommittedSizeInBytes () - Translog .DEFAULT_HEADER_SIZE_IN_BYTES ;
4256
4256
int numDocs = between (10 , 100 );
4257
4257
for (int id = 0 ; id < numDocs ; id ++) {
4258
4258
final ParsedDocument doc = testParsedDocument (Integer .toString (id ), null , testDocumentWithTextField (), SOURCE , null );
4259
4259
engine .index (indexForDoc (doc ));
4260
4260
}
4261
4261
assertThat ("Not exceeded translog flush threshold yet" , engine .shouldPeriodicallyFlush (), equalTo (false ));
4262
4262
long flushThreshold = RandomNumbers .randomLongBetween (random (), 100 ,
4263
- engine .getTranslog ().uncommittedSizeInBytes () - extraTranslogSizeInNewEngine );
4263
+ engine .getTranslog ().stats (). getUncommittedSizeInBytes () - extraTranslogSizeInNewEngine );
4264
4264
final IndexSettings indexSettings = engine .config ().getIndexSettings ();
4265
4265
final IndexMetaData indexMetaData = IndexMetaData .builder (indexSettings .getIndexMetaData ())
4266
4266
.settings (Settings .builder ().put (indexSettings .getSettings ())
4267
4267
.put (IndexSettings .INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING .getKey (), flushThreshold + "b" )).build ();
4268
4268
indexSettings .updateIndexMetaData (indexMetaData );
4269
4269
engine .onSettingsChanged ();
4270
- assertThat (engine .getTranslog ().uncommittedOperations (), equalTo (numDocs ));
4270
+ assertThat (engine .getTranslog ().stats (). getUncommittedOperations (), equalTo (numDocs ));
4271
4271
assertThat (engine .shouldPeriodicallyFlush (), equalTo (true ));
4272
4272
engine .flush ();
4273
- assertThat (engine .getTranslog ().uncommittedOperations (), equalTo (0 ));
4273
+ assertThat (engine .getTranslog ().stats (). getUncommittedOperations (), equalTo (0 ));
4274
4274
// Stale operations skipped by Lucene but added to translog - still able to flush
4275
4275
for (int id = 0 ; id < numDocs ; id ++) {
4276
4276
final ParsedDocument doc = testParsedDocument (Integer .toString (id ), null , testDocumentWithTextField (), SOURCE , null );
4277
4277
final Engine .IndexResult result = engine .index (replicaIndexForDoc (doc , 1L , id , false ));
4278
4278
assertThat (result .isCreated (), equalTo (false ));
4279
4279
}
4280
4280
SegmentInfos lastCommitInfo = engine .getLastCommittedSegmentInfos ();
4281
- assertThat (engine .getTranslog ().uncommittedOperations (), equalTo (numDocs ));
4281
+ assertThat (engine .getTranslog ().stats (). getUncommittedOperations (), equalTo (numDocs ));
4282
4282
assertThat (engine .shouldPeriodicallyFlush (), equalTo (true ));
4283
4283
engine .flush (false , false );
4284
4284
assertThat (engine .getLastCommittedSegmentInfos (), not (sameInstance (lastCommitInfo )));
4285
- assertThat (engine .getTranslog ().uncommittedOperations (), equalTo (0 ));
4285
+ assertThat (engine .getTranslog ().stats ().getUncommittedOperations (), equalTo (0 ));
4286
+ // If the new index commit still points to the same translog generation as the current index commit,
4287
+ // we should not enable the periodically flush condition; otherwise we can get into an infinite loop of flushes.
4288
+ engine .getLocalCheckpointTracker ().generateSeqNo (); // create a gap here
4289
+ for (int id = 0 ; id < numDocs ; id ++) {
4290
+ if (randomBoolean ()) {
4291
+ translog .rollGeneration ();
4292
+ }
4293
+ final ParsedDocument doc = testParsedDocument ("new" + id , null , testDocumentWithTextField (), SOURCE , null );
4294
+ engine .index (replicaIndexForDoc (doc , 2L , engine .getLocalCheckpointTracker ().generateSeqNo (), false ));
4295
+ if (engine .shouldPeriodicallyFlush ()) {
4296
+ engine .flush ();
4297
+ assertThat (engine .getLastCommittedSegmentInfos (), not (sameInstance (lastCommitInfo )));
4298
+ assertThat (engine .shouldPeriodicallyFlush (), equalTo (false ));
4299
+ }
4300
+ }
4286
4301
}
4287
4302
4303
+ public void testStressShouldPeriodicallyFlush () throws Exception {
4304
+ final long flushThreshold = randomLongBetween (100 , 5000 );
4305
+ final long generationThreshold = randomLongBetween (1000 , 5000 );
4306
+ final IndexSettings indexSettings = engine .config ().getIndexSettings ();
4307
+ final IndexMetaData indexMetaData = IndexMetaData .builder (indexSettings .getIndexMetaData ())
4308
+ .settings (Settings .builder ().put (indexSettings .getSettings ())
4309
+ .put (IndexSettings .INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING .getKey (), generationThreshold + "b" )
4310
+ .put (IndexSettings .INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING .getKey (), flushThreshold + "b" )).build ();
4311
+ indexSettings .updateIndexMetaData (indexMetaData );
4312
+ engine .onSettingsChanged ();
4313
+ final int numOps = scaledRandomIntBetween (100 , 10_000 );
4314
+ for (int i = 0 ; i < numOps ; i ++) {
4315
+ final long localCheckPoint = engine .getLocalCheckpointTracker ().getCheckpoint ();
4316
+ final long seqno = randomLongBetween (Math .max (0 , localCheckPoint ), localCheckPoint + 5 );
4317
+ final ParsedDocument doc = testParsedDocument (Long .toString (seqno ), null , testDocumentWithTextField (), SOURCE , null );
4318
+ engine .index (replicaIndexForDoc (doc , 1L , seqno , false ));
4319
+ if (rarely () && engine .getTranslog ().shouldRollGeneration ()) {
4320
+ engine .rollTranslogGeneration ();
4321
+ }
4322
+ if (rarely () || engine .shouldPeriodicallyFlush ()) {
4323
+ engine .flush ();
4324
+ assertThat (engine .shouldPeriodicallyFlush (), equalTo (false ));
4325
+ }
4326
+ }
4327
+ }
4288
4328
4289
4329
public void testStressUpdateSameDocWhileGettingIt () throws IOException , InterruptedException {
4290
4330
final int iters = randomIntBetween (1 , 15 );
0 commit comments