Skip to content

Commit 7cc73f6

Browse files
committed
Add HLRC support for enrich execute policy API (#47991)
This PR also includes HLRC docs for the enrich stats api. Relates to #32789
1 parent d4901a7 commit 7cc73f6

File tree

10 files changed

+379
-1
lines changed

10 files changed

+379
-1
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/EnrichClient.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import org.elasticsearch.action.ActionListener;
2222
import org.elasticsearch.client.core.AcknowledgedResponse;
2323
import org.elasticsearch.client.enrich.DeletePolicyRequest;
24+
import org.elasticsearch.client.enrich.ExecutePolicyRequest;
25+
import org.elasticsearch.client.enrich.ExecutePolicyResponse;
2426
import org.elasticsearch.client.enrich.GetPolicyRequest;
2527
import org.elasticsearch.client.enrich.GetPolicyResponse;
2628
import org.elasticsearch.client.enrich.PutPolicyRequest;
@@ -224,4 +226,49 @@ public Cancellable statsAsync(StatsRequest request,
224226
Collections.emptySet()
225227
);
226228
}
229+
230+
/**
231+
* Executes the execute policy api, which executes an enrich policy.
232+
*
233+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/enrich-policy-apis.html#execute-policy">
234+
* the docs</a> for more.
235+
*
236+
* @param request the {@link ExecutePolicyRequest}
237+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
238+
* @return the response
239+
* @throws IOException in case there is a problem sending the request or parsing back the response
240+
*/
241+
public ExecutePolicyResponse executePolicy(ExecutePolicyRequest request, RequestOptions options) throws IOException {
242+
return restHighLevelClient.performRequestAndParseEntity(
243+
request,
244+
EnrichRequestConverters::executePolicy,
245+
options,
246+
ExecutePolicyResponse::fromXContent,
247+
Collections.emptySet()
248+
);
249+
}
250+
251+
/**
252+
* Asynchronously executes the execute policy api, which executes an enrich policy.
253+
*
254+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/enrich-policy-apis.html#execute-policy">
255+
* the docs</a> for more.
256+
*
257+
* @param request the {@link ExecutePolicyRequest}
258+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
259+
* @param listener the listener to be notified upon request completion
260+
* @return cancellable that may be used to cancel the request
261+
*/
262+
public Cancellable executePolicyAsync(ExecutePolicyRequest request,
263+
RequestOptions options,
264+
ActionListener<ExecutePolicyResponse> listener) {
265+
return restHighLevelClient.performRequestAsyncAndParseEntity(
266+
request,
267+
EnrichRequestConverters::executePolicy,
268+
options,
269+
ExecutePolicyResponse::fromXContent,
270+
listener,
271+
Collections.emptySet()
272+
);
273+
}
227274
}

client/rest-high-level/src/main/java/org/elasticsearch/client/EnrichRequestConverters.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020

2121
import org.apache.http.client.methods.HttpDelete;
2222
import org.apache.http.client.methods.HttpGet;
23+
import org.apache.http.client.methods.HttpPost;
2324
import org.apache.http.client.methods.HttpPut;
2425
import org.elasticsearch.client.enrich.DeletePolicyRequest;
26+
import org.elasticsearch.client.enrich.ExecutePolicyRequest;
2527
import org.elasticsearch.client.enrich.GetPolicyRequest;
2628
import org.elasticsearch.client.enrich.PutPolicyRequest;
2729
import org.elasticsearch.client.enrich.StatsRequest;
@@ -66,4 +68,17 @@ static Request stats(StatsRequest statsRequest) {
6668
return new Request(HttpGet.METHOD_NAME, endpoint);
6769
}
6870

71+
static Request executePolicy(ExecutePolicyRequest executePolicyRequest) {
72+
String endpoint = new RequestConverters.EndpointBuilder()
73+
.addPathPartAsIs("_enrich", "policy")
74+
.addPathPart(executePolicyRequest.getName())
75+
.addPathPartAsIs("_execute")
76+
.build();
77+
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
78+
if (executePolicyRequest.getWaitForCompletion() != null) {
79+
request.addParameter("wait_for_completion", executePolicyRequest.getWaitForCompletion().toString());
80+
}
81+
return request;
82+
}
83+
6984
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.client.enrich;
20+
21+
import org.elasticsearch.client.Validatable;
22+
23+
public final class ExecutePolicyRequest implements Validatable {
24+
25+
private final String name;
26+
private Boolean waitForCompletion;
27+
28+
public ExecutePolicyRequest(String name) {
29+
this.name = name;
30+
}
31+
32+
public String getName() {
33+
return name;
34+
}
35+
36+
public Boolean getWaitForCompletion() {
37+
return waitForCompletion;
38+
}
39+
40+
public void setWaitForCompletion(boolean waitForCompletion) {
41+
this.waitForCompletion = waitForCompletion;
42+
}
43+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.client.enrich;
20+
21+
import org.elasticsearch.common.ParseField;
22+
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
23+
import org.elasticsearch.common.xcontent.XContentParser;
24+
25+
public final class ExecutePolicyResponse {
26+
27+
private static final ParseField TASK_FIELD = new ParseField("task");
28+
private static final ParseField STATUS_FIELD = new ParseField("status");
29+
30+
private static final ConstructingObjectParser<ExecutePolicyResponse, Void> PARSER = new ConstructingObjectParser<>(
31+
"execute_policy_response",
32+
true,
33+
args -> new ExecutePolicyResponse((String) args[0], (ExecutionStatus) args[1])
34+
);
35+
36+
static {
37+
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), TASK_FIELD);
38+
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), ExecutionStatus.PARSER, STATUS_FIELD);
39+
}
40+
41+
public static ExecutePolicyResponse fromXContent(XContentParser parser) {
42+
return PARSER.apply(parser, null);
43+
}
44+
45+
private final String taskId;
46+
private final ExecutionStatus executionStatus;
47+
48+
ExecutePolicyResponse(String taskId, ExecutionStatus executionStatus) {
49+
this.taskId = taskId;
50+
this.executionStatus = executionStatus;
51+
}
52+
53+
public String getTaskId() {
54+
return taskId;
55+
}
56+
57+
public ExecutionStatus getExecutionStatus() {
58+
return executionStatus;
59+
}
60+
61+
public static final class ExecutionStatus {
62+
63+
private static final ParseField PHASE_FIELD = new ParseField("phase");
64+
65+
private static final ConstructingObjectParser<ExecutionStatus, Void> PARSER = new ConstructingObjectParser<>(
66+
"execution_status",
67+
true,
68+
args -> new ExecutionStatus((String) args[0])
69+
);
70+
71+
static {
72+
PARSER.declareString(ConstructingObjectParser.constructorArg(), PHASE_FIELD);
73+
}
74+
75+
private final String phase;
76+
77+
ExecutionStatus(String phase) {
78+
this.phase = phase;
79+
}
80+
81+
public String getPhase() {
82+
return phase;
83+
}
84+
}
85+
}

client/rest-high-level/src/test/java/org/elasticsearch/client/EnrichIT.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,14 @@
2020

2121
import org.elasticsearch.client.core.AcknowledgedResponse;
2222
import org.elasticsearch.client.enrich.DeletePolicyRequest;
23+
import org.elasticsearch.client.enrich.ExecutePolicyRequest;
24+
import org.elasticsearch.client.enrich.ExecutePolicyResponse;
2325
import org.elasticsearch.client.enrich.GetPolicyRequest;
2426
import org.elasticsearch.client.enrich.GetPolicyResponse;
2527
import org.elasticsearch.client.enrich.PutPolicyRequest;
2628
import org.elasticsearch.client.enrich.StatsRequest;
2729
import org.elasticsearch.client.enrich.StatsResponse;
30+
import org.elasticsearch.client.indices.CreateIndexRequest;
2831

2932
import java.util.Collections;
3033

@@ -36,6 +39,11 @@
3639
public class EnrichIT extends ESRestHighLevelClientTestCase {
3740

3841
public void testCRUD() throws Exception {
42+
CreateIndexRequest createIndexRequest = new CreateIndexRequest("my-index")
43+
.mapping(Collections.singletonMap("properties", Collections.singletonMap("enrich_key",
44+
Collections.singletonMap("type", "keyword"))));
45+
highLevelClient().indices().create(createIndexRequest, RequestOptions.DEFAULT);
46+
3947
final EnrichClient enrichClient = highLevelClient().enrich();
4048
PutPolicyRequest putPolicyRequest = new PutPolicyRequest("my-policy", "match",
4149
Collections.singletonList("my-index"), "enrich_key", Collections.singletonList("enrich_value"));
@@ -60,6 +68,11 @@ public void testCRUD() throws Exception {
6068
assertThat(statsResponse.getCoordinatorStats().get(0).getRemoteRequestsTotal(), greaterThanOrEqualTo(0L));
6169
assertThat(statsResponse.getCoordinatorStats().get(0).getExecutedSearchesTotal(), greaterThanOrEqualTo(0L));
6270

71+
ExecutePolicyRequest executePolicyRequest = new ExecutePolicyRequest("my-policy");
72+
ExecutePolicyResponse executePolicyResponse =
73+
execute(executePolicyRequest, enrichClient::executePolicy, enrichClient::executePolicyAsync);
74+
assertThat(executePolicyResponse.getExecutionStatus().getPhase(), equalTo("COMPLETE"));
75+
6376
DeletePolicyRequest deletePolicyRequest = new DeletePolicyRequest("my-policy");
6477
AcknowledgedResponse deletePolicyResponse =
6578
execute(deletePolicyRequest, enrichClient::deletePolicy, enrichClient::deletePolicyAsync);

client/rest-high-level/src/test/java/org/elasticsearch/client/EnrichRequestConvertersTests.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020

2121
import org.apache.http.client.methods.HttpDelete;
2222
import org.apache.http.client.methods.HttpGet;
23+
import org.apache.http.client.methods.HttpPost;
2324
import org.apache.http.client.methods.HttpPut;
2425
import org.elasticsearch.client.enrich.DeletePolicyRequest;
26+
import org.elasticsearch.client.enrich.ExecutePolicyRequest;
2527
import org.elasticsearch.client.enrich.GetPolicyRequest;
2628
import org.elasticsearch.client.enrich.PutPolicyRequest;
2729
import org.elasticsearch.client.enrich.PutPolicyRequestTests;
@@ -89,4 +91,24 @@ public void testStats() {
8991
assertThat(result.getEntity(), nullValue());
9092
}
9193

94+
public void testExecutePolicy() {
95+
ExecutePolicyRequest request = new ExecutePolicyRequest(randomAlphaOfLength(4));
96+
Request result = EnrichRequestConverters.executePolicy(request);
97+
98+
assertThat(result.getMethod(), equalTo(HttpPost.METHOD_NAME));
99+
assertThat(result.getEndpoint(), equalTo("/_enrich/policy/" + request.getName() + "/_execute"));
100+
assertThat(result.getParameters().size(), equalTo(0));
101+
assertThat(result.getEntity(), nullValue());
102+
103+
request = new ExecutePolicyRequest(randomAlphaOfLength(4));
104+
request.setWaitForCompletion(randomBoolean());
105+
result = EnrichRequestConverters.executePolicy(request);
106+
107+
assertThat(result.getMethod(), equalTo(HttpPost.METHOD_NAME));
108+
assertThat(result.getEndpoint(), equalTo("/_enrich/policy/" + request.getName() + "/_execute"));
109+
assertThat(result.getParameters().size(), equalTo(1));
110+
assertThat(result.getParameters().get("wait_for_completion"), equalTo(request.getWaitForCompletion().toString()));
111+
assertThat(result.getEntity(), nullValue());
112+
}
113+
92114
}

client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/EnrichDocumentationIT.java

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import org.elasticsearch.client.RestHighLevelClient;
2626
import org.elasticsearch.client.core.AcknowledgedResponse;
2727
import org.elasticsearch.client.enrich.DeletePolicyRequest;
28+
import org.elasticsearch.client.enrich.ExecutePolicyRequest;
29+
import org.elasticsearch.client.enrich.ExecutePolicyResponse;
2830
import org.elasticsearch.client.enrich.NamedPolicy;
2931
import org.elasticsearch.client.enrich.GetPolicyRequest;
3032
import org.elasticsearch.client.enrich.GetPolicyResponse;
@@ -33,6 +35,7 @@
3335
import org.elasticsearch.client.enrich.StatsResponse;
3436
import org.elasticsearch.client.enrich.StatsResponse.CoordinatorStats;
3537
import org.elasticsearch.client.enrich.StatsResponse.ExecutingPolicy;
38+
import org.elasticsearch.client.indices.CreateIndexRequest;
3639
import org.junit.After;
3740

3841
import java.util.Arrays;
@@ -156,7 +159,7 @@ public void testGetPolicy() throws Exception {
156159
RestHighLevelClient client = highLevelClient();
157160

158161
PutPolicyRequest putPolicyRequest = new PutPolicyRequest(
159-
"users-policy", "exact_match", Collections.singletonList("users"),
162+
"users-policy", "match", Collections.singletonList("users"),
160163
"email", Arrays.asList("address", "zip", "city", "state"));
161164
client.enrich().putPolicy(putPolicyRequest, RequestOptions.DEFAULT);
162165

@@ -251,4 +254,61 @@ public void onFailure(Exception e) {
251254
assertTrue(latch.await(30L, TimeUnit.SECONDS));
252255
}
253256

257+
public void testExecutePolicy() throws Exception {
258+
RestHighLevelClient client = highLevelClient();
259+
260+
{
261+
CreateIndexRequest createIndexRequest = new CreateIndexRequest("users")
262+
.mapping(Collections.singletonMap("properties", Collections.singletonMap("email",
263+
Collections.singletonMap("type", "keyword"))));
264+
client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
265+
PutPolicyRequest putPolicyRequest = new PutPolicyRequest(
266+
"users-policy", "match", Collections.singletonList("users"),
267+
"email", Arrays.asList("address", "zip", "city", "state"));
268+
client.enrich().putPolicy(putPolicyRequest, RequestOptions.DEFAULT);
269+
}
270+
271+
// tag::enrich-execute-policy-request
272+
ExecutePolicyRequest request =
273+
new ExecutePolicyRequest("users-policy");
274+
// end::enrich-execute-policy-request
275+
276+
// tag::enrich-execute-policy-execute
277+
ExecutePolicyResponse response =
278+
client.enrich().executePolicy(request, RequestOptions.DEFAULT);
279+
// end::enrich-execute-policy-execute
280+
281+
// tag::enrich-execute-policy-response
282+
ExecutePolicyResponse.ExecutionStatus status =
283+
response.getExecutionStatus();
284+
// end::enrich-execute-policy-response
285+
286+
// tag::enrich-execute-policy-execute-listener
287+
ActionListener<ExecutePolicyResponse> listener =
288+
new ActionListener<ExecutePolicyResponse>() {
289+
@Override
290+
public void onResponse(ExecutePolicyResponse response) { // <1>
291+
ExecutePolicyResponse.ExecutionStatus status =
292+
response.getExecutionStatus();
293+
}
294+
295+
@Override
296+
public void onFailure(Exception e) {
297+
// <2>
298+
}
299+
};
300+
// end::enrich-execute-policy-execute-listener
301+
302+
// Replace the empty listener by a blocking listener in test
303+
final CountDownLatch latch = new CountDownLatch(1);
304+
listener = new LatchedActionListener<>(listener, latch);
305+
306+
// tag::enrich-execute-policy-execute-async
307+
client.enrich().executePolicyAsync(request, RequestOptions.DEFAULT,
308+
listener); // <1>
309+
// end::enrich-execute-policy-execute-async
310+
311+
assertTrue(latch.await(30L, TimeUnit.SECONDS));
312+
}
313+
254314
}

0 commit comments

Comments
 (0)