Skip to content

Commit 1f12505

Browse files
committed
Make the searcher for legacy reader context final
This commit ensures that the searcher that we create for scrolls is initialized only once.t
1 parent 5a02677 commit 1f12505

File tree

3 files changed

+73
-88
lines changed

3 files changed

+73
-88
lines changed

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,8 @@ public abstract class Engine implements Closeable {
108108
public static final String FORCE_MERGE_UUID_KEY = "force_merge_uuid";
109109
public static final String MIN_RETAINED_SEQNO = "min_retained_seq_no";
110110
public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";
111-
public static final String CAN_MATCH_SEARCH_SOURCE = "can_match"; // TODO: Make source of search enum?
111+
public static final String SEARCH_SOURCE = "search"; // TODO: Make source of search enum?
112+
public static final String CAN_MATCH_SEARCH_SOURCE = "can_match";
112113

113114
protected final ShardId shardId;
114115
protected final String allocationId;

server/src/main/java/org/elasticsearch/search/SearchService.java

Lines changed: 57 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -348,8 +348,8 @@ private DfsSearchResult executeDfsPhase(ShardSearchRequest request,
348348
SearchShardTask task,
349349
boolean keepStatesInContext) throws IOException {
350350
ReaderContext readerContext = createOrGetReaderContext(request, keepStatesInContext);
351-
try (Releasable ignored = readerContext.markAsUsed();
352-
SearchContext context = createContext(readerContext, request, task, true)) {
351+
try (Releasable ignored = updateKeepAliveAndMarkAsUsed(readerContext, getKeepAlive(request));
352+
SearchContext context = createContext(readerContext, request, task, true)) {
353353
dfsPhase.execute(context);
354354
return context.dfsResult();
355355
} catch (Exception e) {
@@ -380,49 +380,24 @@ public void executeQueryPhase(ShardSearchRequest request, boolean keepStatesInCo
380380
rewriteAndFetchShardRequest(shard, request, new ActionListener<ShardSearchRequest>() {
381381
@Override
382382
public void onResponse(ShardSearchRequest orig) {
383-
final ReaderContext readerContext = createOrGetReaderContext(orig, keepStatesInContext);
384-
final Releasable markAsUsed = readerContext.markAsUsed();
383+
// check if we can shortcut the query phase entirely.
385384
if (orig.canReturnNullResponseIfMatchNoDocs()) {
386385
assert orig.scroll() == null;
387-
// we clone the shard request and perform a quick rewrite using a lightweight
388-
// searcher since we are outside of the search thread pool.
389-
// If the request rewrites to "match none" we can shortcut the query phase
390-
// entirely. Otherwise we fork the execution in the search thread pool.
391-
ShardSearchRequest canMatchRequest = new ShardSearchRequest(orig);
392-
try (Engine.Searcher searcher = readerContext.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE)) {
393-
QueryShardContext context = readerContext.indexService().newQueryShardContext(canMatchRequest.shardId().id(),
394-
searcher, canMatchRequest::nowInMillis, canMatchRequest.getClusterAlias());
395-
Rewriteable.rewrite(canMatchRequest.getRewriteable(), context, true);
386+
final CanMatchResponse canMatchResp;
387+
try {
388+
ShardSearchRequest clone = new ShardSearchRequest(orig);
389+
canMatchResp = canMatch(clone, false);
396390
} catch (Exception exc) {
397-
try (markAsUsed) {
398-
listener.onFailure(exc);
399-
} finally {
400-
processFailure(readerContext, exc);
401-
}
391+
listener.onFailure(exc);
402392
return;
403393
}
404-
if (canRewriteToMatchNone(canMatchRequest.source())
405-
&& canMatchRequest.source().query() instanceof MatchNoneQueryBuilder) {
406-
try (markAsUsed) {
407-
if (orig.readerId() == null) {
408-
try {
409-
listener.onResponse(QuerySearchResult.nullInstance());
410-
} finally {
411-
// close and remove the ephemeral reader context
412-
removeReaderContext(readerContext.id().getId());
413-
Releasables.close(readerContext);
414-
}
415-
} else {
416-
listener.onResponse(QuerySearchResult.nullInstance());
417-
}
418-
}
394+
if (canMatchResp.canMatch == false) {
395+
listener.onResponse(QuerySearchResult.nullInstance());
419396
return;
420397
}
421398
}
422-
423399
// fork the execution in the search thread pool
424-
runAsync(getExecutor(shard), () -> executeQueryPhase(orig, task, readerContext),
425-
wrapFailureListener(listener, readerContext, markAsUsed));
400+
runAsync(getExecutor(shard), () -> executeQueryPhase(orig, task, keepStatesInContext), listener);
426401
}
427402

428403
@Override
@@ -446,8 +421,10 @@ private <T> void runAsync(Executor executor, CheckedSupplier<T, Exception> execu
446421

447422
private SearchPhaseResult executeQueryPhase(ShardSearchRequest request,
448423
SearchShardTask task,
449-
ReaderContext readerContext) throws Exception {
450-
try (SearchContext context = createContext(readerContext, request, task, true)) {
424+
boolean keepStatesInContext) throws Exception {
425+
final ReaderContext readerContext = createOrGetReaderContext(request, keepStatesInContext);
426+
try (Releasable ignored = updateKeepAliveAndMarkAsUsed(readerContext, getKeepAlive(request));
427+
SearchContext context = createContext(readerContext, request, task, true)) {
451428
final long afterQueryTime;
452429
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {
453430
loadOrExecuteQueryPhase(request, context);
@@ -494,16 +471,11 @@ public void executeQueryPhase(InternalScrollSearchRequest request,
494471
SearchShardTask task,
495472
ActionListener<ScrollQuerySearchResult> listener) {
496473
final LegacyReaderContext readerContext = (LegacyReaderContext) findReaderContext(request.contextId(), request);
497-
final Releasable markAsUsed = readerContext.markAsUsed();
474+
final Releasable markAsUsed = updateKeepAliveAndMarkAsUsed(readerContext, getScrollKeepAlive(request.scroll()));
498475
runAsync(getExecutor(readerContext.indexShard()), () -> {
499476
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null);
500477
try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false);
501478
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)) {
502-
if (request.scroll() != null && request.scroll().keepAlive() != null) {
503-
final long keepAlive = request.scroll().keepAlive().millis();
504-
checkKeepAliveLimit(keepAlive);
505-
readerContext.keepAlive(keepAlive);
506-
}
507479
searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null));
508480
processScroll(request, readerContext, searchContext);
509481
queryPhase.execute(searchContext);
@@ -519,10 +491,10 @@ public void executeQueryPhase(InternalScrollSearchRequest request,
519491
}
520492

521493
public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task, ActionListener<QuerySearchResult> listener) {
522-
final ReaderContext readerContext = findReaderContext(request.contextId(), request);
523-
final Releasable markAsUsed = readerContext.markAsUsed();
494+
final ReaderContext readerContext = findReaderContext(request.contextId(), request.shardSearchRequest());
495+
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.shardSearchRequest());
496+
final Releasable markAsUsed = updateKeepAliveAndMarkAsUsed(readerContext, getKeepAlive(shardSearchRequest));
524497
runAsync(getExecutor(readerContext.indexShard()), () -> {
525-
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.shardSearchRequest());
526498
readerContext.setAggregatedDfs(request.dfs());
527499
try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, true);
528500
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)) {
@@ -564,15 +536,11 @@ private Executor getExecutor(IndexShard indexShard) {
564536
public void executeFetchPhase(InternalScrollSearchRequest request, SearchShardTask task,
565537
ActionListener<ScrollQueryFetchSearchResult> listener) {
566538
final LegacyReaderContext readerContext = (LegacyReaderContext) findReaderContext(request.contextId(), request);
567-
final Releasable markAsUsed = readerContext.markAsUsed();
539+
final Releasable markAsUsed = updateKeepAliveAndMarkAsUsed(readerContext, getScrollKeepAlive(request.scroll()));
568540
runAsync(getExecutor(readerContext.indexShard()), () -> {
569541
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null);
570542
try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false);
571543
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)) {
572-
if (request.scroll() != null && request.scroll().keepAlive() != null) {
573-
checkKeepAliveLimit(request.scroll().keepAlive().millis());
574-
readerContext.keepAlive(request.scroll().keepAlive().millis());
575-
}
576544
searchContext.assignRescoreDocIds(readerContext.getRescoreDocIds(null));
577545
searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null));
578546
processScroll(request, readerContext, searchContext);
@@ -591,9 +559,9 @@ public void executeFetchPhase(InternalScrollSearchRequest request, SearchShardTa
591559

592560
public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, ActionListener<FetchSearchResult> listener) {
593561
final ReaderContext readerContext = findReaderContext(request.contextId(), request);
594-
final Releasable markAsUsed = readerContext.markAsUsed();
562+
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.getShardSearchRequest());
563+
final Releasable markAsUsed = updateKeepAliveAndMarkAsUsed(readerContext, getKeepAlive(shardSearchRequest));
595564
runAsync(getExecutor(readerContext.indexShard()), () -> {
596-
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.getShardSearchRequest());
597565
try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false)) {
598566
if (request.lastEmittedDoc() != null) {
599567
searchContext.scrollContext().lastEmittedDoc = request.lastEmittedDoc();
@@ -643,13 +611,18 @@ private ReaderContext findReaderContext(ShardSearchContextId id, TransportReques
643611
return reader;
644612
}
645613

614+
private Releasable updateKeepAliveAndMarkAsUsed(ReaderContext reader, long keepAlive) {
615+
if (keepAlive > 0L) {
616+
checkKeepAliveLimit(keepAlive);
617+
reader.keepAlive(keepAlive);
618+
}
619+
return reader.markAsUsed();
620+
}
621+
646622
final ReaderContext createOrGetReaderContext(ShardSearchRequest request, boolean keepStatesInContext) {
647623
if (request.readerId() != null) {
648624
assert keepStatesInContext == false;
649625
final ReaderContext readerContext = findReaderContext(request.readerId(), request);
650-
final long keepAlive = request.keepAlive().millis();
651-
checkKeepAliveLimit(keepAlive);
652-
readerContext.keepAlive(keepAlive);
653626
return readerContext;
654627
}
655628
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
@@ -835,11 +808,20 @@ public void freeAllScrollContexts() {
835808
}
836809

837810
private long getKeepAlive(ShardSearchRequest request) {
838-
if (request.scroll() != null && request.scroll().keepAlive() != null) {
839-
return request.scroll().keepAlive().millis();
811+
if (request.scroll() != null) {
812+
return getScrollKeepAlive(request.scroll());
813+
} else if (request.keepAlive() != null) {
814+
return request.keepAlive().getMillis();
840815
} else {
841-
return defaultKeepAlive;
816+
return request.readerId() == null ? defaultKeepAlive : -1;
817+
}
818+
}
819+
820+
private long getScrollKeepAlive(Scroll scroll) {
821+
if (scroll != null && scroll.keepAlive() != null) {
822+
return scroll.keepAlive().getMillis();
842823
}
824+
return defaultKeepAlive;
843825
}
844826

845827
private void checkKeepAliveLimit(long keepAlive) {
@@ -1150,29 +1132,39 @@ public AliasFilter buildAliasFilter(ClusterState state, String index, Set<String
11501132
return indicesService.buildAliasFilter(state, index, resolvedExpressions);
11511133
}
11521134

1135+
public void canMatch(ShardSearchRequest request, ActionListener<CanMatchResponse> listener) {
1136+
try {
1137+
listener.onResponse(canMatch(request));
1138+
} catch (IOException e) {
1139+
listener.onFailure(e);
1140+
}
1141+
}
1142+
11531143
/**
11541144
* This method uses a lightweight searcher without wrapping (i.e., not open a full reader on frozen indices) to rewrite the query
11551145
* to check if the query can match any documents. This method can have false positives while if it returns {@code false} the query
11561146
* won't match any documents on the current shard.
11571147
*/
11581148
public CanMatchResponse canMatch(ShardSearchRequest request) throws IOException {
1149+
return canMatch(request, true);
1150+
}
1151+
1152+
private CanMatchResponse canMatch(ShardSearchRequest request, boolean checkRefreshPending) throws IOException {
11591153
assert request.searchType() == SearchType.QUERY_THEN_FETCH : "unexpected search type: " + request.searchType();
1160-
final ReaderContext readerContext = request.readerId() != null ? findReaderContext(request.readerId(), request) : null;
1161-
final Releasable markAsUsed = readerContext != null ? readerContext.markAsUsed() : null;
1154+
final ReaderContext readerContext = request.readerId() != null ? findReaderContext(request.readerId(), request) : null;
1155+
final Releasable markAsUsed = readerContext != null ? updateKeepAliveAndMarkAsUsed(readerContext, getKeepAlive(request)) : null;
11621156
try (markAsUsed) {
11631157
final IndexService indexService;
11641158
final Engine.Searcher canMatchSearcher;
11651159
final boolean hasRefreshPending;
11661160
if (readerContext != null) {
1167-
checkKeepAliveLimit(request.keepAlive().millis());
1168-
readerContext.keepAlive(request.keepAlive().millis());
11691161
indexService = readerContext.indexService();
11701162
canMatchSearcher = readerContext.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE);
11711163
hasRefreshPending = false;
11721164
} else {
11731165
indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
11741166
IndexShard indexShard = indexService.getShard(request.shardId().getId());
1175-
hasRefreshPending = indexShard.hasRefreshPending();
1167+
hasRefreshPending = indexShard.hasRefreshPending() && checkRefreshPending;
11761168
canMatchSearcher = indexShard.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE);
11771169
}
11781170

@@ -1197,14 +1189,6 @@ public CanMatchResponse canMatch(ShardSearchRequest request) throws IOException
11971189
}
11981190
}
11991191

1200-
public void canMatch(ShardSearchRequest request, ActionListener<CanMatchResponse> listener) {
1201-
try {
1202-
listener.onResponse(canMatch(request));
1203-
} catch (IOException e) {
1204-
listener.onFailure(e);
1205-
}
1206-
}
1207-
12081192
/**
12091193
* Returns true iff the given search source builder can be early terminated by rewriting to a match none query. Or in other words
12101194
* if the execution of the search request can be early terminated without executing it. This is for instance not possible if

server/src/main/java/org/elasticsearch/search/internal/LegacyReaderContext.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -30,38 +30,38 @@
3030
public class LegacyReaderContext extends ReaderContext {
3131
private final ShardSearchRequest shardSearchRequest;
3232
private final ScrollContext scrollContext;
33+
private final Engine.Searcher searcher;
34+
3335
private AggregatedDfs aggregatedDfs;
3436
private RescoreDocIds rescoreDocIds;
3537

36-
private volatile Engine.Searcher searcher;
37-
3838
public LegacyReaderContext(long id, IndexService indexService, IndexShard indexShard, Engine.SearcherSupplier reader,
3939
ShardSearchRequest shardSearchRequest, long keepAliveInMillis) {
4040
super(id, indexService, indexShard, reader, keepAliveInMillis, false);
4141
assert shardSearchRequest.readerId() == null;
4242
assert shardSearchRequest.keepAlive() == null;
4343
this.shardSearchRequest = Objects.requireNonNull(shardSearchRequest);
4444
if (shardSearchRequest.scroll() != null) {
45+
// Search scroll requests are special, they don't hold indices names so we have
46+
// to reuse the searcher created on the request that initialized the scroll.
47+
// This ensures that we wrap the searcher's reader with the user's permissions
48+
// when they are available.
49+
final Engine.Searcher delegate = searcherSupplier.acquireSearcher("search");
50+
addOnClose(delegate);
51+
// wrap the searcher so that closing is a noop, the actual closing happens when this context is closed
52+
this.searcher = new Engine.Searcher(delegate.source(), delegate.getDirectoryReader(),
53+
delegate.getSimilarity(), delegate.getQueryCache(), delegate.getQueryCachingPolicy(), () -> {});
4554
this.scrollContext = new ScrollContext();
4655
} else {
4756
this.scrollContext = null;
57+
this.searcher = null;
4858
}
4959
}
5060

5161
@Override
5262
public Engine.Searcher acquireSearcher(String source) {
53-
if (scrollContext != null && "search".equals(source)) {
54-
// Search scroll requests are special, they don't hold indices names so we have
55-
// to reuse the searcher created on the request that initialized the scroll.
56-
// This ensures that we wrap the searcher's reader with the user's permissions
57-
// when they are available.
58-
if (searcher == null) {
59-
final Engine.Searcher delegate = searcherSupplier.acquireSearcher(source);
60-
addOnClose(delegate);
61-
// wrap the searcher so that closing is a noop, the actual closing happens when this context is closed
62-
searcher = new Engine.Searcher(delegate.source(), delegate.getDirectoryReader(),
63-
delegate.getSimilarity(), delegate.getQueryCache(), delegate.getQueryCachingPolicy(), () -> {});
64-
}
63+
if (scrollContext != null) {
64+
assert Engine.SEARCH_SOURCE.equals(source) : "scroll context should not acquire searcher for " + source;
6565
return searcher;
6666
}
6767
return super.acquireSearcher(source);

0 commit comments

Comments
 (0)