Skip to content

Commit afa7fec

Browse files
committed
Harden periodically check to avoid endless flush loop (#29125)
In #28350, we fixed an endless flushing loop which may happen on replicas by tightening the relation between the flush action and the periodically flush condition. 1. The periodically flush condition is enabled only if it is disabled after a flush. 2. If the periodically flush condition is enabled then a flush will actually happen regardless of Lucene state. (1) and (2) guarantee that a flushing loop will be terminated. Sadly, the condition 1 can be violated in edge cases as we used two different algorithms to evaluate the current and future uncommitted translog size. - We use method `uncommittedSizeInBytes` to calculate current uncommitted size. It is the sum of translogs whose generation at least the minGen (determined by a given seqno). We pick a continuous range of translogs since the minGen to evaluate the current uncommitted size. - We use method `sizeOfGensAboveSeqNoInBytes` to calculate the future uncommitted size. It is the sum of translogs whose maxSeqNo at least the given seqNo. Here we don't pick a range but select translog one by one. Suppose we have 3 translogs `gen1={#1,#2}, gen2={}, gen3={#3} and seqno=#1`, `uncommittedSizeInBytes` is the sum of gen1, gen2, and gen3 while `sizeOfGensAboveSeqNoInBytes` is the sum of gen1 and gen3. Gen2 is excluded because its maxSeqno is still -1. This commit removes both `sizeOfGensAboveSeqNoInBytes` and `uncommittedSizeInBytes` methods, then enforces an engine to use only `sizeInBytesByMinGen` method to evaluate the periodically flush condition. Closes #29097 Relates ##28350
1 parent d3b9583 commit afa7fec

File tree

7 files changed

+95
-75
lines changed

7 files changed

+95
-75
lines changed

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1479,7 +1479,8 @@ final boolean tryRenewSyncCommit() {
14791479
ensureOpen();
14801480
ensureCanFlush();
14811481
String syncId = lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID);
1482-
if (syncId != null && translog.uncommittedOperations() == 0 && indexWriter.hasUncommittedChanges()) {
1482+
long translogGenOfLastCommit = Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY));
1483+
if (syncId != null && indexWriter.hasUncommittedChanges() && translog.totalOperationsByMinGen(translogGenOfLastCommit) == 0) {
14831484
logger.trace("start renewing sync commit [{}]", syncId);
14841485
commitIndexWriter(indexWriter, translog, syncId);
14851486
logger.debug("successfully sync committed. sync id [{}].", syncId);
@@ -1501,26 +1502,30 @@ final boolean tryRenewSyncCommit() {
15011502
@Override
15021503
public boolean shouldPeriodicallyFlush() {
15031504
ensureOpen();
1505+
final long translogGenerationOfLastCommit = Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY));
15041506
final long flushThreshold = config().getIndexSettings().getFlushThresholdSize().getBytes();
1505-
final long uncommittedSizeOfCurrentCommit = translog.uncommittedSizeInBytes();
1506-
if (uncommittedSizeOfCurrentCommit < flushThreshold) {
1507+
if (translog.sizeInBytesByMinGen(translogGenerationOfLastCommit) < flushThreshold) {
15071508
return false;
15081509
}
15091510
/*
1510-
* We should only flush ony if the shouldFlush condition can become false after flushing.
1511-
* This condition will change if the `uncommittedSize` of the new commit is smaller than
1512-
* the `uncommittedSize` of the current commit. This method is to maintain translog only,
1513-
* thus the IndexWriter#hasUncommittedChanges condition is not considered.
1514-
*/
1515-
final long uncommittedSizeOfNewCommit = translog.sizeOfGensAboveSeqNoInBytes(localCheckpointTracker.getCheckpoint() + 1);
1516-
/*
1517-
* If flushThreshold is too small, we may repeatedly flush even there is no uncommitted operation
1518-
* as #sizeOfGensAboveSeqNoInByte and #uncommittedSizeInBytes can return different values.
1519-
* An empty translog file has non-zero `uncommittedSize` (the translog header), and method #sizeOfGensAboveSeqNoInBytes can
1520-
* return 0 now(no translog gen contains ops above local checkpoint) but method #uncommittedSizeInBytes will return an actual
1521-
* non-zero value after rolling a new translog generation. This can be avoided by checking the actual uncommitted operations.
1511+
* We flush to reduce the size of uncommitted translog but strictly speaking the uncommitted size won't always be
1512+
* below the flush-threshold after a flush. To avoid getting into an endless loop of flushing, we only enable the
1513+
* periodically flush condition if this condition is disabled after a flush. The condition will change if the new
1514+
* commit points to the later generation the last commit's(eg. gen-of-last-commit < gen-of-new-commit)[1].
1515+
*
1516+
* When the local checkpoint equals to max_seqno, and translog-gen of the last commit equals to translog-gen of
1517+
* the new commit, we know that the last generation must contain operations because its size is above the flush
1518+
* threshold and the flush-threshold is guaranteed to be higher than an empty translog by the setting validation.
1519+
* This guarantees that the new commit will point to the newly rolled generation. In fact, this scenario only
1520+
* happens when the generation-threshold is close to or above the flush-threshold; otherwise we have rolled
1521+
* generations as the generation-threshold was reached, then the first condition (eg. [1]) is already satisfied.
1522+
*
1523+
* This method is to maintain translog only, thus IndexWriter#hasUncommittedChanges condition is not considered.
15221524
*/
1523-
return uncommittedSizeOfNewCommit < uncommittedSizeOfCurrentCommit && translog.uncommittedOperations() > 0;
1525+
final long translogGenerationOfNewCommit =
1526+
translog.getMinGenerationForSeqNo(localCheckpointTracker.getCheckpoint() + 1).translogFileGeneration;
1527+
return translogGenerationOfLastCommit < translogGenerationOfNewCommit
1528+
|| localCheckpointTracker.getCheckpoint() == localCheckpointTracker.getMaxSeqNo();
15241529
}
15251530

15261531
@Override

server/src/main/java/org/elasticsearch/index/translog/Translog.java

Lines changed: 9 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
106106
public static final String TRANSLOG_FILE_SUFFIX = ".tlog";
107107
public static final String CHECKPOINT_SUFFIX = ".ckp";
108108
public static final String CHECKPOINT_FILE_NAME = "translog" + CHECKPOINT_SUFFIX;
109+
public static final int DEFAULT_HEADER_SIZE_IN_BYTES = TranslogWriter.getHeaderLength(UUIDs.randomBase64UUID());
109110

110111
static final Pattern PARSE_STRICT_ID_PATTERN = Pattern.compile("^" + TRANSLOG_FILE_PREFIX + "(\\d+)(\\.tlog)$");
111112

@@ -372,26 +373,11 @@ public long getMinFileGeneration() {
372373
}
373374
}
374375

375-
376-
/**
377-
* Returns the number of operations in the translog files that aren't committed to lucene.
378-
*/
379-
public int uncommittedOperations() {
380-
return totalOperations(deletionPolicy.getTranslogGenerationOfLastCommit());
381-
}
382-
383-
/**
384-
* Returns the size in bytes of the translog files that aren't committed to lucene.
385-
*/
386-
public long uncommittedSizeInBytes() {
387-
return sizeInBytesByMinGen(deletionPolicy.getTranslogGenerationOfLastCommit());
388-
}
389-
390376
/**
391377
* Returns the number of operations in the translog files
392378
*/
393379
public int totalOperations() {
394-
return totalOperations(-1);
380+
return totalOperationsByMinGen(-1);
395381
}
396382

397383
/**
@@ -402,9 +388,9 @@ public long sizeInBytes() {
402388
}
403389

404390
/**
405-
* Returns the number of operations in the transaction files that aren't committed to lucene..
391+
* Returns the number of operations in the translog files at least the given generation
406392
*/
407-
private int totalOperations(long minGeneration) {
393+
public int totalOperationsByMinGen(long minGeneration) {
408394
try (ReleasableLock ignored = readLock.acquire()) {
409395
ensureOpen();
410396
return Stream.concat(readers.stream(), Stream.of(current))
@@ -425,9 +411,9 @@ public int estimateTotalOperationsFromMinSeq(long minSeqNo) {
425411
}
426412

427413
/**
428-
* Returns the size in bytes of the translog files above the given generation
414+
* Returns the size in bytes of the translog files at least the given generation
429415
*/
430-
private long sizeInBytesByMinGen(long minGeneration) {
416+
public long sizeInBytesByMinGen(long minGeneration) {
431417
try (ReleasableLock ignored = readLock.acquire()) {
432418
ensureOpen();
433419
return Stream.concat(readers.stream(), Stream.of(current))
@@ -437,16 +423,6 @@ private long sizeInBytesByMinGen(long minGeneration) {
437423
}
438424
}
439425

440-
/**
441-
* Returns the size in bytes of the translog files with ops above the given seqNo
442-
*/
443-
public long sizeOfGensAboveSeqNoInBytes(long minSeqNo) {
444-
try (ReleasableLock ignored = readLock.acquire()) {
445-
ensureOpen();
446-
return readersAboveMinSeqNo(minSeqNo).mapToLong(BaseTranslogReader::sizeInBytes).sum();
447-
}
448-
}
449-
450426
/**
451427
* Creates a new translog for the specified generation.
452428
*
@@ -751,7 +727,8 @@ private void closeOnTragicEvent(Exception ex) {
751727
public TranslogStats stats() {
752728
// acquire lock to make the two numbers roughly consistent (no file change half way)
753729
try (ReleasableLock lock = readLock.acquire()) {
754-
return new TranslogStats(totalOperations(), sizeInBytes(), uncommittedOperations(), uncommittedSizeInBytes());
730+
final long uncommittedGen = deletionPolicy.getTranslogGenerationOfLastCommit();
731+
return new TranslogStats(totalOperations(), sizeInBytes(), totalOperationsByMinGen(uncommittedGen), sizeInBytesByMinGen(uncommittedGen));
755732
}
756733
}
757734

@@ -1508,7 +1485,7 @@ public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Transl
15081485
* @return the minimum generation for the sequence number
15091486
*/
15101487
public TranslogGeneration getMinGenerationForSeqNo(final long seqNo) {
1511-
try (ReleasableLock ignored = writeLock.acquire()) {
1488+
try (ReleasableLock ignored = readLock.acquire()) {
15121489
/*
15131490
* When flushing, the engine will ask the translog for the minimum generation that could contain any sequence number after the
15141491
* local checkpoint. Immediately after flushing, there will be no such generation, so this minimum generation in this case will

server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,6 @@ public synchronized long getMinTranslogGenerationForRecovery() {
211211

212212
/**
213213
* Returns a translog generation that will be used to calculate the number of uncommitted operations since the last index commit.
214-
* See {@link Translog#uncommittedOperations()} and {@link Translog#uncommittedSizeInBytes()}
215214
*/
216215
public synchronized long getTranslogGenerationOfLastCommit() {
217216
return translogGenerationOfLastCommit;

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 51 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -723,14 +723,13 @@ public void testTranslogRecoveryDoesNotReplayIntoTranslog() throws IOException {
723723
recoveringEngine = new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)) {
724724
@Override
725725
public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException {
726-
assertThat(getTranslog().uncommittedOperations(), equalTo(docs));
726+
assertThat(getTranslog().stats().getUncommittedOperations(), equalTo(docs));
727727
final CommitId commitId = super.flush(force, waitIfOngoing);
728728
flushed.set(true);
729729
return commitId;
730730
}
731731
};
732-
733-
assertThat(recoveringEngine.getTranslog().uncommittedOperations(), equalTo(docs));
732+
assertThat(recoveringEngine.getTranslog().stats().getUncommittedOperations(), equalTo(docs));
734733
recoveringEngine.recoverFromTranslog();
735734
assertTrue(flushed.get());
736735
} finally {
@@ -2884,7 +2883,7 @@ public void testCurrentTranslogIDisCommitted() throws IOException {
28842883
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
28852884
expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog());
28862885
assertEquals(1, engine.getTranslog().currentFileGeneration());
2887-
assertEquals(0L, engine.getTranslog().uncommittedOperations());
2886+
assertEquals(0L, engine.getTranslog().stats().getUncommittedOperations());
28882887
}
28892888
}
28902889

@@ -3840,7 +3839,7 @@ protected long doGenerateSeqNoForOperation(Operation operation) {
38403839
System.nanoTime(),
38413840
reason));
38423841
assertThat(noOpEngine.getLocalCheckpointTracker().getCheckpoint(), equalTo((long) (maxSeqNo + 1)));
3843-
assertThat(noOpEngine.getTranslog().uncommittedOperations(), equalTo(1 + gapsFilled));
3842+
assertThat(noOpEngine.getTranslog().stats().getUncommittedOperations(), equalTo(1 + gapsFilled));
38443843
// skip to the op that we added to the translog
38453844
Translog.Operation op;
38463845
Translog.Operation last = null;
@@ -4041,7 +4040,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException {
40414040
assertEquals(checkpointOnReplica, replicaEngine.getLocalCheckpointTracker().getCheckpoint());
40424041
recoveringEngine = new InternalEngine(copy(
40434042
replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, globalCheckpoint::get));
4044-
assertEquals(numDocsOnReplica, recoveringEngine.getTranslog().uncommittedOperations());
4043+
assertEquals(numDocsOnReplica, recoveringEngine.getTranslog().stats().getUncommittedOperations());
40454044
recoveringEngine.recoverFromTranslog();
40464045
assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo());
40474046
assertEquals(checkpointOnReplica, recoveringEngine.getLocalCheckpointTracker().getCheckpoint());
@@ -4076,7 +4075,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException {
40764075
recoveringEngine = new InternalEngine(
40774076
copy(replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, globalCheckpoint::get));
40784077
if (flushed) {
4079-
assertEquals(0, recoveringEngine.getTranslog().uncommittedOperations());
4078+
assertThat(recoveringEngine.getTranslog().stats().getUncommittedOperations(), equalTo(0));
40804079
}
40814080
recoveringEngine.recoverFromTranslog();
40824081
assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo());
@@ -4451,31 +4450,71 @@ public void testShouldPeriodicallyFlush() throws Exception {
44514450
engine.index(indexForDoc(doc));
44524451
}
44534452
assertThat("Not exceeded translog flush threshold yet", engine.shouldPeriodicallyFlush(), equalTo(false));
4454-
long flushThreshold = RandomNumbers.randomLongBetween(random(), 100, engine.getTranslog().uncommittedSizeInBytes());
4453+
long flushThreshold = RandomNumbers.randomLongBetween(random(), 100, engine.getTranslog().stats().getUncommittedSizeInBytes());
44554454
final IndexSettings indexSettings = engine.config().getIndexSettings();
44564455
final IndexMetaData indexMetaData = IndexMetaData.builder(indexSettings.getIndexMetaData())
44574456
.settings(Settings.builder().put(indexSettings.getSettings())
44584457
.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b")).build();
44594458
indexSettings.updateIndexMetaData(indexMetaData);
44604459
engine.onSettingsChanged();
4461-
assertThat(engine.getTranslog().uncommittedOperations(), equalTo(numDocs));
4460+
assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(numDocs));
44624461
assertThat(engine.shouldPeriodicallyFlush(), equalTo(true));
44634462
engine.flush();
4464-
assertThat(engine.getTranslog().uncommittedOperations(), equalTo(0));
4463+
assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(0));
44654464
// Stale operations skipped by Lucene but added to translog - still able to flush
44664465
for (int id = 0; id < numDocs; id++) {
44674466
final ParsedDocument doc = testParsedDocument(Integer.toString(id), null, testDocumentWithTextField(), SOURCE, null);
44684467
final Engine.IndexResult result = engine.index(replicaIndexForDoc(doc, 1L, id, false));
44694468
assertThat(result.isCreated(), equalTo(false));
44704469
}
44714470
SegmentInfos lastCommitInfo = engine.getLastCommittedSegmentInfos();
4472-
assertThat(engine.getTranslog().uncommittedOperations(), equalTo(numDocs));
4471+
assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(numDocs));
44734472
assertThat(engine.shouldPeriodicallyFlush(), equalTo(true));
44744473
engine.flush(false, false);
44754474
assertThat(engine.getLastCommittedSegmentInfos(), not(sameInstance(lastCommitInfo)));
4476-
assertThat(engine.getTranslog().uncommittedOperations(), equalTo(0));
4475+
assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(0));
4476+
// If the new index commit still points to the same translog generation as the current index commit,
4477+
// we should not enable the periodically flush condition; otherwise we can get into an infinite loop of flushes.
4478+
engine.getLocalCheckpointTracker().generateSeqNo(); // create a gap here
4479+
for (int id = 0; id < numDocs; id++) {
4480+
if (randomBoolean()) {
4481+
engine.getTranslog().rollGeneration();
4482+
}
4483+
final ParsedDocument doc = testParsedDocument("new" + id, null, testDocumentWithTextField(), SOURCE, null);
4484+
engine.index(replicaIndexForDoc(doc, 2L, engine.getLocalCheckpointTracker().generateSeqNo(), false));
4485+
if (engine.shouldPeriodicallyFlush()) {
4486+
engine.flush();
4487+
assertThat(engine.getLastCommittedSegmentInfos(), not(sameInstance(lastCommitInfo)));
4488+
assertThat(engine.shouldPeriodicallyFlush(), equalTo(false));
4489+
}
4490+
}
44774491
}
44784492

4493+
public void testStressShouldPeriodicallyFlush() throws Exception {
4494+
final long flushThreshold = randomLongBetween(100, 5000);
4495+
final long generationThreshold = randomLongBetween(1000, 5000);
4496+
final IndexSettings indexSettings = engine.config().getIndexSettings();
4497+
final IndexMetaData indexMetaData = IndexMetaData.builder(indexSettings.getIndexMetaData())
4498+
.settings(Settings.builder().put(indexSettings.getSettings())
4499+
.put(IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING.getKey(), generationThreshold + "b")
4500+
.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b")).build();
4501+
indexSettings.updateIndexMetaData(indexMetaData);
4502+
engine.onSettingsChanged();
4503+
final int numOps = scaledRandomIntBetween(100, 10_000);
4504+
for (int i = 0; i < numOps; i++) {
4505+
final long localCheckPoint = engine.getLocalCheckpointTracker().getCheckpoint();
4506+
final long seqno = randomLongBetween(Math.max(0, localCheckPoint), localCheckPoint + 5);
4507+
final ParsedDocument doc = testParsedDocument(Long.toString(seqno), null, testDocumentWithTextField(), SOURCE, null);
4508+
engine.index(replicaIndexForDoc(doc, 1L, seqno, false));
4509+
if (rarely() && engine.getTranslog().shouldRollGeneration()) {
4510+
engine.rollTranslogGeneration();
4511+
}
4512+
if (rarely() || engine.shouldPeriodicallyFlush()) {
4513+
engine.flush();
4514+
assertThat(engine.shouldPeriodicallyFlush(), equalTo(false));
4515+
}
4516+
}
4517+
}
44794518

44804519
public void testStressUpdateSameDocWhileGettingIt() throws IOException, InterruptedException {
44814520
final int iters = randomIntBetween(1, 15);

0 commit comments

Comments
 (0)