diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java index 0a08c7a2039c5..56da16acd80b0 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java @@ -53,7 +53,7 @@ import static org.hamcrest.Matchers.hasSize; /** - * Integration test for retry behavior. Useful because retrying relies on the way that the + * Integration test for bulk retry behavior. Useful because retrying relies on the way that the * rest of Elasticsearch throws exceptions and unit tests won't verify that. */ public class RetryTests extends ESIntegTestCase { @@ -77,7 +77,7 @@ protected Collection> nodePlugins() { } /** - * Lower the queue sizes to be small enough that both bulk and searches will time out and have to be retried. + * Lower the queue sizes to be small enough that bulk will time out and have to be retried. */ @Override protected Settings nodeSettings(int nodeOrdinal) { @@ -145,22 +145,15 @@ private void testCase( BulkIndexByScrollResponseMatcher matcher) throws Exception { /* - * These test cases work by stuffing the search and bulk queues of a single node and - * making sure that we read and write from that node. Because of some "fun" with the - * way that searches work, we need at least one more node to act as the coordinating - * node for the search request. If we didn't do this then the searches would get stuck - * in the queue anyway because we force queue portions of the coordinating node's - * actions. This is not a big deal in normal operations but a real pain when you are - * intentionally stuffing queues hoping for a failure. + * These test cases work by stuffing the bulk queue of a single node and + * making sure that we read and write from that node. */ final Settings nodeSettings = Settings.builder() // use pools of size 1 so we can block them .put("thread_pool.write.size", 1) - .put("thread_pool.search.size", 1) - // use queues of size 1 because size 0 is broken and because search requests need the queue to function + // use queues of size 1 because size 0 is broken and because bulk requests need the queue to function .put("thread_pool.write.queue_size", 1) - .put("thread_pool.search.queue_size", 1) .put("node.attr.color", "blue") .build(); final String node = internalCluster().startDataOnlyNode(nodeSettings); @@ -186,45 +179,25 @@ private void testCase( assertFalse(initialBulkResponse.buildFailureMessage(), initialBulkResponse.hasFailures()); client().admin().indices().prepareRefresh("source").get(); - logger.info("Blocking search"); - CyclicBarrier initialSearchBlock = blockExecutor(ThreadPool.Names.SEARCH, node); - AbstractBulkByScrollRequestBuilder builder = request.apply(internalCluster().masterClient()); // Make sure we use more than one batch so we have to scroll builder.source().setSize(DOC_COUNT / randomIntBetween(2, 10)); + logger.info("Blocking bulk so we start to get bulk rejections"); + CyclicBarrier bulkBlock = blockExecutor(ThreadPool.Names.WRITE, node); + logger.info("Starting request"); ActionFuture responseListener = builder.execute(); try { - logger.info("Waiting for search rejections on the initial search"); - assertBusy(() -> assertThat(taskStatus(action).getSearchRetries(), greaterThan(0L))); - - logger.info("Blocking bulk and unblocking search so we start to get bulk rejections"); - CyclicBarrier bulkBlock = blockExecutor(ThreadPool.Names.WRITE, node); - initialSearchBlock.await(); - logger.info("Waiting for bulk rejections"); assertBusy(() -> assertThat(taskStatus(action).getBulkRetries(), greaterThan(0L))); - - // Keep a copy of the current number of search rejections so we can assert that we get more when we block the scroll - long initialSearchRejections = taskStatus(action).getSearchRetries(); - - logger.info("Blocking search and unblocking bulk so we should get search rejections for the scroll"); - CyclicBarrier scrollBlock = blockExecutor(ThreadPool.Names.SEARCH, node); bulkBlock.await(); - logger.info("Waiting for search rejections for the scroll"); - assertBusy(() -> assertThat(taskStatus(action).getSearchRetries(), greaterThan(initialSearchRejections))); - - logger.info("Unblocking the scroll"); - scrollBlock.await(); - logger.info("Waiting for the request to finish"); BulkByScrollResponse response = responseListener.get(); assertThat(response, matcher); assertThat(response.getBulkRetries(), greaterThan(0L)); - assertThat(response.getSearchRetries(), greaterThan(initialSearchRejections)); } finally { // Fetch the response just in case we blew up half way through. This will make sure the failure is thrown up to the top level. BulkByScrollResponse response = responseListener.get(); diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchScrollAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchScrollAsyncAction.java index 1db433278620d..5486ae59581a2 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchScrollAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchScrollAsyncAction.java @@ -53,21 +53,6 @@ * run separate fetch phases etc. */ abstract class SearchScrollAsyncAction implements Runnable { - /* - * Some random TODO: - * Today we still have a dedicated executing mode for scrolls while we could simplify this by implementing - * scroll like functionality (mainly syntactic sugar) as an ordinary search with search_after. We could even go further and - * make the scroll entirely stateless and encode the state per shard in the scroll ID. - * - * Today we also hold a context per shard but maybe - * we want the context per coordinating node such that we route the scroll to the same coordinator all the time and hold the context - * here? This would have the advantage that if we loose that node the entire scroll is deal not just one shard. - * - * Additionally there is the possibility to associate the scroll with a seq. id. such that we can talk to any replica as long as - * the shards engine hasn't advanced that seq. id yet. Such a resume is possible and best effort, it could be even a safety net since - * if you rely on indices being read-only things can change in-between without notification or it's hard to detect if there where any - * changes while scrolling. These are all options to improve the current situation which we can look into down the road - */ protected final Logger logger; protected final ActionListener listener; protected final ParsedScrollId scrollId; diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index b79b713ccef53..f543cc7011db1 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -108,7 +108,8 @@ public abstract class Engine implements Closeable { public static final String FORCE_MERGE_UUID_KEY = "force_merge_uuid"; public static final String MIN_RETAINED_SEQNO = "min_retained_seq_no"; public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp"; - public static final String CAN_MATCH_SEARCH_SOURCE = "can_match"; // TODO: Make source of search enum? + public static final String SEARCH_SOURCE = "search"; // TODO: Make source of search enum? + public static final String CAN_MATCH_SEARCH_SOURCE = "can_match"; protected final ShardId shardId; protected final String allocationId; diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 13ef1b1259037..2de2497aacfbb 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -348,8 +348,8 @@ private DfsSearchResult executeDfsPhase(ShardSearchRequest request, SearchShardTask task, boolean keepStatesInContext) throws IOException { ReaderContext readerContext = createOrGetReaderContext(request, keepStatesInContext); - try (Releasable ignored = readerContext.markAsUsed(); - SearchContext context = createContext(readerContext, request, task, true)) { + try (Releasable ignored = readerContext.markAsUsed(getKeepAlive(request)); + SearchContext context = createContext(readerContext, request, task, true)) { dfsPhase.execute(context); return context.dfsResult(); } catch (Exception e) { @@ -380,49 +380,24 @@ public void executeQueryPhase(ShardSearchRequest request, boolean keepStatesInCo rewriteAndFetchShardRequest(shard, request, new ActionListener() { @Override public void onResponse(ShardSearchRequest orig) { - final ReaderContext readerContext = createOrGetReaderContext(orig, keepStatesInContext); - final Releasable markAsUsed = readerContext.markAsUsed(); + // check if we can shortcut the query phase entirely. if (orig.canReturnNullResponseIfMatchNoDocs()) { assert orig.scroll() == null; - // we clone the shard request and perform a quick rewrite using a lightweight - // searcher since we are outside of the search thread pool. - // If the request rewrites to "match none" we can shortcut the query phase - // entirely. Otherwise we fork the execution in the search thread pool. - ShardSearchRequest canMatchRequest = new ShardSearchRequest(orig); - try (Engine.Searcher searcher = readerContext.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE)) { - QueryShardContext context = readerContext.indexService().newQueryShardContext(canMatchRequest.shardId().id(), - searcher, canMatchRequest::nowInMillis, canMatchRequest.getClusterAlias()); - Rewriteable.rewrite(canMatchRequest.getRewriteable(), context, true); + final CanMatchResponse canMatchResp; + try { + ShardSearchRequest clone = new ShardSearchRequest(orig); + canMatchResp = canMatch(clone, false); } catch (Exception exc) { - try (markAsUsed) { - listener.onFailure(exc); - } finally { - processFailure(readerContext, exc); - } + listener.onFailure(exc); return; } - if (canRewriteToMatchNone(canMatchRequest.source()) - && canMatchRequest.source().query() instanceof MatchNoneQueryBuilder) { - try (markAsUsed) { - if (orig.readerId() == null) { - try { - listener.onResponse(QuerySearchResult.nullInstance()); - } finally { - // close and remove the ephemeral reader context - removeReaderContext(readerContext.id().getId()); - Releasables.close(readerContext); - } - } else { - listener.onResponse(QuerySearchResult.nullInstance()); - } - } + if (canMatchResp.canMatch == false) { + listener.onResponse(QuerySearchResult.nullInstance()); return; } } - // fork the execution in the search thread pool - runAsync(getExecutor(shard), () -> executeQueryPhase(orig, task, readerContext), - wrapFailureListener(listener, readerContext, markAsUsed)); + runAsync(getExecutor(shard), () -> executeQueryPhase(orig, task, keepStatesInContext), listener); } @Override @@ -446,8 +421,10 @@ private void runAsync(Executor executor, CheckedSupplier execu private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchShardTask task, - ReaderContext readerContext) throws Exception { - try (SearchContext context = createContext(readerContext, request, task, true)) { + boolean keepStatesInContext) throws Exception { + final ReaderContext readerContext = createOrGetReaderContext(request, keepStatesInContext); + try (Releasable ignored = readerContext.markAsUsed(getKeepAlive(request)); + SearchContext context = createContext(readerContext, request, task, true)) { final long afterQueryTime; try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) { loadOrExecuteQueryPhase(request, context); @@ -494,16 +471,11 @@ public void executeQueryPhase(InternalScrollSearchRequest request, SearchShardTask task, ActionListener listener) { final LegacyReaderContext readerContext = (LegacyReaderContext) findReaderContext(request.contextId(), request); - final Releasable markAsUsed = readerContext.markAsUsed(); + final Releasable markAsUsed = readerContext.markAsUsed(getScrollKeepAlive(request.scroll())); runAsync(getExecutor(readerContext.indexShard()), () -> { final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null); try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false); SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)) { - if (request.scroll() != null && request.scroll().keepAlive() != null) { - final long keepAlive = request.scroll().keepAlive().millis(); - checkKeepAliveLimit(keepAlive); - readerContext.keepAlive(keepAlive); - } searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null)); processScroll(request, readerContext, searchContext); queryPhase.execute(searchContext); @@ -512,17 +484,17 @@ public void executeQueryPhase(InternalScrollSearchRequest request, return new ScrollQuerySearchResult(searchContext.queryResult(), searchContext.shardTarget()); } catch (Exception e) { logger.trace("Query phase failed", e); - processFailure(readerContext, e); + // we handle the failure in the failure listener below throw e; } - }, ActionListener.runAfter(listener, markAsUsed::close)); + }, wrapFailureListener(listener, readerContext, markAsUsed)); } public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task, ActionListener listener) { - final ReaderContext readerContext = findReaderContext(request.contextId(), request); - final Releasable markAsUsed = readerContext.markAsUsed(); + final ReaderContext readerContext = findReaderContext(request.contextId(), request.shardSearchRequest()); + final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.shardSearchRequest()); + final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(shardSearchRequest)); runAsync(getExecutor(readerContext.indexShard()), () -> { - final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.shardSearchRequest()); readerContext.setAggregatedDfs(request.dfs()); try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, true); SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)) { @@ -542,7 +514,7 @@ public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task, } catch (Exception e) { assert TransportActions.isShardNotAvailableException(e) == false : new AssertionError(e); logger.trace("Query phase failed", e); - processFailure(readerContext, e); + // we handle the failure in the failure listener below throw e; } }, wrapFailureListener(listener, readerContext, markAsUsed)); @@ -564,15 +536,11 @@ private Executor getExecutor(IndexShard indexShard) { public void executeFetchPhase(InternalScrollSearchRequest request, SearchShardTask task, ActionListener listener) { final LegacyReaderContext readerContext = (LegacyReaderContext) findReaderContext(request.contextId(), request); - final Releasable markAsUsed = readerContext.markAsUsed(); + final Releasable markAsUsed = readerContext.markAsUsed(getScrollKeepAlive(request.scroll())); runAsync(getExecutor(readerContext.indexShard()), () -> { final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null); try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false); SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)) { - if (request.scroll() != null && request.scroll().keepAlive() != null) { - checkKeepAliveLimit(request.scroll().keepAlive().millis()); - readerContext.keepAlive(request.scroll().keepAlive().millis()); - } searchContext.assignRescoreDocIds(readerContext.getRescoreDocIds(null)); searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null)); processScroll(request, readerContext, searchContext); @@ -583,17 +551,17 @@ public void executeFetchPhase(InternalScrollSearchRequest request, SearchShardTa } catch (Exception e) { assert TransportActions.isShardNotAvailableException(e) == false : new AssertionError(e); logger.trace("Fetch phase failed", e); - processFailure(readerContext, e); + // we handle the failure in the failure listener below throw e; } - }, ActionListener.runAfter(listener, markAsUsed::close)); + }, wrapFailureListener(listener, readerContext, markAsUsed)); } public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, ActionListener listener) { final ReaderContext readerContext = findReaderContext(request.contextId(), request); - final Releasable markAsUsed = readerContext.markAsUsed(); + final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.getShardSearchRequest()); + final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(shardSearchRequest)); runAsync(getExecutor(readerContext.indexShard()), () -> { - final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.getShardSearchRequest()); try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false)) { if (request.lastEmittedDoc() != null) { searchContext.scrollContext().lastEmittedDoc = request.lastEmittedDoc(); @@ -612,8 +580,7 @@ public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, A return searchContext.fetchResult(); } catch (Exception e) { assert TransportActions.isShardNotAvailableException(e) == false : new AssertionError(e); - logger.trace("Fetch phase failed", e); - processFailure(readerContext, e); + // we handle the failure in the failure listener below throw e; } }, wrapFailureListener(listener, readerContext, markAsUsed)); @@ -648,9 +615,6 @@ final ReaderContext createOrGetReaderContext(ShardSearchRequest request, boolean if (request.readerId() != null) { assert keepStatesInContext == false; final ReaderContext readerContext = findReaderContext(request.readerId(), request); - final long keepAlive = request.keepAlive().millis(); - checkKeepAliveLimit(keepAlive); - readerContext.keepAlive(keepAlive); return readerContext; } IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); @@ -676,7 +640,6 @@ final ReaderContext createAndPutReaderContext(ShardSearchRequest request, IndexS } } final long keepAlive = getKeepAlive(request); - checkKeepAliveLimit(keepAlive); if (keepStatesInContext || request.scroll() != null) { readerContext = new LegacyReaderContext(idGenerator.incrementAndGet(), indexService, shard, reader, request, keepAlive); if (request.scroll() != null) { @@ -777,7 +740,7 @@ public DefaultSearchContext createSearchContext(ShardSearchRequest request, Time final Engine.SearcherSupplier reader = indexShard.acquireSearcherSupplier(); try (ReaderContext readerContext = new ReaderContext(idGenerator.incrementAndGet(), indexService, indexShard, reader, -1L, true)) { DefaultSearchContext searchContext = createSearchContext(readerContext, request, timeout); - searchContext.addReleasable(readerContext.markAsUsed()); + searchContext.addReleasable(readerContext.markAsUsed(0L)); return searchContext; } } @@ -836,13 +799,24 @@ public void freeAllScrollContexts() { } private long getKeepAlive(ShardSearchRequest request) { - if (request.scroll() != null && request.scroll().keepAlive() != null) { - return request.scroll().keepAlive().millis(); + if (request.scroll() != null) { + return getScrollKeepAlive(request.scroll()); + } else if (request.keepAlive() != null) { + checkKeepAliveLimit(request.keepAlive().millis()); + return request.keepAlive().getMillis(); } else { - return defaultKeepAlive; + return request.readerId() == null ? defaultKeepAlive : -1; } } + private long getScrollKeepAlive(Scroll scroll) { + if (scroll != null && scroll.keepAlive() != null) { + checkKeepAliveLimit(scroll.keepAlive().millis()); + return scroll.keepAlive().getMillis(); + } + return defaultKeepAlive; + } + private void checkKeepAliveLimit(long keepAlive) { if (keepAlive > maxKeepAlive) { throw new IllegalArgumentException( @@ -1151,29 +1125,39 @@ public AliasFilter buildAliasFilter(ClusterState state, String index, Set listener) { + try { + listener.onResponse(canMatch(request)); + } catch (IOException e) { + listener.onFailure(e); + } + } + /** * This method uses a lightweight searcher without wrapping (i.e., not open a full reader on frozen indices) to rewrite the query * to check if the query can match any documents. This method can have false positives while if it returns {@code false} the query * won't match any documents on the current shard. */ public CanMatchResponse canMatch(ShardSearchRequest request) throws IOException { + return canMatch(request, true); + } + + private CanMatchResponse canMatch(ShardSearchRequest request, boolean checkRefreshPending) throws IOException { assert request.searchType() == SearchType.QUERY_THEN_FETCH : "unexpected search type: " + request.searchType(); - final ReaderContext readerContext = request.readerId() != null ? findReaderContext(request.readerId(), request) : null; - final Releasable markAsUsed = readerContext != null ? readerContext.markAsUsed() : null; + final ReaderContext readerContext = request.readerId() != null ? findReaderContext(request.readerId(), request) : null; + final Releasable markAsUsed = readerContext != null ? readerContext.markAsUsed(getKeepAlive(request)) : null; try (markAsUsed) { final IndexService indexService; final Engine.Searcher canMatchSearcher; final boolean hasRefreshPending; if (readerContext != null) { - checkKeepAliveLimit(request.keepAlive().millis()); - readerContext.keepAlive(request.keepAlive().millis()); indexService = readerContext.indexService(); canMatchSearcher = readerContext.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE); hasRefreshPending = false; } else { indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); IndexShard indexShard = indexService.getShard(request.shardId().getId()); - hasRefreshPending = indexShard.hasRefreshPending(); + hasRefreshPending = indexShard.hasRefreshPending() && checkRefreshPending; canMatchSearcher = indexShard.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE); } @@ -1198,14 +1182,6 @@ public CanMatchResponse canMatch(ShardSearchRequest request) throws IOException } } - public void canMatch(ShardSearchRequest request, ActionListener listener) { - try { - listener.onResponse(canMatch(request)); - } catch (IOException e) { - listener.onFailure(e); - } - } - /** * Returns true iff the given search source builder can be early terminated by rewriting to a match none query. Or in other words * if the execution of the search request can be early terminated without executing it. This is for instance not possible if diff --git a/server/src/main/java/org/elasticsearch/search/internal/LegacyReaderContext.java b/server/src/main/java/org/elasticsearch/search/internal/LegacyReaderContext.java index ab188f8ddf23f..f47d97fe33633 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/LegacyReaderContext.java +++ b/server/src/main/java/org/elasticsearch/search/internal/LegacyReaderContext.java @@ -30,11 +30,11 @@ public class LegacyReaderContext extends ReaderContext { private final ShardSearchRequest shardSearchRequest; private final ScrollContext scrollContext; + private final Engine.Searcher searcher; + private AggregatedDfs aggregatedDfs; private RescoreDocIds rescoreDocIds; - private volatile Engine.Searcher searcher; - public LegacyReaderContext(long id, IndexService indexService, IndexShard indexShard, Engine.SearcherSupplier reader, ShardSearchRequest shardSearchRequest, long keepAliveInMillis) { super(id, indexService, indexShard, reader, keepAliveInMillis, false); @@ -42,26 +42,26 @@ public LegacyReaderContext(long id, IndexService indexService, IndexShard indexS assert shardSearchRequest.keepAlive() == null; this.shardSearchRequest = Objects.requireNonNull(shardSearchRequest); if (shardSearchRequest.scroll() != null) { + // Search scroll requests are special, they don't hold indices names so we have + // to reuse the searcher created on the request that initialized the scroll. + // This ensures that we wrap the searcher's reader with the user's permissions + // when they are available. + final Engine.Searcher delegate = searcherSupplier.acquireSearcher("search"); + addOnClose(delegate); + // wrap the searcher so that closing is a noop, the actual closing happens when this context is closed + this.searcher = new Engine.Searcher(delegate.source(), delegate.getDirectoryReader(), + delegate.getSimilarity(), delegate.getQueryCache(), delegate.getQueryCachingPolicy(), () -> {}); this.scrollContext = new ScrollContext(); } else { this.scrollContext = null; + this.searcher = null; } } @Override public Engine.Searcher acquireSearcher(String source) { - if (scrollContext != null && "search".equals(source)) { - // Search scroll requests are special, they don't hold indices names so we have - // to reuse the searcher created on the request that initialized the scroll. - // This ensures that we wrap the searcher's reader with the user's permissions - // when they are available. - if (searcher == null) { - final Engine.Searcher delegate = searcherSupplier.acquireSearcher(source); - addOnClose(delegate); - // wrap the searcher so that closing is a noop, the actual closing happens when this context is closed - searcher = new Engine.Searcher(delegate.source(), delegate.getDirectoryReader(), - delegate.getSimilarity(), delegate.getQueryCache(), delegate.getQueryCachingPolicy(), () -> {}); - } + if (scrollContext != null) { + assert Engine.SEARCH_SOURCE.equals(source) : "scroll context should not acquire searcher for " + source; return searcher; } return super.acquireSearcher(source); diff --git a/server/src/main/java/org/elasticsearch/search/internal/ReaderContext.java b/server/src/main/java/org/elasticsearch/search/internal/ReaderContext.java index b2a69d601965c..c00333d38d6bb 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ReaderContext.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ReaderContext.java @@ -124,17 +124,18 @@ public Engine.Searcher acquireSearcher(String source) { return searcherSupplier.acquireSearcher(source); } - public void keepAlive(long keepAlive) { + private void tryUpdateKeepAlive(long keepAlive) { this.keepAlive.updateAndGet(curr -> Math.max(curr, keepAlive)); } /** - * Marks this reader as being used so its time to live should not be expired. - * - * @return a releasable to indicate the caller has stopped using this reader + * Returns a releasable to indicate that the caller has stopped using this reader. + * The time to live of the reader after usage can be extended using the provided + * keepAliveInMillis. */ - public Releasable markAsUsed() { + public Releasable markAsUsed(long keepAliveInMillis) { refCounted.incRef(); + tryUpdateKeepAlive(keepAliveInMillis); return Releasables.releaseOnce(() -> { this.lastAccessTime.updateAndGet(curr -> Math.max(curr, nowInMillis())); refCounted.decRef();