Skip to content

Fix expiration time in async search response #55435

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 21, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ public class GetAsyncSearchRequest implements Validatable {
private TimeValue waitForCompletion;
private TimeValue keepAlive;

public static final long MIN_KEEPALIVE = TimeValue.timeValueMinutes(1).millis();

private final String id;

public GetAsyncSearchRequest(String id) {
Expand Down Expand Up @@ -62,14 +60,7 @@ public void setKeepAlive(TimeValue keepAlive) {

@Override
public Optional<ValidationException> validate() {
final ValidationException validationException = new ValidationException();
if (keepAlive != null && keepAlive.getMillis() < MIN_KEEPALIVE) {
validationException.addValidationError("keep_alive must be greater than 1 minute, got: " + keepAlive.toString());
}
if (validationException.validationErrors().isEmpty()) {
return Optional.empty();
}
return Optional.of(validationException);
return Optional.empty();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@
*/
public class SubmitAsyncSearchRequest implements Validatable {

public static final int DEFAULT_BATCHED_REDUCE_SIZE = 5;

public static long MIN_KEEP_ALIVE = TimeValue.timeValueMinutes(1).millis();

private TimeValue waitForCompletionTimeout;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
Expand All @@ -39,6 +39,8 @@
import java.util.List;
import java.util.function.Supplier;

import static org.elasticsearch.xpack.search.AsyncSearchMaintenanceService.ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING;

public final class AsyncSearch extends Plugin implements ActionPlugin {
private final Settings settings;

Expand Down Expand Up @@ -84,11 +86,16 @@ public Collection<Object> createComponents(Client client,
AsyncSearchIndexService indexService =
new AsyncSearchIndexService(clusterService, threadPool.getThreadContext(), client, namedWriteableRegistry);
AsyncSearchMaintenanceService maintenanceService =
new AsyncSearchMaintenanceService(nodeEnvironment.nodeId(), threadPool, indexService, TimeValue.timeValueHours(1));
new AsyncSearchMaintenanceService(nodeEnvironment.nodeId(), settings, threadPool, indexService);
clusterService.addListener(maintenanceService);
return Collections.singletonList(maintenanceService);
} else {
return Collections.emptyList();
}
}

@Override
public List<Setting<?>> getSettings() {
return Collections.singletonList(ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -265,14 +265,17 @@ void getResponse(AsyncSearchId searchId,
return;
}

if (restoreResponseHeaders) {
if (restoreResponseHeaders && get.getSource().containsKey(RESPONSE_HEADERS_FIELD)) {
@SuppressWarnings("unchecked")
Map<String, List<String>> responseHeaders = (Map<String, List<String>>) get.getSource().get(RESPONSE_HEADERS_FIELD);
restoreResponseHeadersContext(securityContext.getThreadContext(), responseHeaders);
}

long expirationTime = (long) get.getSource().get(EXPIRATION_TIME_FIELD);
String encoded = (String) get.getSource().get(RESULT_FIELD);
listener.onResponse(encoded != null ? decodeResponse(encoded) : null);
AsyncSearchResponse response = decodeResponse(encoded);
response.setExpirationTime(expirationTime);
listener.onResponse(encoded != null ? response : null);
},
listener::onFailure
));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.gateway.GatewayService;
Expand All @@ -26,13 +28,17 @@
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.xpack.search.AsyncSearchIndexService.EXPIRATION_TIME_FIELD;
import static org.elasticsearch.xpack.search.AsyncSearchIndexService.INDEX;

/**
* A service that runs a periodic cleanup over the async-search index.
*/
class AsyncSearchMaintenanceService implements Releasable, ClusterStateListener {
private static final Logger logger = LogManager.getLogger(AsyncSearchMaintenanceService.class);

public static final Setting<TimeValue> ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING =
Setting.timeSetting("async_search.index_cleanup_interval", TimeValue.timeValueHours(1), Setting.Property.NodeScope);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we plan on using this new setting only in our tests? Or can it be useful for users too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe for debug but I don't think it's useful for users, only for tests at the moment.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no way to somehow make this "private" or for use only in our tests? I worry mostly about naming, and the fact that users may end up using it although we would not want them to.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default value is quite large (1h) so maybe there's some value to change it. I am not sure, I can make it private but that would work only in fantasy integration tests (single vm) so this will come back. I was expecting that the lack of documentation would make this setting an expert thing that users would never heard from ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea I see, can you add a comment that this is intentionally undocumented?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 09afaca


private final String localNodeId;
private final ThreadPool threadPool;
private final AsyncSearchIndexService indexService;
Expand All @@ -43,13 +49,13 @@ class AsyncSearchMaintenanceService implements Releasable, ClusterStateListener
private volatile Scheduler.Cancellable cancellable;

AsyncSearchMaintenanceService(String localNodeId,
Settings nodeSettings,
ThreadPool threadPool,
AsyncSearchIndexService indexService,
TimeValue delay) {
AsyncSearchIndexService indexService) {
this.localNodeId = localNodeId;
this.threadPool = threadPool;
this.indexService = indexService;
this.delay = delay;
this.delay = ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING.get(nodeSettings);
}

@Override
Expand All @@ -62,31 +68,29 @@ public void clusterChanged(ClusterChangedEvent event) {
tryStartCleanup(state);
}

void tryStartCleanup(ClusterState state) {
synchronized void tryStartCleanup(ClusterState state) {
if (isClosed.get()) {
return;
}
IndexRoutingTable indexRouting = state.routingTable().index(AsyncSearchIndexService.INDEX);
if (indexRouting == null) {
if (isCleanupRunning.compareAndSet(true, false)) {
close();
}
stop();
return;
}
String primaryNodeId = indexRouting.shard(0).primaryShard().currentNodeId();
if (localNodeId.equals(primaryNodeId)) {
if (isCleanupRunning.compareAndSet(false, true)) {
executeNextCleanup();
}
} else if (isCleanupRunning.compareAndSet(true, false)) {
close();
} else {
stop();
}
}

synchronized void executeNextCleanup() {
if (isClosed.get() == false && isCleanupRunning.get()) {
long nowInMillis = System.currentTimeMillis();
DeleteByQueryRequest toDelete = new DeleteByQueryRequest()
DeleteByQueryRequest toDelete = new DeleteByQueryRequest(INDEX)
.setQuery(QueryBuilders.rangeQuery(EXPIRATION_TIME_FIELD).lte(nowInMillis));
indexService.getClient()
.execute(DeleteByQueryAction.INSTANCE, toDelete, ActionListener.wrap(() -> scheduleNextCleanup()));
Expand All @@ -107,11 +111,17 @@ synchronized void scheduleNextCleanup() {
}
}

synchronized void stop() {
if (isCleanupRunning.compareAndSet(true, false)) {
if (cancellable != null && cancellable.isCancelled() == false) {
cancellable.cancel();
}
}
}

@Override
public void close() {
if (cancellable != null && cancellable.isCancelled() == false) {
cancellable.cancel();
}
stop();
isClosed.compareAndSet(false, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ private void executeCompletionListeners() {
*/
private AsyncSearchResponse getResponse() {
assert searchResponse.get() != null;
checkCancellation();
return searchResponse.get().toAsyncSearchResponse(this, expirationTimeMillis);
}

Expand All @@ -306,15 +307,17 @@ private AsyncSearchResponse getResponse() {
*/
private AsyncSearchResponse getResponseWithHeaders() {
assert searchResponse.get() != null;
checkCancellation();
return searchResponse.get().toAsyncSearchResponseWithHeaders(this, expirationTimeMillis);
}



// checks if the search task should be cancelled
private void checkCancellation() {
private synchronized void checkCancellation() {
long now = System.currentTimeMillis();
if (expirationTimeMillis < now || checkSubmitCancellation.getAsBoolean()) {
if (hasCompleted == false &&
expirationTimeMillis < now || checkSubmitCancellation.getAsBoolean()) {
// we cancel the search task if the initial submit task was cancelled,
// this is needed because the task cancellation mechanism doesn't
// handle the cancellation of grand-children.
Expand All @@ -338,7 +341,7 @@ protected void onQueryFailure(int shardIndex, SearchShardTarget shardTarget, Exc
// best effort to cancel expired tasks
checkCancellation();
searchResponse.get().addShardFailure(shardIndex,
new ShardSearchFailure(exc, shardTarget.getNodeId() != null ? shardTarget : null));
new ShardSearchFailure(exc, shardTarget.getNodeIdText() != null ? shardTarget : null));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.tasks.CancellableTask;
Expand Down Expand Up @@ -187,7 +188,8 @@ private void onFinalResponse(CancellableTask submitTask,
store.storeFinalResponse(searchTask.getSearchId().getDocId(), threadContext.getResponseHeaders(),response,
ActionListener.wrap(resp -> unregisterTaskAndMoveOn(searchTask, nextAction),
exc -> {
if (exc.getCause() instanceof DocumentMissingException == false) {
if (exc.getCause() instanceof DocumentMissingException == false &&
exc.getCause() instanceof VersionConflictEngineException == false) {
logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]",
searchTask.getSearchId().getEncoded()), exc);
}
Expand Down
Loading