Skip to content

Commit 487d442

Browse files
authored
Implement Exitable DirectoryReader (#52822) (#53162)
Implement an Exitable DirectoryReader that wraps the original DirectoryReader so that when a search task is cancelled the DirectoryReaders also stop their work fast. This is usuful for expensive operations like wilcard/prefix queries where the DirectoryReaders can spend lots of time and consume resources, as previously their work wouldn't stop even though the original search task was cancelled (e.g. because of timeout or dropped client connection). (cherry picked from commit 67acaf6)
1 parent 28df7ae commit 487d442

File tree

9 files changed

+521
-84
lines changed

9 files changed

+521
-84
lines changed

server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ final class DefaultSearchContext extends SearchContext {
159159
DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarget shardTarget,
160160
Engine.Searcher engineSearcher, ClusterService clusterService, IndexService indexService,
161161
IndexShard indexShard, BigArrays bigArrays, LongSupplier relativeTimeSupplier, TimeValue timeout,
162-
FetchPhase fetchPhase, Version minNodeVersion) {
162+
FetchPhase fetchPhase, Version minNodeVersion) throws IOException {
163163
this.id = id;
164164
this.request = request;
165165
this.fetchPhase = fetchPhase;

server/src/main/java/org/elasticsearch/search/SearchService.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,7 @@ && canRewriteToMatchNone(rewritten.source())
398398
}, listener::onFailure));
399399
}
400400

401-
private void onMatchNoDocs(SearchRewriteContext rewriteContext, ActionListener<SearchPhaseResult> listener) {
401+
private void onMatchNoDocs(SearchRewriteContext rewriteContext, ActionListener<SearchPhaseResult> listener) throws IOException {
402402
// creates a lightweight search context that we use to inform context listeners
403403
// before closing
404404
SearchContext searchContext = createSearchContext(rewriteContext, defaultSearchTimeout);
@@ -609,7 +609,7 @@ private SearchContext findContext(long id, TransportRequest request) throws Sear
609609
}
610610
}
611611

612-
final SearchContext createAndPutContext(SearchRewriteContext rewriteContext) {
612+
final SearchContext createAndPutContext(SearchRewriteContext rewriteContext) throws IOException {
613613
SearchContext context = createContext(rewriteContext);
614614
onNewContext(context);
615615
boolean success = false;
@@ -644,7 +644,7 @@ private void onNewContext(SearchContext context) {
644644
}
645645
}
646646

647-
final SearchContext createContext(SearchRewriteContext rewriteContext) {
647+
final SearchContext createContext(SearchRewriteContext rewriteContext) throws IOException {
648648
final DefaultSearchContext context = createSearchContext(rewriteContext, defaultSearchTimeout);
649649
try {
650650
if (rewriteContext.request != null && openScrollContexts.get() >= maxOpenScrollContext) {
@@ -695,7 +695,7 @@ public DefaultSearchContext createSearchContext(ShardSearchRequest request, Time
695695
return createSearchContext(rewriteContext.wrapSearcher(), timeout);
696696
}
697697

698-
private DefaultSearchContext createSearchContext(SearchRewriteContext rewriteContext, TimeValue timeout) {
698+
private DefaultSearchContext createSearchContext(SearchRewriteContext rewriteContext, TimeValue timeout) throws IOException {
699699
boolean success = false;
700700
try {
701701
final ShardSearchRequest request = rewriteContext.request;

server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java

+63-17
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,9 @@
6262
import java.io.IOException;
6363
import java.util.ArrayList;
6464
import java.util.Arrays;
65+
import java.util.HashSet;
6566
import java.util.List;
67+
import java.util.Objects;
6668
import java.util.Set;
6769

6870
/**
@@ -77,25 +79,46 @@ public class ContextIndexSearcher extends IndexSearcher {
7779

7880
private AggregatedDfs aggregatedDfs;
7981
private QueryProfiler profiler;
80-
private Runnable checkCancelled;
82+
private MutableQueryTimeout cancellable;
8183

82-
public ContextIndexSearcher(IndexReader reader, Similarity similarity, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy) {
83-
super(reader);
84+
public ContextIndexSearcher(IndexReader reader, Similarity similarity,
85+
QueryCache queryCache, QueryCachingPolicy queryCachingPolicy) throws IOException {
86+
this(reader, similarity, queryCache, queryCachingPolicy, new MutableQueryTimeout());
87+
}
88+
89+
// TODO: Make the 2nd constructor private so that the IndexReader is always wrapped.
90+
// Some issues must be fixed:
91+
// - regarding tests deriving from AggregatorTestCase and more specifically the use of searchAndReduce and
92+
// the ShardSearcher sub-searchers.
93+
// - tests that use a MultiReader
94+
public ContextIndexSearcher(IndexReader reader, Similarity similarity,
95+
QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,
96+
MutableQueryTimeout cancellable) throws IOException {
97+
super(cancellable != null ? new ExitableDirectoryReader((DirectoryReader) reader, cancellable) : reader);
8498
setSimilarity(similarity);
8599
setQueryCache(queryCache);
86100
setQueryCachingPolicy(queryCachingPolicy);
101+
this.cancellable = cancellable != null ? cancellable : new MutableQueryTimeout();
87102
}
88103

89104
public void setProfiler(QueryProfiler profiler) {
90105
this.profiler = profiler;
91106
}
92107

93108
/**
94-
* Set a {@link Runnable} that will be run on a regular basis while
95-
* collecting documents.
109+
* Add a {@link Runnable} that will be run on a regular basis while accessing documents in the
110+
* DirectoryReader but also while collecting them and check for query cancellation or timeout.
111+
*/
112+
public Runnable addQueryCancellation(Runnable action) {
113+
return this.cancellable.add(action);
114+
}
115+
116+
/**
117+
* Remove a {@link Runnable} that checks for query cancellation or timeout
118+
* which is called while accessing documents in the DirectoryReader but also while collecting them.
96119
*/
97-
public void setCheckCancelled(Runnable checkCancelled) {
98-
this.checkCancelled = checkCancelled;
120+
public void removeQueryCancellation(Runnable action) {
121+
this.cancellable.remove(action);
99122
}
100123

101124
public void setAggregatedDfs(AggregatedDfs aggregatedDfs) {
@@ -139,12 +162,6 @@ public Weight createWeight(Query query, ScoreMode scoreMode, float boost) throws
139162
}
140163
}
141164

142-
private void checkCancelled() {
143-
if (checkCancelled != null) {
144-
checkCancelled.run();
145-
}
146-
}
147-
148165
public void search(List<LeafReaderContext> leaves, Weight weight, CollectorManager manager,
149166
QuerySearchResult result, DocValueFormat[] formats, TotalHits totalHits) throws IOException {
150167
final List<Collector> collectors = new ArrayList<>(leaves.size());
@@ -179,7 +196,7 @@ protected void search(List<LeafReaderContext> leaves, Weight weight, Collector c
179196
* the provided <code>ctx</code>.
180197
*/
181198
private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collector) throws IOException {
182-
checkCancelled();
199+
cancellable.checkCancelled();
183200
weight = wrapWeight(weight);
184201
final LeafCollector leafCollector;
185202
try {
@@ -207,7 +224,7 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
207224
if (scorer != null) {
208225
try {
209226
intersectScorerAndBitSet(scorer, liveDocsBitSet, leafCollector,
210-
checkCancelled == null ? () -> { } : checkCancelled);
227+
this.cancellable.isEnabled() ? cancellable::checkCancelled: () -> {});
211228
} catch (CollectionTerminatedException e) {
212229
// collection was terminated prematurely
213230
// continue with the following leaf
@@ -217,7 +234,7 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
217234
}
218235

219236
private Weight wrapWeight(Weight weight) {
220-
if (checkCancelled != null) {
237+
if (cancellable.isEnabled()) {
221238
return new Weight(weight.getQuery()) {
222239
@Override
223240
public void extractTerms(Set<Term> terms) {
@@ -243,7 +260,7 @@ public Scorer scorer(LeafReaderContext context) throws IOException {
243260
public BulkScorer bulkScorer(LeafReaderContext context) throws IOException {
244261
BulkScorer in = weight.bulkScorer(context);
245262
if (in != null) {
246-
return new CancellableBulkScorer(in, checkCancelled);
263+
return new CancellableBulkScorer(in, cancellable::checkCancelled);
247264
} else {
248265
return null;
249266
}
@@ -319,4 +336,33 @@ public DirectoryReader getDirectoryReader() {
319336
assert reader instanceof DirectoryReader : "expected an instance of DirectoryReader, got " + reader.getClass();
320337
return (DirectoryReader) reader;
321338
}
339+
340+
private static class MutableQueryTimeout implements ExitableDirectoryReader.QueryCancellation {
341+
342+
private final Set<Runnable> runnables = new HashSet<>();
343+
344+
private Runnable add(Runnable action) {
345+
Objects.requireNonNull(action, "cancellation runnable should not be null");
346+
if (runnables.add(action) == false) {
347+
throw new IllegalArgumentException("Cancellation runnable already added");
348+
}
349+
return action;
350+
}
351+
352+
private void remove(Runnable action) {
353+
runnables.remove(action);
354+
}
355+
356+
@Override
357+
public void checkCancelled() {
358+
for (Runnable timeout : runnables) {
359+
timeout.run();
360+
}
361+
}
362+
363+
@Override
364+
public boolean isEnabled() {
365+
return runnables.isEmpty() == false;
366+
}
367+
}
322368
}

0 commit comments

Comments
 (0)