Skip to content

Commit 7202973

Browse files
committed
Reduce refresh when lookup term in FollowingEngine (#39184)
Today we always refresh when looking up the primary term in FollowingEngine. This is not necessary for we can simply return none for operations before the global checkpoint.
1 parent 2570bf6 commit 7202973

File tree

1 file changed

+23
-22
lines changed

1 file changed

+23
-22
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -151,32 +151,33 @@ protected boolean assertPrimaryCanOptimizeAddDocument(final Index index) {
151151
}
152152

153153
private OptionalLong lookupPrimaryTerm(final long seqNo) throws IOException {
154+
// Don't need to look up term for operations before the global checkpoint for they were processed on every copies already.
155+
if (seqNo <= engineConfig.getGlobalCheckpointSupplier().getAsLong()) {
156+
return OptionalLong.empty();
157+
}
154158
refreshIfNeeded("lookup_primary_term", seqNo);
155159
try (Searcher engineSearcher = acquireSearcher("lookup_primary_term", SearcherScope.INTERNAL)) {
156-
// We have to acquire a searcher before execute this check to ensure that the requesting seq_no is always found in the else
157-
// branch. If the operation is at most the global checkpoint, we should not look up its term as we may have merged away the
158-
// operation. Moreover, we won't need to replicate this operation to replicas since it was processed on every copies already.
160+
final DirectoryReader reader = Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader());
161+
final IndexSearcher searcher = new IndexSearcher(reader);
162+
searcher.setQueryCache(null);
163+
final Query query = new BooleanQuery.Builder()
164+
.add(LongPoint.newExactQuery(SeqNoFieldMapper.NAME, seqNo), BooleanClause.Occur.FILTER)
165+
// excludes the non-root nested documents which don't have primary_term.
166+
.add(new DocValuesFieldExistsQuery(SeqNoFieldMapper.PRIMARY_TERM_NAME), BooleanClause.Occur.FILTER)
167+
.build();
168+
final TopDocs topDocs = searcher.search(query, 1);
169+
if (topDocs.scoreDocs.length == 1) {
170+
final int docId = topDocs.scoreDocs[0].doc;
171+
final LeafReaderContext leaf = reader.leaves().get(ReaderUtil.subIndex(docId, reader.leaves()));
172+
final NumericDocValues primaryTermDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
173+
if (primaryTermDV != null && primaryTermDV.advanceExact(docId - leaf.docBase)) {
174+
assert primaryTermDV.longValue() > 0 : "invalid term [" + primaryTermDV.longValue() + "]";
175+
return OptionalLong.of(primaryTermDV.longValue());
176+
}
177+
}
159178
if (seqNo <= engineConfig.getGlobalCheckpointSupplier().getAsLong()) {
160-
return OptionalLong.empty();
179+
return OptionalLong.empty(); // we have merged away the looking up operation.
161180
} else {
162-
final DirectoryReader reader = Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader());
163-
final IndexSearcher searcher = new IndexSearcher(reader);
164-
searcher.setQueryCache(null);
165-
final Query query = new BooleanQuery.Builder()
166-
.add(LongPoint.newExactQuery(SeqNoFieldMapper.NAME, seqNo), BooleanClause.Occur.FILTER)
167-
// excludes the non-root nested documents which don't have primary_term.
168-
.add(new DocValuesFieldExistsQuery(SeqNoFieldMapper.PRIMARY_TERM_NAME), BooleanClause.Occur.FILTER)
169-
.build();
170-
final TopDocs topDocs = searcher.search(query, 1);
171-
if (topDocs.scoreDocs.length == 1) {
172-
final int docId = topDocs.scoreDocs[0].doc;
173-
final LeafReaderContext leaf = reader.leaves().get(ReaderUtil.subIndex(docId, reader.leaves()));
174-
final NumericDocValues primaryTermDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
175-
if (primaryTermDV != null && primaryTermDV.advanceExact(docId - leaf.docBase)) {
176-
assert primaryTermDV.longValue() > 0 : "invalid term [" + primaryTermDV.longValue() + "]";
177-
return OptionalLong.of(primaryTermDV.longValue());
178-
}
179-
}
180181
assert false : "seq_no[" + seqNo + "] does not have primary_term, total_hits=[" + topDocs.totalHits + "]";
181182
throw new IllegalStateException("seq_no[" + seqNo + "] does not have primary_term (total_hits=" + topDocs.totalHits + ")");
182183
}

0 commit comments

Comments
 (0)