Skip to content

Commit 67acaf6

Browse files
authored
Implement Exitable DirectoryReader (elastic#52822)
Implement an Exitable DirectoryReader that wraps the original DirectoryReader so that when a search task is cancelled the DirectoryReaders also stop their work fast. This is usuful for expensive operations like wilcard/prefix queries where the DirectoryReaders can spend lots of time and consume resources, as previously their work wouldn't stop even though the original search task was cancelled (e.g. because of timeout or dropped client connection).
1 parent dcbc9d3 commit 67acaf6

File tree

9 files changed

+520
-86
lines changed

9 files changed

+520
-86
lines changed

server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ final class DefaultSearchContext extends SearchContext {
155155
DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarget shardTarget,
156156
Engine.Searcher engineSearcher, ClusterService clusterService, IndexService indexService,
157157
IndexShard indexShard, BigArrays bigArrays, LongSupplier relativeTimeSupplier, TimeValue timeout,
158-
FetchPhase fetchPhase) {
158+
FetchPhase fetchPhase) throws IOException {
159159
this.id = id;
160160
this.request = request;
161161
this.fetchPhase = fetchPhase;

server/src/main/java/org/elasticsearch/search/SearchService.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,7 @@ && canRewriteToMatchNone(rewritten.source())
398398
}, listener::onFailure));
399399
}
400400

401-
private void onMatchNoDocs(SearchRewriteContext rewriteContext, ActionListener<SearchPhaseResult> listener) {
401+
private void onMatchNoDocs(SearchRewriteContext rewriteContext, ActionListener<SearchPhaseResult> listener) throws IOException {
402402
// creates a lightweight search context that we use to inform context listeners
403403
// before closing
404404
SearchContext searchContext = createSearchContext(rewriteContext, defaultSearchTimeout);
@@ -609,7 +609,7 @@ private SearchContext findContext(long id, TransportRequest request) throws Sear
609609
}
610610
}
611611

612-
final SearchContext createAndPutContext(SearchRewriteContext rewriteContext) {
612+
final SearchContext createAndPutContext(SearchRewriteContext rewriteContext) throws IOException {
613613
SearchContext context = createContext(rewriteContext);
614614
onNewContext(context);
615615
boolean success = false;
@@ -644,7 +644,7 @@ private void onNewContext(SearchContext context) {
644644
}
645645
}
646646

647-
final SearchContext createContext(SearchRewriteContext rewriteContext) {
647+
final SearchContext createContext(SearchRewriteContext rewriteContext) throws IOException {
648648
final DefaultSearchContext context = createSearchContext(rewriteContext, defaultSearchTimeout);
649649
try {
650650
if (rewriteContext.request != null && openScrollContexts.get() >= maxOpenScrollContext) {
@@ -695,7 +695,7 @@ public DefaultSearchContext createSearchContext(ShardSearchRequest request, Time
695695
return createSearchContext(rewriteContext.wrapSearcher(), timeout);
696696
}
697697

698-
private DefaultSearchContext createSearchContext(SearchRewriteContext rewriteContext, TimeValue timeout) {
698+
private DefaultSearchContext createSearchContext(SearchRewriteContext rewriteContext, TimeValue timeout) throws IOException {
699699
boolean success = false;
700700
try {
701701
final ShardSearchRequest request = rewriteContext.request;

server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java

+63-17
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,9 @@
6262
import java.io.IOException;
6363
import java.util.ArrayList;
6464
import java.util.Arrays;
65+
import java.util.HashSet;
6566
import java.util.List;
67+
import java.util.Objects;
6668
import java.util.Set;
6769

6870
/**
@@ -77,25 +79,46 @@ public class ContextIndexSearcher extends IndexSearcher {
7779

7880
private AggregatedDfs aggregatedDfs;
7981
private QueryProfiler profiler;
80-
private Runnable checkCancelled;
82+
private MutableQueryTimeout cancellable;
8183

82-
public ContextIndexSearcher(IndexReader reader, Similarity similarity, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy) {
83-
super(reader);
84+
public ContextIndexSearcher(IndexReader reader, Similarity similarity,
85+
QueryCache queryCache, QueryCachingPolicy queryCachingPolicy) throws IOException {
86+
this(reader, similarity, queryCache, queryCachingPolicy, new MutableQueryTimeout());
87+
}
88+
89+
// TODO: Make the 2nd constructor private so that the IndexReader is always wrapped.
90+
// Some issues must be fixed:
91+
// - regarding tests deriving from AggregatorTestCase and more specifically the use of searchAndReduce and
92+
// the ShardSearcher sub-searchers.
93+
// - tests that use a MultiReader
94+
public ContextIndexSearcher(IndexReader reader, Similarity similarity,
95+
QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,
96+
MutableQueryTimeout cancellable) throws IOException {
97+
super(cancellable != null ? new ExitableDirectoryReader((DirectoryReader) reader, cancellable) : reader);
8498
setSimilarity(similarity);
8599
setQueryCache(queryCache);
86100
setQueryCachingPolicy(queryCachingPolicy);
101+
this.cancellable = cancellable != null ? cancellable : new MutableQueryTimeout();
87102
}
88103

89104
public void setProfiler(QueryProfiler profiler) {
90105
this.profiler = profiler;
91106
}
92107

93108
/**
94-
* Set a {@link Runnable} that will be run on a regular basis while
95-
* collecting documents.
109+
* Add a {@link Runnable} that will be run on a regular basis while accessing documents in the
110+
* DirectoryReader but also while collecting them and check for query cancellation or timeout.
111+
*/
112+
public Runnable addQueryCancellation(Runnable action) {
113+
return this.cancellable.add(action);
114+
}
115+
116+
/**
117+
* Remove a {@link Runnable} that checks for query cancellation or timeout
118+
* which is called while accessing documents in the DirectoryReader but also while collecting them.
96119
*/
97-
public void setCheckCancelled(Runnable checkCancelled) {
98-
this.checkCancelled = checkCancelled;
120+
public void removeQueryCancellation(Runnable action) {
121+
this.cancellable.remove(action);
99122
}
100123

101124
public void setAggregatedDfs(AggregatedDfs aggregatedDfs) {
@@ -139,12 +162,6 @@ public Weight createWeight(Query query, ScoreMode scoreMode, float boost) throws
139162
}
140163
}
141164

142-
private void checkCancelled() {
143-
if (checkCancelled != null) {
144-
checkCancelled.run();
145-
}
146-
}
147-
148165
@SuppressWarnings({"unchecked", "rawtypes"})
149166
public void search(List<LeafReaderContext> leaves, Weight weight, CollectorManager manager,
150167
QuerySearchResult result, DocValueFormat[] formats, TotalHits totalHits) throws IOException {
@@ -180,7 +197,7 @@ protected void search(List<LeafReaderContext> leaves, Weight weight, Collector c
180197
* the provided <code>ctx</code>.
181198
*/
182199
private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collector) throws IOException {
183-
checkCancelled();
200+
cancellable.checkCancelled();
184201
weight = wrapWeight(weight);
185202
final LeafCollector leafCollector;
186203
try {
@@ -208,7 +225,7 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
208225
if (scorer != null) {
209226
try {
210227
intersectScorerAndBitSet(scorer, liveDocsBitSet, leafCollector,
211-
checkCancelled == null ? () -> { } : checkCancelled);
228+
this.cancellable.isEnabled() ? cancellable::checkCancelled: () -> {});
212229
} catch (CollectionTerminatedException e) {
213230
// collection was terminated prematurely
214231
// continue with the following leaf
@@ -218,7 +235,7 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
218235
}
219236

220237
private Weight wrapWeight(Weight weight) {
221-
if (checkCancelled != null) {
238+
if (cancellable.isEnabled()) {
222239
return new Weight(weight.getQuery()) {
223240
@Override
224241
public void extractTerms(Set<Term> terms) {
@@ -244,7 +261,7 @@ public Scorer scorer(LeafReaderContext context) throws IOException {
244261
public BulkScorer bulkScorer(LeafReaderContext context) throws IOException {
245262
BulkScorer in = weight.bulkScorer(context);
246263
if (in != null) {
247-
return new CancellableBulkScorer(in, checkCancelled);
264+
return new CancellableBulkScorer(in, cancellable::checkCancelled);
248265
} else {
249266
return null;
250267
}
@@ -320,4 +337,33 @@ public DirectoryReader getDirectoryReader() {
320337
assert reader instanceof DirectoryReader : "expected an instance of DirectoryReader, got " + reader.getClass();
321338
return (DirectoryReader) reader;
322339
}
340+
341+
private static class MutableQueryTimeout implements ExitableDirectoryReader.QueryCancellation {
342+
343+
private final Set<Runnable> runnables = new HashSet<>();
344+
345+
private Runnable add(Runnable action) {
346+
Objects.requireNonNull(action, "cancellation runnable should not be null");
347+
if (runnables.add(action) == false) {
348+
throw new IllegalArgumentException("Cancellation runnable already added");
349+
}
350+
return action;
351+
}
352+
353+
private void remove(Runnable action) {
354+
runnables.remove(action);
355+
}
356+
357+
@Override
358+
public void checkCancelled() {
359+
for (Runnable timeout : runnables) {
360+
timeout.run();
361+
}
362+
}
363+
364+
@Override
365+
public boolean isEnabled() {
366+
return runnables.isEmpty() == false;
367+
}
368+
}
323369
}

0 commit comments

Comments
 (0)