Skip to content

Commit cccc9c6

Browse files
committed
Encapsulate Translog in Engine (#31220)
This removes the abstract `getTranslog` method in `Engine`, instead leaving it to the abstract implementations of the other methods that use the translog. This allows future Engines not to have a Translog, as instead they must implement the methods that use the translog pieces to return necessary values.
1 parent 8c76a1f commit cccc9c6

File tree

4 files changed

+62
-40
lines changed

4 files changed

+62
-40
lines changed

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 8 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -565,18 +565,10 @@ public enum SearcherScope {
565565
EXTERNAL, INTERNAL
566566
}
567567

568-
/**
569-
* Returns the translog associated with this engine.
570-
* Prefer to keep the translog package-private, so that an engine can control all accesses to the translog.
571-
*/
572-
abstract Translog getTranslog();
573-
574568
/**
575569
* Checks if the underlying storage sync is required.
576570
*/
577-
public boolean isTranslogSyncNeeded() {
578-
return getTranslog().syncNeeded();
579-
}
571+
public abstract boolean isTranslogSyncNeeded();
580572

581573
/**
582574
* Ensures that all locations in the given stream have been written to the underlying storage.
@@ -585,35 +577,25 @@ public boolean isTranslogSyncNeeded() {
585577

586578
public abstract void syncTranslog() throws IOException;
587579

588-
public Closeable acquireTranslogRetentionLock() {
589-
return getTranslog().acquireRetentionLock();
590-
}
580+
public abstract Closeable acquireTranslogRetentionLock();
591581

592582
/**
593583
* Creates a new translog snapshot from this engine for reading translog operations whose seq# at least the provided seq#.
594584
* The caller has to close the returned snapshot after finishing the reading.
595585
*/
596-
public Translog.Snapshot newTranslogSnapshotFromMinSeqNo(long minSeqNo) throws IOException {
597-
return getTranslog().newSnapshotFromMinSeqNo(minSeqNo);
598-
}
586+
public abstract Translog.Snapshot newTranslogSnapshotFromMinSeqNo(long minSeqNo) throws IOException;
599587

600588
/**
601589
* Returns the estimated number of translog operations in this engine whose seq# at least the provided seq#.
602590
*/
603-
public int estimateTranslogOperationsFromMinSeq(long minSeqNo) {
604-
return getTranslog().estimateTotalOperationsFromMinSeq(minSeqNo);
605-
}
591+
public abstract int estimateTranslogOperationsFromMinSeq(long minSeqNo);
606592

607-
public TranslogStats getTranslogStats() {
608-
return getTranslog().stats();
609-
}
593+
public abstract TranslogStats getTranslogStats();
610594

611595
/**
612596
* Returns the last location that the translog of this engine has written into.
613597
*/
614-
public Translog.Location getTranslogLastWriteLocation() {
615-
return getTranslog().getLastWriteLocation();
616-
}
598+
public abstract Translog.Location getTranslogLastWriteLocation();
617599

618600
protected final void ensureOpen(Exception suppressed) {
619601
if (isClosed.get()) {
@@ -661,9 +643,7 @@ public CommitStats commitStats() {
661643
/**
662644
* Returns the latest global checkpoint value that has been persisted in the underlying storage (i.e. translog's checkpoint)
663645
*/
664-
public long getLastSyncedGlobalCheckpoint() {
665-
return getTranslog().getLastSyncedGlobalCheckpoint();
666-
}
646+
public abstract long getLastSyncedGlobalCheckpoint();
667647

668648
/**
669649
* Global stats on segments.
@@ -935,9 +915,7 @@ public final boolean refreshNeeded() {
935915
*
936916
* @return {@code true} if the current generation should be rolled to a new generation
937917
*/
938-
public boolean shouldRollTranslogGeneration() {
939-
return getTranslog().shouldRollGeneration();
940-
}
918+
public abstract boolean shouldRollTranslogGeneration();
941919

942920
/**
943921
* Rolls the translog generation and cleans unneeded.

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,10 @@
7575
import org.elasticsearch.index.translog.TranslogConfig;
7676
import org.elasticsearch.index.translog.TranslogCorruptedException;
7777
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
78+
import org.elasticsearch.index.translog.TranslogStats;
7879
import org.elasticsearch.threadpool.ThreadPool;
7980

81+
import java.io.Closeable;
8082
import java.io.IOException;
8183
import java.util.Arrays;
8284
import java.util.Collection;
@@ -435,12 +437,17 @@ private Translog openTranslog(EngineConfig engineConfig, TranslogDeletionPolicy
435437
return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier, engineConfig.getPrimaryTermSupplier());
436438
}
437439

438-
@Override
440+
// Package private for testing purposes only
439441
Translog getTranslog() {
440442
ensureOpen();
441443
return translog;
442444
}
443445

446+
@Override
447+
public boolean isTranslogSyncNeeded() {
448+
return getTranslog().syncNeeded();
449+
}
450+
444451
@Override
445452
public boolean ensureTranslogSynced(Stream<Translog.Location> locations) throws IOException {
446453
final boolean synced = translog.ensureSynced(locations);
@@ -456,6 +463,31 @@ public void syncTranslog() throws IOException {
456463
revisitIndexDeletionPolicyOnTranslogSynced();
457464
}
458465

466+
@Override
467+
public Closeable acquireTranslogRetentionLock() {
468+
return getTranslog().acquireRetentionLock();
469+
}
470+
471+
@Override
472+
public Translog.Snapshot newTranslogSnapshotFromMinSeqNo(long minSeqNo) throws IOException {
473+
return getTranslog().newSnapshotFromMinSeqNo(minSeqNo);
474+
}
475+
476+
@Override
477+
public int estimateTranslogOperationsFromMinSeq(long minSeqNo) {
478+
return getTranslog().estimateTotalOperationsFromMinSeq(minSeqNo);
479+
}
480+
481+
@Override
482+
public TranslogStats getTranslogStats() {
483+
return getTranslog().stats();
484+
}
485+
486+
@Override
487+
public Translog.Location getTranslogLastWriteLocation() {
488+
return getTranslog().getLastWriteLocation();
489+
}
490+
459491
private void revisitIndexDeletionPolicyOnTranslogSynced() throws IOException {
460492
if (combinedDeletionPolicy.hasUnreferencedCommits()) {
461493
indexWriter.deleteUnusedFiles();
@@ -1619,6 +1651,11 @@ public void trimUnreferencedTranslogFiles() throws EngineException {
16191651
}
16201652
}
16211653

1654+
@Override
1655+
public boolean shouldRollTranslogGeneration() {
1656+
return getTranslog().shouldRollGeneration();
1657+
}
1658+
16221659
@Override
16231660
public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws EngineException {
16241661
try (ReleasableLock lock = readLock.acquire()) {
@@ -2240,6 +2277,11 @@ LocalCheckpointTracker getLocalCheckpointTracker() {
22402277
return localCheckpointTracker;
22412278
}
22422279

2280+
@Override
2281+
public long getLastSyncedGlobalCheckpoint() {
2282+
return getTranslog().getLastSyncedGlobalCheckpoint();
2283+
}
2284+
22432285
@Override
22442286
public long getLocalCheckpoint() {
22452287
return localCheckpointTracker.getCheckpoint();

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -731,7 +731,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s
731731
super.commitIndexWriter(writer, translog, syncId);
732732
}
733733
};
734-
assertThat(recoveringEngine.getTranslog().stats().getUncommittedOperations(), equalTo(docs));
734+
assertThat(getTranslog(recoveringEngine).stats().getUncommittedOperations(), equalTo(docs));
735735
recoveringEngine.recoverFromTranslog();
736736
assertTrue(committed.get());
737737
} finally {
@@ -758,7 +758,7 @@ public void testTranslogRecoveryWithMultipleGenerations() throws IOException {
758758
final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null);
759759
initialEngine.index(indexForDoc(doc));
760760
if (rarely()) {
761-
initialEngine.getTranslog().rollGeneration();
761+
getTranslog(initialEngine).rollGeneration();
762762
} else if (rarely()) {
763763
initialEngine.flush();
764764
}
@@ -4007,14 +4007,14 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException {
40074007
assertEquals(checkpointOnReplica, replicaEngine.getLocalCheckpoint());
40084008
trimUnsafeCommits(copy(replicaEngine.config(), globalCheckpoint::get));
40094009
recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get));
4010-
assertEquals(numDocsOnReplica, recoveringEngine.getTranslog().stats().getUncommittedOperations());
4010+
assertEquals(numDocsOnReplica, getTranslog(recoveringEngine).stats().getUncommittedOperations());
40114011
recoveringEngine.recoverFromTranslog();
40124012
assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
40134013
assertEquals(checkpointOnReplica, recoveringEngine.getLocalCheckpoint());
40144014
assertEquals((maxSeqIDOnReplica + 1) - numDocsOnReplica, recoveringEngine.fillSeqNoGaps(2));
40154015

40164016
// now snapshot the tlog and ensure the primary term is updated
4017-
try (Translog.Snapshot snapshot = recoveringEngine.getTranslog().newSnapshot()) {
4017+
try (Translog.Snapshot snapshot = getTranslog(recoveringEngine).newSnapshot()) {
40184018
assertTrue((maxSeqIDOnReplica + 1) - numDocsOnReplica <= snapshot.totalOperations());
40194019
Translog.Operation operation;
40204020
while ((operation = snapshot.next()) != null) {
@@ -4029,7 +4029,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException {
40294029
assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpoint());
40304030
if ((flushed = randomBoolean())) {
40314031
globalCheckpoint.set(recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
4032-
recoveringEngine.getTranslog().sync();
4032+
getTranslog(recoveringEngine).sync();
40334033
recoveringEngine.flush(true, true);
40344034
}
40354035
}
@@ -4042,7 +4042,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException {
40424042
trimUnsafeCommits(replicaEngine.config());
40434043
recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get));
40444044
if (flushed) {
4045-
assertThat(recoveringEngine.getTranslog().stats().getUncommittedOperations(), equalTo(0));
4045+
assertThat(recoveringEngine.getTranslogStats().getUncommittedOperations(), equalTo(0));
40464046
}
40474047
recoveringEngine.recoverFromTranslog();
40484048
assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
@@ -4245,7 +4245,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s
42454245
engine.index(indexForDoc(testParsedDocument(Integer.toString(docId), null, document, B_1, null)));
42464246
if (frequently()) {
42474247
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
4248-
engine.getTranslog().sync();
4248+
engine.syncTranslog();
42494249
}
42504250
if (frequently()) {
42514251
final long lastSyncedGlobalCheckpoint = Translog.readGlobalCheckpoint(translogPath, translogUUID);
@@ -4267,7 +4267,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s
42674267
}
42684268
// Make sure we keep all translog operations after the local checkpoint of the safe commit.
42694269
long localCheckpointFromSafeCommit = Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
4270-
try (Translog.Snapshot snapshot = engine.getTranslog().newSnapshot()) {
4270+
try (Translog.Snapshot snapshot = getTranslog(engine).newSnapshot()) {
42714271
assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(localCheckpointFromSafeCommit + 1, docId));
42724272
}
42734273
}

test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -507,6 +507,8 @@ protected Engine.Delete replicaDeleteForDoc(String id, long version, long seqNo,
507507
* Exposes a translog associated with the given engine for testing purpose.
508508
*/
509509
public static Translog getTranslog(Engine engine) {
510-
return engine.getTranslog();
510+
assert engine instanceof InternalEngine : "only InternalEngines have translogs, got: " + engine.getClass();
511+
InternalEngine internalEngine = (InternalEngine) engine;
512+
return internalEngine.getTranslog();
511513
}
512514
}

0 commit comments

Comments
 (0)