Skip to content

Commit 704a7f7

Browse files
committed
Adds average document size to DocsStats (#27117)
This change is required in order to support a size based check for the index rollover. The index size is estimated by sampling the existing segments only. We prefer using segments to StoreStats because StoreStats is not reliable if indexing or merging operations are in progress. Relates #27004
1 parent 2f1d6bb commit 704a7f7

File tree

6 files changed

+174
-15
lines changed

6 files changed

+174
-15
lines changed

core/src/main/java/org/elasticsearch/index/shard/DocsStats.java

+34-5
Original file line numberDiff line numberDiff line change
@@ -19,34 +19,39 @@
1919

2020
package org.elasticsearch.index.shard;
2121

22+
import org.elasticsearch.Version;
2223
import org.elasticsearch.common.io.stream.StreamInput;
2324
import org.elasticsearch.common.io.stream.StreamOutput;
2425
import org.elasticsearch.common.io.stream.Streamable;
2526
import org.elasticsearch.common.xcontent.ToXContentFragment;
2627
import org.elasticsearch.common.xcontent.XContentBuilder;
28+
import org.elasticsearch.index.store.StoreStats;
2729

2830
import java.io.IOException;
2931

3032
public class DocsStats implements Streamable, ToXContentFragment {
3133

3234
long count = 0;
3335
long deleted = 0;
36+
long totalSizeInBytes = 0;
3437

3538
public DocsStats() {
3639

3740
}
3841

39-
public DocsStats(long count, long deleted) {
42+
public DocsStats(long count, long deleted, long totalSizeInBytes) {
4043
this.count = count;
4144
this.deleted = deleted;
45+
this.totalSizeInBytes = totalSizeInBytes;
4246
}
4347

44-
public void add(DocsStats docsStats) {
45-
if (docsStats == null) {
48+
public void add(DocsStats other) {
49+
if (other == null) {
4650
return;
4751
}
48-
count += docsStats.count;
49-
deleted += docsStats.deleted;
52+
this.totalSizeInBytes += other.totalSizeInBytes;
53+
this.count += other.count;
54+
this.deleted += other.deleted;
5055
}
5156

5257
public long getCount() {
@@ -57,16 +62,40 @@ public long getDeleted() {
5762
return this.deleted;
5863
}
5964

65+
/**
66+
* Returns the total size in bytes of all documents in this stats.
67+
* This value may be more reliable than {@link StoreStats#getSizeInBytes()} in estimating the index size.
68+
*/
69+
public long getTotalSizeInBytes() {
70+
return totalSizeInBytes;
71+
}
72+
73+
/**
74+
* Returns the average size in bytes of all documents in this stats.
75+
*/
76+
public long getAverageSizeInBytes() {
77+
long totalDocs = count + deleted;
78+
return totalDocs == 0 ? 0 : totalSizeInBytes / totalDocs;
79+
}
80+
6081
@Override
6182
public void readFrom(StreamInput in) throws IOException {
6283
count = in.readVLong();
6384
deleted = in.readVLong();
85+
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
86+
totalSizeInBytes = in.readVLong();
87+
} else {
88+
totalSizeInBytes = -1;
89+
}
6490
}
6591

6692
@Override
6793
public void writeTo(StreamOutput out) throws IOException {
6894
out.writeVLong(count);
6995
out.writeVLong(deleted);
96+
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
97+
out.writeVLong(totalSizeInBytes);
98+
}
7099
}
71100

72101
@Override

core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -880,9 +880,18 @@ public FlushStats flushStats() {
880880
}
881881

882882
public DocsStats docStats() {
883-
try (Engine.Searcher searcher = acquireSearcher("doc_stats")) {
884-
return new DocsStats(searcher.reader().numDocs(), searcher.reader().numDeletedDocs());
883+
long numDocs = 0;
884+
long numDeletedDocs = 0;
885+
long sizeInBytes = 0;
886+
List<Segment> segments = segments(false);
887+
for (Segment segment : segments) {
888+
if (segment.search) {
889+
numDocs += segment.getNumDocs();
890+
numDeletedDocs += segment.getDeletedDocs();
891+
sizeInBytes += segment.getSizeInBytes();
892+
}
885893
}
894+
return new DocsStats(numDocs, numDeletedDocs, sizeInBytes);
886895
}
887896

888897
/**

core/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -82,12 +82,12 @@ public void testEvaluateConditions() throws Exception {
8282
.settings(settings)
8383
.build();
8484
final HashSet<Condition> conditions = Sets.newHashSet(maxDocsCondition, maxAgeCondition);
85-
Set<Condition.Result> results = evaluateConditions(conditions, new DocsStats(matchMaxDocs, 0L), metaData);
85+
Set<Condition.Result> results = evaluateConditions(conditions, new DocsStats(matchMaxDocs, 0L, between(1, 10000)), metaData);
8686
assertThat(results.size(), equalTo(2));
8787
for (Condition.Result result : results) {
8888
assertThat(result.matched, equalTo(true));
8989
}
90-
results = evaluateConditions(conditions, new DocsStats(notMatchMaxDocs, 0), metaData);
90+
results = evaluateConditions(conditions, new DocsStats(notMatchMaxDocs, 0, between(1, 10000)), metaData);
9191
assertThat(results.size(), equalTo(2));
9292
for (Condition.Result result : results) {
9393
if (result.condition instanceof MaxAgeCondition) {
@@ -213,10 +213,10 @@ public void testCreateIndexRequest() throws Exception {
213213

214214
private IndicesStatsResponse createIndecesStatResponse(long totalDocs, long primaryDocs) {
215215
final CommonStats primaryStats = mock(CommonStats.class);
216-
when(primaryStats.getDocs()).thenReturn(new DocsStats(primaryDocs, 0));
216+
when(primaryStats.getDocs()).thenReturn(new DocsStats(primaryDocs, 0, between(1, 10000)));
217217

218218
final CommonStats totalStats = mock(CommonStats.class);
219-
when(totalStats.getDocs()).thenReturn(new DocsStats(totalDocs, 0));
219+
when(totalStats.getDocs()).thenReturn(new DocsStats(totalDocs, 0, between(1, 10000)));
220220

221221
final IndicesStatsResponse response = mock(IndicesStatsResponse.class);
222222
when(response.getPrimaries()).thenReturn(primaryStats);

core/src/test/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkActionTests.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public void testErrorCondition() {
7373
assertTrue(
7474
expectThrows(IllegalStateException.class, () ->
7575
TransportShrinkAction.prepareCreateIndexRequest(new ShrinkRequest("target", "source"), state,
76-
(i) -> new DocsStats(Integer.MAX_VALUE, randomIntBetween(1, 1000)), new IndexNameExpressionResolver(Settings.EMPTY))
76+
(i) -> new DocsStats(Integer.MAX_VALUE, between(1, 1000), between(1, 100)), new IndexNameExpressionResolver(Settings.EMPTY))
7777
).getMessage().startsWith("Can't merge index with more than [2147483519] docs - too many documents in shards "));
7878

7979

@@ -84,7 +84,7 @@ public void testErrorCondition() {
8484
ClusterState clusterState = createClusterState("source", 8, 1,
8585
Settings.builder().put("index.blocks.write", true).build());
8686
TransportShrinkAction.prepareCreateIndexRequest(req, clusterState,
87-
(i) -> i == 2 || i == 3 ? new DocsStats(Integer.MAX_VALUE/2, randomIntBetween(1, 1000)) : null,
87+
(i) -> i == 2 || i == 3 ? new DocsStats(Integer.MAX_VALUE / 2, between(1, 1000), between(1, 10000)) : null,
8888
new IndexNameExpressionResolver(Settings.EMPTY));
8989
}
9090
).getMessage().startsWith("Can't merge index with more than [2147483519] docs - too many documents in shards "));
@@ -106,7 +106,7 @@ public void testErrorCondition() {
106106
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
107107

108108
TransportShrinkAction.prepareCreateIndexRequest(new ShrinkRequest("target", "source"), clusterState,
109-
(i) -> new DocsStats(randomIntBetween(1, 1000), randomIntBetween(1, 1000)), new IndexNameExpressionResolver(Settings.EMPTY));
109+
(i) -> new DocsStats(between(1, 1000), between(1, 1000), between(0, 10000)), new IndexNameExpressionResolver(Settings.EMPTY));
110110
}
111111

112112
public void testShrinkIndexSettings() {
@@ -128,7 +128,7 @@ public void testShrinkIndexSettings() {
128128
routingTable.index(indexName).shardsWithState(ShardRoutingState.INITIALIZING)).routingTable();
129129
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
130130
int numSourceShards = clusterState.metaData().index(indexName).getNumberOfShards();
131-
DocsStats stats = new DocsStats(randomIntBetween(0, (IndexWriter.MAX_DOCS) / numSourceShards), randomIntBetween(1, 1000));
131+
DocsStats stats = new DocsStats(between(0, (IndexWriter.MAX_DOCS) / numSourceShards), between(1, 1000), between(1, 10000));
132132
ShrinkRequest target = new ShrinkRequest("target", indexName);
133133
final ActiveShardCount activeShardCount = randomBoolean() ? ActiveShardCount.ALL : ActiveShardCount.ONE;
134134
target.setWaitForActiveShards(activeShardCount);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.index.shard;
21+
22+
import org.elasticsearch.common.bytes.BytesReference;
23+
import org.elasticsearch.common.io.stream.BytesStreamOutput;
24+
import org.elasticsearch.common.io.stream.StreamInput;
25+
import org.elasticsearch.test.ESTestCase;
26+
27+
import static org.hamcrest.Matchers.equalTo;
28+
29+
public class DocsStatsTests extends ESTestCase {
30+
31+
public void testCalculateAverageDocSize() throws Exception {
32+
DocsStats stats = new DocsStats(10, 2, 120);
33+
assertThat(stats.getAverageSizeInBytes(), equalTo(10L));
34+
35+
stats.add(new DocsStats(0, 0, 0));
36+
assertThat(stats.getAverageSizeInBytes(), equalTo(10L));
37+
38+
stats.add(new DocsStats(8, 30, 480));
39+
assertThat(stats.getCount(), equalTo(18L));
40+
assertThat(stats.getDeleted(), equalTo(32L));
41+
assertThat(stats.getTotalSizeInBytes(), equalTo(600L));
42+
assertThat(stats.getAverageSizeInBytes(), equalTo(12L));
43+
}
44+
45+
public void testSerialize() throws Exception {
46+
DocsStats originalStats = new DocsStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong());
47+
try (BytesStreamOutput out = new BytesStreamOutput()) {
48+
originalStats.writeTo(out);
49+
BytesReference bytes = out.bytes();
50+
try (StreamInput in = bytes.streamInput()) {
51+
DocsStats cloneStats = new DocsStats();
52+
cloneStats.readFrom(in);
53+
assertThat(cloneStats.getCount(), equalTo(originalStats.getCount()));
54+
assertThat(cloneStats.getDeleted(), equalTo(originalStats.getDeleted()));
55+
assertThat(cloneStats.getAverageSizeInBytes(), equalTo(originalStats.getAverageSizeInBytes()));
56+
}
57+
}
58+
}
59+
}

core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

+62
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
6868
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
6969
import org.elasticsearch.common.xcontent.XContentBuilder;
70+
import org.elasticsearch.common.xcontent.XContentFactory;
7071
import org.elasticsearch.common.xcontent.XContentType;
7172
import org.elasticsearch.env.NodeEnvironment;
7273
import org.elasticsearch.index.VersionType;
@@ -87,6 +88,7 @@
8788
import org.elasticsearch.index.seqno.SequenceNumbers;
8889
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
8990
import org.elasticsearch.index.store.Store;
91+
import org.elasticsearch.index.store.StoreStats;
9092
import org.elasticsearch.index.translog.Translog;
9193
import org.elasticsearch.index.translog.TranslogTests;
9294
import org.elasticsearch.indices.IndicesQueryCache;
@@ -150,6 +152,7 @@
150152
import static org.hamcrest.Matchers.hasSize;
151153
import static org.hamcrest.Matchers.hasToString;
152154
import static org.hamcrest.Matchers.instanceOf;
155+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
153156
import static org.hamcrest.Matchers.notNullValue;
154157
import static org.hamcrest.Matchers.nullValue;
155158

@@ -2227,6 +2230,7 @@ public void testDocStats() throws IOException {
22272230
final DocsStats docsStats = indexShard.docStats();
22282231
assertThat(docsStats.getCount(), equalTo(numDocs));
22292232
assertThat(docsStats.getDeleted(), equalTo(0L));
2233+
assertThat(docsStats.getAverageSizeInBytes(), greaterThan(0L));
22302234
}
22312235

22322236
final List<Integer> ids = randomSubsetOf(
@@ -2263,12 +2267,70 @@ public void testDocStats() throws IOException {
22632267
final DocsStats docStats = indexShard.docStats();
22642268
assertThat(docStats.getCount(), equalTo(numDocs));
22652269
assertThat(docStats.getDeleted(), equalTo(0L));
2270+
assertThat(docStats.getAverageSizeInBytes(), greaterThan(0L));
22662271
}
22672272
} finally {
22682273
closeShards(indexShard);
22692274
}
22702275
}
22712276

2277+
public void testEstimateTotalDocSize() throws Exception {
2278+
IndexShard indexShard = null;
2279+
try {
2280+
indexShard = newStartedShard(true);
2281+
2282+
int numDoc = randomIntBetween(100, 200);
2283+
for (int i = 0; i < numDoc; i++) {
2284+
String doc = XContentFactory.jsonBuilder()
2285+
.startObject()
2286+
.field("count", randomInt())
2287+
.field("point", randomFloat())
2288+
.field("description", randomUnicodeOfCodepointLength(100))
2289+
.endObject().string();
2290+
indexDoc(indexShard, "doc", Integer.toString(i), doc);
2291+
}
2292+
2293+
assertThat("Without flushing, segment sizes should be zero",
2294+
indexShard.docStats().getTotalSizeInBytes(), equalTo(0L));
2295+
2296+
indexShard.flush(new FlushRequest());
2297+
indexShard.refresh("test");
2298+
{
2299+
final DocsStats docsStats = indexShard.docStats();
2300+
final StoreStats storeStats = indexShard.storeStats();
2301+
assertThat(storeStats.sizeInBytes(), greaterThan(numDoc * 100L)); // A doc should be more than 100 bytes.
2302+
2303+
assertThat("Estimated total document size is too small compared with the stored size",
2304+
docsStats.getTotalSizeInBytes(), greaterThanOrEqualTo(storeStats.sizeInBytes() * 80/100));
2305+
assertThat("Estimated total document size is too large compared with the stored size",
2306+
docsStats.getTotalSizeInBytes(), lessThanOrEqualTo(storeStats.sizeInBytes() * 120/100));
2307+
}
2308+
2309+
// Do some updates and deletes, then recheck the correlation again.
2310+
for (int i = 0; i < numDoc / 2; i++) {
2311+
if (randomBoolean()) {
2312+
deleteDoc(indexShard, "doc", Integer.toString(i));
2313+
} else {
2314+
indexDoc(indexShard, "doc", Integer.toString(i), "{\"foo\": \"bar\"}");
2315+
}
2316+
}
2317+
2318+
indexShard.flush(new FlushRequest());
2319+
indexShard.refresh("test");
2320+
{
2321+
final DocsStats docsStats = indexShard.docStats();
2322+
final StoreStats storeStats = indexShard.storeStats();
2323+
assertThat("Estimated total document size is too small compared with the stored size",
2324+
docsStats.getTotalSizeInBytes(), greaterThanOrEqualTo(storeStats.sizeInBytes() * 80/100));
2325+
assertThat("Estimated total document size is too large compared with the stored size",
2326+
docsStats.getTotalSizeInBytes(), lessThanOrEqualTo(storeStats.sizeInBytes() * 120/100));
2327+
}
2328+
2329+
} finally {
2330+
closeShards(indexShard);
2331+
}
2332+
}
2333+
22722334
/**
22732335
* here we are simulating the scenario that happens when we do async shard fetching from GatewayService while we are finishing
22742336
* a recovery and concurrently clean files. This should always be possible without any exception. Yet there was a bug where IndexShard

0 commit comments

Comments
 (0)