From f87a5dbd048e3ef1396964e280d72ce677841932 Mon Sep 17 00:00:00 2001 From: Armin Date: Tue, 26 Jun 2018 08:20:54 +0200 Subject: [PATCH 1/2] ingest: Add ignore_missing property to foreach filter (#22147) --- .../ingest/common/ForEachProcessor.java | 20 ++++++- .../common/ForEachProcessorFactoryTests.java | 17 ++++++ .../ingest/common/ForEachProcessorTests.java | 59 +++++++++++++------ 3 files changed, 76 insertions(+), 20 deletions(-) diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java index 2a1046acb9cdb..1c64fdb7408ef 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java @@ -30,6 +30,7 @@ import java.util.Set; import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException; +import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty; import static org.elasticsearch.ingest.ConfigurationUtils.readMap; import static org.elasticsearch.ingest.ConfigurationUtils.readStringProperty; @@ -47,16 +48,28 @@ public final class ForEachProcessor extends AbstractProcessor { private final String field; private final Processor processor; + private final boolean ignoreMissing; - ForEachProcessor(String tag, String field, Processor processor) { + ForEachProcessor(String tag, String field, Processor processor, boolean ignoreMissing) { super(tag); this.field = field; this.processor = processor; + this.ignoreMissing = ignoreMissing; + } + + boolean isIgnoreMissing() { + return ignoreMissing; } @Override public void execute(IngestDocument ingestDocument) throws Exception { - List values = ingestDocument.getFieldValue(field, List.class); + List values = ingestDocument.getFieldValue(field, List.class, ignoreMissing); + if (values == null) { + if (ignoreMissing) { + return; + } + throw new IllegalArgumentException("field [" + field + "] is null, cannot loop over its elements."); + } List newValues = new ArrayList<>(values.size()); for (Object value : values) { Object previousValue = ingestDocument.getIngestMetadata().put("_value", value); @@ -87,6 +100,7 @@ public static final class Factory implements Processor.Factory { public ForEachProcessor create(Map factories, String tag, Map config) throws Exception { String field = readStringProperty(TYPE, tag, config, "field"); + boolean ignoreMissing = readBooleanProperty(TYPE, tag, config, "ignore_missing", false); Map> processorConfig = readMap(TYPE, tag, config, "processor"); Set>> entries = processorConfig.entrySet(); if (entries.size() != 1) { @@ -94,7 +108,7 @@ public ForEachProcessor create(Map factories, String } Map.Entry> entry = entries.iterator().next(); Processor processor = ConfigurationUtils.readProcessor(factories, entry.getKey(), entry.getValue()); - return new ForEachProcessor(tag, field, processor); + return new ForEachProcessor(tag, field, processor, ignoreMissing); } } } diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java index 49611d76f4081..f64459af78566 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java @@ -48,6 +48,23 @@ public void testCreate() throws Exception { assertThat(forEachProcessor.getProcessor(), Matchers.sameInstance(processor)); } + public void testSetIgnoreMissing() throws Exception { + Processor processor = new TestProcessor(ingestDocument -> { }); + Map registry = new HashMap<>(); + registry.put("_name", (r, t, c) -> processor); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(); + + Map config = new HashMap<>(); + config.put("field", "_field"); + config.put("processor", Collections.singletonMap("_name", Collections.emptyMap())); + config.put("ignore_missing", true); + ForEachProcessor forEachProcessor = forEachFactory.create(registry, null, config); + assertThat(forEachProcessor, Matchers.notNullValue()); + assertThat(forEachProcessor.getField(), equalTo("_field")); + assertThat(forEachProcessor.getProcessor(), Matchers.sameInstance(processor)); + assertTrue(forEachProcessor.isIgnoreMissing()); + } + public void testCreateWithTooManyProcessorTypes() throws Exception { Processor processor = new TestProcessor(ingestDocument -> { }); Map registry = new HashMap<>(); diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java index 07573a780a17a..508ddc7a19d9e 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java @@ -19,14 +19,6 @@ package org.elasticsearch.ingest.common; -import org.elasticsearch.ingest.CompoundProcessor; -import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.Processor; -import org.elasticsearch.ingest.TestProcessor; -import org.elasticsearch.ingest.TestTemplateService; -import org.elasticsearch.script.TemplateScript; -import org.elasticsearch.test.ESTestCase; - import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -34,7 +26,16 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import org.elasticsearch.ingest.AbstractProcessor; +import org.elasticsearch.ingest.CompoundProcessor; +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.Processor; +import org.elasticsearch.ingest.TestProcessor; +import org.elasticsearch.ingest.TestTemplateService; +import org.elasticsearch.script.TemplateScript; +import org.elasticsearch.test.ESTestCase; +import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument; import static org.hamcrest.Matchers.equalTo; public class ForEachProcessorTests extends ESTestCase { @@ -49,7 +50,8 @@ public void testExecute() throws Exception { ); ForEachProcessor processor = new ForEachProcessor( - "_tag", "values", new UppercaseProcessor("_tag", "_ingest._value", false, "_ingest._value") + "_tag", "values", new UppercaseProcessor("_tag", "_ingest._value", false, "_ingest._value"), + false ); processor.execute(ingestDocument); @@ -69,7 +71,7 @@ public void testExecuteWithFailure() throws Exception { throw new RuntimeException("failure"); } }); - ForEachProcessor processor = new ForEachProcessor("_tag", "values", testProcessor); + ForEachProcessor processor = new ForEachProcessor("_tag", "values", testProcessor, false); try { processor.execute(ingestDocument); fail("exception expected"); @@ -89,7 +91,8 @@ public void testExecuteWithFailure() throws Exception { }); Processor onFailureProcessor = new TestProcessor(ingestDocument1 -> {}); processor = new ForEachProcessor( - "_tag", "values", new CompoundProcessor(false, Arrays.asList(testProcessor), Arrays.asList(onFailureProcessor)) + "_tag", "values", new CompoundProcessor(false, Arrays.asList(testProcessor), Arrays.asList(onFailureProcessor)), + false ); processor.execute(ingestDocument); assertThat(testProcessor.getInvokedCounter(), equalTo(3)); @@ -109,7 +112,7 @@ public void testMetaDataAvailable() throws Exception { id.setFieldValue("_ingest._value.type", id.getSourceAndMetadata().get("_type")); id.setFieldValue("_ingest._value.id", id.getSourceAndMetadata().get("_id")); }); - ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor); + ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false); processor.execute(ingestDocument); assertThat(innerProcessor.getInvokedCounter(), equalTo(2)); @@ -137,7 +140,7 @@ public void testRestOfTheDocumentIsAvailable() throws Exception { ForEachProcessor processor = new ForEachProcessor( "_tag", "values", new SetProcessor("_tag", new TestTemplateService.MockTemplateScript.Factory("_ingest._value.new_field"), - (model) -> model.get("other"))); + (model) -> model.get("other")), false); processor.execute(ingestDocument); assertThat(ingestDocument.getFieldValue("values.0.new_field", String.class), equalTo("value")); @@ -174,7 +177,7 @@ public String getTag() { "_index", "_type", "_id", null, null, null, Collections.singletonMap("values", values) ); - ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor); + ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false); processor.execute(ingestDocument); @SuppressWarnings("unchecked") List result = ingestDocument.getFieldValue("values", List.class); @@ -199,7 +202,7 @@ public void testModifyFieldsOutsideArray() throws Exception { "_tag", "values", new CompoundProcessor(false, Collections.singletonList(new UppercaseProcessor("_tag_upper", "_ingest._value", false, "_ingest._value")), Collections.singletonList(new AppendProcessor("_tag", template, (model) -> (Collections.singletonList("added")))) - )); + ), false); processor.execute(ingestDocument); List result = ingestDocument.getFieldValue("values", List.class); @@ -225,7 +228,7 @@ public void testScalarValueAllowsUnderscoreValueFieldToRemainAccessible() throws TestProcessor processor = new TestProcessor(doc -> doc.setFieldValue("_ingest._value", doc.getFieldValue("_source._value", String.class))); - ForEachProcessor forEachProcessor = new ForEachProcessor("_tag", "values", processor); + ForEachProcessor forEachProcessor = new ForEachProcessor("_tag", "values", processor, false); forEachProcessor.execute(ingestDocument); List result = ingestDocument.getFieldValue("values", List.class); @@ -258,7 +261,7 @@ public void testNestedForEach() throws Exception { doc -> doc.setFieldValue("_ingest._value", doc.getFieldValue("_ingest._value", String.class).toUpperCase(Locale.ENGLISH)) ); ForEachProcessor processor = new ForEachProcessor( - "_tag", "values1", new ForEachProcessor("_tag", "_ingest._value.values2", testProcessor)); + "_tag", "values1", new ForEachProcessor("_tag", "_ingest._value.values2", testProcessor, false), false); processor.execute(ingestDocument); List result = ingestDocument.getFieldValue("values1.0.values2", List.class); @@ -270,4 +273,26 @@ public void testNestedForEach() throws Exception { assertThat(result.get(1), equalTo("JKL")); } + public void testIgnoreMissing() throws Exception { + IngestDocument originalIngestDocument = new IngestDocument( + "_index", "_type", "_id", null, null, null, Collections.emptyMap() + ); + IngestDocument ingestDocument = new IngestDocument( + "_index", "_type", "_id", null, null, null, Collections.emptyMap() + ); + ForEachProcessor processor = new ForEachProcessor("_tag", "_ingest._value", new AbstractProcessor("noop") { + @Override + public void execute(final IngestDocument ingestDocument) { + throw new IllegalStateException("Should not run"); + } + + @Override + public String getType() { + return null; + } + }, true); + processor.execute(ingestDocument); + assertIngestDocument(originalIngestDocument, ingestDocument); + } + } From 93d6923198c796880baecf10798976c66eff8e2b Mon Sep 17 00:00:00 2001 From: Armin Date: Tue, 26 Jun 2018 17:24:43 +0200 Subject: [PATCH 2/2] CR Comments --- docs/reference/ingest/ingest-node.asciidoc | 7 ++++--- .../common/ForEachProcessorFactoryTests.java | 1 + .../ingest/common/ForEachProcessorTests.java | 19 ++++--------------- 3 files changed, 9 insertions(+), 18 deletions(-) diff --git a/docs/reference/ingest/ingest-node.asciidoc b/docs/reference/ingest/ingest-node.asciidoc index 009a67699e344..29ff039950925 100644 --- a/docs/reference/ingest/ingest-node.asciidoc +++ b/docs/reference/ingest/ingest-node.asciidoc @@ -1075,9 +1075,10 @@ then it aborts the execution and leaves the array unmodified. .Foreach Options [options="header"] |====== -| Name | Required | Default | Description -| `field` | yes | - | The array field -| `processor` | yes | - | The processor to execute against each field +| Name | Required | Default | Description +| `field` | yes | - | The array field +| `processor` | yes | - | The processor to execute against each field +| `ignore_missing` | no | false | If `true` and `field` does not exist or is `null`, the processor quietly exits without modifying the document |====== Assume the following document: diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java index f64459af78566..f382ad8dcfb6a 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java @@ -46,6 +46,7 @@ public void testCreate() throws Exception { assertThat(forEachProcessor, Matchers.notNullValue()); assertThat(forEachProcessor.getField(), equalTo("_field")); assertThat(forEachProcessor.getProcessor(), Matchers.sameInstance(processor)); + assertFalse(forEachProcessor.isIgnoreMissing()); } public void testSetIgnoreMissing() throws Exception { diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java index 508ddc7a19d9e..1491bd481bd07 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java @@ -26,7 +26,6 @@ import java.util.List; import java.util.Locale; import java.util.Map; -import org.elasticsearch.ingest.AbstractProcessor; import org.elasticsearch.ingest.CompoundProcessor; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Processor; @@ -277,22 +276,12 @@ public void testIgnoreMissing() throws Exception { IngestDocument originalIngestDocument = new IngestDocument( "_index", "_type", "_id", null, null, null, Collections.emptyMap() ); - IngestDocument ingestDocument = new IngestDocument( - "_index", "_type", "_id", null, null, null, Collections.emptyMap() - ); - ForEachProcessor processor = new ForEachProcessor("_tag", "_ingest._value", new AbstractProcessor("noop") { - @Override - public void execute(final IngestDocument ingestDocument) { - throw new IllegalStateException("Should not run"); - } - - @Override - public String getType() { - return null; - } - }, true); + IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); + TestProcessor testProcessor = new TestProcessor(doc -> {}); + ForEachProcessor processor = new ForEachProcessor("_tag", "_ingest._value", testProcessor, true); processor.execute(ingestDocument); assertIngestDocument(originalIngestDocument, ingestDocument); + assertThat(testProcessor.getInvokedCounter(), equalTo(0)); } }