Skip to content

Commit ed78f42

Browse files
authored
Enable sort optimization in query Lucene changes (#77907)
This change enables the sort optimization in LuceneChangesSnapshot to speed up CCR and peer recoveries.
1 parent 238235c commit ed78f42

File tree

19 files changed

+228
-72
lines changed

19 files changed

+228
-72
lines changed

server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -669,7 +669,8 @@ public void testShardChangesWithDefaultDocType() throws Exception {
669669
}
670670
}
671671
IndexShard shard = indexService.getShard(0);
672-
try (Translog.Snapshot luceneSnapshot = shard.newChangesSnapshot("test", 0, numOps - 1, true, randomBoolean());
672+
try (Translog.Snapshot luceneSnapshot =
673+
shard.newChangesSnapshot("test", 0, numOps - 1, true, randomBoolean(), randomBoolean());
673674
Translog.Snapshot translogSnapshot = getTranslog(shard).newSnapshot()) {
674675
List<Translog.Operation> opsFromLucene = TestTranslog.drainSnapshot(luceneSnapshot, true);
675676
List<Translog.Operation> opsFromTranslog = TestTranslog.drainSnapshot(translogSnapshot, true);

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -686,12 +686,24 @@ public enum SearcherScope {
686686
*/
687687
public abstract Closeable acquireHistoryRetentionLock();
688688

689+
690+
/**
691+
* Counts the number of operations in the range of the given sequence numbers.
692+
*
693+
* @param source the source of the request
694+
* @param fromSeqNo the start sequence number (inclusive)
695+
* @param toSeqNo the end sequence number (inclusive)
696+
* @see #newChangesSnapshot(String, long, long, boolean, boolean, boolean)
697+
*/
698+
public abstract int countChanges(String source, long fromSeqNo, long toSeqNo) throws IOException;
699+
689700
/**
690701
* Creates a new history snapshot from Lucene for reading operations whose seqno is in the requested seqno range (both inclusive).
691702
* This feature requires soft-deletes enabled. If soft-deletes are disabled, this method will throw an {@link IllegalStateException}.
692703
*/
693704
public abstract Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long toSeqNo,
694-
boolean requiredFullRange, boolean singleConsumer) throws IOException;
705+
boolean requiredFullRange, boolean singleConsumer,
706+
boolean accessStats) throws IOException;
695707

696708
/**
697709
* Checks if this engine has every operation since {@code startingSeqNo} (inclusive) in its history (either Lucene or translog)

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2528,15 +2528,32 @@ long getNumDocUpdates() {
25282528
return numDocUpdates.count();
25292529
}
25302530

2531+
@Override
2532+
public int countChanges(String source, long fromSeqNo, long toSeqNo) throws IOException {
2533+
ensureOpen();
2534+
refreshIfNeeded(source, toSeqNo);
2535+
try (Searcher searcher = acquireSearcher(source, SearcherScope.INTERNAL)) {
2536+
return LuceneChangesSnapshot.countOperations(searcher, fromSeqNo, toSeqNo);
2537+
} catch (Exception e) {
2538+
try {
2539+
maybeFailEngine("count changes", e);
2540+
} catch (Exception inner) {
2541+
e.addSuppressed(inner);
2542+
}
2543+
throw e;
2544+
}
2545+
}
2546+
25312547
@Override
25322548
public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long toSeqNo,
2533-
boolean requiredFullRange, boolean singleConsumer) throws IOException {
2549+
boolean requiredFullRange, boolean singleConsumer,
2550+
boolean accessStats) throws IOException {
25342551
ensureOpen();
25352552
refreshIfNeeded(source, toSeqNo);
25362553
Searcher searcher = acquireSearcher(source, SearcherScope.INTERNAL);
25372554
try {
25382555
LuceneChangesSnapshot snapshot = new LuceneChangesSnapshot(
2539-
searcher, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE, fromSeqNo, toSeqNo, requiredFullRange, singleConsumer);
2556+
searcher, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE, fromSeqNo, toSeqNo, requiredFullRange, singleConsumer, accessStats);
25402557
searcher = null;
25412558
return snapshot;
25422559
} catch (Exception e) {

server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,14 @@
1515
import org.apache.lucene.index.NumericDocValues;
1616
import org.apache.lucene.search.BooleanClause;
1717
import org.apache.lucene.search.BooleanQuery;
18+
import org.apache.lucene.search.FieldDoc;
1819
import org.apache.lucene.search.IndexSearcher;
1920
import org.apache.lucene.search.Query;
2021
import org.apache.lucene.search.ScoreDoc;
2122
import org.apache.lucene.search.Sort;
2223
import org.apache.lucene.search.SortField;
2324
import org.apache.lucene.search.TopDocs;
25+
import org.apache.lucene.search.TopFieldCollector;
2426
import org.apache.lucene.util.ArrayUtil;
2527
import org.elasticsearch.common.bytes.BytesReference;
2628
import org.elasticsearch.common.lucene.Lucene;
@@ -54,6 +56,7 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
5456

5557
private final IndexSearcher indexSearcher;
5658
private int docIndex = 0;
59+
private final boolean accessStats;
5760
private final int totalHits;
5861
private ScoreDoc[] scoreDocs;
5962
private final ParallelArray parallelArray;
@@ -73,10 +76,11 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
7376
* @param toSeqNo the maximum requesting seq# - inclusive
7477
* @param requiredFullRange if true, the snapshot will strictly check for the existence of operations between fromSeqNo and toSeqNo
7578
* @param singleConsumer true if the snapshot is accessed by a single thread that creates the snapshot
79+
* @param accessStats true if the stats of the snapshot can be accessed via {@link #totalOperations()}
7680
*/
7781
LuceneChangesSnapshot(Engine.Searcher engineSearcher, int searchBatchSize,
7882
long fromSeqNo, long toSeqNo, boolean requiredFullRange,
79-
boolean singleConsumer) throws IOException {
83+
boolean singleConsumer, boolean accessStats) throws IOException {
8084
if (fromSeqNo < 0 || toSeqNo < 0 || fromSeqNo > toSeqNo) {
8185
throw new IllegalArgumentException("Invalid range; from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "]");
8286
}
@@ -97,10 +101,11 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
97101
this.lastSeenSeqNo = fromSeqNo - 1;
98102
this.requiredFullRange = requiredFullRange;
99103
this.singleConsumer = singleConsumer;
100-
this.indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader()));
104+
this.indexSearcher = newIndexSearcher(engineSearcher);
101105
this.indexSearcher.setQueryCache(null);
106+
this.accessStats = accessStats;
102107
this.parallelArray = new ParallelArray(this.searchBatchSize);
103-
final TopDocs topDocs = searchOperations(null);
108+
final TopDocs topDocs = searchOperations(null, accessStats);
104109
this.totalHits = Math.toIntExact(topDocs.totalHits.value);
105110
this.scoreDocs = topDocs.scoreDocs;
106111
fillParallelArray(scoreDocs, parallelArray);
@@ -115,6 +120,9 @@ public void close() throws IOException {
115120
@Override
116121
public int totalOperations() {
117122
assert assertAccessingThread();
123+
if (accessStats == false) {
124+
throw new IllegalStateException("Access stats of a snapshot created with [access_stats] is false");
125+
}
118126
return totalHits;
119127
}
120128

@@ -169,7 +177,7 @@ private int nextDocIndex() throws IOException {
169177
// we have processed all docs in the current search - fetch the next batch
170178
if (docIndex == scoreDocs.length && docIndex > 0) {
171179
final ScoreDoc prev = scoreDocs[scoreDocs.length - 1];
172-
scoreDocs = searchOperations(prev).scoreDocs;
180+
scoreDocs = searchOperations((FieldDoc) prev, false).scoreDocs;
173181
fillParallelArray(scoreDocs, parallelArray);
174182
docIndex = 0;
175183
}
@@ -236,13 +244,33 @@ private static boolean hasSequentialAccess(ScoreDoc[] scoreDocs) {
236244
return true;
237245
}
238246

239-
private TopDocs searchOperations(ScoreDoc after) throws IOException {
240-
final Query rangeQuery = new BooleanQuery.Builder()
241-
.add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, Math.max(fromSeqNo, lastSeenSeqNo), toSeqNo), BooleanClause.Occur.MUST)
247+
private static IndexSearcher newIndexSearcher(Engine.Searcher engineSearcher) throws IOException {
248+
return new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader()));
249+
}
250+
251+
private static Query rangeQuery(long fromSeqNo, long toSeqNo) {
252+
return new BooleanQuery.Builder()
253+
.add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo), BooleanClause.Occur.MUST)
242254
.add(Queries.newNonNestedFilter(), BooleanClause.Occur.MUST) // exclude non-root nested documents
243255
.build();
244-
final Sort sortedBySeqNo = new Sort(new SortField(SeqNoFieldMapper.NAME, SortField.Type.LONG));
245-
return indexSearcher.searchAfter(after, rangeQuery, searchBatchSize, sortedBySeqNo);
256+
}
257+
258+
static int countOperations(Engine.Searcher engineSearcher, long fromSeqNo, long toSeqNo) throws IOException {
259+
if (fromSeqNo < 0 || toSeqNo < 0 || fromSeqNo > toSeqNo) {
260+
throw new IllegalArgumentException("Invalid range; from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "]");
261+
}
262+
return newIndexSearcher(engineSearcher).count(rangeQuery(fromSeqNo, toSeqNo));
263+
}
264+
265+
private TopDocs searchOperations(FieldDoc after, boolean accurateTotalHits) throws IOException {
266+
final Query rangeQuery = rangeQuery(Math.max(fromSeqNo, lastSeenSeqNo), toSeqNo);
267+
assert accurateTotalHits == false || after == null : "accurate total hits is required by the first batch only";
268+
final SortField sortBySeqNo = new SortField(SeqNoFieldMapper.NAME, SortField.Type.LONG);
269+
sortBySeqNo.setCanUsePoints();
270+
final TopFieldCollector collector =
271+
TopFieldCollector.create(new Sort(sortBySeqNo), searchBatchSize, after, accurateTotalHits ? Integer.MAX_VALUE : 0);
272+
indexSearcher.search(rangeQuery, collector);
273+
return collector.topDocs();
246274
}
247275

248276
private Translog.Operation readDocAsOp(int docIndex) throws IOException {

server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,9 +319,17 @@ public Closeable acquireHistoryRetentionLock() {
319319
return () -> {};
320320
}
321321

322+
@Override
323+
public int countChanges(String source, long fromSeqNo, long toSeqNo) throws IOException {
324+
try (Translog.Snapshot snapshot = newChangesSnapshot(source, fromSeqNo, toSeqNo, false, true, true)) {
325+
return snapshot.totalOperations();
326+
}
327+
}
328+
322329
@Override
323330
public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long toSeqNo,
324-
boolean requiredFullRange, boolean singleConsumer) {
331+
boolean requiredFullRange, boolean singleConsumer,
332+
boolean accessStats) {
325333
return newEmptySnapshot();
326334
}
327335

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2053,6 +2053,18 @@ public long getMinRetainedSeqNo() {
20532053
return getEngine().getMinRetainedSeqNo();
20542054
}
20552055

2056+
/**
2057+
* Counts the number of operations in the range of the given sequence numbers.
2058+
*
2059+
* @param source the source of the request
2060+
* @param fromSeqNo the start sequence number (inclusive)
2061+
* @param toSeqNo the end sequence number (inclusive)
2062+
* @see #newChangesSnapshot(String, long, long, boolean, boolean, boolean)
2063+
*/
2064+
public int countChanges(String source, long fromSeqNo, long toSeqNo) throws IOException {
2065+
return getEngine().countChanges(source, fromSeqNo, toSeqNo);
2066+
}
2067+
20562068
/**
20572069
* Creates a new changes snapshot for reading operations whose seq_no are between {@code fromSeqNo}(inclusive)
20582070
* and {@code toSeqNo}(inclusive). The caller has to close the returned snapshot after finishing the reading.
@@ -2063,11 +2075,14 @@ public long getMinRetainedSeqNo() {
20632075
* @param requiredFullRange if {@code true} then {@link Translog.Snapshot#next()} will throw {@link IllegalStateException}
20642076
* if any operation between {@code fromSeqNo} and {@code toSeqNo} is missing.
20652077
* This parameter should only be enabled when the entire requested range is below the global checkpoint.
2066-
* @param singleConsumer true if the snapshot is accessed by a single thread that creates the snapshot
2067-
*/
2068-
public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo,
2069-
long toSeqNo, boolean requiredFullRange, boolean singleConsumer) throws IOException {
2070-
return getEngine().newChangesSnapshot(source, fromSeqNo, toSeqNo, requiredFullRange, singleConsumer);
2078+
* @param singleConsumer true if the snapshot is accessed by only the thread that creates the snapshot. In this case, the
2079+
* snapshot can enable some optimizations to improve the performance.
2080+
* @param accessStats true if the stats of the snapshot are accessed via {@link Translog.Snapshot#totalOperations()}
2081+
*/
2082+
public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long toSeqNo,
2083+
boolean requiredFullRange, boolean singleConsumer,
2084+
boolean accessStats) throws IOException {
2085+
return getEngine().newChangesSnapshot(source, fromSeqNo, toSeqNo, requiredFullRange, singleConsumer, accessStats);
20712086
}
20722087

20732088
public List<Segment> segments(boolean verbose) {

server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public void resync(final IndexShard indexShard, final ActionListener<ResyncTask>
8181
// Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender.
8282
// Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible
8383
// Also fail the resync early if the shard is shutting down
84-
snapshot = indexShard.newChangesSnapshot("resync", startingSeqNo, Long.MAX_VALUE, false, false);
84+
snapshot = indexShard.newChangesSnapshot("resync", startingSeqNo, Long.MAX_VALUE, false, false, true);
8585
final Translog.Snapshot originalSnapshot = snapshot;
8686
final Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() {
8787
@Override

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ && isTargetSameHistory()
309309
final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
310310
logger.trace("snapshot for recovery; current size is [{}]", estimateNumberOfHistoryOperations(startingSeqNo));
311311
final Translog.Snapshot phase2Snapshot =
312-
shard.newChangesSnapshot("peer-recovery", startingSeqNo, Long.MAX_VALUE, false, false);
312+
shard.newChangesSnapshot("peer-recovery", startingSeqNo, Long.MAX_VALUE, false, false, true);
313313
resources.add(phase2Snapshot);
314314
retentionLock.close();
315315

@@ -354,9 +354,7 @@ private boolean isTargetSameHistory() {
354354
}
355355

356356
private int estimateNumberOfHistoryOperations(long startingSeqNo) throws IOException {
357-
try (Translog.Snapshot snapshot = shard.newChangesSnapshot("peer-recover", startingSeqNo, Long.MAX_VALUE, false, true)) {
358-
return snapshot.totalOperations();
359-
}
357+
return shard.countChanges("peer-recovery", startingSeqNo, Long.MAX_VALUE);
360358
}
361359

362360
static void runUnderPrimaryPermit(CancellableThreads.Interruptible runnable, String reason,

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -343,10 +343,7 @@ public void finalizeRecovery(final long globalCheckpoint, final long trimAboveSe
343343

344344
private boolean hasUncommittedOperations() throws IOException {
345345
long localCheckpointOfCommit = Long.parseLong(indexShard.commitStats().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
346-
try (Translog.Snapshot snapshot =
347-
indexShard.newChangesSnapshot("peer-recovery", localCheckpointOfCommit + 1, Long.MAX_VALUE, false, false)) {
348-
return snapshot.totalOperations() > 0;
349-
}
346+
return indexShard.countChanges("peer-recovery", localCheckpointOfCommit + 1, Long.MAX_VALUE) > 0;
350347
}
351348

352349
@Override

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5327,8 +5327,13 @@ public void onFailure(Exception e) {
53275327
@Override
53285328
protected void doRun() throws Exception {
53295329
latch.await();
5330-
Translog.Snapshot changes = engine.newChangesSnapshot("test", min, max, true, randomBoolean());
5331-
changes.close();
5330+
if (randomBoolean()) {
5331+
try (Translog.Snapshot ignored =
5332+
engine.newChangesSnapshot("test", min, max, true, randomBoolean(), randomBoolean())) {
5333+
}
5334+
} else {
5335+
engine.countChanges("test", min, max);
5336+
}
53325337
}
53335338
});
53345339
snapshotThreads[i].start();

0 commit comments

Comments
 (0)