Skip to content

Commit eb8b317

Browse files
bleskes and dnhatn
authored and committed
Move trimming unsafe commits from engine ctor to store (#29260)
As a follow-up to #28245, this PR removes the logic for selecting the right start commit from the Engine constructor in favor of explicitly trimming unsafe commits in the Store, before the engine is opened. This makes the Engine constructor follow standard Lucene semantics and use the last commit. Relates to #28245 and #29156.
1 parent 04d0edc commit eb8b317

File tree

9 files changed

+207
-150
lines changed

9 files changed

+207
-150
lines changed

docs/reference/indices/flush.asciidoc

+2-2
Original file line number | Diff line number | Diff line change
@@ -93,12 +93,12 @@ which returns something similar to:
9393
{
9494
"commit" : {
9595
"id" : "3M3zkw2GHMo2Y4h4/KFKCg==",
96-
"generation" : 4,
96+
"generation" : 3,
9797
"user_data" : {
9898
"translog_uuid" : "hnOG3xFcTDeoI_kvvvOdNA",
9999
"history_uuid" : "XP7KDJGiS1a2fHYiFL5TXQ",
100100
"local_checkpoint" : "-1",
101-
"translog_generation" : "3",
101+
"translog_generation" : "2",
102102
"max_seq_no" : "-1",
103103
"sync_id" : "AVvFY-071siAOuFGEO9P", <1>
104104
"max_unsafe_auto_id_timestamp" : "-1"

server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java

+7-40
Original file line number | Diff line number | Diff line change
@@ -47,60 +47,27 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
4747
private final Logger logger;
4848
private final TranslogDeletionPolicy translogDeletionPolicy;
4949
private final LongSupplier globalCheckpointSupplier;
50-
private final IndexCommit startingCommit;
5150
private final ObjectIntHashMap<IndexCommit> snapshottedCommits; // Number of snapshots held against each commit point.
5251
private volatile IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint.
5352
private volatile IndexCommit lastCommit; // the most recent commit point
5453

55-
CombinedDeletionPolicy(Logger logger, TranslogDeletionPolicy translogDeletionPolicy,
56-
LongSupplier globalCheckpointSupplier, IndexCommit startingCommit) {
54+
CombinedDeletionPolicy(Logger logger, TranslogDeletionPolicy translogDeletionPolicy, LongSupplier globalCheckpointSupplier) {
5755
this.logger = logger;
5856
this.translogDeletionPolicy = translogDeletionPolicy;
5957
this.globalCheckpointSupplier = globalCheckpointSupplier;
60-
this.startingCommit = startingCommit;
6158
this.snapshottedCommits = new ObjectIntHashMap<>();
6259
}
6360

6461
@Override
6562
public synchronized void onInit(List<? extends IndexCommit> commits) throws IOException {
6663
assert commits.isEmpty() == false : "index is opened, but we have no commits";
67-
assert startingCommit != null && commits.contains(startingCommit) : "Starting commit not in the existing commit list; "
68-
+ "startingCommit [" + startingCommit + "], commit list [" + commits + "]";
69-
keepOnlyStartingCommitOnInit(commits);
70-
updateTranslogDeletionPolicy();
71-
}
72-
73-
/**
74-
* Keeping existing unsafe commits when opening an engine can be problematic because these commits are not safe
75-
* at the recovering time but they can suddenly become safe in the future.
76-
* The following issues can happen if unsafe commits are kept oninit.
77-
* <p>
78-
* 1. Replica can use unsafe commit in peer-recovery. This happens when a replica with a safe commit c1(max_seqno=1)
79-
* and an unsafe commit c2(max_seqno=2) recovers from a primary with c1(max_seqno=1). If a new document(seqno=2)
80-
* is added without flushing, the global checkpoint is advanced to 2; and the replica recovers again, it will use
81-
* the unsafe commit c2(max_seqno=2 at most gcp=2) as the starting commit for sequenced-based recovery even the
82-
* commit c2 contains a stale operation and the document(with seqno=2) will not be replicated to the replica.
83-
* <p>
84-
* 2. Min translog gen for recovery can go backwards in peer-recovery. This happens when are replica with a safe commit
85-
* c1(local_checkpoint=1, recovery_translog_gen=1) and an unsafe commit c2(local_checkpoint=2, recovery_translog_gen=2).
86-
* The replica recovers from a primary, and keeps c2 as the last commit, then sets last_translog_gen to 2. Flushing a new
87-
* commit on the replica will cause exception as the new last commit c3 will have recovery_translog_gen=1. The recovery
88-
* translog generation of a commit is calculated based on the current local checkpoint. The local checkpoint of c3 is 1
89-
* while the local checkpoint of c2 is 2.
90-
* <p>
91-
* 3. Commit without translog can be used in recovery. An old index, which was created before multiple-commits is introduced
92-
* (v6.2), may not have a safe commit. If that index has a snapshotted commit without translog and an unsafe commit,
93-
* the policy can consider the snapshotted commit as a safe commit for recovery even the commit does not have translog.
94-
*/
95-
private void keepOnlyStartingCommitOnInit(List<? extends IndexCommit> commits) throws IOException {
96-
for (IndexCommit commit : commits) {
97-
if (startingCommit.equals(commit) == false) {
98-
this.deleteCommit(commit);
99-
}
64+
onCommit(commits);
65+
if (safeCommit != commits.get(commits.size() - 1)) {
66+
throw new IllegalStateException("Engine is opened, but the last commit isn't safe. Global checkpoint ["
67+
+ globalCheckpointSupplier.getAsLong() + "], seqNo is last commit ["
68+
+ SequenceNumbers.loadSeqNoInfoFromLuceneCommit(lastCommit.getUserData().entrySet()) + "], "
69+
+ "seqNos in safe commit [" + SequenceNumbers.loadSeqNoInfoFromLuceneCommit(safeCommit.getUserData().entrySet()) + "]");
10070
}
101-
assert startingCommit.isDeleted() == false : "Starting commit must not be deleted";
102-
lastCommit = startingCommit;
103-
safeCommit = startingCommit;
10471
}
10572

10673
@Override

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

+11-42
Original file line number | Diff line number | Diff line change
@@ -41,10 +41,8 @@
4141
import org.apache.lucene.store.Directory;
4242
import org.apache.lucene.store.LockObtainFailedException;
4343
import org.apache.lucene.util.BytesRef;
44-
import org.elasticsearch.core.internal.io.IOUtils;
4544
import org.apache.lucene.util.InfoStream;
4645
import org.elasticsearch.ExceptionsHelper;
47-
import org.elasticsearch.Version;
4846
import org.elasticsearch.action.index.IndexRequest;
4947
import org.elasticsearch.common.Nullable;
5048
import org.elasticsearch.common.SuppressForbidden;
@@ -59,6 +57,7 @@
5957
import org.elasticsearch.common.metrics.CounterMetric;
6058
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
6159
import org.elasticsearch.common.util.concurrent.ReleasableLock;
60+
import org.elasticsearch.core.internal.io.IOUtils;
6261
import org.elasticsearch.index.IndexSettings;
6362
import org.elasticsearch.index.VersionType;
6463
import org.elasticsearch.index.mapper.IdFieldMapper;
@@ -70,16 +69,13 @@
7069
import org.elasticsearch.index.seqno.SequenceNumbers;
7170
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
7271
import org.elasticsearch.index.shard.ShardId;
73-
import org.elasticsearch.index.store.Store;
7472
import org.elasticsearch.index.translog.Translog;
7573
import org.elasticsearch.index.translog.TranslogConfig;
7674
import org.elasticsearch.index.translog.TranslogCorruptedException;
7775
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
7876
import org.elasticsearch.threadpool.ThreadPool;
7977

8078
import java.io.IOException;
81-
import java.io.UncheckedIOException;
82-
import java.util.ArrayList;
8379
import java.util.Arrays;
8480
import java.util.Collection;
8581
import java.util.HashMap;
@@ -183,12 +179,10 @@ public InternalEngine(EngineConfig engineConfig) {
183179
translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier());
184180
assert translog.getGeneration() != null;
185181
this.translog = translog;
186-
final IndexCommit startingCommit = getStartingCommitPoint();
187-
assert startingCommit != null : "Starting commit should be non-null";
188-
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier, startingCommit);
189-
this.combinedDeletionPolicy = new CombinedDeletionPolicy(logger, translogDeletionPolicy,
190-
translog::getLastSyncedGlobalCheckpoint, startingCommit);
191-
writer = createWriter(startingCommit);
182+
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
183+
this.combinedDeletionPolicy =
184+
new CombinedDeletionPolicy(logger, translogDeletionPolicy, translog::getLastSyncedGlobalCheckpoint);
185+
writer = createWriter();
192186
bootstrapAppendOnlyInfoFromWriter(writer);
193187
historyUUID = loadOrGenerateHistoryUUID(writer);
194188
Objects.requireNonNull(historyUUID, "history uuid should not be null");
@@ -232,10 +226,11 @@ public InternalEngine(EngineConfig engineConfig) {
232226
}
233227

234228
private LocalCheckpointTracker createLocalCheckpointTracker(
235-
BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier, IndexCommit startingCommit) throws IOException {
229+
BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) throws IOException {
236230
final long maxSeqNo;
237231
final long localCheckpoint;
238-
final SequenceNumbers.CommitInfo seqNoStats = Store.loadSeqNoInfo(startingCommit);
232+
final SequenceNumbers.CommitInfo seqNoStats =
233+
SequenceNumbers.loadSeqNoInfoFromLuceneCommit(store.readLastCommittedSegmentsInfo().userData.entrySet());
239234
maxSeqNo = seqNoStats.maxSeqNo;
240235
localCheckpoint = seqNoStats.localCheckpoint;
241236
logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint);
@@ -395,31 +390,6 @@ public void skipTranslogRecovery() {
395390
pendingTranslogRecovery.set(false); // we are good - now we can commit
396391
}
397392

398-
private IndexCommit getStartingCommitPoint() throws IOException {
399-
final IndexCommit startingIndexCommit;
400-
final long lastSyncedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint();
401-
final long minRetainedTranslogGen = translog.getMinFileGeneration();
402-
final List<IndexCommit> existingCommits = DirectoryReader.listCommits(store.directory());
403-
// We may not have a safe commit if an index was create before v6.2; and if there is a snapshotted commit whose translog
404-
// are not retained but max_seqno is at most the global checkpoint, we may mistakenly select it as a starting commit.
405-
// To avoid this issue, we only select index commits whose translog are fully retained.
406-
if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_2_0)) {
407-
final List<IndexCommit> recoverableCommits = new ArrayList<>();
408-
for (IndexCommit commit : existingCommits) {
409-
if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) {
410-
recoverableCommits.add(commit);
411-
}
412-
}
413-
assert recoverableCommits.isEmpty() == false : "No commit point with translog found; " +
414-
"commits [" + existingCommits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]";
415-
startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(recoverableCommits, lastSyncedGlobalCheckpoint);
416-
} else {
417-
// TODO: Asserts the starting commit is a safe commit once peer-recovery sets global checkpoint.
418-
startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint);
419-
}
420-
return startingIndexCommit;
421-
}
422-
423393
private void recoverFromTranslogInternal() throws IOException {
424394
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
425395
final int opsRecovered;
@@ -1907,9 +1877,9 @@ private long loadCurrentVersionFromIndex(Term uid) throws IOException {
19071877
}
19081878
}
19091879

1910-
private IndexWriter createWriter(IndexCommit startingCommit) throws IOException {
1880+
private IndexWriter createWriter() throws IOException {
19111881
try {
1912-
final IndexWriterConfig iwc = getIndexWriterConfig(startingCommit);
1882+
final IndexWriterConfig iwc = getIndexWriterConfig();
19131883
return createWriter(store.directory(), iwc);
19141884
} catch (LockObtainFailedException ex) {
19151885
logger.warn("could not lock IndexWriter", ex);
@@ -1922,11 +1892,10 @@ IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOEx
19221892
return new IndexWriter(directory, iwc);
19231893
}
19241894

1925-
private IndexWriterConfig getIndexWriterConfig(IndexCommit startingCommit) {
1895+
private IndexWriterConfig getIndexWriterConfig() {
19261896
final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer());
19271897
iwc.setCommitOnClose(false); // we by default don't commit on close
19281898
iwc.setOpenMode(IndexWriterConfig.OpenMode.APPEND);
1929-
iwc.setIndexCommit(startingCommit);
19301899
iwc.setIndexDeletionPolicy(combinedDeletionPolicy);
19311900
// with tests.verbose, lucene sets this up: plumb to align with filesystem stream
19321901
boolean verbose = false;

server/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java

+8
Original file line number | Diff line number | Diff line change
@@ -122,5 +122,13 @@ public CommitInfo(long maxSeqNo, long localCheckpoint) {
122122
this.maxSeqNo = maxSeqNo;
123123
this.localCheckpoint = localCheckpoint;
124124
}
125+
126+
@Override
127+
public String toString() {
128+
return "CommitInfo{" +
129+
"maxSeqNo=" + maxSeqNo +
130+
", localCheckpoint=" + localCheckpoint +
131+
'}';
132+
}
125133
}
126134
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

+3
Original file line number | Diff line number | Diff line change
@@ -1317,6 +1317,9 @@ private void innerOpenEngineAndTranslog() throws IOException {
13171317

13181318
assertMaxUnsafeAutoIdInCommit();
13191319

1320+
final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUUID);
1321+
store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, config.getIndexSettings().getIndexVersionCreated());
1322+
13201323
createNewEngine(config);
13211324
verifyNotClosed();
13221325
// We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive,

0 commit comments

Comments
 (0)