Skip to content

Commit eb31490

Browse files
Conor Landrydnhatn
Conor Landry
authored andcommitted
Expose external refreshes through the stats API (elastic#38643)
Right now, the stats API only provides refresh metrics regarding internal refreshes. This isn't very useful and somewhat misleading for cluster administrators since the internal refreshes are not indicative of documents being available for search. In this PR I added a new metric for collecting external refreshes as they occur and exposing them through the stats API. Now, calling an endpoint for stats will yield external refresh metrics as well. Relates elastic#36712
1 parent 421d765 commit eb31490

File tree

12 files changed

+181
-55
lines changed

12 files changed

+181
-55
lines changed

rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yml

+5-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
---
22
"Help":
3-
3+
- skip:
4+
version: " - 7.0.99"
5+
reason: external refresh stats were added in 7.1.0
46
- do:
57
cat.shards:
68
help: true
@@ -52,6 +54,8 @@
5254
merges.total_time .+ \n
5355
refresh.total .+ \n
5456
refresh.time .+ \n
57+
refresh.external_total .+ \n
58+
refresh.external_time .+ \n
5559
refresh.listeners .+ \n
5660
search.fetch_current .+ \n
5761
search.fetch_time .+ \n

server/src/main/java/org/elasticsearch/index/refresh/RefreshStats.java

+50-11
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.index.refresh;
2121

22+
import org.elasticsearch.Version;
2223
import org.elasticsearch.common.io.stream.StreamInput;
2324
import org.elasticsearch.common.io.stream.StreamOutput;
2425
import org.elasticsearch.common.io.stream.Streamable;
@@ -36,6 +37,10 @@ public class RefreshStats implements Streamable, Writeable, ToXContentFragment {
3637

3738
private long totalTimeInMillis;
3839

40+
private long externalTotal;
41+
42+
private long externalTotalTimeInMillis;
43+
3944
/**
4045
* Number of waiting refresh listeners.
4146
*/
@@ -47,12 +52,29 @@ public RefreshStats() {
4752
public RefreshStats(StreamInput in) throws IOException {
4853
total = in.readVLong();
4954
totalTimeInMillis = in.readVLong();
55+
if (in.getVersion().onOrAfter(Version.V_7_1_0)) {
56+
externalTotal = in.readVLong();
57+
externalTotalTimeInMillis = in.readVLong();
58+
}
5059
listeners = in.readVInt();
5160
}
5261

53-
public RefreshStats(long total, long totalTimeInMillis, int listeners) {
62+
@Override
63+
public void writeTo(StreamOutput out) throws IOException {
64+
out.writeVLong(total);
65+
out.writeVLong(totalTimeInMillis);
66+
if (out.getVersion().onOrAfter(Version.V_7_1_0)) {
67+
out.writeVLong(externalTotal);
68+
out.writeVLong(externalTotalTimeInMillis);
69+
}
70+
out.writeVInt(listeners);
71+
}
72+
73+
public RefreshStats(long total, long totalTimeInMillis, long externalTotal, long externalTotalTimeInMillis, int listeners) {
5474
this.total = total;
5575
this.totalTimeInMillis = totalTimeInMillis;
76+
this.externalTotal = externalTotal;
77+
this.externalTotalTimeInMillis = externalTotalTimeInMillis;
5678
this.listeners = listeners;
5779
}
5880

@@ -66,6 +88,8 @@ public void addTotals(RefreshStats refreshStats) {
6688
}
6789
this.total += refreshStats.total;
6890
this.totalTimeInMillis += refreshStats.totalTimeInMillis;
91+
this.externalTotal += refreshStats.externalTotal;
92+
this.externalTotalTimeInMillis += refreshStats.externalTotalTimeInMillis;
6993
this.listeners += refreshStats.listeners;
7094
}
7195

@@ -76,20 +100,38 @@ public long getTotal() {
76100
return this.total;
77101
}
78102

103+
/*
104+
* The total number of external refresh executed.
105+
*/
106+
public long getExternalTotal() { return this.externalTotal; }
107+
79108
/**
80-
* The total time merges have been executed (in milliseconds).
109+
* The total time spent executing refreshes (in milliseconds).
81110
*/
82111
public long getTotalTimeInMillis() {
83112
return this.totalTimeInMillis;
84113
}
85114

86115
/**
87-
* The total time merges have been executed.
116+
* The total time spent executing external refreshes (in milliseconds).
117+
*/
118+
public long getExternalTotalTimeInMillis() {
119+
return this.externalTotalTimeInMillis;
120+
}
121+
122+
/**
123+
* The total time refreshes have been executed.
88124
*/
89125
public TimeValue getTotalTime() {
90126
return new TimeValue(totalTimeInMillis);
91127
}
92128

129+
/**
130+
* The total time external refreshes have been executed.
131+
*/
132+
public TimeValue getExternalTotalTime() {
133+
return new TimeValue(externalTotalTimeInMillis);
134+
}
93135
/**
94136
* The number of waiting refresh listeners.
95137
*/
@@ -102,6 +144,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
102144
builder.startObject("refresh");
103145
builder.field("total", total);
104146
builder.humanReadableField("total_time_in_millis", "total_time", getTotalTime());
147+
builder.field("external_total", externalTotal);
148+
builder.humanReadableField("external_total_time_in_millis", "external_total_time", getExternalTotalTime());
105149
builder.field("listeners", listeners);
106150
builder.endObject();
107151
return builder;
@@ -112,13 +156,6 @@ public void readFrom(StreamInput in) throws IOException {
112156
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
113157
}
114158

115-
@Override
116-
public void writeTo(StreamOutput out) throws IOException {
117-
out.writeVLong(total);
118-
out.writeVLong(totalTimeInMillis);
119-
out.writeVInt(listeners);
120-
}
121-
122159
@Override
123160
public boolean equals(Object obj) {
124161
if (obj == null || obj.getClass() != RefreshStats.class) {
@@ -127,11 +164,13 @@ public boolean equals(Object obj) {
127164
RefreshStats rhs = (RefreshStats) obj;
128165
return total == rhs.total
129166
&& totalTimeInMillis == rhs.totalTimeInMillis
167+
&& externalTotal == rhs.externalTotal
168+
&& externalTotalTimeInMillis == rhs.externalTotalTimeInMillis
130169
&& listeners == rhs.listeners;
131170
}
132171

133172
@Override
134173
public int hashCode() {
135-
return Objects.hash(total, totalTimeInMillis, listeners);
174+
return Objects.hash(total, totalTimeInMillis, externalTotal, externalTotalTimeInMillis, listeners);
136175
}
137176
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,7 @@ Runnable getGlobalCheckpointSyncer() {
222222

223223
private final RecoveryStats recoveryStats = new RecoveryStats();
224224
private final MeanMetric refreshMetric = new MeanMetric();
225+
private final MeanMetric externalRefreshMetric = new MeanMetric();
225226
private final MeanMetric flushMetric = new MeanMetric();
226227
private final CounterMetric periodicFlushMetric = new CounterMetric();
227228

@@ -932,7 +933,12 @@ public long getWritingBytes() {
932933

933934
public RefreshStats refreshStats() {
934935
int listeners = refreshListeners.pendingCount();
935-
return new RefreshStats(refreshMetric.count(), TimeUnit.NANOSECONDS.toMillis(refreshMetric.sum()), listeners);
936+
return new RefreshStats(
937+
refreshMetric.count(),
938+
TimeUnit.NANOSECONDS.toMillis(refreshMetric.sum()),
939+
externalRefreshMetric.count(),
940+
TimeUnit.NANOSECONDS.toMillis(externalRefreshMetric.sum()),
941+
listeners);
936942
}
937943

938944
public FlushStats flushStats() {
@@ -2900,7 +2906,8 @@ private RefreshListeners buildRefreshListeners() {
29002906
indexSettings::getMaxRefreshListeners,
29012907
() -> refresh("too_many_listeners"),
29022908
threadPool.executor(ThreadPool.Names.LISTENER)::execute,
2903-
logger, threadPool.getThreadContext());
2909+
logger, threadPool.getThreadContext(),
2910+
externalRefreshMetric);
29042911
}
29052912

29062913
/**

server/src/main/java/org/elasticsearch/index/shard/RefreshListeners.java

+13-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.lucene.search.ReferenceManager;
2424
import org.elasticsearch.common.collect.Tuple;
2525
import org.elasticsearch.common.lease.Releasable;
26+
import org.elasticsearch.common.metrics.MeanMetric;
2627
import org.elasticsearch.common.util.concurrent.RunOnce;
2728
import org.elasticsearch.common.util.concurrent.ThreadContext;
2829
import org.elasticsearch.index.translog.Translog;
@@ -50,6 +51,12 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener,
5051
private final Executor listenerExecutor;
5152
private final Logger logger;
5253
private final ThreadContext threadContext;
54+
private final MeanMetric refreshMetric;
55+
56+
/**
57+
* Time in nanosecond when beforeRefresh() is called. Used for calculating refresh metrics.
58+
*/
59+
private long currentRefreshStartTime;
5360

5461
/**
5562
* Is this closed? If true then we won't add more listeners and have flushed all pending listeners.
@@ -76,12 +83,13 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener,
7683
private volatile Translog.Location lastRefreshedLocation;
7784

7885
public RefreshListeners(IntSupplier getMaxRefreshListeners, Runnable forceRefresh, Executor listenerExecutor, Logger logger,
79-
ThreadContext threadContext) {
86+
ThreadContext threadContext, MeanMetric refreshMetric) {
8087
this.getMaxRefreshListeners = getMaxRefreshListeners;
8188
this.forceRefresh = forceRefresh;
8289
this.listenerExecutor = listenerExecutor;
8390
this.logger = logger;
8491
this.threadContext = threadContext;
92+
this.refreshMetric = refreshMetric;
8593
}
8694

8795
/**
@@ -204,10 +212,14 @@ public void setCurrentRefreshLocationSupplier(Supplier<Translog.Location> curren
204212
@Override
205213
public void beforeRefresh() throws IOException {
206214
currentRefreshLocation = currentRefreshLocationSupplier.get();
215+
currentRefreshStartTime = System.nanoTime();
207216
}
208217

209218
@Override
210219
public void afterRefresh(boolean didRefresh) throws IOException {
220+
// Increment refresh metric before communicating to listeners.
221+
refreshMetric.inc(System.nanoTime() - currentRefreshStartTime);
222+
211223
/* We intentionally ignore didRefresh here because our timing is a little off. It'd be a useful flag if we knew everything that made
212224
* it into the refresh, but the way we snapshot the translog position before the refresh, things can sneak into the refresh that we
213225
* don't know about. */

server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java

+14
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,14 @@ protected Table getTableWithHeader(final RestRequest request) {
289289
table.addCell("refresh.time", "sibling:pri;alias:rti,refreshTime;default:false;text-align:right;desc:time spent in refreshes");
290290
table.addCell("pri.refresh.time", "default:false;text-align:right;desc:time spent in refreshes");
291291

292+
table.addCell("refresh.external_total",
293+
"sibling:pri;alias:rto,refreshTotal;default:false;text-align:right;desc:total external refreshes");
294+
table.addCell("pri.refresh.external_total", "default:false;text-align:right;desc:total external refreshes");
295+
296+
table.addCell("refresh.external_time",
297+
"sibling:pri;alias:rti,refreshTime;default:false;text-align:right;desc:time spent in external refreshes");
298+
table.addCell("pri.refresh.external_time", "default:false;text-align:right;desc:time spent in external refreshes");
299+
292300
table.addCell("refresh.listeners",
293301
"sibling:pri;alias:rli,refreshListeners;default:false;text-align:right;desc:number of pending refresh listeners");
294302
table.addCell("pri.refresh.listeners", "default:false;text-align:right;desc:number of pending refresh listeners");
@@ -562,6 +570,12 @@ Table buildTable(final RestRequest request,
562570
table.addCell(totalStats.getRefresh() == null ? null : totalStats.getRefresh().getTotalTime());
563571
table.addCell(primaryStats.getRefresh() == null ? null : primaryStats.getRefresh().getTotalTime());
564572

573+
table.addCell(totalStats.getRefresh() == null ? null : totalStats.getRefresh().getExternalTotal());
574+
table.addCell(primaryStats.getRefresh() == null ? null : primaryStats.getRefresh().getExternalTotal());
575+
576+
table.addCell(totalStats.getRefresh() == null ? null : totalStats.getRefresh().getExternalTotalTime());
577+
table.addCell(primaryStats.getRefresh() == null ? null : primaryStats.getRefresh().getExternalTotalTime());
578+
565579
table.addCell(totalStats.getRefresh() == null ? null : totalStats.getRefresh().getListeners());
566580
table.addCell(primaryStats.getRefresh() == null ? null : primaryStats.getRefresh().getListeners());
567581

server/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java

+5
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,9 @@ protected Table getTableWithHeader(final RestRequest request) {
201201

202202
table.addCell("refresh.total", "alias:rto,refreshTotal;default:false;text-align:right;desc:total refreshes");
203203
table.addCell("refresh.time", "alias:rti,refreshTime;default:false;text-align:right;desc:time spent in refreshes");
204+
table.addCell("refresh.external_total", "alias:rto,refreshTotal;default:false;text-align:right;desc:total external refreshes");
205+
table.addCell("refresh.external_time",
206+
"alias:rti,refreshTime;default:false;text-align:right;desc:time spent in external refreshes");
204207
table.addCell("refresh.listeners", "alias:rli,refreshListeners;default:false;text-align:right;"
205208
+ "desc:number of pending refresh listeners");
206209

@@ -378,6 +381,8 @@ Table buildTable(boolean fullId, RestRequest req, ClusterStateResponse state, No
378381
RefreshStats refreshStats = indicesStats == null ? null : indicesStats.getRefresh();
379382
table.addCell(refreshStats == null ? null : refreshStats.getTotal());
380383
table.addCell(refreshStats == null ? null : refreshStats.getTotalTime());
384+
table.addCell(refreshStats == null ? null : refreshStats.getExternalTotal());
385+
table.addCell(refreshStats == null ? null : refreshStats.getExternalTotalTime());
381386
table.addCell(refreshStats == null ? null : refreshStats.getListeners());
382387

383388
ScriptStats scriptStats = stats == null ? null : stats.getScriptStats();

server/src/main/java/org/elasticsearch/rest/action/cat/RestShardsAction.java

+5
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,9 @@ protected Table getTableWithHeader(final RestRequest request) {
164164

165165
table.addCell("refresh.total", "alias:rto,refreshTotal;default:false;text-align:right;desc:total refreshes");
166166
table.addCell("refresh.time", "alias:rti,refreshTime;default:false;text-align:right;desc:time spent in refreshes");
167+
table.addCell("refresh.external_total", "alias:rto,refreshTotal;default:false;text-align:right;desc:total external refreshes");
168+
table.addCell("refresh.external_time",
169+
"alias:rti,refreshTime;default:false;text-align:right;desc:time spent in external refreshes");
167170
table.addCell("refresh.listeners",
168171
"alias:rli,refreshListeners;default:false;text-align:right;desc:number of pending refresh listeners");
169172

@@ -319,6 +322,8 @@ private Table buildTable(RestRequest request, ClusterStateResponse state, Indice
319322

320323
table.addCell(getOrNull(commonStats, CommonStats::getRefresh, RefreshStats::getTotal));
321324
table.addCell(getOrNull(commonStats, CommonStats::getRefresh, RefreshStats::getTotalTime));
325+
table.addCell(getOrNull(commonStats, CommonStats::getRefresh, RefreshStats::getExternalTotal));
326+
table.addCell(getOrNull(commonStats, CommonStats::getRefresh, RefreshStats::getExternalTotalTime));
322327
table.addCell(getOrNull(commonStats, CommonStats::getRefresh, RefreshStats::getListeners));
323328

324329
table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getFetchCurrent()));

server/src/test/java/org/elasticsearch/index/refresh/RefreshStatsTests.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,17 @@
2828
public class RefreshStatsTests extends ESTestCase {
2929

3030
public void testSerialize() throws IOException {
31-
RefreshStats stats = new RefreshStats(randomNonNegativeLong(), randomNonNegativeLong(), between(0, Integer.MAX_VALUE));
31+
RefreshStats stats = new RefreshStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
32+
randomNonNegativeLong(), between(0, Integer.MAX_VALUE));
3233
BytesStreamOutput out = new BytesStreamOutput();
3334
stats.writeTo(out);
3435
StreamInput input = out.bytes().streamInput();
3536
RefreshStats read = new RefreshStats(input);
3637
assertEquals(-1, input.read());
3738
assertEquals(stats.getTotal(), read.getTotal());
39+
assertEquals(stats.getExternalTotal(), read.getExternalTotal());
3840
assertEquals(stats.getListeners(), read.getListeners());
3941
assertEquals(stats.getTotalTimeInMillis(), read.getTotalTimeInMillis());
42+
assertEquals(stats.getExternalTotalTimeInMillis(), read.getExternalTotalTimeInMillis());
4043
}
4144
}

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

+27
Original file line numberDiff line numberDiff line change
@@ -1513,6 +1513,33 @@ public void testRefreshMetric() throws IOException {
15131513
closeShards(shard);
15141514
}
15151515

1516+
public void testExternalRefreshMetric() throws IOException {
1517+
IndexShard shard = newStartedShard();
1518+
assertThat(shard.refreshStats().getExternalTotal(), equalTo(2L)); // refresh on: finalize and end of recovery
1519+
long initialTotalTime = shard.refreshStats().getExternalTotalTimeInMillis();
1520+
// check time advances
1521+
for (int i = 1; shard.refreshStats().getExternalTotalTimeInMillis() == initialTotalTime; i++) {
1522+
indexDoc(shard, "_doc", "test");
1523+
assertThat(shard.refreshStats().getExternalTotal(), equalTo(2L + i - 1));
1524+
shard.refresh("test");
1525+
assertThat(shard.refreshStats().getExternalTotal(), equalTo(2L + i));
1526+
assertThat(shard.refreshStats().getExternalTotalTimeInMillis(), greaterThanOrEqualTo(initialTotalTime));
1527+
}
1528+
long externalRefreshCount = shard.refreshStats().getExternalTotal();
1529+
1530+
indexDoc(shard, "_doc", "test");
1531+
try (Engine.GetResult ignored = shard.get(new Engine.Get(true, false, "_doc", "test",
1532+
new Term(IdFieldMapper.NAME, Uid.encodeId("test"))))) {
1533+
assertThat(shard.refreshStats().getExternalTotal(), equalTo(externalRefreshCount));
1534+
assertThat(shard.refreshStats().getExternalTotal(), equalTo(shard.refreshStats().getTotal() - 1));
1535+
}
1536+
indexDoc(shard, "_doc", "test");
1537+
shard.writeIndexingBuffer();
1538+
assertThat(shard.refreshStats().getExternalTotal(), equalTo(externalRefreshCount));
1539+
assertThat(shard.refreshStats().getExternalTotal(), equalTo(shard.refreshStats().getTotal() - 2));
1540+
closeShards(shard);
1541+
}
1542+
15161543
public void testIndexingOperationsListeners() throws IOException {
15171544
IndexShard shard = newStartedShard(true);
15181545
indexDoc(shard, "_doc", "0", "{\"foo\" : \"bar\"}");

server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.common.bytes.BytesReference;
3434
import org.elasticsearch.common.lease.Releasable;
3535
import org.elasticsearch.common.lucene.uid.Versions;
36+
import org.elasticsearch.common.metrics.MeanMetric;
3637
import org.elasticsearch.common.settings.Settings;
3738
import org.elasticsearch.common.unit.TimeValue;
3839
import org.elasticsearch.common.util.BigArrays;
@@ -89,20 +90,23 @@ public class RefreshListenersTests extends ESTestCase {
8990
private volatile int maxListeners;
9091
private ThreadPool threadPool;
9192
private Store store;
93+
private MeanMetric refreshMetric;
9294

9395
@Before
9496
public void setupListeners() throws Exception {
9597
// Setup dependencies of the listeners
9698
maxListeners = randomIntBetween(1, 1000);
9799
// Now setup the InternalEngine which is much more complicated because we aren't mocking anything
98100
threadPool = new TestThreadPool(getTestName());
101+
refreshMetric = new MeanMetric();
99102
listeners = new RefreshListeners(
100103
() -> maxListeners,
101104
() -> engine.refresh("too-many-listeners"),
102105
// Immediately run listeners rather than adding them to the listener thread pool like IndexShard does to simplify the test.
103106
Runnable::run,
104107
logger,
105-
threadPool.getThreadContext());
108+
threadPool.getThreadContext(),
109+
refreshMetric);
106110

107111
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("index", Settings.EMPTY);
108112
ShardId shardId = new ShardId(new Index("index", "_na_"), 1);

0 commit comments

Comments
 (0)