Skip to content

Commit 9dd892a

Browse files
committed
Use separate searchers for "search visibility" vs "move indexing buffer to disk"
Today, when ES detects it's using too much heap vs the configured indexing buffer (default 10% of JVM heap) it opens a new searcher to force Lucene to move the bytes to disk, clear the version map, etc. But this has the unexpected side effect of making newly indexed/deleted documents visible to future searches, which is not nice for users who are trying to prevent that, e.g. elastic#3593. This is also an indirect spinoff from elastic#26802, where we potentially pay a big price on rebuilding caches etc. when updates / realtime-get are used. We currently refresh the internal reader for realtime gets, which causes, for instance, global ordinals to be rebuilt. I think we can gain quite a bit if we used a reader that serves only GETs and is not used for searches; that way we can also solve the problem of searchers being refreshed unexpectedly, aside from replica recovery / relocation. Closes elastic#15768 Closes elastic#26912
1 parent 6658ff0 commit 9dd892a

File tree

5 files changed

+132
-30
lines changed

5 files changed

+132
-30
lines changed

core/src/main/java/org/elasticsearch/index/engine/Engine.java

+24-3
Original file line numberDiff line numberDiff line change
@@ -496,21 +496,38 @@ protected final GetResult getFromSearcher(Get get, Function<String, Searcher> se
496496

497497
public abstract GetResult get(Get get, Function<String, Searcher> searcherFactory) throws EngineException;
498498

499+
499500
/**
500501
* Returns a new searcher instance. The consumer of this
501502
* API is responsible for releasing the returned searcher in a
502503
* safe manner, preferably in a try/finally block.
503504
*
505+
* @param source the source API or routing that triggers this searcher acquire
506+
*
504507
* @see Searcher#close()
505508
*/
506509
public final Searcher acquireSearcher(String source) throws EngineException {
510+
return acquireSearcher(source, SearcherScope.SEARCH);
511+
}
512+
513+
/**
514+
* Returns a new searcher instance. The consumer of this
515+
* API is responsible for releasing the returned searcher in a
516+
* safe manner, preferably in a try/finally block.
517+
*
518+
* @param source the source API or routing that triggers this searcher acquire
519+
* @param scope the scope of this searcher ie. if the searcher will be used for get or search purposes
520+
*
521+
* @see Searcher#close()
522+
*/
523+
public final Searcher acquireSearcher(String source, SearcherScope scope) throws EngineException {
507524
boolean success = false;
508525
/* Acquire order here is store -> manager since we need
509526
* to make sure that the store is not closed before
510527
* the searcher is acquired. */
511528
store.incRef();
512529
try {
513-
final SearcherManager manager = getSearcherManager(); // can never be null
530+
final SearcherManager manager = getSearcherManager(source, scope); // can never be null
514531
/* This might throw NPE but that's fine we will run ensureOpen()
515532
* in the catch block and throw the right exception */
516533
final IndexSearcher searcher = manager.acquire();
@@ -536,6 +553,10 @@ public final Searcher acquireSearcher(String source) throws EngineException {
536553
}
537554
}
538555

556+
public enum SearcherScope {
557+
SEARCH, GET
558+
}
559+
539560
/** returns the translog for this engine */
540561
public abstract Translog getTranslog();
541562

@@ -768,7 +789,7 @@ public final boolean refreshNeeded() {
768789
the store is closed so we need to make sure we increment it here
769790
*/
770791
try {
771-
return getSearcherManager().isSearcherCurrent() == false;
792+
return getSearcherManager("refresh_needed", SearcherScope.SEARCH).isSearcherCurrent() == false;
772793
} catch (IOException e) {
773794
logger.error("failed to access searcher manager", e);
774795
failEngine("failed to access searcher manager", e);
@@ -1306,7 +1327,7 @@ public void release() {
13061327
}
13071328
}
13081329

1309-
protected abstract SearcherManager getSearcherManager();
1330+
protected abstract SearcherManager getSearcherManager(String source, SearcherScope scope);
13101331

13111332
/**
13121333
* Method to close the engine while the write lock is held.

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

+39-18
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ public class InternalEngine extends Engine {
110110

111111
private final SearcherFactory searcherFactory;
112112
private final SearcherManager searcherManager;
113+
private final SearcherManager internalSearcherManager;
113114

114115
private final Lock flushLock = new ReentrantLock();
115116
private final ReentrantLock optimizeLock = new ReentrantLock();
@@ -164,6 +165,7 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
164165
IndexWriter writer = null;
165166
Translog translog = null;
166167
SearcherManager manager = null;
168+
SearcherManager internalSearcherManager = null;
167169
EngineMergeScheduler scheduler = null;
168170
boolean success = false;
169171
try {
@@ -215,9 +217,11 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
215217
throw e;
216218
}
217219
}
218-
manager = createSearcherManager();
220+
manager = createSearcherManager(searcherFactory);
221+
internalSearcherManager = createSearcherManager(new SearcherFactory());
222+
this.internalSearcherManager = internalSearcherManager;
219223
this.searcherManager = manager;
220-
this.versionMap.setManager(searcherManager);
224+
this.versionMap.setManager(internalSearcherManager);
221225
assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it";
222226
// don't allow commits until we are done with recovering
223227
pendingTranslogRecovery.set(openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG);
@@ -227,7 +231,7 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
227231
success = true;
228232
} finally {
229233
if (success == false) {
230-
IOUtils.closeWhileHandlingException(writer, translog, manager, scheduler);
234+
IOUtils.closeWhileHandlingException(writer, translog, manager, internalSearcherManager, scheduler);
231235
versionMap.clear();
232236
if (isClosed.get() == false) {
233237
// failure we need to dec the store reference
@@ -441,7 +445,7 @@ private String loadOrGenerateHistoryUUID(final IndexWriter writer, boolean force
441445
return uuid;
442446
}
443447

444-
private SearcherManager createSearcherManager() throws EngineException {
448+
private SearcherManager createSearcherManager(SearcherFactory searcherFactory) throws EngineException {
445449
boolean success = false;
446450
SearcherManager searcherManager = null;
447451
try {
@@ -482,7 +486,7 @@ public GetResult get(Get get, Function<String, Searcher> searcherFactory) throws
482486
throw new VersionConflictEngineException(shardId, get.type(), get.id(),
483487
get.versionType().explainConflictForReads(versionValue.version, get.version()));
484488
}
485-
refresh("realtime_get");
489+
refresh("realtime_get", false);
486490
}
487491
}
488492

@@ -1187,17 +1191,26 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException {
11871191

11881192
@Override
11891193
public void refresh(String source) throws EngineException {
1194+
refresh(source, true);
1195+
}
1196+
1197+
final void refresh(String source, boolean refreshExternal) throws EngineException {
11901198
// we obtain a read lock here, since we don't want a flush to happen while we are refreshing
11911199
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
11921200
try (ReleasableLock lock = readLock.acquire()) {
11931201
ensureOpen();
1194-
searcherManager.maybeRefreshBlocking();
1202+
internalSearcherManager.maybeRefreshBlocking();
1203+
if (refreshExternal) {
1204+
// even though we maintain 2 managers we really do the heavy-lifting only once.
1205+
// the second refresh will only do the extra work we have to do for warming caches etc.
1206+
searcherManager.maybeRefreshBlocking();
1207+
}
11951208
} catch (AlreadyClosedException e) {
11961209
failOnTragicEvent(e);
11971210
throw e;
11981211
} catch (Exception e) {
11991212
try {
1200-
failEngine("refresh failed", e);
1213+
failEngine("refresh failed source[" + source + "]", e);
12011214
} catch (Exception inner) {
12021215
e.addSuppressed(inner);
12031216
}
@@ -1219,10 +1232,6 @@ public void writeIndexingBuffer() throws EngineException {
12191232
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
12201233
try (ReleasableLock lock = readLock.acquire()) {
12211234
ensureOpen();
1222-
1223-
// TODO: it's not great that we secretly tie searcher visibility to "freeing up heap" here... really we should keep two
1224-
// searcher managers, one for searching which is only refreshed by the schedule the user requested (refresh_interval, or invoking
1225-
// refresh API), and another for version map interactions. See #15768.
12261235
final long versionMapBytes = versionMap.ramBytesUsedForRefresh();
12271236
final long indexingBufferBytes = indexWriter.ramBytesUsed();
12281237

@@ -1231,7 +1240,7 @@ public void writeIndexingBuffer() throws EngineException {
12311240
// The version map is using > 25% of the indexing buffer, so we do a refresh so the version map also clears
12321241
logger.debug("use refresh to write indexing buffer (heap size=[{}]), to also clear version map (heap size=[{}])",
12331242
new ByteSizeValue(indexingBufferBytes), new ByteSizeValue(versionMapBytes));
1234-
refresh("write indexing buffer");
1243+
refresh("write indexing buffer", false);
12351244
} else {
12361245
// Most of our heap is used by the indexing buffer, so we do a cheaper (just writes segments, doesn't open a new searcher) IW.flush:
12371246
logger.debug("use IndexWriter.flush to write indexing buffer (heap size=[{}]) since version map is small (heap size=[{}])",
@@ -1303,9 +1312,8 @@ final boolean tryRenewSyncCommit() {
13031312
throw new EngineException(shardId, "failed to renew sync commit", ex);
13041313
}
13051314
if (renewed) { // refresh outside of the write lock
1306-
refresh("renew sync commit");
1315+
refresh("renew sync commit"); // we have to refresh both searchers here to ensure we release unreferenced segments.
13071316
}
1308-
13091317
return renewed;
13101318
}
13111319

@@ -1347,7 +1355,7 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti
13471355
commitIndexWriter(indexWriter, translog, null);
13481356
logger.trace("finished commit for flush");
13491357
// we need to refresh in order to clear older version values
1350-
refresh("version_table_flush");
1358+
refresh("version_table_flush"); // TODO technically we could also only refresh the internal searcher
13511359
translog.trimUnreferencedReaders();
13521360
} catch (Exception e) {
13531361
throw new FlushFailedEngineException(shardId, e);
@@ -1500,6 +1508,8 @@ public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpu
15001508
if (flush) {
15011509
if (tryRenewSyncCommit() == false) {
15021510
flush(false, true);
1511+
} else {
1512+
refresh("renew sync commit"); // we have to refresh both searchers here to ensure we release unreferenced segments.
15031513
}
15041514
}
15051515
if (upgrade) {
@@ -1652,7 +1662,7 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
16521662
try {
16531663
this.versionMap.clear();
16541664
try {
1655-
IOUtils.close(searcherManager);
1665+
IOUtils.close(searcherManager, internalSearcherManager);
16561666
} catch (Exception e) {
16571667
logger.warn("Failed to close SearcherManager", e);
16581668
}
@@ -1684,8 +1694,15 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
16841694
}
16851695

16861696
@Override
1687-
protected SearcherManager getSearcherManager() {
1688-
return searcherManager;
1697+
protected SearcherManager getSearcherManager(String source, SearcherScope scope) {
1698+
switch (scope) {
1699+
case GET:
1700+
return internalSearcherManager;
1701+
case SEARCH:
1702+
return searcherManager;
1703+
default:
1704+
throw new IllegalStateException("unknown scope: " + scope);
1705+
}
16891706
}
16901707

16911708
private Releasable acquireLock(BytesRef uid) {
@@ -1867,6 +1884,10 @@ protected void doRun() throws Exception {
18671884
// free up transient disk usage of the (presumably biggish) segments that were just merged
18681885
if (tryRenewSyncCommit() == false) {
18691886
flush();
1887+
} else {
1888+
// we only refresh the rather cheap internal searcher manager in order to not trigger new datastructures
1889+
// by accident ie. warm big segments in parent child case etc.
1890+
refresh("renew sync commit", false);
18701891
}
18711892
}
18721893
});

core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -832,7 +832,7 @@ private Engine.DeleteResult delete(Engine engine, Engine.Delete delete) throws I
832832

833833
public Engine.GetResult get(Engine.Get get) {
834834
readAllowed();
835-
return getEngine().get(get, this::acquireSearcher);
835+
return getEngine().get(get, source -> this.acquireSearcher(source, Engine.SearcherScope.GET));
836836
}
837837

838838
/**
@@ -1127,11 +1127,14 @@ public void failShard(String reason, @Nullable Exception e) {
11271127
// fail the engine. This will cause this shard to also be removed from the node's index service.
11281128
getEngine().failEngine(reason, e);
11291129
}
1130-
11311130
public Engine.Searcher acquireSearcher(String source) {
1131+
return acquireSearcher(source, Engine.SearcherScope.SEARCH);
1132+
}
1133+
1134+
private Engine.Searcher acquireSearcher(String source, Engine.SearcherScope scope) {
11321135
readAllowed();
11331136
final Engine engine = getEngine();
1134-
final Engine.Searcher searcher = engine.acquireSearcher(source);
1137+
final Engine.Searcher searcher = engine.acquireSearcher(source, scope);
11351138
boolean success = false;
11361139
try {
11371140
final Engine.Searcher wrappedSearcher = searcherWrapper == null ? searcher : searcherWrapper.wrap(searcher);

core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

+62-5
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
import org.elasticsearch.common.collect.Tuple;
8787
import org.elasticsearch.common.logging.Loggers;
8888
import org.elasticsearch.common.lucene.Lucene;
89+
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
8990
import org.elasticsearch.common.lucene.uid.Versions;
9091
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
9192
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo;
@@ -143,6 +144,7 @@
143144
import org.hamcrest.Matchers;
144145
import org.junit.After;
145146
import org.junit.Before;
147+
import sun.nio.ch.IOUtil;
146148

147149
import java.io.IOException;
148150
import java.io.UncheckedIOException;
@@ -942,7 +944,7 @@ public void testConcurrentGetAndFlush() throws Exception {
942944
engine.index(indexForDoc(doc));
943945

944946
final AtomicReference<Engine.GetResult> latestGetResult = new AtomicReference<>();
945-
final Function<String, Searcher> searcherFactory = engine::acquireSearcher;
947+
final Function<String, Searcher> searcherFactory = s -> engine.acquireSearcher(s, Engine.SearcherScope.GET);
946948
latestGetResult.set(engine.get(newGet(true, doc), searcherFactory));
947949
final AtomicBoolean flushFinished = new AtomicBoolean(false);
948950
final CyclicBarrier barrier = new CyclicBarrier(2);
@@ -977,7 +979,7 @@ public void testSimpleOperations() throws Exception {
977979
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
978980
searchResult.close();
979981

980-
final Function<String, Searcher> searcherFactory = engine::acquireSearcher;
982+
final Function<String, Searcher> searcherFactory = s -> engine.acquireSearcher(s, Engine.SearcherScope.GET);
981983

982984
// create a document
983985
Document document = testDocumentWithTextField();
@@ -1884,7 +1886,7 @@ class OpAndVersion {
18841886
ParsedDocument doc = testParsedDocument("1", null, testDocument(), bytesArray(""), null);
18851887
final Term uidTerm = newUid(doc);
18861888
engine.index(indexForDoc(doc));
1887-
final Function<String, Searcher> searcherFactory = engine::acquireSearcher;
1889+
final Function<String, Searcher> searcherFactory = s -> engine.acquireSearcher(s, Engine.SearcherScope.GET);
18881890
for (int i = 0; i < thread.length; i++) {
18891891
thread[i] = new Thread(() -> {
18901892
startGun.countDown();
@@ -2314,7 +2316,7 @@ public void testEnableGcDeletes() throws Exception {
23142316
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null))) {
23152317
engine.config().setEnableGcDeletes(false);
23162318

2317-
final Function<String, Searcher> searcherFactory = engine::acquireSearcher;
2319+
final Function<String, Searcher> searcherFactory = s -> engine.acquireSearcher(s, Engine.SearcherScope.GET);
23182320

23192321
// Add document
23202322
Document document = testDocument();
@@ -3847,7 +3849,7 @@ public void testOutOfOrderSequenceNumbersWithVersionConflict() throws IOExceptio
38473849
document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE));
38483850
final ParsedDocument doc = testParsedDocument("1", null, document, B_1, null);
38493851
final Term uid = newUid(doc);
3850-
final Function<String, Searcher> searcherFactory = engine::acquireSearcher;
3852+
final Function<String, Searcher> searcherFactory = s -> engine.acquireSearcher(s, Engine.SearcherScope.GET);
38513853
for (int i = 0; i < numberOfOperations; i++) {
38523854
if (randomBoolean()) {
38533855
final Engine.Index index = new Engine.Index(
@@ -4203,4 +4205,59 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException {
42034205
IOUtils.close(recoveringEngine);
42044206
}
42054207
}
4208+
4209+
4210+
public void assertSameReader(Searcher left, Searcher right) {
4211+
List<LeafReaderContext> leftLeaves = ElasticsearchDirectoryReader.unwrap(left.getDirectoryReader()).leaves();
4212+
List<LeafReaderContext> rightLeaves = ElasticsearchDirectoryReader.unwrap(right.getDirectoryReader()).leaves();
4213+
assertEquals(rightLeaves.size(), leftLeaves.size());
4214+
for (int i = 0; i < leftLeaves.size(); i++) {
4215+
assertSame(leftLeaves.get(i).reader(), rightLeaves.get(i).reader());
4216+
}
4217+
}
4218+
4219+
public void assertNotSameReader(Searcher left, Searcher right) {
4220+
List<LeafReaderContext> leftLeaves = ElasticsearchDirectoryReader.unwrap(left.getDirectoryReader()).leaves();
4221+
List<LeafReaderContext> rightLeaves = ElasticsearchDirectoryReader.unwrap(right.getDirectoryReader()).leaves();
4222+
if (rightLeaves.size() == leftLeaves.size()) {
4223+
for (int i = 0; i < leftLeaves.size(); i++) {
4224+
if (leftLeaves.get(i).reader() != rightLeaves.get(i).reader()) {
4225+
return; // all is well
4226+
}
4227+
}
4228+
fail("readers are same");
4229+
}
4230+
}
4231+
4232+
public void testRefreshScopedSearcher() throws IOException {
4233+
Searcher getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.GET);
4234+
Searcher searchSearcher = engine.acquireSearcher("test", Engine.SearcherScope.SEARCH);
4235+
assertSameReader(getSearcher, searchSearcher);
4236+
IOUtils.close(getSearcher, searchSearcher);
4237+
for (int i = 0; i < 10; i++) {
4238+
final String docId = Integer.toString(i);
4239+
final ParsedDocument doc =
4240+
testParsedDocument(docId, null, testDocumentWithTextField(), SOURCE, null);
4241+
Engine.Index primaryResponse = indexForDoc(doc);
4242+
engine.index(primaryResponse);
4243+
}
4244+
assertTrue(engine.refreshNeeded());
4245+
engine.refresh("test", false);
4246+
4247+
getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.GET);
4248+
assertEquals(10, getSearcher.reader().numDocs());
4249+
searchSearcher = engine.acquireSearcher("test", Engine.SearcherScope.SEARCH);
4250+
assertEquals(0, searchSearcher.reader().numDocs());
4251+
assertNotSameReader(getSearcher, searchSearcher);
4252+
IOUtils.close(getSearcher, searchSearcher);
4253+
4254+
engine.refresh("test", true);
4255+
4256+
getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.GET);
4257+
assertEquals(10, getSearcher.reader().numDocs());
4258+
searchSearcher = engine.acquireSearcher("test", Engine.SearcherScope.SEARCH);
4259+
assertEquals(10, searchSearcher.reader().numDocs());
4260+
assertSameReader(getSearcher, searchSearcher);
4261+
IOUtils.close(getSearcher, searchSearcher);
4262+
}
42064263
}

0 commit comments

Comments
 (0)