Skip to content

Commit edac71d

Browse files
committed
Provide a lower and upper bound when internally reading translog operations (#26708)
translog: Add a method to retrieve translog operations between two specific sequence numbers and let users of the existing Translog.getSnapshotBetween(...) indirectly may use of that too.
1 parent 1254fb9 commit edac71d

File tree

5 files changed

+67
-17
lines changed

5 files changed

+67
-17
lines changed

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ public void restoreLocalCheckpointFromTranslog() throws IOException {
246246
try (ReleasableLock ignored = writeLock.acquire()) {
247247
ensureOpen();
248248
final long localCheckpoint = seqNoService().getLocalCheckpoint();
249-
try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(localCheckpoint + 1)) {
249+
try (Translog.Snapshot snapshot = getTranslog().newSnapshotFrom(localCheckpoint + 1)) {
250250
Translog.Operation operation;
251251
while ((operation = snapshot.next()) != null) {
252252
if (operation.seqNo() > localCheckpoint) {

core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public void resync(final IndexShard indexShard, final ActionListener<ResyncTask>
8282
ActionListener<ResyncTask> resyncListener = null;
8383
try {
8484
final long startingSeqNo = indexShard.getGlobalCheckpoint() + 1;
85-
Translog.Snapshot snapshot = indexShard.getTranslog().newSnapshotFromMinSeqNo(startingSeqNo);
85+
Translog.Snapshot snapshot = indexShard.getTranslog().newSnapshotFrom(startingSeqNo);
8686
resyncListener = new ActionListener<ResyncTask>() {
8787
@Override
8888
public void onResponse(final ResyncTask resyncTask) {

core/src/main/java/org/elasticsearch/index/translog/Translog.java

+25-11
Original file line numberDiff line numberDiff line change
@@ -416,7 +416,9 @@ private int totalOperations(long minGeneration) {
416416
public int estimateTotalOperationsFromMinSeq(long minSeqNo) {
417417
try (ReleasableLock ignored = readLock.acquire()) {
418418
ensureOpen();
419-
return readersAboveMinSeqNo(minSeqNo).mapToInt(BaseTranslogReader::totalOperations).sum();
419+
return readersBetweenMinAndMaxSeqNo(minSeqNo, Long.MAX_VALUE)
420+
.mapToInt(BaseTranslogReader::totalOperations)
421+
.sum();
420422
}
421423
}
422424

@@ -439,7 +441,7 @@ private long sizeInBytesByMinGen(long minGeneration) {
439441
private long sizeOfGensAboveSeqNoInBytes(long minSeqNo) {
440442
try (ReleasableLock ignored = readLock.acquire()) {
441443
ensureOpen();
442-
return readersAboveMinSeqNo(minSeqNo).mapToLong(BaseTranslogReader::sizeInBytes).sum();
444+
return readersBetweenMinAndMaxSeqNo(minSeqNo, Long.MAX_VALUE).mapToLong(BaseTranslogReader::sizeInBytes).sum();
443445
}
444446
}
445447

@@ -597,11 +599,23 @@ public Snapshot newSnapshotFromGen(long minGeneration) throws IOException {
597599
}
598600
}
599601

600-
public Snapshot newSnapshotFromMinSeqNo(long minSeqNo) throws IOException {
602+
/**
603+
* Returns a snapshot with operations having a sequence number equal to or greater than <code>minSeqNo</code>.
604+
*/
605+
public Snapshot newSnapshotFrom(long minSeqNo) throws IOException {
606+
return getSnapshotBetween(minSeqNo, Long.MAX_VALUE);
607+
}
608+
609+
/**
610+
* Returns a snapshot with operations having a sequence number equal to or greater than <code>minSeqNo</code> and
611+
* equal to or lesser than <code>maxSeqNo</code>.
612+
*/
613+
public Snapshot getSnapshotBetween(long minSeqNo, long maxSeqNo) throws IOException {
601614
try (ReleasableLock ignored = readLock.acquire()) {
602615
ensureOpen();
603-
TranslogSnapshot[] snapshots = readersAboveMinSeqNo(minSeqNo).map(BaseTranslogReader::newSnapshot)
604-
.toArray(TranslogSnapshot[]::new);
616+
TranslogSnapshot[] snapshots = readersBetweenMinAndMaxSeqNo(minSeqNo, maxSeqNo)
617+
.map(BaseTranslogReader::newSnapshot)
618+
.toArray(TranslogSnapshot[]::new);
605619
return newMultiSnapshot(snapshots);
606620
}
607621
}
@@ -627,14 +641,14 @@ private Snapshot newMultiSnapshot(TranslogSnapshot[] snapshots) throws IOExcepti
627641
}
628642
}
629643

630-
private Stream<? extends BaseTranslogReader> readersAboveMinSeqNo(long minSeqNo) {
631-
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread() :
632-
"callers of readersAboveMinSeqNo must hold a lock: readLock ["
633-
+ readLock.isHeldByCurrentThread() + "], writeLock [" + readLock.isHeldByCurrentThread() + "]";
644+
private Stream<? extends BaseTranslogReader> readersBetweenMinAndMaxSeqNo(long minSeqNo, long maxSeqNo) {
645+
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
646+
634647
return Stream.concat(readers.stream(), Stream.of(current))
635648
.filter(reader -> {
636-
final long maxSeqNo = reader.getCheckpoint().maxSeqNo;
637-
return maxSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || maxSeqNo >= minSeqNo;
649+
final Checkpoint checkpoint = reader.getCheckpoint();
650+
return checkpoint.maxSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO ||
651+
checkpoint.minSeqNo <= maxSeqNo && checkpoint.maxSeqNo >= minSeqNo;
638652
});
639653
}
640654

core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ public RecoveryResponse recoverToTarget() throws IOException {
188188

189189
logger.trace("snapshot translog for recovery; current size is [{}]", translog.estimateTotalOperationsFromMinSeq(startingSeqNo));
190190
final long targetLocalCheckpoint;
191-
try(Translog.Snapshot snapshot = translog.newSnapshotFromMinSeqNo(startingSeqNo)) {
191+
try(Translog.Snapshot snapshot = translog.newSnapshotFrom(startingSeqNo)) {
192192
targetLocalCheckpoint = phase2(startingSeqNo, snapshot);
193193
} catch (Exception e) {
194194
throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e);
@@ -244,7 +244,7 @@ boolean isTranslogReadyForSequenceNumberBasedRecovery() throws IOException {
244244
logger.trace("all operations up to [{}] completed, checking translog content", endingSeqNo);
245245

246246
final LocalCheckpointTracker tracker = new LocalCheckpointTracker(shard.indexSettings(), startingSeqNo, startingSeqNo - 1);
247-
try (Translog.Snapshot snapshot = shard.getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) {
247+
try (Translog.Snapshot snapshot = shard.getTranslog().newSnapshotFrom(startingSeqNo)) {
248248
Translog.Operation operation;
249249
while ((operation = snapshot.next()) != null) {
250250
if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {

core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java

+38-2
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
import java.io.Closeable;
7777
import java.io.EOFException;
7878
import java.io.IOException;
79+
import java.io.UncheckedIOException;
7980
import java.nio.ByteBuffer;
8081
import java.nio.channels.FileChannel;
8182
import java.nio.charset.Charset;
@@ -88,6 +89,7 @@
8889
import java.util.Arrays;
8990
import java.util.Collection;
9091
import java.util.Collections;
92+
import java.util.Comparator;
9193
import java.util.HashMap;
9294
import java.util.HashSet;
9395
import java.util.Iterator;
@@ -104,7 +106,9 @@
104106
import java.util.concurrent.atomic.AtomicInteger;
105107
import java.util.concurrent.atomic.AtomicLong;
106108
import java.util.stream.Collectors;
109+
import java.util.stream.IntStream;
107110
import java.util.stream.LongStream;
111+
import java.util.stream.Stream;
108112

109113
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomLongBetween;
110114
import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE;
@@ -891,7 +895,7 @@ protected void doRun() throws Exception {
891895
// these are what we expect the snapshot to return (and potentially some more).
892896
Set<Translog.Operation> expectedOps = new HashSet<>(writtenOps.keySet());
893897
expectedOps.removeIf(op -> op.seqNo() <= committedLocalCheckpointAtView);
894-
try (Translog.Snapshot snapshot = translog.newSnapshotFromMinSeqNo(committedLocalCheckpointAtView + 1L)) {
898+
try (Translog.Snapshot snapshot = translog.newSnapshotFrom(committedLocalCheckpointAtView + 1L)) {
895899
Translog.Operation op;
896900
while ((op = snapshot.next()) != null) {
897901
expectedOps.remove(op);
@@ -2484,7 +2488,7 @@ public void testMinSeqNoBasedAPI() throws IOException {
24842488
}
24852489
assertThat(translog.estimateTotalOperationsFromMinSeq(seqNo), equalTo(expectedSnapshotOps));
24862490
int readFromSnapshot = 0;
2487-
try (Translog.Snapshot snapshot = translog.newSnapshotFromMinSeqNo(seqNo)) {
2491+
try (Translog.Snapshot snapshot = translog.newSnapshotFrom(seqNo)) {
24882492
assertThat(snapshot.totalOperations(), equalTo(expectedSnapshotOps));
24892493
Translog.Operation op;
24902494
while ((op = snapshot.next()) != null) {
@@ -2500,6 +2504,38 @@ public void testMinSeqNoBasedAPI() throws IOException {
25002504
}
25012505
}
25022506

2507+
public void testGetSnapshotBetween() throws IOException {
2508+
final int numOperations = randomIntBetween(2, 8196);
2509+
final List<Integer> sequenceNumbers = IntStream.range(0, numOperations).boxed().collect(Collectors.toList());
2510+
Collections.shuffle(sequenceNumbers, random());
2511+
for (Integer sequenceNumber : sequenceNumbers) {
2512+
translog.add(new Translog.NoOp(sequenceNumber, 0, "test"));
2513+
if (rarely()) {
2514+
translog.rollGeneration();
2515+
}
2516+
}
2517+
translog.rollGeneration();
2518+
2519+
final int iters = randomIntBetween(8, 32);
2520+
for (int iter = 0; iter < iters; iter++) {
2521+
int min = randomIntBetween(0, numOperations - 1);
2522+
int max = randomIntBetween(min, numOperations);
2523+
try (Translog.Snapshot snapshot = translog.getSnapshotBetween(min, max)) {
2524+
final List<Translog.Operation> operations = new ArrayList<>();
2525+
for (Translog.Operation operation = snapshot.next(); operation != null; operation = snapshot.next()) {
2526+
if (operation.seqNo() >= min && operation.seqNo() <= max) {
2527+
operations.add(operation);
2528+
}
2529+
}
2530+
operations.sort(Comparator.comparingLong(Translog.Operation::seqNo));
2531+
Iterator<Translog.Operation> iterator = operations.iterator();
2532+
for (long expectedSeqNo = min; expectedSeqNo < max; expectedSeqNo++) {
2533+
assertThat(iterator.next().seqNo(), equalTo(expectedSeqNo));
2534+
}
2535+
}
2536+
}
2537+
}
2538+
25032539
public void testSimpleCommit() throws IOException {
25042540
final int operations = randomIntBetween(1, 4096);
25052541
long seqNo = 0;

0 commit comments

Comments
 (0)