Skip to content

Add wait for completion for Enrich policy execution #47886

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Oct 14, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,39 @@ private ExecuteEnrichPolicyAction() {
public static class Request extends MasterNodeRequest<Request> {

private final String name;
private boolean waitForCompletion;

public Request(String name) {
this.name = Objects.requireNonNull(name, "name cannot be null");
this.waitForCompletion = true;
}

public Request(StreamInput in) throws IOException {
super(in);
name = in.readString();
waitForCompletion = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(name);
out.writeBoolean(waitForCompletion);
}

public String getName() {
return name;
}

public boolean isWaitForCompletion() {
return waitForCompletion;
}

public Request setWaitForCompletion(boolean waitForCompletion) {
this.waitForCompletion = waitForCompletion;
return this;
}

@Override
public ActionRequestValidationException validate() {
return null;
Expand All @@ -66,12 +79,13 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return name.equals(request.name);
return waitForCompletion == request.waitForCompletion &&
Objects.equals(name, request.name);
}

@Override
public int hashCode() {
return Objects.hash(name);
return Objects.hash(name, waitForCompletion);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,12 @@ private EnrichPolicy getPolicy(ExecuteEnrichPolicyAction.Request request) {
return policy;
}

public void runPolicy(ExecuteEnrichPolicyAction.Request request, ActionListener<ExecuteEnrichPolicyStatus> listener) {
runPolicy(request, getPolicy(request), listener);
public Task runPolicy(ExecuteEnrichPolicyAction.Request request, ActionListener<ExecuteEnrichPolicyStatus> listener) {
return runPolicy(request, getPolicy(request), listener);
}

public void runPolicy(ExecuteEnrichPolicyAction.Request request, TaskListener<ExecuteEnrichPolicyStatus> listener) {
runPolicy(request, getPolicy(request), listener);
public Task runPolicy(ExecuteEnrichPolicyAction.Request request, TaskListener<ExecuteEnrichPolicyStatus> listener) {
return runPolicy(request, getPolicy(request), listener);
}

public Task runPolicy(ExecuteEnrichPolicyAction.Request request, EnrichPolicy policy,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.LoggingTaskListener;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
Expand Down Expand Up @@ -60,17 +62,23 @@ protected ExecuteEnrichPolicyAction.Response read(StreamInput in) throws IOExcep
@Override
protected void masterOperation(Task task, ExecuteEnrichPolicyAction.Request request, ClusterState state,
ActionListener<ExecuteEnrichPolicyAction.Response> listener) {
executor.runPolicy(request, new ActionListener<>() {
@Override
public void onResponse(ExecuteEnrichPolicyStatus executionStatus) {
listener.onResponse(new ExecuteEnrichPolicyAction.Response(executionStatus));
}
if (request.isWaitForCompletion()) {
executor.runPolicy(request, new ActionListener<>() {
@Override
public void onResponse(ExecuteEnrichPolicyStatus executionStatus) {
listener.onResponse(new ExecuteEnrichPolicyAction.Response(executionStatus));
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
} else {
Task executeTask = executor.runPolicy(request, LoggingTaskListener.instance());
TaskId taskId = new TaskId(clusterService.localNode().getId(), executeTask.getId());
listener.onResponse(new ExecuteEnrichPolicyAction.Response(taskId));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public String getName() {
@Override
protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) throws IOException {
final ExecuteEnrichPolicyAction.Request request = new ExecuteEnrichPolicyAction.Request(restRequest.param("name"));
request.setWaitForCompletion(restRequest.paramAsBoolean("wait_for_completion", true));
return channel -> client.execute(ExecuteEnrichPolicyAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
*/
package org.elasticsearch.xpack.enrich;

import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
Expand All @@ -23,6 +26,7 @@
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus;
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;

import java.util.Collection;
Expand All @@ -37,7 +41,9 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

public class BasicEnrichTests extends ESSingleNodeTestCase {

Expand Down Expand Up @@ -208,6 +214,58 @@ public void testMultiplePolicies() {
}
}

public void testAsyncTaskExecute() throws Exception {
String policyName = "async-policy";
String sourceIndexName = "async-policy-source";

{
IndexRequest indexRequest = new IndexRequest(sourceIndexName);
indexRequest.source("key", "key", "value", "val1");
client().index(indexRequest).actionGet();
client().admin().indices().refresh(new RefreshRequest(sourceIndexName)).actionGet();
}

EnrichPolicy enrichPolicy =
new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndexName), "key", List.of("value"));
PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);
client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
ExecuteEnrichPolicyAction.Response executeResponse = client()
.execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName).setWaitForCompletion(false))
.actionGet();

assertThat(executeResponse.getStatus(), is(nullValue()));
assertThat(executeResponse.getTaskId(), is(not(nullValue())));
GetTaskRequest getPolicyTaskRequest = new GetTaskRequest().setTaskId(executeResponse.getTaskId()).setWaitForCompletion(true);
GetTaskResponse taskResponse = client().execute(GetTaskAction.INSTANCE, getPolicyTaskRequest).actionGet();
assertThat(((ExecuteEnrichPolicyStatus) taskResponse.getTask().getTask().getStatus()).getPhase(),
is(ExecuteEnrichPolicyStatus.PolicyPhases.COMPLETE));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would place this inside an assertBusy(...), because there is no guarantee that the task is completed (especially on slow build machines).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added the assertBusy line. In the code the task request has wait_for_completion set to true, is it possible that this call can come back before the task is completed even on slow hardware? I suppose at the very least assertBusy insulates the test against timeout failures

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But in this test wait_for_completion is set to false, right? So it is likely that it returns and then the task isn't completed?

If wait_for_completion is true (which is the default) then the api doesn't return until the force merge has completed and therefor the associated task has been completed. So in this case it shouldn't be possible for the task not being in a completed state.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah nevermind my comment... I missed that wait_for_completion is also set on GetTaskRequest. Then the task api will not return until the task in question has been completed. I didn't know this.

So the assertBusy(...) doesn't make sense and can be removed.


String pipelineName = "test-pipeline";
String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\"" + policyName +
"\", \"field\": \"key\", \"target_field\": \"target\"}}]}";
PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineName, new BytesArray(pipelineBody), XContentType.JSON);
client().admin().cluster().putPipeline(putPipelineRequest).actionGet();

BulkRequest bulkRequest = new BulkRequest("my-index");
int numTestDocs = randomIntBetween(3, 10);
for (int i = 0; i < numTestDocs; i++) {
IndexRequest indexRequest = new IndexRequest("my-index");
indexRequest.id(Integer.toString(i));
indexRequest.setPipeline(pipelineName);
indexRequest.source(Map.of("key", "key"));
bulkRequest.add(indexRequest);
}
BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
assertThat("Expected no failure, but " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures(), is(false));

for (int i = 0; i < numTestDocs; i++) {
GetResponse getResponse = client().get(new GetRequest("my-index", Integer.toString(i))).actionGet();
Map<String, Object> source = getResponse.getSourceAsMap();
assertThat(source.size(), equalTo(2));
assertThat(source.get("target"), equalTo(List.of(Map.of("key", "key", "value", "val1"))));
}
}

private List<String> createSourceMatchIndex(int numKeys, int numDocsPerKey) {
Set<String> keys = new HashSet<>();
for (int id = 0; id < numKeys; id++) {
Expand Down