
Adds average document size to DocsStats #27117


Merged (7 commits) on Oct 28, 2017
39 changes: 34 additions & 5 deletions core/src/main/java/org/elasticsearch/index/shard/DocsStats.java
@@ -19,34 +19,39 @@

 package org.elasticsearch.index.shard;

+import org.elasticsearch.Version;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Streamable;
 import org.elasticsearch.common.xcontent.ToXContentFragment;
 import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.index.store.StoreStats;

 import java.io.IOException;

 public class DocsStats implements Streamable, ToXContentFragment {

     long count = 0;
     long deleted = 0;
+    long totalSizeInBytes = 0;

     public DocsStats() {

     }

-    public DocsStats(long count, long deleted) {
+    public DocsStats(long count, long deleted, long totalSizeInBytes) {
         this.count = count;
         this.deleted = deleted;
+        this.totalSizeInBytes = totalSizeInBytes;
     }

-    public void add(DocsStats docsStats) {
-        if (docsStats == null) {
+    public void add(DocsStats other) {
+        if (other == null) {
             return;
         }
-        count += docsStats.count;
-        deleted += docsStats.deleted;
+        this.totalSizeInBytes += other.totalSizeInBytes;
+        this.count += other.count;
+        this.deleted += other.deleted;
     }

     public long getCount() {
@@ -57,16 +62,40 @@ public long getDeleted() {
         return this.deleted;
     }

+    /**
+     * Returns the total size in bytes of all documents in this stats.
+     * This value may be more reliable than {@link StoreStats#getSizeInBytes()} in estimating the index size.
+     */
+    public long getTotalSizeInBytes() {
+        return totalSizeInBytes;
+    }
+
+    /**
+     * Returns the average size in bytes of all documents in this stats.
+     */
+    public long getAverageSizeInBytes() {
+        long totalDocs = count + deleted;
+        return totalDocs == 0 ? 0 : totalSizeInBytes / totalDocs;
+    }
+
     @Override
     public void readFrom(StreamInput in) throws IOException {
         count = in.readVLong();
         deleted = in.readVLong();
+        if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
Member: Also, this should be V_6_1_0 since you will be backporting this to the 6.x branch.

Member Author: @jpountz recommended to make this for v7, then change it for the backport later.

Member: This is correct, it should be 7.0.0. Then when you backport, set it to 6.1.0 in the 6.x branch and make sure that the BWC tests in master against 6.x pass (you might have to skip some of them). Then push a commit to master flipping the version to 6.1.0 and removing the skips.

Member: Ahh okay, I hadn't realized we were doing it the reverse way now :)

Member: I am not sure what you mean by the reverse way; these are part of the steps to have green CI on all branches every step of the way.
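For readers following the BWC thread above: once the 6.x backport lands, the described follow-up commit would leave master reading the field behind a 6.1.0 gate. A minimal sketch of that end state (an assumption based on the steps described, not part of this commit):

    if (in.getVersion().onOrAfter(Version.V_6_1_0)) { // gate lowered from V_7_0_0_alpha1
        totalSizeInBytes = in.readVLong();
    } else {
        totalSizeInBytes = -1; // older nodes never send this field
    }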

+            totalSizeInBytes = in.readVLong();
Member: Should we set totalSizeInBytes to -1 to indicate that it cannot be read, rather than that there are 0 bytes in use, for 6.x nodes?

Member Author: Done 96e9be7
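A side effect of the -1 sentinel is that consumers must distinguish "unknown" from a real byte count. A hypothetical caller-side guard (not part of this PR) could look like:

    long totalSize = docsStats.getTotalSizeInBytes();
    if (totalSize < 0) {
        // stats came over the wire from a node that predates this field
    } else {
        // totalSize is a genuine byte count, safe to display or aggregate
    }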

+        } else {
+            totalSizeInBytes = -1;
+        }
     }

     @Override
     public void writeTo(StreamOutput out) throws IOException {
         out.writeVLong(count);
         out.writeVLong(deleted);
+        if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
Member: Same here for V_6_1_0.

+            out.writeVLong(totalSizeInBytes);
+        }
     }

     @Override
     ...
13 changes: 11 additions & 2 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -880,9 +880,18 @@ public FlushStats flushStats() {
     }

     public DocsStats docStats() {
Contributor: I think this should rather be something like this:

    public DocsStats docStats() {
        long numDocs = 0;
        long numDeletedDocs = 0;
        long sizeInByte = 0;
        List<Segment> segments = segments(false);
        for (Segment segment : segments) {
            if (segment.search) {
                numDocs += segment.getNumDocs();
                numDeletedDocs += segment.getDeletedDocs();
                sizeInByte += segment.getSizeInBytes();
            }
        }
        return new DocsStats(numDocs, numDeletedDocs, sizeInByte);
    }

That way we maintain a consistent total, we can calculate the average at read time, and aggregation of doc stats will be much simpler. I also think we should make sure the size in bytes is based on the currently used reader, which is guaranteed by the Segment#search flag. WDYT?

Member Author: Yes, this makes the DocsStats simpler and the average value more accurate. I have updated this in 662f062.

-        try (Engine.Searcher searcher = acquireSearcher("doc_stats")) {
-            return new DocsStats(searcher.reader().numDocs(), searcher.reader().numDeletedDocs());
+        long numDocs = 0;
+        long numDeletedDocs = 0;
+        long sizeInBytes = 0;
+        List<Segment> segments = segments(false);
+        for (Segment segment : segments) {
+            if (segment.search) {
+                numDocs += segment.getNumDocs();
+                numDeletedDocs += segment.getDeletedDocs();
+                sizeInBytes += segment.getSizeInBytes();
+            }
+        }
+        return new DocsStats(numDocs, numDeletedDocs, sizeInBytes);
     }
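As the review discussion above notes, keeping a running total and deriving the average at read time keeps aggregation of doc stats simple: shard-level stats can just be added together. A minimal sketch of that roll-up (illustrative only; `shards` is a hypothetical collection of IndexShard instances):

    DocsStats total = new DocsStats();
    for (IndexShard shard : shards) {
        total.add(shard.docStats()); // sums count, deleted, and totalSizeInBytes
    }
    // The average falls out of the summed totals at read time, so it stays
    // consistent no matter how many shards contributed.
    long averageSize = total.getAverageSizeInBytes();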

     /**
     ...
@@ -82,12 +82,12 @@ public void testEvaluateConditions() throws Exception {
             .settings(settings)
             .build();
         final HashSet<Condition> conditions = Sets.newHashSet(maxDocsCondition, maxAgeCondition);
-        Set<Condition.Result> results = evaluateConditions(conditions, new DocsStats(matchMaxDocs, 0L), metaData);
+        Set<Condition.Result> results = evaluateConditions(conditions, new DocsStats(matchMaxDocs, 0L, between(1, 10000)), metaData);
         assertThat(results.size(), equalTo(2));
         for (Condition.Result result : results) {
             assertThat(result.matched, equalTo(true));
         }
-        results = evaluateConditions(conditions, new DocsStats(notMatchMaxDocs, 0), metaData);
+        results = evaluateConditions(conditions, new DocsStats(notMatchMaxDocs, 0, between(1, 10000)), metaData);
         assertThat(results.size(), equalTo(2));
         for (Condition.Result result : results) {
             if (result.condition instanceof MaxAgeCondition) {
     ...
@@ -213,10 +213,10 @@ public void testCreateIndexRequest() throws Exception {

     private IndicesStatsResponse createIndecesStatResponse(long totalDocs, long primaryDocs) {
         final CommonStats primaryStats = mock(CommonStats.class);
-        when(primaryStats.getDocs()).thenReturn(new DocsStats(primaryDocs, 0));
+        when(primaryStats.getDocs()).thenReturn(new DocsStats(primaryDocs, 0, between(1, 10000)));

         final CommonStats totalStats = mock(CommonStats.class);
-        when(totalStats.getDocs()).thenReturn(new DocsStats(totalDocs, 0));
+        when(totalStats.getDocs()).thenReturn(new DocsStats(totalDocs, 0, between(1, 10000)));

         final IndicesStatsResponse response = mock(IndicesStatsResponse.class);
         when(response.getPrimaries()).thenReturn(primaryStats);
     ...
@@ -73,7 +73,7 @@ public void testErrorCondition() {
         assertTrue(
             expectThrows(IllegalStateException.class, () ->
                 TransportShrinkAction.prepareCreateIndexRequest(new ShrinkRequest("target", "source"), state,
-                    (i) -> new DocsStats(Integer.MAX_VALUE, randomIntBetween(1, 1000)), new IndexNameExpressionResolver(Settings.EMPTY))
+                    (i) -> new DocsStats(Integer.MAX_VALUE, between(1, 1000), between(1, 100)), new IndexNameExpressionResolver(Settings.EMPTY))
             ).getMessage().startsWith("Can't merge index with more than [2147483519] docs - too many documents in shards "));

     ...
@@ -84,7 +84,7 @@ public void testErrorCondition() {
                 ClusterState clusterState = createClusterState("source", 8, 1,
                     Settings.builder().put("index.blocks.write", true).build());
                 TransportShrinkAction.prepareCreateIndexRequest(req, clusterState,
-                    (i) -> i == 2 || i == 3 ? new DocsStats(Integer.MAX_VALUE/2, randomIntBetween(1, 1000)) : null,
+                    (i) -> i == 2 || i == 3 ? new DocsStats(Integer.MAX_VALUE / 2, between(1, 1000), between(1, 10000)) : null,
                     new IndexNameExpressionResolver(Settings.EMPTY));
             }
         ).getMessage().startsWith("Can't merge index with more than [2147483519] docs - too many documents in shards "));
     ...
@@ -106,7 +106,7 @@ public void testErrorCondition() {
         clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();

         TransportShrinkAction.prepareCreateIndexRequest(new ShrinkRequest("target", "source"), clusterState,
-            (i) -> new DocsStats(randomIntBetween(1, 1000), randomIntBetween(1, 1000)), new IndexNameExpressionResolver(Settings.EMPTY));
+            (i) -> new DocsStats(between(1, 1000), between(1, 1000), between(0, 10000)), new IndexNameExpressionResolver(Settings.EMPTY));
     }

     public void testShrinkIndexSettings() {
     ...
@@ -128,7 +128,7 @@ public void testShrinkIndexSettings() {
             routingTable.index(indexName).shardsWithState(ShardRoutingState.INITIALIZING)).routingTable();
         clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
         int numSourceShards = clusterState.metaData().index(indexName).getNumberOfShards();
-        DocsStats stats = new DocsStats(randomIntBetween(0, (IndexWriter.MAX_DOCS) / numSourceShards), randomIntBetween(1, 1000));
+        DocsStats stats = new DocsStats(between(0, (IndexWriter.MAX_DOCS) / numSourceShards), between(1, 1000), between(1, 10000));
         ShrinkRequest target = new ShrinkRequest("target", indexName);
         final ActiveShardCount activeShardCount = randomBoolean() ? ActiveShardCount.ALL : ActiveShardCount.ONE;
         target.setWaitForActiveShards(activeShardCount);
     ...
@@ -0,0 +1,59 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.shard;

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.test.ESTestCase;

import static org.hamcrest.Matchers.equalTo;

public class DocsStatsTests extends ESTestCase {

    public void testCalculateAverageDocSize() throws Exception {
        DocsStats stats = new DocsStats(10, 2, 120);
        assertThat(stats.getAverageSizeInBytes(), equalTo(10L));

        stats.add(new DocsStats(0, 0, 0));
        assertThat(stats.getAverageSizeInBytes(), equalTo(10L));

        stats.add(new DocsStats(8, 30, 480));
        assertThat(stats.getCount(), equalTo(18L));
        assertThat(stats.getDeleted(), equalTo(32L));
        assertThat(stats.getTotalSizeInBytes(), equalTo(600L));
        assertThat(stats.getAverageSizeInBytes(), equalTo(12L));
    }

    public void testSerialize() throws Exception {
        DocsStats originalStats = new DocsStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong());
        try (BytesStreamOutput out = new BytesStreamOutput()) {
            originalStats.writeTo(out);
            BytesReference bytes = out.bytes();
            try (StreamInput in = bytes.streamInput()) {
                DocsStats cloneStats = new DocsStats();
                cloneStats.readFrom(in);
                assertThat(cloneStats.getCount(), equalTo(originalStats.getCount()));
                assertThat(cloneStats.getDeleted(), equalTo(originalStats.getDeleted()));
                assertThat(cloneStats.getAverageSizeInBytes(), equalTo(originalStats.getAverageSizeInBytes()));
            }
        }
    }
}
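For reference, the expected values in testCalculateAverageDocSize follow directly from the formula in DocsStats: the average divides the total size by live plus deleted documents, so 120 / (10 + 2) = 10, and after the second add 600 / (18 + 32) = 12, with integer division truncating any remainder.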
@@ -67,6 +67,7 @@
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.index.VersionType;
@@ -87,6 +88,7 @@
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
 import org.elasticsearch.index.store.Store;
+import org.elasticsearch.index.store.StoreStats;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.index.translog.TranslogTests;
 import org.elasticsearch.indices.IndicesQueryCache;
@@ -150,6 +152,7 @@
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.hasToString;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;

@@ -2227,6 +2230,7 @@ public void testDocStats() throws IOException {
                 final DocsStats docsStats = indexShard.docStats();
                 assertThat(docsStats.getCount(), equalTo(numDocs));
                 assertThat(docsStats.getDeleted(), equalTo(0L));
+                assertThat(docsStats.getAverageSizeInBytes(), greaterThan(0L));
             }

             final List<Integer> ids = randomSubsetOf(
     ...
@@ -2263,12 +2267,70 @@ public void testDocStats() throws IOException {
                 final DocsStats docStats = indexShard.docStats();
                 assertThat(docStats.getCount(), equalTo(numDocs));
                 assertThat(docStats.getDeleted(), equalTo(0L));
+                assertThat(docStats.getAverageSizeInBytes(), greaterThan(0L));
             }
         } finally {
             closeShards(indexShard);
         }
     }

+    public void testEstimateTotalDocSize() throws Exception {
+        IndexShard indexShard = null;
+        try {
+            indexShard = newStartedShard(true);
+
+            int numDoc = randomIntBetween(100, 200);
+            for (int i = 0; i < numDoc; i++) {
+                String doc = XContentFactory.jsonBuilder()
+                    .startObject()
+                    .field("count", randomInt())
+                    .field("point", randomFloat())
+                    .field("description", randomUnicodeOfCodepointLength(100))
+                    .endObject().string();
+                indexDoc(indexShard, "doc", Integer.toString(i), doc);
+            }
+
+            assertThat("Without flushing, segment sizes should be zero",
+                indexShard.docStats().getTotalSizeInBytes(), equalTo(0L));
+
+            indexShard.flush(new FlushRequest());
+            indexShard.refresh("test");
+            {
+                final DocsStats docsStats = indexShard.docStats();
+                final StoreStats storeStats = indexShard.storeStats();
+                assertThat(storeStats.sizeInBytes(), greaterThan(numDoc * 100L)); // A doc should be more than 100 bytes.
+
+                assertThat("Estimated total document size is too small compared with the stored size",
+                    docsStats.getTotalSizeInBytes(), greaterThanOrEqualTo(storeStats.sizeInBytes() * 80/100));
+                assertThat("Estimated total document size is too large compared with the stored size",
+                    docsStats.getTotalSizeInBytes(), lessThanOrEqualTo(storeStats.sizeInBytes() * 120/100));
+            }
+
+            // Do some updates and deletes, then recheck the correlation again.
+            for (int i = 0; i < numDoc / 2; i++) {
+                if (randomBoolean()) {
+                    deleteDoc(indexShard, "doc", Integer.toString(i));
+                } else {
+                    indexDoc(indexShard, "doc", Integer.toString(i), "{\"foo\": \"bar\"}");
+                }
+            }
+
+            indexShard.flush(new FlushRequest());
+            indexShard.refresh("test");
+            {
+                final DocsStats docsStats = indexShard.docStats();
+                final StoreStats storeStats = indexShard.storeStats();
+                assertThat("Estimated total document size is too small compared with the stored size",
+                    docsStats.getTotalSizeInBytes(), greaterThanOrEqualTo(storeStats.sizeInBytes() * 80/100));
+                assertThat("Estimated total document size is too large compared with the stored size",
+                    docsStats.getTotalSizeInBytes(), lessThanOrEqualTo(storeStats.sizeInBytes() * 120/100));
+            }
+
+        } finally {
+            closeShards(indexShard);
+        }
+    }

     /**
      * here we are simulating the scenario that happens when we do async shard fetching from GatewaySerivce while we are finishing
      * a recovery and concurrently clean files. This should always be possible without any exception. Yet there was a bug where IndexShard
     ...