Skip to content

Commit b4195d3

Browse files
sohaibiftikharnik9000
authored andcommitted
REST high-level client: add put ingest pipeline API (#30793)
REST high-level client: add put ingest pipeline API Adds the put ingest pipeline API to the high level rest client.
1 parent 3aa29f5 commit b4195d3

File tree

12 files changed

+435
-14
lines changed

12 files changed

+435
-14
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/ClusterClient.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
2626
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
2727
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
28+
import org.elasticsearch.action.ingest.PutPipelineRequest;
29+
import org.elasticsearch.action.ingest.PutPipelineResponse;
2830

2931
import java.io.IOException;
3032

@@ -87,4 +89,26 @@ public void listTasksAsync(ListTasksRequest request, ActionListener<ListTasksRes
8789
restHighLevelClient.performRequestAsyncAndParseEntity(request, RequestConverters::listTasks, ListTasksResponse::fromXContent,
8890
listener, emptySet(), headers);
8991
}
92+
93+
/**
94+
* Add a pipeline or update an existing pipeline in the cluster
95+
* <p>
96+
* See
97+
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/put-pipeline-api.html"> Put Pipeline API on elastic.co</a>
98+
*/
99+
public PutPipelineResponse putPipeline(PutPipelineRequest request, Header... headers) throws IOException {
100+
return restHighLevelClient.performRequestAndParseEntity( request, RequestConverters::putPipeline,
101+
PutPipelineResponse::fromXContent, emptySet(), headers);
102+
}
103+
104+
/**
105+
* Asynchronously add a pipeline or update an existing pipeline in the cluster
106+
* <p>
107+
* See
108+
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/put-pipeline-api.html"> Put Pipeline API on elastic.co</a>
109+
*/
110+
public void putPipelineAsync(PutPipelineRequest request, ActionListener<PutPipelineResponse> listener, Header... headers) {
111+
restHighLevelClient.performRequestAsyncAndParseEntity( request, RequestConverters::putPipeline,
112+
PutPipelineResponse::fromXContent, listener, emptySet(), headers);
113+
}
90114
}

client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.elasticsearch.action.get.GetRequest;
5959
import org.elasticsearch.action.get.MultiGetRequest;
6060
import org.elasticsearch.action.index.IndexRequest;
61+
import org.elasticsearch.action.ingest.PutPipelineRequest;
6162
import org.elasticsearch.action.search.ClearScrollRequest;
6263
import org.elasticsearch.action.search.MultiSearchRequest;
6364
import org.elasticsearch.action.search.SearchRequest;
@@ -620,6 +621,21 @@ static Request clusterPutSettings(ClusterUpdateSettingsRequest clusterUpdateSett
620621
return request;
621622
}
622623

624+
static Request putPipeline(PutPipelineRequest putPipelineRequest) throws IOException {
625+
String endpoint = new EndpointBuilder()
626+
.addPathPartAsIs("_ingest/pipeline")
627+
.addPathPart(putPipelineRequest.getId())
628+
.build();
629+
Request request = new Request(HttpPut.METHOD_NAME, endpoint);
630+
631+
Params parameters = new Params(request);
632+
parameters.withTimeout(putPipelineRequest.timeout());
633+
parameters.withMasterTimeout(putPipelineRequest.masterNodeTimeout());
634+
635+
request.setEntity(createEntity(putPipelineRequest, REQUEST_BODY_CONTENT_TYPE));
636+
return request;
637+
}
638+
623639
static Request listTasks(ListTasksRequest listTaskRequest) {
624640
if (listTaskRequest.getTaskId() != null && listTaskRequest.getTaskId().isSet()) {
625641
throw new IllegalArgumentException("TaskId cannot be used for list tasks request");

client/rest-high-level/src/test/java/org/elasticsearch/client/ClusterClientIT.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,17 @@
2525
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
2626
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
2727
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
28+
import org.elasticsearch.action.ingest.PutPipelineRequest;
29+
import org.elasticsearch.action.ingest.PutPipelineResponse;
2830
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
31+
import org.elasticsearch.common.bytes.BytesReference;
2932
import org.elasticsearch.common.settings.Settings;
3033
import org.elasticsearch.common.unit.ByteSizeUnit;
34+
import org.elasticsearch.common.xcontent.XContentBuilder;
3135
import org.elasticsearch.common.xcontent.XContentType;
3236
import org.elasticsearch.common.xcontent.support.XContentMapValues;
3337
import org.elasticsearch.indices.recovery.RecoverySettings;
38+
import org.elasticsearch.ingest.Pipeline;
3439
import org.elasticsearch.rest.RestStatus;
3540
import org.elasticsearch.tasks.TaskInfo;
3641

@@ -136,4 +141,41 @@ public void testListTasks() throws IOException {
136141
}
137142
assertTrue("List tasks were not found", listTasksFound);
138143
}
144+
145+
public void testPutPipeline() throws IOException {
146+
String id = "some_pipeline_id";
147+
XContentType xContentType = randomFrom(XContentType.values());
148+
XContentBuilder pipelineBuilder = XContentBuilder.builder(xContentType.xContent());
149+
pipelineBuilder.startObject();
150+
{
151+
pipelineBuilder.field(Pipeline.DESCRIPTION_KEY, "some random set of processors");
152+
pipelineBuilder.startArray(Pipeline.PROCESSORS_KEY);
153+
{
154+
pipelineBuilder.startObject().startObject("set");
155+
{
156+
pipelineBuilder
157+
.field("field", "foo")
158+
.field("value", "bar");
159+
}
160+
pipelineBuilder.endObject().endObject();
161+
pipelineBuilder.startObject().startObject("convert");
162+
{
163+
pipelineBuilder
164+
.field("field", "rank")
165+
.field("type", "integer");
166+
}
167+
pipelineBuilder.endObject().endObject();
168+
}
169+
pipelineBuilder.endArray();
170+
}
171+
pipelineBuilder.endObject();
172+
PutPipelineRequest request = new PutPipelineRequest(
173+
id,
174+
BytesReference.bytes(pipelineBuilder),
175+
pipelineBuilder.contentType());
176+
177+
PutPipelineResponse putPipelineResponse =
178+
execute(request, highLevelClient().cluster()::putPipeline, highLevelClient().cluster()::putPipelineAsync);
179+
assertTrue(putPipelineResponse.isAcknowledged());
180+
}
139181
}

client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import org.elasticsearch.action.get.GetRequest;
6262
import org.elasticsearch.action.get.MultiGetRequest;
6363
import org.elasticsearch.action.index.IndexRequest;
64+
import org.elasticsearch.action.ingest.PutPipelineRequest;
6465
import org.elasticsearch.action.search.ClearScrollRequest;
6566
import org.elasticsearch.action.search.MultiSearchRequest;
6667
import org.elasticsearch.action.search.SearchRequest;
@@ -91,6 +92,7 @@
9192
import org.elasticsearch.common.xcontent.XContentHelper;
9293
import org.elasticsearch.common.xcontent.XContentParser;
9394
import org.elasticsearch.common.xcontent.XContentType;
95+
import org.elasticsearch.common.xcontent.json.JsonXContent;
9496
import org.elasticsearch.index.RandomCreateIndexGenerator;
9597
import org.elasticsearch.index.VersionType;
9698
import org.elasticsearch.index.query.TermQueryBuilder;
@@ -119,6 +121,7 @@
119121

120122
import java.io.IOException;
121123
import java.io.InputStream;
124+
import java.nio.charset.StandardCharsets;
122125
import java.nio.file.Path;
123126
import java.util.ArrayList;
124127
import java.util.Arrays;
@@ -1434,6 +1437,26 @@ public void testClusterPutSettings() throws IOException {
14341437
assertEquals(expectedParams, expectedRequest.getParameters());
14351438
}
14361439

1440+
public void testPutPipeline() throws IOException {
1441+
String pipelineId = "some_pipeline_id";
1442+
PutPipelineRequest request = new PutPipelineRequest(
1443+
"some_pipeline_id",
1444+
new BytesArray("{}".getBytes(StandardCharsets.UTF_8)),
1445+
XContentType.JSON
1446+
);
1447+
Map<String, String> expectedParams = new HashMap<>();
1448+
setRandomMasterTimeout(request, expectedParams);
1449+
setRandomTimeout(request::timeout, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, expectedParams);
1450+
1451+
Request expectedRequest = RequestConverters.putPipeline(request);
1452+
StringJoiner endpoint = new StringJoiner("/", "/", "");
1453+
endpoint.add("_ingest/pipeline");
1454+
endpoint.add(pipelineId);
1455+
assertEquals(endpoint.toString(), expectedRequest.getEndpoint());
1456+
assertEquals(HttpPut.METHOD_NAME, expectedRequest.getMethod());
1457+
assertEquals(expectedParams, expectedRequest.getParameters());
1458+
}
1459+
14371460
public void testRollover() throws IOException {
14381461
RolloverRequest rolloverRequest = new RolloverRequest(randomAlphaOfLengthBetween(3, 10),
14391462
randomBoolean() ? null : randomAlphaOfLengthBetween(3, 10));

client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/ClusterClientDocumentationIT.java

Lines changed: 88 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,19 @@
2121

2222
import org.elasticsearch.ElasticsearchException;
2323
import org.elasticsearch.action.ActionListener;
24-
import org.elasticsearch.action.FailedNodeException;
2524
import org.elasticsearch.action.LatchedActionListener;
2625
import org.elasticsearch.action.TaskOperationFailure;
2726
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
2827
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
2928
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
3029
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
3130
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
31+
import org.elasticsearch.action.ingest.PutPipelineRequest;
32+
import org.elasticsearch.action.ingest.PutPipelineResponse;
3233
import org.elasticsearch.client.ESRestHighLevelClientTestCase;
3334
import org.elasticsearch.client.RestHighLevelClient;
3435
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
36+
import org.elasticsearch.common.bytes.BytesArray;
3537
import org.elasticsearch.common.settings.Settings;
3638
import org.elasticsearch.common.unit.ByteSizeUnit;
3739
import org.elasticsearch.common.unit.TimeValue;
@@ -41,6 +43,7 @@
4143
import org.elasticsearch.tasks.TaskInfo;
4244

4345
import java.io.IOException;
46+
import java.nio.charset.StandardCharsets;
4447
import java.util.HashMap;
4548
import java.util.List;
4649
import java.util.Map;
@@ -80,19 +83,19 @@ public void testClusterPutSettings() throws IOException {
8083
// end::put-settings-request
8184

8285
// tag::put-settings-create-settings
83-
String transientSettingKey =
86+
String transientSettingKey =
8487
RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey();
8588
int transientSettingValue = 10;
86-
Settings transientSettings =
89+
Settings transientSettings =
8790
Settings.builder()
8891
.put(transientSettingKey, transientSettingValue, ByteSizeUnit.BYTES)
8992
.build(); // <1>
9093

91-
String persistentSettingKey =
94+
String persistentSettingKey =
9295
EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey();
93-
String persistentSettingValue =
96+
String persistentSettingValue =
9497
EnableAllocationDecider.Allocation.NONE.name();
95-
Settings persistentSettings =
98+
Settings persistentSettings =
9699
Settings.builder()
97100
.put(persistentSettingKey, persistentSettingValue)
98101
.build(); // <2>
@@ -105,9 +108,9 @@ public void testClusterPutSettings() throws IOException {
105108

106109
{
107110
// tag::put-settings-settings-builder
108-
Settings.Builder transientSettingsBuilder =
111+
Settings.Builder transientSettingsBuilder =
109112
Settings.builder()
110-
.put(transientSettingKey, transientSettingValue, ByteSizeUnit.BYTES);
113+
.put(transientSettingKey, transientSettingValue, ByteSizeUnit.BYTES);
111114
request.transientSettings(transientSettingsBuilder); // <1>
112115
// end::put-settings-settings-builder
113116
}
@@ -164,7 +167,7 @@ public void testClusterUpdateSettingsAsync() throws Exception {
164167
ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
165168

166169
// tag::put-settings-execute-listener
167-
ActionListener<ClusterUpdateSettingsResponse> listener =
170+
ActionListener<ClusterUpdateSettingsResponse> listener =
168171
new ActionListener<ClusterUpdateSettingsResponse>() {
169172
@Override
170173
public void onResponse(ClusterUpdateSettingsResponse response) {
@@ -272,4 +275,80 @@ public void onFailure(Exception e) {
272275
assertTrue(latch.await(30L, TimeUnit.SECONDS));
273276
}
274277
}
278+
279+
public void testPutPipeline() throws IOException {
280+
RestHighLevelClient client = highLevelClient();
281+
282+
{
283+
// tag::put-pipeline-request
284+
String source =
285+
"{\"description\":\"my set of processors\"," +
286+
"\"processors\":[{\"set\":{\"field\":\"foo\",\"value\":\"bar\"}}]}";
287+
PutPipelineRequest request = new PutPipelineRequest(
288+
"my-pipeline-id", // <1>
289+
new BytesArray(source.getBytes(StandardCharsets.UTF_8)), // <2>
290+
XContentType.JSON // <3>
291+
);
292+
// end::put-pipeline-request
293+
294+
// tag::put-pipeline-request-timeout
295+
request.timeout(TimeValue.timeValueMinutes(2)); // <1>
296+
request.timeout("2m"); // <2>
297+
// end::put-pipeline-request-timeout
298+
299+
// tag::put-pipeline-request-masterTimeout
300+
request.masterNodeTimeout(TimeValue.timeValueMinutes(1)); // <1>
301+
request.masterNodeTimeout("1m"); // <2>
302+
// end::put-pipeline-request-masterTimeout
303+
304+
// tag::put-pipeline-execute
305+
PutPipelineResponse response = client.cluster().putPipeline(request); // <1>
306+
// end::put-pipeline-execute
307+
308+
// tag::put-pipeline-response
309+
boolean acknowledged = response.isAcknowledged(); // <1>
310+
// end::put-pipeline-response
311+
assertTrue(acknowledged);
312+
}
313+
}
314+
315+
public void testPutPipelineAsync() throws Exception {
316+
RestHighLevelClient client = highLevelClient();
317+
318+
{
319+
String source =
320+
"{\"description\":\"my set of processors\"," +
321+
"\"processors\":[{\"set\":{\"field\":\"foo\",\"value\":\"bar\"}}]}";
322+
PutPipelineRequest request = new PutPipelineRequest(
323+
"my-pipeline-id",
324+
new BytesArray(source.getBytes(StandardCharsets.UTF_8)),
325+
XContentType.JSON
326+
);
327+
328+
// tag::put-pipeline-execute-listener
329+
ActionListener<PutPipelineResponse> listener =
330+
new ActionListener<PutPipelineResponse>() {
331+
@Override
332+
public void onResponse(PutPipelineResponse response) {
333+
// <1>
334+
}
335+
336+
@Override
337+
public void onFailure(Exception e) {
338+
// <2>
339+
}
340+
};
341+
// end::put-pipeline-execute-listener
342+
343+
// Replace the empty listener by a blocking listener in test
344+
final CountDownLatch latch = new CountDownLatch(1);
345+
listener = new LatchedActionListener<>(listener, latch);
346+
347+
// tag::put-pipeline-execute-async
348+
client.cluster().putPipelineAsync(request, listener); // <1>
349+
// end::put-pipeline-execute-async
350+
351+
assertTrue(latch.await(30L, TimeUnit.SECONDS));
352+
}
353+
}
275354
}

0 commit comments

Comments
 (0)