Skip to content

Commit eeaaf50

Browse files
committed
Relax NoOpEngine constraints
1 parent b00b323 commit eeaaf50

File tree

3 files changed

+58
-127
lines changed

3 files changed

+58
-127
lines changed

server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java

Lines changed: 4 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -24,54 +24,20 @@
2424
import org.apache.lucene.index.IndexWriter;
2525
import org.apache.lucene.index.LeafReader;
2626
import org.apache.lucene.store.Directory;
27-
import org.elasticsearch.common.Nullable;
28-
import org.elasticsearch.core.internal.io.IOUtils;
29-
import org.elasticsearch.index.translog.Translog;
30-
import org.elasticsearch.index.translog.TranslogConfig;
31-
import org.elasticsearch.index.translog.TranslogCorruptedException;
32-
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
3327

3428
import java.io.IOException;
3529
import java.util.List;
36-
import java.util.Map;
37-
import java.util.function.LongSupplier;
38-
import java.util.stream.Stream;
30+
import java.util.function.Function;
3931

4032
/**
4133
* NoOpEngine is an engine implementation that does nothing but the bare minimum
4234
* required in order to have an engine. All attempts to do something (search,
43-
* index, get), throw {@link UnsupportedOperationException}. This does maintain
44-
* a translog with a deletion policy so that when flushing, no translog is
45-
* retained on disk (setting a retention size and age of 0).
46-
*
47-
* It's also important to notice that this does list the commits of the Store's
48-
* Directory so that the last commit's user data can be read for the historyUUID
49-
* and last committed segment info.
35+
* index, get), throw {@link UnsupportedOperationException}.
5036
*/
5137
public final class NoOpEngine extends ReadOnlyEngine {
5238

53-
public NoOpEngine(EngineConfig engineConfig) {
54-
super(engineConfig, null, null, true, directoryReader -> directoryReader);
55-
boolean success = false;
56-
try {
57-
// The deletion policy for the translog should not keep any translogs around, so the min age/size is set to -1
58-
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(-1, -1);
59-
60-
// The translog is opened and closed to validate that the translog UUID from lucene is the same as the one in the translog
61-
try (Translog translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier())) {
62-
final int nbOperations = translog.totalOperations();
63-
if (nbOperations != 0) {
64-
throw new IllegalArgumentException("Expected 0 translog operations but there were " + nbOperations);
65-
}
66-
}
67-
success = true;
68-
} catch (IOException | TranslogCorruptedException e) {
69-
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
70-
} finally {
71-
if (success == false) {
72-
IOUtils.closeWhileHandlingException(this);
73-
}
74-
}
39+
public NoOpEngine(EngineConfig config) {
40+
super(config, null, null, true, Function.identity());
7541
}
7642

7743
@Override
@@ -121,30 +87,4 @@ public CacheHelper getReaderCacheHelper() {
12187
}
12288
};
12389
}
124-
125-
private Translog openTranslog(EngineConfig engineConfig, TranslogDeletionPolicy translogDeletionPolicy,
126-
LongSupplier globalCheckpointSupplier) throws IOException {
127-
final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
128-
final String translogUUID = loadTranslogUUIDFromLastCommit();
129-
// We expect that this shard already exists, so it must already have an existing translog else something is badly wrong!
130-
return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier,
131-
engineConfig.getPrimaryTermSupplier());
132-
}
133-
134-
/**
135-
* Reads the current stored translog ID from the last commit data.
136-
*/
137-
@Nullable
138-
private String loadTranslogUUIDFromLastCommit() {
139-
final Map<String, String> commitUserData = getLastCommittedSegmentInfos().getUserData();
140-
if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) {
141-
throw new IllegalStateException("Commit doesn't contain translog generation id");
142-
}
143-
return commitUserData.get(Translog.TRANSLOG_UUID_KEY);
144-
}
145-
146-
@Override
147-
public boolean ensureTranslogSynced(Stream<Translog.Location> locations) {
148-
throw new UnsupportedOperationException("Translog synchronization should never be needed");
149-
}
15090
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.index.engine;
20+
21+
import org.elasticsearch.cluster.routing.RecoverySource.ExistingStoreRecoverySource;
22+
import org.elasticsearch.cluster.routing.ShardRouting;
23+
import org.elasticsearch.common.settings.Settings;
24+
import org.elasticsearch.index.shard.IndexShard;
25+
import org.elasticsearch.index.shard.IndexShardTestCase;
26+
27+
import java.io.IOException;
28+
29+
import static org.elasticsearch.cluster.routing.ShardRoutingHelper.initWithSameId;
30+
31+
public class NoOpEngineRecoveryTests extends IndexShardTestCase {
32+
33+
public void testRecoverFromNoOp() throws IOException {
34+
final int nbDocs = scaledRandomIntBetween(1, 100);
35+
36+
final IndexShard indexShard = newStartedShard(true);
37+
for (int i = 0; i < nbDocs; i++) {
38+
indexDoc(indexShard, "_doc", String.valueOf(i));
39+
}
40+
indexShard.close("test", true);
41+
42+
final ShardRouting shardRouting = indexShard.routingEntry();
43+
IndexShard primary = reinitShard(indexShard, initWithSameId(shardRouting, ExistingStoreRecoverySource.INSTANCE), NoOpEngine::new);
44+
recoverShardFromStore(primary);
45+
assertEquals(primary.seqNoStats().getMaxSeqNo(), primary.getMaxSeqNoOfUpdatesOrDeletes());
46+
assertEquals(nbDocs, primary.docStats().getCount());
47+
48+
IndexShard replica = newShard(false, Settings.EMPTY, NoOpEngine::new);
49+
recoverReplica(replica, primary, true);
50+
assertEquals(replica.seqNoStats().getMaxSeqNo(), replica.getMaxSeqNoOfUpdatesOrDeletes());
51+
assertEquals(nbDocs, replica.docStats().getCount());
52+
closeShards(primary, replica);
53+
}
54+
}

server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java

Lines changed: 0 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,12 @@
2121

2222
import org.apache.lucene.index.DirectoryReader;
2323
import org.apache.lucene.index.IndexReader;
24-
import org.apache.lucene.index.MergePolicy;
2524
import org.apache.lucene.index.NoMergePolicy;
2625
import org.apache.lucene.store.LockObtainFailedException;
2726
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
2827
import org.elasticsearch.cluster.routing.ShardRouting;
2928
import org.elasticsearch.cluster.routing.ShardRoutingState;
3029
import org.elasticsearch.cluster.routing.TestShardRouting;
31-
import org.elasticsearch.common.bytes.BytesArray;
3230
import org.elasticsearch.common.settings.Settings;
3331
import org.elasticsearch.core.internal.io.IOUtils;
3432
import org.elasticsearch.index.IndexSettings;
@@ -37,8 +35,6 @@
3735
import org.elasticsearch.index.seqno.SequenceNumbers;
3836
import org.elasticsearch.index.shard.DocsStats;
3937
import org.elasticsearch.index.store.Store;
40-
import org.elasticsearch.index.translog.Translog;
41-
import org.elasticsearch.index.translog.TranslogCorruptedException;
4238
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
4339
import org.elasticsearch.test.IndexSettingsModule;
4440

@@ -50,7 +46,6 @@
5046

5147
import static org.hamcrest.Matchers.equalTo;
5248
import static org.hamcrest.Matchers.instanceOf;
53-
import static org.hamcrest.Matchers.is;
5449

5550
public class NoOpEngineTests extends EngineTestCase {
5651
private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings("index", Settings.EMPTY);
@@ -59,7 +54,6 @@ public void testNoopEngine() throws IOException {
5954
engine.close();
6055
final NoOpEngine engine = new NoOpEngine(noOpConfig(INDEX_SETTINGS, store, primaryTranslogDir));
6156
expectThrows(UnsupportedOperationException.class, () -> engine.syncFlush(null, null));
62-
expectThrows(UnsupportedOperationException.class, () -> engine.ensureTranslogSynced(null));
6357
assertThat(engine.refreshNeeded(), equalTo(false));
6458
assertThat(engine.shouldPeriodicallyFlush(), equalTo(false));
6559
engine.close();
@@ -106,63 +100,6 @@ public void testNoopAfterRegularEngine() throws IOException {
106100
noOpEngine.close();
107101
}
108102

109-
public void testNoopEngineWithInvalidTranslogUUID() throws IOException {
110-
IOUtils.close(engine, store);
111-
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
112-
try (Store store = createStore()) {
113-
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
114-
int numDocs = scaledRandomIntBetween(10, 100);
115-
try (InternalEngine engine = createEngine(config)) {
116-
for (int i = 0; i < numDocs; i++) {
117-
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
118-
engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA,
119-
System.nanoTime(), -1, false, SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
120-
if (rarely()) {
121-
engine.flush();
122-
}
123-
globalCheckpoint.set(engine.getLocalCheckpoint());
124-
}
125-
flushAndTrimTranslog(engine);
126-
}
127-
128-
final Path newTranslogDir = createTempDir();
129-
// A new translog will have a different UUID than the existing store/noOp engine does
130-
Translog newTranslog = createTranslog(newTranslogDir, () -> 1L);
131-
newTranslog.close();
132-
133-
EngineCreationFailureException e = expectThrows(EngineCreationFailureException.class,
134-
() -> new NoOpEngine(noOpConfig(INDEX_SETTINGS, store, newTranslogDir)));
135-
assertThat(e.getCause(), instanceOf(TranslogCorruptedException.class));
136-
}
137-
}
138-
139-
public void testNoopEngineWithNonZeroTranslogOperations() throws IOException {
140-
IOUtils.close(engine, store);
141-
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
142-
try (Store store = createStore()) {
143-
final MergePolicy mergePolicy = NoMergePolicy.INSTANCE;
144-
EngineConfig config = config(defaultSettings, store, createTempDir(), mergePolicy, null, null, globalCheckpoint::get);
145-
int numDocs = scaledRandomIntBetween(10, 100);
146-
try (InternalEngine engine = createEngine(config)) {
147-
for (int i = 0; i < numDocs; i++) {
148-
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
149-
engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA,
150-
System.nanoTime(), -1, false, SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
151-
if (rarely()) {
152-
engine.flush();
153-
}
154-
globalCheckpoint.set(engine.getLocalCheckpoint());
155-
}
156-
engine.syncTranslog();
157-
engine.flushAndClose();
158-
engine.close();
159-
160-
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new NoOpEngine(engine.engineConfig));
161-
assertThat(e.getMessage(), is("Expected 0 translog operations but there were " + numDocs));
162-
}
163-
}
164-
}
165-
166103
public void testNoOpEngineDocStats() throws Exception {
167104
IOUtils.close(engine, store);
168105
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

0 commit comments

Comments
 (0)