Skip to content

Commit 27a49bb

Browse files
committed
Ensure external refreshes will also refresh internal searcher to minimize segment creation (#27253)
We cut over to internal and external IndexReader/IndexSearcher in #26972 which uses two independent searcher managers. This has the downside that refreshes of the external reader will never clear the internal version map which in-turn will trigger additional and potentially unnecessary segment flushes since memory must be freed. Under heavy indexing load with low refresh intervals this can cause excessive segment creation which causes high GC activity and significantly increases the required segment merges. This change adds a dedicated external reference manager that delegates refreshes to the internal reference manager that then `steals` the refreshed reader from the internal reference manager for external usage. This ensures that external and internal readers are consistent on an external refresh. As a sideeffect this also releases old segments referenced by the internal reference manager which can potentially hold on to already merged away segments until it is refreshed due to a flush or indexing activity.
1 parent a16119d commit 27a49bb

File tree

6 files changed

+146
-38
lines changed

6 files changed

+146
-38
lines changed

core/src/main/java/org/elasticsearch/index/engine/Engine.java

+14-8
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
import org.apache.lucene.index.SnapshotDeletionPolicy;
3737
import org.apache.lucene.index.Term;
3838
import org.apache.lucene.search.IndexSearcher;
39-
import org.apache.lucene.search.SearcherManager;
39+
import org.apache.lucene.search.ReferenceManager;
4040
import org.apache.lucene.store.AlreadyClosedException;
4141
import org.apache.lucene.store.Directory;
4242
import org.apache.lucene.store.IOContext;
@@ -170,7 +170,7 @@ protected static boolean isMergedSegment(LeafReader reader) {
170170
return IndexWriter.SOURCE_MERGE.equals(source);
171171
}
172172

173-
protected Searcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) {
173+
protected Searcher newSearcher(String source, IndexSearcher searcher, ReferenceManager<IndexSearcher> manager) {
174174
return new EngineSearcher(source, searcher, manager, store, logger);
175175
}
176176

@@ -531,7 +531,7 @@ public final Searcher acquireSearcher(String source, SearcherScope scope) throws
531531
* the searcher is acquired. */
532532
store.incRef();
533533
try {
534-
final SearcherManager manager = getSearcherManager(source, scope); // can never be null
534+
final ReferenceManager<IndexSearcher> manager = getSearcherManager(source, scope); // can never be null
535535
/* This might throw NPE but that's fine we will run ensureOpen()
536536
* in the catch block and throw the right exception */
537537
final IndexSearcher searcher = manager.acquire();
@@ -585,7 +585,7 @@ public CommitStats commitStats() {
585585
/**
586586
* Read the last segments info from the commit pointed to by the searcher manager
587587
*/
588-
protected static SegmentInfos readLastCommittedSegmentInfos(final SearcherManager sm, final Store store) throws IOException {
588+
protected static SegmentInfos readLastCommittedSegmentInfos(final ReferenceManager<IndexSearcher> sm, final Store store) throws IOException {
589589
IndexSearcher searcher = sm.acquire();
590590
try {
591591
IndexCommit latestCommit = ((DirectoryReader) searcher.getIndexReader()).getIndexCommit();
@@ -787,13 +787,19 @@ public int compare(Segment o1, Segment o2) {
787787
public final boolean refreshNeeded() {
788788
if (store.tryIncRef()) {
789789
/*
790-
we need to inc the store here since searcherManager.isSearcherCurrent()
791-
acquires a searcher internally and that might keep a file open on the
790+
we need to inc the store here since we acquire a searcher and that might keep a file open on the
792791
store. this violates the assumption that all files are closed when
793792
the store is closed so we need to make sure we increment it here
794793
*/
795794
try {
796-
return getSearcherManager("refresh_needed", SearcherScope.EXTERNAL).isSearcherCurrent() == false;
795+
ReferenceManager<IndexSearcher> manager = getSearcherManager("refresh_needed", SearcherScope.EXTERNAL);
796+
final IndexSearcher searcher = manager.acquire();
797+
try {
798+
final IndexReader r = searcher.getIndexReader();
799+
return ((DirectoryReader) r).isCurrent() == false;
800+
} finally {
801+
manager.release(searcher);
802+
}
797803
} catch (IOException e) {
798804
logger.error("failed to access searcher manager", e);
799805
failEngine("failed to access searcher manager", e);
@@ -1331,7 +1337,7 @@ public void release() {
13311337
}
13321338
}
13331339

1334-
protected abstract SearcherManager getSearcherManager(String source, SearcherScope scope);
1340+
protected abstract ReferenceManager<IndexSearcher> getSearcherManager(String source, SearcherScope scope);
13351341

13361342
/**
13371343
* Method to close the engine while the write lock is held.

core/src/main/java/org/elasticsearch/index/engine/EngineSearcher.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.logging.log4j.Logger;
2323
import org.apache.lucene.search.IndexSearcher;
24+
import org.apache.lucene.search.ReferenceManager;
2425
import org.apache.lucene.search.SearcherManager;
2526
import org.apache.lucene.store.AlreadyClosedException;
2627
import org.elasticsearch.index.store.Store;
@@ -32,12 +33,12 @@
3233
* Searcher for an Engine
3334
*/
3435
public class EngineSearcher extends Engine.Searcher {
35-
private final SearcherManager manager;
36+
private final ReferenceManager<IndexSearcher> manager;
3637
private final AtomicBoolean released = new AtomicBoolean(false);
3738
private final Store store;
3839
private final Logger logger;
3940

40-
public EngineSearcher(String source, IndexSearcher searcher, SearcherManager manager, Store store, Logger logger) {
41+
public EngineSearcher(String source, IndexSearcher searcher, ReferenceManager<IndexSearcher> manager, Store store, Logger logger) {
4142
super(source, searcher);
4243
this.manager = manager;
4344
this.store = store;

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

+90-22
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.elasticsearch.Version;
4949
import org.elasticsearch.action.index.IndexRequest;
5050
import org.elasticsearch.common.Nullable;
51+
import org.elasticsearch.common.SuppressForbidden;
5152
import org.elasticsearch.common.UUIDs;
5253
import org.elasticsearch.common.lease.Releasable;
5354
import org.elasticsearch.common.lucene.LoggerInfoStream;
@@ -57,7 +58,6 @@
5758
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
5859
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo;
5960
import org.elasticsearch.common.metrics.CounterMetric;
60-
import org.elasticsearch.common.unit.ByteSizeValue;
6161
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
6262
import org.elasticsearch.common.util.concurrent.KeyedLock;
6363
import org.elasticsearch.common.util.concurrent.ReleasableLock;
@@ -108,7 +108,7 @@ public class InternalEngine extends Engine {
108108

109109
private final IndexWriter indexWriter;
110110

111-
private final SearcherManager externalSearcherManager;
111+
private final ExternalSearcherManager externalSearcherManager;
112112
private final SearcherManager internalSearcherManager;
113113

114114
private final Lock flushLock = new ReentrantLock();
@@ -172,7 +172,7 @@ public InternalEngine(EngineConfig engineConfig) {
172172
store.incRef();
173173
IndexWriter writer = null;
174174
Translog translog = null;
175-
SearcherManager externalSearcherManager = null;
175+
ExternalSearcherManager externalSearcherManager = null;
176176
SearcherManager internalSearcherManager = null;
177177
EngineMergeScheduler scheduler = null;
178178
boolean success = false;
@@ -225,10 +225,9 @@ public InternalEngine(EngineConfig engineConfig) {
225225
throw e;
226226
}
227227
}
228-
229228
this.translog = translog;
230-
internalSearcherManager = createSearcherManager(new SearcherFactory(), false);
231-
externalSearcherManager = createSearcherManager(new SearchFactory(logger, isClosed, engineConfig), true);
229+
externalSearcherManager = createSearcherManager(new SearchFactory(logger, isClosed, engineConfig));
230+
internalSearcherManager = externalSearcherManager.internalSearcherManager;
232231
this.internalSearcherManager = internalSearcherManager;
233232
this.externalSearcherManager = externalSearcherManager;
234233
internalSearcherManager.addListener(versionMap);
@@ -241,7 +240,7 @@ public InternalEngine(EngineConfig engineConfig) {
241240
success = true;
242241
} finally {
243242
if (success == false) {
244-
IOUtils.closeWhileHandlingException(writer, translog, externalSearcherManager, internalSearcherManager, scheduler);
243+
IOUtils.closeWhileHandlingException(writer, translog, internalSearcherManager, externalSearcherManager, scheduler);
245244
if (isClosed.get() == false) {
246245
// failure we need to dec the store reference
247246
store.decRef();
@@ -251,6 +250,75 @@ public InternalEngine(EngineConfig engineConfig) {
251250
logger.trace("created new InternalEngine");
252251
}
253252

253+
/**
254+
* This reference manager delegates all it's refresh calls to another (internal) SearcherManager
255+
* The main purpose for this is that if we have external refreshes happening we don't issue extra
256+
* refreshes to clear version map memory etc. this can cause excessive segment creation if heavy indexing
257+
* is happening and the refresh interval is low (ie. 1 sec)
258+
*
259+
* This also prevents segment starvation where an internal reader holds on to old segments literally forever
260+
* since no indexing is happening and refreshes are only happening to the external reader manager, while with
261+
* this specialized implementation an external refresh will immediately be reflected on the internal reader
262+
* and old segments can be released in the same way previous version did this (as a side-effect of _refresh)
263+
*/
264+
@SuppressForbidden(reason = "reference counting is required here")
265+
private static final class ExternalSearcherManager extends ReferenceManager<IndexSearcher> {
266+
private final SearcherFactory searcherFactory;
267+
private final SearcherManager internalSearcherManager;
268+
269+
ExternalSearcherManager(SearcherManager internalSearcherManager, SearcherFactory searcherFactory) throws IOException {
270+
IndexSearcher acquire = internalSearcherManager.acquire();
271+
try {
272+
IndexReader indexReader = acquire.getIndexReader();
273+
assert indexReader instanceof ElasticsearchDirectoryReader:
274+
"searcher's IndexReader should be an ElasticsearchDirectoryReader, but got " + indexReader;
275+
indexReader.incRef(); // steal the reader - getSearcher will decrement if it fails
276+
current = SearcherManager.getSearcher(searcherFactory, indexReader, null);
277+
} finally {
278+
internalSearcherManager.release(acquire);
279+
}
280+
this.searcherFactory = searcherFactory;
281+
this.internalSearcherManager = internalSearcherManager;
282+
}
283+
284+
@Override
285+
protected IndexSearcher refreshIfNeeded(IndexSearcher referenceToRefresh) throws IOException {
286+
// we simply run a blocking refresh on the internal reference manager and then steal it's reader
287+
// it's a save operation since we acquire the reader which incs it's reference but then down the road
288+
// steal it by calling incRef on the "stolen" reader
289+
internalSearcherManager.maybeRefreshBlocking();
290+
IndexSearcher acquire = internalSearcherManager.acquire();
291+
final IndexReader previousReader = referenceToRefresh.getIndexReader();
292+
assert previousReader instanceof ElasticsearchDirectoryReader:
293+
"searcher's IndexReader should be an ElasticsearchDirectoryReader, but got " + previousReader;
294+
try {
295+
final IndexReader newReader = acquire.getIndexReader();
296+
if (newReader == previousReader) {
297+
// nothing has changed - both ref managers share the same instance so we can use reference equality
298+
return null;
299+
} else {
300+
newReader.incRef(); // steal the reader - getSearcher will decrement if it fails
301+
return SearcherManager.getSearcher(searcherFactory, newReader, previousReader);
302+
}
303+
} finally {
304+
internalSearcherManager.release(acquire);
305+
}
306+
}
307+
308+
@Override
309+
protected boolean tryIncRef(IndexSearcher reference) {
310+
return reference.getIndexReader().tryIncRef();
311+
}
312+
313+
@Override
314+
protected int getRefCount(IndexSearcher reference) {
315+
return reference.getIndexReader().getRefCount();
316+
}
317+
318+
@Override
319+
protected void decRef(IndexSearcher reference) throws IOException { reference.getIndexReader().decRef(); }
320+
}
321+
254322
@Override
255323
public void restoreLocalCheckpointFromTranslog() throws IOException {
256324
try (ReleasableLock ignored = writeLock.acquire()) {
@@ -469,18 +537,18 @@ private String loadOrGenerateHistoryUUID(final IndexWriter writer, boolean force
469537
return uuid;
470538
}
471539

472-
private SearcherManager createSearcherManager(SearcherFactory searcherFactory, boolean readSegmentsInfo) throws EngineException {
540+
private ExternalSearcherManager createSearcherManager(SearchFactory externalSearcherFactory) throws EngineException {
473541
boolean success = false;
474-
SearcherManager searcherManager = null;
542+
SearcherManager internalSearcherManager = null;
475543
try {
476544
try {
477545
final DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(indexWriter), shardId);
478-
searcherManager = new SearcherManager(directoryReader, searcherFactory);
479-
if (readSegmentsInfo) {
480-
lastCommittedSegmentInfos = readLastCommittedSegmentInfos(searcherManager, store);
481-
}
546+
internalSearcherManager = new SearcherManager(directoryReader, new SearcherFactory());
547+
lastCommittedSegmentInfos = readLastCommittedSegmentInfos(internalSearcherManager, store);
548+
ExternalSearcherManager externalSearcherManager = new ExternalSearcherManager(internalSearcherManager,
549+
externalSearcherFactory);
482550
success = true;
483-
return searcherManager;
551+
return externalSearcherManager;
484552
} catch (IOException e) {
485553
maybeFailEngine("start", e);
486554
try {
@@ -492,7 +560,7 @@ private SearcherManager createSearcherManager(SearcherFactory searcherFactory, b
492560
}
493561
} finally {
494562
if (success == false) { // release everything we created on a failure
495-
IOUtils.closeWhileHandlingException(searcherManager, indexWriter);
563+
IOUtils.closeWhileHandlingException(internalSearcherManager, indexWriter);
496564
}
497565
}
498566
}
@@ -1242,24 +1310,24 @@ public void refresh(String source) throws EngineException {
12421310
}
12431311

12441312
final void refresh(String source, SearcherScope scope) throws EngineException {
1245-
long bytes = 0;
12461313
// we obtain a read lock here, since we don't want a flush to happen while we are refreshing
12471314
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
1315+
// both refresh types will result in an internal refresh but only the external will also
1316+
// pass the new reader reference to the external reader manager.
1317+
1318+
// this will also cause version map ram to be freed hence we always account for it.
1319+
final long bytes = indexWriter.ramBytesUsed() + versionMap.ramBytesUsedForRefresh();
1320+
writingBytes.addAndGet(bytes);
12481321
try (ReleasableLock lock = readLock.acquire()) {
12491322
ensureOpen();
1250-
bytes = indexWriter.ramBytesUsed();
12511323
switch (scope) {
12521324
case EXTERNAL:
12531325
// even though we maintain 2 managers we really do the heavy-lifting only once.
12541326
// the second refresh will only do the extra work we have to do for warming caches etc.
1255-
writingBytes.addAndGet(bytes);
12561327
externalSearcherManager.maybeRefreshBlocking();
12571328
// the break here is intentional we never refresh both internal / external together
12581329
break;
12591330
case INTERNAL:
1260-
final long versionMapBytes = versionMap.ramBytesUsedForRefresh();
1261-
bytes += versionMapBytes;
1262-
writingBytes.addAndGet(bytes);
12631331
internalSearcherManager.maybeRefreshBlocking();
12641332
break;
12651333
default:
@@ -1722,7 +1790,7 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
17221790
}
17231791

17241792
@Override
1725-
protected SearcherManager getSearcherManager(String source, SearcherScope scope) {
1793+
protected ReferenceManager<IndexSearcher> getSearcherManager(String source, SearcherScope scope) {
17261794
switch (scope) {
17271795
case INTERNAL:
17281796
return internalSearcherManager;

core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

+32-3
Original file line numberDiff line numberDiff line change
@@ -3882,7 +3882,7 @@ public void assertSameReader(Searcher left, Searcher right) {
38823882
List<LeafReaderContext> rightLeaves = ElasticsearchDirectoryReader.unwrap(right.getDirectoryReader()).leaves();
38833883
assertEquals(rightLeaves.size(), leftLeaves.size());
38843884
for (int i = 0; i < leftLeaves.size(); i++) {
3885-
assertSame(leftLeaves.get(i).reader(), rightLeaves.get(0).reader());
3885+
assertSame(leftLeaves.get(i).reader(), rightLeaves.get(i).reader());
38863886
}
38873887
}
38883888

@@ -3891,7 +3891,7 @@ public void assertNotSameReader(Searcher left, Searcher right) {
38913891
List<LeafReaderContext> rightLeaves = ElasticsearchDirectoryReader.unwrap(right.getDirectoryReader()).leaves();
38923892
if (rightLeaves.size() == leftLeaves.size()) {
38933893
for (int i = 0; i < leftLeaves.size(); i++) {
3894-
if (leftLeaves.get(i).reader() != rightLeaves.get(0).reader()) {
3894+
if (leftLeaves.get(i).reader() != rightLeaves.get(i).reader()) {
38953895
return; // all is well
38963896
}
38973897
}
@@ -3919,7 +3919,6 @@ public void testRefreshScopedSearcher() throws IOException {
39193919
assertEquals(0, searchSearcher.reader().numDocs());
39203920
assertNotSameReader(getSearcher, searchSearcher);
39213921
}
3922-
39233922
engine.refresh("test", Engine.SearcherScope.EXTERNAL);
39243923

39253924
try (Searcher getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
@@ -3928,6 +3927,36 @@ public void testRefreshScopedSearcher() throws IOException {
39283927
assertEquals(10, searchSearcher.reader().numDocs());
39293928
assertSameReader(getSearcher, searchSearcher);
39303929
}
3930+
3931+
// now ensure external refreshes are reflected on the internal reader
3932+
final String docId = Integer.toString(10);
3933+
final ParsedDocument doc =
3934+
testParsedDocument(docId, null, testDocumentWithTextField(), SOURCE, null);
3935+
Engine.Index primaryResponse = indexForDoc(doc);
3936+
engine.index(primaryResponse);
3937+
3938+
engine.refresh("test", Engine.SearcherScope.EXTERNAL);
3939+
3940+
try (Searcher getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
3941+
Searcher searchSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)){
3942+
assertEquals(11, getSearcher.reader().numDocs());
3943+
assertEquals(11, searchSearcher.reader().numDocs());
3944+
assertSameReader(getSearcher, searchSearcher);
3945+
}
3946+
3947+
try (Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)){
3948+
engine.refresh("test", Engine.SearcherScope.INTERNAL);
3949+
try (Searcher nextSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)){
3950+
assertSame(searcher.searcher(), nextSearcher.searcher());
3951+
}
3952+
}
3953+
3954+
try (Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)){
3955+
engine.refresh("test", Engine.SearcherScope.EXTERNAL);
3956+
try (Searcher nextSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)){
3957+
assertSame(searcher.searcher(), nextSearcher.searcher());
3958+
}
3959+
}
39313960
}
39323961

39333962
public void testSeqNoGenerator() throws IOException {

0 commit comments

Comments
 (0)