Skip to content

Commit eade161

Browse files
sohaibiftikharjavanna
authored andcommitted
REST high-level client: add simulate pipeline API (#31158)
relates to #27205
1 parent 0352d88 commit eade161

21 files changed

+1182
-129
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/IngestClient.java

+35
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.elasticsearch.action.ingest.GetPipelineRequest;
2525
import org.elasticsearch.action.ingest.GetPipelineResponse;
2626
import org.elasticsearch.action.ingest.PutPipelineRequest;
27+
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
28+
import org.elasticsearch.action.ingest.SimulatePipelineResponse;
2729
import org.elasticsearch.action.ingest.WritePipelineResponse;
2830

2931
import java.io.IOException;
@@ -125,4 +127,37 @@ public void deletePipelineAsync(DeletePipelineRequest request, RequestOptions op
125127
restHighLevelClient.performRequestAsyncAndParseEntity( request, RequestConverters::deletePipeline, options,
126128
WritePipelineResponse::fromXContent, listener, emptySet());
127129
}
130+
131+
/**
132+
* Simulate a pipeline on a set of documents provided in the request
133+
* <p>
134+
* See
135+
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/master/simulate-pipeline-api.html">
136+
* Simulate Pipeline API on elastic.co</a>
137+
* @param request the request
138+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
139+
* @return the response
140+
* @throws IOException in case there is a problem sending the request or parsing back the response
141+
*/
142+
public SimulatePipelineResponse simulatePipeline(SimulatePipelineRequest request, RequestOptions options) throws IOException {
143+
return restHighLevelClient.performRequestAndParseEntity( request, RequestConverters::simulatePipeline, options,
144+
SimulatePipelineResponse::fromXContent, emptySet());
145+
}
146+
147+
/**
148+
* Asynchronously simulate a pipeline on a set of documents provided in the request
149+
* <p>
150+
* See
151+
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/master/simulate-pipeline-api.html">
152+
* Simulate Pipeline API on elastic.co</a>
153+
* @param request the request
154+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
155+
* @param listener the listener to be notified upon request completion
156+
*/
157+
public void simulatePipelineAsync(SimulatePipelineRequest request,
158+
RequestOptions options,
159+
ActionListener<SimulatePipelineResponse> listener) {
160+
restHighLevelClient.performRequestAsyncAndParseEntity( request, RequestConverters::simulatePipeline, options,
161+
SimulatePipelineResponse::fromXContent, listener, emptySet());
162+
}
128163
}

client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java

+15
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
import org.elasticsearch.action.ingest.DeletePipelineRequest;
7272
import org.elasticsearch.action.ingest.PutPipelineRequest;
7373
import org.elasticsearch.action.ingest.GetPipelineRequest;
74+
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
7475
import org.elasticsearch.action.search.ClearScrollRequest;
7576
import org.elasticsearch.action.search.MultiSearchRequest;
7677
import org.elasticsearch.action.search.SearchRequest;
@@ -886,6 +887,20 @@ static Request validateQuery(ValidateQueryRequest validateQueryRequest) throws I
886887
return request;
887888
}
888889

890+
static Request simulatePipeline(SimulatePipelineRequest simulatePipelineRequest) throws IOException {
891+
EndpointBuilder builder = new EndpointBuilder().addPathPartAsIs("_ingest/pipeline");
892+
if (simulatePipelineRequest.getId() != null && !simulatePipelineRequest.getId().isEmpty()) {
893+
builder.addPathPart(simulatePipelineRequest.getId());
894+
}
895+
builder.addPathPartAsIs("_simulate");
896+
String endpoint = builder.build();
897+
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
898+
Params params = new Params(request);
899+
params.putParam("verbose", Boolean.toString(simulatePipelineRequest.isVerbose()));
900+
request.setEntity(createEntity(simulatePipelineRequest, REQUEST_BODY_CONTENT_TYPE));
901+
return request;
902+
}
903+
889904
static Request getAlias(GetAliasesRequest getAliasesRequest) {
890905
String[] indices = getAliasesRequest.indices() == null ? Strings.EMPTY_ARRAY : getAliasesRequest.indices();
891906
String[] aliases = getAliasesRequest.aliases() == null ? Strings.EMPTY_ARRAY : getAliasesRequest.aliases();

client/rest-high-level/src/test/java/org/elasticsearch/client/ESRestHighLevelClientTestCase.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,7 @@ private HighLevelClient(RestClient restClient) {
8585
}
8686
}
8787

88-
protected static XContentBuilder buildRandomXContentPipeline() throws IOException {
89-
XContentType xContentType = randomFrom(XContentType.values());
90-
XContentBuilder pipelineBuilder = XContentBuilder.builder(xContentType.xContent());
88+
protected static XContentBuilder buildRandomXContentPipeline(XContentBuilder pipelineBuilder) throws IOException {
9189
pipelineBuilder.startObject();
9290
{
9391
pipelineBuilder.field(Pipeline.DESCRIPTION_KEY, "some random set of processors");
@@ -114,6 +112,12 @@ protected static XContentBuilder buildRandomXContentPipeline() throws IOExceptio
114112
return pipelineBuilder;
115113
}
116114

115+
protected static XContentBuilder buildRandomXContentPipeline() throws IOException {
116+
XContentType xContentType = randomFrom(XContentType.values());
117+
XContentBuilder pipelineBuilder = XContentBuilder.builder(xContentType.xContent());
118+
return buildRandomXContentPipeline(pipelineBuilder);
119+
}
120+
117121
protected static void createPipeline(String pipelineId) throws IOException {
118122
XContentBuilder builder = buildRandomXContentPipeline();
119123
createPipeline(new PutPipelineRequest(pipelineId, BytesReference.bytes(builder), builder.contentType()));

client/rest-high-level/src/test/java/org/elasticsearch/client/IngestClientIT.java

+99
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,22 @@
2323
import org.elasticsearch.action.ingest.GetPipelineRequest;
2424
import org.elasticsearch.action.ingest.GetPipelineResponse;
2525
import org.elasticsearch.action.ingest.PutPipelineRequest;
26+
import org.elasticsearch.action.ingest.SimulateDocumentBaseResult;
27+
import org.elasticsearch.action.ingest.SimulateDocumentResult;
28+
import org.elasticsearch.action.ingest.SimulateDocumentVerboseResult;
29+
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
30+
import org.elasticsearch.action.ingest.SimulatePipelineResponse;
2631
import org.elasticsearch.action.ingest.WritePipelineResponse;
2732
import org.elasticsearch.common.bytes.BytesReference;
2833
import org.elasticsearch.common.xcontent.XContentBuilder;
34+
import org.elasticsearch.common.xcontent.XContentType;
2935
import org.elasticsearch.ingest.PipelineConfiguration;
3036

3137
import java.io.IOException;
38+
import java.util.List;
39+
40+
import static org.hamcrest.Matchers.containsString;
41+
import static org.hamcrest.core.IsInstanceOf.instanceOf;
3242

3343
public class IngestClientIT extends ESRestHighLevelClientTestCase {
3444

@@ -80,4 +90,93 @@ public void testDeletePipeline() throws IOException {
8090
execute(request, highLevelClient().ingest()::deletePipeline, highLevelClient().ingest()::deletePipelineAsync);
8191
assertTrue(response.isAcknowledged());
8292
}
93+
94+
public void testSimulatePipeline() throws IOException {
95+
testSimulatePipeline(false, false);
96+
}
97+
98+
public void testSimulatePipelineWithFailure() throws IOException {
99+
testSimulatePipeline(false, true);
100+
}
101+
102+
public void testSimulatePipelineVerbose() throws IOException {
103+
testSimulatePipeline(true, false);
104+
}
105+
106+
public void testSimulatePipelineVerboseWithFailure() throws IOException {
107+
testSimulatePipeline(true, true);
108+
}
109+
110+
private void testSimulatePipeline(boolean isVerbose,
111+
boolean isFailure) throws IOException {
112+
XContentType xContentType = randomFrom(XContentType.values());
113+
XContentBuilder builder = XContentBuilder.builder(xContentType.xContent());
114+
String rankValue = isFailure ? "non-int" : Integer.toString(1234);
115+
builder.startObject();
116+
{
117+
builder.field("pipeline");
118+
buildRandomXContentPipeline(builder);
119+
builder.startArray("docs");
120+
{
121+
builder.startObject()
122+
.field("_index", "index")
123+
.field("_type", "doc")
124+
.field("_id", "doc_" + 1)
125+
.startObject("_source").field("foo", "rab_" + 1).field("rank", rankValue).endObject()
126+
.endObject();
127+
}
128+
builder.endArray();
129+
}
130+
builder.endObject();
131+
132+
SimulatePipelineRequest request = new SimulatePipelineRequest(
133+
BytesReference.bytes(builder),
134+
builder.contentType()
135+
);
136+
request.setVerbose(isVerbose);
137+
SimulatePipelineResponse response =
138+
execute(request, highLevelClient().ingest()::simulatePipeline, highLevelClient().ingest()::simulatePipelineAsync);
139+
List<SimulateDocumentResult> results = response.getResults();
140+
assertEquals(1, results.size());
141+
if (isVerbose) {
142+
assertThat(results.get(0), instanceOf(SimulateDocumentVerboseResult.class));
143+
SimulateDocumentVerboseResult verboseResult = (SimulateDocumentVerboseResult) results.get(0);
144+
assertEquals(2, verboseResult.getProcessorResults().size());
145+
if (isFailure) {
146+
assertNotNull(verboseResult.getProcessorResults().get(1).getFailure());
147+
assertThat(verboseResult.getProcessorResults().get(1).getFailure().getMessage(),
148+
containsString("unable to convert [non-int] to integer"));
149+
} else {
150+
assertEquals(
151+
verboseResult.getProcessorResults().get(0).getIngestDocument()
152+
.getFieldValue("foo", String.class),
153+
"bar"
154+
);
155+
assertEquals(
156+
Integer.valueOf(1234),
157+
verboseResult.getProcessorResults().get(1).getIngestDocument()
158+
.getFieldValue("rank", Integer.class)
159+
);
160+
}
161+
} else {
162+
assertThat(results.get(0), instanceOf(SimulateDocumentBaseResult.class));
163+
SimulateDocumentBaseResult baseResult = (SimulateDocumentBaseResult)results.get(0);
164+
if (isFailure) {
165+
assertNotNull(baseResult.getFailure());
166+
assertThat(baseResult.getFailure().getMessage(),
167+
containsString("unable to convert [non-int] to integer"));
168+
} else {
169+
assertNotNull(baseResult.getIngestDocument());
170+
assertEquals(
171+
baseResult.getIngestDocument().getFieldValue("foo", String.class),
172+
"bar"
173+
);
174+
assertEquals(
175+
Integer.valueOf(1234),
176+
baseResult.getIngestDocument()
177+
.getFieldValue("rank", Integer.class)
178+
);
179+
}
180+
}
181+
}
83182
}

client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java

+29
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import org.elasticsearch.action.ingest.DeletePipelineRequest;
7575
import org.elasticsearch.action.ingest.GetPipelineRequest;
7676
import org.elasticsearch.action.ingest.PutPipelineRequest;
77+
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
7778
import org.elasticsearch.action.search.ClearScrollRequest;
7879
import org.elasticsearch.action.search.MultiSearchRequest;
7980
import org.elasticsearch.action.search.SearchRequest;
@@ -1534,6 +1535,34 @@ public void testDeletePipeline() {
15341535
assertEquals(expectedParams, expectedRequest.getParameters());
15351536
}
15361537

1538+
public void testSimulatePipeline() throws IOException {
1539+
String pipelineId = randomBoolean() ? "some_pipeline_id" : null;
1540+
boolean verbose = randomBoolean();
1541+
String json = "{\"pipeline\":{" +
1542+
"\"description\":\"_description\"," +
1543+
"\"processors\":[{\"set\":{\"field\":\"field2\",\"value\":\"_value\"}}]}," +
1544+
"\"docs\":[{\"_index\":\"index\",\"_type\":\"_doc\",\"_id\":\"id\",\"_source\":{\"foo\":\"rab\"}}]}";
1545+
SimulatePipelineRequest request = new SimulatePipelineRequest(
1546+
new BytesArray(json.getBytes(StandardCharsets.UTF_8)),
1547+
XContentType.JSON
1548+
);
1549+
request.setId(pipelineId);
1550+
request.setVerbose(verbose);
1551+
Map<String, String> expectedParams = new HashMap<>();
1552+
expectedParams.put("verbose", Boolean.toString(verbose));
1553+
1554+
Request expectedRequest = RequestConverters.simulatePipeline(request);
1555+
StringJoiner endpoint = new StringJoiner("/", "/", "");
1556+
endpoint.add("_ingest/pipeline");
1557+
if (pipelineId != null && !pipelineId.isEmpty())
1558+
endpoint.add(pipelineId);
1559+
endpoint.add("_simulate");
1560+
assertEquals(endpoint.toString(), expectedRequest.getEndpoint());
1561+
assertEquals(HttpPost.METHOD_NAME, expectedRequest.getMethod());
1562+
assertEquals(expectedParams, expectedRequest.getParameters());
1563+
assertToXContentBody(request, expectedRequest.getEntity());
1564+
}
1565+
15371566
public void testClusterHealth() {
15381567
ClusterHealthRequest healthRequest = new ClusterHealthRequest();
15391568
Map<String, String> expectedParams = new HashMap<>();

client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/IngestClientDocumentationIT.java

+111
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,12 @@
2525
import org.elasticsearch.action.ingest.GetPipelineRequest;
2626
import org.elasticsearch.action.ingest.GetPipelineResponse;
2727
import org.elasticsearch.action.ingest.PutPipelineRequest;
28+
import org.elasticsearch.action.ingest.SimulateDocumentBaseResult;
29+
import org.elasticsearch.action.ingest.SimulateDocumentResult;
30+
import org.elasticsearch.action.ingest.SimulateDocumentVerboseResult;
31+
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
32+
import org.elasticsearch.action.ingest.SimulatePipelineResponse;
33+
import org.elasticsearch.action.ingest.SimulateProcessorResult;
2834
import org.elasticsearch.action.ingest.WritePipelineResponse;
2935
import org.elasticsearch.client.ESRestHighLevelClientTestCase;
3036
import org.elasticsearch.client.RequestOptions;
@@ -277,4 +283,109 @@ public void onFailure(Exception e) {
277283
}
278284
}
279285

286+
public void testSimulatePipeline() throws IOException {
287+
RestHighLevelClient client = highLevelClient();
288+
289+
{
290+
// tag::simulate-pipeline-request
291+
String source =
292+
"{\"" +
293+
"pipeline\":{" +
294+
"\"description\":\"_description\"," +
295+
"\"processors\":[{\"set\":{\"field\":\"field2\",\"value\":\"_value\"}}]" +
296+
"}," +
297+
"\"docs\":[" +
298+
"{\"_index\":\"index\",\"_type\":\"_doc\",\"_id\":\"id\",\"_source\":{\"foo\":\"bar\"}}," +
299+
"{\"_index\":\"index\",\"_type\":\"_doc\",\"_id\":\"id\",\"_source\":{\"foo\":\"rab\"}}" +
300+
"]" +
301+
"}";
302+
SimulatePipelineRequest request = new SimulatePipelineRequest(
303+
new BytesArray(source.getBytes(StandardCharsets.UTF_8)), // <1>
304+
XContentType.JSON // <2>
305+
);
306+
// end::simulate-pipeline-request
307+
308+
// tag::simulate-pipeline-request-pipeline-id
309+
request.setId("my-pipeline-id"); // <1>
310+
// end::simulate-pipeline-request-pipeline-id
311+
312+
// For testing we set this back to null
313+
request.setId(null);
314+
315+
// tag::simulate-pipeline-request-verbose
316+
request.setVerbose(true); // <1>
317+
// end::simulate-pipeline-request-verbose
318+
319+
// tag::simulate-pipeline-execute
320+
SimulatePipelineResponse response = client.ingest().simulatePipeline(request, RequestOptions.DEFAULT); // <1>
321+
// end::simulate-pipeline-execute
322+
323+
// tag::simulate-pipeline-response
324+
for (SimulateDocumentResult result: response.getResults()) { // <1>
325+
if (request.isVerbose()) {
326+
assert result instanceof SimulateDocumentVerboseResult;
327+
SimulateDocumentVerboseResult verboseResult = (SimulateDocumentVerboseResult)result; // <2>
328+
for (SimulateProcessorResult processorResult: verboseResult.getProcessorResults()) { // <3>
329+
processorResult.getIngestDocument(); // <4>
330+
processorResult.getFailure(); // <5>
331+
}
332+
} else {
333+
assert result instanceof SimulateDocumentBaseResult;
334+
SimulateDocumentBaseResult baseResult = (SimulateDocumentBaseResult)result; // <6>
335+
baseResult.getIngestDocument(); // <7>
336+
baseResult.getFailure(); // <8>
337+
}
338+
}
339+
// end::simulate-pipeline-response
340+
assert(response.getResults().size() > 0);
341+
}
342+
}
343+
344+
public void testSimulatePipelineAsync() throws Exception {
345+
RestHighLevelClient client = highLevelClient();
346+
347+
{
348+
String source =
349+
"{\"" +
350+
"pipeline\":{" +
351+
"\"description\":\"_description\"," +
352+
"\"processors\":[{\"set\":{\"field\":\"field2\",\"value\":\"_value\"}}]" +
353+
"}," +
354+
"\"docs\":[" +
355+
"{\"_index\":\"index\",\"_type\":\"_doc\",\"_id\":\"id\",\"_source\":{\"foo\":\"bar\"}}," +
356+
"{\"_index\":\"index\",\"_type\":\"_doc\",\"_id\":\"id\",\"_source\":{\"foo\":\"rab\"}}" +
357+
"]" +
358+
"}";
359+
SimulatePipelineRequest request = new SimulatePipelineRequest(
360+
new BytesArray(source.getBytes(StandardCharsets.UTF_8)),
361+
XContentType.JSON
362+
);
363+
364+
// tag::simulate-pipeline-execute-listener
365+
ActionListener<SimulatePipelineResponse> listener =
366+
new ActionListener<SimulatePipelineResponse>() {
367+
@Override
368+
public void onResponse(SimulatePipelineResponse response) {
369+
// <1>
370+
}
371+
372+
@Override
373+
public void onFailure(Exception e) {
374+
// <2>
375+
}
376+
};
377+
// end::simulate-pipeline-execute-listener
378+
379+
// Replace the empty listener by a blocking listener in test
380+
final CountDownLatch latch = new CountDownLatch(1);
381+
listener = new LatchedActionListener<>(listener, latch);
382+
383+
// tag::simulate-pipeline-execute-async
384+
client.ingest().simulatePipelineAsync(request, RequestOptions.DEFAULT, listener); // <1>
385+
// end::simulate-pipeline-execute-async
386+
387+
assertTrue(latch.await(30L, TimeUnit.SECONDS));
388+
}
389+
}
390+
280391
}

0 commit comments

Comments
 (0)