Skip to content

Commit e6d74a0

Browse files
Relax some search interfaces to allow arbitrary cancellable tasks (elastic#122188)
An easy change we can split out of elastic#121885 to make that shorter.
1 parent 2988109 commit e6d74a0

File tree

9 files changed

+34
-32
lines changed

9 files changed

+34
-32
lines changed

server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.lucene.search.Query;
2222
import org.apache.lucene.search.TotalHits;
2323
import org.apache.lucene.util.NumericUtils;
24-
import org.elasticsearch.action.search.SearchShardTask;
2524
import org.elasticsearch.action.search.SearchType;
2625
import org.elasticsearch.cluster.routing.IndexRouting;
2726
import org.elasticsearch.common.lucene.search.Queries;
@@ -77,6 +76,7 @@
7776
import org.elasticsearch.search.slice.SliceBuilder;
7877
import org.elasticsearch.search.sort.SortAndFormats;
7978
import org.elasticsearch.search.suggest.SuggestionSearchContext;
79+
import org.elasticsearch.tasks.CancellableTask;
8080

8181
import java.io.IOException;
8282
import java.io.UncheckedIOException;
@@ -132,7 +132,7 @@ final class DefaultSearchContext extends SearchContext {
132132
private CollapseContext collapse;
133133
// filter for sliced scroll
134134
private SliceBuilder sliceBuilder;
135-
private SearchShardTask task;
135+
private CancellableTask task;
136136
private QueryPhaseRankShardContext queryPhaseRankShardContext;
137137

138138
/**
@@ -466,7 +466,7 @@ public void preProcess() {
466466
this.query = buildFilteredQuery(query);
467467
if (lowLevelCancellation) {
468468
searcher().addQueryCancellation(() -> {
469-
final SearchShardTask task = getTask();
469+
final CancellableTask task = getTask();
470470
if (task != null) {
471471
task.ensureNotCancelled();
472472
}
@@ -941,12 +941,12 @@ public void setProfilers(Profilers profilers) {
941941
}
942942

943943
@Override
944-
public void setTask(SearchShardTask task) {
944+
public void setTask(CancellableTask task) {
945945
this.task = task;
946946
}
947947

948948
@Override
949-
public SearchShardTask getTask() {
949+
public CancellableTask getTask() {
950950
return task;
951951
}
952952

server/src/main/java/org/elasticsearch/search/SearchService.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@
128128
import org.elasticsearch.search.sort.SortBuilder;
129129
import org.elasticsearch.search.suggest.Suggest;
130130
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
131+
import org.elasticsearch.tasks.CancellableTask;
131132
import org.elasticsearch.tasks.TaskCancelledException;
132133
import org.elasticsearch.telemetry.tracing.Tracer;
133134
import org.elasticsearch.threadpool.Scheduler;
@@ -590,7 +591,7 @@ private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final Sea
590591
}
591592
}
592593

593-
public void executeQueryPhase(ShardSearchRequest request, SearchShardTask task, ActionListener<SearchPhaseResult> listener) {
594+
public void executeQueryPhase(ShardSearchRequest request, CancellableTask task, ActionListener<SearchPhaseResult> listener) {
594595
ActionListener<SearchPhaseResult> finalListener = maybeWrapListenerForStackTrace(listener, request.getChannelVersion(), threadPool);
595596
assert request.canReturnNullResponseIfMatchNoDocs() == false || request.numberOfShards() > 1
596597
: "empty responses require more than one shard";
@@ -738,7 +739,7 @@ private static <T extends RefCounted> void runAsync(
738739
* It is the responsibility of the caller to ensure that the ref count is correctly decremented
739740
* when the object is no longer needed.
740741
*/
741-
private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchShardTask task) throws Exception {
742+
private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, CancellableTask task) throws Exception {
742743
final ReaderContext readerContext = createOrGetReaderContext(request);
743744
try (
744745
Releasable scope = tracer.withScope(task);
@@ -998,7 +999,7 @@ public void executeFetchPhase(
998999
}, wrapFailureListener(listener, readerContext, markAsUsed));
9991000
}
10001001

1001-
public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, ActionListener<FetchSearchResult> listener) {
1002+
public void executeFetchPhase(ShardFetchRequest request, CancellableTask task, ActionListener<FetchSearchResult> listener) {
10021003
final ReaderContext readerContext = findReaderContext(request.contextId(), request);
10031004
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.getShardSearchRequest());
10041005
final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(shardSearchRequest));
@@ -1038,7 +1039,7 @@ public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, A
10381039
}));
10391040
}
10401041

1041-
protected void checkCancelled(SearchShardTask task) {
1042+
protected void checkCancelled(CancellableTask task) {
10421043
// check cancellation as early as possible, as it avoids opening up a Lucene reader on FrozenEngine
10431044
try {
10441045
task.ensureNotCancelled();
@@ -1169,7 +1170,7 @@ public void openReaderContext(ShardId shardId, TimeValue keepAlive, ActionListen
11691170
protected SearchContext createContext(
11701171
ReaderContext readerContext,
11711172
ShardSearchRequest request,
1172-
SearchShardTask task,
1173+
CancellableTask task,
11731174
ResultsType resultsType,
11741175
boolean includeAggregations
11751176
) throws IOException {

server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import org.apache.lucene.search.MatchNoDocsQuery;
1414
import org.elasticsearch.ElasticsearchStatusException;
1515
import org.elasticsearch.action.ActionListener;
16-
import org.elasticsearch.action.search.SearchShardTask;
1716
import org.elasticsearch.common.logging.DeprecationCategory;
1817
import org.elasticsearch.common.logging.DeprecationLogger;
1918
import org.elasticsearch.index.query.QueryBuilder;
@@ -38,6 +37,7 @@
3837
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
3938
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
4039
import org.elasticsearch.search.internal.ShardSearchRequest;
40+
import org.elasticsearch.tasks.CancellableTask;
4141
import org.elasticsearch.xcontent.ParseField;
4242

4343
import java.io.IOException;
@@ -128,7 +128,7 @@ private static SignificantTermsAggregatorSupplier bytesSupplier() {
128128
* <p>
129129
* Some searches that will never match can still fall through and we endup running query that will produce no results.
130130
* However even in that case we sometimes do expensive things like loading global ordinals. This method should prevent this.
131-
* Note that if {@link org.elasticsearch.search.SearchService#executeQueryPhase(ShardSearchRequest, SearchShardTask, ActionListener)}
131+
* Note that if {@link org.elasticsearch.search.SearchService#executeQueryPhase(ShardSearchRequest, CancellableTask, ActionListener)}
132132
* always do a can match then we don't need this code here.
133133
*/
134134
static boolean matchNoDocs(AggregationContext context, Aggregator parent) {

server/src/main/java/org/elasticsearch/search/internal/FilteredSearchContext.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import org.apache.lucene.search.FieldDoc;
1313
import org.apache.lucene.search.Query;
1414
import org.apache.lucene.search.TotalHits;
15-
import org.elasticsearch.action.search.SearchShardTask;
1615
import org.elasticsearch.action.search.SearchType;
1716
import org.elasticsearch.core.TimeValue;
1817
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
@@ -40,6 +39,7 @@
4039
import org.elasticsearch.search.rescore.RescoreContext;
4140
import org.elasticsearch.search.sort.SortAndFormats;
4241
import org.elasticsearch.search.suggest.SuggestionSearchContext;
42+
import org.elasticsearch.tasks.CancellableTask;
4343

4444
import java.util.List;
4545

@@ -422,12 +422,12 @@ public SearchExecutionContext getSearchExecutionContext() {
422422
}
423423

424424
@Override
425-
public void setTask(SearchShardTask task) {
425+
public void setTask(CancellableTask task) {
426426
in.setTask(task);
427427
}
428428

429429
@Override
430-
public SearchShardTask getTask() {
430+
public CancellableTask getTask() {
431431
return in.getTask();
432432
}
433433

server/src/main/java/org/elasticsearch/search/internal/SearchContext.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import org.apache.lucene.search.FieldDoc;
1212
import org.apache.lucene.search.Query;
1313
import org.apache.lucene.search.TotalHits;
14-
import org.elasticsearch.action.search.SearchShardTask;
1514
import org.elasticsearch.action.search.SearchType;
1615
import org.elasticsearch.core.Assertions;
1716
import org.elasticsearch.core.Nullable;
@@ -48,6 +47,7 @@
4847
import org.elasticsearch.search.rescore.RescoreContext;
4948
import org.elasticsearch.search.sort.SortAndFormats;
5049
import org.elasticsearch.search.suggest.SuggestionSearchContext;
50+
import org.elasticsearch.tasks.CancellableTask;
5151
import org.elasticsearch.transport.LeakTracker;
5252

5353
import java.io.IOException;
@@ -90,7 +90,7 @@ public final List<Runnable> getCancellationChecks() {
9090
if (lowLevelCancellation()) {
9191
// This searching doesn't live beyond this phase, so we don't need to remove query cancellation
9292
Runnable c = () -> {
93-
final SearchShardTask task = getTask();
93+
final CancellableTask task = getTask();
9494
if (task != null) {
9595
task.ensureNotCancelled();
9696
}
@@ -100,9 +100,9 @@ public final List<Runnable> getCancellationChecks() {
100100
return timeoutRunnable == null ? List.of() : List.of(timeoutRunnable);
101101
}
102102

103-
public abstract void setTask(SearchShardTask task);
103+
public abstract void setTask(CancellableTask task);
104104

105-
public abstract SearchShardTask getTask();
105+
public abstract CancellableTask getTask();
106106

107107
public abstract boolean isCancelled();
108108

server/src/main/java/org/elasticsearch/search/rank/RankSearchContext.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import org.apache.lucene.search.FieldDoc;
1313
import org.apache.lucene.search.Query;
1414
import org.apache.lucene.search.TotalHits;
15-
import org.elasticsearch.action.search.SearchShardTask;
1615
import org.elasticsearch.action.search.SearchType;
1716
import org.elasticsearch.core.TimeValue;
1817
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
@@ -48,6 +47,7 @@
4847
import org.elasticsearch.search.rescore.RescoreContext;
4948
import org.elasticsearch.search.sort.SortAndFormats;
5049
import org.elasticsearch.search.suggest.SuggestionSearchContext;
50+
import org.elasticsearch.tasks.CancellableTask;
5151

5252
import java.util.List;
5353

@@ -211,12 +211,12 @@ public long getRelativeTimeInMillis() {
211211
/* ---- ALL METHODS ARE UNSUPPORTED BEYOND HERE ---- */
212212

213213
@Override
214-
public void setTask(SearchShardTask task) {
214+
public void setTask(CancellableTask task) {
215215
throw new UnsupportedOperationException();
216216
}
217217

218218
@Override
219-
public SearchShardTask getTask() {
219+
public CancellableTask getTask() {
220220
throw new UnsupportedOperationException();
221221
}
222222

server/src/test/java/org/elasticsearch/index/SearchSlowLogTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.search.builder.SearchSourceBuilder;
2727
import org.elasticsearch.search.internal.SearchContext;
2828
import org.elasticsearch.search.internal.ShardSearchRequest;
29+
import org.elasticsearch.tasks.CancellableTask;
2930
import org.elasticsearch.tasks.Task;
3031
import org.elasticsearch.test.ESSingleNodeTestCase;
3132
import org.elasticsearch.test.TestSearchContext;
@@ -93,7 +94,7 @@ public ShardSearchRequest request() {
9394
}
9495

9596
@Override
96-
public SearchShardTask getTask() {
97+
public CancellableTask getTask() {
9798
return super.getTask();
9899
}
99100
};

test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
package org.elasticsearch.search;
1111

12-
import org.elasticsearch.action.search.SearchShardTask;
1312
import org.elasticsearch.cluster.service.ClusterService;
1413
import org.elasticsearch.common.util.BigArrays;
1514
import org.elasticsearch.core.TimeValue;
@@ -25,6 +24,7 @@
2524
import org.elasticsearch.search.internal.SearchContext;
2625
import org.elasticsearch.search.internal.ShardSearchRequest;
2726
import org.elasticsearch.search.rank.feature.RankFeatureShardPhase;
27+
import org.elasticsearch.tasks.CancellableTask;
2828
import org.elasticsearch.telemetry.tracing.Tracer;
2929
import org.elasticsearch.threadpool.ThreadPool;
3030

@@ -48,7 +48,7 @@ public static class TestPlugin extends Plugin {}
4848

4949
private Consumer<SearchContext> onCreateSearchContext = context -> {};
5050

51-
private Function<SearchShardTask, SearchShardTask> onCheckCancelled = Function.identity();
51+
private Function<CancellableTask, CancellableTask> onCheckCancelled = Function.identity();
5252

5353
/** Throw an {@link AssertionError} if there are still in-flight contexts. */
5454
public static void assertNoInFlightContext() {
@@ -138,7 +138,7 @@ public void setOnCreateSearchContext(Consumer<SearchContext> onCreateSearchConte
138138
protected SearchContext createContext(
139139
ReaderContext readerContext,
140140
ShardSearchRequest request,
141-
SearchShardTask task,
141+
CancellableTask task,
142142
ResultsType resultsType,
143143
boolean includeAggregations
144144
) throws IOException {
@@ -160,12 +160,12 @@ public SearchContext createSearchContext(ShardSearchRequest request, TimeValue t
160160
return searchContext;
161161
}
162162

163-
public void setOnCheckCancelled(Function<SearchShardTask, SearchShardTask> onCheckCancelled) {
163+
public void setOnCheckCancelled(Function<CancellableTask, CancellableTask> onCheckCancelled) {
164164
this.onCheckCancelled = onCheckCancelled;
165165
}
166166

167167
@Override
168-
protected void checkCancelled(SearchShardTask task) {
168+
protected void checkCancelled(CancellableTask task) {
169169
super.checkCancelled(onCheckCancelled.apply(task));
170170
}
171171
}

test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import org.apache.lucene.search.FieldDoc;
1212
import org.apache.lucene.search.Query;
1313
import org.apache.lucene.search.TotalHits;
14-
import org.elasticsearch.action.search.SearchShardTask;
1514
import org.elasticsearch.action.search.SearchType;
1615
import org.elasticsearch.core.TimeValue;
1716
import org.elasticsearch.index.IndexService;
@@ -49,6 +48,7 @@
4948
import org.elasticsearch.search.rescore.RescoreContext;
5049
import org.elasticsearch.search.sort.SortAndFormats;
5150
import org.elasticsearch.search.suggest.SuggestionSearchContext;
51+
import org.elasticsearch.tasks.CancellableTask;
5252

5353
import java.util.Collections;
5454
import java.util.HashMap;
@@ -67,7 +67,7 @@ public class TestSearchContext extends SearchContext {
6767
ParsedQuery postFilter;
6868
Query query;
6969
Float minScore;
70-
SearchShardTask task;
70+
CancellableTask task;
7171
SortAndFormats sort;
7272
boolean trackScores = false;
7373
int trackTotalHitsUpTo = SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO;
@@ -506,12 +506,12 @@ public SearchExecutionContext getSearchExecutionContext() {
506506
}
507507

508508
@Override
509-
public void setTask(SearchShardTask task) {
509+
public void setTask(CancellableTask task) {
510510
this.task = task;
511511
}
512512

513513
@Override
514-
public SearchShardTask getTask() {
514+
public CancellableTask getTask() {
515515
return task;
516516
}
517517

0 commit comments

Comments
 (0)