@@ -163,6 +163,8 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
163
163
164
164
private SegmentInfos lastCommittedSegmentInfos ;
165
165
166
+ private IndexThrottle throttle ;
167
+
166
168
@ Inject
167
169
public InternalEngine (ShardId shardId , @ IndexSettings Settings indexSettings , ThreadPool threadPool ,
168
170
IndexSettingsService indexSettingsService , ShardIndexingService indexingService , @ Nullable IndicesWarmer warmer ,
@@ -257,6 +259,10 @@ public void start() throws EngineException {
257
259
}
258
260
try {
259
261
this .indexWriter = createWriter ();
262
+ mergeScheduler .removeListener (this .throttle );
263
+ //nocommit find a good value for this?
264
+ this .throttle = new IndexThrottle (ConcurrentMergeScheduler .DEFAULT_MAX_MERGE_COUNT , indexWriter );
265
+ mergeScheduler .addListener (throttle );
260
266
} catch (IOException e ) {
261
267
throw new EngineCreationFailureException (shardId , "failed to create engine" , e );
262
268
}
@@ -373,7 +379,9 @@ public void create(Create create) throws EngineException {
373
379
if (writer == null ) {
374
380
throw new EngineClosedException (shardId , failedEngine );
375
381
}
376
- innerCreate (create , writer );
382
+ try (Releasable r = throttle .acquireThrottle ()) {
383
+ innerCreate (create , writer );
384
+ }
377
385
dirty = true ;
378
386
possibleMergeNeeded = true ;
379
387
flushNeeded = true ;
@@ -462,8 +470,9 @@ public void index(Index index) throws EngineException {
462
470
if (writer == null ) {
463
471
throw new EngineClosedException (shardId , failedEngine );
464
472
}
465
-
466
- innerIndex (index , writer );
473
+ try (Releasable r = throttle .acquireThrottle ()) {
474
+ innerIndex (index , writer );
475
+ }
467
476
dirty = true ;
468
477
possibleMergeNeeded = true ;
469
478
flushNeeded = true ;
@@ -744,7 +753,10 @@ public void flush(Flush flush) throws EngineException {
744
753
// to be allocated to a different node
745
754
currentIndexWriter ().close (false );
746
755
indexWriter = createWriter ();
747
-
756
+ mergeScheduler .removeListener (this .throttle );
757
+ //nocommit find a good value for this?
758
+ this .throttle = new IndexThrottle (ConcurrentMergeScheduler .DEFAULT_MAX_MERGE_COUNT , indexWriter );
759
+ mergeScheduler .addListener (throttle );
748
760
// commit on a just opened writer will commit even if there are no changes done to it
749
761
// we rely on that for the commit data translog id key
750
762
if (flushNeeded || flush .force ()) {
@@ -1559,4 +1571,46 @@ boolean assertLockIsHeld() {
1559
1571
return aBoolean != null && aBoolean .booleanValue ();
1560
1572
}
1561
1573
}
1574
+
1575
+ private static final class IndexThrottle implements MergeSchedulerProvider .Listener {
1576
+
1577
+ private final IndexWriter writer ;
1578
+ private volatile InternalLock lock ;
1579
+ private final InternalLock lockReference = new InternalLock (new ReentrantLock ());
1580
+ private final AtomicInteger numMergesInFlight = new AtomicInteger (0 );
1581
+ private final int maxNumMerges ;
1582
+
1583
+ private static final Releasable DUMMY = new Releasable () {
1584
+ @ Override
1585
+ public void close () throws ElasticsearchException {}
1586
+ };
1587
+
1588
+ public IndexThrottle (int maxNumMerges , IndexWriter writer ) {
1589
+ this .maxNumMerges = maxNumMerges ;
1590
+ this .writer = writer ;
1591
+ }
1592
+
1593
+ public Releasable acquireThrottle () {
1594
+ final InternalLock lock = this .lock ;
1595
+ if (lock == null ) {
1596
+ return DUMMY ;
1597
+ }
1598
+ return lock .acquire ();
1599
+ }
1600
+
1601
+
1602
+ @ Override
1603
+ public void beforeMerge (OnGoingMerge merge ) {
1604
+ if (numMergesInFlight .incrementAndGet () > maxNumMerges && writer .hasPendingMerges ()) {
1605
+ lock = lockReference ;
1606
+ }
1607
+ }
1608
+
1609
+ @ Override
1610
+ public void afterMerge (OnGoingMerge merge ) {
1611
+ if (numMergesInFlight .decrementAndGet () <= maxNumMerges && !writer .hasPendingMerges ()) {
1612
+ lock = null ;
1613
+ }
1614
+ }
1615
+ }
1562
1616
}
0 commit comments