Skip to content

Commit 1d78bd0

Browse files
committed
Async search should retry updates on version conflict (#63652)
* Async search should retry updates on version conflict The _async_search APIs can throw version conflict exception when the internal response is updated concurrently. That can happen if the final response is written while the user extends the expiration time. That scenario should be rare but it happened in Kibana for several users so this change ensures that updates are retried at least 5 times. That should resolve the transient errors for Kibana. This change also preserves the version conflict exception in case the retry didn't work instead of returning a confusing 404. This commit also ensures that we don't delete the response if the search was cancelled internally and not deleted explicitly by the user. Closes #63213
1 parent f4e1e68 commit 1d78bd0

File tree

4 files changed

+45
-18
lines changed

4 files changed

+45
-18
lines changed

x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java

+35
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,13 @@
2424
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest;
2525

2626
import java.util.ArrayList;
27+
import java.util.Collections;
2728
import java.util.HashMap;
2829
import java.util.HashSet;
2930
import java.util.List;
3031
import java.util.Map;
3132
import java.util.Set;
33+
import java.util.concurrent.CountDownLatch;
3234
import java.util.concurrent.ExecutionException;
3335
import java.util.concurrent.atomic.AtomicInteger;
3436

@@ -417,4 +419,37 @@ public void testSearchPhaseFailureNoCause() throws Exception {
417419
assertNotNull(response.getFailure());
418420
ensureTaskNotRunning(response.getId());
419421
}
422+
423+
public void testRetryVersionConflict() throws Exception {
424+
SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(indexName);
425+
request.setWaitForCompletionTimeout(TimeValue.timeValueMinutes(10));
426+
request.setKeepOnCompletion(true);
427+
AsyncSearchResponse response = submitAsyncSearch(request);
428+
assertNotNull(response.getSearchResponse());
429+
assertFalse(response.isRunning());
430+
431+
List<Thread> threads = new ArrayList<>();
432+
CountDownLatch latch = new CountDownLatch(1);
433+
List<Exception> exceptions = Collections.synchronizedList(new ArrayList<>());
434+
for (int i = 0; i < 2; i++) {
435+
Runnable runnable = () -> {
436+
for (int j = 0; j < 10; j++) {
437+
try {
438+
latch.await();
439+
getAsyncSearch(response.getId(), TimeValue.timeValueMinutes(10));
440+
} catch (Exception exc) {
441+
exceptions.add(exc);
442+
}
443+
}
444+
};
445+
Thread thread = new Thread(runnable);
446+
thread.start();
447+
threads.add(thread);
448+
}
449+
latch.countDown();
450+
for (Thread thread : threads) {
451+
thread.join();
452+
}
453+
assertTrue(exceptions.toString(), exceptions.isEmpty());
454+
}
420455
}

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java

+1-13
Original file line numberDiff line numberDiff line change
@@ -173,24 +173,12 @@ private void onFatalFailure(AsyncSearchTask task, Exception error, boolean shoul
173173
private void onFinalResponse(AsyncSearchTask searchTask,
174174
AsyncSearchResponse response,
175175
Runnable nextAction) {
176-
if (searchTask.isCancelled()) {
177-
// the task was cancelled so we ensure that there is nothing stored in the response index.
178-
store.deleteResponse(searchTask.getExecutionId(), ActionListener.wrap(
179-
resp -> unregisterTaskAndMoveOn(searchTask, nextAction),
180-
exc -> {
181-
logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]",
182-
searchTask.getExecutionId().getEncoded()), exc);
183-
unregisterTaskAndMoveOn(searchTask, nextAction);
184-
}));
185-
return;
186-
}
187-
188176
store.updateResponse(searchTask.getExecutionId().getDocId(), threadContext.getResponseHeaders(),response,
189177
ActionListener.wrap(resp -> unregisterTaskAndMoveOn(searchTask, nextAction),
190178
exc -> {
191179
Throwable cause = ExceptionsHelper.unwrapCause(exc);
192180
if (cause instanceof DocumentMissingException == false &&
193-
cause instanceof VersionConflictEngineException == false) {
181+
cause instanceof VersionConflictEngineException == false) {
194182
logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]",
195183
searchTask.getExecutionId().getEncoded()), exc);
196184
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -85,14 +85,16 @@ public void retrieveResult(GetAsyncResultRequest request, ActionListener<Respons
8585
ActionListener.wrap(
8686
p -> getSearchResponseFromTask(searchId, request, nowInMillis, expirationTime, listener),
8787
exc -> {
88-
//don't log when: the async search document or its index is not found. That can happen if an invalid
89-
//search id is provided or no async search initial response has been stored yet.
9088
RestStatus status = ExceptionsHelper.status(ExceptionsHelper.unwrapCause(exc));
9189
if (status != RestStatus.NOT_FOUND) {
9290
logger.error(() -> new ParameterizedMessage("failed to update expiration time for async-search [{}]",
9391
searchId.getEncoded()), exc);
92+
listener.onFailure(exc);
93+
} else {
94+
//the async search document or its index is not found.
95+
//That can happen if an invalid/deleted search id is provided.
96+
listener.onFailure(new ResourceNotFoundException(searchId.getEncoded()));
9497
}
95-
listener.onFailure(new ResourceNotFoundException(searchId.getEncoded()));
9698
}
9799
));
98100
} else {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,8 @@ public void updateResponse(String docId,
193193
UpdateRequest request = new UpdateRequest()
194194
.index(index)
195195
.id(docId)
196-
.doc(source, XContentType.JSON);
196+
.doc(source, XContentType.JSON)
197+
.retryOnConflict(5);
197198
client.update(request, listener);
198199
} catch(Exception e) {
199200
listener.onFailure(e);
@@ -210,7 +211,8 @@ public void updateExpirationTime(String docId,
210211
Map<String, Object> source = Collections.singletonMap(EXPIRATION_TIME_FIELD, expirationTimeMillis);
211212
UpdateRequest request = new UpdateRequest().index(index)
212213
.id(docId)
213-
.doc(source, XContentType.JSON);
214+
.doc(source, XContentType.JSON)
215+
.retryOnConflict(5);
214216
client.update(request, listener);
215217
}
216218

0 commit comments

Comments
 (0)