|
19 | 19 |
|
20 | 20 | package org.elasticsearch.index.engine.internal;
|
21 | 21 |
|
22 |
| -import com.google.common.collect.Lists; |
| 22 | +import java.io.IOException; |
| 23 | +import java.util.*; |
| 24 | +import java.util.concurrent.ConcurrentMap; |
| 25 | +import java.util.concurrent.CopyOnWriteArrayList; |
| 26 | +import java.util.concurrent.TimeUnit; |
| 27 | +import java.util.concurrent.atomic.AtomicBoolean; |
| 28 | +import java.util.concurrent.atomic.AtomicInteger; |
| 29 | +import java.util.concurrent.atomic.AtomicLong; |
| 30 | +import java.util.concurrent.locks.Condition; |
| 31 | +import java.util.concurrent.locks.Lock; |
| 32 | +import java.util.concurrent.locks.ReentrantLock; |
| 33 | +import java.util.concurrent.locks.ReentrantReadWriteLock; |
| 34 | + |
23 | 35 | import org.apache.lucene.index.*;
|
24 | 36 | import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
|
25 | 37 | import org.apache.lucene.search.IndexSearcher;
|
|
28 | 40 | import org.apache.lucene.search.SearcherManager;
|
29 | 41 | import org.apache.lucene.store.AlreadyClosedException;
|
30 | 42 | import org.apache.lucene.store.LockObtainFailedException;
|
| 43 | +import org.apache.lucene.store.NoLockFactory; |
31 | 44 | import org.apache.lucene.util.BytesRef;
|
32 | 45 | import org.apache.lucene.util.IOUtils;
|
33 | 46 | import org.elasticsearch.ElasticsearchException;
|
|
39 | 52 | import org.elasticsearch.common.inject.Inject;
|
40 | 53 | import org.elasticsearch.common.lease.Releasable;
|
41 | 54 | import org.elasticsearch.common.lease.Releasables;
|
| 55 | +import org.elasticsearch.common.logging.ESLogger; |
| 56 | +import org.elasticsearch.common.logging.Loggers; |
42 | 57 | import org.elasticsearch.common.lucene.HashedBytesRef;
|
43 | 58 | import org.elasticsearch.common.lucene.LoggerInfoStream;
|
44 | 59 | import org.elasticsearch.common.lucene.Lucene;
|
|
75 | 90 | import org.elasticsearch.indices.warmer.IndicesWarmer;
|
76 | 91 | import org.elasticsearch.indices.warmer.InternalIndicesWarmer;
|
77 | 92 | import org.elasticsearch.threadpool.ThreadPool;
|
78 |
| - |
79 |
| -import java.io.IOException; |
80 |
| -import java.util.*; |
81 |
| -import java.util.concurrent.ConcurrentMap; |
82 |
| -import java.util.concurrent.CopyOnWriteArrayList; |
83 |
| -import java.util.concurrent.TimeUnit; |
84 |
| -import java.util.concurrent.atomic.AtomicBoolean; |
85 |
| -import java.util.concurrent.atomic.AtomicInteger; |
86 |
| -import java.util.concurrent.atomic.AtomicLong; |
87 |
| -import java.util.concurrent.locks.Lock; |
88 |
| -import java.util.concurrent.locks.ReentrantLock; |
89 |
| -import java.util.concurrent.locks.ReentrantReadWriteLock; |
| 93 | +import com.google.common.collect.Lists; |
90 | 94 |
|
91 | 95 | /**
|
92 | 96 | *
|
@@ -163,6 +167,8 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
|
163 | 167 |
|
164 | 168 | private SegmentInfos lastCommittedSegmentInfos;
|
165 | 169 |
|
| 170 | + private IndexThrottle throttle; |
| 171 | + |
166 | 172 | @Inject
|
167 | 173 | public InternalEngine(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool,
|
168 | 174 | IndexSettingsService indexSettingsService, ShardIndexingService indexingService, @Nullable IndicesWarmer warmer,
|
@@ -257,6 +263,9 @@ public void start() throws EngineException {
|
257 | 263 | }
|
258 | 264 | try {
|
259 | 265 | this.indexWriter = createWriter();
|
| 266 | + mergeScheduler.removeListener(this.throttle); |
| 267 | + this.throttle = new IndexThrottle(mergeScheduler.getMaxMerges(), logger); |
| 268 | + mergeScheduler.addListener(throttle); |
260 | 269 | } catch (IOException e) {
|
261 | 270 | throw new EngineCreationFailureException(shardId, "failed to create engine", e);
|
262 | 271 | }
|
@@ -373,7 +382,9 @@ public void create(Create create) throws EngineException {
|
373 | 382 | if (writer == null) {
|
374 | 383 | throw new EngineClosedException(shardId, failedEngine);
|
375 | 384 | }
|
376 |
| - innerCreate(create, writer); |
| 385 | + try (Releasable r = throttle.acquireThrottle()) { |
| 386 | + innerCreate(create, writer); |
| 387 | + } |
377 | 388 | dirty = true;
|
378 | 389 | possibleMergeNeeded = true;
|
379 | 390 | flushNeeded = true;
|
@@ -462,8 +473,9 @@ public void index(Index index) throws EngineException {
|
462 | 473 | if (writer == null) {
|
463 | 474 | throw new EngineClosedException(shardId, failedEngine);
|
464 | 475 | }
|
465 |
| - |
466 |
| - innerIndex(index, writer); |
| 476 | + try (Releasable r = throttle.acquireThrottle()) { |
| 477 | + innerIndex(index, writer); |
| 478 | + } |
467 | 479 | dirty = true;
|
468 | 480 | possibleMergeNeeded = true;
|
469 | 481 | flushNeeded = true;
|
@@ -744,7 +756,9 @@ public void flush(Flush flush) throws EngineException {
|
744 | 756 | // to be allocated to a different node
|
745 | 757 | currentIndexWriter().close(false);
|
746 | 758 | indexWriter = createWriter();
|
747 |
| - |
| 759 | + mergeScheduler.removeListener(this.throttle); |
| 760 | + this.throttle = new IndexThrottle(mergeScheduler.getMaxMerges(), this.logger); |
| 761 | + mergeScheduler.addListener(throttle); |
748 | 762 | // commit on a just opened writer will commit even if there are no changes done to it
|
749 | 763 | // we rely on that for the commit data translog id key
|
750 | 764 | if (flushNeeded || flush.force()) {
|
@@ -1559,4 +1573,75 @@ boolean assertLockIsHeld() {
|
1559 | 1573 | return aBoolean != null && aBoolean.booleanValue();
|
1560 | 1574 | }
|
1561 | 1575 | }
|
| 1576 | + |
| 1577 | + |
| 1578 | + private static final class IndexThrottle implements MergeSchedulerProvider.Listener { |
| 1579 | + |
| 1580 | + private static final InternalLock NOOP_LOCK = new InternalLock(new NoOpLock()); |
| 1581 | + private final InternalLock lockReference = new InternalLock(new ReentrantLock()); |
| 1582 | + private final AtomicInteger numMergesInFlight = new AtomicInteger(0); |
| 1583 | + private final AtomicBoolean isThrottling = new AtomicBoolean(); |
| 1584 | + private final int maxNumMerges; |
| 1585 | + private final ESLogger logger; |
| 1586 | + |
| 1587 | + private volatile InternalLock lock = NOOP_LOCK; |
| 1588 | + |
| 1589 | + public IndexThrottle(int maxNumMerges, ESLogger logger) { |
| 1590 | + this.maxNumMerges = maxNumMerges; |
| 1591 | + this.logger = logger; |
| 1592 | + } |
| 1593 | + |
| 1594 | + public Releasable acquireThrottle() { |
| 1595 | + return lock.acquire(); |
| 1596 | + } |
| 1597 | + |
| 1598 | + @Override |
| 1599 | + public void beforeMerge(OnGoingMerge merge) { |
| 1600 | + if (numMergesInFlight.incrementAndGet() > maxNumMerges) { |
| 1601 | + if (isThrottling.getAndSet(true) == false) { |
| 1602 | + logger.info("now throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges); |
| 1603 | + } |
| 1604 | + lock = lockReference; |
| 1605 | + } |
| 1606 | + } |
| 1607 | + |
| 1608 | + @Override |
| 1609 | + public void afterMerge(OnGoingMerge merge) { |
| 1610 | + if (numMergesInFlight.decrementAndGet() < maxNumMerges) { |
| 1611 | + if (isThrottling.getAndSet(false)) { |
| 1612 | + logger.info("stop throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges); |
| 1613 | + } |
| 1614 | + lock = NOOP_LOCK; |
| 1615 | + } |
| 1616 | + } |
| 1617 | + } |
| 1618 | + |
| 1619 | + private static final class NoOpLock implements Lock { |
| 1620 | + |
| 1621 | + @Override |
| 1622 | + public void lock() {} |
| 1623 | + |
| 1624 | + @Override |
| 1625 | + public void lockInterruptibly() throws InterruptedException { |
| 1626 | + } |
| 1627 | + |
| 1628 | + @Override |
| 1629 | + public boolean tryLock() { |
| 1630 | + return true; |
| 1631 | + } |
| 1632 | + |
| 1633 | + @Override |
| 1634 | + public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { |
| 1635 | + return true; |
| 1636 | + } |
| 1637 | + |
| 1638 | + @Override |
| 1639 | + public void unlock() { |
| 1640 | + } |
| 1641 | + |
| 1642 | + @Override |
| 1643 | + public Condition newCondition() { |
| 1644 | + throw new UnsupportedOperationException("NoOpLock can't provide a condition"); |
| 1645 | + } |
| 1646 | + } |
1562 | 1647 | }
|
0 commit comments