Skip to content

Ensure doc_stats are changing even if refresh is disabled #27505

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Nov 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions core/src/main/java/org/elasticsearch/common/lucene/Lucene.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,19 @@
import org.apache.lucene.document.LatLonDocValuesField;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FilterLeafReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Explanation;
import org.apache.lucene.search.FieldDoc;
Expand Down Expand Up @@ -650,6 +653,21 @@ public static Version parseVersionLenient(String toParse, Version defaultValue)
return LenientParser.parse(toParse, defaultValue);
}

/**
* Tries to extract a segment reader from the given index reader.
* If no SegmentReader can be extracted an {@link IllegalStateException} is thrown.
*/
public static SegmentReader segmentReader(LeafReader reader) {
if (reader instanceof SegmentReader) {
return (SegmentReader) reader;
} else if (reader instanceof FilterLeafReader) {
final FilterLeafReader fReader = (FilterLeafReader) reader;
return segmentReader(FilterLeafReader.unwrap(fReader));
}
// hard fail - we can't get a SegmentReader
throw new IllegalStateException("Can not extract segment reader from given index reader [" + reader + "]");
}

@SuppressForbidden(reason = "Version#parseLeniently() used in a central place")
private static final class LenientParser {
public static Version parse(String toParse, Version defaultValue) {
Expand Down
22 changes: 3 additions & 19 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FilterLeafReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.IndexReader;
Expand Down Expand Up @@ -143,27 +142,12 @@ protected static long guardedRamBytesUsed(Accountable a) {
return a.ramBytesUsed();
}

/**
* Tries to extract a segment reader from the given index reader.
* If no SegmentReader can be extracted an {@link IllegalStateException} is thrown.
*/
protected static SegmentReader segmentReader(LeafReader reader) {
if (reader instanceof SegmentReader) {
return (SegmentReader) reader;
} else if (reader instanceof FilterLeafReader) {
final FilterLeafReader fReader = (FilterLeafReader) reader;
return segmentReader(FilterLeafReader.unwrap(fReader));
}
// hard fail - we can't get a SegmentReader
throw new IllegalStateException("Can not extract segment reader from given index reader [" + reader + "]");
}

/**
* Returns whether a leaf reader comes from a merge (versus flush or addIndexes).
*/
protected static boolean isMergedSegment(LeafReader reader) {
// We expect leaves to be segment readers
final Map<String, String> diagnostics = segmentReader(reader).getSegmentInfo().info.getDiagnostics();
final Map<String, String> diagnostics = Lucene.segmentReader(reader).getSegmentInfo().info.getDiagnostics();
final String source = diagnostics.get(IndexWriter.SOURCE);
assert Arrays.asList(IndexWriter.SOURCE_ADDINDEXES_READERS, IndexWriter.SOURCE_FLUSH,
IndexWriter.SOURCE_MERGE).contains(source) : "Unknown source " + source;
Expand Down Expand Up @@ -611,7 +595,7 @@ public final SegmentsStats segmentsStats(boolean includeSegmentFileSizes) {
try (Searcher searcher = acquireSearcher("segments_stats")) {
SegmentsStats stats = new SegmentsStats();
for (LeafReaderContext reader : searcher.reader().leaves()) {
final SegmentReader segmentReader = segmentReader(reader.reader());
final SegmentReader segmentReader = Lucene.segmentReader(reader.reader());
stats.add(1, segmentReader.ramBytesUsed());
stats.addTermsMemoryInBytes(guardedRamBytesUsed(segmentReader.getPostingsReader()));
stats.addStoredFieldsMemoryInBytes(guardedRamBytesUsed(segmentReader.getFieldsReader()));
Expand Down Expand Up @@ -718,7 +702,7 @@ protected Segment[] getSegmentInfo(SegmentInfos lastCommittedSegmentInfos, boole
// first, go over and compute the search ones...
try (Searcher searcher = acquireSearcher("segments")){
for (LeafReaderContext reader : searcher.reader().leaves()) {
final SegmentReader segmentReader = segmentReader(reader.reader());
final SegmentReader segmentReader = Lucene.segmentReader(reader.reader());
SegmentCommitInfo info = segmentReader.getSegmentInfo();
assert !segments.containsKey(info.info.name);
Segment segment = new Segment(info.info.name);
Expand Down
30 changes: 22 additions & 8 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@

import com.carrotsearch.hppc.ObjectLongMap;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.ReferenceManager;
Expand Down Expand Up @@ -58,7 +62,6 @@
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
Expand Down Expand Up @@ -151,7 +154,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
Expand Down Expand Up @@ -856,15 +858,27 @@ public FlushStats flushStats() {
}

public DocsStats docStats() {
// we calculate the doc stats based on the internal reader that is more up-to-date and not subject
// to external refreshes. For instance we don't refresh an external reader if we flush and indices with
// index.refresh_interval=-1 won't see any doc stats updates at all. This change will give more accurate statistics
// when indexing but not refreshing in general. Yet, if a refresh happens the internal reader is refresh as well so we are
// safe here.
long numDocs = 0;
long numDeletedDocs = 0;
long sizeInBytes = 0;
List<Segment> segments = segments(false);
for (Segment segment : segments) {
if (segment.search) {
numDocs += segment.getNumDocs();
numDeletedDocs += segment.getDeletedDocs();
sizeInBytes += segment.getSizeInBytes();
try (Engine.Searcher searcher = acquireSearcher("docStats", Engine.SearcherScope.INTERNAL)) {
for (LeafReaderContext reader : searcher.reader().leaves()) {
// we go on the segment level here to get accurate numbers
final SegmentReader segmentReader = Lucene.segmentReader(reader.reader());
SegmentCommitInfo info = segmentReader.getSegmentInfo();
numDocs += reader.reader().numDocs();
numDeletedDocs += reader.reader().numDeletedDocs();
try {
sizeInBytes += info.sizeInBytes();
} catch (IOException e) {
logger.trace((org.apache.logging.log4j.util.Supplier<?>)
() -> new ParameterizedMessage("failed to get size for [{}]", info.info.name), e);
}
}
}
return new DocsStats(numDocs, numDeletedDocs, sizeInBytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2269,11 +2269,17 @@ public void testDocStats() throws IOException {
final String id = Integer.toString(i);
indexDoc(indexShard, "test", id);
}

indexShard.refresh("test");
if (randomBoolean()) {
indexShard.refresh("test");
} else {
indexShard.flush(new FlushRequest());
}
{
final DocsStats docsStats = indexShard.docStats();
assertThat(docsStats.getCount(), equalTo(numDocs));
try (Engine.Searcher searcher = indexShard.acquireSearcher("test")) {
assertTrue(searcher.reader().numDocs() <= docsStats.getCount());
}
assertThat(docsStats.getDeleted(), equalTo(0L));
assertThat(docsStats.getAverageSizeInBytes(), greaterThan(0L));
}
Expand All @@ -2293,9 +2299,14 @@ public void testDocStats() throws IOException {
flushRequest.waitIfOngoing(false);
indexShard.flush(flushRequest);

indexShard.refresh("test");
if (randomBoolean()) {
indexShard.refresh("test");
}
{
final DocsStats docStats = indexShard.docStats();
try (Engine.Searcher searcher = indexShard.acquireSearcher("test")) {
assertTrue(searcher.reader().numDocs() <= docStats.getCount());
}
assertThat(docStats.getCount(), equalTo(numDocs));
// Lucene will delete a segment if all docs are deleted from it; this means that we lose the deletes when deleting all docs
assertThat(docStats.getDeleted(), equalTo(numDocsToDelete == numDocs ? 0 : numDocsToDelete));
Expand All @@ -2307,7 +2318,11 @@ public void testDocStats() throws IOException {
forceMergeRequest.maxNumSegments(1);
indexShard.forceMerge(forceMergeRequest);

indexShard.refresh("test");
if (randomBoolean()) {
indexShard.refresh("test");
} else {
indexShard.flush(new FlushRequest());
}
{
final DocsStats docStats = indexShard.docStats();
assertThat(docStats.getCount(), equalTo(numDocs));
Expand Down Expand Up @@ -2338,8 +2353,11 @@ public void testEstimateTotalDocSize() throws Exception {
assertThat("Without flushing, segment sizes should be zero",
indexShard.docStats().getTotalSizeInBytes(), equalTo(0L));

indexShard.flush(new FlushRequest());
indexShard.refresh("test");
if (randomBoolean()) {
indexShard.flush(new FlushRequest());
} else {
indexShard.refresh("test");
}
{
final DocsStats docsStats = indexShard.docStats();
final StoreStats storeStats = indexShard.storeStats();
Expand All @@ -2359,9 +2377,11 @@ public void testEstimateTotalDocSize() throws Exception {
indexDoc(indexShard, "doc", Integer.toString(i), "{\"foo\": \"bar\"}");
}
}

indexShard.flush(new FlushRequest());
indexShard.refresh("test");
if (randomBoolean()) {
indexShard.flush(new FlushRequest());
} else {
indexShard.refresh("test");
}
{
final DocsStats docsStats = indexShard.docStats();
final StoreStats storeStats = indexShard.storeStats();
Expand Down