Skip to content

Commit f87a5db

Browse files
ingest: Add ignore_missing property to foreach filter (#22147)
1 parent 823a9d3 commit f87a5db

File tree

3 files changed

+76
-20
lines changed

3 files changed

+76
-20
lines changed

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.Set;
3131

3232
import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
33+
import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty;
3334
import static org.elasticsearch.ingest.ConfigurationUtils.readMap;
3435
import static org.elasticsearch.ingest.ConfigurationUtils.readStringProperty;
3536

@@ -47,16 +48,28 @@ public final class ForEachProcessor extends AbstractProcessor {
4748

4849
private final String field;
4950
private final Processor processor;
51+
private final boolean ignoreMissing;
5052

51-
ForEachProcessor(String tag, String field, Processor processor) {
53+
ForEachProcessor(String tag, String field, Processor processor, boolean ignoreMissing) {
5254
super(tag);
5355
this.field = field;
5456
this.processor = processor;
57+
this.ignoreMissing = ignoreMissing;
58+
}
59+
60+
boolean isIgnoreMissing() {
61+
return ignoreMissing;
5562
}
5663

5764
@Override
5865
public void execute(IngestDocument ingestDocument) throws Exception {
59-
List values = ingestDocument.getFieldValue(field, List.class);
66+
List values = ingestDocument.getFieldValue(field, List.class, ignoreMissing);
67+
if (values == null) {
68+
if (ignoreMissing) {
69+
return;
70+
}
71+
throw new IllegalArgumentException("field [" + field + "] is null, cannot loop over its elements.");
72+
}
6073
List<Object> newValues = new ArrayList<>(values.size());
6174
for (Object value : values) {
6275
Object previousValue = ingestDocument.getIngestMetadata().put("_value", value);
@@ -87,14 +100,15 @@ public static final class Factory implements Processor.Factory {
87100
public ForEachProcessor create(Map<String, Processor.Factory> factories, String tag,
88101
Map<String, Object> config) throws Exception {
89102
String field = readStringProperty(TYPE, tag, config, "field");
103+
boolean ignoreMissing = readBooleanProperty(TYPE, tag, config, "ignore_missing", false);
90104
Map<String, Map<String, Object>> processorConfig = readMap(TYPE, tag, config, "processor");
91105
Set<Map.Entry<String, Map<String, Object>>> entries = processorConfig.entrySet();
92106
if (entries.size() != 1) {
93107
throw newConfigurationException(TYPE, tag, "processor", "Must specify exactly one processor type");
94108
}
95109
Map.Entry<String, Map<String, Object>> entry = entries.iterator().next();
96110
Processor processor = ConfigurationUtils.readProcessor(factories, entry.getKey(), entry.getValue());
97-
return new ForEachProcessor(tag, field, processor);
111+
return new ForEachProcessor(tag, field, processor, ignoreMissing);
98112
}
99113
}
100114
}

modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,23 @@ public void testCreate() throws Exception {
4848
assertThat(forEachProcessor.getProcessor(), Matchers.sameInstance(processor));
4949
}
5050

51+
public void testSetIgnoreMissing() throws Exception {
52+
Processor processor = new TestProcessor(ingestDocument -> { });
53+
Map<String, Processor.Factory> registry = new HashMap<>();
54+
registry.put("_name", (r, t, c) -> processor);
55+
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory();
56+
57+
Map<String, Object> config = new HashMap<>();
58+
config.put("field", "_field");
59+
config.put("processor", Collections.singletonMap("_name", Collections.emptyMap()));
60+
config.put("ignore_missing", true);
61+
ForEachProcessor forEachProcessor = forEachFactory.create(registry, null, config);
62+
assertThat(forEachProcessor, Matchers.notNullValue());
63+
assertThat(forEachProcessor.getField(), equalTo("_field"));
64+
assertThat(forEachProcessor.getProcessor(), Matchers.sameInstance(processor));
65+
assertTrue(forEachProcessor.isIgnoreMissing());
66+
}
67+
5168
public void testCreateWithTooManyProcessorTypes() throws Exception {
5269
Processor processor = new TestProcessor(ingestDocument -> { });
5370
Map<String, Processor.Factory> registry = new HashMap<>();

modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java

Lines changed: 42 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,23 @@
1919

2020
package org.elasticsearch.ingest.common;
2121

22-
import org.elasticsearch.ingest.CompoundProcessor;
23-
import org.elasticsearch.ingest.IngestDocument;
24-
import org.elasticsearch.ingest.Processor;
25-
import org.elasticsearch.ingest.TestProcessor;
26-
import org.elasticsearch.ingest.TestTemplateService;
27-
import org.elasticsearch.script.TemplateScript;
28-
import org.elasticsearch.test.ESTestCase;
29-
3022
import java.util.ArrayList;
3123
import java.util.Arrays;
3224
import java.util.Collections;
3325
import java.util.HashMap;
3426
import java.util.List;
3527
import java.util.Locale;
3628
import java.util.Map;
29+
import org.elasticsearch.ingest.AbstractProcessor;
30+
import org.elasticsearch.ingest.CompoundProcessor;
31+
import org.elasticsearch.ingest.IngestDocument;
32+
import org.elasticsearch.ingest.Processor;
33+
import org.elasticsearch.ingest.TestProcessor;
34+
import org.elasticsearch.ingest.TestTemplateService;
35+
import org.elasticsearch.script.TemplateScript;
36+
import org.elasticsearch.test.ESTestCase;
3737

38+
import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument;
3839
import static org.hamcrest.Matchers.equalTo;
3940

4041
public class ForEachProcessorTests extends ESTestCase {
@@ -49,7 +50,8 @@ public void testExecute() throws Exception {
4950
);
5051

5152
ForEachProcessor processor = new ForEachProcessor(
52-
"_tag", "values", new UppercaseProcessor("_tag", "_ingest._value", false, "_ingest._value")
53+
"_tag", "values", new UppercaseProcessor("_tag", "_ingest._value", false, "_ingest._value"),
54+
false
5355
);
5456
processor.execute(ingestDocument);
5557

@@ -69,7 +71,7 @@ public void testExecuteWithFailure() throws Exception {
6971
throw new RuntimeException("failure");
7072
}
7173
});
72-
ForEachProcessor processor = new ForEachProcessor("_tag", "values", testProcessor);
74+
ForEachProcessor processor = new ForEachProcessor("_tag", "values", testProcessor, false);
7375
try {
7476
processor.execute(ingestDocument);
7577
fail("exception expected");
@@ -89,7 +91,8 @@ public void testExecuteWithFailure() throws Exception {
8991
});
9092
Processor onFailureProcessor = new TestProcessor(ingestDocument1 -> {});
9193
processor = new ForEachProcessor(
92-
"_tag", "values", new CompoundProcessor(false, Arrays.asList(testProcessor), Arrays.asList(onFailureProcessor))
94+
"_tag", "values", new CompoundProcessor(false, Arrays.asList(testProcessor), Arrays.asList(onFailureProcessor)),
95+
false
9396
);
9497
processor.execute(ingestDocument);
9598
assertThat(testProcessor.getInvokedCounter(), equalTo(3));
@@ -109,7 +112,7 @@ public void testMetaDataAvailable() throws Exception {
109112
id.setFieldValue("_ingest._value.type", id.getSourceAndMetadata().get("_type"));
110113
id.setFieldValue("_ingest._value.id", id.getSourceAndMetadata().get("_id"));
111114
});
112-
ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor);
115+
ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false);
113116
processor.execute(ingestDocument);
114117

115118
assertThat(innerProcessor.getInvokedCounter(), equalTo(2));
@@ -137,7 +140,7 @@ public void testRestOfTheDocumentIsAvailable() throws Exception {
137140
ForEachProcessor processor = new ForEachProcessor(
138141
"_tag", "values", new SetProcessor("_tag",
139142
new TestTemplateService.MockTemplateScript.Factory("_ingest._value.new_field"),
140-
(model) -> model.get("other")));
143+
(model) -> model.get("other")), false);
141144
processor.execute(ingestDocument);
142145

143146
assertThat(ingestDocument.getFieldValue("values.0.new_field", String.class), equalTo("value"));
@@ -174,7 +177,7 @@ public String getTag() {
174177
"_index", "_type", "_id", null, null, null, Collections.singletonMap("values", values)
175178
);
176179

177-
ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor);
180+
ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false);
178181
processor.execute(ingestDocument);
179182
@SuppressWarnings("unchecked")
180183
List<String> result = ingestDocument.getFieldValue("values", List.class);
@@ -199,7 +202,7 @@ public void testModifyFieldsOutsideArray() throws Exception {
199202
"_tag", "values", new CompoundProcessor(false,
200203
Collections.singletonList(new UppercaseProcessor("_tag_upper", "_ingest._value", false, "_ingest._value")),
201204
Collections.singletonList(new AppendProcessor("_tag", template, (model) -> (Collections.singletonList("added"))))
202-
));
205+
), false);
203206
processor.execute(ingestDocument);
204207

205208
List result = ingestDocument.getFieldValue("values", List.class);
@@ -225,7 +228,7 @@ public void testScalarValueAllowsUnderscoreValueFieldToRemainAccessible() throws
225228

226229
TestProcessor processor = new TestProcessor(doc -> doc.setFieldValue("_ingest._value",
227230
doc.getFieldValue("_source._value", String.class)));
228-
ForEachProcessor forEachProcessor = new ForEachProcessor("_tag", "values", processor);
231+
ForEachProcessor forEachProcessor = new ForEachProcessor("_tag", "values", processor, false);
229232
forEachProcessor.execute(ingestDocument);
230233

231234
List result = ingestDocument.getFieldValue("values", List.class);
@@ -258,7 +261,7 @@ public void testNestedForEach() throws Exception {
258261
doc -> doc.setFieldValue("_ingest._value", doc.getFieldValue("_ingest._value", String.class).toUpperCase(Locale.ENGLISH))
259262
);
260263
ForEachProcessor processor = new ForEachProcessor(
261-
"_tag", "values1", new ForEachProcessor("_tag", "_ingest._value.values2", testProcessor));
264+
"_tag", "values1", new ForEachProcessor("_tag", "_ingest._value.values2", testProcessor, false), false);
262265
processor.execute(ingestDocument);
263266

264267
List result = ingestDocument.getFieldValue("values1.0.values2", List.class);
@@ -270,4 +273,26 @@ public void testNestedForEach() throws Exception {
270273
assertThat(result.get(1), equalTo("JKL"));
271274
}
272275

276+
public void testIgnoreMissing() throws Exception {
277+
IngestDocument originalIngestDocument = new IngestDocument(
278+
"_index", "_type", "_id", null, null, null, Collections.emptyMap()
279+
);
280+
IngestDocument ingestDocument = new IngestDocument(
281+
"_index", "_type", "_id", null, null, null, Collections.emptyMap()
282+
);
283+
ForEachProcessor processor = new ForEachProcessor("_tag", "_ingest._value", new AbstractProcessor("noop") {
284+
@Override
285+
public void execute(final IngestDocument ingestDocument) {
286+
throw new IllegalStateException("Should not run");
287+
}
288+
289+
@Override
290+
public String getType() {
291+
return null;
292+
}
293+
}, true);
294+
processor.execute(ingestDocument);
295+
assertIngestDocument(originalIngestDocument, ingestDocument);
296+
}
297+
273298
}

0 commit comments

Comments
 (0)