From 7ac5aa9057bfaca62b6696c656d85494611ef9e8 Mon Sep 17 00:00:00 2001 From: James Baiera Date: Fri, 4 Oct 2019 14:45:39 -0400 Subject: [PATCH 1/6] Add wait for completion option for task --- .../action/ExecuteEnrichPolicyAction.java | 17 +++++++++-- .../xpack/enrich/EnrichPolicyExecutor.java | 8 +++--- .../TransportExecuteEnrichPolicyAction.java | 28 ++++++++++++------- 3 files changed, 37 insertions(+), 16 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java index 0d7fa55ebb6ce..b31c7751dfa39 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java @@ -30,26 +30,38 @@ private ExecuteEnrichPolicyAction() { public static class Request extends MasterNodeRequest { private final String name; + private boolean waitForCompletion; public Request(String name) { this.name = Objects.requireNonNull(name, "name cannot be null"); + this.waitForCompletion = false; } public Request(StreamInput in) throws IOException { super(in); name = in.readString(); + waitForCompletion = in.readBoolean(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(name); + out.writeBoolean(waitForCompletion); } public String getName() { return name; } + public boolean isWaitForCompletion() { + return waitForCompletion; + } + + public void setWaitForCompletion(boolean waitForCompletion) { + this.waitForCompletion = waitForCompletion; + } + @Override public ActionRequestValidationException validate() { return null; @@ -66,12 +78,13 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Request request = (Request) o; - return name.equals(request.name); + return waitForCompletion == request.waitForCompletion && + Objects.equals(name, request.name); } @Override public int hashCode() { - return Objects.hash(name); + return Objects.hash(name, waitForCompletion); } } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java index 238d8e237582c..361f4f6b285cb 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java @@ -133,12 +133,12 @@ private EnrichPolicy getPolicy(ExecuteEnrichPolicyAction.Request request) { return policy; } - public void runPolicy(ExecuteEnrichPolicyAction.Request request, ActionListener listener) { - runPolicy(request, getPolicy(request), listener); + public Task runPolicy(ExecuteEnrichPolicyAction.Request request, ActionListener listener) { + return runPolicy(request, getPolicy(request), listener); } - public void runPolicy(ExecuteEnrichPolicyAction.Request request, TaskListener listener) { - runPolicy(request, getPolicy(request), listener); + public Task runPolicy(ExecuteEnrichPolicyAction.Request request, TaskListener listener) { + return runPolicy(request, getPolicy(request), listener); } public Task runPolicy(ExecuteEnrichPolicyAction.Request request, EnrichPolicy policy, diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java index 9ac4b08e6574a..aa55b11ee9403 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java @@ -17,7 +17,9 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.tasks.LoggingTaskListener; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; @@ -60,17 +62,23 @@ protected ExecuteEnrichPolicyAction.Response read(StreamInput in) throws IOExcep @Override protected void masterOperation(Task task, ExecuteEnrichPolicyAction.Request request, ClusterState state, ActionListener listener) { - executor.runPolicy(request, new ActionListener<>() { - @Override - public void onResponse(ExecuteEnrichPolicyStatus executionStatus) { - listener.onResponse(new ExecuteEnrichPolicyAction.Response(executionStatus)); - } + if (request.isWaitForCompletion()) { + executor.runPolicy(request, new ActionListener<>() { + @Override + public void onResponse(ExecuteEnrichPolicyStatus executionStatus) { + listener.onResponse(new ExecuteEnrichPolicyAction.Response(executionStatus)); + } - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + } else { + Task executeTask = executor.runPolicy(request, new LoggingTaskListener<>()); + TaskId taskId = new TaskId(clusterService.localNode().getId(), executeTask.getId()); + listener.onResponse(new ExecuteEnrichPolicyAction.Response(taskId)); + } } @Override From 1934ef66ac51749f7840326aca153030322363ce Mon Sep 17 00:00:00 2001 From: James Baiera Date: Wed, 9 Oct 2019 15:34:16 -0400 Subject: [PATCH 2/6] Set wait for complete to true by default --- .../xpack/core/enrich/action/ExecuteEnrichPolicyAction.java | 5 +++-- .../enrich/action/TransportExecuteEnrichPolicyAction.java | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java index b31c7751dfa39..e5c6ab5eb67ee 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java @@ -34,7 +34,7 @@ public static class Request extends MasterNodeRequest { public Request(String name) { this.name = Objects.requireNonNull(name, "name cannot be null"); - this.waitForCompletion = false; + this.waitForCompletion = true; } public Request(StreamInput in) throws IOException { @@ -58,8 +58,9 @@ public boolean isWaitForCompletion() { return waitForCompletion; } - public void setWaitForCompletion(boolean waitForCompletion) { + public Request setWaitForCompletion(boolean waitForCompletion) { this.waitForCompletion = waitForCompletion; + return this; } @Override diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java index aa55b11ee9403..7402c2daef6e7 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java @@ -75,7 +75,7 @@ public void onFailure(Exception e) { } }); } else { - Task executeTask = executor.runPolicy(request, new LoggingTaskListener<>()); + Task executeTask = executor.runPolicy(request, LoggingTaskListener.instance()); TaskId taskId = new TaskId(clusterService.localNode().getId(), executeTask.getId()); listener.onResponse(new ExecuteEnrichPolicyAction.Response(taskId)); } From 2f2e2a5baaddd1d29476e2103652c99f22b75617 Mon Sep 17 00:00:00 2001 From: James Baiera Date: Thu, 10 Oct 2019 17:41:56 -0400 Subject: [PATCH 3/6] Add test for async task running --- .../xpack/enrich/BasicEnrichTests.java | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java index cba5bceb23e8b..fcb5a8def49ea 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java @@ -5,6 +5,9 @@ */ package org.elasticsearch.xpack.enrich; +import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction; +import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; @@ -23,6 +26,7 @@ import org.elasticsearch.xpack.core.enrich.EnrichPolicy; import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction; import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; +import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus; import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction; import java.util.Collection; @@ -37,7 +41,9 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; public class BasicEnrichTests extends ESSingleNodeTestCase { @@ -208,6 +214,58 @@ public void testMultiplePolicies() { } } + public void testAsyncTaskExecute() throws Exception { + String policyName = "async-policy"; + String sourceIndexName = "async-policy-source"; + + { + IndexRequest indexRequest = new IndexRequest(sourceIndexName); + indexRequest.source("key", "key", "value", "val1"); + client().index(indexRequest).actionGet(); + client().admin().indices().refresh(new RefreshRequest(sourceIndexName)).actionGet(); + } + + EnrichPolicy enrichPolicy = + new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndexName), "key", List.of("value")); + PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy); + client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet(); + ExecuteEnrichPolicyAction.Response executeResponse = client() + .execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName).setWaitForCompletion(false)) + .actionGet(); + + assertThat(executeResponse.getStatus(), is(nullValue())); + assertThat(executeResponse.getTaskId(), is(not(nullValue()))); + GetTaskRequest getPolicyTaskRequest = new GetTaskRequest().setTaskId(executeResponse.getTaskId()).setWaitForCompletion(true); + GetTaskResponse taskResponse = client().execute(GetTaskAction.INSTANCE, getPolicyTaskRequest).actionGet(); + assertThat(((ExecuteEnrichPolicyStatus) taskResponse.getTask().getTask().getStatus()).getPhase(), + is(ExecuteEnrichPolicyStatus.PolicyPhases.COMPLETE)); + + String pipelineName = "test-pipeline"; + String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\"" + policyName + + "\", \"field\": \"key\", \"target_field\": \"target\"}}]}"; + PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineName, new BytesArray(pipelineBody), XContentType.JSON); + client().admin().cluster().putPipeline(putPipelineRequest).actionGet(); + + BulkRequest bulkRequest = new BulkRequest("my-index"); + int numTestDocs = randomIntBetween(3, 10); + for (int i = 0; i < numTestDocs; i++) { + IndexRequest indexRequest = new IndexRequest("my-index"); + indexRequest.id(Integer.toString(i)); + indexRequest.setPipeline(pipelineName); + indexRequest.source(Map.of("key", "key")); + bulkRequest.add(indexRequest); + } + BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet(); + assertThat("Expected no failure, but " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures(), is(false)); + + for (int i = 0; i < numTestDocs; i++) { + GetResponse getResponse = client().get(new GetRequest("my-index", Integer.toString(i))).actionGet(); + Map source = getResponse.getSourceAsMap(); + assertThat(source.size(), equalTo(2)); + assertThat(source.get("target"), equalTo(List.of(Map.of("key", "key", "value", "val1")))); + } + } + private List createSourceMatchIndex(int numKeys, int numDocsPerKey) { Set keys = new HashSet<>(); for (int id = 0; id < numKeys; id++) { From b8e6de805ab863210079f8879388b81bf838d2b9 Mon Sep 17 00:00:00 2001 From: James Baiera Date: Thu, 10 Oct 2019 17:42:24 -0400 Subject: [PATCH 4/6] Wire up wait_for_completion param via rest --- .../xpack/enrich/rest/RestExecuteEnrichPolicyAction.java | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestExecuteEnrichPolicyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestExecuteEnrichPolicyAction.java index 05955efdc8b3d..087117a6c1a45 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestExecuteEnrichPolicyAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestExecuteEnrichPolicyAction.java @@ -29,6 +29,7 @@ public String getName() { @Override protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) throws IOException { final ExecuteEnrichPolicyAction.Request request = new ExecuteEnrichPolicyAction.Request(restRequest.param("name")); + request.setWaitForCompletion(restRequest.paramAsBoolean("wait_for_completion", true)); return channel -> client.execute(ExecuteEnrichPolicyAction.INSTANCE, request, new RestToXContentListener<>(channel)); } } From 90e6fdb45ce50b6d0e5ee983fcbfe84d2ff621fc Mon Sep 17 00:00:00 2001 From: James Baiera Date: Fri, 11 Oct 2019 10:44:20 -0400 Subject: [PATCH 5/6] Use assertBusy when waiting for task completion. --- .../org/elasticsearch/xpack/enrich/BasicEnrichTests.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java index fcb5a8def49ea..64354f8e414bb 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java @@ -236,9 +236,11 @@ public void testAsyncTaskExecute() throws Exception { assertThat(executeResponse.getStatus(), is(nullValue())); assertThat(executeResponse.getTaskId(), is(not(nullValue()))); GetTaskRequest getPolicyTaskRequest = new GetTaskRequest().setTaskId(executeResponse.getTaskId()).setWaitForCompletion(true); - GetTaskResponse taskResponse = client().execute(GetTaskAction.INSTANCE, getPolicyTaskRequest).actionGet(); - assertThat(((ExecuteEnrichPolicyStatus) taskResponse.getTask().getTask().getStatus()).getPhase(), - is(ExecuteEnrichPolicyStatus.PolicyPhases.COMPLETE)); + assertBusy(() -> { + GetTaskResponse taskResponse = client().execute(GetTaskAction.INSTANCE, getPolicyTaskRequest).actionGet(); + assertThat(((ExecuteEnrichPolicyStatus) taskResponse.getTask().getTask().getStatus()).getPhase(), + is(ExecuteEnrichPolicyStatus.PolicyPhases.COMPLETE)); + }); String pipelineName = "test-pipeline"; String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\"" + policyName + From 66457ccb9c33d0c7c73af9b4f24c9fc54f9fe835 Mon Sep 17 00:00:00 2001 From: James Baiera Date: Fri, 11 Oct 2019 10:44:45 -0400 Subject: [PATCH 6/6] Add wait_for_completion to rest spec --- .../resources/rest-api-spec/api/enrich.execute_policy.json | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/enrich.execute_policy.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/enrich.execute_policy.json index b6115e40ec6f6..b49486a062036 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/api/enrich.execute_policy.json +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/enrich.execute_policy.json @@ -15,6 +15,13 @@ } } ] + }, + "params":{ + "wait_for_completion":{ + "type":"boolean", + "default":true, + "description":"Should the request should block until the execution is complete." + } } } }