Skip to content

Commit b0ccce2

Browse files
authored
Add wait for completion for Enrich policy execution (#47886)
This PR adds the ability to run the enrich policy execution task in the background, returning a task id instead of waiting for the completed operation.
1 parent ddf3bc2 commit b0ccce2

File tree

6 files changed

+106
-16
lines changed

6 files changed

+106
-16
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,26 +30,39 @@ private ExecuteEnrichPolicyAction() {
3030
public static class Request extends MasterNodeRequest<Request> {
3131

3232
private final String name;
33+
private boolean waitForCompletion;
3334

3435
public Request(String name) {
3536
this.name = Objects.requireNonNull(name, "name cannot be null");
37+
this.waitForCompletion = true;
3638
}
3739

3840
public Request(StreamInput in) throws IOException {
3941
super(in);
4042
name = in.readString();
43+
waitForCompletion = in.readBoolean();
4144
}
4245

4346
@Override
4447
public void writeTo(StreamOutput out) throws IOException {
4548
super.writeTo(out);
4649
out.writeString(name);
50+
out.writeBoolean(waitForCompletion);
4751
}
4852

4953
public String getName() {
5054
return name;
5155
}
5256

57+
public boolean isWaitForCompletion() {
58+
return waitForCompletion;
59+
}
60+
61+
public Request setWaitForCompletion(boolean waitForCompletion) {
62+
this.waitForCompletion = waitForCompletion;
63+
return this;
64+
}
65+
5366
@Override
5467
public ActionRequestValidationException validate() {
5568
return null;
@@ -66,12 +79,13 @@ public boolean equals(Object o) {
6679
if (this == o) return true;
6780
if (o == null || getClass() != o.getClass()) return false;
6881
Request request = (Request) o;
69-
return name.equals(request.name);
82+
return waitForCompletion == request.waitForCompletion &&
83+
Objects.equals(name, request.name);
7084
}
7185

7286
@Override
7387
public int hashCode() {
74-
return Objects.hash(name);
88+
return Objects.hash(name, waitForCompletion);
7589
}
7690
}
7791

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -133,12 +133,12 @@ private EnrichPolicy getPolicy(ExecuteEnrichPolicyAction.Request request) {
133133
return policy;
134134
}
135135

136-
public void runPolicy(ExecuteEnrichPolicyAction.Request request, ActionListener<ExecuteEnrichPolicyStatus> listener) {
137-
runPolicy(request, getPolicy(request), listener);
136+
public Task runPolicy(ExecuteEnrichPolicyAction.Request request, ActionListener<ExecuteEnrichPolicyStatus> listener) {
137+
return runPolicy(request, getPolicy(request), listener);
138138
}
139139

140-
public void runPolicy(ExecuteEnrichPolicyAction.Request request, TaskListener<ExecuteEnrichPolicyStatus> listener) {
141-
runPolicy(request, getPolicy(request), listener);
140+
public Task runPolicy(ExecuteEnrichPolicyAction.Request request, TaskListener<ExecuteEnrichPolicyStatus> listener) {
141+
return runPolicy(request, getPolicy(request), listener);
142142
}
143143

144144
public Task runPolicy(ExecuteEnrichPolicyAction.Request request, EnrichPolicy policy,

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
import org.elasticsearch.common.inject.Inject;
1818
import org.elasticsearch.common.io.stream.StreamInput;
1919
import org.elasticsearch.common.settings.Settings;
20+
import org.elasticsearch.tasks.LoggingTaskListener;
2021
import org.elasticsearch.tasks.Task;
22+
import org.elasticsearch.tasks.TaskId;
2123
import org.elasticsearch.threadpool.ThreadPool;
2224
import org.elasticsearch.transport.TransportService;
2325
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
@@ -60,17 +62,23 @@ protected ExecuteEnrichPolicyAction.Response read(StreamInput in) throws IOExcep
6062
@Override
6163
protected void masterOperation(Task task, ExecuteEnrichPolicyAction.Request request, ClusterState state,
6264
ActionListener<ExecuteEnrichPolicyAction.Response> listener) {
63-
executor.runPolicy(request, new ActionListener<>() {
64-
@Override
65-
public void onResponse(ExecuteEnrichPolicyStatus executionStatus) {
66-
listener.onResponse(new ExecuteEnrichPolicyAction.Response(executionStatus));
67-
}
65+
if (request.isWaitForCompletion()) {
66+
executor.runPolicy(request, new ActionListener<>() {
67+
@Override
68+
public void onResponse(ExecuteEnrichPolicyStatus executionStatus) {
69+
listener.onResponse(new ExecuteEnrichPolicyAction.Response(executionStatus));
70+
}
6871

69-
@Override
70-
public void onFailure(Exception e) {
71-
listener.onFailure(e);
72-
}
73-
});
72+
@Override
73+
public void onFailure(Exception e) {
74+
listener.onFailure(e);
75+
}
76+
});
77+
} else {
78+
Task executeTask = executor.runPolicy(request, LoggingTaskListener.instance());
79+
TaskId taskId = new TaskId(clusterService.localNode().getId(), executeTask.getId());
80+
listener.onResponse(new ExecuteEnrichPolicyAction.Response(taskId));
81+
}
7482
}
7583

7684
@Override

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestExecuteEnrichPolicyAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ public String getName() {
2929
@Override
3030
protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) throws IOException {
3131
final ExecuteEnrichPolicyAction.Request request = new ExecuteEnrichPolicyAction.Request(restRequest.param("name"));
32+
request.setWaitForCompletion(restRequest.paramAsBoolean("wait_for_completion", true));
3233
return channel -> client.execute(ExecuteEnrichPolicyAction.INSTANCE, request, new RestToXContentListener<>(channel));
3334
}
3435
}

x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
*/
66
package org.elasticsearch.xpack.enrich;
77

8+
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction;
9+
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest;
10+
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse;
811
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
912
import org.elasticsearch.action.bulk.BulkItemResponse;
1013
import org.elasticsearch.action.bulk.BulkRequest;
@@ -23,6 +26,7 @@
2326
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
2427
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
2528
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
29+
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus;
2630
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
2731

2832
import java.util.Collection;
@@ -37,7 +41,9 @@
3741
import static org.hamcrest.Matchers.equalTo;
3842
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
3943
import static org.hamcrest.Matchers.is;
44+
import static org.hamcrest.Matchers.not;
4045
import static org.hamcrest.Matchers.notNullValue;
46+
import static org.hamcrest.Matchers.nullValue;
4147

4248
public class BasicEnrichTests extends ESSingleNodeTestCase {
4349

@@ -210,6 +216,60 @@ public void testMultiplePolicies() {
210216
}
211217
}
212218

219+
public void testAsyncTaskExecute() throws Exception {
220+
String policyName = "async-policy";
221+
String sourceIndexName = "async-policy-source";
222+
223+
{
224+
IndexRequest indexRequest = new IndexRequest(sourceIndexName);
225+
indexRequest.source("key", "key", "value", "val1");
226+
client().index(indexRequest).actionGet();
227+
client().admin().indices().refresh(new RefreshRequest(sourceIndexName)).actionGet();
228+
}
229+
230+
EnrichPolicy enrichPolicy =
231+
new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndexName), "key", List.of("value"));
232+
PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);
233+
client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
234+
ExecuteEnrichPolicyAction.Response executeResponse = client()
235+
.execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName).setWaitForCompletion(false))
236+
.actionGet();
237+
238+
assertThat(executeResponse.getStatus(), is(nullValue()));
239+
assertThat(executeResponse.getTaskId(), is(not(nullValue())));
240+
GetTaskRequest getPolicyTaskRequest = new GetTaskRequest().setTaskId(executeResponse.getTaskId()).setWaitForCompletion(true);
241+
assertBusy(() -> {
242+
GetTaskResponse taskResponse = client().execute(GetTaskAction.INSTANCE, getPolicyTaskRequest).actionGet();
243+
assertThat(((ExecuteEnrichPolicyStatus) taskResponse.getTask().getTask().getStatus()).getPhase(),
244+
is(ExecuteEnrichPolicyStatus.PolicyPhases.COMPLETE));
245+
});
246+
247+
String pipelineName = "test-pipeline";
248+
String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\"" + policyName +
249+
"\", \"field\": \"key\", \"target_field\": \"target\"}}]}";
250+
PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineName, new BytesArray(pipelineBody), XContentType.JSON);
251+
client().admin().cluster().putPipeline(putPipelineRequest).actionGet();
252+
253+
BulkRequest bulkRequest = new BulkRequest("my-index");
254+
int numTestDocs = randomIntBetween(3, 10);
255+
for (int i = 0; i < numTestDocs; i++) {
256+
IndexRequest indexRequest = new IndexRequest("my-index");
257+
indexRequest.id(Integer.toString(i));
258+
indexRequest.setPipeline(pipelineName);
259+
indexRequest.source(Map.of("key", "key"));
260+
bulkRequest.add(indexRequest);
261+
}
262+
BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
263+
assertThat("Expected no failure, but " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures(), is(false));
264+
265+
for (int i = 0; i < numTestDocs; i++) {
266+
GetResponse getResponse = client().get(new GetRequest("my-index", Integer.toString(i))).actionGet();
267+
Map<String, Object> source = getResponse.getSourceAsMap();
268+
assertThat(source.size(), equalTo(2));
269+
assertThat(source.get("target"), equalTo(List.of(Map.of("key", "key", "value", "val1"))));
270+
}
271+
}
272+
213273
private List<String> createSourceMatchIndex(int numKeys, int numDocsPerKey) {
214274
Set<String> keys = new HashSet<>();
215275
for (int id = 0; id < numKeys; id++) {

x-pack/plugin/src/test/resources/rest-api-spec/api/enrich.execute_policy.json

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,13 @@
1515
}
1616
}
1717
]
18+
},
19+
"params":{
20+
"wait_for_completion":{
21+
"type":"boolean",
22+
"default":true,
23+
"description":"Should the request should block until the execution is complete."
24+
}
1825
}
1926
}
2027
}

0 commit comments

Comments
 (0)