Skip to content

Commit adbfd1d

Browse files
sohaibiftikharjavanna
authored andcommitted
REST high-level client: add simulate pipeline API (#31158)
relates to #27205
1 parent 626ff21 commit adbfd1d

21 files changed

+1186
-129
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/IngestClient.java

+35
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.elasticsearch.action.ingest.GetPipelineRequest;
2525
import org.elasticsearch.action.ingest.GetPipelineResponse;
2626
import org.elasticsearch.action.ingest.PutPipelineRequest;
27+
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
28+
import org.elasticsearch.action.ingest.SimulatePipelineResponse;
2729
import org.elasticsearch.action.ingest.WritePipelineResponse;
2830

2931
import java.io.IOException;
@@ -125,4 +127,37 @@ public void deletePipelineAsync(DeletePipelineRequest request, RequestOptions op
125127
restHighLevelClient.performRequestAsyncAndParseEntity( request, RequestConverters::deletePipeline, options,
126128
WritePipelineResponse::fromXContent, listener, emptySet());
127129
}
130+
131+
/**
132+
* Simulate a pipeline on a set of documents provided in the request
133+
* <p>
134+
* See
135+
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/master/simulate-pipeline-api.html">
136+
* Simulate Pipeline API on elastic.co</a>
137+
* @param request the request
138+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
139+
* @return the response
140+
* @throws IOException in case there is a problem sending the request or parsing back the response
141+
*/
142+
public SimulatePipelineResponse simulatePipeline(SimulatePipelineRequest request, RequestOptions options) throws IOException {
143+
return restHighLevelClient.performRequestAndParseEntity( request, RequestConverters::simulatePipeline, options,
144+
SimulatePipelineResponse::fromXContent, emptySet());
145+
}
146+
147+
/**
148+
* Asynchronously simulate a pipeline on a set of documents provided in the request
149+
* <p>
150+
* See
151+
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/master/simulate-pipeline-api.html">
152+
* Simulate Pipeline API on elastic.co</a>
153+
* @param request the request
154+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
155+
* @param listener the listener to be notified upon request completion
156+
*/
157+
public void simulatePipelineAsync(SimulatePipelineRequest request,
158+
RequestOptions options,
159+
ActionListener<SimulatePipelineResponse> listener) {
160+
restHighLevelClient.performRequestAsyncAndParseEntity( request, RequestConverters::simulatePipeline, options,
161+
SimulatePipelineResponse::fromXContent, listener, emptySet());
162+
}
128163
}

client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java

+15
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import org.elasticsearch.action.ingest.DeletePipelineRequest;
7373
import org.elasticsearch.action.ingest.PutPipelineRequest;
7474
import org.elasticsearch.action.ingest.GetPipelineRequest;
75+
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
7576
import org.elasticsearch.action.search.ClearScrollRequest;
7677
import org.elasticsearch.action.search.MultiSearchRequest;
7778
import org.elasticsearch.action.search.SearchRequest;
@@ -927,6 +928,20 @@ static Request validateQuery(ValidateQueryRequest validateQueryRequest) throws I
927928
return request;
928929
}
929930

931+
static Request simulatePipeline(SimulatePipelineRequest simulatePipelineRequest) throws IOException {
932+
EndpointBuilder builder = new EndpointBuilder().addPathPartAsIs("_ingest/pipeline");
933+
if (simulatePipelineRequest.getId() != null && !simulatePipelineRequest.getId().isEmpty()) {
934+
builder.addPathPart(simulatePipelineRequest.getId());
935+
}
936+
builder.addPathPartAsIs("_simulate");
937+
String endpoint = builder.build();
938+
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
939+
Params params = new Params(request);
940+
params.putParam("verbose", Boolean.toString(simulatePipelineRequest.isVerbose()));
941+
request.setEntity(createEntity(simulatePipelineRequest, REQUEST_BODY_CONTENT_TYPE));
942+
return request;
943+
}
944+
930945
static Request getAlias(GetAliasesRequest getAliasesRequest) {
931946
String[] indices = getAliasesRequest.indices() == null ? Strings.EMPTY_ARRAY : getAliasesRequest.indices();
932947
String[] aliases = getAliasesRequest.aliases() == null ? Strings.EMPTY_ARRAY : getAliasesRequest.aliases();

client/rest-high-level/src/test/java/org/elasticsearch/client/ESRestHighLevelClientTestCase.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -123,9 +123,7 @@ private HighLevelClient(RestClient restClient) {
123123
}
124124
}
125125

126-
protected static XContentBuilder buildRandomXContentPipeline() throws IOException {
127-
XContentType xContentType = randomFrom(XContentType.values());
128-
XContentBuilder pipelineBuilder = XContentBuilder.builder(xContentType.xContent());
126+
protected static XContentBuilder buildRandomXContentPipeline(XContentBuilder pipelineBuilder) throws IOException {
129127
pipelineBuilder.startObject();
130128
{
131129
pipelineBuilder.field(Pipeline.DESCRIPTION_KEY, "some random set of processors");
@@ -152,6 +150,12 @@ protected static XContentBuilder buildRandomXContentPipeline() throws IOExceptio
152150
return pipelineBuilder;
153151
}
154152

153+
protected static XContentBuilder buildRandomXContentPipeline() throws IOException {
154+
XContentType xContentType = randomFrom(XContentType.values());
155+
XContentBuilder pipelineBuilder = XContentBuilder.builder(xContentType.xContent());
156+
return buildRandomXContentPipeline(pipelineBuilder);
157+
}
158+
155159
protected static void createPipeline(String pipelineId) throws IOException {
156160
XContentBuilder builder = buildRandomXContentPipeline();
157161
createPipeline(new PutPipelineRequest(pipelineId, BytesReference.bytes(builder), builder.contentType()));

client/rest-high-level/src/test/java/org/elasticsearch/client/IngestClientIT.java

+99
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,22 @@
2323
import org.elasticsearch.action.ingest.GetPipelineRequest;
2424
import org.elasticsearch.action.ingest.GetPipelineResponse;
2525
import org.elasticsearch.action.ingest.PutPipelineRequest;
26+
import org.elasticsearch.action.ingest.SimulateDocumentBaseResult;
27+
import org.elasticsearch.action.ingest.SimulateDocumentResult;
28+
import org.elasticsearch.action.ingest.SimulateDocumentVerboseResult;
29+
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
30+
import org.elasticsearch.action.ingest.SimulatePipelineResponse;
2631
import org.elasticsearch.action.ingest.WritePipelineResponse;
2732
import org.elasticsearch.common.bytes.BytesReference;
2833
import org.elasticsearch.common.xcontent.XContentBuilder;
34+
import org.elasticsearch.common.xcontent.XContentType;
2935
import org.elasticsearch.ingest.PipelineConfiguration;
3036

3137
import java.io.IOException;
38+
import java.util.List;
39+
40+
import static org.hamcrest.Matchers.containsString;
41+
import static org.hamcrest.core.IsInstanceOf.instanceOf;
3242

3343
public class IngestClientIT extends ESRestHighLevelClientTestCase {
3444

@@ -80,4 +90,93 @@ public void testDeletePipeline() throws IOException {
8090
execute(request, highLevelClient().ingest()::deletePipeline, highLevelClient().ingest()::deletePipelineAsync);
8191
assertTrue(response.isAcknowledged());
8292
}
93+
94+
public void testSimulatePipeline() throws IOException {
95+
testSimulatePipeline(false, false);
96+
}
97+
98+
public void testSimulatePipelineWithFailure() throws IOException {
99+
testSimulatePipeline(false, true);
100+
}
101+
102+
public void testSimulatePipelineVerbose() throws IOException {
103+
testSimulatePipeline(true, false);
104+
}
105+
106+
public void testSimulatePipelineVerboseWithFailure() throws IOException {
107+
testSimulatePipeline(true, true);
108+
}
109+
110+
private void testSimulatePipeline(boolean isVerbose,
111+
boolean isFailure) throws IOException {
112+
XContentType xContentType = randomFrom(XContentType.values());
113+
XContentBuilder builder = XContentBuilder.builder(xContentType.xContent());
114+
String rankValue = isFailure ? "non-int" : Integer.toString(1234);
115+
builder.startObject();
116+
{
117+
builder.field("pipeline");
118+
buildRandomXContentPipeline(builder);
119+
builder.startArray("docs");
120+
{
121+
builder.startObject()
122+
.field("_index", "index")
123+
.field("_type", "doc")
124+
.field("_id", "doc_" + 1)
125+
.startObject("_source").field("foo", "rab_" + 1).field("rank", rankValue).endObject()
126+
.endObject();
127+
}
128+
builder.endArray();
129+
}
130+
builder.endObject();
131+
132+
SimulatePipelineRequest request = new SimulatePipelineRequest(
133+
BytesReference.bytes(builder),
134+
builder.contentType()
135+
);
136+
request.setVerbose(isVerbose);
137+
SimulatePipelineResponse response =
138+
execute(request, highLevelClient().ingest()::simulatePipeline, highLevelClient().ingest()::simulatePipelineAsync);
139+
List<SimulateDocumentResult> results = response.getResults();
140+
assertEquals(1, results.size());
141+
if (isVerbose) {
142+
assertThat(results.get(0), instanceOf(SimulateDocumentVerboseResult.class));
143+
SimulateDocumentVerboseResult verboseResult = (SimulateDocumentVerboseResult) results.get(0);
144+
assertEquals(2, verboseResult.getProcessorResults().size());
145+
if (isFailure) {
146+
assertNotNull(verboseResult.getProcessorResults().get(1).getFailure());
147+
assertThat(verboseResult.getProcessorResults().get(1).getFailure().getMessage(),
148+
containsString("unable to convert [non-int] to integer"));
149+
} else {
150+
assertEquals(
151+
verboseResult.getProcessorResults().get(0).getIngestDocument()
152+
.getFieldValue("foo", String.class),
153+
"bar"
154+
);
155+
assertEquals(
156+
Integer.valueOf(1234),
157+
verboseResult.getProcessorResults().get(1).getIngestDocument()
158+
.getFieldValue("rank", Integer.class)
159+
);
160+
}
161+
} else {
162+
assertThat(results.get(0), instanceOf(SimulateDocumentBaseResult.class));
163+
SimulateDocumentBaseResult baseResult = (SimulateDocumentBaseResult)results.get(0);
164+
if (isFailure) {
165+
assertNotNull(baseResult.getFailure());
166+
assertThat(baseResult.getFailure().getMessage(),
167+
containsString("unable to convert [non-int] to integer"));
168+
} else {
169+
assertNotNull(baseResult.getIngestDocument());
170+
assertEquals(
171+
baseResult.getIngestDocument().getFieldValue("foo", String.class),
172+
"bar"
173+
);
174+
assertEquals(
175+
Integer.valueOf(1234),
176+
baseResult.getIngestDocument()
177+
.getFieldValue("rank", Integer.class)
178+
);
179+
}
180+
}
181+
}
83182
}

client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java

+29
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
import org.elasticsearch.action.ingest.DeletePipelineRequest;
7676
import org.elasticsearch.action.ingest.GetPipelineRequest;
7777
import org.elasticsearch.action.ingest.PutPipelineRequest;
78+
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
7879
import org.elasticsearch.action.search.ClearScrollRequest;
7980
import org.elasticsearch.action.search.MultiSearchRequest;
8081
import org.elasticsearch.action.search.SearchRequest;
@@ -1622,6 +1623,34 @@ public void testDeletePipeline() {
16221623
assertEquals(expectedParams, expectedRequest.getParameters());
16231624
}
16241625

1626+
public void testSimulatePipeline() throws IOException {
1627+
String pipelineId = randomBoolean() ? "some_pipeline_id" : null;
1628+
boolean verbose = randomBoolean();
1629+
String json = "{\"pipeline\":{" +
1630+
"\"description\":\"_description\"," +
1631+
"\"processors\":[{\"set\":{\"field\":\"field2\",\"value\":\"_value\"}}]}," +
1632+
"\"docs\":[{\"_index\":\"index\",\"_type\":\"_doc\",\"_id\":\"id\",\"_source\":{\"foo\":\"rab\"}}]}";
1633+
SimulatePipelineRequest request = new SimulatePipelineRequest(
1634+
new BytesArray(json.getBytes(StandardCharsets.UTF_8)),
1635+
XContentType.JSON
1636+
);
1637+
request.setId(pipelineId);
1638+
request.setVerbose(verbose);
1639+
Map<String, String> expectedParams = new HashMap<>();
1640+
expectedParams.put("verbose", Boolean.toString(verbose));
1641+
1642+
Request expectedRequest = RequestConverters.simulatePipeline(request);
1643+
StringJoiner endpoint = new StringJoiner("/", "/", "");
1644+
endpoint.add("_ingest/pipeline");
1645+
if (pipelineId != null && !pipelineId.isEmpty())
1646+
endpoint.add(pipelineId);
1647+
endpoint.add("_simulate");
1648+
assertEquals(endpoint.toString(), expectedRequest.getEndpoint());
1649+
assertEquals(HttpPost.METHOD_NAME, expectedRequest.getMethod());
1650+
assertEquals(expectedParams, expectedRequest.getParameters());
1651+
assertToXContentBody(request, expectedRequest.getEntity());
1652+
}
1653+
16251654
public void testClusterHealth() {
16261655
ClusterHealthRequest healthRequest = new ClusterHealthRequest();
16271656
Map<String, String> expectedParams = new HashMap<>();

client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/IngestClientDocumentationIT.java

+111
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,12 @@
2525
import org.elasticsearch.action.ingest.GetPipelineRequest;
2626
import org.elasticsearch.action.ingest.GetPipelineResponse;
2727
import org.elasticsearch.action.ingest.PutPipelineRequest;
28+
import org.elasticsearch.action.ingest.SimulateDocumentBaseResult;
29+
import org.elasticsearch.action.ingest.SimulateDocumentResult;
30+
import org.elasticsearch.action.ingest.SimulateDocumentVerboseResult;
31+
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
32+
import org.elasticsearch.action.ingest.SimulatePipelineResponse;
33+
import org.elasticsearch.action.ingest.SimulateProcessorResult;
2834
import org.elasticsearch.action.ingest.WritePipelineResponse;
2935
import org.elasticsearch.client.ESRestHighLevelClientTestCase;
3036
import org.elasticsearch.client.RequestOptions;
@@ -277,4 +283,109 @@ public void onFailure(Exception e) {
277283
}
278284
}
279285

286+
public void testSimulatePipeline() throws IOException {
287+
RestHighLevelClient client = highLevelClient();
288+
289+
{
290+
// tag::simulate-pipeline-request
291+
String source =
292+
"{\"" +
293+
"pipeline\":{" +
294+
"\"description\":\"_description\"," +
295+
"\"processors\":[{\"set\":{\"field\":\"field2\",\"value\":\"_value\"}}]" +
296+
"}," +
297+
"\"docs\":[" +
298+
"{\"_index\":\"index\",\"_type\":\"_doc\",\"_id\":\"id\",\"_source\":{\"foo\":\"bar\"}}," +
299+
"{\"_index\":\"index\",\"_type\":\"_doc\",\"_id\":\"id\",\"_source\":{\"foo\":\"rab\"}}" +
300+
"]" +
301+
"}";
302+
SimulatePipelineRequest request = new SimulatePipelineRequest(
303+
new BytesArray(source.getBytes(StandardCharsets.UTF_8)), // <1>
304+
XContentType.JSON // <2>
305+
);
306+
// end::simulate-pipeline-request
307+
308+
// tag::simulate-pipeline-request-pipeline-id
309+
request.setId("my-pipeline-id"); // <1>
310+
// end::simulate-pipeline-request-pipeline-id
311+
312+
// For testing we set this back to null
313+
request.setId(null);
314+
315+
// tag::simulate-pipeline-request-verbose
316+
request.setVerbose(true); // <1>
317+
// end::simulate-pipeline-request-verbose
318+
319+
// tag::simulate-pipeline-execute
320+
SimulatePipelineResponse response = client.ingest().simulatePipeline(request, RequestOptions.DEFAULT); // <1>
321+
// end::simulate-pipeline-execute
322+
323+
// tag::simulate-pipeline-response
324+
for (SimulateDocumentResult result: response.getResults()) { // <1>
325+
if (request.isVerbose()) {
326+
assert result instanceof SimulateDocumentVerboseResult;
327+
SimulateDocumentVerboseResult verboseResult = (SimulateDocumentVerboseResult)result; // <2>
328+
for (SimulateProcessorResult processorResult: verboseResult.getProcessorResults()) { // <3>
329+
processorResult.getIngestDocument(); // <4>
330+
processorResult.getFailure(); // <5>
331+
}
332+
} else {
333+
assert result instanceof SimulateDocumentBaseResult;
334+
SimulateDocumentBaseResult baseResult = (SimulateDocumentBaseResult)result; // <6>
335+
baseResult.getIngestDocument(); // <7>
336+
baseResult.getFailure(); // <8>
337+
}
338+
}
339+
// end::simulate-pipeline-response
340+
assert(response.getResults().size() > 0);
341+
}
342+
}
343+
344+
public void testSimulatePipelineAsync() throws Exception {
345+
RestHighLevelClient client = highLevelClient();
346+
347+
{
348+
String source =
349+
"{\"" +
350+
"pipeline\":{" +
351+
"\"description\":\"_description\"," +
352+
"\"processors\":[{\"set\":{\"field\":\"field2\",\"value\":\"_value\"}}]" +
353+
"}," +
354+
"\"docs\":[" +
355+
"{\"_index\":\"index\",\"_type\":\"_doc\",\"_id\":\"id\",\"_source\":{\"foo\":\"bar\"}}," +
356+
"{\"_index\":\"index\",\"_type\":\"_doc\",\"_id\":\"id\",\"_source\":{\"foo\":\"rab\"}}" +
357+
"]" +
358+
"}";
359+
SimulatePipelineRequest request = new SimulatePipelineRequest(
360+
new BytesArray(source.getBytes(StandardCharsets.UTF_8)),
361+
XContentType.JSON
362+
);
363+
364+
// tag::simulate-pipeline-execute-listener
365+
ActionListener<SimulatePipelineResponse> listener =
366+
new ActionListener<SimulatePipelineResponse>() {
367+
@Override
368+
public void onResponse(SimulatePipelineResponse response) {
369+
// <1>
370+
}
371+
372+
@Override
373+
public void onFailure(Exception e) {
374+
// <2>
375+
}
376+
};
377+
// end::simulate-pipeline-execute-listener
378+
379+
// Replace the empty listener by a blocking listener in test
380+
final CountDownLatch latch = new CountDownLatch(1);
381+
listener = new LatchedActionListener<>(listener, latch);
382+
383+
// tag::simulate-pipeline-execute-async
384+
client.ingest().simulatePipelineAsync(request, RequestOptions.DEFAULT, listener); // <1>
385+
// end::simulate-pipeline-execute-async
386+
387+
assertTrue(latch.await(30L, TimeUnit.SECONDS));
388+
}
389+
}
390+
280391
}

0 commit comments

Comments
 (0)