Skip to content

Commit 33a9711

Browse files
committed
#6066: log when we start/stop throttling indexing because of too-many-merges; fix nocommit
1 parent 34d4a03 commit 33a9711

File tree

4 files changed

+36
-20
lines changed

4 files changed

+36
-20
lines changed

src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,18 @@
1919

2020
package org.elasticsearch.index.engine.internal;
2121

22-
import com.google.common.collect.Lists;
22+
import java.io.IOException;
23+
import java.util.*;
24+
import java.util.concurrent.ConcurrentMap;
25+
import java.util.concurrent.CopyOnWriteArrayList;
26+
import java.util.concurrent.TimeUnit;
27+
import java.util.concurrent.atomic.AtomicBoolean;
28+
import java.util.concurrent.atomic.AtomicInteger;
29+
import java.util.concurrent.atomic.AtomicLong;
30+
import java.util.concurrent.locks.Lock;
31+
import java.util.concurrent.locks.ReentrantLock;
32+
import java.util.concurrent.locks.ReentrantReadWriteLock;
33+
2334
import org.apache.lucene.index.*;
2435
import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
2536
import org.apache.lucene.search.IndexSearcher;
@@ -39,6 +50,8 @@
3950
import org.elasticsearch.common.inject.Inject;
4051
import org.elasticsearch.common.lease.Releasable;
4152
import org.elasticsearch.common.lease.Releasables;
53+
import org.elasticsearch.common.logging.ESLogger;
54+
import org.elasticsearch.common.logging.Loggers;
4255
import org.elasticsearch.common.lucene.HashedBytesRef;
4356
import org.elasticsearch.common.lucene.LoggerInfoStream;
4457
import org.elasticsearch.common.lucene.Lucene;
@@ -75,18 +88,7 @@
7588
import org.elasticsearch.indices.warmer.IndicesWarmer;
7689
import org.elasticsearch.indices.warmer.InternalIndicesWarmer;
7790
import org.elasticsearch.threadpool.ThreadPool;
78-
79-
import java.io.IOException;
80-
import java.util.*;
81-
import java.util.concurrent.ConcurrentMap;
82-
import java.util.concurrent.CopyOnWriteArrayList;
83-
import java.util.concurrent.TimeUnit;
84-
import java.util.concurrent.atomic.AtomicBoolean;
85-
import java.util.concurrent.atomic.AtomicInteger;
86-
import java.util.concurrent.atomic.AtomicLong;
87-
import java.util.concurrent.locks.Lock;
88-
import java.util.concurrent.locks.ReentrantLock;
89-
import java.util.concurrent.locks.ReentrantReadWriteLock;
91+
import com.google.common.collect.Lists;
9092

9193
/**
9294
*
@@ -260,8 +262,7 @@ public void start() throws EngineException {
260262
try {
261263
this.indexWriter = createWriter();
262264
mergeScheduler.removeListener(this.throttle);
263-
//nocommit find a good value for this?
264-
this.throttle = new IndexThrottle(ConcurrentMergeScheduler.DEFAULT_MAX_MERGE_COUNT, indexWriter);
265+
this.throttle = new IndexThrottle(mergeScheduler.getMaxMerges(), indexWriter, indexSettings, shardId);
265266
mergeScheduler.addListener(throttle);
266267
} catch (IOException e) {
267268
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
@@ -754,8 +755,7 @@ public void flush(Flush flush) throws EngineException {
754755
currentIndexWriter().close(false);
755756
indexWriter = createWriter();
756757
mergeScheduler.removeListener(this.throttle);
757-
//nocommit find a good value for this?
758-
this.throttle = new IndexThrottle(ConcurrentMergeScheduler.DEFAULT_MAX_MERGE_COUNT, indexWriter);
758+
this.throttle = new IndexThrottle(mergeScheduler.getMaxMerges(), indexWriter, indexSettings, shardId);
759759
mergeScheduler.addListener(throttle);
760760
// commit on a just opened writer will commit even if there are no changes done to it
761761
// we rely on that for the commit data translog id key
@@ -1579,15 +1579,17 @@ private static final class IndexThrottle implements MergeSchedulerProvider.Liste
15791579
private final InternalLock lockReference = new InternalLock(new ReentrantLock());
15801580
private final AtomicInteger numMergesInFlight = new AtomicInteger(0);
15811581
private final int maxNumMerges;
1582+
private final ESLogger logger;
15821583

15831584
private static final Releasable DUMMY = new Releasable() {
15841585
@Override
15851586
public void close() throws ElasticsearchException {}
15861587
};
15871588

1588-
public IndexThrottle(int maxNumMerges, IndexWriter writer) {
1589+
public IndexThrottle(int maxNumMerges, IndexWriter writer, Settings indexSettings, ShardId shardId) {
15891590
this.maxNumMerges = maxNumMerges;
15901591
this.writer = writer;
1592+
this.logger = Loggers.getLogger(getClass(), indexSettings, shardId);
15911593
}
15921594

15931595
public Releasable acquireThrottle() {
@@ -1598,17 +1600,18 @@ public Releasable acquireThrottle() {
15981600
return lock.acquire();
15991601
}
16001602

1601-
16021603
@Override
16031604
public void beforeMerge(OnGoingMerge merge) {
16041605
if (numMergesInFlight.incrementAndGet() > maxNumMerges && writer.hasPendingMerges()) {
1605-
lock = lockReference;
1606+
logger.warn("now throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
1607+
lock = lockReference;
16061608
}
16071609
}
16081610

16091611
@Override
16101612
public void afterMerge(OnGoingMerge merge) {
16111613
if (numMergesInFlight.decrementAndGet() <= maxNumMerges && !writer.hasPendingMerges()) {
1614+
logger.warn("stop throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
16121615
lock = null;
16131616
}
16141617
}

src/main/java/org/elasticsearch/index/merge/scheduler/ConcurrentMergeSchedulerProvider.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,11 @@ public Set<OnGoingMerge> onGoingMerges() {
8484
return ImmutableSet.of();
8585
}
8686

87+
@Override
88+
public int getMaxMerges() {
89+
return this.maxMergeCount;
90+
}
91+
8792
public static class CustomConcurrentMergeScheduler extends TrackingConcurrentMergeScheduler {
8893

8994
private final ShardId shardId;

src/main/java/org/elasticsearch/index/merge/scheduler/MergeSchedulerProvider.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,9 @@ public final MergeScheduler newMergeScheduler() {
122122
return scheduler;
123123
}
124124

125+
/** Maximum number of allowed running merges before index throttling kicks in. */
126+
public abstract int getMaxMerges();
127+
125128
protected abstract MergeScheduler buildMergeScheduler();
126129

127130
public abstract MergeStats stats();

src/main/java/org/elasticsearch/index/merge/scheduler/SerialMergeSchedulerProvider.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@ public Set<OnGoingMerge> onGoingMerges() {
7474
return ImmutableSet.of();
7575
}
7676

77+
@Override
78+
public int getMaxMerges() {
79+
return 1;
80+
}
81+
7782
public static class CustomSerialMergeScheduler extends TrackingSerialMergeScheduler {
7883

7984
private final SerialMergeSchedulerProvider provider;

0 commit comments

Comments
 (0)