Skip to content

Commit 262d3c0

Browse files
authored
Allow engine to recover from translog upto a seqno (#33032)
This change allows an engine to recover from its local translog up to the given seqno. The extended API can be used in these use cases: When a replica starts following a new primary, it resets its index to the safe commit, then replays its local translog up to the current global checkpoint (see #32867). When a replica starts a peer-recovery, it can initialize the start_sequence_number to the persisted global checkpoint instead of the local checkpoint of the safe commit. A replica will then replay its local translog up to that global checkpoint before accepting remote translog from the primary. This change will increase the chance of operation-based recovery. I will make this in a follow-up. Relates #32867
1 parent b02150a commit 262d3c0

File tree

8 files changed

+217
-41
lines changed

8 files changed

+217
-41
lines changed

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1623,10 +1623,12 @@ public interface Warmer {
16231623
public abstract int fillSeqNoGaps(long primaryTerm) throws IOException;
16241624

16251625
/**
1626-
* Performs recovery from the transaction log.
1626+
* Performs recovery from the transaction log up to {@code recoverUpToSeqNo} (inclusive).
16271627
* This operation will close the engine if the recovery fails.
1628+
*
1629+
* @param recoverUpToSeqNo the upper bound, inclusive, of sequence number to be recovered
16281630
*/
1629-
public abstract Engine recoverFromTranslog() throws IOException;
1631+
public abstract Engine recoverFromTranslog(long recoverUpToSeqNo) throws IOException;
16301632

16311633
/**
16321634
* Do not replay translog operations, but make the engine be ready.

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -364,15 +364,15 @@ private void bootstrapAppendOnlyInfoFromWriter(IndexWriter writer) {
364364
}
365365

366366
@Override
367-
public InternalEngine recoverFromTranslog() throws IOException {
367+
public InternalEngine recoverFromTranslog(long recoverUpToSeqNo) throws IOException {
368368
flushLock.lock();
369369
try (ReleasableLock lock = readLock.acquire()) {
370370
ensureOpen();
371371
if (pendingTranslogRecovery.get() == false) {
372372
throw new IllegalStateException("Engine has already been recovered");
373373
}
374374
try {
375-
recoverFromTranslogInternal();
375+
recoverFromTranslogInternal(recoverUpToSeqNo);
376376
} catch (Exception e) {
377377
try {
378378
pendingTranslogRecovery.set(true); // just play safe and never allow commits on this see #ensureCanFlush
@@ -394,11 +394,12 @@ public void skipTranslogRecovery() {
394394
pendingTranslogRecovery.set(false); // we are good - now we can commit
395395
}
396396

397-
private void recoverFromTranslogInternal() throws IOException {
397+
private void recoverFromTranslogInternal(long recoverUpToSeqNo) throws IOException {
398398
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
399399
final int opsRecovered;
400-
final long translogGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
401-
try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(translogGen)) {
400+
final long translogFileGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
401+
try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(
402+
new Translog.TranslogGeneration(translog.getTranslogUUID(), translogFileGen), recoverUpToSeqNo)) {
402403
opsRecovered = config().getTranslogRecoveryRunner().run(this, snapshot);
403404
} catch (Exception e) {
404405
throw new EngineException(shardId, "failed to recover from translog", e);

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1305,7 +1305,7 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOExce
13051305
**/
13061306
public void openEngineAndRecoverFromTranslog() throws IOException {
13071307
innerOpenEngineAndTranslog();
1308-
getEngine().recoverFromTranslog();
1308+
getEngine().recoverFromTranslog(Long.MAX_VALUE);
13091309
}
13101310

13111311
/**

server/src/main/java/org/elasticsearch/index/translog/Translog.java

Lines changed: 64 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -577,21 +577,27 @@ public long getLastSyncedGlobalCheckpoint() {
577577
*/
578578
public Snapshot newSnapshot() throws IOException {
579579
try (ReleasableLock ignored = readLock.acquire()) {
580-
return newSnapshotFromGen(getMinFileGeneration());
580+
return newSnapshotFromGen(new TranslogGeneration(translogUUID, getMinFileGeneration()), Long.MAX_VALUE);
581581
}
582582
}
583583

584-
public Snapshot newSnapshotFromGen(long minGeneration) throws IOException {
584+
public Snapshot newSnapshotFromGen(TranslogGeneration fromGeneration, long upToSeqNo) throws IOException {
585585
try (ReleasableLock ignored = readLock.acquire()) {
586586
ensureOpen();
587-
if (minGeneration < getMinFileGeneration()) {
588-
throw new IllegalArgumentException("requested snapshot generation [" + minGeneration + "] is not available. " +
587+
final long fromFileGen = fromGeneration.translogFileGeneration;
588+
if (fromFileGen < getMinFileGeneration()) {
589+
throw new IllegalArgumentException("requested snapshot generation [" + fromFileGen + "] is not available. " +
589590
"Min referenced generation is [" + getMinFileGeneration() + "]");
590591
}
591592
TranslogSnapshot[] snapshots = Stream.concat(readers.stream(), Stream.of(current))
592-
.filter(reader -> reader.getGeneration() >= minGeneration)
593+
.filter(reader -> reader.getGeneration() >= fromFileGen && reader.getCheckpoint().minSeqNo <= upToSeqNo)
593594
.map(BaseTranslogReader::newSnapshot).toArray(TranslogSnapshot[]::new);
594-
return newMultiSnapshot(snapshots);
595+
final Snapshot snapshot = newMultiSnapshot(snapshots);
596+
if (upToSeqNo == Long.MAX_VALUE) {
597+
return snapshot;
598+
} else {
599+
return new SeqNoFilterSnapshot(snapshot, Long.MIN_VALUE, upToSeqNo);
600+
}
595601
}
596602
}
597603

@@ -926,7 +932,59 @@ default int overriddenOperations() {
926932
* Returns the next operation in the snapshot or <code>null</code> if we reached the end.
927933
*/
928934
Translog.Operation next() throws IOException;
935+
}
936+
937+
/**
938+
* A filtered snapshot consisting of only operations whose sequence numbers are in the given range
939+
* between {@code fromSeqNo} (inclusive) and {@code toSeqNo} (inclusive). This filtered snapshot
940+
* shares the same underlying resources with the {@code delegate} snapshot, therefore we should not
941+
* use the {@code delegate} after passing it to this filtered snapshot.
942+
*/
943+
static final class SeqNoFilterSnapshot implements Snapshot {
944+
private final Snapshot delegate;
945+
private int filteredOpsCount;
946+
private final long fromSeqNo; // inclusive
947+
private final long toSeqNo; // inclusive
929948

949+
SeqNoFilterSnapshot(Snapshot delegate, long fromSeqNo, long toSeqNo) {
950+
assert fromSeqNo <= toSeqNo : "from_seq_no[" + fromSeqNo + "] > to_seq_no[" + toSeqNo + "]";
951+
this.delegate = delegate;
952+
this.fromSeqNo = fromSeqNo;
953+
this.toSeqNo = toSeqNo;
954+
}
955+
956+
@Override
957+
public int totalOperations() {
958+
return delegate.totalOperations();
959+
}
960+
961+
@Override
962+
public int skippedOperations() {
963+
return filteredOpsCount + delegate.skippedOperations();
964+
}
965+
966+
@Override
967+
public int overriddenOperations() {
968+
return delegate.overriddenOperations();
969+
}
970+
971+
@Override
972+
public Operation next() throws IOException {
973+
Translog.Operation op;
974+
while ((op = delegate.next()) != null) {
975+
if (fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo) {
976+
return op;
977+
} else {
978+
filteredOpsCount++;
979+
}
980+
}
981+
return null;
982+
}
983+
984+
@Override
985+
public void close() throws IOException {
986+
delegate.close();
987+
}
930988
}
931989

932990
/**

0 commit comments

Comments
 (0)