Skip to content

Commit cbfa396

Browse files
INGEST: Implement Drop Processor (#32278) (#34596)
* INGEST: Implement Drop Processor (#32278) * Adjust Processor API * Closes #23726
1 parent b2a7e57 commit cbfa396

File tree

44 files changed

+255
-134
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+255
-134
lines changed

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AbstractStringProcessor.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,16 +57,17 @@ String getTargetField() {
5757
}
5858

5959
@Override
60-
public final void execute(IngestDocument document) {
60+
public final IngestDocument execute(IngestDocument document) {
6161
String val = document.getFieldValue(field, String.class, ignoreMissing);
6262

6363
if (val == null && ignoreMissing) {
64-
return;
64+
return document;
6565
} else if (val == null) {
6666
throw new IllegalArgumentException("field [" + field + "] is null, cannot process it.");
6767
}
6868

6969
document.setFieldValue(targetField, process(val));
70+
return document;
7071
}
7172

7273
protected abstract T process(String value);

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AppendProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,9 @@ public ValueSource getValue() {
5656
}
5757

5858
@Override
59-
public void execute(IngestDocument ingestDocument) throws Exception {
59+
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
6060
ingestDocument.appendFieldValue(field, value);
61+
return ingestDocument;
6162
}
6263

6364
@Override

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConvertProcessor.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,12 +173,12 @@ boolean isIgnoreMissing() {
173173
}
174174

175175
@Override
176-
public void execute(IngestDocument document) {
176+
public IngestDocument execute(IngestDocument document) {
177177
Object oldValue = document.getFieldValue(field, Object.class, ignoreMissing);
178178
Object newValue;
179179

180180
if (oldValue == null && ignoreMissing) {
181-
return;
181+
return document;
182182
} else if (oldValue == null) {
183183
throw new IllegalArgumentException("Field [" + field + "] is null, cannot be converted to type [" + convertType + "]");
184184
}
@@ -194,6 +194,7 @@ public void execute(IngestDocument document) {
194194
newValue = convertType.convert(oldValue);
195195
}
196196
document.setFieldValue(targetField, newValue);
197+
return document;
197198
}
198199

199200
@Override

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DateIndexNameProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public final class DateIndexNameProcessor extends AbstractProcessor {
6363
}
6464

6565
@Override
66-
public void execute(IngestDocument ingestDocument) throws Exception {
66+
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
6767
// Date can be specified as a string or long:
6868
Object obj = ingestDocument.getFieldValue(field, Object.class);
6969
String date = null;
@@ -101,6 +101,7 @@ public void execute(IngestDocument ingestDocument) throws Exception {
101101
.append('>');
102102
String dynamicIndexName = builder.toString();
103103
ingestDocument.setFieldValue(IngestDocument.MetaData.INDEX.getFieldName(), dynamicIndexName);
104+
return ingestDocument;
104105
}
105106

106107
@Override

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DateProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ private Locale newLocale(Map<String, Object> params) {
7474
}
7575

7676
@Override
77-
public void execute(IngestDocument ingestDocument) {
77+
public IngestDocument execute(IngestDocument ingestDocument) {
7878
Object obj = ingestDocument.getFieldValue(field, Object.class);
7979
String value = null;
8080
if (obj != null) {
@@ -98,6 +98,7 @@ public void execute(IngestDocument ingestDocument) {
9898
}
9999

100100
ingestDocument.setFieldValue(targetField, ISODateTimeFormat.dateTime().print(dateTime));
101+
return ingestDocument;
101102
}
102103

103104
@Override

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DissectProcessor.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,15 @@ public final class DissectProcessor extends AbstractProcessor {
4747
}
4848

4949
@Override
50-
public void execute(IngestDocument ingestDocument) {
50+
public IngestDocument execute(IngestDocument ingestDocument) {
5151
String input = ingestDocument.getFieldValue(field, String.class, ignoreMissing);
5252
if (input == null && ignoreMissing) {
53-
return;
53+
return ingestDocument;
5454
} else if (input == null) {
5555
throw new IllegalArgumentException("field [" + field + "] is null, cannot process it.");
5656
}
5757
dissectParser.parse(input).forEach(ingestDocument::setFieldValue);
58+
return ingestDocument;
5859
}
5960

6061
@Override

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DotExpanderProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public final class DotExpanderProcessor extends AbstractProcessor {
4141

4242
@Override
4343
@SuppressWarnings("unchecked")
44-
public void execute(IngestDocument ingestDocument) throws Exception {
44+
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
4545
String path;
4646
Map<String, Object> map;
4747
if (this.path != null) {
@@ -75,6 +75,7 @@ public void execute(IngestDocument ingestDocument) throws Exception {
7575
Object value = map.remove(field);
7676
ingestDocument.setFieldValue(path, value);
7777
}
78+
return ingestDocument;
7879
}
7980

8081
@Override
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.ingest.common;
21+
22+
import java.util.Map;
23+
import org.elasticsearch.ingest.AbstractProcessor;
24+
import org.elasticsearch.ingest.IngestDocument;
25+
import org.elasticsearch.ingest.Processor;
26+
27+
/**
28+
* Drop processor only returns {@code null} for the execution result to indicate that any document
29+
* executed by it should not be indexed.
30+
*/
31+
public final class DropProcessor extends AbstractProcessor {
32+
33+
public static final String TYPE = "drop";
34+
35+
private DropProcessor(final String tag) {
36+
super(tag);
37+
}
38+
39+
@Override
40+
public IngestDocument execute(final IngestDocument ingestDocument) throws Exception {
41+
return null;
42+
}
43+
44+
@Override
45+
public String getType() {
46+
return TYPE;
47+
}
48+
49+
public static final class Factory implements Processor.Factory {
50+
51+
@Override
52+
public Processor create(final Map<String, Processor.Factory> processorFactories, final String tag,
53+
final Map<String, Object> config) {
54+
return new DropProcessor(tag);
55+
}
56+
}
57+
}

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/FailProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public TemplateScript.Factory getMessage() {
4848
}
4949

5050
@Override
51-
public void execute(IngestDocument document) {
51+
public IngestDocument execute(IngestDocument document) {
5252
throw new FailProcessorException(document.renderTemplate(message));
5353
}
5454

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,24 +63,29 @@ boolean isIgnoreMissing() {
6363
}
6464

6565
@Override
66-
public void execute(IngestDocument ingestDocument) throws Exception {
67-
List values = ingestDocument.getFieldValue(field, List.class, ignoreMissing);
66+
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
67+
List<?> values = ingestDocument.getFieldValue(field, List.class, ignoreMissing);
6868
if (values == null) {
6969
if (ignoreMissing) {
70-
return;
70+
return ingestDocument;
7171
}
7272
throw new IllegalArgumentException("field [" + field + "] is null, cannot loop over its elements.");
7373
}
7474
List<Object> newValues = new ArrayList<>(values.size());
75+
IngestDocument document = ingestDocument;
7576
for (Object value : values) {
7677
Object previousValue = ingestDocument.getIngestMetadata().put("_value", value);
7778
try {
78-
processor.execute(ingestDocument);
79+
document = processor.execute(document);
80+
if (document == null) {
81+
return null;
82+
}
7983
} finally {
8084
newValues.add(ingestDocument.getIngestMetadata().put("_value", previousValue));
8185
}
8286
}
83-
ingestDocument.setFieldValue(field, newValues);
87+
document.setFieldValue(field, newValues);
88+
return document;
8489
}
8590

8691
@Override

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/GrokProcessor.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,11 @@ public final class GrokProcessor extends AbstractProcessor {
5454
}
5555

5656
@Override
57-
public void execute(IngestDocument ingestDocument) throws Exception {
57+
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
5858
String fieldValue = ingestDocument.getFieldValue(matchField, String.class, ignoreMissing);
5959

6060
if (fieldValue == null && ignoreMissing) {
61-
return;
61+
return ingestDocument;
6262
} else if (fieldValue == null) {
6363
throw new IllegalArgumentException("field [" + matchField + "] is null, cannot process it.");
6464
}
@@ -81,6 +81,7 @@ public void execute(IngestDocument ingestDocument) throws Exception {
8181
ingestDocument.setFieldValue(PATTERN_MATCH_KEY, "0");
8282
}
8383
}
84+
return ingestDocument;
8485
}
8586

8687
@Override

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
8484
processors.put(BytesProcessor.TYPE, new BytesProcessor.Factory());
8585
processors.put(PipelineProcessor.TYPE, new PipelineProcessor.Factory(parameters.ingestService));
8686
processors.put(DissectProcessor.TYPE, new DissectProcessor.Factory());
87+
processors.put(DropProcessor.TYPE, new DropProcessor.Factory());
8788
return Collections.unmodifiableMap(processors);
8889
}
8990

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/JoinProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ String getTargetField() {
6060
}
6161

6262
@Override
63-
public void execute(IngestDocument document) {
63+
public IngestDocument execute(IngestDocument document) {
6464
List<?> list = document.getFieldValue(field, List.class);
6565
if (list == null) {
6666
throw new IllegalArgumentException("field [" + field + "] is null, cannot join.");
@@ -69,6 +69,7 @@ public void execute(IngestDocument document) {
6969
.map(Object::toString)
7070
.collect(Collectors.joining(separator));
7171
document.setFieldValue(targetField, joined);
72+
return document;
7273
}
7374

7475
@Override

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/JsonProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,12 +107,13 @@ public static void apply(Map<String, Object> ctx, String fieldName) {
107107
}
108108

109109
@Override
110-
public void execute(IngestDocument document) throws Exception {
110+
public IngestDocument execute(IngestDocument document) throws Exception {
111111
if (addToRoot) {
112112
apply(document.getSourceAndMetadata(), field);
113113
} else {
114114
document.setFieldValue(targetField, apply(document.getFieldValue(field, Object.class)));
115115
}
116+
return document;
116117
}
117118

118119
@Override

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/KeyValueProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,8 +188,9 @@ private static void append(IngestDocument document, String targetField, String v
188188
}
189189

190190
@Override
191-
public void execute(IngestDocument document) {
191+
public IngestDocument execute(IngestDocument document) {
192192
execution.accept(document);
193+
return document;
193194
}
194195

195196
@Override

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/PipelineProcessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,12 @@ private PipelineProcessor(String tag, String pipelineName, IngestService ingestS
4242
}
4343

4444
@Override
45-
public void execute(IngestDocument ingestDocument) throws Exception {
45+
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
4646
Pipeline pipeline = ingestService.getPipeline(pipelineName);
4747
if (pipeline == null) {
4848
throw new IllegalStateException("Pipeline processor configured for non-existent pipeline [" + pipelineName + ']');
4949
}
50-
ingestDocument.executePipeline(pipeline);
50+
return ingestDocument.executePipeline(pipeline);
5151
}
5252

5353
@Override

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/Processors.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,5 @@ public static void json(Map<String, Object> ctx, String field) {
4646
public static String urlDecode(String value) {
4747
return URLDecodeProcessor.apply(value);
4848
}
49+
4950
}

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RemoveProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public List<TemplateScript.Factory> getFields() {
5252
}
5353

5454
@Override
55-
public void execute(IngestDocument document) {
55+
public IngestDocument execute(IngestDocument document) {
5656
if (ignoreMissing) {
5757
fields.forEach(field -> {
5858
String path = document.renderTemplate(field);
@@ -63,6 +63,7 @@ public void execute(IngestDocument document) {
6363
} else {
6464
fields.forEach(document::removeField);
6565
}
66+
return document;
6667
}
6768

6869
@Override

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RenameProcessor.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,11 @@ boolean isIgnoreMissing() {
5959
}
6060

6161
@Override
62-
public void execute(IngestDocument document) {
62+
public IngestDocument execute(IngestDocument document) {
6363
String path = document.renderTemplate(field);
6464
if (document.hasField(path, true) == false) {
6565
if (ignoreMissing) {
66-
return;
66+
return document;
6767
} else {
6868
throw new IllegalArgumentException("field [" + path + "] doesn't exist");
6969
}
@@ -86,6 +86,7 @@ public void execute(IngestDocument document) {
8686
document.setFieldValue(path, value);
8787
throw e;
8888
}
89+
return document;
8990
}
9091

9192
@Override

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ScriptProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,11 @@ public final class ScriptProcessor extends AbstractProcessor {
7171
* @param document The Ingest document passed into the script context under the "ctx" object.
7272
*/
7373
@Override
74-
public void execute(IngestDocument document) {
74+
public IngestDocument execute(IngestDocument document) {
7575
IngestScript.Factory factory = scriptService.compile(script, IngestScript.CONTEXT);
7676
factory.newInstance(script.getParams()).execute(document.getSourceAndMetadata());
7777
CollectionUtils.ensureNoSelfReferences(document.getSourceAndMetadata());
78+
return document;
7879
}
7980

8081
@Override

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/SetProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,11 @@ public ValueSource getValue() {
6565
}
6666

6767
@Override
68-
public void execute(IngestDocument document) {
68+
public IngestDocument execute(IngestDocument document) {
6969
if (overrideEnabled || document.hasField(field) == false || document.getFieldValue(field, Object.class) == null) {
7070
document.setFieldValue(field, value);
7171
}
72+
return document;
7273
}
7374

7475
@Override

0 commit comments

Comments
 (0)