Skip to content

Commit e7b4b01

Browse files
authored
Revert "Extend async search keep alive (#67877)" (#68855)
This reverts commit 244fc95.
1 parent c92f217 commit e7b4b01

File tree

11 files changed

+43
-211
lines changed

11 files changed

+43
-211
lines changed

x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,7 @@ public void testUpdateRunningKeepAlive() throws Exception {
344344
assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(0));
345345
assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));
346346

347-
response = getAsyncSearch(response.getId(), TimeValue.timeValueDays(6));
347+
response = getAsyncSearch(response.getId(), TimeValue.timeValueDays(10));
348348
assertThat(response.getExpirationTime(), greaterThan(expirationTime));
349349

350350
assertTrue(response.isRunning());
@@ -363,13 +363,8 @@ public void testUpdateRunningKeepAlive() throws Exception {
363363
assertEquals(0, statusResponse.getSkippedShards());
364364
assertEquals(null, statusResponse.getCompletionStatus());
365365

366-
expirationTime = response.getExpirationTime();
367-
response = getAsyncSearch(response.getId(), TimeValue.timeValueMinutes(between(1, 24 * 60)));
368-
assertThat(response.getExpirationTime(), equalTo(expirationTime));
369-
response = getAsyncSearch(response.getId(), TimeValue.timeValueDays(10));
370-
assertThat(response.getExpirationTime(), greaterThan(expirationTime));
371-
372-
deleteAsyncSearch(response.getId());
366+
response = getAsyncSearch(response.getId(), TimeValue.timeValueMillis(1));
367+
assertThat(response.getExpirationTime(), lessThan(expirationTime));
373368
ensureTaskNotRunning(response.getId());
374369
ensureTaskRemoval(response.getId());
375370
}
@@ -395,21 +390,16 @@ public void testUpdateStoreKeepAlive() throws Exception {
395390
assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(numShards));
396391
assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));
397392

398-
response = getAsyncSearch(response.getId(), TimeValue.timeValueDays(8));
393+
response = getAsyncSearch(response.getId(), TimeValue.timeValueDays(10));
399394
assertThat(response.getExpirationTime(), greaterThan(expirationTime));
400-
expirationTime = response.getExpirationTime();
401395

402396
assertFalse(response.isRunning());
403397
assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards));
404398
assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(numShards));
405399
assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));
406400

407401
response = getAsyncSearch(response.getId(), TimeValue.timeValueMillis(1));
408-
assertThat(response.getExpirationTime(), equalTo(expirationTime));
409-
response = getAsyncSearch(response.getId(), TimeValue.timeValueDays(10));
410-
assertThat(response.getExpirationTime(), greaterThan(expirationTime));
411-
412-
deleteAsyncSearch(response.getId());
402+
assertThat(response.getExpirationTime(), lessThan(expirationTime));
413403
ensureTaskNotRunning(response.getId());
414404
ensureTaskRemoval(response.getId());
415405
}
@@ -447,8 +437,11 @@ public void testRemoveAsyncIndex() throws Exception {
447437
assertThat(newResp.getSearchResponse().getTotalShards(), equalTo(numShards));
448438
assertThat(newResp.getSearchResponse().getSuccessfulShards(), equalTo(0));
449439
assertThat(newResp.getSearchResponse().getFailedShards(), equalTo(0));
440+
long expirationTime = newResp.getExpirationTime();
450441

451442
// check garbage collection
443+
newResp = getAsyncSearch(newResp.getId(), TimeValue.timeValueMillis(1));
444+
assertThat(newResp.getExpirationTime(), lessThan(expirationTime));
452445
ensureTaskNotRunning(newResp.getId());
453446
ensureTaskRemoval(newResp.getId());
454447
}

x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java

Lines changed: 3 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.elasticsearch.plugins.Plugin;
2525
import org.elasticsearch.plugins.SearchPlugin;
2626
import org.elasticsearch.rest.RestStatus;
27-
import org.elasticsearch.script.MockScriptPlugin;
2827
import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter;
2928
import org.elasticsearch.search.builder.PointInTimeBuilder;
3029
import org.elasticsearch.search.builder.SearchSourceBuilder;
@@ -59,15 +58,11 @@
5958
import java.util.Collections;
6059
import java.util.Iterator;
6160
import java.util.List;
62-
import java.util.Map;
6361
import java.util.concurrent.ExecutionException;
64-
import java.util.concurrent.TimeUnit;
6562
import java.util.concurrent.atomic.AtomicBoolean;
66-
import java.util.function.Function;
6763

6864
import static org.elasticsearch.xpack.core.XPackPlugin.ASYNC_RESULTS_INDEX;
6965
import static org.elasticsearch.xpack.core.async.AsyncTaskMaintenanceService.ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING;
70-
import static org.hamcrest.Matchers.contains;
7166
import static org.hamcrest.Matchers.equalTo;
7267
import static org.hamcrest.Matchers.lessThanOrEqualTo;
7368

@@ -99,44 +94,6 @@ public List<AggregationSpec> getAggregations() {
9994
}
10095
}
10196

102-
public static class ExpirationTimeScriptPlugin extends MockScriptPlugin {
103-
@Override
104-
public String pluginScriptLang() {
105-
return "painless";
106-
}
107-
108-
@Override
109-
@SuppressWarnings("unchecked")
110-
protected Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
111-
final String fieldName = "expiration_time";
112-
final String script =
113-
" if (ctx._source.expiration_time < params.expiration_time) { " +
114-
" ctx._source.expiration_time = params.expiration_time; " +
115-
" } else { " +
116-
" ctx.op = \"noop\"; " +
117-
" }";
118-
return Map.of(
119-
script, vars -> {
120-
Map<String, Object> params = (Map<String, Object>) vars.get("params");
121-
assertNotNull(params);
122-
assertThat(params.keySet(), contains(fieldName));
123-
long updatingValue = (long) params.get(fieldName);
124-
125-
Map<String, Object> ctx = (Map<String, Object>) vars.get("ctx");
126-
assertNotNull(ctx);
127-
Map<String, Object> source = (Map<String, Object>) ctx.get("_source");
128-
long currentValue = (long) source.get(fieldName);
129-
if (currentValue < updatingValue) {
130-
source.put(fieldName, updatingValue);
131-
} else {
132-
ctx.put("op", "noop");
133-
}
134-
return ctx;
135-
}
136-
);
137-
}
138-
}
139-
14097
@Before
14198
public void startMaintenanceService() {
14299
for (AsyncTaskMaintenanceService service : internalCluster().getDataNodeInstances(AsyncTaskMaintenanceService.class)) {
@@ -164,7 +121,7 @@ public void releaseQueryLatch() {
164121
@Override
165122
protected Collection<Class<? extends Plugin>> nodePlugins() {
166123
return Arrays.asList(LocalStateCompositeXPackPlugin.class, AsyncSearch.class, AsyncResultsIndexPlugin.class, IndexLifecycle.class,
167-
SearchTestPlugin.class, ReindexPlugin.class, ExpirationTimeScriptPlugin.class);
124+
SearchTestPlugin.class, ReindexPlugin.class);
168125
}
169126

170127
@Override
@@ -233,7 +190,7 @@ protected void ensureTaskNotRunning(String id) throws Exception {
233190
throw exc;
234191
}
235192
}
236-
}, 30, TimeUnit.SECONDS);
193+
});
237194
}
238195

239196
/**
@@ -251,7 +208,7 @@ protected void ensureTaskCompletion(String id) throws Exception {
251208
throw exc;
252209
}
253210
}
254-
}, 30, TimeUnit.SECONDS);
211+
});
255212
}
256213

257214
/**

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import java.util.List;
4040
import java.util.Map;
4141
import java.util.concurrent.atomic.AtomicBoolean;
42-
import java.util.concurrent.atomic.AtomicLong;
4342
import java.util.concurrent.atomic.AtomicReference;
4443
import java.util.function.Consumer;
4544
import java.util.function.Supplier;
@@ -64,7 +63,7 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
6463
private final List<Runnable> initListeners = new ArrayList<>();
6564
private final Map<Long, Consumer<AsyncSearchResponse>> completionListeners = new HashMap<>();
6665

67-
private final AtomicLong expirationTimeMillis;
66+
private volatile long expirationTimeMillis;
6867
private final AtomicBoolean isCancelling = new AtomicBoolean(false);
6968

7069
private final AtomicReference<MutableSearchResponse> searchResponse = new AtomicReference<>();
@@ -95,7 +94,7 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
9594
ThreadPool threadPool,
9695
Supplier<InternalAggregation.ReduceContext> aggReduceContextSupplier) {
9796
super(id, type, action, () -> "async_search{" + descriptionSupplier.get() + "}", parentTaskId, taskHeaders);
98-
this.expirationTimeMillis = new AtomicLong(getStartTime() + keepAlive.getMillis());
97+
this.expirationTimeMillis = getStartTime() + keepAlive.getMillis();
9998
this.originHeaders = originHeaders;
10099
this.searchId = searchId;
101100
this.client = client;
@@ -129,8 +128,8 @@ Listener getSearchProgressActionListener() {
129128
* Update the expiration time of the (partial) response.
130129
*/
131130
@Override
132-
public void extendExpirationTime(long newExpirationTimeMillis) {
133-
this.expirationTimeMillis.updateAndGet(curr -> Math.max(curr, newExpirationTimeMillis));
131+
public void setExpirationTime(long expirationTimeMillis) {
132+
this.expirationTimeMillis = expirationTimeMillis;
134133
}
135134

136135
@Override
@@ -332,19 +331,19 @@ private AsyncSearchResponse getResponse(boolean restoreResponseHeaders) {
332331
checkCancellation();
333332
AsyncSearchResponse asyncSearchResponse;
334333
try {
335-
asyncSearchResponse = mutableSearchResponse.toAsyncSearchResponse(this, expirationTimeMillis.get(), restoreResponseHeaders);
334+
asyncSearchResponse = mutableSearchResponse.toAsyncSearchResponse(this, expirationTimeMillis, restoreResponseHeaders);
336335
} catch(Exception e) {
337336
ElasticsearchException exception = new ElasticsearchStatusException("Async search: error while reducing partial results",
338337
ExceptionsHelper.status(e), e);
339-
asyncSearchResponse = mutableSearchResponse.toAsyncSearchResponse(this, expirationTimeMillis.get(), exception);
338+
asyncSearchResponse = mutableSearchResponse.toAsyncSearchResponse(this, expirationTimeMillis, exception);
340339
}
341340
return asyncSearchResponse;
342341
}
343342

344343
// checks if the search task should be cancelled
345344
private synchronized void checkCancellation() {
346345
long now = System.currentTimeMillis();
347-
if (hasCompleted == false && expirationTimeMillis.get() < now) {
346+
if (hasCompleted == false && expirationTimeMillis < now) {
348347
// we cancel expired search task even if they are still running
349348
cancelTask(() -> {}, "async search has expired");
350349
}
@@ -356,7 +355,7 @@ private synchronized void checkCancellation() {
356355
public AsyncStatusResponse getStatusResponse() {
357356
MutableSearchResponse mutableSearchResponse = searchResponse.get();
358357
assert mutableSearchResponse != null;
359-
return mutableSearchResponse.toStatusResponse(searchId.getEncoded(), getStartTime(), expirationTimeMillis.get());
358+
return mutableSearchResponse.toStatusResponse(searchId.getEncoded(), getStartTime(), expirationTimeMillis);
360359
}
361360

362361
class Listener extends SearchProgressActionListener {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public void retrieveResult(GetAsyncResultRequest request, ActionListener<Respons
8282
// EQL doesn't store initial or intermediate results so we only need to update expiration time in store for only in case of
8383
// async search
8484
if (updateInitialResultsInStore & expirationTime > 0) {
85-
store.extendExpirationTime(searchId.getDocId(), expirationTime,
85+
store.updateExpirationTime(searchId.getDocId(), expirationTime,
8686
ActionListener.wrap(
8787
p -> getSearchResponseFromTask(searchId, request, nowInMillis, expirationTime, listener),
8888
exc -> {
@@ -124,7 +124,7 @@ private void getSearchResponseFromTask(AsyncExecutionId searchId,
124124
}
125125

126126
if (expirationTimeMillis != -1) {
127-
task.extendExpirationTime(expirationTimeMillis);
127+
task.setExpirationTime(expirationTimeMillis);
128128
}
129129
addCompletionListener.apply(task, new ActionListener<>() {
130130
@Override

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@ public interface AsyncTask {
3131
boolean isCancelled();
3232

3333
/**
34-
* Extends the expiration time of the (partial) response if needed
34+
* Update the expiration time of the (partial) response.
3535
*/
36-
void extendExpirationTime(long newExpirationTimeMillis);
36+
void setExpirationTime(long expirationTimeMillis);
3737

3838
/**
3939
* Performs necessary checks, cancels the task and calls the runnable upon completion

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,6 @@
3333
import org.elasticsearch.common.xcontent.XContentBuilder;
3434
import org.elasticsearch.common.xcontent.XContentType;
3535
import org.elasticsearch.indices.SystemIndexDescriptor;
36-
import org.elasticsearch.script.Script;
37-
import org.elasticsearch.script.ScriptType;
3836
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
3937
import org.elasticsearch.tasks.Task;
4038
import org.elasticsearch.tasks.TaskManager;
@@ -49,6 +47,7 @@
4947
import java.io.UncheckedIOException;
5048
import java.nio.ByteBuffer;
5149
import java.util.Base64;
50+
import java.util.Collections;
5251
import java.util.HashMap;
5352
import java.util.List;
5453
import java.util.Map;
@@ -67,13 +66,6 @@ public final class AsyncTaskIndexService<R extends AsyncResponse<R>> {
6766
public static final String HEADERS_FIELD = "headers";
6867
public static final String RESPONSE_HEADERS_FIELD = "response_headers";
6968
public static final String EXPIRATION_TIME_FIELD = "expiration_time";
70-
public static final String EXPIRATION_TIME_SCRIPT =
71-
" if (ctx._source.expiration_time < params.expiration_time) { " +
72-
" ctx._source.expiration_time = params.expiration_time; " +
73-
" } else { " +
74-
" ctx.op = \"noop\"; " +
75-
" }";
76-
7769
public static final String RESULT_FIELD = "result";
7870

7971
// Usually the settings, mappings and system index descriptor below
@@ -222,15 +214,16 @@ public void updateResponse(String docId,
222214
}
223215

224216
/**
225-
* Extends the expiration time of the provided <code>docId</code> if the place-holder document is still present (update).
217+
* Updates the expiration time of the provided <code>docId</code> if the place-holder
218+
* document is still present (update).
226219
*/
227-
public void extendExpirationTime(String docId, long expirationTimeMillis, ActionListener<UpdateResponse> listener) {
228-
Script script = new Script(ScriptType.INLINE, "painless", EXPIRATION_TIME_SCRIPT,
229-
Map.of(EXPIRATION_TIME_FIELD, expirationTimeMillis));
230-
UpdateRequest request = new UpdateRequest()
231-
.index(index)
220+
public void updateExpirationTime(String docId,
221+
long expirationTimeMillis,
222+
ActionListener<UpdateResponse> listener) {
223+
Map<String, Object> source = Collections.singletonMap(EXPIRATION_TIME_FIELD, expirationTimeMillis);
224+
UpdateRequest request = new UpdateRequest().index(index)
232225
.id(docId)
233-
.script(script)
226+
.doc(source, XContentType.JSON)
234227
.retryOnConflict(5);
235228
clientWithOrigin.update(request, listener);
236229
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import org.elasticsearch.action.update.UpdateResponse;
1515
import org.elasticsearch.cluster.service.ClusterService;
1616
import org.elasticsearch.common.unit.TimeValue;
17-
import org.elasticsearch.plugins.Plugin;
1817
import org.elasticsearch.tasks.CancellableTask;
1918
import org.elasticsearch.tasks.Task;
2019
import org.elasticsearch.tasks.TaskId;
@@ -25,7 +24,6 @@
2524
import org.elasticsearch.xpack.core.async.AsyncSearchIndexServiceTests.TestAsyncResponse;
2625
import org.junit.Before;
2726

28-
import java.util.Collection;
2927
import java.util.HashMap;
3028
import java.util.Map;
3129

@@ -71,8 +69,8 @@ public AsyncExecutionId getExecutionId() {
7169
}
7270

7371
@Override
74-
public void extendExpirationTime(long newExpirationTimeMillis) {
75-
this.expirationTimeMillis = newExpirationTimeMillis;
72+
public void setExpirationTime(long expirationTimeMillis) {
73+
this.expirationTimeMillis = expirationTimeMillis;
7674
}
7775

7876
@Override
@@ -158,7 +156,7 @@ public void testRetrieveFromMemoryWithExpiration() throws Exception {
158156
try {
159157
boolean shouldExpire = randomBoolean();
160158
long expirationTime = System.currentTimeMillis() + randomLongBetween(100000, 1000000) * (shouldExpire ? -1 : 1);
161-
task.extendExpirationTime(expirationTime);
159+
task.setExpirationTime(expirationTime);
162160

163161
if (updateInitialResultsInStore) {
164162
// we need to store initial result
@@ -200,7 +198,7 @@ public void testAssertExpirationPropagation() throws Exception {
200198
TestTask task = (TestTask) taskManager.register("test", "test", request);
201199
try {
202200
long startTime = System.currentTimeMillis();
203-
task.extendExpirationTime(startTime + TimeValue.timeValueMinutes(1).getMillis());
201+
task.setExpirationTime(startTime + TimeValue.timeValueMinutes(1).getMillis());
204202

205203
if (updateInitialResultsInStore) {
206204
// we need to store initial result
@@ -238,7 +236,7 @@ public void testRetrieveFromDisk() throws Exception {
238236
TestTask task = (TestTask) taskManager.register("test", "test", request);
239237
try {
240238
long startTime = System.currentTimeMillis();
241-
task.extendExpirationTime(startTime + TimeValue.timeValueMinutes(1).getMillis());
239+
task.setExpirationTime(startTime + TimeValue.timeValueMinutes(1).getMillis());
242240

243241
if (updateInitialResultsInStore) {
244242
// we need to store initial result
@@ -276,9 +274,4 @@ public void testRetrieveFromDisk() throws Exception {
276274
deleteService.deleteResponse(new DeleteAsyncResultRequest(task.getExecutionId().getEncoded()), deleteListener);
277275
assertFutureThrows(deleteListener, ResourceNotFoundException.class);
278276
}
279-
280-
@Override
281-
protected Collection<Class<? extends Plugin>> getPlugins() {
282-
return pluginList(ExpirationTimeScriptPlugin.class);
283-
}
284277
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncTaskServiceTests.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ public void setup() {
5252
protected Collection<Class<? extends Plugin>> getPlugins() {
5353
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.getPlugins());
5454
plugins.add(TestPlugin.class);
55-
plugins.add(ExpirationTimeScriptPlugin.class);
5655
return plugins;
5756
}
5857

@@ -156,7 +155,7 @@ public void testAutoCreateIndex() throws Exception {
156155
// And so does updating the expiration time
157156
{
158157
PlainActionFuture<UpdateResponse> future = PlainActionFuture.newFuture();
159-
indexService.extendExpirationTime("0", 10L, future);
158+
indexService.updateExpirationTime("0", 10L, future);
160159
expectThrows(Exception.class, future::get);
161160
assertSettings();
162161
}
@@ -177,6 +176,4 @@ private void assertSettings() {
177176
Settings expected = AsyncTaskIndexService.settings();
178177
assertEquals(expected, settings.filter(expected::hasValue));
179178
}
180-
181-
182179
}

0 commit comments

Comments
 (0)