Skip to content

Commit ef64225

Browse files
committed
Stats : Add time in index throttle to stats.
This commit adds throttle stats to the indexing stats and uses a call back from InternalEngine to manage the stats. Also includes updates the IndexStatsTests to test for these new stats. Stats added : ``` throttle_time_in_millis is_throttled ``` Closes #7861
1 parent 46e9e46 commit ef64225

File tree

4 files changed

+176
-6
lines changed

4 files changed

+176
-6
lines changed

src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java

+11-4
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ public void start() throws EngineException {
272272
try {
273273
this.indexWriter = createWriter();
274274
mergeScheduler.removeListener(this.throttle);
275-
this.throttle = new IndexThrottle(mergeScheduler, logger);
275+
this.throttle = new IndexThrottle(mergeScheduler, logger, indexingService);
276276
mergeScheduler.addListener(throttle);
277277
} catch (IOException e) {
278278
maybeFailEngine(e, "start");
@@ -844,7 +844,8 @@ public void flush(Flush flush) throws EngineException {
844844
currentIndexWriter().close(false);
845845
indexWriter = createWriter();
846846
mergeScheduler.removeListener(this.throttle);
847-
this.throttle = new IndexThrottle(mergeScheduler, this.logger);
847+
848+
this.throttle = new IndexThrottle(mergeScheduler, this.logger, indexingService);
848849
mergeScheduler.addListener(throttle);
849850
// commit on a just opened writer will commit even if there are no changes done to it
850851
// we rely on that for the commit data translog id key
@@ -1716,20 +1717,23 @@ boolean assertLockIsHeld() {
17161717
}
17171718

17181719

1719-
private static final class IndexThrottle implements MergeSchedulerProvider.Listener {
1720+
1721+
static final class IndexThrottle implements MergeSchedulerProvider.Listener {
17201722

17211723
private static final InternalLock NOOP_LOCK = new InternalLock(new NoOpLock());
17221724
private final InternalLock lockReference = new InternalLock(new ReentrantLock());
17231725
private final AtomicInteger numMergesInFlight = new AtomicInteger(0);
17241726
private final AtomicBoolean isThrottling = new AtomicBoolean();
17251727
private final MergeSchedulerProvider mergeScheduler;
17261728
private final ESLogger logger;
1729+
private final ShardIndexingService indexingService;
17271730

17281731
private volatile InternalLock lock = NOOP_LOCK;
17291732

1730-
public IndexThrottle(MergeSchedulerProvider mergeScheduler, ESLogger logger) {
1733+
public IndexThrottle(MergeSchedulerProvider mergeScheduler, ESLogger logger, ShardIndexingService indexingService) {
17311734
this.mergeScheduler = mergeScheduler;
17321735
this.logger = logger;
1736+
this.indexingService = indexingService;
17331737
}
17341738

17351739
public Releasable acquireThrottle() {
@@ -1742,6 +1746,7 @@ public synchronized void beforeMerge(OnGoingMerge merge) {
17421746
if (numMergesInFlight.incrementAndGet() > maxNumMerges) {
17431747
if (isThrottling.getAndSet(true) == false) {
17441748
logger.info("now throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
1749+
indexingService.throttlingActivated();
17451750
}
17461751
lock = lockReference;
17471752
}
@@ -1753,10 +1758,12 @@ public synchronized void afterMerge(OnGoingMerge merge) {
17531758
if (numMergesInFlight.decrementAndGet() < maxNumMerges) {
17541759
if (isThrottling.getAndSet(false)) {
17551760
logger.info("stop throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
1761+
indexingService.throttlingDeactivated();
17561762
}
17571763
lock = NOOP_LOCK;
17581764
}
17591765
}
1766+
17601767
}
17611768

17621769
private static final class NoOpLock implements Lock {

src/main/java/org/elasticsearch/index/indexing/IndexingStats.java

+50-1
Original file line numberDiff line numberDiff line change
@@ -49,18 +49,23 @@ public static class Stats implements Streamable, ToXContent {
4949

5050
private long noopUpdateCount;
5151

52+
private long throttleTimeInMillis;
53+
private boolean isThrottled;
54+
5255
Stats() {
5356

5457
}
5558

56-
public Stats(long indexCount, long indexTimeInMillis, long indexCurrent, long deleteCount, long deleteTimeInMillis, long deleteCurrent, long noopUpdateCount) {
59+
public Stats(long indexCount, long indexTimeInMillis, long indexCurrent, long deleteCount, long deleteTimeInMillis, long deleteCurrent, long noopUpdateCount, boolean isThrottled, long throttleTimeInMillis) {
5760
this.indexCount = indexCount;
5861
this.indexTimeInMillis = indexTimeInMillis;
5962
this.indexCurrent = indexCurrent;
6063
this.deleteCount = deleteCount;
6164
this.deleteTimeInMillis = deleteTimeInMillis;
6265
this.deleteCurrent = deleteCurrent;
6366
this.noopUpdateCount = noopUpdateCount;
67+
this.isThrottled = isThrottled;
68+
this.throttleTimeInMillis = throttleTimeInMillis;
6469
}
6570

6671
public void add(Stats stats) {
@@ -73,6 +78,10 @@ public void add(Stats stats) {
7378
deleteCurrent += stats.deleteCurrent;
7479

7580
noopUpdateCount += stats.noopUpdateCount;
81+
throttleTimeInMillis += stats.throttleTimeInMillis;
82+
if (isThrottled != stats.isThrottled) {
83+
isThrottled = true; //When combining if one is throttled set result to throttled.
84+
}
7685
}
7786

7887
public long getIndexCount() {
@@ -95,6 +104,30 @@ public long getDeleteCount() {
95104
return deleteCount;
96105
}
97106

107+
/**
108+
* Returns if the index is under merge throttling control
109+
* @return
110+
*/
111+
public boolean isThrottled() {
112+
return isThrottled;
113+
}
114+
115+
/**
116+
* Gets the amount of time in milliseconds that the index has been under merge throttling control
117+
* @return
118+
*/
119+
public long getThrottleTimeInMillis() {
120+
return throttleTimeInMillis;
121+
}
122+
123+
/**
124+
* Gets the amount of time in a TimeValue that the index has been under merge throttling control
125+
* @return
126+
*/
127+
public TimeValue getThrottleTime() {
128+
return new TimeValue(throttleTimeInMillis);
129+
}
130+
98131
public TimeValue getDeleteTime() {
99132
return new TimeValue(deleteTimeInMillis);
100133
}
@@ -130,6 +163,11 @@ public void readFrom(StreamInput in) throws IOException {
130163
if (in.getVersion().onOrAfter(Version.V_1_4_0_Beta1)) {
131164
noopUpdateCount = in.readVLong();
132165
}
166+
167+
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
168+
isThrottled = in.readBoolean();
169+
throttleTimeInMillis = in.readLong();
170+
}
133171
}
134172

135173
@Override
@@ -145,6 +183,12 @@ public void writeTo(StreamOutput out) throws IOException {
145183
if (out.getVersion().onOrAfter(Version.V_1_4_0_Beta1)) {
146184
out.writeVLong(noopUpdateCount);
147185
}
186+
187+
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
188+
out.writeBoolean(isThrottled);
189+
out.writeLong(throttleTimeInMillis);
190+
}
191+
148192
}
149193

150194
@Override
@@ -159,6 +203,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
159203

160204
builder.field(Fields.NOOP_UPDATE_TOTAL, noopUpdateCount);
161205

206+
builder.field(Fields.IS_THROTTLED, isThrottled);
207+
builder.timeValueField(Fields.THROTTLED_TIME_IN_MILLIS, Fields.THROTTLED_TIME, throttleTimeInMillis);
162208
return builder;
163209
}
164210
}
@@ -239,6 +285,9 @@ static final class Fields {
239285
static final XContentBuilderString DELETE_TIME_IN_MILLIS = new XContentBuilderString("delete_time_in_millis");
240286
static final XContentBuilderString DELETE_CURRENT = new XContentBuilderString("delete_current");
241287
static final XContentBuilderString NOOP_UPDATE_TOTAL = new XContentBuilderString("noop_update_total");
288+
static final XContentBuilderString IS_THROTTLED = new XContentBuilderString("is_throttled");
289+
static final XContentBuilderString THROTTLED_TIME_IN_MILLIS = new XContentBuilderString("throttle_time_in_millis");
290+
static final XContentBuilderString THROTTLED_TIME = new XContentBuilderString("throttle_time");
242291
}
243292

244293
public static IndexingStats readIndexingStats(StreamInput in) throws IOException {

src/main/java/org/elasticsearch/index/indexing/ShardIndexingService.java

+37-1
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,14 @@ public void postCreateUnderLock(Engine.Create create) {
107107
}
108108
}
109109

110+
public void throttlingActivated() {
111+
totalStats.setThrottled(true);
112+
}
113+
114+
public void throttlingDeactivated() {
115+
totalStats.setThrottled(false);
116+
}
117+
110118
public void postCreate(Engine.Create create) {
111119
long took = create.endTime() - create.startTime();
112120
totalStats.indexMetric.inc(took);
@@ -259,12 +267,38 @@ static class StatsHolder {
259267
public final CounterMetric indexCurrent = new CounterMetric();
260268
public final CounterMetric deleteCurrent = new CounterMetric();
261269
public final CounterMetric noopUpdates = new CounterMetric();
270+
public final CounterMetric throttleTimeMillisMetric = new CounterMetric();
271+
volatile boolean isThrottled = false;
272+
volatile long startOfThrottleMillis;
262273

263274
public IndexingStats.Stats stats() {
275+
long currentThrottleMillis = 0;
276+
if (isThrottled && startOfThrottleMillis != 0) {
277+
currentThrottleMillis += System.currentTimeMillis() - startOfThrottleMillis;
278+
if (currentThrottleMillis < 0) {
279+
//Timeslip must have happened, have to ignore this value
280+
currentThrottleMillis = 0;
281+
}
282+
}
264283
return new IndexingStats.Stats(
265284
indexMetric.count(), TimeUnit.NANOSECONDS.toMillis(indexMetric.sum()), indexCurrent.count(),
266285
deleteMetric.count(), TimeUnit.NANOSECONDS.toMillis(deleteMetric.sum()), deleteCurrent.count(),
267-
noopUpdates.count());
286+
noopUpdates.count(), isThrottled, TimeUnit.MILLISECONDS.toMillis(throttleTimeMillisMetric.count() + currentThrottleMillis));
287+
}
288+
289+
290+
void setThrottled(boolean isThrottled) {
291+
if (!this.isThrottled && isThrottled) {
292+
startOfThrottleMillis = System.currentTimeMillis();
293+
} else if (this.isThrottled && !isThrottled) {
294+
assert startOfThrottleMillis > 0 : "Bad state of startOfThrottleMillis";
295+
long throttleTimeMillis = System.currentTimeMillis() - startOfThrottleMillis;
296+
if (throttleTimeMillis >= 0) {
297+
//A timeslip may have occured but never want to add a negative number
298+
throttleTimeMillisMetric.inc(throttleTimeMillis);
299+
}
300+
}
301+
this.isThrottled = isThrottled;
268302
}
269303

270304
public long totalCurrent() {
@@ -275,5 +309,7 @@ public void clear() {
275309
indexMetric.clear();
276310
deleteMetric.clear();
277311
}
312+
313+
278314
}
279315
}

src/test/java/org/elasticsearch/indices/stats/IndexStatsTests.java

+78
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,10 @@
3333
import org.elasticsearch.common.io.stream.BytesStreamOutput;
3434
import org.elasticsearch.common.settings.ImmutableSettings;
3535
import org.elasticsearch.common.settings.Settings;
36+
import org.elasticsearch.index.merge.policy.TieredMergePolicyProvider;
37+
import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider;
3638
import org.elasticsearch.index.query.FilterBuilders;
39+
import org.elasticsearch.index.store.support.AbstractIndexStore;
3740
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
3841
import org.elasticsearch.search.sort.SortOrder;
3942
import org.elasticsearch.test.ElasticsearchIntegrationTest;
@@ -281,6 +284,79 @@ public void run() {
281284
assertThat(client().admin().indices().prepareStats("idx").setQueryCache(true).get().getTotal().getQueryCache().getMemorySizeInBytes(), greaterThan(0l));
282285
}
283286

287+
288+
@Test
289+
public void nonThrottleStats() throws Exception {
290+
assertAcked(prepareCreate("test")
291+
.setSettings(ImmutableSettings.builder()
292+
.put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "merge")
293+
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1")
294+
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
295+
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, "2")
296+
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, "2")
297+
.put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "1")
298+
.put(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT, "10000")
299+
));
300+
ensureGreen();
301+
long termUpto = 0;
302+
IndicesStatsResponse stats;
303+
// Provoke slowish merging by making many unique terms:
304+
for(int i=0; i<100; i++) {
305+
StringBuilder sb = new StringBuilder();
306+
for(int j=0; j<100; j++) {
307+
sb.append(' ');
308+
sb.append(termUpto++);
309+
sb.append(" some random text that keeps repeating over and over again hambone");
310+
}
311+
client().prepareIndex("test", "type", ""+termUpto).setSource("field" + (i%10), sb.toString()).get();
312+
}
313+
refresh();
314+
stats = client().admin().indices().prepareStats().execute().actionGet();
315+
//nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get();
316+
317+
stats = client().admin().indices().prepareStats().execute().actionGet();
318+
assertThat(stats.getPrimaries().getIndexing().getTotal().getThrottleTimeInMillis(), equalTo(0l));
319+
}
320+
321+
@Test
322+
public void throttleStats() throws Exception {
323+
assertAcked(prepareCreate("test")
324+
.setSettings(ImmutableSettings.builder()
325+
.put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "merge")
326+
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1")
327+
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
328+
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, "2")
329+
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, "2")
330+
.put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "1")
331+
.put(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT, "1")
332+
));
333+
ensureGreen();
334+
long termUpto = 0;
335+
IndicesStatsResponse stats;
336+
// make sure we see throttling kicking in:
337+
boolean done = false;
338+
while (!done) {
339+
for(int i=0; i<100; i++) {
340+
// Provoke slowish merging by making many unique terms:
341+
StringBuilder sb = new StringBuilder();
342+
for(int j=0; j<100; j++) {
343+
sb.append(' ');
344+
sb.append(termUpto++);
345+
}
346+
client().prepareIndex("test", "type", ""+termUpto).setSource("field" + (i%10), sb.toString()).get();
347+
if (i % 2 == 0) {
348+
refresh();
349+
}
350+
}
351+
refresh();
352+
stats = client().admin().indices().prepareStats().execute().actionGet();
353+
//nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get();
354+
done = stats.getPrimaries().getIndexing().getTotal().getThrottleTimeInMillis() > 0;
355+
}
356+
stats = client().admin().indices().prepareStats().execute().actionGet();
357+
assertThat(stats.getPrimaries().getIndexing().getTotal().getThrottleTimeInMillis(), greaterThan(0l));
358+
}
359+
284360
@Test
285361
public void simpleStats() throws Exception {
286362
createIndex("test1", "test2");
@@ -302,6 +378,8 @@ public void simpleStats() throws Exception {
302378
assertThat(stats.getPrimaries().getDocs().getCount(), equalTo(3l));
303379
assertThat(stats.getTotal().getDocs().getCount(), equalTo(totalExpectedWrites));
304380
assertThat(stats.getPrimaries().getIndexing().getTotal().getIndexCount(), equalTo(3l));
381+
assertThat(stats.getPrimaries().getIndexing().getTotal().isThrottled(), equalTo(false));
382+
assertThat(stats.getPrimaries().getIndexing().getTotal().getThrottleTimeInMillis(), equalTo(0l));
305383
assertThat(stats.getTotal().getIndexing().getTotal().getIndexCount(), equalTo(totalExpectedWrites));
306384
assertThat(stats.getTotal().getStore(), notNullValue());
307385
assertThat(stats.getTotal().getMerge(), notNullValue());

0 commit comments

Comments
 (0)