Skip to content

Fix expiration time in async search response #55435

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ public class GetAsyncSearchRequest implements Validatable {
private TimeValue waitForCompletion;
private TimeValue keepAlive;

public static final long MIN_KEEPALIVE = TimeValue.timeValueMinutes(1).millis();

private final String id;

public GetAsyncSearchRequest(String id) {
Expand Down Expand Up @@ -62,14 +60,7 @@ public void setKeepAlive(TimeValue keepAlive) {

@Override
public Optional<ValidationException> validate() {
final ValidationException validationException = new ValidationException();
if (keepAlive != null && keepAlive.getMillis() < MIN_KEEPALIVE) {
validationException.addValidationError("keep_alive must be greater than 1 minute, got: " + keepAlive.toString());
}
if (validationException.validationErrors().isEmpty()) {
return Optional.empty();
}
return Optional.of(validationException);
return Optional.empty();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@
*/
public class SubmitAsyncSearchRequest implements Validatable {

public static final int DEFAULT_BATCHED_REDUCE_SIZE = 5;

public static long MIN_KEEP_ALIVE = TimeValue.timeValueMinutes(1).millis();

private TimeValue waitForCompletionTimeout;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public SearchShardTarget(String nodeId, ShardId shardId, @Nullable String cluste

@Nullable
public String getNodeId() {
return nodeId.string();
return nodeId != null ? nodeId.string() : null;
}

public Text getNodeIdText() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
Expand All @@ -39,6 +39,8 @@
import java.util.List;
import java.util.function.Supplier;

import static org.elasticsearch.xpack.search.AsyncSearchMaintenanceService.ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING;

public final class AsyncSearch extends Plugin implements ActionPlugin {
private final Settings settings;

Expand Down Expand Up @@ -84,11 +86,16 @@ public Collection<Object> createComponents(Client client,
AsyncSearchIndexService indexService =
new AsyncSearchIndexService(clusterService, threadPool.getThreadContext(), client, namedWriteableRegistry);
AsyncSearchMaintenanceService maintenanceService =
new AsyncSearchMaintenanceService(nodeEnvironment.nodeId(), threadPool, indexService, TimeValue.timeValueHours(1));
new AsyncSearchMaintenanceService(nodeEnvironment.nodeId(), settings, threadPool, indexService);
clusterService.addListener(maintenanceService);
return Collections.singletonList(maintenanceService);
} else {
return Collections.emptyList();
}
}

@Override
public List<Setting<?>> getSettings() {
return Collections.singletonList(ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -265,14 +265,16 @@ void getResponse(AsyncSearchId searchId,
return;
}

if (restoreResponseHeaders) {
if (restoreResponseHeaders && get.getSource().containsKey(RESPONSE_HEADERS_FIELD)) {
@SuppressWarnings("unchecked")
Map<String, List<String>> responseHeaders = (Map<String, List<String>>) get.getSource().get(RESPONSE_HEADERS_FIELD);
restoreResponseHeadersContext(securityContext.getThreadContext(), responseHeaders);
}

long expirationTime = (long) get.getSource().get(EXPIRATION_TIME_FIELD);
String encoded = (String) get.getSource().get(RESULT_FIELD);
listener.onResponse(encoded != null ? decodeResponse(encoded) : null);
AsyncSearchResponse response = decodeResponse(encoded, expirationTime);
listener.onResponse(encoded != null ? response : null);
},
listener::onFailure
));
Expand Down Expand Up @@ -331,11 +333,11 @@ String encodeResponse(AsyncSearchResponse response) throws IOException {
/**
* Decode the provided base-64 bytes into a {@link AsyncSearchResponse}.
*/
AsyncSearchResponse decodeResponse(String value) throws IOException {
AsyncSearchResponse decodeResponse(String value, long expirationTime) throws IOException {
try (ByteBufferStreamInput buf = new ByteBufferStreamInput(ByteBuffer.wrap(Base64.getDecoder().decode(value)))) {
try (StreamInput in = new NamedWriteableAwareStreamInput(buf, registry)) {
in.setVersion(Version.readVersion(in));
return new AsyncSearchResponse(in);
return new AsyncSearchResponse(in, expirationTime);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.gateway.GatewayService;
Expand All @@ -26,30 +28,40 @@
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.xpack.search.AsyncSearchIndexService.EXPIRATION_TIME_FIELD;
import static org.elasticsearch.xpack.search.AsyncSearchIndexService.INDEX;

/**
* A service that runs a periodic cleanup over the async-search index.
*/
class AsyncSearchMaintenanceService implements Releasable, ClusterStateListener {
private static final Logger logger = LogManager.getLogger(AsyncSearchMaintenanceService.class);

/**
* Controls the interval at which the cleanup is scheduled.
* Defaults to 1h. It is an undocumented/expert setting that
* is mainly used by integration tests to make the garbage
* collection of search responses more reactive.
*/
public static final Setting<TimeValue> ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING =
Setting.timeSetting("async_search.index_cleanup_interval", TimeValue.timeValueHours(1), Setting.Property.NodeScope);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we plan on using this new setting only in our tests? Or can it be useful for users too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe for debugging, but I don't think it's useful for users; it is only for tests at the moment.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there no way to somehow make this "private", or for use only in our tests? I worry mostly about naming, and about the fact that users may end up using it although we would not want them to.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default value is quite large (1h), so maybe there's some value in being able to change it. I am not sure. I can make it private, but that would work only in fantasy integration tests (single VM), so this will come back. I was expecting that the lack of documentation would make this setting an expert thing that users would never hear of?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I see. Can you add a comment that this is intentionally undocumented?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 09afaca


private final String localNodeId;
private final ThreadPool threadPool;
private final AsyncSearchIndexService indexService;
private final TimeValue delay;

private final AtomicBoolean isCleanupRunning = new AtomicBoolean(false);
private boolean isCleanupRunning;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private volatile Scheduler.Cancellable cancellable;

AsyncSearchMaintenanceService(String localNodeId,
Settings nodeSettings,
ThreadPool threadPool,
AsyncSearchIndexService indexService,
TimeValue delay) {
AsyncSearchIndexService indexService) {
this.localNodeId = localNodeId;
this.threadPool = threadPool;
this.indexService = indexService;
this.delay = delay;
this.delay = ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING.get(nodeSettings);
}

@Override
Expand All @@ -62,39 +74,38 @@ public void clusterChanged(ClusterChangedEvent event) {
tryStartCleanup(state);
}

void tryStartCleanup(ClusterState state) {
synchronized void tryStartCleanup(ClusterState state) {
if (isClosed.get()) {
return;
}
IndexRoutingTable indexRouting = state.routingTable().index(AsyncSearchIndexService.INDEX);
if (indexRouting == null) {
if (isCleanupRunning.compareAndSet(true, false)) {
close();
}
stop();
return;
}
String primaryNodeId = indexRouting.shard(0).primaryShard().currentNodeId();
if (localNodeId.equals(primaryNodeId)) {
if (isCleanupRunning.compareAndSet(false, true)) {
if (isCleanupRunning == false) {
isCleanupRunning = true;
executeNextCleanup();
}
} else if (isCleanupRunning.compareAndSet(true, false)) {
close();
} else {
stop();
}
}

synchronized void executeNextCleanup() {
if (isClosed.get() == false && isCleanupRunning.get()) {
if (isClosed.get() == false && isCleanupRunning) {
long nowInMillis = System.currentTimeMillis();
DeleteByQueryRequest toDelete = new DeleteByQueryRequest()
DeleteByQueryRequest toDelete = new DeleteByQueryRequest(INDEX)
.setQuery(QueryBuilders.rangeQuery(EXPIRATION_TIME_FIELD).lte(nowInMillis));
indexService.getClient()
.execute(DeleteByQueryAction.INSTANCE, toDelete, ActionListener.wrap(() -> scheduleNextCleanup()));
}
}

synchronized void scheduleNextCleanup() {
if (isClosed.get() == false && isCleanupRunning.get()) {
if (isClosed.get() == false && isCleanupRunning) {
try {
cancellable = threadPool.schedule(this::executeNextCleanup, delay, ThreadPool.Names.GENERIC);
} catch (EsRejectedExecutionException e) {
Expand All @@ -107,11 +118,18 @@ synchronized void scheduleNextCleanup() {
}
}

synchronized void stop() {
if (isCleanupRunning) {
if (cancellable != null && cancellable.isCancelled() == false) {
cancellable.cancel();
}
isCleanupRunning = false;
}
}

@Override
public void close() {
if (cancellable != null && cancellable.isCancelled() == false) {
cancellable.cancel();
}
stop();
isClosed.compareAndSet(false, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ private void executeCompletionListeners() {
*/
private AsyncSearchResponse getResponse() {
assert searchResponse.get() != null;
checkCancellation();
return searchResponse.get().toAsyncSearchResponse(this, expirationTimeMillis);
}

Expand All @@ -306,15 +307,17 @@ private AsyncSearchResponse getResponse() {
*/
private AsyncSearchResponse getResponseWithHeaders() {
assert searchResponse.get() != null;
checkCancellation();
return searchResponse.get().toAsyncSearchResponseWithHeaders(this, expirationTimeMillis);
}



// checks if the search task should be cancelled
private void checkCancellation() {
private synchronized void checkCancellation() {
long now = System.currentTimeMillis();
if (expirationTimeMillis < now || checkSubmitCancellation.getAsBoolean()) {
if (hasCompleted == false &&
expirationTimeMillis < now || checkSubmitCancellation.getAsBoolean()) {
// we cancel the search task if the initial submit task was cancelled,
// this is needed because the task cancellation mechanism doesn't
// handle the cancellation of grand-children.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchAction;
Expand All @@ -25,6 +26,7 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.tasks.CancellableTask;
Expand Down Expand Up @@ -187,7 +189,9 @@ private void onFinalResponse(CancellableTask submitTask,
store.storeFinalResponse(searchTask.getSearchId().getDocId(), threadContext.getResponseHeaders(),response,
ActionListener.wrap(resp -> unregisterTaskAndMoveOn(searchTask, nextAction),
exc -> {
if (exc.getCause() instanceof DocumentMissingException == false) {
Throwable cause = ExceptionsHelper.unwrapCause(exc);
if (cause instanceof DocumentMissingException == false &&
cause instanceof VersionConflictEngineException == false) {
logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]",
searchTask.getSearchId().getEncoded()), exc);
}
Expand Down
Loading