Skip to content

Commit bf65cb4

Browse files
authored
Untangle Engine Constructor logic (#28245)
Currently we have fairly complicated logic in the engine constructor to deal with all the various ways we want to mutate the lucene index and translog we're opening. We can: 1) Create an empty index 2) Use the existing lucene index but create a new translog 3) Use both 4) Force a new history uuid in all cases. This leads to complicated code flows, which makes it harder and harder to make sure we cover all the corner cases. This PR tries to take another approach. Constructing an InternalEngine always opens things as they are, and all needed modifications are done by static methods directly on the directory, one at a time.
1 parent 8e8fdc4 commit bf65cb4

20 files changed

+620
-686
lines changed

docs/reference/indices/flush.asciidoc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,12 +93,12 @@ which returns something similar to:
9393
{
9494
"commit" : {
9595
"id" : "3M3zkw2GHMo2Y4h4/KFKCg==",
96-
"generation" : 2,
96+
"generation" : 3,
9797
"user_data" : {
9898
"translog_uuid" : "hnOG3xFcTDeoI_kvvvOdNA",
9999
"history_uuid" : "XP7KDJGiS1a2fHYiFL5TXQ",
100100
"local_checkpoint" : "-1",
101-
"translog_generation" : "2",
101+
"translog_generation" : "3",
102102
"max_seq_no" : "-1",
103103
"sync_id" : "AVvFY-071siAOuFGEO9P", <1>
104104
"max_unsafe_auto_id_timestamp" : "-1"

server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java

Lines changed: 6 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -46,16 +46,14 @@
4646
public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
4747
private final Logger logger;
4848
private final TranslogDeletionPolicy translogDeletionPolicy;
49-
private final EngineConfig.OpenMode openMode;
5049
private final LongSupplier globalCheckpointSupplier;
5150
private final IndexCommit startingCommit;
5251
private final ObjectIntHashMap<IndexCommit> snapshottedCommits; // Number of snapshots held against each commit point.
5352
private volatile IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint.
5453
private volatile IndexCommit lastCommit; // the most recent commit point
5554

56-
CombinedDeletionPolicy(EngineConfig.OpenMode openMode, Logger logger, TranslogDeletionPolicy translogDeletionPolicy,
55+
CombinedDeletionPolicy(Logger logger, TranslogDeletionPolicy translogDeletionPolicy,
5756
LongSupplier globalCheckpointSupplier, IndexCommit startingCommit) {
58-
this.openMode = openMode;
5957
this.logger = logger;
6058
this.translogDeletionPolicy = translogDeletionPolicy;
6159
this.globalCheckpointSupplier = globalCheckpointSupplier;
@@ -65,25 +63,11 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
6563

6664
@Override
6765
public synchronized void onInit(List<? extends IndexCommit> commits) throws IOException {
68-
switch (openMode) {
69-
case CREATE_INDEX_AND_TRANSLOG:
70-
assert startingCommit == null : "CREATE_INDEX_AND_TRANSLOG must not have starting commit; commit [" + startingCommit + "]";
71-
break;
72-
case OPEN_INDEX_CREATE_TRANSLOG:
73-
case OPEN_INDEX_AND_TRANSLOG:
74-
assert commits.isEmpty() == false : "index is opened, but we have no commits";
75-
assert startingCommit != null && commits.contains(startingCommit) : "Starting commit not in the existing commit list; "
76-
+ "startingCommit [" + startingCommit + "], commit list [" + commits + "]";
77-
keepOnlyStartingCommitOnInit(commits);
78-
// OPEN_INDEX_CREATE_TRANSLOG can open an index commit from other shard with a different translog history,
79-
// We therefore should not use that index commit to update the translog deletion policy.
80-
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
81-
updateTranslogDeletionPolicy();
82-
}
83-
break;
84-
default:
85-
throw new IllegalArgumentException("unknown openMode [" + openMode + "]");
86-
}
66+
assert commits.isEmpty() == false : "index is opened, but we have no commits";
67+
assert startingCommit != null && commits.contains(startingCommit) : "Starting commit not in the existing commit list; "
68+
+ "startingCommit [" + startingCommit + "], commit list [" + commits + "]";
69+
keepOnlyStartingCommitOnInit(commits);
70+
updateTranslogDeletionPolicy();
8771
}
8872

8973
/**

server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java

Lines changed: 2 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@ public final class EngineConfig {
7575
private final List<ReferenceManager.RefreshListener> internalRefreshListener;
7676
@Nullable
7777
private final Sort indexSort;
78-
private final boolean forceNewHistoryUUID;
7978
private final TranslogRecoveryRunner translogRecoveryRunner;
8079
@Nullable
8180
private final CircuitBreakerService circuitBreakerService;
@@ -113,24 +112,20 @@ public final class EngineConfig {
113112
Property.IndexScope, Property.Dynamic);
114113

115114
private final TranslogConfig translogConfig;
116-
private final OpenMode openMode;
117115

118116
/**
119117
* Creates a new {@link org.elasticsearch.index.engine.EngineConfig}
120118
*/
121-
public EngineConfig(OpenMode openMode, ShardId shardId, String allocationId, ThreadPool threadPool,
119+
public EngineConfig(ShardId shardId, String allocationId, ThreadPool threadPool,
122120
IndexSettings indexSettings, Engine.Warmer warmer, Store store,
123121
MergePolicy mergePolicy, Analyzer analyzer,
124122
Similarity similarity, CodecService codecService, Engine.EventListener eventListener,
125123
QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,
126-
boolean forceNewHistoryUUID, TranslogConfig translogConfig, TimeValue flushMergesAfter,
124+
TranslogConfig translogConfig, TimeValue flushMergesAfter,
127125
List<ReferenceManager.RefreshListener> externalRefreshListener,
128126
List<ReferenceManager.RefreshListener> internalRefreshListener, Sort indexSort,
129127
TranslogRecoveryRunner translogRecoveryRunner, CircuitBreakerService circuitBreakerService,
130128
LongSupplier globalCheckpointSupplier) {
131-
if (openMode == null) {
132-
throw new IllegalArgumentException("openMode must not be null");
133-
}
134129
this.shardId = shardId;
135130
this.allocationId = allocationId;
136131
this.indexSettings = indexSettings;
@@ -151,8 +146,6 @@ public EngineConfig(OpenMode openMode, ShardId shardId, String allocationId, Thr
151146
this.queryCachingPolicy = queryCachingPolicy;
152147
this.translogConfig = translogConfig;
153148
this.flushMergesAfter = flushMergesAfter;
154-
this.openMode = openMode;
155-
this.forceNewHistoryUUID = forceNewHistoryUUID;
156149
this.externalRefreshListener = externalRefreshListener;
157150
this.internalRefreshListener = internalRefreshListener;
158151
this.indexSort = indexSort;
@@ -315,22 +308,6 @@ public TranslogConfig getTranslogConfig() {
315308
*/
316309
public TimeValue getFlushMergesAfter() { return flushMergesAfter; }
317310

318-
/**
319-
* Returns the {@link OpenMode} for this engine config.
320-
*/
321-
public OpenMode getOpenMode() {
322-
return openMode;
323-
}
324-
325-
326-
/**
327-
* Returns true if a new history uuid must be generated. If false, a new uuid will only be generated if no existing
328-
* one is found.
329-
*/
330-
public boolean getForceNewHistoryUUID() {
331-
return forceNewHistoryUUID;
332-
}
333-
334311
@FunctionalInterface
335312
public interface TranslogRecoveryRunner {
336313
int run(Engine engine, Translog.Snapshot snapshot) throws IOException;
@@ -343,20 +320,6 @@ public TranslogRecoveryRunner getTranslogRecoveryRunner() {
343320
return translogRecoveryRunner;
344321
}
345322

346-
/**
347-
* Engine open mode defines how the engine should be opened or in other words what the engine should expect
348-
* to recover from. We either create a brand new engine with a new index and translog or we recover from an existing index.
349-
* If the index exists we also have the ability open only the index and create a new transaction log which happens
350-
* during remote recovery since we have already transferred the index files but the translog is replayed from remote. The last
351-
* and safest option opens the lucene index as well as it's referenced transaction log for a translog recovery.
352-
* See also {@link Engine#recoverFromTranslog()}
353-
*/
354-
public enum OpenMode {
355-
CREATE_INDEX_AND_TRANSLOG,
356-
OPEN_INDEX_CREATE_TRANSLOG,
357-
OPEN_INDEX_AND_TRANSLOG;
358-
}
359-
360323
/**
361324
* The refresh listeners to add to Lucene for externally visible refreshes
362325
*/
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.index.engine;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.store.Directory;
import org.elasticsearch.Assertions;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;


/**
 * This class contains utility methods for mutating the shard lucene index and translog as a preparation to be opened.
 */
public abstract class EngineDiskUtils {

    /**
     * creates an empty lucene index and a corresponding empty translog. Any existing data will be deleted.
     */
    public static void createEmpty(final Directory dir, final Path translogPath, final ShardId shardId) throws IOException {
        try (IndexWriter writer = newIndexWriter(true, dir)) {
            final String translogUuid = Translog.createEmptyTranslog(translogPath, SequenceNumbers.NO_OPS_PERFORMED, shardId);
            final Map<String, String> map = new HashMap<>();
            map.put(Translog.TRANSLOG_GENERATION_KEY, "1");
            map.put(Translog.TRANSLOG_UUID_KEY, translogUuid);
            map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID());
            map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(SequenceNumbers.NO_OPS_PERFORMED));
            map.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(SequenceNumbers.NO_OPS_PERFORMED));
            map.put(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, "-1");
            updateCommitData(writer, map);
        }
    }


    /**
     * Converts an existing lucene index and marks it with a new history uuid. Also creates a new empty translog file.
     * This is used to make sure no existing shard will recover from this index using ops based recovery.
     */
    public static void bootstrapNewHistoryFromLuceneIndex(final Directory dir, final Path translogPath, final ShardId shardId)
        throws IOException {
        try (IndexWriter writer = newIndexWriter(false, dir)) {
            final Map<String, String> userData = getUserData(writer);
            final long maxSeqNo = Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO));
            final String translogUuid = Translog.createEmptyTranslog(translogPath, maxSeqNo, shardId);
            final Map<String, String> map = new HashMap<>();
            map.put(Translog.TRANSLOG_GENERATION_KEY, "1");
            map.put(Translog.TRANSLOG_UUID_KEY, translogUuid);
            map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID());
            // the local checkpoint is advanced to the max seq no so the new history starts from a consistent point
            map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo));
            updateCommitData(writer, map);
        }
    }

    /**
     * Creates a new empty translog and associates it with an existing lucene index.
     */
    public static void createNewTranslog(final Directory dir, final Path translogPath, long initialGlobalCheckpoint, final ShardId shardId)
        throws IOException {
        if (Assertions.ENABLED) {
            final List<IndexCommit> existingCommits = DirectoryReader.listCommits(dir);
            assert existingCommits.size() == 1 : "creating a new translog should be done on an index with one commit, commits["
                + existingCommits + "]";
            SequenceNumbers.CommitInfo commitInfo = Store.loadSeqNoInfo(existingCommits.get(0));
            assert commitInfo.localCheckpoint >= initialGlobalCheckpoint :
                "trying to create a shard whose local checkpoint [" + commitInfo.localCheckpoint + "] is < global checkpoint ["
                    + initialGlobalCheckpoint + "]";
        }

        try (IndexWriter writer = newIndexWriter(false, dir)) {
            final String translogUuid = Translog.createEmptyTranslog(translogPath, initialGlobalCheckpoint, shardId);
            final Map<String, String> map = new HashMap<>();
            map.put(Translog.TRANSLOG_GENERATION_KEY, "1");
            map.put(Translog.TRANSLOG_UUID_KEY, translogUuid);
            updateCommitData(writer, map);
        }
    }


    /**
     * Checks that the Lucene index contains a history uuid marker. If not, a new one is generated and committed.
     */
    public static void ensureIndexHasHistoryUUID(final Directory dir) throws IOException {
        try (IndexWriter writer = newIndexWriter(false, dir)) {
            final Map<String, String> userData = getUserData(writer);
            if (userData.containsKey(Engine.HISTORY_UUID_KEY) == false) {
                updateCommitData(writer, Collections.singletonMap(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()));
            }
        }
    }

    // merges the given keys into the current live commit user data and commits the result
    private static void updateCommitData(IndexWriter writer, Map<String, String> keysToUpdate) throws IOException {
        final Map<String, String> userData = getUserData(writer);
        userData.putAll(keysToUpdate);
        writer.setLiveCommitData(userData.entrySet());
        writer.commit();
    }

    // snapshots the writer's live commit user data into a mutable map
    private static Map<String, String> getUserData(IndexWriter writer) {
        final Map<String, String> userData = new HashMap<>();
        writer.getLiveCommitData().forEach(e -> userData.put(e.getKey(), e.getValue()));
        return userData;
    }

    private static IndexWriter newIndexWriter(final boolean create, final Directory dir) throws IOException {
        IndexWriterConfig iwc = new IndexWriterConfig(null)
            .setCommitOnClose(false)
            // we don't want merges to happen here - we call maybe merge on the engine
            // later once we started it up otherwise we would need to wait for it here
            // we also don't specify a codec here and merges should use the engines for this index
            .setMergePolicy(NoMergePolicy.INSTANCE)
            .setOpenMode(create ? IndexWriterConfig.OpenMode.CREATE : IndexWriterConfig.OpenMode.APPEND);
        return new IndexWriter(dir, iwc);
    }
}

0 commit comments

Comments
 (0)