Skip to content

Commit b5e93b5

Browse files
committed
Allow engine to recover from translog up to a seqno (#33032)
This change allows an engine to recover from its local translog up to the given seqno. The extended API can be used in these use cases: (1) When a replica starts following a new primary, it resets its index to the safe commit, then replays its local translog up to the current global checkpoint (see #32867). (2) When a replica starts a peer-recovery, it can initialize the start_sequence_number to the persisted global checkpoint instead of the local checkpoint of the safe commit. A replica will then replay its local translog up to that global checkpoint before accepting remote translog from the primary. This second use case will increase the chance of operation-based recovery; I will make that change in a follow-up. Relates #32867
1 parent a6b5fc1 commit b5e93b5

File tree

8 files changed

+217
-41
lines changed

8 files changed

+217
-41
lines changed

server/src/main/java/org/elasticsearch/index/engine/Engine.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -1629,10 +1629,12 @@ public interface Warmer {
16291629
public abstract int fillSeqNoGaps(long primaryTerm) throws IOException;
16301630

16311631
/**
1632-
* Performs recovery from the transaction log.
1632+
* Performs recovery from the transaction log up to {@code recoverUpToSeqNo} (inclusive).
16331633
* This operation will close the engine if the recovery fails.
1634+
*
1635+
* @param recoverUpToSeqNo the upper bound, inclusive, of the sequence numbers to be recovered
16341636
*/
1635-
public abstract Engine recoverFromTranslog() throws IOException;
1637+
public abstract Engine recoverFromTranslog(long recoverUpToSeqNo) throws IOException;
16361638

16371639
/**
16381640
* Do not replay translog operations, but make the engine be ready.

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -369,15 +369,15 @@ private void bootstrapAppendOnlyInfoFromWriter(IndexWriter writer) {
369369
}
370370

371371
@Override
372-
public InternalEngine recoverFromTranslog() throws IOException {
372+
public InternalEngine recoverFromTranslog(long recoverUpToSeqNo) throws IOException {
373373
flushLock.lock();
374374
try (ReleasableLock lock = readLock.acquire()) {
375375
ensureOpen();
376376
if (pendingTranslogRecovery.get() == false) {
377377
throw new IllegalStateException("Engine has already been recovered");
378378
}
379379
try {
380-
recoverFromTranslogInternal();
380+
recoverFromTranslogInternal(recoverUpToSeqNo);
381381
} catch (Exception e) {
382382
try {
383383
pendingTranslogRecovery.set(true); // just play safe and never allow commits on this see #ensureCanFlush
@@ -399,11 +399,12 @@ public void skipTranslogRecovery() {
399399
pendingTranslogRecovery.set(false); // we are good - now we can commit
400400
}
401401

402-
private void recoverFromTranslogInternal() throws IOException {
402+
private void recoverFromTranslogInternal(long recoverUpToSeqNo) throws IOException {
403403
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
404404
final int opsRecovered;
405-
final long translogGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
406-
try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(translogGen)) {
405+
final long translogFileGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
406+
try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(
407+
new Translog.TranslogGeneration(translog.getTranslogUUID(), translogFileGen), recoverUpToSeqNo)) {
407408
opsRecovered = config().getTranslogRecoveryRunner().run(this, snapshot);
408409
} catch (Exception e) {
409410
throw new EngineException(shardId, "failed to recover from translog", e);

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1336,7 +1336,7 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOExce
13361336
**/
13371337
public void openEngineAndRecoverFromTranslog() throws IOException {
13381338
innerOpenEngineAndTranslog();
1339-
getEngine().recoverFromTranslog();
1339+
getEngine().recoverFromTranslog(Long.MAX_VALUE);
13401340
}
13411341

13421342
/**

server/src/main/java/org/elasticsearch/index/translog/Translog.java

+64-6
Original file line numberDiff line numberDiff line change
@@ -579,21 +579,27 @@ public long getLastSyncedGlobalCheckpoint() {
579579
*/
580580
public Snapshot newSnapshot() throws IOException {
581581
try (ReleasableLock ignored = readLock.acquire()) {
582-
return newSnapshotFromGen(getMinFileGeneration());
582+
return newSnapshotFromGen(new TranslogGeneration(translogUUID, getMinFileGeneration()), Long.MAX_VALUE);
583583
}
584584
}
585585

586-
public Snapshot newSnapshotFromGen(long minGeneration) throws IOException {
586+
public Snapshot newSnapshotFromGen(TranslogGeneration fromGeneration, long upToSeqNo) throws IOException {
587587
try (ReleasableLock ignored = readLock.acquire()) {
588588
ensureOpen();
589-
if (minGeneration < getMinFileGeneration()) {
590-
throw new IllegalArgumentException("requested snapshot generation [" + minGeneration + "] is not available. " +
589+
final long fromFileGen = fromGeneration.translogFileGeneration;
590+
if (fromFileGen < getMinFileGeneration()) {
591+
throw new IllegalArgumentException("requested snapshot generation [" + fromFileGen + "] is not available. " +
591592
"Min referenced generation is [" + getMinFileGeneration() + "]");
592593
}
593594
TranslogSnapshot[] snapshots = Stream.concat(readers.stream(), Stream.of(current))
594-
.filter(reader -> reader.getGeneration() >= minGeneration)
595+
.filter(reader -> reader.getGeneration() >= fromFileGen && reader.getCheckpoint().minSeqNo <= upToSeqNo)
595596
.map(BaseTranslogReader::newSnapshot).toArray(TranslogSnapshot[]::new);
596-
return newMultiSnapshot(snapshots);
597+
final Snapshot snapshot = newMultiSnapshot(snapshots);
598+
if (upToSeqNo == Long.MAX_VALUE) {
599+
return snapshot;
600+
} else {
601+
return new SeqNoFilterSnapshot(snapshot, Long.MIN_VALUE, upToSeqNo);
602+
}
597603
}
598604
}
599605

@@ -928,7 +934,59 @@ default int overriddenOperations() {
928934
* Returns the next operation in the snapshot or <code>null</code> if we reached the end.
929935
*/
930936
Translog.Operation next() throws IOException;
937+
}
938+
939+
/**
940+
* A filtered snapshot consisting of only operations whose sequence numbers are in the given range
941+
* between {@code fromSeqNo} (inclusive) and {@code toSeqNo} (inclusive). This filtered snapshot
942+
* shares the same underlying resources with the {@code delegate} snapshot, therefore we should not
943+
* use the {@code delegate} after passing it to this filtered snapshot.
944+
*/
945+
static final class SeqNoFilterSnapshot implements Snapshot {
946+
private final Snapshot delegate;
947+
private int filteredOpsCount;
948+
private final long fromSeqNo; // inclusive
949+
private final long toSeqNo; // inclusive
950+
951+
SeqNoFilterSnapshot(Snapshot delegate, long fromSeqNo, long toSeqNo) {
952+
assert fromSeqNo <= toSeqNo : "from_seq_no[" + fromSeqNo + "] > to_seq_no[" + toSeqNo + "]";
953+
this.delegate = delegate;
954+
this.fromSeqNo = fromSeqNo;
955+
this.toSeqNo = toSeqNo;
956+
}
957+
958+
@Override
959+
public int totalOperations() {
960+
return delegate.totalOperations();
961+
}
962+
963+
@Override
964+
public int skippedOperations() {
965+
return filteredOpsCount + delegate.skippedOperations();
966+
}
931967

968+
@Override
969+
public int overriddenOperations() {
970+
return delegate.overriddenOperations();
971+
}
972+
973+
@Override
974+
public Operation next() throws IOException {
975+
Translog.Operation op;
976+
while ((op = delegate.next()) != null) {
977+
if (fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo) {
978+
return op;
979+
} else {
980+
filteredOpsCount++;
981+
}
982+
}
983+
return null;
984+
}
985+
986+
@Override
987+
public void close() throws IOException {
988+
delegate.close();
989+
}
932990
}
933991

934992
/**

0 commit comments

Comments
 (0)