Skip to content

Commit 41a902a

Browse files
committed
[RCI] Add NoOpEngine for closed indices (#33903)
This commit adds a new NoOpEngine implementation based on the current ReadOnlyEngine. This new implementation uses an empty DirectoryReader with no segments readers and will always returns 0 docs. The NoOpEngine is the default Engine created for IndexShards of closed indices. It expects an empty translog when it is instantiated. Relates to #33888
1 parent 0290547 commit 41a902a

File tree

6 files changed

+423
-2
lines changed

6 files changed

+423
-2
lines changed
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.index.engine;
21+
22+
import org.apache.lucene.index.DirectoryReader;
23+
import org.apache.lucene.index.IndexCommit;
24+
import org.apache.lucene.index.IndexWriter;
25+
import org.apache.lucene.index.LeafReader;
26+
import org.apache.lucene.store.Directory;
27+
import org.elasticsearch.common.Nullable;
28+
import org.elasticsearch.core.internal.io.IOUtils;
29+
import org.elasticsearch.index.translog.Translog;
30+
import org.elasticsearch.index.translog.TranslogConfig;
31+
import org.elasticsearch.index.translog.TranslogCorruptedException;
32+
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
33+
34+
import java.io.IOException;
35+
import java.util.List;
36+
import java.util.Map;
37+
import java.util.function.LongSupplier;
38+
import java.util.stream.Stream;
39+
40+
/**
41+
* NoOpEngine is an engine implementation that does nothing but the bare minimum
42+
* required in order to have an engine. All attempts to do something (search,
43+
* index, get), throw {@link UnsupportedOperationException}. This does maintain
44+
* a translog with a deletion policy so that when flushing, no translog is
45+
* retained on disk (setting a retention size and age of 0).
46+
*
47+
* It's also important to notice that this does list the commits of the Store's
48+
* Directory so that the last commit's user data can be read for the historyUUID
49+
* and last committed segment info.
50+
*/
51+
public final class NoOpEngine extends ReadOnlyEngine {
52+
53+
public NoOpEngine(EngineConfig engineConfig) {
54+
super(engineConfig, null, null, true, directoryReader -> directoryReader);
55+
boolean success = false;
56+
try {
57+
// The deletion policy for the translog should not keep any translogs around, so the min age/size is set to -1
58+
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(-1, -1);
59+
60+
// The translog is opened and closed to validate that the translog UUID from lucene is the same as the one in the translog
61+
try (Translog translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier())) {
62+
final int nbOperations = translog.totalOperations();
63+
if (nbOperations != 0) {
64+
throw new IllegalArgumentException("Expected 0 translog operations but there were " + nbOperations);
65+
}
66+
}
67+
success = true;
68+
} catch (IOException | TranslogCorruptedException e) {
69+
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
70+
} finally {
71+
if (success == false) {
72+
IOUtils.closeWhileHandlingException(this);
73+
}
74+
}
75+
}
76+
77+
@Override
78+
protected DirectoryReader open(final Directory directory) throws IOException {
79+
final List<IndexCommit> indexCommits = DirectoryReader.listCommits(directory);
80+
assert indexCommits.size() == 1 : "expected only one commit point";
81+
IndexCommit indexCommit = indexCommits.get(indexCommits.size() - 1);
82+
return new DirectoryReader(directory, new LeafReader[0]) {
83+
@Override
84+
protected DirectoryReader doOpenIfChanged() throws IOException {
85+
return null;
86+
}
87+
88+
@Override
89+
protected DirectoryReader doOpenIfChanged(IndexCommit commit) throws IOException {
90+
return null;
91+
}
92+
93+
@Override
94+
protected DirectoryReader doOpenIfChanged(IndexWriter writer, boolean applyAllDeletes) throws IOException {
95+
return null;
96+
}
97+
98+
@Override
99+
public long getVersion() {
100+
return 0;
101+
}
102+
103+
@Override
104+
public boolean isCurrent() throws IOException {
105+
return true;
106+
}
107+
108+
@Override
109+
public IndexCommit getIndexCommit() throws IOException {
110+
return indexCommit;
111+
}
112+
113+
@Override
114+
protected void doClose() throws IOException {
115+
}
116+
117+
@Override
118+
public CacheHelper getReaderCacheHelper() {
119+
return null;
120+
}
121+
};
122+
}
123+
124+
private Translog openTranslog(EngineConfig engineConfig, TranslogDeletionPolicy translogDeletionPolicy,
125+
LongSupplier globalCheckpointSupplier) throws IOException {
126+
final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
127+
final String translogUUID = loadTranslogUUIDFromLastCommit();
128+
// We expect that this shard already exists, so it must already have an existing translog else something is badly wrong!
129+
return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier,
130+
engineConfig.getPrimaryTermSupplier());
131+
}
132+
133+
/**
134+
* Reads the current stored translog ID from the last commit data.
135+
*/
136+
@Nullable
137+
private String loadTranslogUUIDFromLastCommit() {
138+
final Map<String, String> commitUserData = getLastCommittedSegmentInfos().getUserData();
139+
if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) {
140+
throw new IllegalStateException("Commit doesn't contain translog generation id");
141+
}
142+
return commitUserData.get(Translog.TRANSLOG_UUID_KEY);
143+
}
144+
145+
@Override
146+
public boolean ensureTranslogSynced(Stream<Translog.Location> locations) {
147+
throw new UnsupportedOperationException("Translog synchronization should never be needed");
148+
}
149+
}

server/src/main/java/org/elasticsearch/indices/IndicesService.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@
8484
import org.elasticsearch.index.engine.CommitStats;
8585
import org.elasticsearch.index.engine.EngineFactory;
8686
import org.elasticsearch.index.engine.InternalEngineFactory;
87+
import org.elasticsearch.index.engine.NoOpEngine;
8788
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
8889
import org.elasticsearch.index.flush.FlushStats;
8990
import org.elasticsearch.index.get.GetStats;
@@ -517,6 +518,12 @@ private synchronized IndexService createIndexService(final String reason,
517518
}
518519

519520
private EngineFactory getEngineFactory(final IndexSettings idxSettings) {
521+
final IndexMetaData indexMetaData = idxSettings.getIndexMetaData();
522+
if (indexMetaData != null && indexMetaData.getState() == IndexMetaData.State.CLOSE) {
523+
// NoOpEngine takes precedence as long as the index is closed
524+
return NoOpEngine::new;
525+
}
526+
520527
final List<Optional<EngineFactory>> engineFactories =
521528
engineFactoryProviders
522529
.stream()
Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.index.engine;
21+
22+
import org.apache.lucene.index.DirectoryReader;
23+
import org.apache.lucene.index.IndexReader;
24+
import org.apache.lucene.index.MergePolicy;
25+
import org.apache.lucene.index.NoMergePolicy;
26+
import org.apache.lucene.store.LockObtainFailedException;
27+
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
28+
import org.elasticsearch.cluster.routing.ShardRouting;
29+
import org.elasticsearch.cluster.routing.ShardRoutingState;
30+
import org.elasticsearch.cluster.routing.TestShardRouting;
31+
import org.elasticsearch.common.bytes.BytesArray;
32+
import org.elasticsearch.common.settings.Settings;
33+
import org.elasticsearch.core.internal.io.IOUtils;
34+
import org.elasticsearch.index.IndexSettings;
35+
import org.elasticsearch.index.mapper.ParsedDocument;
36+
import org.elasticsearch.index.seqno.ReplicationTracker;
37+
import org.elasticsearch.index.seqno.SequenceNumbers;
38+
import org.elasticsearch.index.shard.DocsStats;
39+
import org.elasticsearch.index.store.Store;
40+
import org.elasticsearch.index.translog.Translog;
41+
import org.elasticsearch.index.translog.TranslogCorruptedException;
42+
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
43+
import org.elasticsearch.test.IndexSettingsModule;
44+
45+
import java.io.IOException;
46+
import java.io.UncheckedIOException;
47+
import java.nio.file.Path;
48+
import java.util.Collections;
49+
import java.util.concurrent.atomic.AtomicLong;
50+
51+
import static org.hamcrest.Matchers.equalTo;
52+
import static org.hamcrest.Matchers.instanceOf;
53+
import static org.hamcrest.Matchers.is;
54+
55+
public class NoOpEngineTests extends EngineTestCase {
56+
private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings("index", Settings.EMPTY);
57+
58+
public void testNoopEngine() throws IOException {
59+
engine.close();
60+
final NoOpEngine engine = new NoOpEngine(noOpConfig(INDEX_SETTINGS, store, primaryTranslogDir));
61+
expectThrows(UnsupportedOperationException.class, () -> engine.syncFlush(null, null));
62+
expectThrows(UnsupportedOperationException.class, () -> engine.ensureTranslogSynced(null));
63+
assertThat(engine.refreshNeeded(), equalTo(false));
64+
assertThat(engine.shouldPeriodicallyFlush(), equalTo(false));
65+
engine.close();
66+
}
67+
68+
public void testTwoNoopEngines() throws IOException {
69+
engine.close();
70+
// Ensure that we can't open two noop engines for the same store
71+
final EngineConfig engineConfig = noOpConfig(INDEX_SETTINGS, store, primaryTranslogDir);
72+
try (NoOpEngine ignored = new NoOpEngine(engineConfig)) {
73+
UncheckedIOException e = expectThrows(UncheckedIOException.class, () -> new NoOpEngine(engineConfig));
74+
assertThat(e.getCause(), instanceOf(LockObtainFailedException.class));
75+
}
76+
}
77+
78+
public void testNoopAfterRegularEngine() throws IOException {
79+
int docs = randomIntBetween(1, 10);
80+
ReplicationTracker tracker = (ReplicationTracker) engine.config().getGlobalCheckpointSupplier();
81+
ShardRouting routing = TestShardRouting.newShardRouting("test", shardId.id(), "node",
82+
null, true, ShardRoutingState.STARTED, allocationId);
83+
IndexShardRoutingTable table = new IndexShardRoutingTable.Builder(shardId).addShard(routing).build();
84+
tracker.updateFromMaster(1L, Collections.singleton(allocationId.getId()), table, Collections.emptySet());
85+
tracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
86+
for (int i = 0; i < docs; i++) {
87+
ParsedDocument doc = testParsedDocument("" + i, null, testDocumentWithTextField(), B_1, null);
88+
engine.index(indexForDoc(doc));
89+
tracker.updateLocalCheckpoint(allocationId.getId(), i);
90+
}
91+
92+
flushAndTrimTranslog(engine);
93+
94+
long localCheckpoint = engine.getLocalCheckpoint();
95+
long maxSeqNo = engine.getSeqNoStats(100L).getMaxSeqNo();
96+
engine.close();
97+
98+
final NoOpEngine noOpEngine = new NoOpEngine(noOpConfig(INDEX_SETTINGS, store, primaryTranslogDir, tracker));
99+
assertThat(noOpEngine.getLocalCheckpoint(), equalTo(localCheckpoint));
100+
assertThat(noOpEngine.getSeqNoStats(100L).getMaxSeqNo(), equalTo(maxSeqNo));
101+
try (Engine.IndexCommitRef ref = noOpEngine.acquireLastIndexCommit(false)) {
102+
try (IndexReader reader = DirectoryReader.open(ref.getIndexCommit())) {
103+
assertThat(reader.numDocs(), equalTo(docs));
104+
}
105+
}
106+
noOpEngine.close();
107+
}
108+
109+
public void testNoopEngineWithInvalidTranslogUUID() throws IOException {
110+
IOUtils.close(engine, store);
111+
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
112+
try (Store store = createStore()) {
113+
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
114+
int numDocs = scaledRandomIntBetween(10, 100);
115+
try (InternalEngine engine = createEngine(config)) {
116+
for (int i = 0; i < numDocs; i++) {
117+
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
118+
engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA,
119+
System.nanoTime(), -1, false));
120+
if (rarely()) {
121+
engine.flush();
122+
}
123+
globalCheckpoint.set(engine.getLocalCheckpoint());
124+
}
125+
flushAndTrimTranslog(engine);
126+
}
127+
128+
final Path newTranslogDir = createTempDir();
129+
// A new translog will have a different UUID than the existing store/noOp engine does
130+
Translog newTranslog = createTranslog(newTranslogDir, () -> 1L);
131+
newTranslog.close();
132+
133+
EngineCreationFailureException e = expectThrows(EngineCreationFailureException.class,
134+
() -> new NoOpEngine(noOpConfig(INDEX_SETTINGS, store, newTranslogDir)));
135+
assertThat(e.getCause(), instanceOf(TranslogCorruptedException.class));
136+
}
137+
}
138+
139+
public void testNoopEngineWithNonZeroTranslogOperations() throws IOException {
140+
IOUtils.close(engine, store);
141+
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
142+
try (Store store = createStore()) {
143+
final MergePolicy mergePolicy = NoMergePolicy.INSTANCE;
144+
EngineConfig config = config(defaultSettings, store, createTempDir(), mergePolicy, null, null, globalCheckpoint::get);
145+
int numDocs = scaledRandomIntBetween(10, 100);
146+
try (InternalEngine engine = createEngine(config)) {
147+
for (int i = 0; i < numDocs; i++) {
148+
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
149+
engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA,
150+
System.nanoTime(), -1, false));
151+
if (rarely()) {
152+
engine.flush();
153+
}
154+
globalCheckpoint.set(engine.getLocalCheckpoint());
155+
}
156+
engine.syncTranslog();
157+
engine.flushAndClose();
158+
engine.close();
159+
160+
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new NoOpEngine(engine.engineConfig));
161+
assertThat(e.getMessage(), is("Expected 0 translog operations but there were " + numDocs));
162+
}
163+
}
164+
}
165+
166+
public void testNoOpEngineDocStats() throws Exception {
167+
IOUtils.close(engine, store);
168+
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
169+
try (Store store = createStore()) {
170+
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
171+
final int numDocs = scaledRandomIntBetween(10, 3000);
172+
int deletions = 0;
173+
try (InternalEngine engine = createEngine(config)) {
174+
for (int i = 0; i < numDocs; i++) {
175+
engine.index(indexForDoc(createParsedDoc(Integer.toString(i), null)));
176+
if (rarely()) {
177+
engine.flush();
178+
}
179+
globalCheckpoint.set(engine.getLocalCheckpoint());
180+
}
181+
182+
for (int i = 0; i < numDocs; i++) {
183+
if (randomBoolean()) {
184+
String delId = Integer.toString(i);
185+
Engine.DeleteResult result = engine.delete(new Engine.Delete("test", delId, newUid(delId), primaryTerm.get()));
186+
assertTrue(result.isFound());
187+
globalCheckpoint.set(engine.getLocalCheckpoint());
188+
deletions += 1;
189+
}
190+
}
191+
engine.waitForOpsToComplete(numDocs + deletions - 1);
192+
flushAndTrimTranslog(engine);
193+
engine.close();
194+
}
195+
196+
final DocsStats expectedDocStats;
197+
try (InternalEngine engine = createEngine(config)) {
198+
expectedDocStats = engine.docStats();
199+
}
200+
201+
try (NoOpEngine noOpEngine = new NoOpEngine(config)) {
202+
assertEquals(expectedDocStats.getCount(), noOpEngine.docStats().getCount());
203+
assertEquals(expectedDocStats.getDeleted(), noOpEngine.docStats().getDeleted());
204+
assertEquals(expectedDocStats.getTotalSizeInBytes(), noOpEngine.docStats().getTotalSizeInBytes());
205+
assertEquals(expectedDocStats.getAverageSizeInBytes(), noOpEngine.docStats().getAverageSizeInBytes());
206+
} catch (AssertionError e) {
207+
logger.error(config.getMergePolicy());
208+
throw e;
209+
}
210+
}
211+
}
212+
213+
private void flushAndTrimTranslog(final InternalEngine engine) {
214+
engine.flush(true, true);
215+
final TranslogDeletionPolicy deletionPolicy = engine.getTranslog().getDeletionPolicy();
216+
deletionPolicy.setRetentionSizeInBytes(-1);
217+
deletionPolicy.setRetentionAgeInMillis(-1);
218+
deletionPolicy.setMinTranslogGenerationForRecovery(engine.getTranslog().getGeneration().translogFileGeneration);
219+
engine.flush(true, true);
220+
}
221+
}

0 commit comments

Comments
 (0)