Skip to content

Shard Search Scroll failures consistency #62061

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Sep 9, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ public abstract class Engine implements Closeable {
public static final String FORCE_MERGE_UUID_KEY = "force_merge_uuid";
public static final String MIN_RETAINED_SEQNO = "min_retained_seq_no";
public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";
public static final String CAN_MATCH_SEARCH_SOURCE = "can_match"; // TODO: Make source of search enum?
public static final String SEARCH_SOURCE = "search"; // TODO: Make source of search enum?
public static final String CAN_MATCH_SEARCH_SOURCE = "can_match";

protected final ShardId shardId;
protected final String allocationId;
Expand Down
130 changes: 57 additions & 73 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,8 @@ private DfsSearchResult executeDfsPhase(ShardSearchRequest request,
SearchShardTask task,
boolean keepStatesInContext) throws IOException {
ReaderContext readerContext = createOrGetReaderContext(request, keepStatesInContext);
try (Releasable ignored = readerContext.markAsUsed();
SearchContext context = createContext(readerContext, request, task, true)) {
try (Releasable ignored = updateKeepAliveAndMarkAsUsed(readerContext, getKeepAlive(request));
SearchContext context = createContext(readerContext, request, task, true)) {
dfsPhase.execute(context);
return context.dfsResult();
} catch (Exception e) {
Expand Down Expand Up @@ -380,49 +380,24 @@ public void executeQueryPhase(ShardSearchRequest request, boolean keepStatesInCo
rewriteAndFetchShardRequest(shard, request, new ActionListener<ShardSearchRequest>() {
@Override
public void onResponse(ShardSearchRequest orig) {
final ReaderContext readerContext = createOrGetReaderContext(orig, keepStatesInContext);
final Releasable markAsUsed = readerContext.markAsUsed();
// check if we can shortcut the query phase entirely.
if (orig.canReturnNullResponseIfMatchNoDocs()) {
assert orig.scroll() == null;
// we clone the shard request and perform a quick rewrite using a lightweight
// searcher since we are outside of the search thread pool.
// If the request rewrites to "match none" we can shortcut the query phase
// entirely. Otherwise we fork the execution in the search thread pool.
ShardSearchRequest canMatchRequest = new ShardSearchRequest(orig);
try (Engine.Searcher searcher = readerContext.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE)) {
QueryShardContext context = readerContext.indexService().newQueryShardContext(canMatchRequest.shardId().id(),
searcher, canMatchRequest::nowInMillis, canMatchRequest.getClusterAlias());
Rewriteable.rewrite(canMatchRequest.getRewriteable(), context, true);
final CanMatchResponse canMatchResp;
try {
ShardSearchRequest clone = new ShardSearchRequest(orig);
canMatchResp = canMatch(clone, false);
} catch (Exception exc) {
try (markAsUsed) {
listener.onFailure(exc);
} finally {
processFailure(readerContext, exc);
}
listener.onFailure(exc);
return;
}
if (canRewriteToMatchNone(canMatchRequest.source())
&& canMatchRequest.source().query() instanceof MatchNoneQueryBuilder) {
try (markAsUsed) {
if (orig.readerId() == null) {
try {
listener.onResponse(QuerySearchResult.nullInstance());
} finally {
// close and remove the ephemeral reader context
removeReaderContext(readerContext.id().getId());
Releasables.close(readerContext);
}
} else {
listener.onResponse(QuerySearchResult.nullInstance());
}
}
if (canMatchResp.canMatch == false) {
listener.onResponse(QuerySearchResult.nullInstance());
return;
}
}

// fork the execution in the search thread pool
runAsync(getExecutor(shard), () -> executeQueryPhase(orig, task, readerContext),
wrapFailureListener(listener, readerContext, markAsUsed));
runAsync(getExecutor(shard), () -> executeQueryPhase(orig, task, keepStatesInContext), listener);
}

@Override
Expand All @@ -446,8 +421,10 @@ private <T> void runAsync(Executor executor, CheckedSupplier<T, Exception> execu

private SearchPhaseResult executeQueryPhase(ShardSearchRequest request,
SearchShardTask task,
ReaderContext readerContext) throws Exception {
try (SearchContext context = createContext(readerContext, request, task, true)) {
boolean keepStatesInContext) throws Exception {
final ReaderContext readerContext = createOrGetReaderContext(request, keepStatesInContext);
try (Releasable ignored = updateKeepAliveAndMarkAsUsed(readerContext, getKeepAlive(request));
SearchContext context = createContext(readerContext, request, task, true)) {
final long afterQueryTime;
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {
loadOrExecuteQueryPhase(request, context);
Expand Down Expand Up @@ -494,16 +471,11 @@ public void executeQueryPhase(InternalScrollSearchRequest request,
SearchShardTask task,
ActionListener<ScrollQuerySearchResult> listener) {
final LegacyReaderContext readerContext = (LegacyReaderContext) findReaderContext(request.contextId(), request);
final Releasable markAsUsed = readerContext.markAsUsed();
final Releasable markAsUsed = updateKeepAliveAndMarkAsUsed(readerContext, getScrollKeepAlive(request.scroll()));
runAsync(getExecutor(readerContext.indexShard()), () -> {
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null);
try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false);
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)) {
if (request.scroll() != null && request.scroll().keepAlive() != null) {
final long keepAlive = request.scroll().keepAlive().millis();
checkKeepAliveLimit(keepAlive);
readerContext.keepAlive(keepAlive);
}
searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null));
processScroll(request, readerContext, searchContext);
queryPhase.execute(searchContext);
Expand All @@ -519,10 +491,10 @@ public void executeQueryPhase(InternalScrollSearchRequest request,
}

public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task, ActionListener<QuerySearchResult> listener) {
final ReaderContext readerContext = findReaderContext(request.contextId(), request);
final Releasable markAsUsed = readerContext.markAsUsed();
final ReaderContext readerContext = findReaderContext(request.contextId(), request.shardSearchRequest());
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.shardSearchRequest());
final Releasable markAsUsed = updateKeepAliveAndMarkAsUsed(readerContext, getKeepAlive(shardSearchRequest));
runAsync(getExecutor(readerContext.indexShard()), () -> {
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.shardSearchRequest());
readerContext.setAggregatedDfs(request.dfs());
try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, true);
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)) {
Expand Down Expand Up @@ -564,15 +536,11 @@ private Executor getExecutor(IndexShard indexShard) {
public void executeFetchPhase(InternalScrollSearchRequest request, SearchShardTask task,
ActionListener<ScrollQueryFetchSearchResult> listener) {
final LegacyReaderContext readerContext = (LegacyReaderContext) findReaderContext(request.contextId(), request);
final Releasable markAsUsed = readerContext.markAsUsed();
final Releasable markAsUsed = updateKeepAliveAndMarkAsUsed(readerContext, getScrollKeepAlive(request.scroll()));
runAsync(getExecutor(readerContext.indexShard()), () -> {
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null);
try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false);
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)) {
if (request.scroll() != null && request.scroll().keepAlive() != null) {
checkKeepAliveLimit(request.scroll().keepAlive().millis());
readerContext.keepAlive(request.scroll().keepAlive().millis());
}
searchContext.assignRescoreDocIds(readerContext.getRescoreDocIds(null));
searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null));
processScroll(request, readerContext, searchContext);
Expand All @@ -591,9 +559,9 @@ public void executeFetchPhase(InternalScrollSearchRequest request, SearchShardTa

public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, ActionListener<FetchSearchResult> listener) {
final ReaderContext readerContext = findReaderContext(request.contextId(), request);
final Releasable markAsUsed = readerContext.markAsUsed();
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.getShardSearchRequest());
final Releasable markAsUsed = updateKeepAliveAndMarkAsUsed(readerContext, getKeepAlive(shardSearchRequest));
runAsync(getExecutor(readerContext.indexShard()), () -> {
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.getShardSearchRequest());
try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false)) {
if (request.lastEmittedDoc() != null) {
searchContext.scrollContext().lastEmittedDoc = request.lastEmittedDoc();
Expand Down Expand Up @@ -643,13 +611,18 @@ private ReaderContext findReaderContext(ShardSearchContextId id, TransportReques
return reader;
}

private Releasable updateKeepAliveAndMarkAsUsed(ReaderContext reader, long keepAlive) {
if (keepAlive > 0L) {
checkKeepAliveLimit(keepAlive);
reader.keepAlive(keepAlive);
}
return reader.markAsUsed();
}

final ReaderContext createOrGetReaderContext(ShardSearchRequest request, boolean keepStatesInContext) {
if (request.readerId() != null) {
assert keepStatesInContext == false;
final ReaderContext readerContext = findReaderContext(request.readerId(), request);
final long keepAlive = request.keepAlive().millis();
checkKeepAliveLimit(keepAlive);
readerContext.keepAlive(keepAlive);
return readerContext;
}
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
Expand Down Expand Up @@ -835,11 +808,20 @@ public void freeAllScrollContexts() {
}

private long getKeepAlive(ShardSearchRequest request) {
if (request.scroll() != null && request.scroll().keepAlive() != null) {
return request.scroll().keepAlive().millis();
if (request.scroll() != null) {
return getScrollKeepAlive(request.scroll());
} else if (request.keepAlive() != null) {
return request.keepAlive().getMillis();
} else {
return defaultKeepAlive;
return request.readerId() == null ? defaultKeepAlive : -1;
}
}

private long getScrollKeepAlive(Scroll scroll) {
if (scroll != null && scroll.keepAlive() != null) {
return scroll.keepAlive().getMillis();
}
return defaultKeepAlive;
}

private void checkKeepAliveLimit(long keepAlive) {
Expand Down Expand Up @@ -1150,29 +1132,39 @@ public AliasFilter buildAliasFilter(ClusterState state, String index, Set<String
return indicesService.buildAliasFilter(state, index, resolvedExpressions);
}

public void canMatch(ShardSearchRequest request, ActionListener<CanMatchResponse> listener) {
try {
listener.onResponse(canMatch(request));
} catch (IOException e) {
listener.onFailure(e);
}
}

/**
* This method uses a lightweight searcher without wrapping (i.e., not open a full reader on frozen indices) to rewrite the query
* to check if the query can match any documents. This method can have false positives while if it returns {@code false} the query
* won't match any documents on the current shard.
*/
public CanMatchResponse canMatch(ShardSearchRequest request) throws IOException {
return canMatch(request, true);
}

private CanMatchResponse canMatch(ShardSearchRequest request, boolean checkRefreshPending) throws IOException {
assert request.searchType() == SearchType.QUERY_THEN_FETCH : "unexpected search type: " + request.searchType();
final ReaderContext readerContext = request.readerId() != null ? findReaderContext(request.readerId(), request) : null;
final Releasable markAsUsed = readerContext != null ? readerContext.markAsUsed() : null;
final ReaderContext readerContext = request.readerId() != null ? findReaderContext(request.readerId(), request) : null;
final Releasable markAsUsed = readerContext != null ? updateKeepAliveAndMarkAsUsed(readerContext, getKeepAlive(request)) : null;
try (markAsUsed) {
final IndexService indexService;
final Engine.Searcher canMatchSearcher;
final boolean hasRefreshPending;
if (readerContext != null) {
checkKeepAliveLimit(request.keepAlive().millis());
readerContext.keepAlive(request.keepAlive().millis());
indexService = readerContext.indexService();
canMatchSearcher = readerContext.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE);
hasRefreshPending = false;
} else {
indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
IndexShard indexShard = indexService.getShard(request.shardId().getId());
hasRefreshPending = indexShard.hasRefreshPending();
hasRefreshPending = indexShard.hasRefreshPending() && checkRefreshPending;
canMatchSearcher = indexShard.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE);
}

Expand All @@ -1197,14 +1189,6 @@ public CanMatchResponse canMatch(ShardSearchRequest request) throws IOException
}
}

public void canMatch(ShardSearchRequest request, ActionListener<CanMatchResponse> listener) {
try {
listener.onResponse(canMatch(request));
} catch (IOException e) {
listener.onFailure(e);
}
}

/**
* Returns true iff the given search source builder can be early terminated by rewriting to a match none query. Or in other words
* if the execution of the search request can be early terminated without executing it. This is for instance not possible if
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,38 +30,38 @@
public class LegacyReaderContext extends ReaderContext {
private final ShardSearchRequest shardSearchRequest;
private final ScrollContext scrollContext;
private final Engine.Searcher searcher;

private AggregatedDfs aggregatedDfs;
private RescoreDocIds rescoreDocIds;

private volatile Engine.Searcher searcher;

public LegacyReaderContext(long id, IndexService indexService, IndexShard indexShard, Engine.SearcherSupplier reader,
ShardSearchRequest shardSearchRequest, long keepAliveInMillis) {
super(id, indexService, indexShard, reader, keepAliveInMillis, false);
assert shardSearchRequest.readerId() == null;
assert shardSearchRequest.keepAlive() == null;
this.shardSearchRequest = Objects.requireNonNull(shardSearchRequest);
if (shardSearchRequest.scroll() != null) {
// Search scroll requests are special, they don't hold indices names so we have
// to reuse the searcher created on the request that initialized the scroll.
// This ensures that we wrap the searcher's reader with the user's permissions
// when they are available.
final Engine.Searcher delegate = searcherSupplier.acquireSearcher("search");
addOnClose(delegate);
// wrap the searcher so that closing is a noop, the actual closing happens when this context is closed
this.searcher = new Engine.Searcher(delegate.source(), delegate.getDirectoryReader(),
delegate.getSimilarity(), delegate.getQueryCache(), delegate.getQueryCachingPolicy(), () -> {});
this.scrollContext = new ScrollContext();
} else {
this.scrollContext = null;
this.searcher = null;
}
}

@Override
public Engine.Searcher acquireSearcher(String source) {
if (scrollContext != null && "search".equals(source)) {
// Search scroll requests are special, they don't hold indices names so we have
// to reuse the searcher created on the request that initialized the scroll.
// This ensures that we wrap the searcher's reader with the user's permissions
// when they are available.
if (searcher == null) {
final Engine.Searcher delegate = searcherSupplier.acquireSearcher(source);
addOnClose(delegate);
// wrap the searcher so that closing is a noop, the actual closing happens when this context is closed
searcher = new Engine.Searcher(delegate.source(), delegate.getDirectoryReader(),
delegate.getSimilarity(), delegate.getQueryCache(), delegate.getQueryCachingPolicy(), () -> {});
}
if (scrollContext != null) {
assert Engine.SEARCH_SOURCE.equals(source) : "scroll context should not acquire searcher for " + source;
return searcher;
}
return super.acquireSearcher(source);
Expand Down