From 11d0bce6eebec3a848f0f7e5da495921928dbea5 Mon Sep 17 00:00:00 2001 From: jimczi Date: Wed, 14 Oct 2020 10:50:19 +0200 Subject: [PATCH 1/2] Async search should retry updates on version conflict The _async_search APIs can throw version conflict exception when the internal response is updated concurrently. That can happen if the final response is written while the user extends the expiration time. That scenario should be rare but it happened in Kibana for several users so this change ensures that updates are retried at least 5 times. That should resolve the transient errors for Kibana. This change also preserves the version conflict exception in case the retry didn't work instead of returning a confusing 404. This commit also ensures that we don't delete the response if the search was cancelled internally and not deleted explicitly by the user. Closes #63213 --- .../xpack/search/AsyncSearchActionIT.java | 32 +++++++++++++++++++ .../TransportSubmitAsyncSearchAction.java | 14 +------- .../xpack/core/async/AsyncResultsService.java | 8 +++-- .../core/async/AsyncTaskIndexService.java | 6 ++-- 4 files changed, 42 insertions(+), 18 deletions(-) diff --git a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java index dbe0a28fd509d..f20d076c586a5 100644 --- a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java +++ b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; @@ -419,4 +420,35 @@ public void testSearchPhaseFailureNoCause() throws Exception { assertNotNull(response.getFailure()); ensureTaskNotRunning(response.getId()); } + + public void testRetryVersionConflict() throws Exception { + SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(indexName); + request.setWaitForCompletionTimeout(TimeValue.timeValueMinutes(10)); + request.setKeepOnCompletion(true); + AsyncSearchResponse response = submitAsyncSearch(request); + assertNotNull(response.getSearchResponse()); + assertFalse(response.isRunning()); + + List threads = new ArrayList<>(); + CountDownLatch latch = new CountDownLatch(1); + for (int i = 0; i < 2; i++) { + Runnable runnable = () -> { + for (int j = 0; j < 10; j++) { + try { + latch.await(); + getAsyncSearch(response.getId(), TimeValue.timeValueMinutes(10)); + } catch (Exception exc) { + throw new AssertionError(exc); + } + } + }; + Thread thread = new Thread(runnable); + thread.start(); + threads.add(thread); + } + latch.countDown(); + for (Thread thread : threads) { + thread.join(); + } + } } diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java index 7965774f4869e..5c2b590e5daa0 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java @@ -173,24 +173,12 @@ private void onFatalFailure(AsyncSearchTask task, Exception error, boolean shoul private void onFinalResponse(AsyncSearchTask searchTask, AsyncSearchResponse response, Runnable nextAction) { - if (searchTask.isCancelled()) { - // the task was cancelled so we ensure that there is nothing stored in the response index. - store.deleteResponse(searchTask.getExecutionId(), ActionListener.wrap( - resp -> unregisterTaskAndMoveOn(searchTask, nextAction), - exc -> { - logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]", - searchTask.getExecutionId().getEncoded()), exc); - unregisterTaskAndMoveOn(searchTask, nextAction); - })); - return; - } - store.updateResponse(searchTask.getExecutionId().getDocId(), threadContext.getResponseHeaders(),response, ActionListener.wrap(resp -> unregisterTaskAndMoveOn(searchTask, nextAction), exc -> { Throwable cause = ExceptionsHelper.unwrapCause(exc); if (cause instanceof DocumentMissingException == false && - cause instanceof VersionConflictEngineException == false) { + cause instanceof VersionConflictEngineException == false) { logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]", searchTask.getExecutionId().getEncoded()), exc); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java index 43c3de536ed03..d94dc9bf9c95e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java @@ -85,14 +85,16 @@ public void retrieveResult(GetAsyncResultRequest request, ActionListener getSearchResponseFromTask(searchId, request, nowInMillis, expirationTime, listener), exc -> { - //don't log when: the async search document or its index is not found. That can happen if an invalid - //search id is provided or no async search initial response has been stored yet. RestStatus status = ExceptionsHelper.status(ExceptionsHelper.unwrapCause(exc)); if (status != RestStatus.NOT_FOUND) { logger.error(() -> new ParameterizedMessage("failed to update expiration time for async-search [{}]", searchId.getEncoded()), exc); + listener.onFailure(exc); + } else { + //the async search document or its index is not found. + //That can happen if an invalid/deleted search id is provided. + listener.onFailure(new ResourceNotFoundException(searchId.getEncoded())); } - listener.onFailure(new ResourceNotFoundException(searchId.getEncoded())); } )); } else { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java index 8d6260759294c..4237a0ea70d63 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java @@ -193,7 +193,8 @@ public void updateResponse(String docId, UpdateRequest request = new UpdateRequest() .index(index) .id(docId) - .doc(source, XContentType.JSON); + .doc(source, XContentType.JSON) + .retryOnConflict(5); client.update(request, listener); } catch(Exception e) { listener.onFailure(e); @@ -210,7 +211,8 @@ public void updateExpirationTime(String docId, Map source = Collections.singletonMap(EXPIRATION_TIME_FIELD, expirationTimeMillis); UpdateRequest request = new UpdateRequest().index(index) .id(docId) - .doc(source, XContentType.JSON); + .doc(source, XContentType.JSON) + .retryOnConflict(5); client.update(request, listener); } From f2529bbaba80894d302404f90692e2c7d8167fd0 Mon Sep 17 00:00:00 2001 From: jimczi Date: Thu, 15 Oct 2020 15:00:01 +0200 Subject: [PATCH 2/2] cleanup test --- .../org/elasticsearch/xpack/search/AsyncSearchActionIT.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java index f20d076c586a5..b9e2232e0f61b 100644 --- a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java +++ b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java @@ -24,6 +24,7 @@ import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -431,6 +432,7 @@ public void testRetryVersionConflict() throws Exception { List threads = new ArrayList<>(); CountDownLatch latch = new CountDownLatch(1); + List exceptions = Collections.synchronizedList(new ArrayList<>()); for (int i = 0; i < 2; i++) { Runnable runnable = () -> { for (int j = 0; j < 10; j++) { @@ -438,7 +440,7 @@ public void testRetryVersionConflict() throws Exception { latch.await(); getAsyncSearch(response.getId(), TimeValue.timeValueMinutes(10)); } catch (Exception exc) { - throw new AssertionError(exc); + exceptions.add(exc); } } }; @@ -450,5 +452,6 @@ public void testRetryVersionConflict() throws Exception { for (Thread thread : threads) { thread.join(); } + assertTrue(exceptions.toString(), exceptions.isEmpty()); } }