Skip to content

Commit 0290547

Browse files
authored
Ensure that max seq # is equal to the global checkpoint when creating ReadOnlyEngines (#37426)
Since version 6.7.0 the Close Index API guarantees that all translog operations have been correctly flushed before the index is closed. If the index is reopened as a Frozen index (which uses a ReadOnlyEngine) we can verify that the maximum sequence number from the last Lucene commit is indeed equal to the last known global checkpoint and refuses to open the read only engine if it's not the case. In this PR the check is only done for indices created on or after 6.7.0 as they are guaranteed to be closed using the new Close Index API. Related #33888
1 parent a713183 commit 0290547

File tree

2 files changed

+63
-3
lines changed

2 files changed

+63
-3
lines changed

server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import org.apache.lucene.search.SearcherManager;
3131
import org.apache.lucene.store.Directory;
3232
import org.apache.lucene.store.Lock;
33+
import org.elasticsearch.Assertions;
34+
import org.elasticsearch.Version;
3335
import org.elasticsearch.common.lucene.Lucene;
3436
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
3537
import org.elasticsearch.core.internal.io.IOUtils;
@@ -98,7 +100,25 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
98100
indexWriterLock = obtainLock ? directory.obtainLock(IndexWriter.WRITE_LOCK_NAME) : null;
99101
this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(directory);
100102
this.translogStats = translogStats == null ? new TranslogStats(0, 0, 0, 0, 0) : translogStats;
101-
this.seqNoStats = seqNoStats == null ? buildSeqNoStats(lastCommittedSegmentInfos) : seqNoStats;
103+
if (seqNoStats == null) {
104+
seqNoStats = buildSeqNoStats(lastCommittedSegmentInfos);
105+
// During a peer-recovery the global checkpoint is not known and up to date when the engine
106+
// is created, so we only check the max seq no / global checkpoint coherency when the global
107+
// checkpoint is different from the unassigned sequence number value.
108+
// In addition to that we only execute the check if the index the engine belongs to has been
109+
// created after the refactoring of the Close Index API and its TransportVerifyShardBeforeCloseAction
110+
// that guarantee that all operations have been flushed to Lucene.
111+
final long globalCheckpoint = engineConfig.getGlobalCheckpointSupplier().getAsLong();
112+
if (globalCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO
113+
&& engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_7_0)) {
114+
if (seqNoStats.getMaxSeqNo() != globalCheckpoint) {
115+
assertMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats.getMaxSeqNo(), globalCheckpoint);
116+
throw new IllegalStateException("Maximum sequence number [" + seqNoStats.getMaxSeqNo()
117+
+ "] from last commit does not match global checkpoint [" + globalCheckpoint + "]");
118+
}
119+
}
120+
}
121+
this.seqNoStats = seqNoStats;
102122
this.indexCommit = Lucene.getIndexCommit(lastCommittedSegmentInfos, directory);
103123
reader = open(indexCommit);
104124
reader = wrapReader(reader, readerWrapperFunction);
@@ -116,6 +136,12 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
116136
}
117137
}
118138

139+
protected void assertMaxSeqNoEqualsToGlobalCheckpoint(final long maxSeqNo, final long globalCheckpoint) {
140+
if (Assertions.ENABLED) {
141+
assert false : "max seq. no. [" + maxSeqNo + "] does not match [" + globalCheckpoint + "]";
142+
}
143+
}
144+
119145
protected final DirectoryReader wrapReader(DirectoryReader reader,
120146
Function<DirectoryReader, DirectoryReader> readerWrapperFunction) throws IOException {
121147
reader = ElasticsearchDirectoryReader.wrap(reader, engineConfig.getShardId());

server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public void testReadOnlyEngine() throws Exception {
7070
lastDocIds = getDocIds(engine, true);
7171
assertThat(readOnlyEngine.getLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint()));
7272
assertThat(readOnlyEngine.getSeqNoStats(globalCheckpoint.get()).getMaxSeqNo(), equalTo(lastSeqNoStats.getMaxSeqNo()));
73-
assertThat(getDocIds(readOnlyEngine, false), equalTo(lastDocIds));
73+
assertThat(getDocIds(readOnlyEngine, false), equalTo(lastDocIds));
7474
for (int i = 0; i < numDocs; i++) {
7575
if (randomBoolean()) {
7676
String delId = Integer.toString(i);
@@ -126,7 +126,7 @@ public void testFlushes() throws IOException {
126126
if (rarely()) {
127127
engine.flush();
128128
}
129-
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
129+
globalCheckpoint.set(i);
130130
}
131131
engine.syncTranslog();
132132
engine.flushAndClose();
@@ -139,6 +139,40 @@ public void testFlushes() throws IOException {
139139
}
140140
}
141141

142+
public void testEnsureMaxSeqNoIsEqualToGlobalCheckpoint() throws IOException {
143+
IOUtils.close(engine, store);
144+
Engine readOnlyEngine = null;
145+
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
146+
try (Store store = createStore()) {
147+
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
148+
final int numDocs = scaledRandomIntBetween(10, 100);
149+
try (InternalEngine engine = createEngine(config)) {
150+
long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
151+
for (int i = 0; i < numDocs; i++) {
152+
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
153+
engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA,
154+
System.nanoTime(), -1, false, SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
155+
maxSeqNo = engine.getLocalCheckpoint();
156+
}
157+
globalCheckpoint.set(engine.getLocalCheckpoint() - 1);
158+
engine.syncTranslog();
159+
engine.flushAndClose();
160+
161+
IllegalStateException exception = expectThrows(IllegalStateException.class,
162+
() -> new ReadOnlyEngine(engine.engineConfig, null, null, true, Function.identity()) {
163+
@Override
164+
protected void assertMaxSeqNoEqualsToGlobalCheckpoint(final long maxSeqNo, final long globalCheckpoint) {
165+
// we don't want the assertion to trip in this test
166+
}
167+
});
168+
assertThat(exception.getMessage(), equalTo("Maximum sequence number [" + maxSeqNo
169+
+ "] from last commit does not match global checkpoint [" + globalCheckpoint.get() + "]"));
170+
} finally {
171+
IOUtils.close(readOnlyEngine);
172+
}
173+
}
174+
}
175+
142176
public void testReadOnly() throws IOException {
143177
IOUtils.close(engine, store);
144178
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

0 commit comments

Comments
 (0)