Skip to content

Commit 04385a9

Browse files
authored
Restoring from snapshot should force generation of a new history uuid (elastic#26694)
Restoring a shard from snapshot throws the primary back in time violating assumptions and bringing the validity of global checkpoints in question. To avoid problems, we should make sure that a shard that was restored will never be the source of an ops based recovery to a shard that existed before the restore. To this end we have introduced the notion of `histroy_uuid` in elastic#26577 and required that both source and target will have the same history to allow ops based recoveries. This PR make sure that a shard gets a new uuid after restore. As suggested by @ywelsch , I derived the creation of a `history_uuid` from the `RecoverySource` of the shard. Store recovery will only generate a uuid if it doesn't already exist (we can make this stricter when we don't need to deal with 5.x indices). Peer recovery follows the same logic (note that this is different than the approach in elastic#26557, I went this way as it means that shards always have a history uuid after being recovered on a 6.x node and will also mean that a rolling restart is enough for old indices to step over to the new seq no model). Local shards and snapshot force the generation of a new translog uuid. Relates elastic#10708 Closes elastic#26544
1 parent 332b4d1 commit 04385a9

File tree

8 files changed

+273
-52
lines changed

8 files changed

+273
-52
lines changed

core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ public final class EngineConfig {
7171
private final List<ReferenceManager.RefreshListener> refreshListeners;
7272
@Nullable
7373
private final Sort indexSort;
74+
private final boolean forceNewHistoryUUID;
7475
private final TranslogRecoveryRunner translogRecoveryRunner;
7576

7677
/**
@@ -115,8 +116,9 @@ public EngineConfig(OpenMode openMode, ShardId shardId, String allocationId, Thr
115116
MergePolicy mergePolicy, Analyzer analyzer,
116117
Similarity similarity, CodecService codecService, Engine.EventListener eventListener,
117118
QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,
118-
TranslogConfig translogConfig, TimeValue flushMergesAfter, List<ReferenceManager.RefreshListener> refreshListeners,
119-
Sort indexSort, TranslogRecoveryRunner translogRecoveryRunner) {
119+
boolean forceNewHistoryUUID, TranslogConfig translogConfig, TimeValue flushMergesAfter,
120+
List<ReferenceManager.RefreshListener> refreshListeners, Sort indexSort,
121+
TranslogRecoveryRunner translogRecoveryRunner) {
120122
if (openMode == null) {
121123
throw new IllegalArgumentException("openMode must not be null");
122124
}
@@ -141,6 +143,7 @@ public EngineConfig(OpenMode openMode, ShardId shardId, String allocationId, Thr
141143
this.translogConfig = translogConfig;
142144
this.flushMergesAfter = flushMergesAfter;
143145
this.openMode = openMode;
146+
this.forceNewHistoryUUID = forceNewHistoryUUID;
144147
this.refreshListeners = refreshListeners;
145148
this.indexSort = indexSort;
146149
this.translogRecoveryRunner = translogRecoveryRunner;
@@ -300,6 +303,15 @@ public OpenMode getOpenMode() {
300303
return openMode;
301304
}
302305

306+
307+
/**
308+
* Returns true if a new history uuid must be generated. If false, a new uuid will only be generated if no existing
309+
* one is found.
310+
*/
311+
public boolean getForceNewHistoryUUID() {
312+
return forceNewHistoryUUID;
313+
}
314+
303315
@FunctionalInterface
304316
public interface TranslogRecoveryRunner {
305317
int run(Engine engine, Translog.Snapshot snapshot) throws IOException;

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 37 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -177,23 +177,15 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
177177
switch (openMode) {
178178
case OPEN_INDEX_AND_TRANSLOG:
179179
writer = createWriter(false);
180-
String existingHistoryUUID = loadHistoryUUIDFromCommit(writer);
181-
if (existingHistoryUUID == null) {
182-
historyUUID = UUIDs.randomBase64UUID();
183-
} else {
184-
historyUUID = existingHistoryUUID;
185-
}
186180
final long globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath());
187181
seqNoStats = store.loadSeqNoStats(globalCheckpoint);
188182
break;
189183
case OPEN_INDEX_CREATE_TRANSLOG:
190184
writer = createWriter(false);
191-
historyUUID = loadHistoryUUIDFromCommit(writer);
192185
seqNoStats = store.loadSeqNoStats(SequenceNumbers.UNASSIGNED_SEQ_NO);
193186
break;
194187
case CREATE_INDEX_AND_TRANSLOG:
195188
writer = createWriter(true);
196-
historyUUID = UUIDs.randomBase64UUID();
197189
seqNoStats = new SeqNoStats(
198190
SequenceNumbers.NO_OPS_PERFORMED,
199191
SequenceNumbers.NO_OPS_PERFORMED,
@@ -205,9 +197,13 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
205197
logger.trace("recovered [{}]", seqNoStats);
206198
seqNoService = sequenceNumberService(shardId, allocationId, engineConfig.getIndexSettings(), seqNoStats);
207199
updateMaxUnsafeAutoIdTimestampFromWriter(writer);
200+
historyUUID = loadOrGenerateHistoryUUID(writer, engineConfig.getForceNewHistoryUUID());
201+
Objects.requireNonNull(historyUUID, "history uuid should not be null");
208202
indexWriter = writer;
209203
translog = openTranslog(engineConfig, writer, translogDeletionPolicy, () -> seqNoService().getGlobalCheckpoint());
210204
assert translog.getGeneration() != null;
205+
this.translog = translog;
206+
updateWriterOnOpen();
211207
} catch (IOException | TranslogCorruptedException e) {
212208
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
213209
} catch (AssertionError e) {
@@ -219,8 +215,6 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
219215
throw e;
220216
}
221217
}
222-
223-
this.translog = translog;
224218
manager = createSearcherManager();
225219
this.searcherManager = manager;
226220
this.versionMap.setManager(searcherManager);
@@ -375,24 +369,32 @@ private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer, Tra
375369
throw new IndexFormatTooOldException("translog", "translog has no generation nor a UUID - this might be an index from a previous version consider upgrading to N-1 first");
376370
}
377371
}
378-
final Translog translog = new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier);
379-
if (translogUUID == null) {
380-
assert openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : "OpenMode must not be "
381-
+ EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG;
382-
boolean success = false;
383-
try {
384-
commitIndexWriter(writer, translog, openMode == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG
385-
? commitDataAsMap(writer).get(SYNC_COMMIT_ID) : null);
386-
success = true;
387-
} finally {
388-
if (success == false) {
389-
IOUtils.closeWhileHandlingException(translog);
390-
}
391-
}
372+
return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier);
373+
}
374+
375+
/** If needed, updates the metadata in the index writer to match the potentially new translog and history uuid */
376+
private void updateWriterOnOpen() throws IOException {
377+
Objects.requireNonNull(historyUUID);
378+
final Map<String, String> commitUserData = commitDataAsMap(indexWriter);
379+
boolean needsCommit = false;
380+
if (historyUUID.equals(commitUserData.get(HISTORY_UUID_KEY)) == false) {
381+
needsCommit = true;
382+
} else {
383+
assert config().getForceNewHistoryUUID() == false : "config forced a new history uuid but it didn't change";
384+
assert openMode != EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG : "new index but it already has an existing history uuid";
385+
}
386+
if (translog.getTranslogUUID().equals(commitUserData.get(Translog.TRANSLOG_UUID_KEY)) == false) {
387+
needsCommit = true;
388+
} else {
389+
assert openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : "translog uuid didn't change but open mode is " + openMode;
390+
}
391+
if (needsCommit) {
392+
commitIndexWriter(indexWriter, translog, openMode == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG
393+
? commitUserData.get(SYNC_COMMIT_ID) : null);
392394
}
393-
return translog;
394395
}
395396

397+
396398
@Override
397399
public Translog getTranslog() {
398400
ensureOpen();
@@ -424,14 +426,17 @@ private String loadTranslogUUIDFromCommit(IndexWriter writer) throws IOException
424426
}
425427

426428
/**
427-
* Reads the current stored history ID from the IW commit data. If the id is not found, returns null.
429+
* Reads the current stored history ID from the IW commit data. Generates a new UUID if not found or if generation is forced.
428430
*/
429-
@Nullable
430-
private String loadHistoryUUIDFromCommit(final IndexWriter writer) throws IOException {
431+
private String loadOrGenerateHistoryUUID(final IndexWriter writer, boolean forceNew) throws IOException {
431432
String uuid = commitDataAsMap(writer).get(HISTORY_UUID_KEY);
432-
if (uuid == null) {
433-
assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1) :
434-
"index was created after 6_0_0_rc1 but has no history uuid";
433+
if (uuid == null || forceNew) {
434+
assert
435+
forceNew || // recovery from a local store creates an index that doesn't have yet a history_uuid
436+
openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG ||
437+
config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1) :
438+
"existing index was created after 6_0_0_rc1 but has no history uuid";
439+
uuid = UUIDs.randomBase64UUID();
435440
}
436441
return uuid;
437442
}
@@ -1923,9 +1928,7 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
19231928
}
19241929
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo()));
19251930
commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
1926-
if (historyUUID != null) {
1927-
commitData.put(HISTORY_UUID_KEY, historyUUID);
1928-
}
1931+
commitData.put(HISTORY_UUID_KEY, historyUUID);
19291932
logger.trace("committing writer with commit data [{}]", commitData);
19301933
return commitData.entrySet().iterator();
19311934
});

core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2099,10 +2099,24 @@ private DocumentMapperForType docMapper(String type) {
20992099

21002100
private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode) {
21012101
Sort indexSort = indexSortSupplier.get();
2102+
final boolean forceNewHistoryUUID;
2103+
switch (shardRouting.recoverySource().getType()) {
2104+
case EXISTING_STORE:
2105+
case PEER:
2106+
forceNewHistoryUUID = false;
2107+
break;
2108+
case EMPTY_STORE:
2109+
case SNAPSHOT:
2110+
case LOCAL_SHARDS:
2111+
forceNewHistoryUUID = true;
2112+
break;
2113+
default:
2114+
throw new AssertionError("unknown recovery type: [" + shardRouting.recoverySource().getType() + "]");
2115+
}
21022116
return new EngineConfig(openMode, shardId, shardRouting.allocationId().getId(),
21032117
threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(),
21042118
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener,
2105-
indexCache.query(), cachingPolicy, translogConfig,
2119+
indexCache.query(), cachingPolicy, forceNewHistoryUUID, translogConfig,
21062120
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
21072121
Arrays.asList(refreshListeners, new RefreshMetricUpdater(refreshMetric)), indexSort,
21082122
this::runTranslogRecovery);

core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,10 @@
3535
import org.elasticsearch.cluster.metadata.MappingMetaData;
3636
import org.elasticsearch.cluster.routing.RecoverySource;
3737
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
38-
import org.elasticsearch.common.UUIDs;
3938
import org.elasticsearch.common.lucene.Lucene;
4039
import org.elasticsearch.common.unit.ByteSizeValue;
4140
import org.elasticsearch.common.unit.TimeValue;
4241
import org.elasticsearch.index.Index;
43-
import org.elasticsearch.index.engine.Engine;
4442
import org.elasticsearch.index.engine.EngineException;
4543
import org.elasticsearch.index.engine.InternalEngine;
4644
import org.elasticsearch.index.mapper.MapperService;
@@ -164,11 +162,10 @@ void addIndices(
164162
* document-level semantics.
165163
*/
166164
writer.setLiveCommitData(() -> {
167-
final HashMap<String, String> liveCommitData = new HashMap<>(4);
165+
final HashMap<String, String> liveCommitData = new HashMap<>(3);
168166
liveCommitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
169167
liveCommitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo));
170168
liveCommitData.put(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp));
171-
liveCommitData.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID());
172169
return liveCommitData.entrySet().iterator();
173170
});
174171
writer.commit();

core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 53 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
package org.elasticsearch.index.engine;
2121

2222
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
23-
2423
import org.apache.logging.log4j.Level;
2524
import org.apache.logging.log4j.LogManager;
2625
import org.apache.logging.log4j.Logger;
@@ -270,8 +269,8 @@ public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode, An
270269
return new EngineConfig(openMode, config.getShardId(), config.getAllocationId(), config.getThreadPool(), config.getIndexSettings(),
271270
config.getWarmer(), config.getStore(), config.getMergePolicy(), analyzer, config.getSimilarity(),
272271
new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(),
273-
config.getTranslogConfig(), config.getFlushMergesAfter(), config.getRefreshListeners(), config.getIndexSort(),
274-
config.getTranslogRecoveryRunner());
272+
config.getForceNewHistoryUUID(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getRefreshListeners(),
273+
config.getIndexSort(), config.getTranslogRecoveryRunner());
275274
}
276275

277276
@Override
@@ -452,7 +451,7 @@ public void onFailedEngine(String reason, @Nullable Exception e) {
452451
refreshListener == null ? emptyList() : Collections.singletonList(refreshListener);
453452
EngineConfig config = new EngineConfig(openMode, shardId, allocationId.getId(), threadPool, indexSettings, null, store,
454453
mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener,
455-
IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
454+
IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig,
456455
TimeValue.timeValueMinutes(5), refreshListenerList, indexSort, handler);
457456

458457
return config;
@@ -2796,8 +2795,8 @@ public void testRecoverFromForeignTranslog() throws IOException {
27962795
EngineConfig brokenConfig = new EngineConfig(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, shardId, allocationId.getId(),
27972796
threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(),
27982797
new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(),
2799-
IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), config.getRefreshListeners(),
2800-
null, config.getTranslogRecoveryRunner());
2798+
IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig, TimeValue.timeValueMinutes(5),
2799+
config.getRefreshListeners(), null, config.getTranslogRecoveryRunner());
28012800

28022801
try {
28032802
InternalEngine internalEngine = new InternalEngine(brokenConfig);
@@ -2809,7 +2808,7 @@ public void testRecoverFromForeignTranslog() throws IOException {
28092808
assertVisibleCount(engine, numDocs, false);
28102809
}
28112810

2812-
public void testRecoverFromStoreSetsHistoryUUIDIfNeeded() throws IOException {
2811+
public void testHistoryUUIDIsSetIfMissing() throws IOException {
28132812
final int numDocs = randomIntBetween(0, 3);
28142813
for (int i = 0; i < numDocs; i++) {
28152814
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
@@ -2842,11 +2841,56 @@ public void testRecoverFromStoreSetsHistoryUUIDIfNeeded() throws IOException {
28422841
.put(defaultSettings.getSettings())
28432842
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.V_6_0_0_beta1)
28442843
.build());
2845-
engine = createEngine(indexSettings, store, primaryTranslogDir, newMergePolicy(), null);
2846-
assertVisibleCount(engine, numDocs, false);
2844+
2845+
EngineConfig config = engine.config();
2846+
2847+
EngineConfig newConfig = new EngineConfig(
2848+
randomBoolean() ? EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG,
2849+
shardId, allocationId.getId(),
2850+
threadPool, indexSettings, null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(),
2851+
new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(),
2852+
IndexSearcher.getDefaultQueryCachingPolicy(), false, config.getTranslogConfig(), TimeValue.timeValueMinutes(5),
2853+
config.getRefreshListeners(), null, config.getTranslogRecoveryRunner());
2854+
engine = new InternalEngine(newConfig);
2855+
if (newConfig.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
2856+
engine.recoverFromTranslog();
2857+
assertVisibleCount(engine, numDocs, false);
2858+
} else {
2859+
assertVisibleCount(engine, 0, false);
2860+
}
28472861
assertThat(engine.getHistoryUUID(), notNullValue());
28482862
}
28492863

2864+
public void testHistoryUUIDCanBeForced() throws IOException {
2865+
final int numDocs = randomIntBetween(0, 3);
2866+
for (int i = 0; i < numDocs; i++) {
2867+
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
2868+
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);
2869+
Engine.IndexResult index = engine.index(firstIndexRequest);
2870+
assertThat(index.getVersion(), equalTo(1L));
2871+
}
2872+
assertVisibleCount(engine, numDocs);
2873+
final String oldHistoryUUID = engine.getHistoryUUID();
2874+
engine.close();
2875+
EngineConfig config = engine.config();
2876+
2877+
EngineConfig newConfig = new EngineConfig(
2878+
randomBoolean() ? EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG,
2879+
shardId, allocationId.getId(),
2880+
threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(),
2881+
new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(),
2882+
IndexSearcher.getDefaultQueryCachingPolicy(), true, config.getTranslogConfig(), TimeValue.timeValueMinutes(5),
2883+
config.getRefreshListeners(), null, config.getTranslogRecoveryRunner());
2884+
engine = new InternalEngine(newConfig);
2885+
if (newConfig.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
2886+
engine.recoverFromTranslog();
2887+
assertVisibleCount(engine, numDocs, false);
2888+
} else {
2889+
assertVisibleCount(engine, 0, false);
2890+
}
2891+
assertThat(engine.getHistoryUUID(), not(equalTo(oldHistoryUUID)));
2892+
}
2893+
28502894
public void testShardNotAvailableExceptionWhenEngineClosedConcurrently() throws IOException, InterruptedException {
28512895
AtomicReference<Exception> exception = new AtomicReference<>();
28522896
String operation = randomFrom("optimize", "refresh", "flush");

0 commit comments

Comments
 (0)