Skip to content

Commit c999c09

Browse files
authored
Add enrich policy list API (#41553)
This commit wires up the Rest calls and Transport calls for listing all enrich policies, as well as tests and rest spec additions.
1 parent 3c7f463 commit c999c09

File tree

12 files changed

+413
-10
lines changed

12 files changed

+413
-10
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/EnrichPolicy.java

Lines changed: 91 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.common.io.stream.StreamOutput;
1313
import org.elasticsearch.common.io.stream.Writeable;
1414
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
15+
import org.elasticsearch.common.xcontent.ToXContent;
1516
import org.elasticsearch.common.xcontent.ToXContentFragment;
1617
import org.elasticsearch.common.xcontent.XContentBuilder;
1718
import org.elasticsearch.common.xcontent.XContentHelper;
@@ -53,16 +54,20 @@ public final class EnrichPolicy implements Writeable, ToXContentFragment {
5354
);
5455

5556
static {
56-
PARSER.declareString(ConstructingObjectParser.constructorArg(), TYPE);
57-
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> {
57+
declareParserOptions(PARSER);
58+
}
59+
60+
private static void declareParserOptions(ConstructingObjectParser parser) {
61+
parser.declareString(ConstructingObjectParser.constructorArg(), TYPE);
62+
parser.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> {
5863
XContentBuilder contentBuilder = XContentBuilder.builder(p.contentType().xContent());
5964
contentBuilder.generator().copyCurrentStructure(p);
6065
return new QuerySource(BytesReference.bytes(contentBuilder), contentBuilder.contentType());
6166
}, QUERY);
62-
PARSER.declareString(ConstructingObjectParser.constructorArg(), INDEX_PATTERN);
63-
PARSER.declareString(ConstructingObjectParser.constructorArg(), ENRICH_KEY);
64-
PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), ENRICH_VALUES);
65-
PARSER.declareString(ConstructingObjectParser.constructorArg(), SCHEDULE);
67+
parser.declareString(ConstructingObjectParser.constructorArg(), INDEX_PATTERN);
68+
parser.declareString(ConstructingObjectParser.constructorArg(), ENRICH_KEY);
69+
parser.declareStringArray(ConstructingObjectParser.constructorArg(), ENRICH_VALUES);
70+
parser.declareString(ConstructingObjectParser.constructorArg(), SCHEDULE);
6671
}
6772

6873
public static EnrichPolicy fromXContent(XContentParser parser) throws IOException {
@@ -228,4 +233,84 @@ public int hashCode() {
228233
return Objects.hash(query, contentType);
229234
}
230235
}
236+
237+
public static class NamedPolicy implements Writeable, ToXContent {
238+
239+
static final ParseField NAME = new ParseField("name");
240+
@SuppressWarnings("unchecked")
241+
static final ConstructingObjectParser<NamedPolicy, Void> PARSER = new ConstructingObjectParser<>("named_policy",
242+
args -> {
243+
return new NamedPolicy(
244+
(String) args[0],
245+
new EnrichPolicy((String) args[1],
246+
(QuerySource) args[2],
247+
(String) args[3],
248+
(String) args[4],
249+
(List<String>) args[5],
250+
(String) args[6])
251+
);
252+
}
253+
);
254+
255+
static {
256+
PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME);
257+
declareParserOptions(PARSER);
258+
}
259+
260+
private final String name;
261+
private final EnrichPolicy policy;
262+
263+
public NamedPolicy(String name, EnrichPolicy policy) {
264+
this.name = name;
265+
this.policy = policy;
266+
}
267+
268+
public NamedPolicy(StreamInput in) throws IOException {
269+
name = in.readString();
270+
policy = new EnrichPolicy(in);
271+
}
272+
273+
public String getName() {
274+
return name;
275+
}
276+
277+
public EnrichPolicy getPolicy() {
278+
return policy;
279+
}
280+
281+
@Override
282+
public void writeTo(StreamOutput out) throws IOException {
283+
out.writeString(name);
284+
policy.writeTo(out);
285+
}
286+
287+
@Override
288+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
289+
builder.startObject();
290+
{
291+
builder.field(NAME.getPreferredName(), name);
292+
policy.toXContent(builder, params);
293+
}
294+
builder.endObject();
295+
return builder;
296+
}
297+
298+
public static NamedPolicy fromXContent(XContentParser parser) throws IOException {
299+
return PARSER.parse(parser, null);
300+
}
301+
302+
@Override
303+
public boolean equals(Object o) {
304+
if (this == o) return true;
305+
if (o == null || getClass() != o.getClass()) return false;
306+
NamedPolicy that = (NamedPolicy) o;
307+
return name.equals(that.name) &&
308+
policy.equals(that.policy);
309+
}
310+
311+
@Override
312+
public int hashCode() {
313+
return Objects.hash(name, policy);
314+
}
315+
}
231316
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.core.enrich.action;
7+
8+
import org.elasticsearch.action.Action;
9+
import org.elasticsearch.action.ActionRequestValidationException;
10+
import org.elasticsearch.action.ActionResponse;
11+
import org.elasticsearch.action.support.master.MasterNodeRequest;
12+
import org.elasticsearch.common.io.stream.StreamInput;
13+
import org.elasticsearch.common.io.stream.StreamOutput;
14+
import org.elasticsearch.common.xcontent.ToXContentObject;
15+
import org.elasticsearch.common.xcontent.XContentBuilder;
16+
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
17+
18+
import java.io.IOException;
19+
import java.util.List;
20+
import java.util.Map;
21+
import java.util.Objects;
22+
import java.util.TreeMap;
23+
import java.util.stream.Collectors;
24+
25+
public class ListEnrichPolicyAction extends Action<ListEnrichPolicyAction.Response> {
26+
27+
public static final ListEnrichPolicyAction INSTANCE = new ListEnrichPolicyAction();
28+
public static final String NAME = "cluster:admin/xpack/enrich/list";
29+
30+
private ListEnrichPolicyAction() {
31+
super(NAME);
32+
}
33+
34+
@Override
35+
public Response newResponse() {
36+
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
37+
}
38+
public static class Request extends MasterNodeRequest<ListEnrichPolicyAction.Request> {
39+
40+
public Request() {}
41+
42+
public Request(StreamInput in) throws IOException {
43+
super(in);
44+
}
45+
46+
@Override
47+
public ActionRequestValidationException validate() {
48+
return null;
49+
}
50+
}
51+
52+
public static class Response extends ActionResponse implements ToXContentObject {
53+
54+
private final List<EnrichPolicy.NamedPolicy> policies;
55+
56+
public Response(Map<String, EnrichPolicy> policies) {
57+
Objects.requireNonNull(policies, "policies cannot be null");
58+
// use a treemap to guarantee ordering in the set, then transform it to the list of named policies
59+
this.policies = new TreeMap<>(policies).entrySet().stream()
60+
.map(entry -> new EnrichPolicy.NamedPolicy(entry.getKey(), entry.getValue())).collect(Collectors.toList());
61+
}
62+
63+
public Response(StreamInput in) throws IOException {
64+
policies = in.readList(EnrichPolicy.NamedPolicy::new);
65+
}
66+
67+
@Override
68+
public void writeTo(StreamOutput out) throws IOException {
69+
out.writeList(policies);
70+
}
71+
72+
@Override
73+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
74+
builder.startObject();
75+
{
76+
builder.startArray("policies");
77+
{
78+
for (EnrichPolicy.NamedPolicy policy: policies) {
79+
policy.toXContent(builder, params);
80+
}
81+
}
82+
builder.endArray();
83+
}
84+
builder.endObject();
85+
86+
return builder;
87+
}
88+
89+
public List<EnrichPolicy.NamedPolicy> getPolicies() {
90+
return policies;
91+
}
92+
93+
@Override
94+
public boolean equals(Object o) {
95+
if (this == o) return true;
96+
if (o == null || getClass() != o.getClass()) return false;
97+
Response response = (Response) o;
98+
return policies.equals(response.policies);
99+
}
100+
101+
@Override
102+
public int hashCode() {
103+
return Objects.hash(policies);
104+
}
105+
}
106+
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/PutEnrichPolicyAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public class PutEnrichPolicyAction extends Action<AcknowledgedResponse> {
2222
public static final PutEnrichPolicyAction INSTANCE = new PutEnrichPolicyAction();
2323
public static final String NAME = "cluster:admin/xpack/enrich/put";
2424

25-
protected PutEnrichPolicyAction() {
25+
private PutEnrichPolicyAction() {
2626
super(NAME);
2727
}
2828

x-pack/plugin/enrich/qa/rest/src/test/resources/rest-api-spec/test/enrich/10_basic.yml

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,14 @@
1010
enrich_key: baz
1111
enrich_values: ["a", "b"]
1212
schedule: "*/120"
13-
1413
- is_true: acknowledged
14+
15+
- do:
16+
enrich.list_policy: {}
17+
- length: { policies: 1 }
18+
- match: { policies.0.name: policy-crud }
19+
- match: { policies.0.type: exact_match }
20+
- match: { policies.0.index_pattern: "bar*" }
21+
- match: { policies.0.enrich_key: baz }
22+
- match: { policies.0.enrich_values: ["a", "b"] }
23+
- match: { policies.0.schedule: "*/120" }

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,11 @@
2525
import org.elasticsearch.plugins.Plugin;
2626
import org.elasticsearch.rest.RestController;
2727
import org.elasticsearch.rest.RestHandler;
28+
import org.elasticsearch.xpack.core.enrich.action.ListEnrichPolicyAction;
2829
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
30+
import org.elasticsearch.xpack.enrich.action.TransportListEnrichPolicyAction;
2931
import org.elasticsearch.xpack.enrich.action.TransportPutEnrichPolicyAction;
32+
import org.elasticsearch.xpack.enrich.rest.RestListEnrichPolicyAction;
3033
import org.elasticsearch.xpack.enrich.rest.RestPutEnrichPolicyAction;
3134

3235
import java.util.Arrays;
@@ -67,6 +70,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
6770
}
6871

6972
return Arrays.asList(
73+
new ActionHandler<>(ListEnrichPolicyAction.INSTANCE, TransportListEnrichPolicyAction.class),
7074
new ActionHandler<>(PutEnrichPolicyAction.INSTANCE, TransportPutEnrichPolicyAction.class)
7175
);
7276
}
@@ -80,6 +84,7 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
8084
}
8185

8286
return Arrays.asList(
87+
new RestListEnrichPolicyAction(settings, restController),
8388
new RestPutEnrichPolicyAction(settings, restController)
8489
);
8590
}

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichStore.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,13 @@ public static EnrichPolicy getPolicy(String name, ClusterState state) {
8585
return getPolicies(state).get(name);
8686
}
8787

88-
private static Map<String, EnrichPolicy> getPolicies(ClusterState state) {
88+
/**
89+
* Gets all policies in the cluster.
90+
*
91+
* @param state the cluster state
92+
* @return a Map of <code>policyName, EnrichPolicy</code> of the policies
93+
*/
94+
public static Map<String, EnrichPolicy> getPolicies(ClusterState state) {
8995
final Map<String, EnrichPolicy> policies;
9096
final EnrichMetadata enrichMetadata = state.metaData().custom(EnrichMetadata.TYPE);
9197
if (enrichMetadata != null) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.enrich.action;
7+
8+
import org.elasticsearch.action.ActionListener;
9+
import org.elasticsearch.action.support.ActionFilters;
10+
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
11+
import org.elasticsearch.cluster.ClusterState;
12+
import org.elasticsearch.cluster.block.ClusterBlockException;
13+
import org.elasticsearch.cluster.block.ClusterBlockLevel;
14+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
15+
import org.elasticsearch.cluster.service.ClusterService;
16+
import org.elasticsearch.common.inject.Inject;
17+
import org.elasticsearch.threadpool.ThreadPool;
18+
import org.elasticsearch.transport.TransportService;
19+
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
20+
import org.elasticsearch.xpack.core.enrich.action.ListEnrichPolicyAction;
21+
import org.elasticsearch.xpack.enrich.EnrichStore;
22+
23+
import java.util.Map;
24+
25+
public class TransportListEnrichPolicyAction
26+
extends TransportMasterNodeAction<ListEnrichPolicyAction.Request, ListEnrichPolicyAction.Response> {
27+
28+
@Inject
29+
public TransportListEnrichPolicyAction(TransportService transportService,
30+
ClusterService clusterService,
31+
ThreadPool threadPool,
32+
ActionFilters actionFilters,
33+
IndexNameExpressionResolver indexNameExpressionResolver) {
34+
super(ListEnrichPolicyAction.NAME, transportService, clusterService, threadPool, actionFilters,
35+
ListEnrichPolicyAction.Request::new, indexNameExpressionResolver);
36+
}
37+
38+
@Override
39+
protected String executor() {
40+
return ThreadPool.Names.SAME;
41+
}
42+
43+
@Override
44+
protected ListEnrichPolicyAction.Response newResponse() {
45+
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
46+
}
47+
48+
@Override
49+
protected void masterOperation(ListEnrichPolicyAction.Request request, ClusterState state,
50+
ActionListener<ListEnrichPolicyAction.Response> listener) throws Exception {
51+
Map<String, EnrichPolicy> policies = EnrichStore.getPolicies(clusterService.state());
52+
listener.onResponse(new ListEnrichPolicyAction.Response(policies));
53+
}
54+
55+
@Override
56+
protected ClusterBlockException checkBlock(ListEnrichPolicyAction.Request request, ClusterState state) {
57+
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
58+
}
59+
60+
61+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.enrich.rest;
7+
8+
import org.elasticsearch.client.node.NodeClient;
9+
import org.elasticsearch.common.settings.Settings;
10+
import org.elasticsearch.rest.BaseRestHandler;
11+
import org.elasticsearch.rest.RestController;
12+
import org.elasticsearch.rest.RestRequest;
13+
import org.elasticsearch.rest.action.RestToXContentListener;
14+
import org.elasticsearch.xpack.core.enrich.action.ListEnrichPolicyAction;
15+
16+
import java.io.IOException;
17+
18+
public class RestListEnrichPolicyAction extends BaseRestHandler {
19+
20+
public RestListEnrichPolicyAction(final Settings settings, final RestController controller) {
21+
super(settings);
22+
controller.registerHandler(RestRequest.Method.GET, "/_enrich/policy", this);
23+
}
24+
25+
@Override
26+
public String getName() {
27+
return "list_enrich_policy";
28+
}
29+
30+
@Override
31+
protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) throws IOException {
32+
final ListEnrichPolicyAction.Request request = new ListEnrichPolicyAction.Request();
33+
return channel -> client.execute(ListEnrichPolicyAction.INSTANCE, request, new RestToXContentListener<>(channel));
34+
}
35+
}

0 commit comments

Comments
 (0)