Skip to content

Commit bb2f51f

Browse files
committed
Exclude nested documents in LuceneChangesSnapshot (#51279)
LuceneChangesSnapshot can be slow if nested documents are heavily used. Also, it estimates the number of operations to be recovered in peer recoveries inaccurately. With this change, we prefer excluding the nested non-root documents in a Lucene query instead.
1 parent 887ad40 commit bb2f51f

File tree

6 files changed

+38
-38
lines changed

6 files changed

+38
-38
lines changed

server/src/main/java/org/elasticsearch/index/engine/CombinedDocValues.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ final class CombinedDocValues {
4747
long docVersion(int segmentDocId) throws IOException {
4848
assert versionDV.docID() < segmentDocId;
4949
if (versionDV.advanceExact(segmentDocId) == false) {
50+
assert false : "DocValues for field [" + VersionFieldMapper.NAME + "] is not found";
5051
throw new IllegalStateException("DocValues for field [" + VersionFieldMapper.NAME + "] is not found");
5152
}
5253
return versionDV.longValue();
@@ -55,19 +56,18 @@ long docVersion(int segmentDocId) throws IOException {
5556
long docSeqNo(int segmentDocId) throws IOException {
5657
assert seqNoDV.docID() < segmentDocId;
5758
if (seqNoDV.advanceExact(segmentDocId) == false) {
59+
assert false : "DocValues for field [" + SeqNoFieldMapper.NAME + "] is not found";
5860
throw new IllegalStateException("DocValues for field [" + SeqNoFieldMapper.NAME + "] is not found");
5961
}
6062
return seqNoDV.longValue();
6163
}
6264

6365
long docPrimaryTerm(int segmentDocId) throws IOException {
64-
if (primaryTermDV == null) {
65-
return -1L;
66-
}
66+
// We exclude non-root nested documents when querying changes, every returned document must have primary term.
6767
assert primaryTermDV.docID() < segmentDocId;
68-
// Use -1 for docs which don't have primary term. The caller considers those docs as nested docs.
6968
if (primaryTermDV.advanceExact(segmentDocId) == false) {
70-
return -1;
69+
assert false : "DocValues for field [" + SeqNoFieldMapper.PRIMARY_TERM_NAME + "] is not found";
70+
throw new IllegalStateException("DocValues for field [" + SeqNoFieldMapper.PRIMARY_TERM_NAME + "] is not found");
7171
}
7272
return primaryTermDV.longValue();
7373
}

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

+10-5
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,10 @@
3939
import org.apache.lucene.index.ShuffleForcedMergePolicy;
4040
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
4141
import org.apache.lucene.index.Term;
42+
import org.apache.lucene.search.BooleanClause;
43+
import org.apache.lucene.search.BooleanQuery;
4244
import org.apache.lucene.search.DocIdSetIterator;
45+
import org.apache.lucene.search.DocValuesFieldExistsQuery;
4346
import org.apache.lucene.search.IndexSearcher;
4447
import org.apache.lucene.search.Query;
4548
import org.apache.lucene.search.ReferenceManager;
@@ -2850,8 +2853,13 @@ private static void trimUnsafeCommits(EngineConfig engineConfig) throws IOExcept
28502853
private void restoreVersionMapAndCheckpointTracker(DirectoryReader directoryReader) throws IOException {
28512854
final IndexSearcher searcher = new IndexSearcher(directoryReader);
28522855
searcher.setQueryCache(null);
2853-
final Query query = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, getPersistedLocalCheckpoint() + 1, Long.MAX_VALUE);
2854-
final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f);
2856+
final Query query = new BooleanQuery.Builder()
2857+
.add(LongPoint.newRangeQuery(
2858+
SeqNoFieldMapper.NAME, getPersistedLocalCheckpoint() + 1, Long.MAX_VALUE), BooleanClause.Occur.MUST)
2859+
// exclude non-root nested documents
2860+
.add(new DocValuesFieldExistsQuery(SeqNoFieldMapper.PRIMARY_TERM_NAME), BooleanClause.Occur.MUST)
2861+
.build();
2862+
final Weight weight = searcher.createWeight(searcher.rewrite(query), ScoreMode.COMPLETE_NO_SCORES, 1.0f);
28552863
for (LeafReaderContext leaf : directoryReader.leaves()) {
28562864
final Scorer scorer = weight.scorer(leaf);
28572865
if (scorer == null) {
@@ -2863,9 +2871,6 @@ private void restoreVersionMapAndCheckpointTracker(DirectoryReader directoryRead
28632871
int docId;
28642872
while ((docId = iterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
28652873
final long primaryTerm = dv.docPrimaryTerm(docId);
2866-
if (primaryTerm == -1L) {
2867-
continue; // skip children docs which do not have primary term
2868-
}
28692874
final long seqNo = dv.docSeqNo(docId);
28702875
localCheckpointTracker.markSeqNoAsProcessed(seqNo);
28712876
localCheckpointTracker.markSeqNoAsPersisted(seqNo);

server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java

+9-6
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import org.apache.lucene.index.LeafReaderContext;
2525
import org.apache.lucene.index.NumericDocValues;
2626
import org.apache.lucene.index.Term;
27+
import org.apache.lucene.search.BooleanClause;
28+
import org.apache.lucene.search.BooleanQuery;
29+
import org.apache.lucene.search.DocValuesFieldExistsQuery;
2730
import org.apache.lucene.search.IndexSearcher;
2831
import org.apache.lucene.search.Query;
2932
import org.apache.lucene.search.ScoreDoc;
@@ -210,7 +213,11 @@ private void fillParallelArray(ScoreDoc[] scoreDocs, ParallelArray parallelArray
210213
}
211214

212215
private TopDocs searchOperations(ScoreDoc after) throws IOException {
213-
final Query rangeQuery = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, Math.max(fromSeqNo, lastSeenSeqNo), toSeqNo);
216+
final Query rangeQuery = new BooleanQuery.Builder()
217+
.add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, Math.max(fromSeqNo, lastSeenSeqNo), toSeqNo), BooleanClause.Occur.MUST)
218+
// exclude non-root nested documents
219+
.add(new DocValuesFieldExistsQuery(SeqNoFieldMapper.PRIMARY_TERM_NAME), BooleanClause.Occur.MUST)
220+
.build();
214221
final Sort sortedBySeqNo = new Sort(new SortField(SeqNoFieldMapper.NAME, SortField.Type.LONG));
215222
return indexSearcher.searchAfter(after, rangeQuery, searchBatchSize, sortedBySeqNo);
216223
}
@@ -219,11 +226,7 @@ private Translog.Operation readDocAsOp(int docIndex) throws IOException {
219226
final LeafReaderContext leaf = parallelArray.leafReaderContexts[docIndex];
220227
final int segmentDocID = scoreDocs[docIndex].doc - leaf.docBase;
221228
final long primaryTerm = parallelArray.primaryTerm[docIndex];
222-
// We don't have to read the nested child documents - those docs don't have primary terms.
223-
if (primaryTerm == -1) {
224-
skippedOperations++;
225-
return null;
226-
}
229+
assert primaryTerm > 0 : "nested child document must be excluded";
227230
final long seqNo = parallelArray.seqNo[docIndex];
228231
// Only pick the first seen seq#
229232
if (seqNo == lastSeenSeqNo) {

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

+11-11
Original file line numberDiff line numberDiff line change
@@ -1811,12 +1811,12 @@ public void run() {
18111811

18121812
public void testVersioningCreateExistsException() throws IOException {
18131813
ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null);
1814-
Engine.Index create = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0,
1814+
Engine.Index create = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1,
18151815
Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false, UNASSIGNED_SEQ_NO, 0);
18161816
Engine.IndexResult indexResult = engine.index(create);
18171817
assertThat(indexResult.getVersion(), equalTo(1L));
18181818

1819-
create = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED,
1819+
create = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, Versions.MATCH_DELETED,
18201820
VersionType.INTERNAL, PRIMARY, 0, -1, false, UNASSIGNED_SEQ_NO, 0);
18211821
indexResult = engine.index(create);
18221822
assertThat(indexResult.getResultType(), equalTo(Engine.Result.Type.FAILURE));
@@ -3063,7 +3063,7 @@ public void testTranslogReplay() throws IOException {
30633063
final int numDocs = randomIntBetween(1, 10);
30643064
for (int i = 0; i < numDocs; i++) {
30653065
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
3066-
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0,
3066+
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1,
30673067
Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0);
30683068
Engine.IndexResult indexResult = engine.index(firstIndexRequest);
30693069
assertThat(indexResult.getVersion(), equalTo(1L));
@@ -3097,7 +3097,7 @@ public void testTranslogReplay() throws IOException {
30973097
final boolean flush = randomBoolean();
30983098
int randomId = randomIntBetween(numDocs + 1, numDocs + 10);
30993099
ParsedDocument doc = testParsedDocument(Integer.toString(randomId), null, testDocument(), new BytesArray("{}"), null);
3100-
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, 1,
3100+
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, 1,
31013101
VersionType.EXTERNAL, PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0);
31023102
Engine.IndexResult indexResult = engine.index(firstIndexRequest);
31033103
assertThat(indexResult.getVersion(), equalTo(1L));
@@ -3107,7 +3107,7 @@ public void testTranslogReplay() throws IOException {
31073107
}
31083108

31093109
doc = testParsedDocument(Integer.toString(randomId), null, testDocument(), new BytesArray("{}"), null);
3110-
Engine.Index idxRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, 2,
3110+
Engine.Index idxRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, 2,
31113111
VersionType.EXTERNAL, PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0);
31123112
Engine.IndexResult result = engine.index(idxRequest);
31133113
engine.refresh("test");
@@ -3143,7 +3143,7 @@ public void testRecoverFromForeignTranslog() throws IOException {
31433143
final int numDocs = randomIntBetween(1, 10);
31443144
for (int i = 0; i < numDocs; i++) {
31453145
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
3146-
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0,
3146+
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1,
31473147
Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0);
31483148
Engine.IndexResult index = engine.index(firstIndexRequest);
31493149
assertThat(index.getVersion(), equalTo(1L));
@@ -3677,7 +3677,7 @@ public void testRetryWithAutogeneratedIdWorksAndNoDuplicateDocs() throws IOExcep
36773677
boolean isRetry = false;
36783678
long autoGeneratedIdTimestamp = 0;
36793679

3680-
Engine.Index index = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0,
3680+
Engine.Index index = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1,
36813681
randomBoolean() ? Versions.MATCH_DELETED : Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(),
36823682
autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0);
36833683
Engine.IndexResult indexResult = engine.index(index);
@@ -3689,7 +3689,7 @@ public void testRetryWithAutogeneratedIdWorksAndNoDuplicateDocs() throws IOExcep
36893689
assertThat(indexResult.getVersion(), equalTo(1L));
36903690

36913691
isRetry = true;
3692-
index = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY, VersionType.INTERNAL,
3692+
index = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, Versions.MATCH_ANY, VersionType.INTERNAL,
36933693
PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0);
36943694
indexResult = engine.index(index);
36953695
assertThat(indexResult.getVersion(), equalTo(1L));
@@ -3718,7 +3718,7 @@ public void testRetryWithAutogeneratedIdsAndWrongOrderWorksAndNoDuplicateDocs()
37183718
boolean isRetry = true;
37193719
long autoGeneratedIdTimestamp = 0;
37203720

3721-
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0,
3721+
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1,
37223722
randomBoolean() ? Versions.MATCH_DELETED : Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(),
37233723
autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0);
37243724
Engine.IndexResult result = engine.index(firstIndexRequest);
@@ -3730,7 +3730,7 @@ public void testRetryWithAutogeneratedIdsAndWrongOrderWorksAndNoDuplicateDocs()
37303730
assertThat(indexReplicaResult.getVersion(), equalTo(1L));
37313731

37323732
isRetry = false;
3733-
Engine.Index secondIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0,
3733+
Engine.Index secondIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1,
37343734
Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO,
37353735
0);
37363736
Engine.IndexResult indexResult = engine.index(secondIndexRequest);
@@ -3760,7 +3760,7 @@ public Engine.Index randomAppendOnly(ParsedDocument doc, boolean retry, final lo
37603760
}
37613761

37623762
public Engine.Index appendOnlyPrimary(ParsedDocument doc, boolean retry, final long autoGeneratedIdTimestamp, boolean create) {
3763-
return new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, create ? Versions.MATCH_DELETED : Versions.MATCH_ANY,
3763+
return new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, create ? Versions.MATCH_DELETED : Versions.MATCH_ANY,
37643764
VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, retry,
37653765
UNASSIGNED_SEQ_NO, 0);
37663766
}

server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java

+2-10
Original file line numberDiff line numberDiff line change
@@ -157,15 +157,9 @@ searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), f
157157
public void testSkipNonRootOfNestedDocuments() throws Exception {
158158
Map<Long, Long> seqNoToTerm = new HashMap<>();
159159
List<Engine.Operation> operations = generateHistoryOnReplica(between(1, 100), randomBoolean(), randomBoolean(), randomBoolean());
160-
int totalOps = 0;
161160
for (Engine.Operation op : operations) {
162161
if (engine.getLocalCheckpointTracker().hasProcessed(op.seqNo()) == false) {
163162
seqNoToTerm.put(op.seqNo(), op.primaryTerm());
164-
if (op instanceof Engine.Index) {
165-
totalOps += ((Engine.Index) op).docs().size();
166-
} else {
167-
totalOps++;
168-
}
169163
}
170164
applyOperation(engine, op);
171165
if (rarely()) {
@@ -182,14 +176,12 @@ public void testSkipNonRootOfNestedDocuments() throws Exception {
182176
engine.refresh("test");
183177
Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
184178
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, between(1, 100), 0, maxSeqNo, false)) {
185-
searcher = null;
179+
assertThat(snapshot.totalOperations(), equalTo(seqNoToTerm.size()));
186180
Translog.Operation op;
187181
while ((op = snapshot.next()) != null) {
188182
assertThat(op.toString(), op.primaryTerm(), equalTo(seqNoToTerm.get(op.seqNo())));
189183
}
190-
assertThat(snapshot.skippedOperations(), equalTo(totalOps - seqNoToTerm.size()));
191-
} finally {
192-
IOUtils.close(searcher);
184+
assertThat(snapshot.skippedOperations(), equalTo(0));
193185
}
194186
}
195187

test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ public abstract class EngineTestCase extends ESTestCase {
160160
protected Path primaryTranslogDir;
161161
protected Path replicaTranslogDir;
162162
// A default primary term is used by engine instances created in this test.
163-
protected final PrimaryTermSupplier primaryTerm = new PrimaryTermSupplier(0L);
163+
protected final PrimaryTermSupplier primaryTerm = new PrimaryTermSupplier(1L);
164164

165165
protected static void assertVisibleCount(Engine engine, int numDocs) throws IOException {
166166
assertVisibleCount(engine, numDocs, true);

0 commit comments

Comments
 (0)