Skip to content

Commit a34c2f0

Browse files
authored
Ensure external refreshes will also refresh internal searcher to minimize segment creation (#27253)
We cut over to internal and external IndexReader/IndexSearcher in #26972 which uses two independent searcher managers. This has the downside that refreshes of the external reader will never clear the internal version map, which in turn will trigger additional and potentially unnecessary segment flushes since memory must be freed. Under heavy indexing load with low refresh intervals this can cause excessive segment creation, which causes high GC activity and significantly increases the required segment merges. This change adds a dedicated external reference manager that delegates refreshes to the internal reference manager and then `steals` the refreshed reader from the internal reference manager for external usage. This ensures that external and internal readers are consistent on an external refresh. As a side effect this also releases old segments referenced by the internal reference manager, which can otherwise potentially hold on to already merged away segments until it is refreshed due to a flush or indexing activity.
1 parent 4abb5fa commit a34c2f0

File tree

6 files changed

+146
-37
lines changed

6 files changed

+146
-37
lines changed

core/src/main/java/org/elasticsearch/index/engine/Engine.java

+14-8
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
import org.apache.lucene.index.SnapshotDeletionPolicy;
3737
import org.apache.lucene.index.Term;
3838
import org.apache.lucene.search.IndexSearcher;
39-
import org.apache.lucene.search.SearcherManager;
39+
import org.apache.lucene.search.ReferenceManager;
4040
import org.apache.lucene.store.AlreadyClosedException;
4141
import org.apache.lucene.store.Directory;
4242
import org.apache.lucene.store.IOContext;
@@ -170,7 +170,7 @@ protected static boolean isMergedSegment(LeafReader reader) {
170170
return IndexWriter.SOURCE_MERGE.equals(source);
171171
}
172172

173-
protected Searcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) {
173+
protected Searcher newSearcher(String source, IndexSearcher searcher, ReferenceManager<IndexSearcher> manager) {
174174
return new EngineSearcher(source, searcher, manager, store, logger);
175175
}
176176

@@ -531,7 +531,7 @@ public final Searcher acquireSearcher(String source, SearcherScope scope) throws
531531
* the searcher is acquired. */
532532
store.incRef();
533533
try {
534-
final SearcherManager manager = getSearcherManager(source, scope); // can never be null
534+
final ReferenceManager<IndexSearcher> manager = getSearcherManager(source, scope); // can never be null
535535
/* This might throw NPE but that's fine we will run ensureOpen()
536536
* in the catch block and throw the right exception */
537537
final IndexSearcher searcher = manager.acquire();
@@ -585,7 +585,7 @@ public CommitStats commitStats() {
585585
/**
586586
* Read the last segments info from the commit pointed to by the searcher manager
587587
*/
588-
protected static SegmentInfos readLastCommittedSegmentInfos(final SearcherManager sm, final Store store) throws IOException {
588+
protected static SegmentInfos readLastCommittedSegmentInfos(final ReferenceManager<IndexSearcher> sm, final Store store) throws IOException {
589589
IndexSearcher searcher = sm.acquire();
590590
try {
591591
IndexCommit latestCommit = ((DirectoryReader) searcher.getIndexReader()).getIndexCommit();
@@ -787,13 +787,19 @@ public int compare(Segment o1, Segment o2) {
787787
public final boolean refreshNeeded() {
788788
if (store.tryIncRef()) {
789789
/*
790-
we need to inc the store here since searcherManager.isSearcherCurrent()
791-
acquires a searcher internally and that might keep a file open on the
790+
we need to inc the store here since we acquire a searcher and that might keep a file open on the
792791
store. this violates the assumption that all files are closed when
793792
the store is closed so we need to make sure we increment it here
794793
*/
795794
try {
796-
return getSearcherManager("refresh_needed", SearcherScope.EXTERNAL).isSearcherCurrent() == false;
795+
ReferenceManager<IndexSearcher> manager = getSearcherManager("refresh_needed", SearcherScope.EXTERNAL);
796+
final IndexSearcher searcher = manager.acquire();
797+
try {
798+
final IndexReader r = searcher.getIndexReader();
799+
return ((DirectoryReader) r).isCurrent() == false;
800+
} finally {
801+
manager.release(searcher);
802+
}
797803
} catch (IOException e) {
798804
logger.error("failed to access searcher manager", e);
799805
failEngine("failed to access searcher manager", e);
@@ -1331,7 +1337,7 @@ public void release() {
13311337
}
13321338
}
13331339

1334-
protected abstract SearcherManager getSearcherManager(String source, SearcherScope scope);
1340+
protected abstract ReferenceManager<IndexSearcher> getSearcherManager(String source, SearcherScope scope);
13351341

13361342
/**
13371343
* Method to close the engine while the write lock is held.

core/src/main/java/org/elasticsearch/index/engine/EngineSearcher.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.logging.log4j.Logger;
2323
import org.apache.lucene.search.IndexSearcher;
24+
import org.apache.lucene.search.ReferenceManager;
2425
import org.apache.lucene.search.SearcherManager;
2526
import org.apache.lucene.store.AlreadyClosedException;
2627
import org.elasticsearch.index.store.Store;
@@ -32,12 +33,12 @@
3233
* Searcher for an Engine
3334
*/
3435
public class EngineSearcher extends Engine.Searcher {
35-
private final SearcherManager manager;
36+
private final ReferenceManager<IndexSearcher> manager;
3637
private final AtomicBoolean released = new AtomicBoolean(false);
3738
private final Store store;
3839
private final Logger logger;
3940

40-
public EngineSearcher(String source, IndexSearcher searcher, SearcherManager manager, Store store, Logger logger) {
41+
public EngineSearcher(String source, IndexSearcher searcher, ReferenceManager<IndexSearcher> manager, Store store, Logger logger) {
4142
super(source, searcher);
4243
this.manager = manager;
4344
this.store = store;

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

+90-21
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.elasticsearch.Version;
4949
import org.elasticsearch.action.index.IndexRequest;
5050
import org.elasticsearch.common.Nullable;
51+
import org.elasticsearch.common.SuppressForbidden;
5152
import org.elasticsearch.common.UUIDs;
5253
import org.elasticsearch.common.lease.Releasable;
5354
import org.elasticsearch.common.lucene.LoggerInfoStream;
@@ -57,7 +58,6 @@
5758
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
5859
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo;
5960
import org.elasticsearch.common.metrics.CounterMetric;
60-
import org.elasticsearch.common.unit.ByteSizeValue;
6161
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
6262
import org.elasticsearch.common.util.concurrent.KeyedLock;
6363
import org.elasticsearch.common.util.concurrent.ReleasableLock;
@@ -108,7 +108,7 @@ public class InternalEngine extends Engine {
108108

109109
private final IndexWriter indexWriter;
110110

111-
private final SearcherManager externalSearcherManager;
111+
private final ExternalSearcherManager externalSearcherManager;
112112
private final SearcherManager internalSearcherManager;
113113

114114
private final Lock flushLock = new ReentrantLock();
@@ -172,7 +172,7 @@ public InternalEngine(EngineConfig engineConfig) {
172172
store.incRef();
173173
IndexWriter writer = null;
174174
Translog translog = null;
175-
SearcherManager externalSearcherManager = null;
175+
ExternalSearcherManager externalSearcherManager = null;
176176
SearcherManager internalSearcherManager = null;
177177
EngineMergeScheduler scheduler = null;
178178
boolean success = false;
@@ -224,8 +224,8 @@ public InternalEngine(EngineConfig engineConfig) {
224224
throw e;
225225
}
226226
}
227-
internalSearcherManager = createSearcherManager(new SearcherFactory(), false);
228-
externalSearcherManager = createSearcherManager(new SearchFactory(logger, isClosed, engineConfig), true);
227+
externalSearcherManager = createSearcherManager(new SearchFactory(logger, isClosed, engineConfig));
228+
internalSearcherManager = externalSearcherManager.internalSearcherManager;
229229
this.internalSearcherManager = internalSearcherManager;
230230
this.externalSearcherManager = externalSearcherManager;
231231
internalSearcherManager.addListener(versionMap);
@@ -238,7 +238,7 @@ public InternalEngine(EngineConfig engineConfig) {
238238
success = true;
239239
} finally {
240240
if (success == false) {
241-
IOUtils.closeWhileHandlingException(writer, translog, externalSearcherManager, internalSearcherManager, scheduler);
241+
IOUtils.closeWhileHandlingException(writer, translog, internalSearcherManager, externalSearcherManager, scheduler);
242242
if (isClosed.get() == false) {
243243
// failure we need to dec the store reference
244244
store.decRef();
@@ -248,6 +248,75 @@ public InternalEngine(EngineConfig engineConfig) {
248248
logger.trace("created new InternalEngine");
249249
}
250250

251+
/**
252+
* This reference manager delegates all its refresh calls to another (internal) SearcherManager
253+
* The main purpose for this is that if we have external refreshes happening we don't issue extra
254+
* refreshes to clear version map memory etc. this can cause excessive segment creation if heavy indexing
255+
* is happening and the refresh interval is low (ie. 1 sec)
256+
*
257+
* This also prevents segment starvation where an internal reader holds on to old segments literally forever
258+
* since no indexing is happening and refreshes are only happening to the external reader manager, while with
259+
* this specialized implementation an external refresh will immediately be reflected on the internal reader
260+
* and old segments can be released in the same way previous version did this (as a side-effect of _refresh)
261+
*/
262+
@SuppressForbidden(reason = "reference counting is required here")
263+
private static final class ExternalSearcherManager extends ReferenceManager<IndexSearcher> {
264+
private final SearcherFactory searcherFactory;
265+
private final SearcherManager internalSearcherManager;
266+
267+
ExternalSearcherManager(SearcherManager internalSearcherManager, SearcherFactory searcherFactory) throws IOException {
268+
IndexSearcher acquire = internalSearcherManager.acquire();
269+
try {
270+
IndexReader indexReader = acquire.getIndexReader();
271+
assert indexReader instanceof ElasticsearchDirectoryReader:
272+
"searcher's IndexReader should be an ElasticsearchDirectoryReader, but got " + indexReader;
273+
indexReader.incRef(); // steal the reader - getSearcher will decrement if it fails
274+
current = SearcherManager.getSearcher(searcherFactory, indexReader, null);
275+
} finally {
276+
internalSearcherManager.release(acquire);
277+
}
278+
this.searcherFactory = searcherFactory;
279+
this.internalSearcherManager = internalSearcherManager;
280+
}
281+
282+
@Override
283+
protected IndexSearcher refreshIfNeeded(IndexSearcher referenceToRefresh) throws IOException {
284+
// we simply run a blocking refresh on the internal reference manager and then steal its reader
285+
// it's a safe operation since we acquire the reader which incs its reference but then down the road
286+
// steal it by calling incRef on the "stolen" reader
287+
internalSearcherManager.maybeRefreshBlocking();
288+
IndexSearcher acquire = internalSearcherManager.acquire();
289+
final IndexReader previousReader = referenceToRefresh.getIndexReader();
290+
assert previousReader instanceof ElasticsearchDirectoryReader:
291+
"searcher's IndexReader should be an ElasticsearchDirectoryReader, but got " + previousReader;
292+
try {
293+
final IndexReader newReader = acquire.getIndexReader();
294+
if (newReader == previousReader) {
295+
// nothing has changed - both ref managers share the same instance so we can use reference equality
296+
return null;
297+
} else {
298+
newReader.incRef(); // steal the reader - getSearcher will decrement if it fails
299+
return SearcherManager.getSearcher(searcherFactory, newReader, previousReader);
300+
}
301+
} finally {
302+
internalSearcherManager.release(acquire);
303+
}
304+
}
305+
306+
@Override
307+
protected boolean tryIncRef(IndexSearcher reference) {
308+
return reference.getIndexReader().tryIncRef();
309+
}
310+
311+
@Override
312+
protected int getRefCount(IndexSearcher reference) {
313+
return reference.getIndexReader().getRefCount();
314+
}
315+
316+
@Override
317+
protected void decRef(IndexSearcher reference) throws IOException { reference.getIndexReader().decRef(); }
318+
}
319+
251320
@Override
252321
public void restoreLocalCheckpointFromTranslog() throws IOException {
253322
try (ReleasableLock ignored = writeLock.acquire()) {
@@ -456,18 +525,18 @@ private String loadOrGenerateHistoryUUID(final IndexWriter writer, boolean force
456525
return uuid;
457526
}
458527

459-
private SearcherManager createSearcherManager(SearcherFactory searcherFactory, boolean readSegmentsInfo) throws EngineException {
528+
private ExternalSearcherManager createSearcherManager(SearchFactory externalSearcherFactory) throws EngineException {
460529
boolean success = false;
461-
SearcherManager searcherManager = null;
530+
SearcherManager internalSearcherManager = null;
462531
try {
463532
try {
464533
final DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(indexWriter), shardId);
465-
searcherManager = new SearcherManager(directoryReader, searcherFactory);
466-
if (readSegmentsInfo) {
467-
lastCommittedSegmentInfos = readLastCommittedSegmentInfos(searcherManager, store);
468-
}
534+
internalSearcherManager = new SearcherManager(directoryReader, new SearcherFactory());
535+
lastCommittedSegmentInfos = readLastCommittedSegmentInfos(internalSearcherManager, store);
536+
ExternalSearcherManager externalSearcherManager = new ExternalSearcherManager(internalSearcherManager,
537+
externalSearcherFactory);
469538
success = true;
470-
return searcherManager;
539+
return externalSearcherManager;
471540
} catch (IOException e) {
472541
maybeFailEngine("start", e);
473542
try {
@@ -479,7 +548,7 @@ private SearcherManager createSearcherManager(SearcherFactory searcherFactory, b
479548
}
480549
} finally {
481550
if (success == false) { // release everything we created on a failure
482-
IOUtils.closeWhileHandlingException(searcherManager, indexWriter);
551+
IOUtils.closeWhileHandlingException(internalSearcherManager, indexWriter);
483552
}
484553
}
485554
}
@@ -1229,24 +1298,24 @@ public void refresh(String source) throws EngineException {
12291298
}
12301299

12311300
final void refresh(String source, SearcherScope scope) throws EngineException {
1232-
long bytes = 0;
12331301
// we obtain a read lock here, since we don't want a flush to happen while we are refreshing
12341302
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
1303+
// both refresh types will result in an internal refresh but only the external will also
1304+
// pass the new reader reference to the external reader manager.
1305+
1306+
// this will also cause version map ram to be freed hence we always account for it.
1307+
final long bytes = indexWriter.ramBytesUsed() + versionMap.ramBytesUsedForRefresh();
1308+
writingBytes.addAndGet(bytes);
12351309
try (ReleasableLock lock = readLock.acquire()) {
12361310
ensureOpen();
1237-
bytes = indexWriter.ramBytesUsed();
12381311
switch (scope) {
12391312
case EXTERNAL:
12401313
// even though we maintain 2 managers we really do the heavy-lifting only once.
12411314
// the second refresh will only do the extra work we have to do for warming caches etc.
1242-
writingBytes.addAndGet(bytes);
12431315
externalSearcherManager.maybeRefreshBlocking();
12441316
// the break here is intentional we never refresh both internal / external together
12451317
break;
12461318
case INTERNAL:
1247-
final long versionMapBytes = versionMap.ramBytesUsedForRefresh();
1248-
bytes += versionMapBytes;
1249-
writingBytes.addAndGet(bytes);
12501319
internalSearcherManager.maybeRefreshBlocking();
12511320
break;
12521321
default:
@@ -1709,7 +1778,7 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
17091778
}
17101779

17111780
@Override
1712-
protected SearcherManager getSearcherManager(String source, SearcherScope scope) {
1781+
protected ReferenceManager<IndexSearcher> getSearcherManager(String source, SearcherScope scope) {
17131782
switch (scope) {
17141783
case INTERNAL:
17151784
return internalSearcherManager;

core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

+32-3
Original file line numberDiff line numberDiff line change
@@ -3882,7 +3882,7 @@ public void assertSameReader(Searcher left, Searcher right) {
38823882
List<LeafReaderContext> rightLeaves = ElasticsearchDirectoryReader.unwrap(right.getDirectoryReader()).leaves();
38833883
assertEquals(rightLeaves.size(), leftLeaves.size());
38843884
for (int i = 0; i < leftLeaves.size(); i++) {
3885-
assertSame(leftLeaves.get(i).reader(), rightLeaves.get(0).reader());
3885+
assertSame(leftLeaves.get(i).reader(), rightLeaves.get(i).reader());
38863886
}
38873887
}
38883888

@@ -3891,7 +3891,7 @@ public void assertNotSameReader(Searcher left, Searcher right) {
38913891
List<LeafReaderContext> rightLeaves = ElasticsearchDirectoryReader.unwrap(right.getDirectoryReader()).leaves();
38923892
if (rightLeaves.size() == leftLeaves.size()) {
38933893
for (int i = 0; i < leftLeaves.size(); i++) {
3894-
if (leftLeaves.get(i).reader() != rightLeaves.get(0).reader()) {
3894+
if (leftLeaves.get(i).reader() != rightLeaves.get(i).reader()) {
38953895
return; // all is well
38963896
}
38973897
}
@@ -3919,7 +3919,6 @@ public void testRefreshScopedSearcher() throws IOException {
39193919
assertEquals(0, searchSearcher.reader().numDocs());
39203920
assertNotSameReader(getSearcher, searchSearcher);
39213921
}
3922-
39233922
engine.refresh("test", Engine.SearcherScope.EXTERNAL);
39243923

39253924
try (Searcher getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
@@ -3928,6 +3927,36 @@ public void testRefreshScopedSearcher() throws IOException {
39283927
assertEquals(10, searchSearcher.reader().numDocs());
39293928
assertSameReader(getSearcher, searchSearcher);
39303929
}
3930+
3931+
// now ensure external refreshes are reflected on the internal reader
3932+
final String docId = Integer.toString(10);
3933+
final ParsedDocument doc =
3934+
testParsedDocument(docId, null, testDocumentWithTextField(), SOURCE, null);
3935+
Engine.Index primaryResponse = indexForDoc(doc);
3936+
engine.index(primaryResponse);
3937+
3938+
engine.refresh("test", Engine.SearcherScope.EXTERNAL);
3939+
3940+
try (Searcher getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
3941+
Searcher searchSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)){
3942+
assertEquals(11, getSearcher.reader().numDocs());
3943+
assertEquals(11, searchSearcher.reader().numDocs());
3944+
assertSameReader(getSearcher, searchSearcher);
3945+
}
3946+
3947+
try (Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)){
3948+
engine.refresh("test", Engine.SearcherScope.INTERNAL);
3949+
try (Searcher nextSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)){
3950+
assertSame(searcher.searcher(), nextSearcher.searcher());
3951+
}
3952+
}
3953+
3954+
try (Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)){
3955+
engine.refresh("test", Engine.SearcherScope.EXTERNAL);
3956+
try (Searcher nextSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)){
3957+
assertSame(searcher.searcher(), nextSearcher.searcher());
3958+
}
3959+
}
39313960
}
39323961

39333962
public void testSeqNoGenerator() throws IOException {

0 commit comments

Comments
 (0)