Commit 68053ef

Use sequential access stored fields in CCR
1 parent d1f560a commit 68053ef

2 files changed: 95 additions & 3 deletions
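
For context, this is the access pattern the change adopts: when a run of consecutive docIDs is read, stored fields are loaded through the segment's sequential ("merge instance") StoredFieldsReader, which keeps decompression state between calls instead of re-decompressing a stored-fields block per document. A minimal sketch, assuming the Lucene 8.x APIs used in the diff below; the class and method names here are illustrative, not part of the commit:

    import java.io.IOException;

    import org.apache.lucene.codecs.StoredFieldsReader;
    import org.apache.lucene.index.LeafReader;
    import org.apache.lucene.index.StoredFieldVisitor;
    import org.elasticsearch.common.lucene.index.SequentialStoredFieldsLeafReader;

    final class SequentialReadSketch {
        // Visit stored fields for the docID range [fromDocId, toDocId), preferring
        // the sequential reader when the leaf exposes one.
        static void readRange(LeafReader reader, int fromDocId, int toDocId,
                              StoredFieldVisitor visitor) throws IOException {
            StoredFieldsReader sequentialReader = null;
            if (reader instanceof SequentialStoredFieldsLeafReader) {
                sequentialReader = ((SequentialStoredFieldsLeafReader) reader).getSequentialStoredFieldsReader();
            }
            for (int docId = fromDocId; docId < toDocId; docId++) {
                if (sequentialReader != null) {
                    sequentialReader.visitDocument(docId, visitor); // sequential fast path
                } else {
                    reader.document(docId, visitor);                // random-access fallback
                }
            }
        }
    }

The diff below applies this idea in LuceneChangesSnapshot, caching the reader per leaf (keyed by leaf.ord) and only enabling it when ten or more consecutive docIDs are requested.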

server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java

Lines changed: 49 additions & 3 deletions
@@ -8,6 +8,7 @@
 
 package org.elasticsearch.index.engine;
 
+import org.apache.lucene.codecs.StoredFieldsReader;
 import org.apache.lucene.document.LongPoint;
 import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.LeafReaderContext;
@@ -23,6 +24,7 @@
 import org.apache.lucene.util.ArrayUtil;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.lucene.Lucene;
+import org.elasticsearch.common.lucene.index.SequentialStoredFieldsLeafReader;
 import org.elasticsearch.common.lucene.search.Queries;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.index.fieldvisitor.FieldsVisitor;
@@ -55,6 +57,9 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
     private final ParallelArray parallelArray;
     private final Closeable onClose;
 
+    private int storedFieldsReaderOrd = -1;
+    private StoredFieldsReader storedFieldsReader = null;
+
     /**
      * Creates a new "translog" snapshot from Lucene for reading operations whose seq# in the specified range.
      *
@@ -162,9 +167,16 @@ private void fillParallelArray(ScoreDoc[] scoreDocs, ParallelArray parallelArray
             for (int i = 0; i < scoreDocs.length; i++) {
                 scoreDocs[i].shardIndex = i;
             }
+            parallelArray.useSequentialStoredFieldsReader = scoreDocs.length >= 10 && hasSequentialAccess(scoreDocs);
+            if (parallelArray.useSequentialStoredFieldsReader == false) {
+                storedFieldsReaderOrd = -1;
+                storedFieldsReader = null;
+            }
             // for better loading performance we sort the array by docID and
             // then visit all leaves in order.
-            ArrayUtil.introSort(scoreDocs, Comparator.comparingInt(i -> i.doc));
+            if (parallelArray.useSequentialStoredFieldsReader == false) {
+                ArrayUtil.introSort(scoreDocs, Comparator.comparingInt(i -> i.doc));
+            }
             int docBase = -1;
             int maxDoc = 0;
             List<LeafReaderContext> leaves = indexSearcher.getIndexReader().leaves();
@@ -190,8 +202,19 @@ private void fillParallelArray(ScoreDoc[] scoreDocs, ParallelArray parallelArray
                 parallelArray.hasRecoverySource[index] = combinedDocValues.hasRecoverySource(segmentDocID);
             }
             // now sort back based on the shardIndex. we use this to store the previous index
-            ArrayUtil.introSort(scoreDocs, Comparator.comparingInt(i -> i.shardIndex));
+            if (parallelArray.useSequentialStoredFieldsReader == false) {
+                ArrayUtil.introSort(scoreDocs, Comparator.comparingInt(i -> i.shardIndex));
+            }
+        }
+    }
+
+    private static boolean hasSequentialAccess(ScoreDoc[] scoreDocs) {
+        for (int i = 0; i < scoreDocs.length - 1; i++) {
+            if (scoreDocs[i].doc + 1 != scoreDocs[i + 1].doc) {
+                return false;
+            }
         }
+        return true;
     }
 
     private TopDocs searchOperations(ScoreDoc after) throws IOException {
@@ -218,7 +241,25 @@ private Translog.Operation readDocAsOp(int docIndex) throws IOException {
         final String sourceField = parallelArray.hasRecoverySource[docIndex] ? SourceFieldMapper.RECOVERY_SOURCE_NAME :
             SourceFieldMapper.NAME;
         final FieldsVisitor fields = new FieldsVisitor(true, sourceField);
-        leaf.reader().document(segmentDocID, fields);
+
+        if (parallelArray.useSequentialStoredFieldsReader) {
+            if (storedFieldsReaderOrd != leaf.ord) {
+                if (leaf.reader() instanceof SequentialStoredFieldsLeafReader) {
+                    storedFieldsReader = ((SequentialStoredFieldsLeafReader) leaf.reader()).getSequentialStoredFieldsReader();
+                    storedFieldsReaderOrd = leaf.ord;
+                } else {
+                    storedFieldsReader = null;
+                    storedFieldsReaderOrd = -1;
+                }
+            }
+        }
+        if (storedFieldsReader != null) {
+            assert parallelArray.useSequentialStoredFieldsReader;
+            assert storedFieldsReaderOrd == leaf.ord : storedFieldsReaderOrd + " != " + leaf.ord;
+            storedFieldsReader.visitDocument(segmentDocID, fields);
+        } else {
+            leaf.reader().document(segmentDocID, fields);
+        }
 
         final Translog.Operation op;
         final boolean isTombstone = parallelArray.isTombStone[docIndex];
@@ -270,6 +311,7 @@ private static final class ParallelArray {
         final long[] primaryTerm;
         final boolean[] isTombStone;
        final boolean[] hasRecoverySource;
+        boolean useSequentialStoredFieldsReader = false;
 
         ParallelArray(int size) {
             version = new long[size];
@@ -281,4 +323,8 @@ private static final class ParallelArray {
         }
     }
 
+    // for testing
+    boolean useSequentialStoredFieldsReader() {
+        return storedFieldsReader != null;
+    }
 }
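
One subtlety worth noting: the two ArrayUtil.introSort calls are skipped on the sequential path because hasSequentialAccess already guarantees that the seqNo-ordered hits carry strictly increasing, consecutive docIDs, so sorting by doc (and sorting back by shardIndex) would leave the array unchanged. A hypothetical check capturing that invariant, not part of the commit:

    import org.apache.lucene.search.ScoreDoc;

    final class DocOrderInvariant {
        // Hypothetical helper: true when docIDs are consecutive, i.e. the
        // condition under which LuceneChangesSnapshot enables the sequential
        // reader and can safely skip both introSort calls.
        static boolean isConsecutive(ScoreDoc[] scoreDocs) {
            for (int i = 1; i < scoreDocs.length; i++) {
                if (scoreDocs[i - 1].doc + 1 != scoreDocs[i].doc) {
                    return false;
                }
            }
            return true;
        }
    }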

server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java

Lines changed: 46 additions & 0 deletions
@@ -8,10 +8,12 @@
 
 package org.elasticsearch.index.engine;
 
+import org.apache.lucene.index.NoMergePolicy;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.mapper.ParsedDocument;
+import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.translog.SnapshotMatchers;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.test.IndexSettingsModule;
@@ -207,6 +209,50 @@ public void testUpdateAndReadChangesConcurrently() throws Exception {
         }
     }
 
+    public void testAccessStoredFieldsSequentially() throws Exception {
+        try (Store store = createStore();
+             Engine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE)) {
+            int smallBatch = between(5, 9);
+            long seqNo = 0;
+            for (int i = 0; i < smallBatch; i++) {
+                engine.index(replicaIndexForDoc(createParsedDoc(Long.toString(seqNo), null), 1, seqNo, true));
+                seqNo++;
+            }
+            engine.index(replicaIndexForDoc(createParsedDoc(Long.toString(1000), null), 1, 1000, true));
+            seqNo = 11;
+            int largeBatch = between(15, 100);
+            for (int i = 0; i < largeBatch; i++) {
+                engine.index(replicaIndexForDoc(createParsedDoc(Long.toString(seqNo), null), 1, seqNo, true));
+                seqNo++;
+            }
+            // disable optimization for a small batch
+            Translog.Operation op;
+            try (LuceneChangesSnapshot snapshot =
+                     (LuceneChangesSnapshot) engine.newChangesSnapshot("test", 0L, between(1, smallBatch), false)) {
+                while ((op = snapshot.next()) != null) {
+                    assertFalse(op.toString(), snapshot.useSequentialStoredFieldsReader());
+                }
+                assertFalse(snapshot.useSequentialStoredFieldsReader());
+            }
+            // disable optimization for non-sequential accesses
+            try (LuceneChangesSnapshot snapshot =
+                     (LuceneChangesSnapshot) engine.newChangesSnapshot("test", between(1, 3), between(20, 100), false)) {
+                while ((op = snapshot.next()) != null) {
+                    assertFalse(op.toString(), snapshot.useSequentialStoredFieldsReader());
+                }
+                assertFalse(snapshot.useSequentialStoredFieldsReader());
+            }
+            // enable optimization for sequential access of 10+ docs
+            try (LuceneChangesSnapshot snapshot =
+                     (LuceneChangesSnapshot) engine.newChangesSnapshot("test", 11, between(21, 100), false)) {
+                while ((op = snapshot.next()) != null) {
+                    assertTrue(op.toString(), snapshot.useSequentialStoredFieldsReader());
+                }
+                assertTrue(snapshot.useSequentialStoredFieldsReader());
+            }
+        }
+    }
+
     class Follower extends Thread {
         private final InternalEngine leader;
         private final InternalEngine engine;
