Skip to content

Total data set size in stats (#70625) #71057

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/reference/cluster/nodes-stats.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,16 @@ Total size of all shards assigned to the node.
(integer)
Total size, in bytes, of all shards assigned to the node.

`total_data_set_size`::
(<<byte-units,byte value>>)
Total data set size of all shards assigned to the node.
This includes the size of shards not stored fully on the node (shared cache searchable snapshots).

`total_data_set_size_in_bytes`::
(integer)
Total data set size, in bytes, of all shards assigned to the node.
This includes the size of shards not stored fully on the node (shared cache searchable snapshots).

`reserved`::
(<<byte-units,byte value>>)
A prediction of how much larger the shard stores on this node will eventually
Expand Down
12 changes: 12 additions & 0 deletions docs/reference/cluster/stats.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,16 @@ Total size of all shards assigned to selected nodes.
(integer)
Total size, in bytes, of all shards assigned to selected nodes.

`total_data_set_size`::
(<<byte-units,byte value>>)
Total data set size of all shards assigned to selected nodes.
This includes the size of shards not stored fully on the nodes (shared cache searchable snapshots).

`total_data_set_size_in_bytes`::
(integer)
Total data set size, in bytes, of all shards assigned to selected nodes.
This includes the size of shards not stored fully on the nodes (shared cache searchable snapshots).

`reserved`::
(<<byte-units,byte value>>)
A prediction of how much larger the shard stores will eventually grow due to
Expand Down Expand Up @@ -1238,6 +1248,8 @@ The API returns the following response:
"store": {
"size": "16.2kb",
"size_in_bytes": 16684,
"total_data_set_size": "16.2kb",
"total_data_set_size_in_bytes": 16684,
"reserved": "0b",
"reserved_in_bytes": 0
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,35 @@
---
"Store stats":
- skip:
version: " - 7.99.99"
reason: "reserved_in_bytes field is not returned in prior versions"
features: [arbitrary_key]
version: " - 7.12.99"
reason: "total_data_set_size added in 7.13"

- do:
nodes.info:
node_id: _master
- set:
nodes._arbitrary_key_: master

- do:
nodes.stats:
metric: [ indices ]
index_metric: [ store ]

- is_false: nodes.$master.discovery
- is_true: nodes.$master.indices.store
- gte: { nodes.$master.indices.store.size_in_bytes: 0 }
- gte: { nodes.$master.indices.store.reserved_in_bytes: -1 }
- set:
nodes.$master.indices.store.size_in_bytes: size_in_bytes
- match: { nodes.$master.indices.store.total_data_set_size_in_bytes: $size_in_bytes }

---
"Store stats bwc":
- skip:
features: [arbitrary_key]
version: " - 7.8.99"
reason: "reserved_in_bytes field is not returned in prior versions"

- do:
nodes.info:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.LongUnaryOperator;
import java.util.function.Supplier;

import static java.util.Collections.emptyMap;
Expand Down Expand Up @@ -373,7 +374,7 @@ private long getAvgShardSizeInBytes() throws IOException {
long sum = 0;
int count = 0;
for (IndexShard indexShard : this) {
sum += indexShard.store().stats(0L).sizeInBytes();
sum += indexShard.store().stats(0L, LongUnaryOperator.identity()).sizeInBytes();
count++;
}
if (count == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.LongUnaryOperator;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
Expand Down Expand Up @@ -1085,15 +1086,16 @@ public GetStats getStats() {
}

public StoreStats storeStats() {
if (DiskThresholdDecider.SETTING_IGNORE_DISK_WATERMARKS.get(indexSettings.getSettings())) {
// if this shard has no disk footprint then its size is reported as 0
return new StoreStats(0, 0);
}
try {
final RecoveryState recoveryState = this.recoveryState;
final long bytesStillToRecover = recoveryState == null ? -1L : recoveryState.getIndex().bytesStillToRecover();
final long reservedBytes = bytesStillToRecover == -1 ? StoreStats.UNKNOWN_RESERVED_BYTES : bytesStillToRecover;
return store.stats(reservedBytes);
if (DiskThresholdDecider.SETTING_IGNORE_DISK_WATERMARKS.get(indexSettings.getSettings())) {
// if this shard has no disk footprint then its local size is reported as 0
return store.stats(0, size -> 0);
} else {
final long bytesStillToRecover = recoveryState == null ? -1L : recoveryState.getIndex().bytesStillToRecover();
final long reservedBytes = bytesStillToRecover == -1 ? StoreStats.UNKNOWN_RESERVED_BYTES : bytesStillToRecover;
return store.stats(reservedBytes, LongUnaryOperator.identity());
}
} catch (IOException e) {
failShard("Failing shard because of exception during storeStats", e);
throw new ElasticsearchException("io exception while building 'store stats'", e);
Expand Down
7 changes: 5 additions & 2 deletions server/src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.LongUnaryOperator;
import java.util.zip.CRC32;
import java.util.zip.Checksum;

Expand Down Expand Up @@ -346,10 +347,12 @@ public CheckIndex.Status checkIndex(PrintStream out) throws IOException {

/**
* @param reservedBytes a prediction of how much larger the store is expected to grow, or {@link StoreStats#UNKNOWN_RESERVED_BYTES}.
* @param localSizeFunction to calculate the local size of the shard based on the shard size.
*/
public StoreStats stats(long reservedBytes) throws IOException {
public StoreStats stats(long reservedBytes, LongUnaryOperator localSizeFunction) throws IOException {
ensureOpen();
return new StoreStats(directory.estimateSize(), reservedBytes);
long sizeInBytes = directory.estimateSize();
return new StoreStats(localSizeFunction.applyAsLong(sizeInBytes), sizeInBytes, reservedBytes);
}

/**
Expand Down
28 changes: 26 additions & 2 deletions server/src/main/java/org/elasticsearch/index/store/StoreStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ public class StoreStats implements Writeable, ToXContentFragment {
public static final long UNKNOWN_RESERVED_BYTES = -1L;

public static final Version RESERVED_BYTES_VERSION = Version.V_7_9_0;
public static final Version TOTAL_DATA_SET_SIZE_SIZE_VERSION = Version.V_7_13_0;

private long sizeInBytes;
private long totalDataSetSizeInBytes;
private long reservedSize;

public StoreStats() {
Expand All @@ -40,6 +42,11 @@ public StoreStats(StreamInput in) throws IOException {
if (in.getVersion().before(Version.V_6_0_0_alpha1)) {
in.readVLong(); // throttleTimeInNanos
}
if (in.getVersion().onOrAfter(TOTAL_DATA_SET_SIZE_SIZE_VERSION)) {
totalDataSetSizeInBytes = in.readVLong();
} else {
totalDataSetSizeInBytes = sizeInBytes;
}
if (in.getVersion().onOrAfter(RESERVED_BYTES_VERSION)) {
reservedSize = in.readZLong();
} else {
Expand All @@ -49,19 +56,22 @@ public StoreStats(StreamInput in) throws IOException {

/**
* @param sizeInBytes the size of the store in bytes
* @param totalDataSetSizeInBytes the size of the total data set in bytes, can differ from sizeInBytes for shards using shared cache
* storage
* @param reservedSize a prediction of how much larger the store is expected to grow, or {@link StoreStats#UNKNOWN_RESERVED_BYTES}.
*/
public StoreStats(long sizeInBytes, long reservedSize) {
public StoreStats(long sizeInBytes, long totalDataSetSizeInBytes, long reservedSize) {
assert reservedSize == UNKNOWN_RESERVED_BYTES || reservedSize >= 0 : reservedSize;
this.sizeInBytes = sizeInBytes;
this.totalDataSetSizeInBytes = totalDataSetSizeInBytes;
this.reservedSize = reservedSize;
}

public void add(StoreStats stats) {
if (stats == null) {
return;
}
sizeInBytes += stats.sizeInBytes;
totalDataSetSizeInBytes += stats.totalDataSetSizeInBytes;
reservedSize = ignoreIfUnknown(reservedSize) + ignoreIfUnknown(stats.reservedSize);
}

Expand All @@ -85,6 +95,14 @@ public ByteSizeValue getSize() {
return size();
}

public ByteSizeValue totalDataSetSize() {
return new ByteSizeValue(totalDataSetSizeInBytes);
}

public ByteSizeValue getTotalDataSetSize() {
return totalDataSetSize();
}

/**
* A prediction of how much larger this store will eventually grow. For instance, if we are currently doing a peer recovery or restoring
* a snapshot into this store then we can account for the rest of the recovery using this field. A value of {@code -1B} indicates that
Expand All @@ -100,6 +118,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().before(Version.V_6_0_0_alpha1)) {
out.writeVLong(0L); // throttleTimeInNanos
}
if (out.getVersion().onOrAfter(TOTAL_DATA_SET_SIZE_SIZE_VERSION)) {
out.writeVLong(totalDataSetSizeInBytes);
}
if (out.getVersion().onOrAfter(RESERVED_BYTES_VERSION)) {
out.writeZLong(reservedSize);
}
Expand All @@ -109,6 +130,7 @@ public void writeTo(StreamOutput out) throws IOException {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.STORE);
builder.humanReadableField(Fields.SIZE_IN_BYTES, Fields.SIZE, size());
builder.humanReadableField(Fields.TOTAL_DATA_SET_SIZE_IN_BYTES, Fields.TOTAL_DATA_SET_SIZE, totalDataSetSize());
builder.humanReadableField(Fields.RESERVED_IN_BYTES, Fields.RESERVED, getReservedSize());
builder.endObject();
return builder;
Expand All @@ -118,6 +140,8 @@ static final class Fields {
static final String STORE = "store";
static final String SIZE = "size";
static final String SIZE_IN_BYTES = "size_in_bytes";
static final String TOTAL_DATA_SET_SIZE = "total_data_set_size";
static final String TOTAL_DATA_SET_SIZE_IN_BYTES = "total_data_set_size_in_bytes";
static final String RESERVED = "reserved";
static final String RESERVED_IN_BYTES = "reserved_in_bytes";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void testCreation() {
Path path = createTempDir().resolve("indices").resolve(shardRouting.shardId().getIndex().getUUID())
.resolve(String.valueOf(shardRouting.shardId().id()));
IndexShard indexShard = mock(IndexShard.class);
StoreStats storeStats = new StoreStats(100, 200);
StoreStats storeStats = new StoreStats(100, 150, 200);
when(indexShard.storeStats()).thenReturn(storeStats);
ShardStats shardStats = new ShardStats(shardRouting, new ShardPath(false, path, path, shardRouting.shardId()),
new CommonStats(null, indexShard, new CommonStatsFlags(CommonStatsFlags.Flag.Store)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public void testErrorCondition() {
assertTrue(
expectThrows(IllegalStateException.class, () ->
TransportResizeAction.prepareCreateIndexRequest(new ResizeRequest("target", "source"), state,
new StoreStats(between(1, 100), between(1, 100)), (i) -> new DocsStats(Integer.MAX_VALUE, between(1, 1000),
new StoreStats(between(1, 100), between(0, 100), between(1, 100)),
(i) -> new DocsStats(Integer.MAX_VALUE, between(1, 1000),
between(1, 100)), "target")
).getMessage().startsWith("Can't merge index with more than [2147483519] docs - too many documents in shards "));

Expand All @@ -82,7 +83,7 @@ public void testErrorCondition() {
TransportResizeAction.prepareCreateIndexRequest(req,
createClusterState("source", 8, 1,
Settings.builder().put("index.blocks.write", true).build()).metadata().index("source"),
new StoreStats(between(1, 100), between(1, 100)),
new StoreStats(between(1, 100), between(0, 100), between(1, 100)),
(i) -> i == 2 || i == 3 ? new DocsStats(Integer.MAX_VALUE / 2, between(1, 1000), between(1, 10000)) : null
, "target");
}
Expand All @@ -96,7 +97,7 @@ public void testErrorCondition() {
createClusterState("source", 8, 1,
Settings.builder().put("index.blocks.write", true).put("index.soft_deletes.enabled", true).build())
.metadata().index("source"),
new StoreStats(between(1, 100), between(1, 100)),
new StoreStats(between(1, 100), between(0, 100), between(1, 100)),
(i) -> new DocsStats(between(10, 1000), between(1, 10), between(1, 10000)), "target");
});
assertThat(softDeletesError.getMessage(), equalTo("Can't disable [index.soft_deletes.enabled] setting on resize"));
Expand All @@ -117,7 +118,7 @@ public void testErrorCondition() {
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();

TransportResizeAction.prepareCreateIndexRequest(new ResizeRequest("target", "source"), clusterState.metadata().index("source"),
new StoreStats(between(1, 100), between(1, 100)),
new StoreStats(between(1, 100), between(0, 100), between(1, 100)),
(i) -> new DocsStats(between(1, 1000), between(1, 1000), between(0, 10000)), "target");
}

Expand All @@ -141,15 +142,17 @@ public void testPassNumRoutingShards() {
resizeRequest.getTargetIndexRequest()
.settings(Settings.builder().put("index.number_of_shards", 2).build());
IndexMetadata indexMetadata = clusterState.metadata().index("source");
TransportResizeAction.prepareCreateIndexRequest(resizeRequest, indexMetadata, new StoreStats(between(1, 100), between(1, 100)),
TransportResizeAction.prepareCreateIndexRequest(resizeRequest, indexMetadata,
new StoreStats(between(1, 100), between(0, 100), between(1, 100)),
null, "target");

resizeRequest.getTargetIndexRequest()
.settings(Settings.builder()
.put("index.number_of_routing_shards", randomIntBetween(2, 10))
.put("index.number_of_shards", 2)
.build());
TransportResizeAction.prepareCreateIndexRequest(resizeRequest, indexMetadata, new StoreStats(between(1, 100), between(1, 100)),
TransportResizeAction.prepareCreateIndexRequest(resizeRequest, indexMetadata,
new StoreStats(between(1, 100), between(0, 100), between(1, 100)),
null, "target");
}

Expand All @@ -174,7 +177,7 @@ public void testPassNumRoutingShardsAndFail() {
resizeRequest.getTargetIndexRequest()
.settings(Settings.builder().put("index.number_of_shards", numShards * 2).build());
TransportResizeAction.prepareCreateIndexRequest(resizeRequest, clusterState.metadata().index("source"),
new StoreStats(between(1, 100), between(1, 100)), null, "target");
new StoreStats(between(1, 100), between(0, 100), between(1, 100)), null, "target");

resizeRequest.getTargetIndexRequest()
.settings(Settings.builder()
Expand All @@ -183,7 +186,7 @@ public void testPassNumRoutingShardsAndFail() {
ClusterState finalState = clusterState;
IllegalArgumentException iae = expectThrows(IllegalArgumentException.class,
() -> TransportResizeAction.prepareCreateIndexRequest(resizeRequest, finalState.metadata().index("source"),
new StoreStats(between(1, 100), between(1, 100)), null, "target"));
new StoreStats(between(1, 100), between(0, 100), between(1, 100)), null, "target"));
assertEquals("cannot provide index.number_of_routing_shards on resize", iae.getMessage());
}

Expand Down Expand Up @@ -211,7 +214,8 @@ public void testShrinkIndexSettings() {
final ActiveShardCount activeShardCount = randomBoolean() ? ActiveShardCount.ALL : ActiveShardCount.ONE;
target.setWaitForActiveShards(activeShardCount);
CreateIndexClusterStateUpdateRequest request = TransportResizeAction.prepareCreateIndexRequest(
target, clusterState.metadata().index(indexName), new StoreStats(between(1, 100), between(1, 100)), (i) -> stats, "target");
target, clusterState.metadata().index(indexName),
new StoreStats(between(1, 100), between(0, 100), between(1, 100)), (i) -> stats, "target");
assertNotNull(request.recoverFrom());
assertEquals(indexName, request.recoverFrom().getName());
assertEquals("1", request.settings().get("index.number_of_shards"));
Expand Down Expand Up @@ -242,7 +246,8 @@ public void testShrinkWithMaxPrimaryShardSize() {
.settings(Settings.builder().put("index.number_of_shards", 2).build());
assertTrue(
expectThrows(IllegalArgumentException.class, () ->
TransportResizeAction.prepareCreateIndexRequest(resizeRequest, state, new StoreStats(between(1, 100), between(1, 100)),
TransportResizeAction.prepareCreateIndexRequest(resizeRequest, state,
new StoreStats(between(1, 100), between(0, 100), between(1, 100)),
(i) -> new DocsStats(Integer.MAX_VALUE, between(1, 1000), between(1, 100)), "target")
).getMessage().startsWith("Cannot set both index.number_of_shards and max_primary_shard_size for the target index"));

Expand All @@ -268,7 +273,7 @@ public void testShrinkWithMaxPrimaryShardSize() {
// each shard's storage will not be greater than the `max_primary_shard_size`
ResizeRequest target1 = new ResizeRequest("target", "source");
target1.setMaxPrimaryShardSize(new ByteSizeValue(2));
StoreStats storeStats = new StoreStats(10, between(1, 100));
StoreStats storeStats = new StoreStats(10, between(0, 100), between(1, 100));
final int targetIndexShardsNum1 = 5;
final ActiveShardCount activeShardCount1 = ActiveShardCount.from(targetIndexShardsNum1);
target1.setWaitForActiveShards(targetIndexShardsNum1);
Expand All @@ -285,7 +290,7 @@ public void testShrinkWithMaxPrimaryShardSize() {
// the shards number of the target index will be equal to the source index's shards number
ResizeRequest target2 = new ResizeRequest("target2", "source");
target2.setMaxPrimaryShardSize(new ByteSizeValue(1));
StoreStats storeStats2 = new StoreStats(100, between(1, 100));
StoreStats storeStats2 = new StoreStats(100, between(0, 100), between(1, 100));
final int targetIndexShardsNum2 = 10;
final ActiveShardCount activeShardCount2 = ActiveShardCount.from(targetIndexShardsNum2);
target2.setWaitForActiveShards(activeShardCount2);
Expand Down
Loading