diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/asyncsearch/GetAsyncSearchRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/asyncsearch/GetAsyncSearchRequest.java index 11ad059349481..04f1c8d4be6e5 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/asyncsearch/GetAsyncSearchRequest.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/asyncsearch/GetAsyncSearchRequest.java @@ -32,8 +32,6 @@ public class GetAsyncSearchRequest implements Validatable { private TimeValue waitForCompletion; private TimeValue keepAlive; - public static final long MIN_KEEPALIVE = TimeValue.timeValueMinutes(1).millis(); - private final String id; public GetAsyncSearchRequest(String id) { @@ -62,14 +60,7 @@ public void setKeepAlive(TimeValue keepAlive) { @Override public Optional validate() { - final ValidationException validationException = new ValidationException(); - if (keepAlive != null && keepAlive.getMillis() < MIN_KEEPALIVE) { - validationException.addValidationError("keep_alive must be greater than 1 minute, got: " + keepAlive.toString()); - } - if (validationException.validationErrors().isEmpty()) { - return Optional.empty(); - } - return Optional.of(validationException); + return Optional.empty(); } @Override diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/asyncsearch/SubmitAsyncSearchRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/asyncsearch/SubmitAsyncSearchRequest.java index 4a92eb803922d..00d13070e84d6 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/asyncsearch/SubmitAsyncSearchRequest.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/asyncsearch/SubmitAsyncSearchRequest.java @@ -36,8 +36,6 @@ */ public class SubmitAsyncSearchRequest implements Validatable { - public static final int DEFAULT_BATCHED_REDUCE_SIZE = 5; - public static long MIN_KEEP_ALIVE = TimeValue.timeValueMinutes(1).millis(); private TimeValue waitForCompletionTimeout; diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/asyncsearch/GetAsyncSearchRequestTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/asyncsearch/GetAsyncSearchRequestTests.java deleted file mode 100644 index b6861b218cd28..0000000000000 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/asyncsearch/GetAsyncSearchRequestTests.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.client.asyncsearch; - -import org.elasticsearch.client.ValidationException; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.test.ESTestCase; - -import java.util.concurrent.TimeUnit; - -public class GetAsyncSearchRequestTests extends ESTestCase { - - public void testValidation() { - GetAsyncSearchRequest getAsyncSearchRequest = new GetAsyncSearchRequest(randomAlphaOfLength(10)); - getAsyncSearchRequest.setKeepAlive(new TimeValue(0)); - assertTrue(getAsyncSearchRequest.validate().isPresent()); - ValidationException validationException = getAsyncSearchRequest.validate().get(); - assertEquals(1, validationException.validationErrors().size()); - assertEquals("Validation Failed: 1: keep_alive must be greater than 1 minute, got: 0s;", validationException.getMessage()); - - getAsyncSearchRequest.setKeepAlive(new TimeValue(1, TimeUnit.MINUTES)); - assertFalse(getAsyncSearchRequest.validate().isPresent()); - } -} diff --git a/server/src/main/java/org/elasticsearch/search/SearchShardTarget.java b/server/src/main/java/org/elasticsearch/search/SearchShardTarget.java index 4bc3e3ae986da..155de2c277995 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchShardTarget.java +++ b/server/src/main/java/org/elasticsearch/search/SearchShardTarget.java @@ -63,7 +63,7 @@ public SearchShardTarget(String nodeId, ShardId shardId, @Nullable String cluste @Nullable public String getNodeId() { - return nodeId.string(); + return nodeId != null ? nodeId.string() : null; } public Text getNodeIdText() { diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearch.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearch.java index 6412ea055050e..bae2274e6364c 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearch.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearch.java @@ -15,9 +15,9 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; @@ -39,6 +39,8 @@ import java.util.List; import java.util.function.Supplier; +import static org.elasticsearch.xpack.search.AsyncSearchMaintenanceService.ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING; + public final class AsyncSearch extends Plugin implements ActionPlugin { private final Settings settings; @@ -84,11 +86,16 @@ public Collection createComponents(Client client, AsyncSearchIndexService indexService = new AsyncSearchIndexService(clusterService, threadPool.getThreadContext(), client, namedWriteableRegistry); AsyncSearchMaintenanceService maintenanceService = - new AsyncSearchMaintenanceService(nodeEnvironment.nodeId(), threadPool, indexService, TimeValue.timeValueHours(1)); + new AsyncSearchMaintenanceService(nodeEnvironment.nodeId(), settings, threadPool, indexService); clusterService.addListener(maintenanceService); return Collections.singletonList(maintenanceService); } else { return Collections.emptyList(); } } + + @Override + public List> getSettings() { + return Collections.singletonList(ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING); + } } diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchIndexService.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchIndexService.java index 08df7e5769b59..fbe107c062db1 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchIndexService.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchIndexService.java @@ -265,14 +265,16 @@ void getResponse(AsyncSearchId searchId, return; } - if (restoreResponseHeaders) { + if (restoreResponseHeaders && get.getSource().containsKey(RESPONSE_HEADERS_FIELD)) { @SuppressWarnings("unchecked") Map> responseHeaders = (Map>) get.getSource().get(RESPONSE_HEADERS_FIELD); restoreResponseHeadersContext(securityContext.getThreadContext(), responseHeaders); } + long expirationTime = (long) get.getSource().get(EXPIRATION_TIME_FIELD); String encoded = (String) get.getSource().get(RESULT_FIELD); - listener.onResponse(encoded != null ? decodeResponse(encoded) : null); + AsyncSearchResponse response = decodeResponse(encoded, expirationTime); + listener.onResponse(encoded != null ? response : null); }, listener::onFailure )); @@ -331,11 +333,11 @@ String encodeResponse(AsyncSearchResponse response) throws IOException { /** * Decode the provided base-64 bytes into a {@link AsyncSearchResponse}. */ - AsyncSearchResponse decodeResponse(String value) throws IOException { + AsyncSearchResponse decodeResponse(String value, long expirationTime) throws IOException { try (ByteBufferStreamInput buf = new ByteBufferStreamInput(ByteBuffer.wrap(Base64.getDecoder().decode(value)))) { try (StreamInput in = new NamedWriteableAwareStreamInput(buf, registry)) { in.setVersion(Version.readVersion(in)); - return new AsyncSearchResponse(in); + return new AsyncSearchResponse(in, expirationTime); } } } diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchMaintenanceService.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchMaintenanceService.java index 1cac9c0eaf458..d0fd14f409f7c 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchMaintenanceService.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchMaintenanceService.java @@ -14,6 +14,8 @@ import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.gateway.GatewayService; @@ -26,6 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.xpack.search.AsyncSearchIndexService.EXPIRATION_TIME_FIELD; +import static org.elasticsearch.xpack.search.AsyncSearchIndexService.INDEX; /** * A service that runs a periodic cleanup over the async-search index. @@ -33,23 +36,32 @@ class AsyncSearchMaintenanceService implements Releasable, ClusterStateListener { private static final Logger logger = LogManager.getLogger(AsyncSearchMaintenanceService.class); + /** + * Controls the interval at which the cleanup is scheduled. + * Defaults to 1h. It is an undocumented/expert setting that + * is mainly used by integration tests to make the garbage + * collection of search responses more reactive. + */ + public static final Setting ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING = + Setting.timeSetting("async_search.index_cleanup_interval", TimeValue.timeValueHours(1), Setting.Property.NodeScope); + private final String localNodeId; private final ThreadPool threadPool; private final AsyncSearchIndexService indexService; private final TimeValue delay; - private final AtomicBoolean isCleanupRunning = new AtomicBoolean(false); + private boolean isCleanupRunning; private final AtomicBoolean isClosed = new AtomicBoolean(false); private volatile Scheduler.Cancellable cancellable; AsyncSearchMaintenanceService(String localNodeId, + Settings nodeSettings, ThreadPool threadPool, - AsyncSearchIndexService indexService, - TimeValue delay) { + AsyncSearchIndexService indexService) { this.localNodeId = localNodeId; this.threadPool = threadPool; this.indexService = indexService; - this.delay = delay; + this.delay = ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING.get(nodeSettings); } @Override @@ -62,31 +74,30 @@ public void clusterChanged(ClusterChangedEvent event) { tryStartCleanup(state); } - void tryStartCleanup(ClusterState state) { + synchronized void tryStartCleanup(ClusterState state) { if (isClosed.get()) { return; } IndexRoutingTable indexRouting = state.routingTable().index(AsyncSearchIndexService.INDEX); if (indexRouting == null) { - if (isCleanupRunning.compareAndSet(true, false)) { - close(); - } + stop(); return; } String primaryNodeId = indexRouting.shard(0).primaryShard().currentNodeId(); if (localNodeId.equals(primaryNodeId)) { - if (isCleanupRunning.compareAndSet(false, true)) { + if (isCleanupRunning == false) { + isCleanupRunning = true; executeNextCleanup(); } - } else if (isCleanupRunning.compareAndSet(true, false)) { - close(); + } else { + stop(); } } synchronized void executeNextCleanup() { - if (isClosed.get() == false && isCleanupRunning.get()) { + if (isClosed.get() == false && isCleanupRunning) { long nowInMillis = System.currentTimeMillis(); - DeleteByQueryRequest toDelete = new DeleteByQueryRequest() + DeleteByQueryRequest toDelete = new DeleteByQueryRequest(INDEX) .setQuery(QueryBuilders.rangeQuery(EXPIRATION_TIME_FIELD).lte(nowInMillis)); indexService.getClient() .execute(DeleteByQueryAction.INSTANCE, toDelete, ActionListener.wrap(() -> scheduleNextCleanup())); @@ -94,7 +105,7 @@ synchronized void executeNextCleanup() { } synchronized void scheduleNextCleanup() { - if (isClosed.get() == false && isCleanupRunning.get()) { + if (isClosed.get() == false && isCleanupRunning) { try { cancellable = threadPool.schedule(this::executeNextCleanup, delay, ThreadPool.Names.GENERIC); } catch (EsRejectedExecutionException e) { @@ -107,11 +118,18 @@ synchronized void scheduleNextCleanup() { } } + synchronized void stop() { + if (isCleanupRunning) { + if (cancellable != null && cancellable.isCancelled() == false) { + cancellable.cancel(); + } + isCleanupRunning = false; + } + } + @Override public void close() { - if (cancellable != null && cancellable.isCancelled() == false) { - cancellable.cancel(); - } + stop(); isClosed.compareAndSet(false, true); } } diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java index 76d7f3fe91eab..d6603119d0e5f 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java @@ -297,6 +297,7 @@ private void executeCompletionListeners() { */ private AsyncSearchResponse getResponse() { assert searchResponse.get() != null; + checkCancellation(); return searchResponse.get().toAsyncSearchResponse(this, expirationTimeMillis); } @@ -306,15 +307,17 @@ private AsyncSearchResponse getResponse() { */ private AsyncSearchResponse getResponseWithHeaders() { assert searchResponse.get() != null; + checkCancellation(); return searchResponse.get().toAsyncSearchResponseWithHeaders(this, expirationTimeMillis); } // checks if the search task should be cancelled - private void checkCancellation() { + private synchronized void checkCancellation() { long now = System.currentTimeMillis(); - if (expirationTimeMillis < now || checkSubmitCancellation.getAsBoolean()) { + if (hasCompleted == false && + expirationTimeMillis < now || checkSubmitCancellation.getAsBoolean()) { // we cancel the search task if the initial submit task was cancelled, // this is needed because the task cancellation mechanism doesn't // handle the cancellation of grand-children. diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java index 684f915e2ce69..db7115e4eb37c 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java @@ -8,6 +8,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchAction; @@ -25,6 +26,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.engine.DocumentMissingException; +import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.tasks.CancellableTask; @@ -187,7 +189,9 @@ private void onFinalResponse(CancellableTask submitTask, store.storeFinalResponse(searchTask.getSearchId().getDocId(), threadContext.getResponseHeaders(),response, ActionListener.wrap(resp -> unregisterTaskAndMoveOn(searchTask, nextAction), exc -> { - if (exc.getCause() instanceof DocumentMissingException == false) { + Throwable cause = ExceptionsHelper.unwrapCause(exc); + if (cause instanceof DocumentMissingException == false && + cause instanceof VersionConflictEngineException == false) { logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]", searchTask.getSearchId().getEncoded()), exc); } diff --git a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java index aac4d1ff1d3cb..129f08cd27a2f 100644 --- a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java +++ b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.search; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -31,11 +32,12 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; -// TODO: add tests for keepAlive and expiration public class AsyncSearchActionIT extends AsyncSearchIntegTestCase { private String indexName; private int numShards; @@ -277,4 +279,117 @@ public void testCancellation() throws Exception { deleteAsyncSearch(response.getId()); ensureTaskRemoval(response.getId()); } + + public void testUpdateRunningKeepAlive() throws Exception { + SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(indexName); + request.getSearchRequest().source( + new SearchSourceBuilder().aggregation(new CancellingAggregationBuilder("test")) + ); + long now = System.currentTimeMillis(); + request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1)); + AsyncSearchResponse response = submitAsyncSearch(request); + assertNotNull(response.getSearchResponse()); + assertTrue(response.isRunning()); + assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards)); + assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(0)); + assertThat(response.getSearchResponse().getFailedShards(), equalTo(0)); + assertThat(response.getExpirationTime(), greaterThan(now)); + long expirationTime = response.getExpirationTime(); + + response = getAsyncSearch(response.getId()); + assertNotNull(response.getSearchResponse()); + assertTrue(response.isRunning()); + assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards)); + assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(0)); + assertThat(response.getSearchResponse().getFailedShards(), equalTo(0)); + + response = getAsyncSearch(response.getId(), TimeValue.timeValueDays(10)); + assertThat(response.getExpirationTime(), greaterThan(expirationTime)); + + assertTrue(response.isRunning()); + assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards)); + assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(0)); + assertThat(response.getSearchResponse().getFailedShards(), equalTo(0)); + + response = getAsyncSearch(response.getId(), TimeValue.timeValueMillis(1)); + assertThat(response.getExpirationTime(), lessThan(expirationTime)); + ensureTaskNotRunning(response.getId()); + ensureTaskRemoval(response.getId()); + } + + public void testUpdateStoreKeepAlive() throws Exception { + SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(indexName); + long now = System.currentTimeMillis(); + request.setWaitForCompletionTimeout(TimeValue.timeValueMinutes(10)); + request.setKeepOnCompletion(true); + AsyncSearchResponse response = submitAsyncSearch(request); + assertNotNull(response.getSearchResponse()); + assertFalse(response.isRunning()); + assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards)); + assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(numShards)); + assertThat(response.getSearchResponse().getFailedShards(), equalTo(0)); + assertThat(response.getExpirationTime(), greaterThan(now)); + long expirationTime = response.getExpirationTime(); + + response = getAsyncSearch(response.getId()); + assertNotNull(response.getSearchResponse()); + assertFalse(response.isRunning()); + assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards)); + assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(numShards)); + assertThat(response.getSearchResponse().getFailedShards(), equalTo(0)); + + response = getAsyncSearch(response.getId(), TimeValue.timeValueDays(10)); + assertThat(response.getExpirationTime(), greaterThan(expirationTime)); + + assertFalse(response.isRunning()); + assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards)); + assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(numShards)); + assertThat(response.getSearchResponse().getFailedShards(), equalTo(0)); + + response = getAsyncSearch(response.getId(), TimeValue.timeValueMillis(1)); + assertThat(response.getExpirationTime(), lessThan(expirationTime)); + ensureTaskNotRunning(response.getId()); + ensureTaskRemoval(response.getId()); + } + + public void testRemoveAsyncIndex() throws Exception { + SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(indexName); + request.setWaitForCompletionTimeout(TimeValue.timeValueMinutes(10)); + request.setKeepOnCompletion(true); + long now = System.currentTimeMillis(); + AsyncSearchResponse response = submitAsyncSearch(request); + assertNotNull(response.getSearchResponse()); + assertFalse(response.isRunning()); + assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards)); + assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(numShards)); + assertThat(response.getSearchResponse().getFailedShards(), equalTo(0)); + assertThat(response.getExpirationTime(), greaterThan(now)); + + // remove the async search index + client().admin().indices().prepareDelete(AsyncSearchIndexService.INDEX).get(); + + Exception exc = expectThrows(Exception.class, () -> getAsyncSearch(response.getId())); + Throwable cause = exc instanceof ExecutionException ? + ExceptionsHelper.unwrapCause(exc.getCause()) : ExceptionsHelper.unwrapCause(exc); + assertThat(ExceptionsHelper.status(cause).getStatus(), equalTo(404)); + + SubmitAsyncSearchRequest newReq = new SubmitAsyncSearchRequest(indexName); + newReq.getSearchRequest().source( + new SearchSourceBuilder().aggregation(new CancellingAggregationBuilder("test")) + ); + newReq.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1)); + AsyncSearchResponse newResp = submitAsyncSearch(newReq); + assertNotNull(newResp.getSearchResponse()); + assertTrue(newResp.isRunning()); + assertThat(newResp.getSearchResponse().getTotalShards(), equalTo(numShards)); + assertThat(newResp.getSearchResponse().getSuccessfulShards(), equalTo(0)); + assertThat(newResp.getSearchResponse().getFailedShards(), equalTo(0)); + long expirationTime = newResp.getExpirationTime(); + + // check garbage collection + newResp = getAsyncSearch(newResp.getId(), TimeValue.timeValueMillis(1)); + assertThat(newResp.getExpirationTime(), lessThan(expirationTime)); + ensureTaskNotRunning(newResp.getId()); + ensureTaskRemoval(newResp.getId()); + } } diff --git a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIndexServiceTests.java b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIndexServiceTests.java index e115c212008c1..dc33276cacdfe 100644 --- a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIndexServiceTests.java +++ b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIndexServiceTests.java @@ -44,7 +44,7 @@ public void testEncodeSearchResponse() throws IOException { for (int i = 0; i < 10; i++) { AsyncSearchResponse response = randomAsyncSearchResponse(randomSearchId(), randomSearchResponse()); String encoded = indexService.encodeResponse(response); - AsyncSearchResponse same = indexService.decodeResponse(encoded); + AsyncSearchResponse same = indexService.decodeResponse(encoded, response.getExpirationTime()); assertEqualResponses(response, same); } } diff --git a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java index f140c218d714c..0deda3fc21aa5 100644 --- a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java +++ b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.search; import org.apache.lucene.search.TotalHits; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup; @@ -46,6 +47,7 @@ import java.util.stream.Collectors; import static org.elasticsearch.xpack.search.AsyncSearchIndexService.INDEX; +import static org.elasticsearch.xpack.search.AsyncSearchMaintenanceService.ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -58,6 +60,14 @@ protected Collection> nodePlugins() { SearchTestPlugin.class, ReindexPlugin.class); } + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(0)) + .put(ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(100)) + .build(); + } + /** * Restart the node that runs the {@link TaskId} decoded from the provided {@link AsyncSearchId}. */ @@ -83,6 +93,10 @@ protected AsyncSearchResponse getAsyncSearch(String id) throws ExecutionExceptio return client().execute(GetAsyncSearchAction.INSTANCE, new GetAsyncSearchAction.Request(id)).get(); } + protected AsyncSearchResponse getAsyncSearch(String id, TimeValue keepAlive) throws ExecutionException, InterruptedException { + return client().execute(GetAsyncSearchAction.INSTANCE, new GetAsyncSearchAction.Request(id).setKeepAlive(keepAlive)).get(); + } + protected AcknowledgedResponse deleteAsyncSearch(String id) throws ExecutionException, InterruptedException { return client().execute(DeleteAsyncSearchAction.INSTANCE, new DeleteAsyncSearchAction.Request(id)).get(); } @@ -101,6 +115,19 @@ protected void ensureTaskRemoval(String id) throws Exception { }); } + protected void ensureTaskNotRunning(String id) throws Exception { + assertBusy(() -> { + try { + AsyncSearchResponse resp = getAsyncSearch(id); + assertFalse(resp.isRunning()); + } catch (Exception exc) { + if (ExceptionsHelper.unwrapCause(exc.getCause()) instanceof ResourceNotFoundException == false) { + throw exc; + } + } + }); + } + /** * Wait the completion of the {@link TaskId} decoded from the provided {@link AsyncSearchId}. */ diff --git a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/GetAsyncSearchRequestTests.java b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/GetAsyncSearchRequestTests.java index 5e3ec4ded0ec8..831dfc0e91ffd 100644 --- a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/GetAsyncSearchRequestTests.java +++ b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/GetAsyncSearchRequestTests.java @@ -34,8 +34,4 @@ static String randomSearchId() { return AsyncSearchId.encode(UUIDs.randomBase64UUID(), new TaskId(randomAlphaOfLengthBetween(10, 20), randomLongBetween(0, Long.MAX_VALUE))); } - - public void testValidateWaitForCompletion() { - - } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/AsyncSearchResponse.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/AsyncSearchResponse.java index 8e362136e19fc..bb988cb1fadba 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/AsyncSearchResponse.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/AsyncSearchResponse.java @@ -33,7 +33,7 @@ public class AsyncSearchResponse extends ActionResponse implements StatusToXCont private final boolean isPartial; private final long startTimeMillis; - private final long expirationTimeMillis; + private long expirationTimeMillis; /** * Creates an {@link AsyncSearchResponse} with meta-information only (not-modified). @@ -74,13 +74,18 @@ public AsyncSearchResponse(String id, } public AsyncSearchResponse(StreamInput in) throws IOException { + this(in, null); + } + + public AsyncSearchResponse(StreamInput in, Long expirationTime) throws IOException { this.id = in.readOptionalString(); this.error = in.readOptionalWriteable(ElasticsearchException::new); this.searchResponse = in.readOptionalWriteable(SearchResponse::new); this.isPartial = in.readBoolean(); this.isRunning = in.readBoolean(); this.startTimeMillis = in.readLong(); - this.expirationTimeMillis = in.readLong(); + long origExpiration = in.readLong(); + this.expirationTimeMillis = expirationTime == null ? origExpiration : expirationTime; } @Override @@ -157,6 +162,10 @@ public long getExpirationTime() { return expirationTimeMillis; } + public void setExpirationTime(long expirationTimeMillis) { + this.expirationTimeMillis = expirationTimeMillis; + } + @Override public RestStatus status() { if (searchResponse == null || isPartial) { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/GetAsyncSearchAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/GetAsyncSearchAction.java index fe4801aab4a10..30aceb8d43d9c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/GetAsyncSearchAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/GetAsyncSearchAction.java @@ -16,9 +16,6 @@ import java.io.IOException; import java.util.Objects; -import static org.elasticsearch.action.ValidateActions.addValidationError; -import static org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest.MIN_KEEP_ALIVE; - public class GetAsyncSearchAction extends ActionType { public static final GetAsyncSearchAction INSTANCE = new GetAsyncSearchAction(); public static final String NAME = "indices:data/read/async_search/get"; @@ -63,12 +60,7 @@ public void writeTo(StreamOutput out) throws IOException { @Override public ActionRequestValidationException validate() { - ActionRequestValidationException validationException = null; - if (keepAlive.getMillis() != -1 && keepAlive.getMillis() < MIN_KEEP_ALIVE) { - validationException = - addValidationError("keep_alive must be greater than 1 minute, got:" + keepAlive.toString(), validationException); - } - return validationException; + return null; } /**