Skip to content

Commit ed8732b

Browse files
authored
Use soft-deleted docs to resolve strategy for engine operation (#35230)
A CCR test failure shows that the approach in #34474 is flawed. Restoring the LocalCheckpointTracker from an index commit can cause both FollowingEngine and InternalEngine to incorrectly ignore some deletes. Here is a small scenario illustrating the problem: 1. Delete doc with seq=1 => engine will add a delete tombstone to Lucene 2. Flush a commit consisting of only the delete tombstone 3. Index doc with seq=0 => engine will add that doc to Lucene but soft-deleted 4. Restart an engine with the commit (step 2); the engine will fill its LocalCheckpointTracker with the delete tombstone in the commit 5. Replay the local translog in reverse order: index#0 then delete#1 6. When process index#0, an engine will add it into Lucene as a live doc and advance the local checkpoint to 1 (seq#1 was restored from the commit - step 4). 7. When process delete#1, an engine will skip it because seq_no=1 is less than or equal to the local checkpoint. We should have zero document after recovering from translog, but here we have one. Since all operations after the local checkpoint of the safe commit are retained, we should find them if the look-up considers also soft-deleted documents. This PR fills the disparity between the version map and the local checkpoint tracker by taking soft-deleted documents into account while resolving strategy for engine operations. Relates #34474 Relates #33656
1 parent c51dcb1 commit ed8732b

File tree

4 files changed

+151
-51
lines changed

4 files changed

+151
-51
lines changed

server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -140,16 +140,36 @@ private int getDocID(BytesRef id, Bits liveDocs) throws IOException {
140140
DocIdAndSeqNo lookupSeqNo(BytesRef id, LeafReaderContext context) throws IOException {
141141
assert context.reader().getCoreCacheHelper().getKey().equals(readerKey) :
142142
"context's reader is not the same as the reader class was initialized on.";
143-
int docID = getDocID(id, context.reader().getLiveDocs());
144-
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
145-
NumericDocValues seqNos = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
146-
long seqNo;
147-
if (seqNos != null && seqNos.advanceExact(docID)) {
148-
seqNo = seqNos.longValue();
149-
} else {
150-
seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
143+
// termsEnum can possibly be null here if this leaf contains only no-ops.
144+
if (termsEnum != null && termsEnum.seekExact(id)) {
145+
docsEnum = termsEnum.postings(docsEnum, 0);
146+
final Bits liveDocs = context.reader().getLiveDocs();
147+
DocIdAndSeqNo result = null;
148+
int docID = docsEnum.nextDoc();
149+
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
150+
final NumericDocValues seqNoDV = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
151+
for (; docID != DocIdSetIterator.NO_MORE_DOCS; docID = docsEnum.nextDoc()) {
152+
final long seqNo;
153+
if (seqNoDV != null && seqNoDV.advanceExact(docID)) {
154+
seqNo = seqNoDV.longValue();
155+
} else {
156+
seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
157+
}
158+
final boolean isLive = (liveDocs == null || liveDocs.get(docID));
159+
if (isLive) {
160+
// The live document must always be the latest copy, thus we can early terminate here.
161+
// If a nested docs is live, we return the first doc which doesn't have term (only the last doc has term).
162+
// This should not be an issue since we no longer use primary term as tier breaker when comparing operations.
163+
assert result == null || result.seqNo <= seqNo :
164+
"the live doc does not have the highest seq_no; live_seq_no=" + seqNo + " < deleted_seq_no=" + result.seqNo;
165+
return new DocIdAndSeqNo(docID, seqNo, context, isLive);
166+
}
167+
if (result == null || result.seqNo < seqNo) {
168+
result = new DocIdAndSeqNo(docID, seqNo, context, isLive);
169+
}
170+
}
151171
}
152-
return new DocIdAndSeqNo(docID, seqNo, context);
172+
return result;
153173
} else {
154174
return null;
155175
}

server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java

Lines changed: 22 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,9 @@
2222
import org.apache.lucene.index.IndexReader;
2323
import org.apache.lucene.index.LeafReader;
2424
import org.apache.lucene.index.LeafReaderContext;
25-
import org.apache.lucene.index.NumericDocValues;
2625
import org.apache.lucene.index.Term;
2726
import org.apache.lucene.util.CloseableThreadLocal;
2827
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
29-
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
3028

3129
import java.io.IOException;
3230
import java.util.List;
@@ -114,11 +112,13 @@ public static class DocIdAndSeqNo {
114112
public final int docId;
115113
public final long seqNo;
116114
public final LeafReaderContext context;
115+
public final boolean isLive;
117116

118-
DocIdAndSeqNo(int docId, long seqNo, LeafReaderContext context) {
117+
DocIdAndSeqNo(int docId, long seqNo, LeafReaderContext context, boolean isLive) {
119118
this.docId = docId;
120119
this.seqNo = seqNo;
121120
this.context = context;
121+
this.isLive = isLive;
122122
}
123123
}
124124

@@ -146,41 +146,34 @@ public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term)
146146
}
147147

148148
/**
149-
* Load the internal doc ID and sequence number for the uid from the reader, returning<ul>
150-
* <li>null if the uid wasn't found,
151-
* <li>a doc ID and the associated seqNo otherwise
152-
* </ul>
149+
* Loads the internal docId and sequence number of the latest copy for a given uid from the provided reader.
150+
* The flag {@link DocIdAndSeqNo#isLive} indicates whether the returned document is live or (soft)deleted.
151+
* This returns {@code null} if no such document matching the given term uid.
153152
*/
154153
public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, Term term) throws IOException {
155-
PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, term.field());
156-
List<LeafReaderContext> leaves = reader.leaves();
154+
final PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, term.field());
155+
final List<LeafReaderContext> leaves = reader.leaves();
156+
DocIdAndSeqNo latest = null;
157157
// iterate backwards to optimize for the frequently updated documents
158158
// which are likely to be in the last segments
159159
for (int i = leaves.size() - 1; i >= 0; i--) {
160160
final LeafReaderContext leaf = leaves.get(i);
161-
PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaf.ord];
162-
DocIdAndSeqNo result = lookup.lookupSeqNo(term.bytes(), leaf);
163-
if (result != null) {
161+
final PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaf.ord];
162+
final DocIdAndSeqNo result = lookup.lookupSeqNo(term.bytes(), leaf);
163+
if (result == null) {
164+
continue;
165+
}
166+
if (result.isLive) {
167+
// The live document must always be the latest copy, thus we can early terminate here.
168+
assert latest == null || latest.seqNo <= result.seqNo :
169+
"the live doc does not have the highest seq_no; live_seq_no=" + result.seqNo + " < deleted_seq_no=" + latest.seqNo;
164170
return result;
165171
}
172+
if (latest == null || latest.seqNo < result.seqNo) {
173+
latest = result;
174+
}
166175
}
167-
return null;
168-
}
169-
170-
/**
171-
* Load the primaryTerm associated with the given {@link DocIdAndSeqNo}
172-
*/
173-
public static long loadPrimaryTerm(DocIdAndSeqNo docIdAndSeqNo, String uidField) throws IOException {
174-
NumericDocValues primaryTerms = docIdAndSeqNo.context.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
175-
long result;
176-
if (primaryTerms != null && primaryTerms.advanceExact(docIdAndSeqNo.docId)) {
177-
result = primaryTerms.longValue();
178-
} else {
179-
result = 0;
180-
}
181-
assert result > 0 : "should always resolve a primary term for a resolved sequence number. primary_term [" + result + "]"
182-
+ " docId [" + docIdAndSeqNo.docId + "] seqNo [" + docIdAndSeqNo.seqNo + "]";
183-
return result;
176+
return latest;
184177
}
185178

186179
/**

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -666,31 +666,32 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op)
666666
VersionValue versionValue = getVersionFromMap(op.uid().bytes());
667667
assert incrementVersionLookup();
668668
if (versionValue != null) {
669-
if (op.seqNo() > versionValue.seqNo ||
670-
(op.seqNo() == versionValue.seqNo && op.primaryTerm() > versionValue.term))
669+
if (op.seqNo() > versionValue.seqNo) {
671670
status = OpVsLuceneDocStatus.OP_NEWER;
672-
else {
671+
} else if (op.seqNo() == versionValue.seqNo) {
672+
assert versionValue.term == op.primaryTerm() : "primary term not matched; id=" + op.id() + " seq_no=" + op.seqNo()
673+
+ " op_term=" + op.primaryTerm() + " existing_term=" + versionValue.term;
674+
status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
675+
} else {
673676
status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
674677
}
675678
} else {
676679
// load from index
677680
assert incrementIndexVersionLookup();
678681
try (Searcher searcher = acquireSearcher("load_seq_no", SearcherScope.INTERNAL)) {
679-
DocIdAndSeqNo docAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), op.uid());
682+
final DocIdAndSeqNo docAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), op.uid());
680683
if (docAndSeqNo == null) {
681684
status = OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;
682685
} else if (op.seqNo() > docAndSeqNo.seqNo) {
683-
status = OpVsLuceneDocStatus.OP_NEWER;
684-
} else if (op.seqNo() == docAndSeqNo.seqNo) {
685-
assert localCheckpointTracker.contains(op.seqNo()) || softDeleteEnabled == false :
686-
"local checkpoint tracker is not updated seq_no=" + op.seqNo() + " id=" + op.id();
687-
// load term to tie break
688-
final long existingTerm = VersionsAndSeqNoResolver.loadPrimaryTerm(docAndSeqNo, op.uid().field());
689-
if (op.primaryTerm() > existingTerm) {
686+
if (docAndSeqNo.isLive) {
690687
status = OpVsLuceneDocStatus.OP_NEWER;
691688
} else {
692-
status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
689+
status = OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;
693690
}
691+
} else if (op.seqNo() == docAndSeqNo.seqNo) {
692+
assert localCheckpointTracker.contains(op.seqNo()) || softDeleteEnabled == false :
693+
"local checkpoint tracker is not updated seq_no=" + op.seqNo() + " id=" + op.id();
694+
status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
694695
} else {
695696
status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
696697
}

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 87 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
import org.elasticsearch.cluster.routing.ShardRouting;
7777
import org.elasticsearch.cluster.routing.ShardRoutingState;
7878
import org.elasticsearch.cluster.routing.TestShardRouting;
79+
import org.elasticsearch.common.CheckedRunnable;
7980
import org.elasticsearch.common.Randomness;
8081
import org.elasticsearch.common.Strings;
8182
import org.elasticsearch.common.UUIDs;
@@ -139,6 +140,7 @@
139140
import java.util.Base64;
140141
import java.util.Collections;
141142
import java.util.Comparator;
143+
import java.util.HashMap;
142144
import java.util.HashSet;
143145
import java.util.Iterator;
144146
import java.util.LinkedHashMap;
@@ -3681,6 +3683,81 @@ public void testSequenceIDs() throws Exception {
36813683
searchResult.close();
36823684
}
36833685

3686+
public void testLookupSeqNoByIdInLucene() throws Exception {
3687+
int numOps = between(10, 100);
3688+
long seqNo = 0;
3689+
List<Engine.Operation> operations = new ArrayList<>(numOps);
3690+
for (int i = 0; i < numOps; i++) {
3691+
String id = Integer.toString(between(1, 50));
3692+
boolean isIndexing = randomBoolean();
3693+
int copies = frequently() ? 1 : between(2, 4);
3694+
for (int c = 0; c < copies; c++) {
3695+
final ParsedDocument doc = EngineTestCase.createParsedDoc(id, null);
3696+
if (isIndexing) {
3697+
operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, seqNo, primaryTerm.get(),
3698+
i, null, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(), -1, true));
3699+
} else {
3700+
operations.add(new Engine.Delete(doc.type(), doc.id(), EngineTestCase.newUid(doc), seqNo, primaryTerm.get(),
3701+
i, null, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis()));
3702+
}
3703+
}
3704+
seqNo++;
3705+
if (rarely()) {
3706+
seqNo++;
3707+
}
3708+
}
3709+
Randomness.shuffle(operations);
3710+
Settings.Builder settings = Settings.builder()
3711+
.put(defaultSettings.getSettings())
3712+
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true);
3713+
final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build();
3714+
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData);
3715+
Map<String, Engine.Operation> latestOps = new HashMap<>(); // id -> latest seq_no
3716+
try (Store store = createStore();
3717+
InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null))) {
3718+
CheckedRunnable<IOException> lookupAndCheck = () -> {
3719+
try (Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
3720+
for (String id : latestOps.keySet()) {
3721+
String msg = "latestOps=" + latestOps + " op=" + id;
3722+
DocIdAndSeqNo docIdAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), newUid(id));
3723+
assertThat(msg, docIdAndSeqNo.seqNo, equalTo(latestOps.get(id).seqNo()));
3724+
assertThat(msg, docIdAndSeqNo.isLive, equalTo(latestOps.get(id).operationType() == Engine.Operation.TYPE.INDEX));
3725+
}
3726+
assertThat(VersionsAndSeqNoResolver.loadDocIdAndVersion(
3727+
searcher.reader(), newUid("any-" + between(1, 10))), nullValue());
3728+
Map<String, Long> liveOps = latestOps.entrySet().stream()
3729+
.filter(e -> e.getValue().operationType() == Engine.Operation.TYPE.INDEX)
3730+
.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().seqNo()));
3731+
assertThat(getDocIds(engine, true).stream().collect(Collectors.toMap(e -> e.getId(), e -> e.getSeqNo())),
3732+
equalTo(liveOps));
3733+
}
3734+
};
3735+
for (Engine.Operation op : operations) {
3736+
if (op instanceof Engine.Index) {
3737+
engine.index((Engine.Index) op);
3738+
if (latestOps.containsKey(op.id()) == false || latestOps.get(op.id()).seqNo() < op.seqNo()) {
3739+
latestOps.put(op.id(), op);
3740+
}
3741+
} else if (op instanceof Engine.Delete) {
3742+
engine.delete((Engine.Delete) op);
3743+
if (latestOps.containsKey(op.id()) == false || latestOps.get(op.id()).seqNo() < op.seqNo()) {
3744+
latestOps.put(op.id(), op);
3745+
}
3746+
}
3747+
if (randomInt(100) < 10) {
3748+
engine.refresh("test");
3749+
lookupAndCheck.run();
3750+
}
3751+
if (rarely()) {
3752+
engine.flush();
3753+
lookupAndCheck.run();
3754+
}
3755+
}
3756+
engine.refresh("test");
3757+
lookupAndCheck.run();
3758+
}
3759+
}
3760+
36843761
/**
36853762
* A sequence number generator that will generate a sequence number and if {@code stall} is set to true will wait on the barrier and the
36863763
* referenced latch before returning. If the local checkpoint should advance (because {@code stall} is false, then the value of
@@ -4059,7 +4136,11 @@ private Tuple<Long, Long> getSequenceID(Engine engine, Engine.Get get) throws En
40594136
seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
40604137
} else {
40614138
seqNo = docIdAndSeqNo.seqNo;
4062-
primaryTerm = VersionsAndSeqNoResolver.loadPrimaryTerm(docIdAndSeqNo, get.uid().field());
4139+
NumericDocValues primaryTerms = docIdAndSeqNo.context.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
4140+
if (primaryTerms == null || primaryTerms.advanceExact(docIdAndSeqNo.docId) == false) {
4141+
throw new AssertionError("document does not have primary term [" + docIdAndSeqNo.docId + "]");
4142+
}
4143+
primaryTerm = primaryTerms.longValue();
40634144
}
40644145
return new Tuple<>(seqNo, primaryTerm);
40654146
} catch (Exception e) {
@@ -5164,6 +5245,7 @@ public void testRebuildLocalCheckpointTracker() throws Exception {
51645245
commits.add(new ArrayList<>());
51655246
try (Store store = createStore()) {
51665247
EngineConfig config = config(indexSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpoint::get);
5248+
final List<DocIdSeqNoAndTerm> docs;
51675249
try (InternalEngine engine = createEngine(config)) {
51685250
List<Engine.Operation> flushedOperations = new ArrayList<>();
51695251
for (Engine.Operation op : operations) {
@@ -5186,6 +5268,7 @@ public void testRebuildLocalCheckpointTracker() throws Exception {
51865268
}
51875269
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
51885270
engine.syncTranslog();
5271+
docs = getDocIds(engine, true);
51895272
}
51905273
trimUnsafeCommits(config);
51915274
List<Engine.Operation> safeCommit = null;
@@ -5202,6 +5285,9 @@ public void testRebuildLocalCheckpointTracker() throws Exception {
52025285
assertThat("seq_no=" + op.seqNo() + " max_seq_no=" + tracker.getMaxSeqNo() + " checkpoint=" + tracker.getCheckpoint(),
52035286
tracker.contains(op.seqNo()), equalTo(safeCommit.contains(op)));
52045287
}
5288+
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
5289+
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
5290+
assertThat(getDocIds(engine, true), equalTo(docs));
52055291
}
52065292
}
52075293
}

0 commit comments

Comments
 (0)