Skip to content

Commit 244fc95

Browse files
authored
Extend async search keep alive (elastic#67877)
There can be a race between two GET async search requests, and the one with a lower keep_alive parameter wins the race. This scenario is not desirable as we should retain the search result for all requests. This commit ensures the keep_alive is extended and never goes backward.
1 parent 3d2e82f commit 244fc95

File tree

11 files changed

+229
-45
lines changed

11 files changed

+229
-45
lines changed

x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
import org.apache.lucene.store.AlreadyClosedException;
1010
import org.elasticsearch.ExceptionsHelper;
11+
import org.elasticsearch.action.ActionRequestValidationException;
1112
import org.elasticsearch.action.index.IndexRequestBuilder;
1213
import org.elasticsearch.common.settings.Settings;
1314
import org.elasticsearch.common.unit.TimeValue;
@@ -345,7 +346,7 @@ public void testUpdateRunningKeepAlive() throws Exception {
345346
assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(0));
346347
assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));
347348

348-
response = getAsyncSearch(response.getId(), TimeValue.timeValueDays(10));
349+
response = getAsyncSearch(response.getId(), TimeValue.timeValueDays(6));
349350
assertThat(response.getExpirationTime(), greaterThan(expirationTime));
350351

351352
assertTrue(response.isRunning());
@@ -364,8 +365,13 @@ public void testUpdateRunningKeepAlive() throws Exception {
364365
assertEquals(0, statusResponse.getSkippedShards());
365366
assertEquals(null, statusResponse.getCompletionStatus());
366367

367-
response = getAsyncSearch(response.getId(), TimeValue.timeValueMillis(1));
368-
assertThat(response.getExpirationTime(), lessThan(expirationTime));
368+
expirationTime = response.getExpirationTime();
369+
response = getAsyncSearch(response.getId(), TimeValue.timeValueMinutes(between(1, 24 * 60)));
370+
assertThat(response.getExpirationTime(), equalTo(expirationTime));
371+
response = getAsyncSearch(response.getId(), TimeValue.timeValueDays(10));
372+
assertThat(response.getExpirationTime(), greaterThan(expirationTime));
373+
374+
deleteAsyncSearch(response.getId());
369375
ensureTaskNotRunning(response.getId());
370376
ensureTaskRemoval(response.getId());
371377
}
@@ -391,16 +397,21 @@ public void testUpdateStoreKeepAlive() throws Exception {
391397
assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(numShards));
392398
assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));
393399

394-
response = getAsyncSearch(response.getId(), TimeValue.timeValueDays(10));
400+
response = getAsyncSearch(response.getId(), TimeValue.timeValueDays(8));
395401
assertThat(response.getExpirationTime(), greaterThan(expirationTime));
402+
expirationTime = response.getExpirationTime();
396403

397404
assertFalse(response.isRunning());
398405
assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards));
399406
assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(numShards));
400407
assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));
401408

402409
response = getAsyncSearch(response.getId(), TimeValue.timeValueMillis(1));
403-
assertThat(response.getExpirationTime(), lessThan(expirationTime));
410+
assertThat(response.getExpirationTime(), equalTo(expirationTime));
411+
response = getAsyncSearch(response.getId(), TimeValue.timeValueDays(10));
412+
assertThat(response.getExpirationTime(), greaterThan(expirationTime));
413+
414+
deleteAsyncSearch(response.getId());
404415
ensureTaskNotRunning(response.getId());
405416
ensureTaskRemoval(response.getId());
406417
}
@@ -427,22 +438,24 @@ public void testRemoveAsyncIndex() throws Exception {
427438
ExceptionsHelper.unwrapCause(exc.getCause()) : ExceptionsHelper.unwrapCause(exc);
428439
assertThat(ExceptionsHelper.status(cause).getStatus(), equalTo(404));
429440

430-
SubmitAsyncSearchRequest newReq = new SubmitAsyncSearchRequest(indexName);
441+
SubmitAsyncSearchRequest newReq = new SubmitAsyncSearchRequest(indexName) {
442+
@Override
443+
public ActionRequestValidationException validate() {
444+
return null; // to use a small keep_alive
445+
}
446+
};
431447
newReq.getSearchRequest().source(
432448
new SearchSourceBuilder().aggregation(new CancellingAggregationBuilder("test", randomLong()))
433449
);
434-
newReq.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1));
450+
newReq.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1)).setKeepAlive(TimeValue.timeValueSeconds(5));
435451
AsyncSearchResponse newResp = submitAsyncSearch(newReq);
436452
assertNotNull(newResp.getSearchResponse());
437453
assertTrue(newResp.isRunning());
438454
assertThat(newResp.getSearchResponse().getTotalShards(), equalTo(numShards));
439455
assertThat(newResp.getSearchResponse().getSuccessfulShards(), equalTo(0));
440456
assertThat(newResp.getSearchResponse().getFailedShards(), equalTo(0));
441-
long expirationTime = newResp.getExpirationTime();
442457

443458
// check garbage collection
444-
newResp = getAsyncSearch(newResp.getId(), TimeValue.timeValueMillis(1));
445-
assertThat(newResp.getExpirationTime(), lessThan(expirationTime));
446459
ensureTaskNotRunning(newResp.getId());
447460
ensureTaskRemoval(newResp.getId());
448461
}

x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.plugins.Plugin;
2424
import org.elasticsearch.plugins.SearchPlugin;
2525
import org.elasticsearch.rest.RestStatus;
26+
import org.elasticsearch.script.MockScriptPlugin;
2627
import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter;
2728
import org.elasticsearch.search.builder.PointInTimeBuilder;
2829
import org.elasticsearch.search.builder.SearchSourceBuilder;
@@ -57,11 +58,15 @@
5758
import java.util.Collections;
5859
import java.util.Iterator;
5960
import java.util.List;
61+
import java.util.Map;
6062
import java.util.concurrent.ExecutionException;
63+
import java.util.concurrent.TimeUnit;
6164
import java.util.concurrent.atomic.AtomicBoolean;
65+
import java.util.function.Function;
6266

6367
import static org.elasticsearch.xpack.core.XPackPlugin.ASYNC_RESULTS_INDEX;
6468
import static org.elasticsearch.xpack.core.async.AsyncTaskMaintenanceService.ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING;
69+
import static org.hamcrest.Matchers.contains;
6570
import static org.hamcrest.Matchers.equalTo;
6671
import static org.hamcrest.Matchers.lessThanOrEqualTo;
6772

@@ -93,6 +98,44 @@ public List<AggregationSpec> getAggregations() {
9398
}
9499
}
95100

101+
public static class ExpirationTimeScriptPlugin extends MockScriptPlugin {
102+
@Override
103+
public String pluginScriptLang() {
104+
return "painless";
105+
}
106+
107+
@Override
108+
@SuppressWarnings("unchecked")
109+
protected Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
110+
final String fieldName = "expiration_time";
111+
final String script =
112+
" if (ctx._source.expiration_time < params.expiration_time) { " +
113+
" ctx._source.expiration_time = params.expiration_time; " +
114+
" } else { " +
115+
" ctx.op = \"noop\"; " +
116+
" }";
117+
return Map.of(
118+
script, vars -> {
119+
Map<String, Object> params = (Map<String, Object>) vars.get("params");
120+
assertNotNull(params);
121+
assertThat(params.keySet(), contains(fieldName));
122+
long updatingValue = (long) params.get(fieldName);
123+
124+
Map<String, Object> ctx = (Map<String, Object>) vars.get("ctx");
125+
assertNotNull(ctx);
126+
Map<String, Object> source = (Map<String, Object>) ctx.get("_source");
127+
long currentValue = (long) source.get(fieldName);
128+
if (currentValue < updatingValue) {
129+
source.put(fieldName, updatingValue);
130+
} else {
131+
ctx.put("op", "noop");
132+
}
133+
return ctx;
134+
}
135+
);
136+
}
137+
}
138+
96139
@Before
97140
public void startMaintenanceService() {
98141
for (AsyncTaskMaintenanceService service : internalCluster().getDataNodeInstances(AsyncTaskMaintenanceService.class)) {
@@ -120,7 +163,7 @@ public void releaseQueryLatch() {
120163
@Override
121164
protected Collection<Class<? extends Plugin>> nodePlugins() {
122165
return Arrays.asList(LocalStateCompositeXPackPlugin.class, AsyncSearch.class, AsyncResultsIndexPlugin.class, IndexLifecycle.class,
123-
SearchTestPlugin.class, ReindexPlugin.class);
166+
SearchTestPlugin.class, ReindexPlugin.class, ExpirationTimeScriptPlugin.class);
124167
}
125168

126169
@Override
@@ -189,7 +232,7 @@ protected void ensureTaskNotRunning(String id) throws Exception {
189232
throw exc;
190233
}
191234
}
192-
});
235+
}, 30, TimeUnit.SECONDS);
193236
}
194237

195238
/**
@@ -207,7 +250,7 @@ protected void ensureTaskCompletion(String id) throws Exception {
207250
throw exc;
208251
}
209252
}
210-
});
253+
}, 30, TimeUnit.SECONDS);
211254
}
212255

213256
/**

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.List;
3939
import java.util.Map;
4040
import java.util.concurrent.atomic.AtomicBoolean;
41+
import java.util.concurrent.atomic.AtomicLong;
4142
import java.util.concurrent.atomic.AtomicReference;
4243
import java.util.function.Consumer;
4344
import java.util.function.Supplier;
@@ -62,7 +63,7 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
6263
private final List<Runnable> initListeners = new ArrayList<>();
6364
private final Map<Long, Consumer<AsyncSearchResponse>> completionListeners = new HashMap<>();
6465

65-
private volatile long expirationTimeMillis;
66+
private final AtomicLong expirationTimeMillis;
6667
private final AtomicBoolean isCancelling = new AtomicBoolean(false);
6768

6869
private final AtomicReference<MutableSearchResponse> searchResponse = new AtomicReference<>();
@@ -93,7 +94,7 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
9394
ThreadPool threadPool,
9495
Supplier<InternalAggregation.ReduceContext> aggReduceContextSupplier) {
9596
super(id, type, action, () -> "async_search{" + descriptionSupplier.get() + "}", parentTaskId, taskHeaders);
96-
this.expirationTimeMillis = getStartTime() + keepAlive.getMillis();
97+
this.expirationTimeMillis = new AtomicLong(getStartTime() + keepAlive.getMillis());
9798
this.originHeaders = originHeaders;
9899
this.searchId = searchId;
99100
this.client = client;
@@ -127,8 +128,8 @@ Listener getSearchProgressActionListener() {
127128
* Update the expiration time of the (partial) response.
128129
*/
129130
@Override
130-
public void setExpirationTime(long expirationTimeMillis) {
131-
this.expirationTimeMillis = expirationTimeMillis;
131+
public void extendExpirationTime(long newExpirationTimeMillis) {
132+
this.expirationTimeMillis.updateAndGet(curr -> Math.max(curr, newExpirationTimeMillis));
132133
}
133134

134135
@Override
@@ -330,19 +331,19 @@ private AsyncSearchResponse getResponse(boolean restoreResponseHeaders) {
330331
checkCancellation();
331332
AsyncSearchResponse asyncSearchResponse;
332333
try {
333-
asyncSearchResponse = mutableSearchResponse.toAsyncSearchResponse(this, expirationTimeMillis, restoreResponseHeaders);
334+
asyncSearchResponse = mutableSearchResponse.toAsyncSearchResponse(this, expirationTimeMillis.get(), restoreResponseHeaders);
334335
} catch(Exception e) {
335336
ElasticsearchException exception = new ElasticsearchStatusException("Async search: error while reducing partial results",
336337
ExceptionsHelper.status(e), e);
337-
asyncSearchResponse = mutableSearchResponse.toAsyncSearchResponse(this, expirationTimeMillis, exception);
338+
asyncSearchResponse = mutableSearchResponse.toAsyncSearchResponse(this, expirationTimeMillis.get(), exception);
338339
}
339340
return asyncSearchResponse;
340341
}
341342

342343
// checks if the search task should be cancelled
343344
private synchronized void checkCancellation() {
344345
long now = System.currentTimeMillis();
345-
if (hasCompleted == false && expirationTimeMillis < now) {
346+
if (hasCompleted == false && expirationTimeMillis.get() < now) {
346347
// we cancel expired search task even if they are still running
347348
cancelTask(() -> {}, "async search has expired");
348349
}
@@ -354,7 +355,7 @@ private synchronized void checkCancellation() {
354355
public AsyncStatusResponse getStatusResponse() {
355356
MutableSearchResponse mutableSearchResponse = searchResponse.get();
356357
assert mutableSearchResponse != null;
357-
return mutableSearchResponse.toStatusResponse(searchId.getEncoded(), getStartTime(), expirationTimeMillis);
358+
return mutableSearchResponse.toStatusResponse(searchId.getEncoded(), getStartTime(), expirationTimeMillis.get());
358359
}
359360

360361
class Listener extends SearchProgressActionListener {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public void retrieveResult(GetAsyncResultRequest request, ActionListener<Respons
8181
// EQL doesn't store initial or intermediate results so we only need to update expiration time in store for only in case of
8282
// async search
8383
if (updateInitialResultsInStore & expirationTime > 0) {
84-
store.updateExpirationTime(searchId.getDocId(), expirationTime,
84+
store.extendExpirationTime(searchId.getDocId(), expirationTime,
8585
ActionListener.wrap(
8686
p -> getSearchResponseFromTask(searchId, request, nowInMillis, expirationTime, listener),
8787
exc -> {
@@ -123,7 +123,7 @@ private void getSearchResponseFromTask(AsyncExecutionId searchId,
123123
}
124124

125125
if (expirationTimeMillis != -1) {
126-
task.setExpirationTime(expirationTimeMillis);
126+
task.extendExpirationTime(expirationTimeMillis);
127127
}
128128
addCompletionListener.apply(task, new ActionListener<>() {
129129
@Override

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@ public interface AsyncTask {
3030
boolean isCancelled();
3131

3232
/**
33-
* Update the expiration time of the (partial) response.
33+
* Extends the expiration time of the (partial) response if needed
3434
*/
35-
void setExpirationTime(long expirationTimeMillis);
35+
void extendExpirationTime(long newExpirationTimeMillis);
3636

3737
/**
3838
* Performs necessary checks, cancels the task and calls the runnable upon completion

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import org.elasticsearch.common.xcontent.XContentBuilder;
3333
import org.elasticsearch.common.xcontent.XContentType;
3434
import org.elasticsearch.indices.SystemIndexDescriptor;
35+
import org.elasticsearch.script.Script;
36+
import org.elasticsearch.script.ScriptType;
3537
import org.elasticsearch.tasks.Task;
3638
import org.elasticsearch.tasks.TaskManager;
3739
import org.elasticsearch.xpack.core.XPackPlugin;
@@ -45,7 +47,6 @@
4547
import java.io.UncheckedIOException;
4648
import java.nio.ByteBuffer;
4749
import java.util.Base64;
48-
import java.util.Collections;
4950
import java.util.HashMap;
5051
import java.util.List;
5152
import java.util.Map;
@@ -64,6 +65,13 @@ public final class AsyncTaskIndexService<R extends AsyncResponse<R>> {
6465
public static final String HEADERS_FIELD = "headers";
6566
public static final String RESPONSE_HEADERS_FIELD = "response_headers";
6667
public static final String EXPIRATION_TIME_FIELD = "expiration_time";
68+
public static final String EXPIRATION_TIME_SCRIPT =
69+
" if (ctx._source.expiration_time < params.expiration_time) { " +
70+
" ctx._source.expiration_time = params.expiration_time; " +
71+
" } else { " +
72+
" ctx.op = \"noop\"; " +
73+
" }";
74+
6775
public static final String RESULT_FIELD = "result";
6876

6977
// Usually the settings, mappings and system index descriptor below
@@ -196,16 +204,15 @@ public void updateResponse(String docId,
196204
}
197205

198206
/**
199-
* Updates the expiration time of the provided <code>docId</code> if the place-holder
200-
* document is still present (update).
207+
* Extends the expiration time of the provided <code>docId</code> if the place-holder document is still present (update).
201208
*/
202-
public void updateExpirationTime(String docId,
203-
long expirationTimeMillis,
204-
ActionListener<UpdateResponse> listener) {
205-
Map<String, Object> source = Collections.singletonMap(EXPIRATION_TIME_FIELD, expirationTimeMillis);
206-
UpdateRequest request = new UpdateRequest().index(index)
209+
public void extendExpirationTime(String docId, long expirationTimeMillis, ActionListener<UpdateResponse> listener) {
210+
Script script = new Script(ScriptType.INLINE, "painless", EXPIRATION_TIME_SCRIPT,
211+
Map.of(EXPIRATION_TIME_FIELD, expirationTimeMillis));
212+
UpdateRequest request = new UpdateRequest()
213+
.index(index)
207214
.id(docId)
208-
.doc(source, XContentType.JSON)
215+
.script(script)
209216
.retryOnConflict(5);
210217
client.update(request, listener);
211218
}

0 commit comments

Comments
 (0)