Skip to content

Commit 11d4a65

Browse files
committed
This commits merges (#48039) the enrich feature branch,
which adds a new ingest processor, named enrich processor, that allows document being ingested to be enriched with data from other indices. Besides a new enrich processor, this PR adds several APIs to manage an enrich policy. An enrich policy is in charge of making the data from other indices available to the enrich processor in an efficient manner. Closes #32789
2 parents f183ac1 + d941e1b commit 11d4a65

File tree

134 files changed

+14092
-10
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

134 files changed

+14092
-10
lines changed
Lines changed: 274 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,274 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.client;
20+
21+
import org.elasticsearch.action.ActionListener;
22+
import org.elasticsearch.client.core.AcknowledgedResponse;
23+
import org.elasticsearch.client.enrich.DeletePolicyRequest;
24+
import org.elasticsearch.client.enrich.ExecutePolicyRequest;
25+
import org.elasticsearch.client.enrich.ExecutePolicyResponse;
26+
import org.elasticsearch.client.enrich.GetPolicyRequest;
27+
import org.elasticsearch.client.enrich.GetPolicyResponse;
28+
import org.elasticsearch.client.enrich.PutPolicyRequest;
29+
import org.elasticsearch.client.enrich.StatsRequest;
30+
import org.elasticsearch.client.enrich.StatsResponse;
31+
32+
import java.io.IOException;
33+
import java.util.Collections;
34+
35+
/**
36+
* A wrapper for the {@link RestHighLevelClient} that provides methods for
37+
* accessing the Elastic enrich related methods
38+
* <p>
39+
* See the <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/enrich-processor.html#enrich-policy-apis">
40+
* X-Pack Enrich Policy APIs on elastic.co</a> for more information.
41+
*/
42+
public final class EnrichClient {
43+
44+
private final RestHighLevelClient restHighLevelClient;
45+
46+
EnrichClient(RestHighLevelClient restHighLevelClient) {
47+
this.restHighLevelClient = restHighLevelClient;
48+
}
49+
50+
/**
51+
* Executes the put policy api, which stores an enrich policy.
52+
*
53+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/enrich-policy-apis.html#put-policy-api">
54+
* the docs</a> for more.
55+
*
56+
* @param request the {@link PutPolicyRequest}
57+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
58+
* @return the response
59+
* @throws IOException in case there is a problem sending the request or parsing back the response
60+
*/
61+
public AcknowledgedResponse putPolicy(PutPolicyRequest request, RequestOptions options) throws IOException {
62+
return restHighLevelClient.performRequestAndParseEntity(
63+
request,
64+
EnrichRequestConverters::putPolicy,
65+
options,
66+
AcknowledgedResponse::fromXContent,
67+
Collections.emptySet()
68+
);
69+
}
70+
71+
/**
72+
* Asynchronously executes the put policy api, which stores an enrich policy.
73+
*
74+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/enrich-policy-apis.html#put-policy-api">
75+
* the docs</a> for more.
76+
*
77+
* @param request the {@link PutPolicyRequest}
78+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
79+
* @param listener the listener to be notified upon request completion
80+
* @return cancellable that may be used to cancel the request
81+
*/
82+
public Cancellable putPolicyAsync(PutPolicyRequest request,
83+
RequestOptions options,
84+
ActionListener<AcknowledgedResponse> listener) {
85+
return restHighLevelClient.performRequestAsyncAndParseEntity(
86+
request,
87+
EnrichRequestConverters::putPolicy,
88+
options,
89+
AcknowledgedResponse::fromXContent,
90+
listener,
91+
Collections.emptySet()
92+
);
93+
}
94+
95+
/**
96+
* Executes the delete policy api, which deletes an enrich policy.
97+
*
98+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/enrich-policy-apis.html#delete-policy-api">
99+
* the docs</a> for more.
100+
*
101+
* @param request the {@link DeletePolicyRequest}
102+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
103+
* @return the response
104+
* @throws IOException in case there is a problem sending the request or parsing back the response
105+
*/
106+
public AcknowledgedResponse deletePolicy(DeletePolicyRequest request, RequestOptions options) throws IOException {
107+
return restHighLevelClient.performRequestAndParseEntity(
108+
request,
109+
EnrichRequestConverters::deletePolicy,
110+
options,
111+
AcknowledgedResponse::fromXContent,
112+
Collections.emptySet()
113+
);
114+
}
115+
116+
/**
117+
* Asynchronously executes the delete policy api, which deletes an enrich policy.
118+
*
119+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/enrich-policy-apis.html#delete-policy-api">
120+
* the docs</a> for more.
121+
*
122+
* @param request the {@link DeletePolicyRequest}
123+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
124+
* @param listener the listener to be notified upon request completion
125+
* @return cancellable that may be used to cancel the request
126+
*/
127+
public Cancellable deletePolicyAsync(DeletePolicyRequest request,
128+
RequestOptions options,
129+
ActionListener<AcknowledgedResponse> listener) {
130+
return restHighLevelClient.performRequestAsyncAndParseEntity(
131+
request,
132+
EnrichRequestConverters::deletePolicy,
133+
options,
134+
AcknowledgedResponse::fromXContent,
135+
listener,
136+
Collections.emptySet()
137+
);
138+
}
139+
140+
/**
141+
* Executes the get policy api, which retrieves an enrich policy.
142+
*
143+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/enrich-policy-apis.html#get-policy-api">
144+
* the docs</a> for more.
145+
*
146+
* @param request the {@link PutPolicyRequest}
147+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
148+
* @return the response
149+
* @throws IOException in case there is a problem sending the request or parsing back the response
150+
*/
151+
public GetPolicyResponse getPolicy(GetPolicyRequest request, RequestOptions options) throws IOException {
152+
return restHighLevelClient.performRequestAndParseEntity(
153+
request,
154+
EnrichRequestConverters::getPolicy,
155+
options,
156+
GetPolicyResponse::fromXContent,
157+
Collections.emptySet()
158+
);
159+
}
160+
161+
/**
162+
* Asynchronously executes the get policy api, which retrieves an enrich policy.
163+
*
164+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/enrich-policy-apis.html#get-policy-api">
165+
* the docs</a> for more.
166+
*
167+
* @param request the {@link PutPolicyRequest}
168+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
169+
* @param listener the listener to be notified upon request completion
170+
* @return cancellable that may be used to cancel the request
171+
*/
172+
public Cancellable getPolicyAsync(GetPolicyRequest request,
173+
RequestOptions options,
174+
ActionListener<GetPolicyResponse> listener) {
175+
return restHighLevelClient.performRequestAsyncAndParseEntity(
176+
request,
177+
EnrichRequestConverters::getPolicy,
178+
options,
179+
GetPolicyResponse::fromXContent,
180+
listener,
181+
Collections.emptySet()
182+
);
183+
}
184+
185+
/**
186+
* Executes the enrich stats api, which retrieves enrich related stats.
187+
*
188+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/enrich-policy-apis.html#stats-api">
189+
* the docs</a> for more.
190+
*
191+
* @param request the {@link StatsRequest}
192+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
193+
* @return the response
194+
* @throws IOException in case there is a problem sending the request or parsing back the response
195+
*/
196+
public StatsResponse stats(StatsRequest request, RequestOptions options) throws IOException {
197+
return restHighLevelClient.performRequestAndParseEntity(
198+
request,
199+
EnrichRequestConverters::stats,
200+
options,
201+
StatsResponse::fromXContent,
202+
Collections.emptySet()
203+
);
204+
}
205+
206+
/**
207+
* Asynchronously executes the enrich stats api, which retrieves enrich related stats.
208+
*
209+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/enrich-policy-apis.html#stats-api">
210+
* the docs</a> for more.
211+
*
212+
* @param request the {@link StatsRequest}
213+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
214+
* @param listener the listener to be notified upon request completion
215+
* @return cancellable that may be used to cancel the request
216+
*/
217+
public Cancellable statsAsync(StatsRequest request,
218+
RequestOptions options,
219+
ActionListener<StatsResponse> listener) {
220+
return restHighLevelClient.performRequestAsyncAndParseEntity(
221+
request,
222+
EnrichRequestConverters::stats,
223+
options,
224+
StatsResponse::fromXContent,
225+
listener,
226+
Collections.emptySet()
227+
);
228+
}
229+
230+
/**
231+
* Executes the execute policy api, which executes an enrich policy.
232+
*
233+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/enrich-policy-apis.html#execute-policy">
234+
* the docs</a> for more.
235+
*
236+
* @param request the {@link ExecutePolicyRequest}
237+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
238+
* @return the response
239+
* @throws IOException in case there is a problem sending the request or parsing back the response
240+
*/
241+
public ExecutePolicyResponse executePolicy(ExecutePolicyRequest request, RequestOptions options) throws IOException {
242+
return restHighLevelClient.performRequestAndParseEntity(
243+
request,
244+
EnrichRequestConverters::executePolicy,
245+
options,
246+
ExecutePolicyResponse::fromXContent,
247+
Collections.emptySet()
248+
);
249+
}
250+
251+
/**
252+
* Asynchronously executes the execute policy api, which executes an enrich policy.
253+
*
254+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/enrich-policy-apis.html#execute-policy">
255+
* the docs</a> for more.
256+
*
257+
* @param request the {@link ExecutePolicyRequest}
258+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
259+
* @param listener the listener to be notified upon request completion
260+
* @return cancellable that may be used to cancel the request
261+
*/
262+
public Cancellable executePolicyAsync(ExecutePolicyRequest request,
263+
RequestOptions options,
264+
ActionListener<ExecutePolicyResponse> listener) {
265+
return restHighLevelClient.performRequestAsyncAndParseEntity(
266+
request,
267+
EnrichRequestConverters::executePolicy,
268+
options,
269+
ExecutePolicyResponse::fromXContent,
270+
listener,
271+
Collections.emptySet()
272+
);
273+
}
274+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.client;
20+
21+
import org.apache.http.client.methods.HttpDelete;
22+
import org.apache.http.client.methods.HttpGet;
23+
import org.apache.http.client.methods.HttpPost;
24+
import org.apache.http.client.methods.HttpPut;
25+
import org.elasticsearch.client.enrich.DeletePolicyRequest;
26+
import org.elasticsearch.client.enrich.ExecutePolicyRequest;
27+
import org.elasticsearch.client.enrich.GetPolicyRequest;
28+
import org.elasticsearch.client.enrich.PutPolicyRequest;
29+
import org.elasticsearch.client.enrich.StatsRequest;
30+
31+
import java.io.IOException;
32+
33+
import static org.elasticsearch.client.RequestConverters.REQUEST_BODY_CONTENT_TYPE;
34+
import static org.elasticsearch.client.RequestConverters.createEntity;
35+
36+
final class EnrichRequestConverters {
37+
38+
static Request putPolicy(PutPolicyRequest putPolicyRequest) throws IOException {
39+
String endpoint = new RequestConverters.EndpointBuilder()
40+
.addPathPartAsIs("_enrich", "policy")
41+
.addPathPart(putPolicyRequest.getName())
42+
.build();
43+
Request request = new Request(HttpPut.METHOD_NAME, endpoint);
44+
request.setEntity(createEntity(putPolicyRequest, REQUEST_BODY_CONTENT_TYPE));
45+
return request;
46+
}
47+
48+
static Request deletePolicy(DeletePolicyRequest deletePolicyRequest) {
49+
String endpoint = new RequestConverters.EndpointBuilder()
50+
.addPathPartAsIs("_enrich", "policy")
51+
.addPathPart(deletePolicyRequest.getName())
52+
.build();
53+
return new Request(HttpDelete.METHOD_NAME, endpoint);
54+
}
55+
56+
static Request getPolicy(GetPolicyRequest getPolicyRequest) {
57+
String endpoint = new RequestConverters.EndpointBuilder()
58+
.addPathPartAsIs("_enrich", "policy")
59+
.addCommaSeparatedPathParts(getPolicyRequest.getNames())
60+
.build();
61+
return new Request(HttpGet.METHOD_NAME, endpoint);
62+
}
63+
64+
static Request stats(StatsRequest statsRequest) {
65+
String endpoint = new RequestConverters.EndpointBuilder()
66+
.addPathPartAsIs("_enrich", "_stats")
67+
.build();
68+
return new Request(HttpGet.METHOD_NAME, endpoint);
69+
}
70+
71+
static Request executePolicy(ExecutePolicyRequest executePolicyRequest) {
72+
String endpoint = new RequestConverters.EndpointBuilder()
73+
.addPathPartAsIs("_enrich", "policy")
74+
.addPathPart(executePolicyRequest.getName())
75+
.addPathPartAsIs("_execute")
76+
.build();
77+
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
78+
if (executePolicyRequest.getWaitForCompletion() != null) {
79+
request.addParameter("wait_for_completion", executePolicyRequest.getWaitForCompletion().toString());
80+
}
81+
return request;
82+
}
83+
84+
}

client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,7 @@ public class RestHighLevelClient implements Closeable {
257257
private final RollupClient rollupClient = new RollupClient(this);
258258
private final CcrClient ccrClient = new CcrClient(this);
259259
private final TransformClient transformClient = new TransformClient(this);
260+
private final EnrichClient enrichClient = new EnrichClient(this);
260261

261262
/**
262263
* Creates a {@link RestHighLevelClient} given the low level {@link RestClientBuilder} that allows to build the
@@ -481,6 +482,10 @@ public TransformClient transform() {
481482
return transformClient;
482483
}
483484

485+
public EnrichClient enrich() {
486+
return enrichClient;
487+
}
488+
484489
/**
485490
* Executes a bulk request using the Bulk API.
486491
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html">Bulk API on elastic.co</a>

0 commit comments

Comments
 (0)