Skip to content

Commit cde2807

Browse files
committed
translog: Add a method to retrieve translog operations between two specific sequence numbers
and let users of the existing Translog.newSnapshotFromMinSeqNo(...) indirectly may use of that too.
1 parent 34662c9 commit cde2807

File tree

3 files changed

+57
-17
lines changed

3 files changed

+57
-17
lines changed

core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2424,6 +2424,14 @@ public void addRefreshListener(Translog.Location location, Consumer<Boolean> lis
24242424
}
24252425
}
24262426

2427+
/**
2428+
* Returns a snapshot with translog operations having a sequence number equal or greater than <code>minSeqNo</code> and
2429+
* equal or lesser than <code>maxSeqNo</code>.
2430+
*/
2431+
public Translog.Snapshot getTranslogOperationsBetweenMinAndMaxSeqNo(long minSeqNo, long maxSeqNo) throws IOException {
2432+
return getEngine().getTranslog().newSnapshotBetweenMinAndMaxSeqNo(minSeqNo, maxSeqNo);
2433+
}
2434+
24272435
private static class RefreshMetricUpdater implements ReferenceManager.RefreshListener {
24282436

24292437
private final MeanMetric refreshMetric;

core/src/main/java/org/elasticsearch/index/translog/Translog.java

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -416,7 +416,9 @@ private int totalOperations(long minGeneration) {
416416
public int estimateTotalOperationsFromMinSeq(long minSeqNo) {
417417
try (ReleasableLock ignored = readLock.acquire()) {
418418
ensureOpen();
419-
return readersAboveMinSeqNo(minSeqNo).mapToInt(BaseTranslogReader::totalOperations).sum();
419+
return readersBetweenMinAndMaxSeqNo(minSeqNo, Long.MAX_VALUE)
420+
.mapToInt(BaseTranslogReader::totalOperations)
421+
.sum();
420422
}
421423
}
422424

@@ -433,16 +435,6 @@ private long sizeInBytesByMinGen(long minGeneration) {
433435
}
434436
}
435437

436-
/**
437-
* Returns the size in bytes of the translog files with ops above the given seqNo
438-
*/
439-
private long sizeOfGensAboveSeqNoInBytes(long minSeqNo) {
440-
try (ReleasableLock ignored = readLock.acquire()) {
441-
ensureOpen();
442-
return readersAboveMinSeqNo(minSeqNo).mapToLong(BaseTranslogReader::sizeInBytes).sum();
443-
}
444-
}
445-
446438
/**
447439
* Creates a new translog for the specified generation.
448440
*
@@ -598,10 +590,19 @@ public Snapshot newSnapshotFromGen(long minGeneration) throws IOException {
598590
}
599591

600592
public Snapshot newSnapshotFromMinSeqNo(long minSeqNo) throws IOException {
593+
return newSnapshotBetweenMinAndMaxSeqNo(minSeqNo, Long.MAX_VALUE);
594+
}
595+
596+
/**
597+
* Returns a snapshot with operations having a sequence number equal or greater than <code>minSeqNo</code> and
598+
* equal or lesser than <code>maxSeqNo</code>.
599+
*/
600+
public Snapshot newSnapshotBetweenMinAndMaxSeqNo(long minSeqNo, long maxSeqNo) throws IOException {
601601
try (ReleasableLock ignored = readLock.acquire()) {
602602
ensureOpen();
603-
TranslogSnapshot[] snapshots = readersAboveMinSeqNo(minSeqNo).map(BaseTranslogReader::newSnapshot)
604-
.toArray(TranslogSnapshot[]::new);
603+
TranslogSnapshot[] snapshots = readersBetweenMinAndMaxSeqNo(minSeqNo, maxSeqNo)
604+
.map(BaseTranslogReader::newSnapshot)
605+
.toArray(TranslogSnapshot[]::new);
605606
return newMultiSnapshot(snapshots);
606607
}
607608
}
@@ -627,14 +628,16 @@ private Snapshot newMultiSnapshot(TranslogSnapshot[] snapshots) throws IOExcepti
627628
}
628629
}
629630

630-
private Stream<? extends BaseTranslogReader> readersAboveMinSeqNo(long minSeqNo) {
631+
private Stream<? extends BaseTranslogReader> readersBetweenMinAndMaxSeqNo(long minSeqNo, long maxSeqNo) {
631632
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread() :
632-
"callers of readersAboveMinSeqNo must hold a lock: readLock ["
633+
"callers of readersBetweenMinAndMaxSeqNo must hold a lock: readLock ["
633634
+ readLock.isHeldByCurrentThread() + "], writeLock [" + readLock.isHeldByCurrentThread() + "]";
635+
634636
return Stream.concat(readers.stream(), Stream.of(current))
635637
.filter(reader -> {
636-
final long maxSeqNo = reader.getCheckpoint().maxSeqNo;
637-
return maxSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || maxSeqNo >= minSeqNo;
638+
final Checkpoint checkpoint = reader.getCheckpoint();
639+
return checkpoint.maxSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO ||
640+
checkpoint.minSeqNo <= maxSeqNo && checkpoint.maxSeqNo >= minSeqNo;
638641
});
639642
}
640643

core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2497,6 +2497,35 @@ public void testMinSeqNoBasedAPI() throws IOException {
24972497
}
24982498
}
24992499

2500+
public void testGetOperationsBetweenMinAndMaxSeqNoAPI() throws IOException {
2501+
final int operations = randomIntBetween(2, 8096);
2502+
long seqNo = 0;
2503+
for (int i = 0; i < operations; i++) {
2504+
translog.add(new Translog.NoOp(seqNo++, 0, "test'"));
2505+
if (rarely()) {
2506+
translog.rollGeneration();
2507+
}
2508+
}
2509+
translog.rollGeneration();
2510+
2511+
int iters = randomIntBetween(8, 32);
2512+
for (int iter = 0; iter < iters; iter++) {
2513+
int min = randomIntBetween(0, operations - 1);
2514+
int max = randomIntBetween(min, operations);
2515+
try (Translog.Snapshot snapshot = translog.newSnapshotBetweenMinAndMaxSeqNo(min, max)) {
2516+
Translog.Operation operation;
2517+
do {
2518+
operation = snapshot.next();
2519+
} while (operation.seqNo() < (min - 1));
2520+
2521+
for (long expectedSeqNo = min; expectedSeqNo < max; expectedSeqNo++) {
2522+
operation = snapshot.next();
2523+
assertThat(operation.seqNo(), equalTo(expectedSeqNo));
2524+
}
2525+
}
2526+
}
2527+
}
2528+
25002529
public void testSimpleCommit() throws IOException {
25012530
final int operations = randomIntBetween(1, 4096);
25022531
long seqNo = 0;

0 commit comments

Comments
 (0)