Skip to content

Commit 7a59ae8

Browse files
authored
[7.x] Allow_duplicates option for append processor (#61916) (#63257)
1 parent a3252af commit 7a59ae8

File tree

5 files changed

+196
-32
lines changed

5 files changed

+196
-32
lines changed

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AppendProcessor.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,13 @@ public final class AppendProcessor extends AbstractProcessor {
4040

4141
private final TemplateScript.Factory field;
4242
private final ValueSource value;
43+
private final boolean allowDuplicates;
4344

44-
AppendProcessor(String tag, String description, TemplateScript.Factory field, ValueSource value) {
45+
AppendProcessor(String tag, String description, TemplateScript.Factory field, ValueSource value, boolean allowDuplicates) {
4546
super(tag, description);
4647
this.field = field;
4748
this.value = value;
49+
this.allowDuplicates = allowDuplicates;
4850
}
4951

5052
public TemplateScript.Factory getField() {
@@ -57,7 +59,7 @@ public ValueSource getValue() {
5759

5860
@Override
5961
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
60-
ingestDocument.appendFieldValue(field, value);
62+
ingestDocument.appendFieldValue(field, value, allowDuplicates);
6163
return ingestDocument;
6264
}
6365

@@ -79,9 +81,11 @@ public AppendProcessor create(Map<String, Processor.Factory> registry, String pr
7981
String description, Map<String, Object> config) throws Exception {
8082
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
8183
Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value");
84+
boolean allowDuplicates = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "allow_duplicates", true);
8285
TemplateScript.Factory compiledTemplate = ConfigurationUtils.compileTemplate(TYPE, processorTag,
8386
"field", field, scriptService);
84-
return new AppendProcessor(processorTag, description, compiledTemplate, ValueSource.wrap(value, scriptService));
87+
return new AppendProcessor(processorTag, description, compiledTemplate, ValueSource.wrap(value, scriptService),
88+
allowDuplicates);
8589
}
8690
}
8791
}

modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorTests.java

+68-10
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,16 @@
2828
import org.elasticsearch.test.ESTestCase;
2929

3030
import java.util.ArrayList;
31+
import java.util.Collections;
3132
import java.util.HashMap;
3233
import java.util.List;
3334
import java.util.Map;
3435

3536
import static org.hamcrest.CoreMatchers.equalTo;
37+
import static org.hamcrest.CoreMatchers.instanceOf;
3638
import static org.hamcrest.CoreMatchers.not;
3739
import static org.hamcrest.CoreMatchers.sameInstance;
40+
import static org.hamcrest.Matchers.containsInAnyOrder;
3841

3942
public class AppendProcessorTests extends ESTestCase {
4043

@@ -53,13 +56,13 @@ public void testAppendValuesToExistingList() throws Exception {
5356
if (randomBoolean()) {
5457
Object value = scalar.randomValue();
5558
values.add(value);
56-
appendProcessor = createAppendProcessor(field, value);
59+
appendProcessor = createAppendProcessor(field, value, true);
5760
} else {
5861
int valuesSize = randomIntBetween(0, 10);
5962
for (int i = 0; i < valuesSize; i++) {
6063
values.add(scalar.randomValue());
6164
}
62-
appendProcessor = createAppendProcessor(field, values);
65+
appendProcessor = createAppendProcessor(field, values, true);
6366
}
6467
appendProcessor.execute(ingestDocument);
6568
Object fieldValue = ingestDocument.getFieldValue(field, Object.class);
@@ -82,13 +85,13 @@ public void testAppendValuesToNonExistingList() throws Exception {
8285
if (randomBoolean()) {
8386
Object value = scalar.randomValue();
8487
values.add(value);
85-
appendProcessor = createAppendProcessor(field, value);
88+
appendProcessor = createAppendProcessor(field, value, true);
8689
} else {
8790
int valuesSize = randomIntBetween(0, 10);
8891
for (int i = 0; i < valuesSize; i++) {
8992
values.add(scalar.randomValue());
9093
}
91-
appendProcessor = createAppendProcessor(field, values);
94+
appendProcessor = createAppendProcessor(field, values, true);
9295
}
9396
appendProcessor.execute(ingestDocument);
9497
List<?> list = ingestDocument.getFieldValue(field, List.class);
@@ -106,13 +109,13 @@ public void testConvertScalarToList() throws Exception {
106109
if (randomBoolean()) {
107110
Object value = scalar.randomValue();
108111
values.add(value);
109-
appendProcessor = createAppendProcessor(field, value);
112+
appendProcessor = createAppendProcessor(field, value, true);
110113
} else {
111114
int valuesSize = randomIntBetween(0, 10);
112115
for (int i = 0; i < valuesSize; i++) {
113116
values.add(scalar.randomValue());
114117
}
115-
appendProcessor = createAppendProcessor(field, values);
118+
appendProcessor = createAppendProcessor(field, values, true);
116119
}
117120
appendProcessor.execute(ingestDocument);
118121
List<?> fieldValue = ingestDocument.getFieldValue(field, List.class);
@@ -132,13 +135,13 @@ public void testAppendMetadataExceptVersion() throws Exception {
132135
if (randomBoolean()) {
133136
String value = randomAlphaOfLengthBetween(1, 10);
134137
values.add(value);
135-
appendProcessor = createAppendProcessor(randomMetadata.getFieldName(), value);
138+
appendProcessor = createAppendProcessor(randomMetadata.getFieldName(), value, true);
136139
} else {
137140
int valuesSize = randomIntBetween(0, 10);
138141
for (int i = 0; i < valuesSize; i++) {
139142
values.add(randomAlphaOfLengthBetween(1, 10));
140143
}
141-
appendProcessor = createAppendProcessor(randomMetadata.getFieldName(), values);
144+
appendProcessor = createAppendProcessor(randomMetadata.getFieldName(), values, true);
142145
}
143146

144147
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
@@ -156,10 +159,65 @@ public void testAppendMetadataExceptVersion() throws Exception {
156159
}
157160
}
158161

159-
private static Processor createAppendProcessor(String fieldName, Object fieldValue) {
162+
public void testAppendingDuplicateValueToScalarDoesNotModifyDocument() throws Exception {
163+
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
164+
String originalValue = randomAlphaOfLengthBetween(1, 10);
165+
String field = RandomDocumentPicks.addRandomField(random(), ingestDocument, originalValue);
166+
167+
List<Object> valuesToAppend = new ArrayList<>();
168+
valuesToAppend.add(originalValue);
169+
Processor appendProcessor = createAppendProcessor(field, valuesToAppend, false);
170+
appendProcessor.execute(ingestDocument);
171+
Object fieldValue = ingestDocument.getFieldValue(field, Object.class);
172+
assertThat(fieldValue, not(instanceOf(List.class)));
173+
assertThat(fieldValue, equalTo(originalValue));
174+
}
175+
176+
public void testAppendingUniqueValueToScalar() throws Exception {
177+
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
178+
String originalValue = randomAlphaOfLengthBetween(1, 10);
179+
String field = RandomDocumentPicks.addRandomField(random(), ingestDocument, originalValue);
180+
181+
List<Object> valuesToAppend = new ArrayList<>();
182+
String newValue = randomAlphaOfLengthBetween(1, 10);
183+
valuesToAppend.add(newValue);
184+
Processor appendProcessor = createAppendProcessor(field, valuesToAppend, false);
185+
appendProcessor.execute(ingestDocument);
186+
List<?> list = ingestDocument.getFieldValue(field, List.class);
187+
assertThat(list.size(), equalTo(2));
188+
assertThat(list, equalTo(org.elasticsearch.common.collect.List.of(originalValue, newValue)));
189+
}
190+
191+
public void testAppendingToListWithDuplicatesDisallowed() throws Exception {
192+
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
193+
List<String> list = new ArrayList<>();
194+
int size = randomIntBetween(0, 10);
195+
for (int i = 0; i < size; i++) {
196+
list.add(randomAlphaOfLengthBetween(1, 10));
197+
}
198+
String originalField = RandomDocumentPicks.addRandomField(random(), ingestDocument, list);
199+
List<String> expectedValues = new ArrayList<>(list);
200+
List<String> existingValues = randomSubsetOf(list);
201+
int uniqueValuesSize = randomIntBetween(0, 10);
202+
List<String> uniqueValues = new ArrayList<>();
203+
for (int i = 0; i < uniqueValuesSize; i++) {
204+
uniqueValues.add(randomAlphaOfLengthBetween(1, 10));
205+
}
206+
List<String> valuesToAppend = new ArrayList<>(existingValues);
207+
valuesToAppend.addAll(uniqueValues);
208+
expectedValues.addAll(uniqueValues);
209+
Collections.sort(valuesToAppend);
210+
Processor appendProcessor = createAppendProcessor(originalField, valuesToAppend, false);
211+
appendProcessor.execute(ingestDocument);
212+
List<?> fieldValue = ingestDocument.getFieldValue(originalField, List.class);
213+
assertThat(fieldValue, sameInstance(list));
214+
assertThat(fieldValue, containsInAnyOrder(expectedValues.toArray()));
215+
}
216+
217+
private static Processor createAppendProcessor(String fieldName, Object fieldValue, boolean allowDuplicates) {
160218
return new AppendProcessor(randomAlphaOfLength(10),
161219
null, new TestTemplateService.MockTemplateScript.Factory(fieldName),
162-
ValueSource.wrap(fieldValue, TestTemplateService.instance()));
220+
ValueSource.wrap(fieldValue, TestTemplateService.instance()), allowDuplicates);
163221
}
164222

165223
private enum Scalar {

modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -206,8 +206,10 @@ public void testModifyFieldsOutsideArray() throws Exception {
206206

207207
ForEachProcessor processor = new ForEachProcessor(
208208
"_tag", null, "values", new CompoundProcessor(false,
209-
Collections.singletonList(new UppercaseProcessor("_tag_upper", null, "_ingest._value", false, "_ingest._value")),
210-
Collections.singletonList(new AppendProcessor("_tag", null, template, (model) -> (Collections.singletonList("added"))))
209+
org.elasticsearch.common.collect.List.of(
210+
new UppercaseProcessor("_tag_upper", null, "_ingest._value", false, "_ingest._value")),
211+
org.elasticsearch.common.collect.List.of(
212+
new AppendProcessor("_tag", null, template, (model) -> (Collections.singletonList("added")), true))
211213
), false);
212214
processor.execute(ingestDocument, (result, e) -> {});
213215

server/src/main/java/org/elasticsearch/ingest/IngestDocument.java

+69-6
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,24 @@ private static Object resolve(String pathElement, String fullPath, Object contex
379379
* @throws IllegalArgumentException if the path is null, empty or invalid.
380380
*/
381381
public void appendFieldValue(String path, Object value) {
382-
setFieldValue(path, value, true);
382+
appendFieldValue(path, value, true);
383+
}
384+
385+
/**
386+
* Appends the provided value to the provided path in the document.
387+
* Any non existing path element will be created.
388+
* If the path identifies a list, the value will be appended to the existing list.
389+
* If the path identifies a scalar, the scalar will be converted to a list and
390+
* the provided value will be added to the newly created list.
391+
* Supports multiple values too provided in forms of list, in that case all the values will be appended to the
392+
* existing (or newly created) list.
393+
* @param path The path within the document in dot-notation
394+
* @param value The value or values to append to the existing ones
395+
* @param allowDuplicates When false, any values that already exist in the field will not be added
396+
* @throws IllegalArgumentException if the path is null, empty or invalid.
397+
*/
398+
public void appendFieldValue(String path, Object value, boolean allowDuplicates) {
399+
setFieldValue(path, value, true, allowDuplicates);
383400
}
384401

385402
/**
@@ -399,6 +416,24 @@ public void appendFieldValue(TemplateScript.Factory fieldPathTemplate, ValueSour
399416
appendFieldValue(fieldPathTemplate.newInstance(model).execute(), valueSource.copyAndResolve(model));
400417
}
401418

419+
/**
420+
* Appends the provided value to the provided path in the document.
421+
* Any non existing path element will be created.
422+
* If the path identifies a list, the value will be appended to the existing list.
423+
* If the path identifies a scalar, the scalar will be converted to a list and
424+
* the provided value will be added to the newly created list.
425+
* Supports multiple values too provided in forms of list, in that case all the values will be appended to the
426+
* existing (or newly created) list.
427+
* @param fieldPathTemplate Resolves to the path with dot-notation within the document
428+
* @param valueSource The value source that will produce the value or values to append to the existing ones
429+
* @param allowDuplicates When false, any values that already exist in the field will not be added
430+
* @throws IllegalArgumentException if the path is null, empty or invalid.
431+
*/
432+
public void appendFieldValue(TemplateScript.Factory fieldPathTemplate, ValueSource valueSource, boolean allowDuplicates) {
433+
Map<String, Object> model = createTemplateModel();
434+
appendFieldValue(fieldPathTemplate.newInstance(model).execute(), valueSource.copyAndResolve(model), allowDuplicates);
435+
}
436+
402437
/**
403438
* Sets the provided value to the provided path in the document.
404439
* Any non existing path element will be created.
@@ -454,6 +489,10 @@ public void setFieldValue(TemplateScript.Factory fieldPathTemplate, ValueSource
454489
}
455490

456491
private void setFieldValue(String path, Object value, boolean append) {
492+
setFieldValue(path, value, append, true);
493+
}
494+
495+
private void setFieldValue(String path, Object value, boolean append, boolean allowDuplicates) {
457496
FieldPath fieldPath = new FieldPath(path);
458497
Object context = fieldPath.initialContext;
459498
for (int i = 0; i < fieldPath.pathElements.length - 1; i++) {
@@ -502,7 +541,7 @@ private void setFieldValue(String path, Object value, boolean append) {
502541
if (append) {
503542
if (map.containsKey(leafKey)) {
504543
Object object = map.get(leafKey);
505-
List<Object> list = appendValues(object, value);
544+
Object list = appendValues(object, value, allowDuplicates);
506545
if (list != object) {
507546
map.put(leafKey, list);
508547
}
@@ -530,7 +569,7 @@ private void setFieldValue(String path, Object value, boolean append) {
530569
}
531570
if (append) {
532571
Object object = list.get(index);
533-
List<Object> newList = appendValues(object, value);
572+
Object newList = appendValues(object, value, allowDuplicates);
534573
if (newList != object) {
535574
list.set(index, newList);
536575
}
@@ -544,7 +583,7 @@ private void setFieldValue(String path, Object value, boolean append) {
544583
}
545584

546585
@SuppressWarnings("unchecked")
547-
private static List<Object> appendValues(Object maybeList, Object value) {
586+
private static Object appendValues(Object maybeList, Object value, boolean allowDuplicates) {
548587
List<Object> list;
549588
if (maybeList instanceof List) {
550589
//maybeList is already a list, we append the provided values to it
@@ -554,8 +593,13 @@ private static List<Object> appendValues(Object maybeList, Object value) {
554593
list = new ArrayList<>();
555594
list.add(maybeList);
556595
}
557-
appendValues(list, value);
558-
return list;
596+
if (allowDuplicates) {
597+
appendValues(list, value);
598+
return list;
599+
} else {
600+
// if no values were appended due to duplication, return the original object so the ingest document remains unmodified
601+
return appendValuesWithoutDuplicates(list, value) ? list : maybeList;
602+
}
559603
}
560604

561605
private static void appendValues(List<Object> list, Object value) {
@@ -566,6 +610,25 @@ private static void appendValues(List<Object> list, Object value) {
566610
}
567611
}
568612

613+
private static boolean appendValuesWithoutDuplicates(List<Object> list, Object value) {
614+
boolean valuesWereAppended = false;
615+
if (value instanceof List) {
616+
List<?> valueList = (List<?>) value;
617+
for (Object val : valueList) {
618+
if (list.contains(val) == false) {
619+
list.add(val);
620+
valuesWereAppended = true;
621+
}
622+
}
623+
} else {
624+
if (list.contains(value) == false) {
625+
list.add(value);
626+
valuesWereAppended = true;
627+
}
628+
}
629+
return valuesWereAppended;
630+
}
631+
569632
private static <T> T cast(String path, Object object, Class<T> clazz) {
570633
if (object == null) {
571634
return null;

0 commit comments

Comments
 (0)