Skip to content

Commit 18d7e32

Browse files
committed
Add wait for completion for Enrich policy execution (#47886)
This PR adds the ability to run the enrich policy execution task in the background, returning a task id instead of waiting for the completed operation.
1 parent 7fc9198 commit 18d7e32

File tree

6 files changed

+107
-16
lines changed

6 files changed

+107
-16
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,26 +30,39 @@ private ExecuteEnrichPolicyAction() {
3030
public static class Request extends MasterNodeRequest<Request> {
3131

3232
private final String name;
33+
private boolean waitForCompletion;
3334

3435
public Request(String name) {
3536
this.name = Objects.requireNonNull(name, "name cannot be null");
37+
this.waitForCompletion = true;
3638
}
3739

3840
public Request(StreamInput in) throws IOException {
3941
super(in);
4042
name = in.readString();
43+
waitForCompletion = in.readBoolean();
4144
}
4245

4346
@Override
4447
public void writeTo(StreamOutput out) throws IOException {
4548
super.writeTo(out);
4649
out.writeString(name);
50+
out.writeBoolean(waitForCompletion);
4751
}
4852

4953
public String getName() {
5054
return name;
5155
}
5256

57+
public boolean isWaitForCompletion() {
58+
return waitForCompletion;
59+
}
60+
61+
public Request setWaitForCompletion(boolean waitForCompletion) {
62+
this.waitForCompletion = waitForCompletion;
63+
return this;
64+
}
65+
5366
@Override
5467
public ActionRequestValidationException validate() {
5568
return null;
@@ -66,12 +79,13 @@ public boolean equals(Object o) {
6679
if (this == o) return true;
6780
if (o == null || getClass() != o.getClass()) return false;
6881
Request request = (Request) o;
69-
return name.equals(request.name);
82+
return waitForCompletion == request.waitForCompletion &&
83+
Objects.equals(name, request.name);
7084
}
7185

7286
@Override
7387
public int hashCode() {
74-
return Objects.hash(name);
88+
return Objects.hash(name, waitForCompletion);
7589
}
7690
}
7791

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -133,12 +133,12 @@ private EnrichPolicy getPolicy(ExecuteEnrichPolicyAction.Request request) {
133133
return policy;
134134
}
135135

136-
public void runPolicy(ExecuteEnrichPolicyAction.Request request, ActionListener<ExecuteEnrichPolicyStatus> listener) {
137-
runPolicy(request, getPolicy(request), listener);
136+
public Task runPolicy(ExecuteEnrichPolicyAction.Request request, ActionListener<ExecuteEnrichPolicyStatus> listener) {
137+
return runPolicy(request, getPolicy(request), listener);
138138
}
139139

140-
public void runPolicy(ExecuteEnrichPolicyAction.Request request, TaskListener<ExecuteEnrichPolicyStatus> listener) {
141-
runPolicy(request, getPolicy(request), listener);
140+
public Task runPolicy(ExecuteEnrichPolicyAction.Request request, TaskListener<ExecuteEnrichPolicyStatus> listener) {
141+
return runPolicy(request, getPolicy(request), listener);
142142
}
143143

144144
public Task runPolicy(ExecuteEnrichPolicyAction.Request request, EnrichPolicy policy,

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
import org.elasticsearch.common.inject.Inject;
1818
import org.elasticsearch.common.io.stream.StreamInput;
1919
import org.elasticsearch.common.settings.Settings;
20+
import org.elasticsearch.tasks.LoggingTaskListener;
21+
import org.elasticsearch.tasks.Task;
22+
import org.elasticsearch.tasks.TaskId;
2023
import org.elasticsearch.threadpool.ThreadPool;
2124
import org.elasticsearch.transport.TransportService;
2225
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
@@ -59,17 +62,23 @@ protected ExecuteEnrichPolicyAction.Response read(StreamInput in) throws IOExcep
5962
@Override
6063
protected void masterOperation(ExecuteEnrichPolicyAction.Request request, ClusterState state,
6164
ActionListener<ExecuteEnrichPolicyAction.Response> listener) {
62-
executor.runPolicy(request, new ActionListener<ExecuteEnrichPolicyStatus>() {
63-
@Override
64-
public void onResponse(ExecuteEnrichPolicyStatus executionStatus) {
65-
listener.onResponse(new ExecuteEnrichPolicyAction.Response(executionStatus));
66-
}
65+
if (request.isWaitForCompletion()) {
66+
executor.runPolicy(request, new ActionListener<ExecuteEnrichPolicyStatus>() {
67+
@Override
68+
public void onResponse(ExecuteEnrichPolicyStatus executionStatus) {
69+
listener.onResponse(new ExecuteEnrichPolicyAction.Response(executionStatus));
70+
}
6771

68-
@Override
69-
public void onFailure(Exception e) {
70-
listener.onFailure(e);
71-
}
72-
});
72+
@Override
73+
public void onFailure(Exception e) {
74+
listener.onFailure(e);
75+
}
76+
});
77+
} else {
78+
Task executeTask = executor.runPolicy(request, LoggingTaskListener.instance());
79+
TaskId taskId = new TaskId(clusterService.localNode().getId(), executeTask.getId());
80+
listener.onResponse(new ExecuteEnrichPolicyAction.Response(taskId));
81+
}
7382
}
7483

7584
@Override

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestExecuteEnrichPolicyAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ public String getName() {
2929
@Override
3030
protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) throws IOException {
3131
final ExecuteEnrichPolicyAction.Request request = new ExecuteEnrichPolicyAction.Request(restRequest.param("name"));
32+
request.setWaitForCompletion(restRequest.paramAsBoolean("wait_for_completion", true));
3233
return channel -> client.execute(ExecuteEnrichPolicyAction.INSTANCE, request, new RestToXContentListener<>(channel));
3334
}
3435
}

x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
*/
66
package org.elasticsearch.xpack.enrich;
77

8+
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction;
9+
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest;
10+
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse;
811
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
912
import org.elasticsearch.action.bulk.BulkItemResponse;
1013
import org.elasticsearch.action.bulk.BulkRequest;
@@ -23,6 +26,7 @@
2326
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
2427
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
2528
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
29+
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus;
2630
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
2731

2832
import java.util.ArrayList;
@@ -41,7 +45,9 @@
4145
import static org.hamcrest.Matchers.equalTo;
4246
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
4347
import static org.hamcrest.Matchers.is;
48+
import static org.hamcrest.Matchers.not;
4449
import static org.hamcrest.Matchers.notNullValue;
50+
import static org.hamcrest.Matchers.nullValue;
4551

4652
public class BasicEnrichTests extends ESSingleNodeTestCase {
4753

@@ -214,6 +220,60 @@ public void testMultiplePolicies() {
214220
}
215221
}
216222

223+
public void testAsyncTaskExecute() throws Exception {
224+
String policyName = "async-policy";
225+
String sourceIndexName = "async-policy-source";
226+
227+
{
228+
IndexRequest indexRequest = new IndexRequest(sourceIndexName);
229+
indexRequest.source("key", "key", "value", "val1");
230+
client().index(indexRequest).actionGet();
231+
client().admin().indices().refresh(new RefreshRequest(sourceIndexName)).actionGet();
232+
}
233+
234+
EnrichPolicy enrichPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList(sourceIndexName), "key",
235+
Collections.singletonList("value"));
236+
PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);
237+
client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
238+
ExecuteEnrichPolicyAction.Response executeResponse = client()
239+
.execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName).setWaitForCompletion(false))
240+
.actionGet();
241+
242+
assertThat(executeResponse.getStatus(), is(nullValue()));
243+
assertThat(executeResponse.getTaskId(), is(not(nullValue())));
244+
GetTaskRequest getPolicyTaskRequest = new GetTaskRequest().setTaskId(executeResponse.getTaskId()).setWaitForCompletion(true);
245+
assertBusy(() -> {
246+
GetTaskResponse taskResponse = client().execute(GetTaskAction.INSTANCE, getPolicyTaskRequest).actionGet();
247+
assertThat(((ExecuteEnrichPolicyStatus) taskResponse.getTask().getTask().getStatus()).getPhase(),
248+
is(ExecuteEnrichPolicyStatus.PolicyPhases.COMPLETE));
249+
});
250+
251+
String pipelineName = "test-pipeline";
252+
String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\"" + policyName +
253+
"\", \"field\": \"key\", \"target_field\": \"target\"}}]}";
254+
PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineName, new BytesArray(pipelineBody), XContentType.JSON);
255+
client().admin().cluster().putPipeline(putPipelineRequest).actionGet();
256+
257+
BulkRequest bulkRequest = new BulkRequest("my-index");
258+
int numTestDocs = randomIntBetween(3, 10);
259+
for (int i = 0; i < numTestDocs; i++) {
260+
IndexRequest indexRequest = new IndexRequest("my-index");
261+
indexRequest.id(Integer.toString(i));
262+
indexRequest.setPipeline(pipelineName);
263+
indexRequest.source(Collections.singletonMap("key", "key"));
264+
bulkRequest.add(indexRequest);
265+
}
266+
BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
267+
assertThat("Expected no failure, but " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures(), is(false));
268+
269+
for (int i = 0; i < numTestDocs; i++) {
270+
GetResponse getResponse = client().get(new GetRequest("my-index", Integer.toString(i))).actionGet();
271+
Map<String, Object> source = getResponse.getSourceAsMap();
272+
assertThat(source.size(), equalTo(2));
273+
assertThat(source.get("target"), equalTo(mapOf("key", "key", "value", "val1")));
274+
}
275+
}
276+
217277
private List<String> createSourceMatchIndex(int numKeys, int numDocsPerKey) {
218278
Set<String> keys = new HashSet<>();
219279
for (int id = 0; id < numKeys; id++) {

x-pack/plugin/src/test/resources/rest-api-spec/api/enrich.execute_policy.json

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,13 @@
1515
}
1616
}
1717
]
18+
},
19+
"params":{
20+
"wait_for_completion":{
21+
"type":"boolean",
22+
"default":true,
23+
"description":"Should the request should block until the execution is complete."
24+
}
1825
}
1926
}
2027
}

0 commit comments

Comments
 (0)