Skip to content

Commit 8ab2bcf

Browse files
committed
Use soft-deleted docs to resolve strategy for engine operation (#35230)
A CCR test failure shows that the approach in #34474 is flawed. Restoring the LocalCheckpointTracker from an index commit can cause both FollowingEngine and InternalEngine to incorrectly ignore some deletes. Here is a small scenario illustrating the problem: 1. Delete doc with seq=1 => engine will add a delete tombstone to Lucene 2. Flush a commit consisting of only the delete tombstone 3. Index doc with seq=0 => engine will add that doc to Lucene but soft-deleted 4. Restart an engine with the commit (step 2); the engine will fill its LocalCheckpointTracker with the delete tombstone in the commit 5. Replay the local translog in reverse order: index#0 then delete#1 6. When process index#0, an engine will add it into Lucene as a live doc and advance the local checkpoint to 1 (seq#1 was restored from the commit - step 4). 7. When process delete#1, an engine will skip it because seq_no=1 is less than or equal to the local checkpoint. We should have zero document after recovering from translog, but here we have one. Since all operations after the local checkpoint of the safe commit are retained, we should find them if the look-up considers also soft-deleted documents. This PR fills the disparity between the version map and the local checkpoint tracker by taking soft-deleted documents into account while resolving strategy for engine operations. Relates #34474 Relates #33656
1 parent 6189998 commit 8ab2bcf

File tree

4 files changed

+151
-51
lines changed

4 files changed

+151
-51
lines changed

server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -140,16 +140,36 @@ private int getDocID(BytesRef id, Bits liveDocs) throws IOException {
140140
DocIdAndSeqNo lookupSeqNo(BytesRef id, LeafReaderContext context) throws IOException {
141141
assert context.reader().getCoreCacheHelper().getKey().equals(readerKey) :
142142
"context's reader is not the same as the reader class was initialized on.";
143-
int docID = getDocID(id, context.reader().getLiveDocs());
144-
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
145-
NumericDocValues seqNos = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
146-
long seqNo;
147-
if (seqNos != null && seqNos.advanceExact(docID)) {
148-
seqNo = seqNos.longValue();
149-
} else {
150-
seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
143+
// termsEnum can possibly be null here if this leaf contains only no-ops.
144+
if (termsEnum != null && termsEnum.seekExact(id)) {
145+
docsEnum = termsEnum.postings(docsEnum, 0);
146+
final Bits liveDocs = context.reader().getLiveDocs();
147+
DocIdAndSeqNo result = null;
148+
int docID = docsEnum.nextDoc();
149+
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
150+
final NumericDocValues seqNoDV = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
151+
for (; docID != DocIdSetIterator.NO_MORE_DOCS; docID = docsEnum.nextDoc()) {
152+
final long seqNo;
153+
if (seqNoDV != null && seqNoDV.advanceExact(docID)) {
154+
seqNo = seqNoDV.longValue();
155+
} else {
156+
seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
157+
}
158+
final boolean isLive = (liveDocs == null || liveDocs.get(docID));
159+
if (isLive) {
160+
// The live document must always be the latest copy, thus we can early terminate here.
161+
// If a nested docs is live, we return the first doc which doesn't have term (only the last doc has term).
162+
// This should not be an issue since we no longer use primary term as tier breaker when comparing operations.
163+
assert result == null || result.seqNo <= seqNo :
164+
"the live doc does not have the highest seq_no; live_seq_no=" + seqNo + " < deleted_seq_no=" + result.seqNo;
165+
return new DocIdAndSeqNo(docID, seqNo, context, isLive);
166+
}
167+
if (result == null || result.seqNo < seqNo) {
168+
result = new DocIdAndSeqNo(docID, seqNo, context, isLive);
169+
}
170+
}
151171
}
152-
return new DocIdAndSeqNo(docID, seqNo, context);
172+
return result;
153173
} else {
154174
return null;
155175
}

server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java

Lines changed: 22 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,9 @@
2222
import org.apache.lucene.index.IndexReader;
2323
import org.apache.lucene.index.LeafReader;
2424
import org.apache.lucene.index.LeafReaderContext;
25-
import org.apache.lucene.index.NumericDocValues;
2625
import org.apache.lucene.index.Term;
2726
import org.apache.lucene.util.CloseableThreadLocal;
2827
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
29-
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
3028

3129
import java.io.IOException;
3230
import java.util.List;
@@ -114,11 +112,13 @@ public static class DocIdAndSeqNo {
114112
public final int docId;
115113
public final long seqNo;
116114
public final LeafReaderContext context;
115+
public final boolean isLive;
117116

118-
DocIdAndSeqNo(int docId, long seqNo, LeafReaderContext context) {
117+
DocIdAndSeqNo(int docId, long seqNo, LeafReaderContext context, boolean isLive) {
119118
this.docId = docId;
120119
this.seqNo = seqNo;
121120
this.context = context;
121+
this.isLive = isLive;
122122
}
123123
}
124124

@@ -146,41 +146,34 @@ public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term)
146146
}
147147

148148
/**
149-
* Load the internal doc ID and sequence number for the uid from the reader, returning<ul>
150-
* <li>null if the uid wasn't found,
151-
* <li>a doc ID and the associated seqNo otherwise
152-
* </ul>
149+
* Loads the internal docId and sequence number of the latest copy for a given uid from the provided reader.
150+
* The flag {@link DocIdAndSeqNo#isLive} indicates whether the returned document is live or (soft)deleted.
151+
* This returns {@code null} if no such document matching the given term uid.
153152
*/
154153
public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, Term term) throws IOException {
155-
PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, term.field());
156-
List<LeafReaderContext> leaves = reader.leaves();
154+
final PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, term.field());
155+
final List<LeafReaderContext> leaves = reader.leaves();
156+
DocIdAndSeqNo latest = null;
157157
// iterate backwards to optimize for the frequently updated documents
158158
// which are likely to be in the last segments
159159
for (int i = leaves.size() - 1; i >= 0; i--) {
160160
final LeafReaderContext leaf = leaves.get(i);
161-
PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaf.ord];
162-
DocIdAndSeqNo result = lookup.lookupSeqNo(term.bytes(), leaf);
163-
if (result != null) {
161+
final PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaf.ord];
162+
final DocIdAndSeqNo result = lookup.lookupSeqNo(term.bytes(), leaf);
163+
if (result == null) {
164+
continue;
165+
}
166+
if (result.isLive) {
167+
// The live document must always be the latest copy, thus we can early terminate here.
168+
assert latest == null || latest.seqNo <= result.seqNo :
169+
"the live doc does not have the highest seq_no; live_seq_no=" + result.seqNo + " < deleted_seq_no=" + latest.seqNo;
164170
return result;
165171
}
172+
if (latest == null || latest.seqNo < result.seqNo) {
173+
latest = result;
174+
}
166175
}
167-
return null;
168-
}
169-
170-
/**
171-
* Load the primaryTerm associated with the given {@link DocIdAndSeqNo}
172-
*/
173-
public static long loadPrimaryTerm(DocIdAndSeqNo docIdAndSeqNo, String uidField) throws IOException {
174-
NumericDocValues primaryTerms = docIdAndSeqNo.context.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
175-
long result;
176-
if (primaryTerms != null && primaryTerms.advanceExact(docIdAndSeqNo.docId)) {
177-
result = primaryTerms.longValue();
178-
} else {
179-
result = 0;
180-
}
181-
assert result > 0 : "should always resolve a primary term for a resolved sequence number. primary_term [" + result + "]"
182-
+ " docId [" + docIdAndSeqNo.docId + "] seqNo [" + docIdAndSeqNo.seqNo + "]";
183-
return result;
176+
return latest;
184177
}
185178

186179
/**

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -679,31 +679,32 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op)
679679
VersionValue versionValue = getVersionFromMap(op.uid().bytes());
680680
assert incrementVersionLookup();
681681
if (versionValue != null) {
682-
if (op.seqNo() > versionValue.seqNo ||
683-
(op.seqNo() == versionValue.seqNo && op.primaryTerm() > versionValue.term))
682+
if (op.seqNo() > versionValue.seqNo) {
684683
status = OpVsLuceneDocStatus.OP_NEWER;
685-
else {
684+
} else if (op.seqNo() == versionValue.seqNo) {
685+
assert versionValue.term == op.primaryTerm() : "primary term not matched; id=" + op.id() + " seq_no=" + op.seqNo()
686+
+ " op_term=" + op.primaryTerm() + " existing_term=" + versionValue.term;
687+
status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
688+
} else {
686689
status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
687690
}
688691
} else {
689692
// load from index
690693
assert incrementIndexVersionLookup();
691694
try (Searcher searcher = acquireSearcher("load_seq_no", SearcherScope.INTERNAL)) {
692-
DocIdAndSeqNo docAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), op.uid());
695+
final DocIdAndSeqNo docAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), op.uid());
693696
if (docAndSeqNo == null) {
694697
status = OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;
695698
} else if (op.seqNo() > docAndSeqNo.seqNo) {
696-
status = OpVsLuceneDocStatus.OP_NEWER;
697-
} else if (op.seqNo() == docAndSeqNo.seqNo) {
698-
assert localCheckpointTracker.contains(op.seqNo()) || softDeleteEnabled == false :
699-
"local checkpoint tracker is not updated seq_no=" + op.seqNo() + " id=" + op.id();
700-
// load term to tie break
701-
final long existingTerm = VersionsAndSeqNoResolver.loadPrimaryTerm(docAndSeqNo, op.uid().field());
702-
if (op.primaryTerm() > existingTerm) {
699+
if (docAndSeqNo.isLive) {
703700
status = OpVsLuceneDocStatus.OP_NEWER;
704701
} else {
705-
status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
702+
status = OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;
706703
}
704+
} else if (op.seqNo() == docAndSeqNo.seqNo) {
705+
assert localCheckpointTracker.contains(op.seqNo()) || softDeleteEnabled == false :
706+
"local checkpoint tracker is not updated seq_no=" + op.seqNo() + " id=" + op.id();
707+
status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
707708
} else {
708709
status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
709710
}

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 87 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
import org.elasticsearch.cluster.routing.ShardRouting;
7878
import org.elasticsearch.cluster.routing.ShardRoutingState;
7979
import org.elasticsearch.cluster.routing.TestShardRouting;
80+
import org.elasticsearch.common.CheckedRunnable;
8081
import org.elasticsearch.common.Randomness;
8182
import org.elasticsearch.common.Strings;
8283
import org.elasticsearch.common.UUIDs;
@@ -140,6 +141,7 @@
140141
import java.util.Base64;
141142
import java.util.Collections;
142143
import java.util.Comparator;
144+
import java.util.HashMap;
143145
import java.util.HashSet;
144146
import java.util.Iterator;
145147
import java.util.LinkedHashMap;
@@ -3712,6 +3714,81 @@ public void testSequenceIDs() throws Exception {
37123714
searchResult.close();
37133715
}
37143716

3717+
public void testLookupSeqNoByIdInLucene() throws Exception {
3718+
int numOps = between(10, 100);
3719+
long seqNo = 0;
3720+
List<Engine.Operation> operations = new ArrayList<>(numOps);
3721+
for (int i = 0; i < numOps; i++) {
3722+
String id = Integer.toString(between(1, 50));
3723+
boolean isIndexing = randomBoolean();
3724+
int copies = frequently() ? 1 : between(2, 4);
3725+
for (int c = 0; c < copies; c++) {
3726+
final ParsedDocument doc = EngineTestCase.createParsedDoc(id, null);
3727+
if (isIndexing) {
3728+
operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, seqNo, primaryTerm.get(),
3729+
i, VersionType.EXTERNAL, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(), -1, true));
3730+
} else {
3731+
operations.add(new Engine.Delete(doc.type(), doc.id(), EngineTestCase.newUid(doc), seqNo, primaryTerm.get(),
3732+
i, VersionType.EXTERNAL, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis()));
3733+
}
3734+
}
3735+
seqNo++;
3736+
if (rarely()) {
3737+
seqNo++;
3738+
}
3739+
}
3740+
Randomness.shuffle(operations);
3741+
Settings.Builder settings = Settings.builder()
3742+
.put(defaultSettings.getSettings())
3743+
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true);
3744+
final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build();
3745+
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData);
3746+
Map<String, Engine.Operation> latestOps = new HashMap<>(); // id -> latest seq_no
3747+
try (Store store = createStore();
3748+
InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null))) {
3749+
CheckedRunnable<IOException> lookupAndCheck = () -> {
3750+
try (Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
3751+
for (String id : latestOps.keySet()) {
3752+
String msg = "latestOps=" + latestOps + " op=" + id;
3753+
DocIdAndSeqNo docIdAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), newUid(id));
3754+
assertThat(msg, docIdAndSeqNo.seqNo, equalTo(latestOps.get(id).seqNo()));
3755+
assertThat(msg, docIdAndSeqNo.isLive, equalTo(latestOps.get(id).operationType() == Engine.Operation.TYPE.INDEX));
3756+
}
3757+
assertThat(VersionsAndSeqNoResolver.loadDocIdAndVersion(
3758+
searcher.reader(), newUid("any-" + between(1, 10))), nullValue());
3759+
Map<String, Long> liveOps = latestOps.entrySet().stream()
3760+
.filter(e -> e.getValue().operationType() == Engine.Operation.TYPE.INDEX)
3761+
.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().seqNo()));
3762+
assertThat(getDocIds(engine, true).stream().collect(Collectors.toMap(e -> e.getId(), e -> e.getSeqNo())),
3763+
equalTo(liveOps));
3764+
}
3765+
};
3766+
for (Engine.Operation op : operations) {
3767+
if (op instanceof Engine.Index) {
3768+
engine.index((Engine.Index) op);
3769+
if (latestOps.containsKey(op.id()) == false || latestOps.get(op.id()).seqNo() < op.seqNo()) {
3770+
latestOps.put(op.id(), op);
3771+
}
3772+
} else if (op instanceof Engine.Delete) {
3773+
engine.delete((Engine.Delete) op);
3774+
if (latestOps.containsKey(op.id()) == false || latestOps.get(op.id()).seqNo() < op.seqNo()) {
3775+
latestOps.put(op.id(), op);
3776+
}
3777+
}
3778+
if (randomInt(100) < 10) {
3779+
engine.refresh("test");
3780+
lookupAndCheck.run();
3781+
}
3782+
if (rarely()) {
3783+
engine.flush();
3784+
lookupAndCheck.run();
3785+
}
3786+
}
3787+
engine.refresh("test");
3788+
lookupAndCheck.run();
3789+
}
3790+
}
3791+
37153792
/**
37163793
* A sequence number generator that will generate a sequence number and if {@code stall} is set to true will wait on the barrier and the
37173794
* referenced latch before returning. If the local checkpoint should advance (because {@code stall} is false, then the value of
@@ -4089,7 +4166,11 @@ private Tuple<Long, Long> getSequenceID(Engine engine, Engine.Get get) throws En
40894166
seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
40904167
} else {
40914168
seqNo = docIdAndSeqNo.seqNo;
4092-
primaryTerm = VersionsAndSeqNoResolver.loadPrimaryTerm(docIdAndSeqNo, get.uid().field());
4169+
NumericDocValues primaryTerms = docIdAndSeqNo.context.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
4170+
if (primaryTerms == null || primaryTerms.advanceExact(docIdAndSeqNo.docId) == false) {
4171+
throw new AssertionError("document does not have primary term [" + docIdAndSeqNo.docId + "]");
4172+
}
4173+
primaryTerm = primaryTerms.longValue();
40934174
}
40944175
return new Tuple<>(seqNo, primaryTerm);
40954176
} catch (Exception e) {
@@ -5194,6 +5275,7 @@ public void testRebuildLocalCheckpointTracker() throws Exception {
51945275
commits.add(new ArrayList<>());
51955276
try (Store store = createStore()) {
51965277
EngineConfig config = config(indexSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpoint::get);
5278+
final List<DocIdSeqNoAndTerm> docs;
51975279
try (InternalEngine engine = createEngine(config)) {
51985280
List<Engine.Operation> flushedOperations = new ArrayList<>();
51995281
for (Engine.Operation op : operations) {
@@ -5216,6 +5298,7 @@ public void testRebuildLocalCheckpointTracker() throws Exception {
52165298
}
52175299
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
52185300
engine.syncTranslog();
5301+
docs = getDocIds(engine, true);
52195302
}
52205303
trimUnsafeCommits(config);
52215304
List<Engine.Operation> safeCommit = null;
@@ -5232,6 +5315,9 @@ public void testRebuildLocalCheckpointTracker() throws Exception {
52325315
assertThat("seq_no=" + op.seqNo() + " max_seq_no=" + tracker.getMaxSeqNo() + " checkpoint=" + tracker.getCheckpoint(),
52335316
tracker.contains(op.seqNo()), equalTo(safeCommit.contains(op)));
52345317
}
5318+
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
5319+
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
5320+
assertThat(getDocIds(engine, true), equalTo(docs));
52355321
}
52365322
}
52375323
}

0 commit comments

Comments
 (0)