Skip to content

Commit a3d5d33

Browse files
authored
Operator/ingest (#89735)
Add support for /_ingest/pipeline for file based settings. Relates to #89183
1 parent 4d34667 commit a3d5d33

File tree

13 files changed

+1022
-57
lines changed

13 files changed

+1022
-57
lines changed

docs/changelog/89735.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 89735
2+
summary: Operator/ingest
3+
area: Infra/Core
4+
type: enhancement
5+
issues: []
Lines changed: 273 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,273 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.ingest;
10+
11+
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
12+
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
13+
import org.elasticsearch.action.ingest.PutPipelineAction;
14+
import org.elasticsearch.action.ingest.PutPipelineRequest;
15+
import org.elasticsearch.action.ingest.ReservedPipelineAction;
16+
import org.elasticsearch.cluster.ClusterChangedEvent;
17+
import org.elasticsearch.cluster.ClusterStateListener;
18+
import org.elasticsearch.cluster.metadata.ReservedStateErrorMetadata;
19+
import org.elasticsearch.cluster.metadata.ReservedStateHandlerMetadata;
20+
import org.elasticsearch.cluster.metadata.ReservedStateMetadata;
21+
import org.elasticsearch.cluster.service.ClusterService;
22+
import org.elasticsearch.common.bytes.BytesReference;
23+
import org.elasticsearch.core.Strings;
24+
import org.elasticsearch.core.Tuple;
25+
import org.elasticsearch.plugins.Plugin;
26+
import org.elasticsearch.reservedstate.service.FileSettingsService;
27+
import org.elasticsearch.test.ESIntegTestCase;
28+
import org.elasticsearch.xcontent.XContentFactory;
29+
import org.elasticsearch.xcontent.XContentParserConfiguration;
30+
31+
import java.io.ByteArrayInputStream;
32+
import java.nio.charset.StandardCharsets;
33+
import java.nio.file.Files;
34+
import java.nio.file.Path;
35+
import java.nio.file.StandardCopyOption;
36+
import java.util.Arrays;
37+
import java.util.Collection;
38+
import java.util.HashMap;
39+
import java.util.Map;
40+
import java.util.concurrent.CountDownLatch;
41+
import java.util.concurrent.TimeUnit;
42+
import java.util.concurrent.atomic.AtomicLong;
43+
44+
import static org.elasticsearch.xcontent.XContentType.JSON;
45+
import static org.hamcrest.Matchers.allOf;
46+
import static org.hamcrest.Matchers.containsInAnyOrder;
47+
import static org.hamcrest.Matchers.containsString;
48+
import static org.hamcrest.Matchers.hasSize;
49+
import static org.hamcrest.Matchers.notNullValue;
50+
51+
/**
 * Integration tests verifying that ingest pipelines can be managed through file based
 * (operator) settings. Writes operator settings files into the location watched by
 * {@link FileSettingsService} and asserts that valid pipeline definitions are applied
 * and reserved, while invalid ones surface as reserved-state error metadata.
 */
public class IngestFileSettingsIT extends ESIntegTestCase {

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        // Registers the "test" processor factory used by the pipeline definitions below.
        return Arrays.asList(CustomIngestTestPlugin.class);
    }

    // Monotonically increasing version stamped into each written settings file, so every
    // write is seen by the file settings service as newer than the previous one.
    private static AtomicLong versionCounter = new AtomicLong(1);

    // Valid operator settings: two pipelines whose processors use the "test" processor
    // type registered by CustomIngestTestPlugin. "%s" is replaced with the version counter.
    private static String testJSON = """
        {
            "metadata": {
                "version": "%s",
                "compatibility": "8.4.0"
            },
            "state": {
                "ingest_pipelines": {
                    "my_ingest_pipeline": {
                        "description": "_description",
                        "processors": [
                            {
                                "test" : {
                                    "field": "pipeline",
                                    "value": "pipeline"
                                }
                            }
                        ]
                    },
                    "my_ingest_pipeline_1": {
                        "description": "_description",
                        "processors": [
                            {
                                "test" : {
                                    "field": "pipeline",
                                    "value": "pipeline"
                                }
                            }
                        ]
                    }
                }
            }
        }""";

    // Invalid operator settings: processor type "foo" is not registered, so applying this
    // file must fail validation and record reserved-state error metadata.
    private static String testErrorJSON = """
        {
            "metadata": {
                "version": "%s",
                "compatibility": "8.4.0"
            },
            "state": {
                "ingest_pipelines": {
                    "my_ingest_pipeline": {
                        "description": "_description",
                        "processors": [
                            {
                                "foo" : {
                                    "field": "pipeline",
                                    "value": "pipeline"
                                }
                            }
                        ]
                    }
                }
            }
        }""";

    /**
     * Formats {@code json} with a fresh version and atomically moves it into the operator
     * settings file location watched by {@link FileSettingsService} on the given node.
     * The write-to-temp-then-ATOMIC_MOVE dance ensures the watcher never observes a
     * partially written file.
     */
    private void writeJSONFile(String node, String json) throws Exception {
        long version = versionCounter.incrementAndGet();

        FileSettingsService fileSettingsService = internalCluster().getInstance(FileSettingsService.class, node);
        assertTrue(fileSettingsService.watching());

        Files.deleteIfExists(fileSettingsService.operatorSettingsFile());

        Files.createDirectories(fileSettingsService.operatorSettingsDir());
        Path tempFilePath = createTempFile();

        logger.info("--> writing JSON config to node {} with path {}", node, tempFilePath);
        logger.info(Strings.format(json, version));
        Files.write(tempFilePath, Strings.format(json, version).getBytes(StandardCharsets.UTF_8));
        Files.move(tempFilePath, fileSettingsService.operatorSettingsFile(), StandardCopyOption.ATOMIC_MOVE);
    }

    /**
     * Registers a one-shot cluster state listener that fires once the reserved state for
     * the file settings namespace contains the "my_ingest_pipeline" key under the
     * {@link ReservedPipelineAction} handler.
     *
     * @return latch that counts down on success, paired with the metadata version observed
     */
    private Tuple<CountDownLatch, AtomicLong> setupClusterStateListener(String node) {
        ClusterService clusterService = internalCluster().clusterService(node);
        CountDownLatch savedClusterState = new CountDownLatch(1);
        AtomicLong metadataVersion = new AtomicLong(-1);
        clusterService.addListener(new ClusterStateListener() {
            @Override
            public void clusterChanged(ClusterChangedEvent event) {
                ReservedStateMetadata reservedState = event.state().metadata().reservedStateMetadata().get(FileSettingsService.NAMESPACE);
                if (reservedState != null) {
                    ReservedStateHandlerMetadata handlerMetadata = reservedState.handlers().get(ReservedPipelineAction.NAME);
                    if (handlerMetadata != null && handlerMetadata.keys().contains("my_ingest_pipeline")) {
                        // One-shot: deregister before releasing the waiting test thread.
                        clusterService.removeListener(this);
                        metadataVersion.set(event.state().metadata().version());
                        savedClusterState.countDown();
                    }
                }
            }
        });

        return new Tuple<>(savedClusterState, metadataVersion);
    }

    /**
     * Waits for the listener latch, then verifies both pipelines are recorded as reserved
     * keys and that a REST update of a reserved pipeline is rejected.
     */
    private void assertPipelinesSaveOK(CountDownLatch savedClusterState, AtomicLong metadataVersion) throws Exception {
        boolean awaitSuccessful = savedClusterState.await(20, TimeUnit.SECONDS);
        assertTrue(awaitSuccessful);

        final ClusterStateResponse clusterStateResponse = client().admin()
            .cluster()
            .state(new ClusterStateRequest().waitForMetadataVersion(metadataVersion.get()))
            .get();

        ReservedStateMetadata reservedState = clusterStateResponse.getState()
            .metadata()
            .reservedStateMetadata()
            .get(FileSettingsService.NAMESPACE);

        ReservedStateHandlerMetadata handlerMetadata = reservedState.handlers().get(ReservedPipelineAction.NAME);

        assertThat(handlerMetadata.keys(), allOf(notNullValue(), containsInAnyOrder("my_ingest_pipeline", "my_ingest_pipeline_1")));

        // Try using the REST API to update the my_ingest_pipeline pipeline.
        // This should fail: the pipeline is reserved (read-only) because it was set
        // through file based settings in operator mode.
        assertEquals(
            "Failed to process request [org.elasticsearch.action.ingest.PutPipelineRequest/unset] with errors: "
                + "[[my_ingest_pipeline] set as read-only by [file_settings]]",
            expectThrows(
                IllegalArgumentException.class,
                () -> client().execute(PutPipelineAction.INSTANCE, sampleRestRequest("my_ingest_pipeline")).actionGet()
            ).getMessage()
        );
    }

    // NOTE(review): "Policies" in the name looks inherited from the autoscaling variant of
    // this test; it actually verifies ingest pipelines.
    public void testPoliciesApplied() throws Exception {
        ensureGreen();

        var savedClusterState = setupClusterStateListener(internalCluster().getMasterName());
        writeJSONFile(internalCluster().getMasterName(), testJSON);

        assertPipelinesSaveOK(savedClusterState.v1(), savedClusterState.v2());
    }

    /**
     * Registers a one-shot cluster state listener that fires once the file settings
     * namespace carries error metadata, asserting the error is the expected validation
     * failure for the unknown "foo" processor type.
     */
    private Tuple<CountDownLatch, AtomicLong> setupClusterStateListenerForError(String node) {
        ClusterService clusterService = internalCluster().clusterService(node);
        CountDownLatch savedClusterState = new CountDownLatch(1);
        AtomicLong metadataVersion = new AtomicLong(-1);
        clusterService.addListener(new ClusterStateListener() {
            @Override
            public void clusterChanged(ClusterChangedEvent event) {
                ReservedStateMetadata reservedState = event.state().metadata().reservedStateMetadata().get(FileSettingsService.NAMESPACE);
                if (reservedState != null && reservedState.errorMetadata() != null) {
                    clusterService.removeListener(this);
                    metadataVersion.set(event.state().metadata().version());
                    savedClusterState.countDown();
                    assertEquals(ReservedStateErrorMetadata.ErrorKind.VALIDATION, reservedState.errorMetadata().errorKind());
                    assertThat(reservedState.errorMetadata().errors(), allOf(notNullValue(), hasSize(1)));
                    assertThat(
                        reservedState.errorMetadata().errors().get(0),
                        containsString("org.elasticsearch.ElasticsearchParseException: No processor type exists with name [foo]")
                    );
                }
            }
        });

        return new Tuple<>(savedClusterState, metadataVersion);
    }

    private void assertPipelinesNotSaved(CountDownLatch savedClusterState, AtomicLong metadataVersion) throws Exception {
        boolean awaitSuccessful = savedClusterState.await(20, TimeUnit.SECONDS);
        assertTrue(awaitSuccessful);

        // This should succeed, nothing was reserved
        client().execute(PutPipelineAction.INSTANCE, sampleRestRequest("my_ingest_pipeline_bad")).get();
    }

    public void testErrorSaved() throws Exception {
        ensureGreen();
        var savedClusterState = setupClusterStateListenerForError(internalCluster().getMasterName());

        writeJSONFile(internalCluster().getMasterName(), testErrorJSON);
        assertPipelinesNotSaved(savedClusterState.v1(), savedClusterState.v2());
    }

    /**
     * Builds a {@link PutPipelineRequest} for the given pipeline id from a fixed JSON body,
     * round-tripping it through an XContent parser/builder as the REST layer would.
     */
    private PutPipelineRequest sampleRestRequest(String id) throws Exception {
        var json = """
            {
                "description": "_description",
                "processors": [
                    {
                        "test" : {
                            "field": "_foo",
                            "value": "_bar"
                        }
                    }
                ]
            }""";

        try (
            var bis = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
            var parser = JSON.xContent().createParser(XContentParserConfiguration.EMPTY, bis);
            var builder = XContentFactory.contentBuilder(JSON)
        ) {
            builder.map(parser.map());
            return new PutPipelineRequest(id, BytesReference.bytes(builder), JSON);
        }
    }

    /**
     * Test plugin exposing a single "test" processor that sets the configured
     * {@code field} to the configured {@code value} on each ingested document.
     */
    public static class CustomIngestTestPlugin extends IngestTestPlugin {
        @Override
        public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
            Map<String, Processor.Factory> processors = new HashMap<>();
            processors.put("test", (factories, tag, description, config) -> {
                String field = (String) config.remove("field");
                String value = (String) config.remove("value");
                return new FakeProcessor("test", tag, description, (ingestDocument) -> ingestDocument.setFieldValue(field, value));
            });

            return processors;
        }
    }
}

server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoRequest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,20 @@ public void writeTo(StreamOutput out) throws IOException {
125125
out.writeStringArray(requestedMetrics.toArray(String[]::new));
126126
}
127127

128+
    /**
     * Helper method for creating a {@link NodesInfoRequest} that asks for the given metrics only:
     * the request is first cleared of all metrics, then each requested metric is added.
     *
     * @param metrics the metrics to include in the request
     * @return a new {@code NodesInfoRequest} requesting exactly the given metrics
     */
    public static NodesInfoRequest requestWithMetrics(Metric... metrics) {
        NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
        nodesInfoRequest.clear();
        for (var metric : metrics) {
            nodesInfoRequest.addMetric(metric.metricName());
        }
        return nodesInfoRequest;
    }
141+
128142
/**
129143
* An enumeration of the "core" sections of metrics that may be requested
130144
from the nodes information endpoint. Eventually this list will be

server/src/main/java/org/elasticsearch/action/ingest/DeletePipelineTransportAction.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
import org.elasticsearch.threadpool.ThreadPool;
2323
import org.elasticsearch.transport.TransportService;
2424

25+
import java.util.Optional;
26+
import java.util.Set;
27+
2528
public class DeletePipelineTransportAction extends AcknowledgedTransportMasterNodeAction<DeletePipelineRequest> {
2629

2730
private final IngestService ingestService;
@@ -62,4 +65,13 @@ protected ClusterBlockException checkBlock(DeletePipelineRequest request, Cluste
6265
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
6366
}
6467

68+
@Override
69+
public Optional<String> reservedStateHandlerName() {
70+
return Optional.of(ReservedPipelineAction.NAME);
71+
}
72+
73+
@Override
74+
public Set<String> modifiedKeys(DeletePipelineRequest request) {
75+
return Set.of(request.getId());
76+
}
6577
}

server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,12 @@
2525
import org.elasticsearch.threadpool.ThreadPool;
2626
import org.elasticsearch.transport.TransportService;
2727

28+
import java.util.Optional;
29+
import java.util.Set;
30+
2831
import static org.elasticsearch.ingest.IngestService.INGEST_ORIGIN;
2932

3033
public class PutPipelineTransportAction extends AcknowledgedTransportMasterNodeAction<PutPipelineRequest> {
31-
3234
private final IngestService ingestService;
3335
private final OriginSettingClient client;
3436

@@ -73,4 +75,13 @@ protected ClusterBlockException checkBlock(PutPipelineRequest request, ClusterSt
7375
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
7476
}
7577

78+
@Override
79+
public Optional<String> reservedStateHandlerName() {
80+
return Optional.of(ReservedPipelineAction.NAME);
81+
}
82+
83+
@Override
84+
public Set<String> modifiedKeys(PutPipelineRequest request) {
85+
return Set.of(request.getId());
86+
}
7687
}

0 commit comments

Comments
 (0)