Skip to content

Add wait for completion for Enrich policy execution #47886

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Oct 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,39 @@ private ExecuteEnrichPolicyAction() {
public static class Request extends MasterNodeRequest<Request> {

private final String name;
private boolean waitForCompletion;

public Request(String name) {
this.name = Objects.requireNonNull(name, "name cannot be null");
this.waitForCompletion = true;
}

public Request(StreamInput in) throws IOException {
super(in);
name = in.readString();
waitForCompletion = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(name);
out.writeBoolean(waitForCompletion);
}

public String getName() {
return name;
}

public boolean isWaitForCompletion() {
return waitForCompletion;
}

public Request setWaitForCompletion(boolean waitForCompletion) {
this.waitForCompletion = waitForCompletion;
return this;
}

@Override
public ActionRequestValidationException validate() {
return null;
Expand All @@ -66,12 +79,13 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return name.equals(request.name);
return waitForCompletion == request.waitForCompletion &&
Objects.equals(name, request.name);
}

@Override
public int hashCode() {
return Objects.hash(name);
return Objects.hash(name, waitForCompletion);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,12 @@ private EnrichPolicy getPolicy(ExecuteEnrichPolicyAction.Request request) {
return policy;
}

public void runPolicy(ExecuteEnrichPolicyAction.Request request, ActionListener<ExecuteEnrichPolicyStatus> listener) {
runPolicy(request, getPolicy(request), listener);
public Task runPolicy(ExecuteEnrichPolicyAction.Request request, ActionListener<ExecuteEnrichPolicyStatus> listener) {
return runPolicy(request, getPolicy(request), listener);
}

public void runPolicy(ExecuteEnrichPolicyAction.Request request, TaskListener<ExecuteEnrichPolicyStatus> listener) {
runPolicy(request, getPolicy(request), listener);
public Task runPolicy(ExecuteEnrichPolicyAction.Request request, TaskListener<ExecuteEnrichPolicyStatus> listener) {
return runPolicy(request, getPolicy(request), listener);
}

public Task runPolicy(ExecuteEnrichPolicyAction.Request request, EnrichPolicy policy,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.LoggingTaskListener;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
Expand Down Expand Up @@ -60,17 +62,23 @@ protected ExecuteEnrichPolicyAction.Response read(StreamInput in) throws IOExcep
@Override
protected void masterOperation(Task task, ExecuteEnrichPolicyAction.Request request, ClusterState state,
ActionListener<ExecuteEnrichPolicyAction.Response> listener) {
executor.runPolicy(request, new ActionListener<>() {
@Override
public void onResponse(ExecuteEnrichPolicyStatus executionStatus) {
listener.onResponse(new ExecuteEnrichPolicyAction.Response(executionStatus));
}
if (request.isWaitForCompletion()) {
executor.runPolicy(request, new ActionListener<>() {
@Override
public void onResponse(ExecuteEnrichPolicyStatus executionStatus) {
listener.onResponse(new ExecuteEnrichPolicyAction.Response(executionStatus));
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
} else {
Task executeTask = executor.runPolicy(request, LoggingTaskListener.instance());
TaskId taskId = new TaskId(clusterService.localNode().getId(), executeTask.getId());
listener.onResponse(new ExecuteEnrichPolicyAction.Response(taskId));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public String getName() {
@Override
protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) throws IOException {
final ExecuteEnrichPolicyAction.Request request = new ExecuteEnrichPolicyAction.Request(restRequest.param("name"));
request.setWaitForCompletion(restRequest.paramAsBoolean("wait_for_completion", true));
return channel -> client.execute(ExecuteEnrichPolicyAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
*/
package org.elasticsearch.xpack.enrich;

import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
Expand All @@ -23,6 +26,7 @@
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus;
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;

import java.util.Collection;
Expand All @@ -37,7 +41,9 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

public class BasicEnrichTests extends ESSingleNodeTestCase {

Expand Down Expand Up @@ -208,6 +214,60 @@ public void testMultiplePolicies() {
}
}

public void testAsyncTaskExecute() throws Exception {
String policyName = "async-policy";
String sourceIndexName = "async-policy-source";

{
IndexRequest indexRequest = new IndexRequest(sourceIndexName);
indexRequest.source("key", "key", "value", "val1");
client().index(indexRequest).actionGet();
client().admin().indices().refresh(new RefreshRequest(sourceIndexName)).actionGet();
}

EnrichPolicy enrichPolicy =
new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndexName), "key", List.of("value"));
PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);
client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
ExecuteEnrichPolicyAction.Response executeResponse = client()
.execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName).setWaitForCompletion(false))
.actionGet();

assertThat(executeResponse.getStatus(), is(nullValue()));
assertThat(executeResponse.getTaskId(), is(not(nullValue())));
GetTaskRequest getPolicyTaskRequest = new GetTaskRequest().setTaskId(executeResponse.getTaskId()).setWaitForCompletion(true);
assertBusy(() -> {
GetTaskResponse taskResponse = client().execute(GetTaskAction.INSTANCE, getPolicyTaskRequest).actionGet();
assertThat(((ExecuteEnrichPolicyStatus) taskResponse.getTask().getTask().getStatus()).getPhase(),
is(ExecuteEnrichPolicyStatus.PolicyPhases.COMPLETE));
});

String pipelineName = "test-pipeline";
String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\"" + policyName +
"\", \"field\": \"key\", \"target_field\": \"target\"}}]}";
PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineName, new BytesArray(pipelineBody), XContentType.JSON);
client().admin().cluster().putPipeline(putPipelineRequest).actionGet();

BulkRequest bulkRequest = new BulkRequest("my-index");
int numTestDocs = randomIntBetween(3, 10);
for (int i = 0; i < numTestDocs; i++) {
IndexRequest indexRequest = new IndexRequest("my-index");
indexRequest.id(Integer.toString(i));
indexRequest.setPipeline(pipelineName);
indexRequest.source(Map.of("key", "key"));
bulkRequest.add(indexRequest);
}
BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
assertThat("Expected no failure, but " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures(), is(false));

for (int i = 0; i < numTestDocs; i++) {
GetResponse getResponse = client().get(new GetRequest("my-index", Integer.toString(i))).actionGet();
Map<String, Object> source = getResponse.getSourceAsMap();
assertThat(source.size(), equalTo(2));
assertThat(source.get("target"), equalTo(List.of(Map.of("key", "key", "value", "val1"))));
}
}

private List<String> createSourceMatchIndex(int numKeys, int numDocsPerKey) {
Set<String> keys = new HashSet<>();
for (int id = 0; id < numKeys; id++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@
}
}
]
},
"params":{
"wait_for_completion":{
"type":"boolean",
"default":true,
"description":"Should the request should block until the execution is complete."
}
}
}
}