Skip to content

Commit 4e6a890

Browse files
authored
Add periodic flush count to flush stats (#29360)
Currently, flush stats contain only the total flush count, which is the sum of manual flushes (requested via the API) and periodic flushes (triggered asynchronously when the uncommitted translog size exceeds the flush threshold). Sometimes it is useful to know these two numbers independently. This commit tracks a periodic flush count and returns it in the flush stats.
1 parent 6a6c0ea commit 4e6a890

File tree

4 files changed

+90
-6
lines changed

4 files changed

+90
-6
lines changed

rest-api-spec/src/main/resources/rest-api-spec/test/indices.flush/10_basic.yml

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,34 @@
2121
indices.stats: {level: shards}
2222

2323
- is_true: indices.testing.shards.0.0.commit.user_data.sync_id
24+
25+
---
26+
"Flush stats":
27+
- skip:
28+
version: " - 6.99.99"
29+
reason: periodic flush stats is introduced in 7.0
30+
- do:
31+
indices.create:
32+
index: test
33+
body:
34+
settings:
35+
number_of_shards: 1
36+
index.translog.flush_threshold_size: 160b
37+
- do:
38+
indices.flush:
39+
index: test
40+
- do:
41+
indices.stats: { index: test }
42+
- match: { indices.test.primaries.flush.periodic: 0 }
43+
- match: { indices.test.primaries.flush.total: 1 }
44+
- do:
45+
index:
46+
index: test
47+
type: doc
48+
id: 1
49+
body: { "message": "a long message to make a periodic flush happen after this index operation" }
50+
- do:
51+
indices.stats: { index: test }
52+
# periodic flush is async
53+
- gte: { indices.test.primaries.flush.periodic: 0 }
54+
- gte: { indices.test.primaries.flush.total: 1 }

server/src/main/java/org/elasticsearch/index/flush/FlushStats.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.index.flush;
2121

22+
import org.elasticsearch.Version;
2223
import org.elasticsearch.common.io.stream.StreamInput;
2324
import org.elasticsearch.common.io.stream.StreamOutput;
2425
import org.elasticsearch.common.io.stream.Streamable;
@@ -31,20 +32,22 @@
3132
public class FlushStats implements Streamable, ToXContentFragment {
3233

3334
private long total;
34-
35+
private long periodic;
3536
private long totalTimeInMillis;
3637

3738
public FlushStats() {
3839

3940
}
4041

41-
public FlushStats(long total, long totalTimeInMillis) {
42+
public FlushStats(long total, long periodic, long totalTimeInMillis) {
4243
this.total = total;
44+
this.periodic = periodic;
4345
this.totalTimeInMillis = totalTimeInMillis;
4446
}
4547

46-
public void add(long total, long totalTimeInMillis) {
48+
public void add(long total, long periodic, long totalTimeInMillis) {
4749
this.total += total;
50+
this.periodic += periodic;
4851
this.totalTimeInMillis += totalTimeInMillis;
4952
}
5053

@@ -57,6 +60,7 @@ public void addTotals(FlushStats flushStats) {
5760
return;
5861
}
5962
this.total += flushStats.total;
63+
this.periodic += flushStats.periodic;
6064
this.totalTimeInMillis += flushStats.totalTimeInMillis;
6165
}
6266

@@ -67,6 +71,13 @@ public long getTotal() {
6771
return this.total;
6872
}
6973

74+
/**
75+
* The number of flushes that were periodically triggered when the translog exceeded the flush threshold.
76+
*/
77+
public long getPeriodic() {
78+
return periodic;
79+
}
80+
7081
/**
7182
* The total time flushes have been executed (in milliseconds).
7283
*/
@@ -85,6 +96,7 @@ public TimeValue getTotalTime() {
8596
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
8697
builder.startObject(Fields.FLUSH);
8798
builder.field(Fields.TOTAL, total);
99+
builder.field(Fields.PERIODIC, periodic);
88100
builder.humanReadableField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, getTotalTime());
89101
builder.endObject();
90102
return builder;
@@ -93,6 +105,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
93105
static final class Fields {
94106
static final String FLUSH = "flush";
95107
static final String TOTAL = "total";
108+
static final String PERIODIC = "periodic";
96109
static final String TOTAL_TIME = "total_time";
97110
static final String TOTAL_TIME_IN_MILLIS = "total_time_in_millis";
98111
}
@@ -101,11 +114,17 @@ static final class Fields {
101114
public void readFrom(StreamInput in) throws IOException {
102115
total = in.readVLong();
103116
totalTimeInMillis = in.readVLong();
117+
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
118+
periodic = in.readVLong();
119+
}
104120
}
105121

106122
@Override
107123
public void writeTo(StreamOutput out) throws IOException {
108124
out.writeVLong(total);
109125
out.writeVLong(totalTimeInMillis);
126+
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
127+
out.writeVLong(periodic);
128+
}
110129
}
111130
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import org.elasticsearch.common.lease.Releasable;
5858
import org.elasticsearch.common.lease.Releasables;
5959
import org.elasticsearch.common.lucene.Lucene;
60+
import org.elasticsearch.common.metrics.CounterMetric;
6061
import org.elasticsearch.common.metrics.MeanMetric;
6162
import org.elasticsearch.common.settings.Settings;
6263
import org.elasticsearch.common.unit.TimeValue;
@@ -208,6 +209,7 @@ Runnable getGlobalCheckpointSyncer() {
208209
private final RecoveryStats recoveryStats = new RecoveryStats();
209210
private final MeanMetric refreshMetric = new MeanMetric();
210211
private final MeanMetric flushMetric = new MeanMetric();
212+
private final CounterMetric periodicFlushMetric = new CounterMetric();
211213

212214
private final ShardEventListener shardEventListener = new ShardEventListener();
213215

@@ -827,7 +829,7 @@ public RefreshStats refreshStats() {
827829
}
828830

829831
public FlushStats flushStats() {
830-
return new FlushStats(flushMetric.count(), TimeUnit.NANOSECONDS.toMillis(flushMetric.sum()));
832+
return new FlushStats(flushMetric.count(), periodicFlushMetric.count(), TimeUnit.NANOSECONDS.toMillis(flushMetric.sum()));
831833
}
832834

833835
public DocsStats docStats() {
@@ -2344,6 +2346,7 @@ public void onFailure(final Exception e) {
23442346
@Override
23452347
protected void doRun() throws IOException {
23462348
flush(new FlushRequest());
2349+
periodicFlushMetric.inc();
23472350
}
23482351

23492352
@Override

server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.elasticsearch.index.shard;
2020

2121
import org.apache.lucene.store.LockObtainFailedException;
22-
import org.elasticsearch.core.internal.io.IOUtils;
2322
import org.elasticsearch.ExceptionsHelper;
2423
import org.elasticsearch.Version;
2524
import org.elasticsearch.action.ActionListener;
@@ -42,6 +41,7 @@
4241
import org.elasticsearch.cluster.routing.UnassignedInfo;
4342
import org.elasticsearch.cluster.service.ClusterService;
4443
import org.elasticsearch.common.CheckedRunnable;
44+
import org.elasticsearch.common.UUIDs;
4545
import org.elasticsearch.common.breaker.CircuitBreaker;
4646
import org.elasticsearch.common.bytes.BytesArray;
4747
import org.elasticsearch.common.lucene.uid.Versions;
@@ -50,6 +50,7 @@
5050
import org.elasticsearch.common.unit.ByteSizeValue;
5151
import org.elasticsearch.common.unit.TimeValue;
5252
import org.elasticsearch.common.xcontent.XContentType;
53+
import org.elasticsearch.core.internal.io.IOUtils;
5354
import org.elasticsearch.env.Environment;
5455
import org.elasticsearch.env.NodeEnvironment;
5556
import org.elasticsearch.env.ShardLock;
@@ -102,6 +103,7 @@
102103
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
103104
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
104105
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoSearchHits;
106+
import static org.hamcrest.Matchers.allOf;
105107
import static org.hamcrest.Matchers.containsString;
106108
import static org.hamcrest.Matchers.equalTo;
107109
import static org.hamcrest.Matchers.greaterThan;
@@ -347,6 +349,7 @@ public void testMaybeFlush() throws Exception {
347349
.setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get();
348350
assertBusy(() -> { // this is async
349351
assertFalse(shard.shouldPeriodicallyFlush());
352+
assertThat(shard.flushStats().getPeriodic(), greaterThan(0L));
350353
});
351354
assertEquals(0, translog.stats().getUncommittedOperations());
352355
translog.sync();
@@ -444,8 +447,12 @@ public void testStressMaybeFlushOrRollTranslogGeneration() throws Exception {
444447
if (flush) {
445448
final FlushStats flushStats = shard.flushStats();
446449
final long total = flushStats.getTotal();
450+
final long periodic = flushStats.getPeriodic();
447451
client().prepareIndex("test", "test", "1").setSource("{}", XContentType.JSON).get();
448-
check = () -> assertEquals(total + 1, shard.flushStats().getTotal());
452+
check = () -> {
453+
assertThat(shard.flushStats().getTotal(), equalTo(total + 1));
454+
assertThat(shard.flushStats().getPeriodic(), equalTo(periodic + 1));
455+
};
449456
} else {
450457
final long generation = shard.getEngine().getTranslog().currentFileGeneration();
451458
client().prepareIndex("test", "test", "1").setSource("{}", XContentType.JSON).get();
@@ -461,6 +468,30 @@ public void testStressMaybeFlushOrRollTranslogGeneration() throws Exception {
461468
check.run();
462469
}
463470

471+
public void testFlushStats() throws Exception {
472+
final IndexService indexService = createIndex("test");
473+
ensureGreen();
474+
Settings settings = Settings.builder().put("index.translog.flush_threshold_size", "" + between(200, 300) + "b").build();
475+
client().admin().indices().prepareUpdateSettings("test").setSettings(settings).get();
476+
final int numDocs = between(10, 100);
477+
for (int i = 0; i < numDocs; i++) {
478+
client().prepareIndex("test", "doc", Integer.toString(i)).setSource("{}", XContentType.JSON).get();
479+
}
480+
// Flush stats may include the new total count but the old periodic count - assert eventually.
481+
assertBusy(() -> {
482+
final FlushStats flushStats = client().admin().indices().prepareStats("test").clear().setFlush(true).get().getTotal().flush;
483+
assertThat(flushStats.getPeriodic(), allOf(equalTo(flushStats.getTotal()), greaterThan(0L)));
484+
});
485+
assertBusy(() -> assertThat(indexService.getShard(0).shouldPeriodicallyFlush(), equalTo(false)));
486+
settings = Settings.builder().put("index.translog.flush_threshold_size", (String) null).build();
487+
client().admin().indices().prepareUpdateSettings("test").setSettings(settings).get();
488+
489+
client().prepareIndex("test", "doc", UUIDs.randomBase64UUID()).setSource("{}", XContentType.JSON).get();
490+
client().admin().indices().prepareFlush("test").setForce(randomBoolean()).setWaitIfOngoing(true).get();
491+
final FlushStats flushStats = client().admin().indices().prepareStats("test").clear().setFlush(true).get().getTotal().flush;
492+
assertThat(flushStats.getTotal(), greaterThan(flushStats.getPeriodic()));
493+
}
494+
464495
public void testShardHasMemoryBufferOnTranslogRecover() throws Throwable {
465496
createIndex("test");
466497
ensureGreen();

0 commit comments

Comments
 (0)