Skip to content

Integrate circuit breaker in AsyncTaskIndexService #73862

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.common.xcontent.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.uid.Versions;
Expand Down Expand Up @@ -659,6 +660,14 @@ public UpdateRequest doc(Object... source) {
return this;
}

/**
* Sets the doc to use for updates when a script is not specified. The doc is provided in a bytes form.
*/
public UpdateRequest doc(BytesReference source, XContentType contentType) {
safeDoc().source(source, contentType);
return this;
}

/**
* Sets the doc to use for updates when a script is not specified, the doc provided
* is a field and value pairs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand All @@ -37,19 +38,21 @@ public TransportGetAsyncSearchAction(TransportService transportService,
ClusterService clusterService,
NamedWriteableRegistry registry,
Client client,
ThreadPool threadPool) {
ThreadPool threadPool,
BigArrays bigArrays) {
super(GetAsyncSearchAction.NAME, transportService, actionFilters, GetAsyncResultRequest::new);
this.transportService = transportService;
this.resultsService = createResultsService(transportService, clusterService, registry, client, threadPool);
this.resultsService = createResultsService(transportService, clusterService, registry, client, threadPool, bigArrays);
}

static AsyncResultsService<AsyncSearchTask, AsyncSearchResponse> createResultsService(TransportService transportService,
ClusterService clusterService,
NamedWriteableRegistry registry,
Client client,
ThreadPool threadPool) {
ThreadPool threadPool,
BigArrays bigArrays) {
AsyncTaskIndexService<AsyncSearchResponse> store = new AsyncTaskIndexService<>(XPackPlugin.ASYNC_RESULTS_INDEX, clusterService,
threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry);
threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry, bigArrays);
return new AsyncResultsService<>(store, true, AsyncSearchTask.class, AsyncSearchTask::addCompletionListener,
transportService.getTaskManager(), clusterService);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand All @@ -37,16 +38,17 @@ public class TransportGetAsyncStatusAction extends HandledTransportAction<GetAsy

@Inject
public TransportGetAsyncStatusAction(TransportService transportService,
ActionFilters actionFilters,
ClusterService clusterService,
NamedWriteableRegistry registry,
Client client,
ThreadPool threadPool) {
ActionFilters actionFilters,
ClusterService clusterService,
NamedWriteableRegistry registry,
Client client,
ThreadPool threadPool,
BigArrays bigArrays) {
super(GetAsyncStatusAction.NAME, transportService, actionFilters, GetAsyncStatusRequest::new);
this.transportService = transportService;
this.clusterService = clusterService;
this.store = new AsyncTaskIndexService<>(XPackPlugin.ASYNC_RESULTS_INDEX, clusterService,
threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry);
threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry, bigArrays);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.engine.DocumentMissingException;
Expand Down Expand Up @@ -64,14 +65,15 @@ public TransportSubmitAsyncSearchAction(ClusterService clusterService,
Client client,
NodeClient nodeClient,
SearchService searchService,
TransportSearchAction searchAction) {
TransportSearchAction searchAction,
BigArrays bigArrays) {
super(SubmitAsyncSearchAction.NAME, transportService, actionFilters, SubmitAsyncSearchRequest::new);
this.nodeClient = nodeClient;
this.requestToAggReduceContextBuilder = request -> searchService.aggReduceContextBuilder(request).forFinalReduction();
this.searchAction = searchAction;
this.threadContext = transportService.getThreadPool().getThreadContext();
this.store = new AsyncTaskIndexService<>(XPackPlugin.ASYNC_RESULTS_INDEX, clusterService, threadContext, client,
ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry);
ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry, bigArrays);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.xpack.async;

import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -23,10 +24,8 @@
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
import org.elasticsearch.xpack.core.async.AsyncTaskMaintenanceService;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -75,21 +74,12 @@ public Collection<Object> createComponents(
List<Object> components = new ArrayList<>();
if (DiscoveryNode.canContainData(environment.settings())) {
// only data nodes should be eligible to run the maintenance service.
AsyncTaskIndexService<AsyncSearchResponse> indexService = new AsyncTaskIndexService<>(
XPackPlugin.ASYNC_RESULTS_INDEX,
clusterService,
threadPool.getThreadContext(),
client,
ASYNC_SEARCH_ORIGIN,
AsyncSearchResponse::new,
namedWriteableRegistry
);
AsyncTaskMaintenanceService maintenanceService = new AsyncTaskMaintenanceService(
clusterService,
nodeEnvironment.nodeId(),
settings,
threadPool,
indexService
new OriginSettingClient(client, ASYNC_SEARCH_ORIGIN)
);
components.add(maintenanceService);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
Expand Down Expand Up @@ -136,20 +138,23 @@ public static SystemIndexDescriptor getSystemIndexDescriptor() {
private final SecurityContext securityContext;
private final NamedWriteableRegistry registry;
private final Writeable.Reader<R> reader;
private final BigArrays bigArrays;

public AsyncTaskIndexService(String index,
ClusterService clusterService,
ThreadContext threadContext,
Client client,
String origin,
Writeable.Reader<R> reader,
NamedWriteableRegistry registry) {
NamedWriteableRegistry registry,
BigArrays bigArrays) {
this.index = index;
this.securityContext = new SecurityContext(clusterService.getSettings(), threadContext);
this.client = client;
this.clientWithOrigin = new OriginSettingClient(client, origin);
this.registry = registry;
this.reader = reader;
this.bigArrays = bigArrays;
}

/**
Expand Down Expand Up @@ -182,18 +187,22 @@ public void createResponse(String docId,
R response,
ActionListener<IndexResponse> listener) throws IOException {
try {
// TODO: Integrate with circuit breaker
final XContentBuilder source = XContentFactory.jsonBuilder()
final ReleasableBytesStreamOutput buffer = new ReleasableBytesStreamOutput(0, bigArrays.withCircuitBreaking());
final XContentBuilder source = XContentFactory.jsonBuilder(buffer);
listener = ActionListener.runBefore(listener, source::close);
source
.startObject()
.field(HEADERS_FIELD, headers)
.field(EXPIRATION_TIME_FIELD, response.getExpirationTime())
.directFieldAsBase64(RESULT_FIELD, os -> writeResponse(response, os))
.endObject();

// do not close the buffer or the XContentBuilder until the IndexRequest is completed (i.e., listener is notified);
// otherwise, we underestimate the memory usage in case the circuit breaker does not use the real memory usage.
source.flush();
final IndexRequest indexRequest = new IndexRequest(index)
.create(true)
.id(docId)
.source(source);
.source(buffer.bytes(), source.contentType());
clientWithOrigin.index(indexRequest, listener);
} catch (Exception e) {
listener.onFailure(e);
Expand All @@ -204,20 +213,25 @@ public void createResponse(String docId,
* Stores the final response if the place-holder document is still present (update).
*/
public void updateResponse(String docId,
Map<String, List<String>> responseHeaders,
R response,
ActionListener<UpdateResponse> listener) {
Map<String, List<String>> responseHeaders,
R response,
ActionListener<UpdateResponse> listener) {
try {
// TODO: Integrate with circuit breaker
final XContentBuilder source = XContentFactory.jsonBuilder()
final ReleasableBytesStreamOutput buffer = new ReleasableBytesStreamOutput(0, bigArrays.withCircuitBreaking());
final XContentBuilder source = XContentFactory.jsonBuilder(buffer);
listener = ActionListener.runBefore(listener, source::close);
source
.startObject()
.field(RESPONSE_HEADERS_FIELD, responseHeaders)
.directFieldAsBase64(RESULT_FIELD, os -> writeResponse(response, os))
.endObject();
UpdateRequest request = new UpdateRequest()
// do not close the buffer or the XContentBuilder until the UpdateRequest is completed (i.e., listener is notified);
// otherwise, we underestimate the memory usage in case the circuit breaker does not use the real memory usage.
source.flush();
final UpdateRequest request = new UpdateRequest()
.index(index)
.id(docId)
.doc(source)
.doc(buffer.bytes(), source.contentType())
.retryOnConflict(5);
clientWithOrigin.update(request, listener);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
Expand Down Expand Up @@ -55,7 +56,7 @@ public class AsyncTaskMaintenanceService extends AbstractLifecycleComponent impl
private final String index;
private final String localNodeId;
private final ThreadPool threadPool;
private final AsyncTaskIndexService<?> indexService;
private final Client clientWithOrigin;
private final TimeValue delay;

private boolean isCleanupRunning;
Expand All @@ -65,12 +66,12 @@ public AsyncTaskMaintenanceService(ClusterService clusterService,
String localNodeId,
Settings nodeSettings,
ThreadPool threadPool,
AsyncTaskIndexService<?> indexService) {
Client clientWithOrigin) {
this.clusterService = clusterService;
this.index = XPackPlugin.ASYNC_RESULTS_INDEX;
this.localNodeId = localNodeId;
this.threadPool = threadPool;
this.indexService = indexService;
this.clientWithOrigin = clientWithOrigin;
this.delay = ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING.get(nodeSettings);
}

Expand Down Expand Up @@ -125,8 +126,7 @@ synchronized void executeNextCleanup() {
long nowInMillis = System.currentTimeMillis();
DeleteByQueryRequest toDelete = new DeleteByQueryRequest(index)
.setQuery(QueryBuilders.rangeQuery(EXPIRATION_TIME_FIELD).lte(nowInMillis));
indexService.getClientWithOrigin()
.execute(DeleteByQueryAction.INSTANCE, toDelete, ActionListener.wrap(this::scheduleNextCleanup));
clientWithOrigin.execute(DeleteByQueryAction.INSTANCE, toDelete, ActionListener.wrap(this::scheduleNextCleanup));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand All @@ -34,13 +35,14 @@ public TransportDeleteAsyncResultAction(TransportService transportService,
ClusterService clusterService,
NamedWriteableRegistry registry,
Client client,
ThreadPool threadPool) {
ThreadPool threadPool,
BigArrays bigArrays) {
super(DeleteAsyncResultAction.NAME, transportService, actionFilters, DeleteAsyncResultRequest::new);
this.transportService = transportService;
this.clusterService = clusterService;
AsyncTaskIndexService<?> store = new AsyncTaskIndexService<>(XPackPlugin.ASYNC_RESULTS_INDEX, clusterService,
threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN,
(in) -> {throw new UnsupportedOperationException("Reading is not supported during deletion");}, registry);
(in) -> {throw new UnsupportedOperationException("Reading is not supported during deletion");}, registry, bigArrays);
this.deleteResultsService = new DeleteAsyncResultsService(store, transportService.getTaskManager());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
Expand Down Expand Up @@ -123,9 +124,10 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId,
public void setup() {
clusterService = getInstanceFromNode(ClusterService.class);
TransportService transportService = getInstanceFromNode(TransportService.class);
BigArrays bigArrays = getInstanceFromNode(BigArrays.class);
taskManager = transportService.getTaskManager();
indexService = new AsyncTaskIndexService<>("test", clusterService, transportService.getThreadPool().getThreadContext(),
client(), ASYNC_SEARCH_ORIGIN, TestAsyncResponse::new, writableRegistry());
client(), ASYNC_SEARCH_ORIGIN, TestAsyncResponse::new, writableRegistry(), bigArrays);

}

Expand Down
Loading