Skip to content

Commit e2fbfbe

Browse files
committed
Upgrade to Lucene 4.8.1
This commit upgrades to the latest Lucene 4.8.1 release including the following bugfixes: * An IndexThrottle now kicks in when merges start falling behind limiting index threads to 1 until merges caught up. Closes #6066 * RateLimiter now kicks in at the configured rate where previously the limiter was limiting at ~8MB/sec almost all the time. Closes #6018
1 parent c207135 commit e2fbfbe

File tree

7 files changed

+163
-27
lines changed

7 files changed

+163
-27
lines changed

docs/reference/index-modules/merge.asciidoc

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -193,5 +193,25 @@ scheduler supports this setting:
193193

194194
`index.merge.scheduler.max_thread_count`::
195195

196-
The maximum number of threads to perform the merge operation. Defaults to
197-
`Math.max(1, Math.min(3, Runtime.getRuntime().availableProcessors() / 2))`.
196+
The maximum number of concurrent merge threads that may run at once. Defaults
197+
to `1`, which works best with spinning-magnet disks. If you are using
198+
a good solid-state disk (SSD) instead then try setting this to `3`.
199+
200+
[float]
201+
==== SerialMergeScheduler
202+
203+
A merge scheduler that simply does each merge sequentially using the
204+
calling thread (blocking the operations that triggered the merge or the
205+
index operation). This merge scheduler has a merge thread pool that
206+
explicitly schedules merges, and it makes sure that merges are serial
207+
within a shard, yet concurrent across multiple shards.
208+
209+
The scheduler supports the following settings:
210+
211+
`index.merge.scheduler.max_merge_at_once`::
212+
213+
The maximum number of merges a single merge run performs. This setting prevents
214+
executing an unlimited amount of merges in a loop until other shards have a
215+
chance to get a merge thread from the pool. If this limit is reached the
216+
merge thread returns to the pool and continues once the call to a single
217+
shard is executed. The default is `5`.

pom.xml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
</parent>
3232

3333
<properties>
34-
<lucene.version>4.8.0</lucene.version>
34+
<lucene.version>4.8.1</lucene.version>
3535
<tests.jvms>auto</tests.jvms>
3636
<tests.shuffle>true</tests.shuffle>
3737
<tests.output>onerror</tests.output>
@@ -47,6 +47,10 @@
4747
<id>Codehaus Snapshots</id>
4848
<url>http://repository.codehaus.org/</url>
4949
</repository>
50+
<repository>
51+
<id>Release Candidate</id>
52+
<url>http://people.apache.org/~rmuir/fake_staging_area/lucene-solr-4.8.1-r1593252/maven/</url>
53+
</repository>
5054
</repositories>
5155

5256
<dependencies>

src/main/java/org/apache/lucene/store/RateLimitedFSDirectory.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ static final class RateLimitedIndexOutput extends BufferedIndexOutput {
8181
private final BufferedIndexOutput bufferedDelegate;
8282
private final RateLimiter rateLimiter;
8383
private final StoreRateLimiting.Listener rateListener;
84+
private long bytesSinceLastRateLimit;
8485

8586
RateLimitedIndexOutput(final RateLimiter rateLimiter, final StoreRateLimiting.Listener rateListener, final IndexOutput delegate) {
8687
super(delegate instanceof BufferedIndexOutput ? ((BufferedIndexOutput) delegate).getBufferSize() : BufferedIndexOutput.DEFAULT_BUFFER_SIZE);
@@ -97,7 +98,12 @@ static final class RateLimitedIndexOutput extends BufferedIndexOutput {
9798

9899
@Override
99100
protected void flushBuffer(byte[] b, int offset, int len) throws IOException {
100-
rateListener.onPause(rateLimiter.pause(len));
101+
bytesSinceLastRateLimit += len;
102+
if (bytesSinceLastRateLimit >= rateLimiter.getMinPauseCheckBytes()) {
103+
long pause = rateLimiter.pause(bytesSinceLastRateLimit);
104+
bytesSinceLastRateLimit = 0;
105+
rateListener.onPause(pause);
106+
}
101107
if (bufferedDelegate != null) {
102108
bufferedDelegate.flushBuffer(b, offset, len);
103109
} else {

src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java

Lines changed: 102 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,19 @@
1919

2020
package org.elasticsearch.index.engine.internal;
2121

22-
import com.google.common.collect.Lists;
22+
import java.io.IOException;
23+
import java.util.*;
24+
import java.util.concurrent.ConcurrentMap;
25+
import java.util.concurrent.CopyOnWriteArrayList;
26+
import java.util.concurrent.TimeUnit;
27+
import java.util.concurrent.atomic.AtomicBoolean;
28+
import java.util.concurrent.atomic.AtomicInteger;
29+
import java.util.concurrent.atomic.AtomicLong;
30+
import java.util.concurrent.locks.Condition;
31+
import java.util.concurrent.locks.Lock;
32+
import java.util.concurrent.locks.ReentrantLock;
33+
import java.util.concurrent.locks.ReentrantReadWriteLock;
34+
2335
import org.apache.lucene.index.*;
2436
import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
2537
import org.apache.lucene.search.IndexSearcher;
@@ -28,6 +40,7 @@
2840
import org.apache.lucene.search.SearcherManager;
2941
import org.apache.lucene.store.AlreadyClosedException;
3042
import org.apache.lucene.store.LockObtainFailedException;
43+
import org.apache.lucene.store.NoLockFactory;
3144
import org.apache.lucene.util.BytesRef;
3245
import org.apache.lucene.util.IOUtils;
3346
import org.elasticsearch.ElasticsearchException;
@@ -39,6 +52,8 @@
3952
import org.elasticsearch.common.inject.Inject;
4053
import org.elasticsearch.common.lease.Releasable;
4154
import org.elasticsearch.common.lease.Releasables;
55+
import org.elasticsearch.common.logging.ESLogger;
56+
import org.elasticsearch.common.logging.Loggers;
4257
import org.elasticsearch.common.lucene.HashedBytesRef;
4358
import org.elasticsearch.common.lucene.LoggerInfoStream;
4459
import org.elasticsearch.common.lucene.Lucene;
@@ -75,18 +90,7 @@
7590
import org.elasticsearch.indices.warmer.IndicesWarmer;
7691
import org.elasticsearch.indices.warmer.InternalIndicesWarmer;
7792
import org.elasticsearch.threadpool.ThreadPool;
78-
79-
import java.io.IOException;
80-
import java.util.*;
81-
import java.util.concurrent.ConcurrentMap;
82-
import java.util.concurrent.CopyOnWriteArrayList;
83-
import java.util.concurrent.TimeUnit;
84-
import java.util.concurrent.atomic.AtomicBoolean;
85-
import java.util.concurrent.atomic.AtomicInteger;
86-
import java.util.concurrent.atomic.AtomicLong;
87-
import java.util.concurrent.locks.Lock;
88-
import java.util.concurrent.locks.ReentrantLock;
89-
import java.util.concurrent.locks.ReentrantReadWriteLock;
93+
import com.google.common.collect.Lists;
9094

9195
/**
9296
*
@@ -163,6 +167,8 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
163167

164168
private SegmentInfos lastCommittedSegmentInfos;
165169

170+
private IndexThrottle throttle;
171+
166172
@Inject
167173
public InternalEngine(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool,
168174
IndexSettingsService indexSettingsService, ShardIndexingService indexingService, @Nullable IndicesWarmer warmer,
@@ -257,6 +263,9 @@ public void start() throws EngineException {
257263
}
258264
try {
259265
this.indexWriter = createWriter();
266+
mergeScheduler.removeListener(this.throttle);
267+
this.throttle = new IndexThrottle(mergeScheduler.getMaxMerges(), logger);
268+
mergeScheduler.addListener(throttle);
260269
} catch (IOException e) {
261270
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
262271
}
@@ -373,7 +382,9 @@ public void create(Create create) throws EngineException {
373382
if (writer == null) {
374383
throw new EngineClosedException(shardId, failedEngine);
375384
}
376-
innerCreate(create, writer);
385+
try (Releasable r = throttle.acquireThrottle()) {
386+
innerCreate(create, writer);
387+
}
377388
dirty = true;
378389
possibleMergeNeeded = true;
379390
flushNeeded = true;
@@ -462,8 +473,9 @@ public void index(Index index) throws EngineException {
462473
if (writer == null) {
463474
throw new EngineClosedException(shardId, failedEngine);
464475
}
465-
466-
innerIndex(index, writer);
476+
try (Releasable r = throttle.acquireThrottle()) {
477+
innerIndex(index, writer);
478+
}
467479
dirty = true;
468480
possibleMergeNeeded = true;
469481
flushNeeded = true;
@@ -744,7 +756,9 @@ public void flush(Flush flush) throws EngineException {
744756
// to be allocated to a different node
745757
currentIndexWriter().close(false);
746758
indexWriter = createWriter();
747-
759+
mergeScheduler.removeListener(this.throttle);
760+
this.throttle = new IndexThrottle(mergeScheduler.getMaxMerges(), this.logger);
761+
mergeScheduler.addListener(throttle);
748762
// commit on a just opened writer will commit even if there are no changes done to it
749763
// we rely on that for the commit data translog id key
750764
if (flushNeeded || flush.force()) {
@@ -1559,4 +1573,75 @@ boolean assertLockIsHeld() {
15591573
return aBoolean != null && aBoolean.booleanValue();
15601574
}
15611575
}
1576+
1577+
1578+
private static final class IndexThrottle implements MergeSchedulerProvider.Listener {
1579+
1580+
private static final InternalLock NOOP_LOCK = new InternalLock(new NoOpLock());
1581+
private final InternalLock lockReference = new InternalLock(new ReentrantLock());
1582+
private final AtomicInteger numMergesInFlight = new AtomicInteger(0);
1583+
private final AtomicBoolean isThrottling = new AtomicBoolean();
1584+
private final int maxNumMerges;
1585+
private final ESLogger logger;
1586+
1587+
private volatile InternalLock lock = NOOP_LOCK;
1588+
1589+
public IndexThrottle(int maxNumMerges, ESLogger logger) {
1590+
this.maxNumMerges = maxNumMerges;
1591+
this.logger = logger;
1592+
}
1593+
1594+
public Releasable acquireThrottle() {
1595+
return lock.acquire();
1596+
}
1597+
1598+
@Override
1599+
public void beforeMerge(OnGoingMerge merge) {
1600+
if (numMergesInFlight.incrementAndGet() > maxNumMerges) {
1601+
if (isThrottling.getAndSet(true) == false) {
1602+
logger.info("now throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
1603+
}
1604+
lock = lockReference;
1605+
}
1606+
}
1607+
1608+
@Override
1609+
public void afterMerge(OnGoingMerge merge) {
1610+
if (numMergesInFlight.decrementAndGet() < maxNumMerges) {
1611+
if (isThrottling.getAndSet(false)) {
1612+
logger.info("stop throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
1613+
}
1614+
lock = NOOP_LOCK;
1615+
}
1616+
}
1617+
}
1618+
1619+
private static final class NoOpLock implements Lock {
1620+
1621+
@Override
1622+
public void lock() {}
1623+
1624+
@Override
1625+
public void lockInterruptibly() throws InterruptedException {
1626+
}
1627+
1628+
@Override
1629+
public boolean tryLock() {
1630+
return true;
1631+
}
1632+
1633+
@Override
1634+
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
1635+
return true;
1636+
}
1637+
1638+
@Override
1639+
public void unlock() {
1640+
}
1641+
1642+
@Override
1643+
public Condition newCondition() {
1644+
throw new UnsupportedOperationException("NoOpLock can't provide a condition");
1645+
}
1646+
}
15621647
}

src/main/java/org/elasticsearch/index/merge/scheduler/ConcurrentMergeSchedulerProvider.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,11 @@ public ConcurrentMergeSchedulerProvider(ShardId shardId, @IndexSettings Settings
7373
@Override
7474
public MergeScheduler buildMergeScheduler() {
7575
CustomConcurrentMergeScheduler concurrentMergeScheduler = new CustomConcurrentMergeScheduler(logger, shardId, this);
76-
concurrentMergeScheduler.setMaxMergesAndThreads(maxMergeCount, maxThreadCount);
76+
// nocommit but this doesn't handle SMS ... should we even expose/allow SMS? or, if user requests SMS can we just use CMS(1,1),
77+
// which would then stall if there are 2 merges in flight, and unstall once we are back to 1 or 0 merges
78+
// NOTE: we pass maxMergeCount+1 here so that CMS will allow one too many merges to kick off which then allows
79+
// InternalEngine.IndexThrottle to detect too-many-merges and throttle:
80+
concurrentMergeScheduler.setMaxMergesAndThreads(maxMergeCount+1, maxThreadCount);
7781
schedulers.add(concurrentMergeScheduler);
7882
return concurrentMergeScheduler;
7983
}
@@ -101,6 +105,10 @@ public void close() {
101105
indexSettingsService.removeListener(applySettings);
102106
}
103107

108+
public int getMaxMerges() {
109+
return this.maxMergeCount;
110+
}
111+
104112
public static class CustomConcurrentMergeScheduler extends TrackingConcurrentMergeScheduler {
105113

106114
private final ShardId shardId;

src/main/java/org/elasticsearch/index/merge/scheduler/MergeSchedulerProvider.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,9 @@ public final MergeScheduler newMergeScheduler() {
122122
return scheduler;
123123
}
124124

125+
/** Maximum number of allowed running merges before index throttling kicks in. */
126+
public abstract int getMaxMerges();
127+
125128
protected abstract MergeScheduler buildMergeScheduler();
126129

127130
public abstract MergeStats stats();

src/main/java/org/elasticsearch/index/snapshots/blobstore/RateLimitingInputStream.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ public class RateLimitingInputStream extends InputStream {
3535

3636
private final Listener listener;
3737

38+
private long bytesSinceLastRateLimit;
39+
3840
public interface Listener {
3941
void onPause(long nanos);
4042
}
@@ -45,13 +47,21 @@ public RateLimitingInputStream(InputStream delegate, RateLimiter rateLimiter, Li
4547
this.listener = listener;
4648
}
4749

50+
private void maybePause(int bytes) {
51+
bytesSinceLastRateLimit += bytes;
52+
if (bytesSinceLastRateLimit >= rateLimiter.getMinPauseCheckBytes()) {
53+
long pause = rateLimiter.pause(bytesSinceLastRateLimit);
54+
bytesSinceLastRateLimit = 0;
55+
if (pause > 0) {
56+
listener.onPause(pause);
57+
}
58+
}
59+
}
60+
4861
@Override
4962
public int read() throws IOException {
5063
int b = delegate.read();
51-
long pause = rateLimiter.pause(1);
52-
if (pause > 0) {
53-
listener.onPause(pause);
54-
}
64+
maybePause(1);
5565
return b;
5666
}
5767

@@ -64,7 +74,7 @@ public int read(byte[] b) throws IOException {
6474
public int read(byte[] b, int off, int len) throws IOException {
6575
int n = delegate.read(b, off, len);
6676
if (n > 0) {
67-
listener.onPause(rateLimiter.pause(n));
77+
maybePause(n);
6878
}
6979
return n;
7080
}

0 commit comments

Comments
 (0)