Use sequential access of stored fields in CCR #68961

Merged (4 commits, Feb 16, 2021)
Changes from 2 commits

LuceneChangesSnapshot.java
@@ -8,6 +8,7 @@

package org.elasticsearch.index.engine;

+import org.apache.lucene.codecs.StoredFieldsReader;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
@@ -23,6 +24,7 @@
import org.apache.lucene.util.ArrayUtil;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.Lucene;
+import org.elasticsearch.common.lucene.index.SequentialStoredFieldsLeafReader;
import org.elasticsearch.common.lucene.search.Queries;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.fieldvisitor.FieldsVisitor;
@@ -55,6 +57,9 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
    private final ParallelArray parallelArray;
    private final Closeable onClose;

+    private int storedFieldsReaderOrd = -1;
+    private StoredFieldsReader storedFieldsReader = null;
+
    /**
     * Creates a new "translog" snapshot from Lucene for reading operations whose seq# is in the specified range.
     *
@@ -162,9 +167,16 @@ private void fillParallelArray(ScoreDoc[] scoreDocs, ParallelArray parallelArray
            for (int i = 0; i < scoreDocs.length; i++) {
                scoreDocs[i].shardIndex = i;
            }
+            parallelArray.useSequentialStoredFieldsReader = scoreDocs.length >= 10 && hasSequentialAccess(scoreDocs);
+            if (parallelArray.useSequentialStoredFieldsReader == false) {
+                storedFieldsReaderOrd = -1;
+                storedFieldsReader = null;
+            }
            // for better loading performance we sort the array by docID and
            // then visit all leaves in order.
-            ArrayUtil.introSort(scoreDocs, Comparator.comparingInt(i -> i.doc));
+            if (parallelArray.useSequentialStoredFieldsReader == false) {
+                ArrayUtil.introSort(scoreDocs, Comparator.comparingInt(i -> i.doc));
+            }
            int docBase = -1;
            int maxDoc = 0;
            List<LeafReaderContext> leaves = indexSearcher.getIndexReader().leaves();
@@ -190,8 +202,19 @@
                parallelArray.hasRecoverySource[index] = combinedDocValues.hasRecoverySource(segmentDocID);
            }
            // now sort back based on the shardIndex. we use this to store the previous index
-            ArrayUtil.introSort(scoreDocs, Comparator.comparingInt(i -> i.shardIndex));
+            if (parallelArray.useSequentialStoredFieldsReader == false) {
+                ArrayUtil.introSort(scoreDocs, Comparator.comparingInt(i -> i.shardIndex));
+            }
        }
    }

+    private static boolean hasSequentialAccess(ScoreDoc[] scoreDocs) {

Contributor:
You can avoid the loop entirely like in FetchPhase#hasSequentialDocs

@dnhatn (Member Author), Feb 16, 2021:

This is a bit different. We can access documents out of order instead of ascending like in the fetch phase.

+        for (int i = 0; i < scoreDocs.length - 1; i++) {
+            if (scoreDocs[i].doc + 1 != scoreDocs[i + 1].doc) {
+                return false;
+            }
+        }
+        return true;
+    }
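
A note on the review exchange above: FetchPhase#hasSequentialDocs can get away with an endpoint-only check because the fetch phase sorts its doc IDs ascending first, so the IDs are consecutive exactly when the last and first differ by length - 1. Here the check runs before any sort, while the array is still in seq_no order, so every adjacent pair has to be inspected. A minimal sketch of the contrast; the class and method names and the plain int[] representation are illustrative only, not the actual Elasticsearch signatures:

// Illustrative sketch: contrasts the two sequential-access checks
// discussed in the review thread above.
final class SequentialAccessChecks {

    // Fetch-phase style: valid only when `docs` is already sorted ascending
    // with no duplicates; consecutiveness then reduces to an endpoint check.
    static boolean hasSequentialDocsSorted(int[] docs) {
        return docs.length > 0 && docs[docs.length - 1] - docs[0] == docs.length - 1;
    }

    // LuceneChangesSnapshot style: `docs` arrives in seq_no order, which may
    // not be ascending doc-ID order, so each adjacent pair must be consecutive.
    static boolean isSequentialInOrder(int[] docs) {
        for (int i = 0; i < docs.length - 1; i++) {
            if (docs[i] + 1 != docs[i + 1]) {
                return false;
            }
        }
        return true;
    }
}

For example, {1, 3, 2, 4} passes the endpoint check (4 - 1 == 3), yet serving it in that order from a sequential merge reader would require seeking backward; the pairwise check rules that out.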

    private TopDocs searchOperations(ScoreDoc after) throws IOException {
@@ -218,7 +241,25 @@ private Translog.Operation readDocAsOp(int docIndex) throws IOException {
        final String sourceField = parallelArray.hasRecoverySource[docIndex] ? SourceFieldMapper.RECOVERY_SOURCE_NAME :
            SourceFieldMapper.NAME;
        final FieldsVisitor fields = new FieldsVisitor(true, sourceField);
-        leaf.reader().document(segmentDocID, fields);
+
+        if (parallelArray.useSequentialStoredFieldsReader) {
+            if (storedFieldsReaderOrd != leaf.ord) {
+                if (leaf.reader() instanceof SequentialStoredFieldsLeafReader) {
+                    storedFieldsReader = ((SequentialStoredFieldsLeafReader) leaf.reader()).getSequentialStoredFieldsReader();
+                    storedFieldsReaderOrd = leaf.ord;
+                } else {
+                    storedFieldsReader = null;
+                    storedFieldsReaderOrd = -1;
+                }
+            }
+        }
+        if (storedFieldsReader != null) {
+            assert parallelArray.useSequentialStoredFieldsReader;
+            assert storedFieldsReaderOrd == leaf.ord : storedFieldsReaderOrd + " != " + leaf.ord;
+            storedFieldsReader.visitDocument(segmentDocID, fields);
+        } else {
+            leaf.reader().document(segmentDocID, fields);
+        }
+
        final Translog.Operation op;
        final boolean isTombstone = parallelArray.isTombStone[docIndex];
@@ -270,6 +311,7 @@ private static final class ParallelArray {
        final long[] primaryTerm;
        final boolean[] isTombStone;
        final boolean[] hasRecoverySource;
+        boolean useSequentialStoredFieldsReader = false;

        ParallelArray(int size) {
            version = new long[size];
@@ -281,4 +323,8 @@
        }
    }

+    // for testing
+    boolean useSequentialStoredFieldsReader() {
+        return storedFieldsReader != null;
+    }
}

LuceneChangesSnapshotTests.java
@@ -8,10 +8,12 @@

package org.elasticsearch.index.engine;

+import org.apache.lucene.index.NoMergePolicy;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.ParsedDocument;
+import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.SnapshotMatchers;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.test.IndexSettingsModule;
@@ -207,6 +209,50 @@ public void testUpdateAndReadChangesConcurrently() throws Exception {
        }
    }

+    public void testAccessStoredFieldsSequentially() throws Exception {
+        try (Store store = createStore();
+             Engine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE)) {
+            int smallBatch = between(5, 9);
+            long seqNo = 0;
+            for (int i = 0; i < smallBatch; i++) {
+                engine.index(replicaIndexForDoc(createParsedDoc(Long.toString(seqNo), null), 1, seqNo, true));
+                seqNo++;
+            }
+            engine.index(replicaIndexForDoc(createParsedDoc(Long.toString(1000), null), 1, 1000, true));
+            seqNo = 11;
+            int largeBatch = between(15, 100);
+            for (int i = 0; i < largeBatch; i++) {
+                engine.index(replicaIndexForDoc(createParsedDoc(Long.toString(seqNo), null), 1, seqNo, true));
+                seqNo++;
+            }
+            // disable optimization for a small batch
+            Translog.Operation op;
+            try (LuceneChangesSnapshot snapshot =
+                     (LuceneChangesSnapshot) engine.newChangesSnapshot("test", 0L, between(1, smallBatch), false)) {
+                while ((op = snapshot.next()) != null) {
+                    assertFalse(op.toString(), snapshot.useSequentialStoredFieldsReader());
+                }
+                assertFalse(snapshot.useSequentialStoredFieldsReader());
+            }
+            // enable optimization for sequential access of 10+ docs
+            try (LuceneChangesSnapshot snapshot =
+                     (LuceneChangesSnapshot) engine.newChangesSnapshot("test", between(1, 3), between(20, 100), false)) {
+                while ((op = snapshot.next()) != null) {
+                    assertFalse(op.toString(), snapshot.useSequentialStoredFieldsReader());
+                }
+                assertFalse(snapshot.useSequentialStoredFieldsReader());
+            }
+            // disable optimization for non-sequential accesses

Contributor:
This comment doesn't seem to correspond with the test below?

Member Author:
Good catch. I fixed in 3eb853d.

+            try (LuceneChangesSnapshot snapshot =
+                     (LuceneChangesSnapshot) engine.newChangesSnapshot("test", 11, between(21, 100), false)) {
+                while ((op = snapshot.next()) != null) {
+                    assertTrue(op.toString(), snapshot.useSequentialStoredFieldsReader());
+                }
+                assertTrue(snapshot.useSequentialStoredFieldsReader());
+            }
+        }
+    }

    class Follower extends Thread {
        private final InternalEngine leader;
        private final InternalEngine engine;