Skip to content

Commit b5c8b32

Browse files
committed
Do not use soft-deletes to resolve indexing strategy (#43336)
This PR reverts #35230. Previously, we reply on soft-deletes to fill the mismatch between the version map and the Lucene index. This is no longer needed after #43202 where we rebuild the version map when opening an engine. Moreover, PrunePostingsMergePolicy can prune _id of soft-deleted documents out of order; thus the lookup result including soft-deletes sometimes does not return the latest version (although it's okay as we only use a valid result in an engine). With this change, we use only live documents in Lucene to resolve the indexing strategy. This is perfectly safe since we keep all deleted documents after the local checkpoint in the version map. Closes #42979
1 parent a4c45b5 commit b5c8b32

File tree

6 files changed

+55
-87
lines changed

6 files changed

+55
-87
lines changed

server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java

+20-55
Original file line numberDiff line numberDiff line change
@@ -102,38 +102,20 @@ public DocIdAndVersion lookupVersion(BytesRef id, boolean loadSeqNo, LeafReaderC
102102
throws IOException {
103103
assert context.reader().getCoreCacheHelper().getKey().equals(readerKey) :
104104
"context's reader is not the same as the reader class was initialized on.";
105-
int docID = getDocID(id, context.reader().getLiveDocs());
105+
int docID = getDocID(id, context);
106106

107107
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
108-
final NumericDocValues versions = context.reader().getNumericDocValues(VersionFieldMapper.NAME);
109-
if (versions == null) {
110-
throw new IllegalArgumentException("reader misses the [" + VersionFieldMapper.NAME + "] field");
111-
}
112-
if (versions.advanceExact(docID) == false) {
113-
throw new IllegalArgumentException("Document [" + docID + "] misses the [" + VersionFieldMapper.NAME + "] field");
114-
}
115108
final long seqNo;
116109
final long term;
117110
if (loadSeqNo) {
118-
NumericDocValues seqNos = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
119-
// remove the null check in 7.0 once we can't read indices with no seq#
120-
if (seqNos != null && seqNos.advanceExact(docID)) {
121-
seqNo = seqNos.longValue();
122-
} else {
123-
seqNo = UNASSIGNED_SEQ_NO;
124-
}
125-
NumericDocValues terms = context.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
126-
if (terms != null && terms.advanceExact(docID)) {
127-
term = terms.longValue();
128-
} else {
129-
term = UNASSIGNED_PRIMARY_TERM;
130-
}
131-
111+
seqNo = readNumericDocValues(context.reader(), SeqNoFieldMapper.NAME, docID);
112+
term = readNumericDocValues(context.reader(), SeqNoFieldMapper.PRIMARY_TERM_NAME, docID);
132113
} else {
133114
seqNo = UNASSIGNED_SEQ_NO;
134115
term = UNASSIGNED_PRIMARY_TERM;
135116
}
136-
return new DocIdAndVersion(docID, versions.longValue(), seqNo, term, context.reader(), context.docBase);
117+
final long version = readNumericDocValues(context.reader(), VersionFieldMapper.NAME, docID);
118+
return new DocIdAndVersion(docID, version, seqNo, term, context.reader(), context.docBase);
137119
} else {
138120
return null;
139121
}
@@ -143,9 +125,10 @@ public DocIdAndVersion lookupVersion(BytesRef id, boolean loadSeqNo, LeafReaderC
143125
* returns the internal lucene doc id for the given id bytes.
144126
* {@link DocIdSetIterator#NO_MORE_DOCS} is returned if not found
145127
* */
146-
private int getDocID(BytesRef id, Bits liveDocs) throws IOException {
128+
private int getDocID(BytesRef id, LeafReaderContext context) throws IOException {
147129
// termsEnum can possibly be null here if this leaf contains only no-ops.
148130
if (termsEnum != null && termsEnum.seekExact(id)) {
131+
final Bits liveDocs = context.reader().getLiveDocs();
149132
int docID = DocIdSetIterator.NO_MORE_DOCS;
150133
// there may be more than one matching docID, in the case of nested docs, so we want the last one:
151134
docsEnum = termsEnum.postings(docsEnum, 0);
@@ -161,41 +144,23 @@ private int getDocID(BytesRef id, Bits liveDocs) throws IOException {
161144
}
162145
}
163146

147+
private static long readNumericDocValues(LeafReader reader, String field, int docId) throws IOException {
148+
final NumericDocValues dv = reader.getNumericDocValues(field);
149+
if (dv == null || dv.advanceExact(docId) == false) {
150+
assert false : "document [" + docId + "] does not have docValues for [" + field + "]";
151+
throw new IllegalStateException("document [" + docId + "] does not have docValues for [" + field + "]");
152+
}
153+
return dv.longValue();
154+
}
155+
164156
/** Return null if id is not found. */
165157
DocIdAndSeqNo lookupSeqNo(BytesRef id, LeafReaderContext context) throws IOException {
166158
assert context.reader().getCoreCacheHelper().getKey().equals(readerKey) :
167159
"context's reader is not the same as the reader class was initialized on.";
168-
// termsEnum can possibly be null here if this leaf contains only no-ops.
169-
if (termsEnum != null && termsEnum.seekExact(id)) {
170-
docsEnum = termsEnum.postings(docsEnum, 0);
171-
final Bits liveDocs = context.reader().getLiveDocs();
172-
DocIdAndSeqNo result = null;
173-
int docID = docsEnum.nextDoc();
174-
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
175-
final NumericDocValues seqNoDV = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
176-
for (; docID != DocIdSetIterator.NO_MORE_DOCS; docID = docsEnum.nextDoc()) {
177-
final long seqNo;
178-
// remove the null check in 7.0 once we can't read indices with no seq#
179-
if (seqNoDV != null && seqNoDV.advanceExact(docID)) {
180-
seqNo = seqNoDV.longValue();
181-
} else {
182-
seqNo = UNASSIGNED_SEQ_NO;
183-
}
184-
final boolean isLive = (liveDocs == null || liveDocs.get(docID));
185-
if (isLive) {
186-
// The live document must always be the latest copy, thus we can early terminate here.
187-
// If a nested docs is live, we return the first doc which doesn't have term (only the last doc has term).
188-
// This should not be an issue since we no longer use primary term as tier breaker when comparing operations.
189-
assert result == null || result.seqNo <= seqNo :
190-
"the live doc does not have the highest seq_no; live_seq_no=" + seqNo + " < deleted_seq_no=" + result.seqNo;
191-
return new DocIdAndSeqNo(docID, seqNo, context, isLive);
192-
}
193-
if (result == null || result.seqNo < seqNo) {
194-
result = new DocIdAndSeqNo(docID, seqNo, context, isLive);
195-
}
196-
}
197-
}
198-
return result;
160+
final int docID = getDocID(id, context);
161+
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
162+
final long seqNo = readNumericDocValues(context.reader(), SeqNoFieldMapper.NAME, docID);
163+
return new DocIdAndSeqNo(docID, seqNo, context);
199164
} else {
200165
return null;
201166
}

server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java

+4-17
Original file line numberDiff line numberDiff line change
@@ -114,13 +114,11 @@ public static class DocIdAndSeqNo {
114114
public final int docId;
115115
public final long seqNo;
116116
public final LeafReaderContext context;
117-
public final boolean isLive;
118117

119-
DocIdAndSeqNo(int docId, long seqNo, LeafReaderContext context, boolean isLive) {
118+
DocIdAndSeqNo(int docId, long seqNo, LeafReaderContext context) {
120119
this.docId = docId;
121120
this.seqNo = seqNo;
122121
this.context = context;
123-
this.isLive = isLive;
124122
}
125123
}
126124

@@ -149,32 +147,21 @@ public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term,
149147

150148
/**
151149
* Loads the internal docId and sequence number of the latest copy for a given uid from the provided reader.
152-
* The flag {@link DocIdAndSeqNo#isLive} indicates whether the returned document is live or (soft)deleted.
153-
* This returns {@code null} if no such document matching the given term uid.
150+
* The result is either null or the live and latest version of the given uid.
154151
*/
155152
public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, Term term) throws IOException {
156153
final PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, term.field());
157154
final List<LeafReaderContext> leaves = reader.leaves();
158-
DocIdAndSeqNo latest = null;
159155
// iterate backwards to optimize for the frequently updated documents
160156
// which are likely to be in the last segments
161157
for (int i = leaves.size() - 1; i >= 0; i--) {
162158
final LeafReaderContext leaf = leaves.get(i);
163159
final PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaf.ord];
164160
final DocIdAndSeqNo result = lookup.lookupSeqNo(term.bytes(), leaf);
165-
if (result == null) {
166-
continue;
167-
}
168-
if (result.isLive) {
169-
// The live document must always be the latest copy, thus we can early terminate here.
170-
assert latest == null || latest.seqNo <= result.seqNo :
171-
"the live doc does not have the highest seq_no; live_seq_no=" + result.seqNo + " < deleted_seq_no=" + latest.seqNo;
161+
if (result != null) {
172162
return result;
173163
}
174-
if (latest == null || latest.seqNo < result.seqNo) {
175-
latest = result;
176-
}
177164
}
178-
return latest;
165+
return null;
179166
}
180167
}

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

+1-5
Original file line numberDiff line numberDiff line change
@@ -709,11 +709,7 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op)
709709
if (docAndSeqNo == null) {
710710
status = OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;
711711
} else if (op.seqNo() > docAndSeqNo.seqNo) {
712-
if (docAndSeqNo.isLive) {
713-
status = OpVsLuceneDocStatus.OP_NEWER;
714-
} else {
715-
status = OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;
716-
}
712+
status = OpVsLuceneDocStatus.OP_NEWER;
717713
} else if (op.seqNo() == docAndSeqNo.seqNo) {
718714
assert localCheckpointTracker.contains(op.seqNo()) || softDeleteEnabled == false :
719715
"local checkpoint tracker is not updated seq_no=" + op.seqNo() + " id=" + op.id();

server/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java

+5
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.common.lucene.Lucene;
3434
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
3535
import org.elasticsearch.index.mapper.IdFieldMapper;
36+
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
3637
import org.elasticsearch.index.mapper.VersionFieldMapper;
3738
import org.elasticsearch.test.ESTestCase;
3839

@@ -52,6 +53,8 @@ public void testSimple() throws Exception {
5253
Document doc = new Document();
5354
doc.add(new Field(IdFieldMapper.NAME, "6", IdFieldMapper.Defaults.FIELD_TYPE));
5455
doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 87));
56+
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
57+
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
5558
writer.addDocument(doc);
5659
writer.addDocument(new Document());
5760
DirectoryReader reader = DirectoryReader.open(writer);
@@ -86,6 +89,8 @@ public void testTwoDocuments() throws Exception {
8689
Document doc = new Document();
8790
doc.add(new Field(IdFieldMapper.NAME, "6", IdFieldMapper.Defaults.FIELD_TYPE));
8891
doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 87));
92+
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
93+
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
8994
writer.addDocument(doc);
9095
writer.addDocument(doc);
9196
writer.addDocument(new Document());

server/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java

+13
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.common.lucene.Lucene;
3131
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
3232
import org.elasticsearch.index.mapper.IdFieldMapper;
33+
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
3334
import org.elasticsearch.index.mapper.VersionFieldMapper;
3435
import org.elasticsearch.index.shard.ShardId;
3536
import org.elasticsearch.test.ESTestCase;
@@ -69,6 +70,8 @@ public void testVersions() throws Exception {
6970
Document doc = new Document();
7071
doc.add(new Field(IdFieldMapper.NAME, "1", IdFieldMapper.Defaults.FIELD_TYPE));
7172
doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 1));
73+
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
74+
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
7275
writer.updateDocument(new Term(IdFieldMapper.NAME, "1"), doc);
7376
directoryReader = reopen(directoryReader);
7477
assertThat(loadDocIdAndVersion(directoryReader, new Term(IdFieldMapper.NAME, "1"), randomBoolean()).version, equalTo(1L));
@@ -78,6 +81,8 @@ public void testVersions() throws Exception {
7881
Field version = new NumericDocValuesField(VersionFieldMapper.NAME, 2);
7982
doc.add(uid);
8083
doc.add(version);
84+
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
85+
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
8186
writer.updateDocument(new Term(IdFieldMapper.NAME, "1"), doc);
8287
directoryReader = reopen(directoryReader);
8388
assertThat(loadDocIdAndVersion(directoryReader, new Term(IdFieldMapper.NAME, "1"), randomBoolean()).version, equalTo(2L));
@@ -87,6 +92,8 @@ public void testVersions() throws Exception {
8792
version.setLongValue(3);
8893
doc.add(uid);
8994
doc.add(version);
95+
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
96+
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
9097
writer.updateDocument(new Term(IdFieldMapper.NAME, "1"), doc);
9198

9299
directoryReader = reopen(directoryReader);
@@ -116,6 +123,8 @@ public void testNestedDocuments() throws IOException {
116123
doc.add(new Field(IdFieldMapper.NAME, "1", IdFieldMapper.Defaults.FIELD_TYPE));
117124
NumericDocValuesField version = new NumericDocValuesField(VersionFieldMapper.NAME, 5L);
118125
doc.add(version);
126+
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
127+
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
119128
docs.add(doc);
120129

121130
writer.updateDocuments(new Term(IdFieldMapper.NAME, "1"), docs);
@@ -146,6 +155,8 @@ public void testCache() throws Exception {
146155
Document doc = new Document();
147156
doc.add(new Field(IdFieldMapper.NAME, "6", IdFieldMapper.Defaults.FIELD_TYPE));
148157
doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 87));
158+
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
159+
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
149160
writer.addDocument(doc);
150161
DirectoryReader reader = DirectoryReader.open(writer);
151162
// should increase cache size by 1
@@ -171,6 +182,8 @@ public void testCacheFilterReader() throws Exception {
171182
Document doc = new Document();
172183
doc.add(new Field(IdFieldMapper.NAME, "6", IdFieldMapper.Defaults.FIELD_TYPE));
173184
doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 87));
185+
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
186+
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
174187
writer.addDocument(doc);
175188
DirectoryReader reader = DirectoryReader.open(writer);
176189
assertEquals(87, loadDocIdAndVersion(reader, new Term(IdFieldMapper.NAME, "6"), randomBoolean()).version);

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

+12-10
Original file line numberDiff line numberDiff line change
@@ -4026,7 +4026,6 @@ public void testSequenceIDs() throws Exception {
40264026
searchResult.close();
40274027
}
40284028

4029-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/42979")
40304029
public void testLookupSeqNoByIdInLucene() throws Exception {
40314030
int numOps = between(10, 100);
40324031
long seqNo = 0;
@@ -4061,20 +4060,23 @@ public void testLookupSeqNoByIdInLucene() throws Exception {
40614060
InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null))) {
40624061
CheckedRunnable<IOException> lookupAndCheck = () -> {
40634062
try (Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
4064-
for (String id : latestOps.keySet()) {
4065-
String msg = "latestOps=" + latestOps + " op=" + id;
4066-
DocIdAndSeqNo docIdAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), newUid(id));
4067-
assertThat(msg, docIdAndSeqNo.seqNo, equalTo(latestOps.get(id).seqNo()));
4068-
assertThat(msg, docIdAndSeqNo.isLive,
4069-
equalTo(latestOps.get(id).operationType() == Engine.Operation.TYPE.INDEX));
4070-
}
4071-
assertThat(VersionsAndSeqNoResolver.loadDocIdAndVersion(
4072-
searcher.reader(), newUid("any-" + between(1, 10)), randomBoolean()), nullValue());
40734063
Map<String, Long> liveOps = latestOps.entrySet().stream()
40744064
.filter(e -> e.getValue().operationType() == Engine.Operation.TYPE.INDEX)
40754065
.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().seqNo()));
40764066
assertThat(getDocIds(engine, true).stream().collect(Collectors.toMap(e -> e.getId(), e -> e.getSeqNo())),
40774067
equalTo(liveOps));
4068+
for (String id : latestOps.keySet()) {
4069+
String msg = "latestOps=" + latestOps + " op=" + id;
4070+
DocIdAndSeqNo docIdAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), newUid(id));
4071+
if (liveOps.containsKey(id) == false) {
4072+
assertNull(msg, docIdAndSeqNo);
4073+
} else {
4074+
assertNotNull(msg, docIdAndSeqNo);
4075+
assertThat(msg, docIdAndSeqNo.seqNo, equalTo(latestOps.get(id).seqNo()));
4076+
}
4077+
}
4078+
String notFoundId = randomValueOtherThanMany(liveOps::containsKey, () -> Long.toString(randomNonNegativeLong()));
4079+
assertNull(VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), newUid(notFoundId)));
40784080
}
40794081
};
40804082
for (Engine.Operation op : operations) {

0 commit comments

Comments
 (0)