Skip to content

Commit 399d53e

Browse files
authored
Refactor index engines to manage readers instead of searchers (elastic#43860)
This commit changes the way we manage refreshes in the index engines. Instead of relying on a SearcherManager, this change uses a ReaderManager that creates ElasticsearchDirectoryReader when needed. Searchers are now created on-demand (when acquireSearcher is called) from the current ElasticsearchDirectoryReader. It also slightly changes the Engine.Searcher to extend IndexSearcher in order to simplify the usage in the consumer.
1 parent 9f4880c commit 399d53e

36 files changed

+488
-501
lines changed

modules/percolator/src/test/java/org/elasticsearch/percolator/PercolatorQuerySearchTests.java

+4-6
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
*/
1919
package org.elasticsearch.percolator;
2020

21-
import org.apache.lucene.search.IndexSearcher;
2221
import org.apache.lucene.search.Query;
2322
import org.apache.lucene.search.join.ScoreMode;
2423
import org.elasticsearch.action.search.SearchResponse;
@@ -256,19 +255,18 @@ public void testRangeQueriesWithNow() throws Exception {
256255
.get();
257256
client().admin().indices().prepareRefresh().get();
258257

259-
try (Engine.Searcher engineSearcher = indexService.getShard(0).acquireSearcher("test")) {
260-
IndexSearcher indexSearcher = engineSearcher.searcher();
258+
try (Engine.Searcher searcher = indexService.getShard(0).acquireSearcher("test")) {
261259
long[] currentTime = new long[] {System.currentTimeMillis()};
262260
QueryShardContext queryShardContext =
263-
indexService.newQueryShardContext(0, engineSearcher.reader(), () -> currentTime[0], null);
261+
indexService.newQueryShardContext(0, searcher.getIndexReader(), () -> currentTime[0], null);
264262

265263
BytesReference source = BytesReference.bytes(jsonBuilder().startObject()
266264
.field("field1", "value")
267265
.field("field2", currentTime[0])
268266
.endObject());
269267
QueryBuilder queryBuilder = new PercolateQueryBuilder("query", source, XContentType.JSON);
270268
Query query = queryBuilder.toQuery(queryShardContext);
271-
assertThat(indexSearcher.count(query), equalTo(3));
269+
assertThat(searcher.count(query), equalTo(3));
272270

273271
currentTime[0] = currentTime[0] + 10800000; // + 3 hours
274272
source = BytesReference.bytes(jsonBuilder().startObject()
@@ -277,7 +275,7 @@ public void testRangeQueriesWithNow() throws Exception {
277275
.endObject());
278276
queryBuilder = new PercolateQueryBuilder("query", source, XContentType.JSON);
279277
query = queryBuilder.toQuery(queryShardContext);
280-
assertThat(indexSearcher.count(query), equalTo(3));
278+
assertThat(searcher.count(query), equalTo(3));
281279
}
282280
}
283281

server/src/main/java/org/elasticsearch/index/IndexService.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -398,10 +398,10 @@ public synchronized IndexShard createShard(
398398

399399
logger.debug("creating shard_id {}", shardId);
400400
// if we are on a shared FS we only own the shard (ie. we can safely delete it) if we are the primary.
401-
final Engine.Warmer engineWarmer = (searcher) -> {
401+
final Engine.Warmer engineWarmer = (reader) -> {
402402
IndexShard shard = getShardOrNull(shardId.getId());
403403
if (shard != null) {
404-
warmer.warm(searcher, shard, IndexService.this.indexSettings);
404+
warmer.warm(reader, shard, IndexService.this.indexSettings);
405405
}
406406
};
407407
Directory directory = directoryFactory.newDirectory(this.indexSettings, path);

server/src/main/java/org/elasticsearch/index/IndexWarmer.java

+6-8
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,8 @@
2222
import org.apache.logging.log4j.LogManager;
2323
import org.apache.logging.log4j.Logger;
2424
import org.apache.logging.log4j.message.ParameterizedMessage;
25-
import org.apache.lucene.index.DirectoryReader;
25+
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
2626
import org.elasticsearch.common.unit.TimeValue;
27-
import org.elasticsearch.index.engine.Engine;
2827
import org.elasticsearch.index.fielddata.IndexFieldData;
2928
import org.elasticsearch.index.fielddata.IndexFieldDataService;
3029
import org.elasticsearch.index.mapper.MappedFieldType;
@@ -58,22 +57,22 @@ public final class IndexWarmer {
5857
this.listeners = Collections.unmodifiableList(list);
5958
}
6059

61-
void warm(Engine.Searcher searcher, IndexShard shard, IndexSettings settings) {
60+
void warm(ElasticsearchDirectoryReader reader, IndexShard shard, IndexSettings settings) {
6261
if (shard.state() == IndexShardState.CLOSED) {
6362
return;
6463
}
6564
if (settings.isWarmerEnabled() == false) {
6665
return;
6766
}
6867
if (logger.isTraceEnabled()) {
69-
logger.trace("{} top warming [{}]", shard.shardId(), searcher.reader());
68+
logger.trace("{} top warming [{}]", shard.shardId(), reader);
7069
}
7170
shard.warmerService().onPreWarm();
7271
long time = System.nanoTime();
7372
final List<TerminationHandle> terminationHandles = new ArrayList<>();
7473
// get a handle on pending tasks
7574
for (final Listener listener : listeners) {
76-
terminationHandles.add(listener.warmReader(shard, searcher));
75+
terminationHandles.add(listener.warmReader(shard, reader));
7776
}
7877
// wait for termination
7978
for (TerminationHandle terminationHandle : terminationHandles) {
@@ -103,7 +102,7 @@ public interface TerminationHandle {
103102
public interface Listener {
104103
/** Queue tasks to warm-up the given segments and return handles that allow to wait for termination of the
105104
* execution of those tasks. */
106-
TerminationHandle warmReader(IndexShard indexShard, Engine.Searcher searcher);
105+
TerminationHandle warmReader(IndexShard indexShard, ElasticsearchDirectoryReader reader);
107106
}
108107

109108
private static class FieldDataWarmer implements IndexWarmer.Listener {
@@ -117,7 +116,7 @@ private static class FieldDataWarmer implements IndexWarmer.Listener {
117116
}
118117

119118
@Override
120-
public TerminationHandle warmReader(final IndexShard indexShard, final Engine.Searcher searcher) {
119+
public TerminationHandle warmReader(final IndexShard indexShard, final ElasticsearchDirectoryReader reader) {
121120
final MapperService mapperService = indexShard.mapperService();
122121
final Map<String, MappedFieldType> warmUpGlobalOrdinals = new HashMap<>();
123122
for (MappedFieldType fieldType : mapperService.fieldTypes()) {
@@ -133,7 +132,6 @@ public TerminationHandle warmReader(final IndexShard indexShard, final Engine.Se
133132
try {
134133
final long start = System.nanoTime();
135134
IndexFieldData.Global ifd = indexFieldDataService.getForField(fieldType);
136-
DirectoryReader reader = searcher.getDirectoryReader();
137135
IndexFieldData<?> global = ifd.loadGlobal(reader);
138136
if (reader.leaves().isEmpty() == false) {
139137
global.load(reader.leaves().get(0));

server/src/main/java/org/elasticsearch/index/cache/bitset/BitsetFilterCache.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.elasticsearch.common.cache.CacheBuilder;
3939
import org.elasticsearch.common.cache.RemovalListener;
4040
import org.elasticsearch.common.cache.RemovalNotification;
41+
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
4142
import org.elasticsearch.common.lucene.search.Queries;
4243
import org.elasticsearch.common.settings.Setting;
4344
import org.elasticsearch.common.settings.Setting.Property;
@@ -46,7 +47,6 @@
4647
import org.elasticsearch.index.IndexSettings;
4748
import org.elasticsearch.index.IndexWarmer;
4849
import org.elasticsearch.index.IndexWarmer.TerminationHandle;
49-
import org.elasticsearch.index.engine.Engine;
5050
import org.elasticsearch.index.mapper.DocumentMapper;
5151
import org.elasticsearch.index.mapper.MapperService;
5252
import org.elasticsearch.index.mapper.ObjectMapper;
@@ -222,7 +222,7 @@ final class BitSetProducerWarmer implements IndexWarmer.Listener {
222222
}
223223

224224
@Override
225-
public IndexWarmer.TerminationHandle warmReader(final IndexShard indexShard, final Engine.Searcher searcher) {
225+
public IndexWarmer.TerminationHandle warmReader(final IndexShard indexShard, final ElasticsearchDirectoryReader reader) {
226226
if (indexSettings.getIndex().equals(indexShard.indexSettings().getIndex()) == false) {
227227
// this is from a different index
228228
return TerminationHandle.NO_WAIT;
@@ -254,8 +254,8 @@ public IndexWarmer.TerminationHandle warmReader(final IndexShard indexShard, fin
254254
warmUp.add(Queries.newNonNestedFilter());
255255
}
256256

257-
final CountDownLatch latch = new CountDownLatch(searcher.reader().leaves().size() * warmUp.size());
258-
for (final LeafReaderContext ctx : searcher.reader().leaves()) {
257+
final CountDownLatch latch = new CountDownLatch(reader.leaves().size() * warmUp.size());
258+
for (final LeafReaderContext ctx : reader.leaves()) {
259259
for (final Query filterToWarm : warmUp) {
260260
executor.execute(() -> {
261261
try {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.index.engine;
21+
22+
import java.io.IOException;
23+
import java.util.function.BiConsumer;
24+
25+
import org.apache.lucene.index.DirectoryReader;
26+
import org.apache.lucene.search.ReferenceManager;
27+
28+
import org.apache.lucene.search.SearcherManager;
29+
import org.elasticsearch.common.SuppressForbidden;
30+
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
31+
32+
/**
33+
* Utility class to safely share {@link ElasticsearchDirectoryReader} instances across
34+
* multiple threads, while periodically reopening. This class ensures each
35+
* reader is closed only once all threads have finished using it.
36+
*
37+
* @see SearcherManager
38+
*
39+
*/
40+
@SuppressForbidden(reason = "reference counting is required here")
41+
class ElasticsearchReaderManager extends ReferenceManager<ElasticsearchDirectoryReader> {
42+
private final BiConsumer<ElasticsearchDirectoryReader, ElasticsearchDirectoryReader> refreshListener;
43+
44+
/**
45+
* Creates and returns a new ElasticsearchReaderManager from the given
46+
* already-opened {@link ElasticsearchDirectoryReader}, stealing
47+
* the incoming reference.
48+
*
49+
* @param reader the directoryReader to use for future reopens
50+
* @param refreshListener A consumer that is called every time a new reader is opened
51+
*/
52+
ElasticsearchReaderManager(ElasticsearchDirectoryReader reader,
53+
BiConsumer<ElasticsearchDirectoryReader, ElasticsearchDirectoryReader> refreshListener) {
54+
this.current = reader;
55+
this.refreshListener = refreshListener;
56+
refreshListener.accept(current, null);
57+
}
58+
59+
@Override
60+
protected void decRef(ElasticsearchDirectoryReader reference) throws IOException {
61+
reference.decRef();
62+
}
63+
64+
@Override
65+
protected ElasticsearchDirectoryReader refreshIfNeeded(ElasticsearchDirectoryReader referenceToRefresh) throws IOException {
66+
final ElasticsearchDirectoryReader reader = (ElasticsearchDirectoryReader) DirectoryReader.openIfChanged(referenceToRefresh);
67+
if (reader != null) {
68+
refreshListener.accept(reader, referenceToRefresh);
69+
}
70+
return reader;
71+
}
72+
73+
@Override
74+
protected boolean tryIncRef(ElasticsearchDirectoryReader reference) {
75+
return reference.tryIncRef();
76+
}
77+
78+
@Override
79+
protected int getRefCount(ElasticsearchDirectoryReader reference) {
80+
return reference.getRefCount();
81+
}
82+
}

0 commit comments

Comments
 (0)