Skip to content

Commit 24d7fa6

Browse files
committed
ingest: Change the foreach processor to use the _ingest._value ingest metadata attribute to store the current array element being processed.
Closes #19592
1 parent 21ff90f commit 24d7fa6

File tree

14 files changed

+143
-54
lines changed

14 files changed

+143
-54
lines changed

core/src/main/java/org/elasticsearch/action/ingest/WriteableIngestDocument.java

+3-8
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,14 @@ final class WriteableIngestDocument implements Writeable, ToXContent {
4141

4242
WriteableIngestDocument(StreamInput in) throws IOException {
4343
Map<String, Object> sourceAndMetadata = in.readMap();
44-
@SuppressWarnings("unchecked")
45-
Map<String, String> ingestMetadata = (Map<String, String>) in.readGenericValue();
44+
Map<String, Object> ingestMetadata = in.readMap();
4645
this.ingestDocument = new IngestDocument(sourceAndMetadata, ingestMetadata);
4746
}
4847

4948
@Override
5049
public void writeTo(StreamOutput out) throws IOException {
5150
out.writeMap(ingestDocument.getSourceAndMetadata());
52-
out.writeGenericValue(ingestDocument.getIngestMetadata());
51+
out.writeMap(ingestDocument.getIngestMetadata());
5352
}
5453

5554
IngestDocument getIngestDocument() {
@@ -66,11 +65,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
6665
}
6766
}
6867
builder.field("_source", ingestDocument.getSourceAndMetadata());
69-
builder.startObject("_ingest");
70-
for (Map.Entry<String, String> ingestMetadata : ingestDocument.getIngestMetadata().entrySet()) {
71-
builder.field(ingestMetadata.getKey(), ingestMetadata.getValue());
72-
}
73-
builder.endObject();
68+
builder.field("_ingest", ingestDocument.getIngestMetadata());
7469
builder.endObject();
7570
return builder;
7671
}

core/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -135,14 +135,14 @@ private void putFailureMetadata(IngestDocument ingestDocument, ElasticsearchExce
135135
List<String> processorTagHeader = cause.getHeader("processor_tag");
136136
String failedProcessorType = (processorTypeHeader != null) ? processorTypeHeader.get(0) : null;
137137
String failedProcessorTag = (processorTagHeader != null) ? processorTagHeader.get(0) : null;
138-
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
138+
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
139139
ingestMetadata.put(ON_FAILURE_MESSAGE_FIELD, cause.getRootCause().getMessage());
140140
ingestMetadata.put(ON_FAILURE_PROCESSOR_TYPE_FIELD, failedProcessorType);
141141
ingestMetadata.put(ON_FAILURE_PROCESSOR_TAG_FIELD, failedProcessorTag);
142142
}
143143

144144
private void removeFailureMetadata(IngestDocument ingestDocument) {
145-
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
145+
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
146146
ingestMetadata.remove(ON_FAILURE_MESSAGE_FIELD);
147147
ingestMetadata.remove(ON_FAILURE_PROCESSOR_TYPE_FIELD);
148148
ingestMetadata.remove(ON_FAILURE_PROCESSOR_TAG_FIELD);

core/src/main/java/org/elasticsearch/ingest/IngestDocument.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public final class IngestDocument {
5454
static final String TIMESTAMP = "timestamp";
5555

5656
private final Map<String, Object> sourceAndMetadata;
57-
private final Map<String, String> ingestMetadata;
57+
private final Map<String, Object> ingestMetadata;
5858

5959
public IngestDocument(String index, String type, String id, String routing, String parent, String timestamp,
6060
String ttl, Map<String, Object> source) {
@@ -94,7 +94,7 @@ public IngestDocument(IngestDocument other) {
9494
* source and ingest metadata. This is needed because the ingest metadata will be initialized with the current timestamp at
9595
* init time, which makes equality comparisons impossible in tests.
9696
*/
97-
public IngestDocument(Map<String, Object> sourceAndMetadata, Map<String, String> ingestMetadata) {
97+
public IngestDocument(Map<String, Object> sourceAndMetadata, Map<String, Object> ingestMetadata) {
9898
this.sourceAndMetadata = sourceAndMetadata;
9999
this.ingestMetadata = ingestMetadata;
100100
}
@@ -517,7 +517,7 @@ public Map<MetaData, String> extractMetadata() {
517517
* Returns the available ingest metadata fields, by default only timestamp, but it is possible to set additional ones.
518518
* Use only for reading values, modify them instead using {@link #setFieldValue(String, Object)} and {@link #removeField(String)}
519519
*/
520-
public Map<String, String> getIngestMetadata() {
520+
public Map<String, Object> getIngestMetadata() {
521521
return this.ingestMetadata;
522522
}
523523

core/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ public void testExecuteVerboseItemWithOnFailure() throws Exception {
145145
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument(), not(sameInstance(ingestDocument)));
146146

147147
IngestDocument ingestDocumentWithOnFailureMetadata = new IngestDocument(ingestDocument);
148-
Map<String, String> metadata = ingestDocumentWithOnFailureMetadata.getIngestMetadata();
148+
Map<String, Object> metadata = ingestDocumentWithOnFailureMetadata.getIngestMetadata();
149149
metadata.put(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD, "mock");
150150
metadata.put(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD, "processor_0");
151151
metadata.put(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD, "processor failed");

core/src/test/java/org/elasticsearch/action/ingest/TrackingResultProcessorTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public void testActualCompoundProcessorWithOnFailure() throws Exception {
111111
assertThat(resultList.get(0).getFailure(), equalTo(exception));
112112
assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedFailResult.getProcessorTag()));
113113

114-
Map<String, String> metadata = resultList.get(1).getIngestDocument().getIngestMetadata();
114+
Map<String, Object> metadata = resultList.get(1).getIngestDocument().getIngestMetadata();
115115
assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail"));
116116
assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test"));
117117
assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("fail"));

core/src/test/java/org/elasticsearch/action/ingest/WriteableIngestDocumentTests.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public void testEqualsAndHashcode() throws Exception {
4747
for (int i = 0; i < numFields; i++) {
4848
sourceAndMetadata.put(randomFrom(IngestDocument.MetaData.values()).getFieldName(), randomAsciiOfLengthBetween(5, 10));
4949
}
50-
Map<String, String> ingestMetadata = new HashMap<>();
50+
Map<String, Object> ingestMetadata = new HashMap<>();
5151
numFields = randomIntBetween(1, 5);
5252
for (int i = 0; i < numFields; i++) {
5353
ingestMetadata.put(randomAsciiOfLengthBetween(5, 10), randomAsciiOfLengthBetween(5, 10));
@@ -70,7 +70,7 @@ public void testEqualsAndHashcode() throws Exception {
7070
changed = true;
7171
}
7272

73-
Map<String, String> otherIngestMetadata;
73+
Map<String, Object> otherIngestMetadata;
7474
if (randomBoolean()) {
7575
otherIngestMetadata = new HashMap<>();
7676
numFields = randomIntBetween(1, 5);
@@ -103,7 +103,7 @@ public void testSerialization() throws IOException {
103103
for (int i = 0; i < numFields; i++) {
104104
sourceAndMetadata.put(randomFrom(IngestDocument.MetaData.values()).getFieldName(), randomAsciiOfLengthBetween(5, 10));
105105
}
106-
Map<String, String> ingestMetadata = new HashMap<>();
106+
Map<String, Object> ingestMetadata = new HashMap<>();
107107
numFields = randomIntBetween(1, 5);
108108
for (int i = 0; i < numFields; i++) {
109109
ingestMetadata.put(randomAsciiOfLengthBetween(5, 10), randomAsciiOfLengthBetween(5, 10));
@@ -131,7 +131,7 @@ public void testToXContent() throws IOException {
131131

132132
Map<String, Object> toXContentDoc = (Map<String, Object>) toXContentMap.get("doc");
133133
Map<String, Object> toXContentSource = (Map<String, Object>) toXContentDoc.get("_source");
134-
Map<String, String> toXContentIngestMetadata = (Map<String, String>) toXContentDoc.get("_ingest");
134+
Map<String, Object> toXContentIngestMetadata = (Map<String, Object>) toXContentDoc.get("_ingest");
135135

136136
Map<IngestDocument.MetaData, String> metadataMap = ingestDocument.extractMetadata();
137137
for (Map.Entry<IngestDocument.MetaData, String> metadata : metadataMap.entrySet()) {

core/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public void testIgnoreFailure() throws Exception {
8686
public void testSingleProcessorWithOnFailureProcessor() throws Exception {
8787
TestProcessor processor1 = new TestProcessor("id", "first", ingestDocument -> {throw new RuntimeException("error");});
8888
TestProcessor processor2 = new TestProcessor(ingestDocument -> {
89-
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
89+
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
9090
assertThat(ingestMetadata.size(), equalTo(3));
9191
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error"));
9292
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("first"));
@@ -104,15 +104,15 @@ public void testSingleProcessorWithOnFailureProcessor() throws Exception {
104104
public void testSingleProcessorWithNestedFailures() throws Exception {
105105
TestProcessor processor = new TestProcessor("id", "first", ingestDocument -> {throw new RuntimeException("error");});
106106
TestProcessor processorToFail = new TestProcessor("id2", "second", ingestDocument -> {
107-
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
107+
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
108108
assertThat(ingestMetadata.size(), equalTo(3));
109109
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error"));
110110
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("first"));
111111
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id"));
112112
throw new RuntimeException("error");
113113
});
114114
TestProcessor lastProcessor = new TestProcessor(ingestDocument -> {
115-
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
115+
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
116116
assertThat(ingestMetadata.size(), equalTo(3));
117117
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error"));
118118
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("second"));
@@ -131,7 +131,7 @@ public void testSingleProcessorWithNestedFailures() throws Exception {
131131
public void testCompoundProcessorExceptionFailWithoutOnFailure() throws Exception {
132132
TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");});
133133
TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> {
134-
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
134+
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
135135
assertThat(ingestMetadata.entrySet(), hasSize(3));
136136
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error"));
137137
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("first"));
@@ -153,7 +153,7 @@ public void testCompoundProcessorExceptionFail() throws Exception {
153153
TestProcessor failProcessor =
154154
new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");});
155155
TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> {
156-
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
156+
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
157157
assertThat(ingestMetadata.entrySet(), hasSize(3));
158158
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("custom error message"));
159159
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("fail"));
@@ -176,7 +176,7 @@ public void testCompoundProcessorExceptionFailInOnFailure() throws Exception {
176176
TestProcessor failProcessor =
177177
new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");});
178178
TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> {
179-
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
179+
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
180180
assertThat(ingestMetadata.entrySet(), hasSize(3));
181181
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("custom error message"));
182182
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("fail"));

core/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -907,7 +907,7 @@ public void testEqualsAndHashcode() throws Exception {
907907
for (int i = 0; i < numFields; i++) {
908908
sourceAndMetadata.put(randomFrom(IngestDocument.MetaData.values()).getFieldName(), randomAsciiOfLengthBetween(5, 10));
909909
}
910-
Map<String, String> ingestMetadata = new HashMap<>();
910+
Map<String, Object> ingestMetadata = new HashMap<>();
911911
numFields = randomIntBetween(1, 5);
912912
for (int i = 0; i < numFields; i++) {
913913
ingestMetadata.put(randomAsciiOfLengthBetween(5, 10), randomAsciiOfLengthBetween(5, 10));
@@ -930,7 +930,7 @@ public void testEqualsAndHashcode() throws Exception {
930930
changed = true;
931931
}
932932

933-
Map<String, String> otherIngestMetadata;
933+
Map<String, Object> otherIngestMetadata;
934934
if (randomBoolean()) {
935935
otherIngestMetadata = new HashMap<>();
936936
numFields = randomIntBetween(1, 5);
@@ -962,7 +962,7 @@ public void testIngestMetadataTimestamp() throws Exception {
962962
long before = System.currentTimeMillis();
963963
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
964964
long after = System.currentTimeMillis();
965-
String timestampString = ingestDocument.getIngestMetadata().get("timestamp");
965+
String timestampString = (String) ingestDocument.getIngestMetadata().get("timestamp");
966966
assertThat(timestampString, notNullValue());
967967
assertThat(timestampString, endsWith("+0000"));
968968
DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZZ", Locale.ROOT);

docs/reference/ingest/ingest-node.asciidoc

+13-7
Original file line numberDiff line numberDiff line change
@@ -859,8 +859,16 @@ because it is likely that the number of elements in an array is unknown. For thi
859859
processor exists. By specifying the field holding array elements and a processor that
860860
defines what should happen to each element, array fields can easily be preprocessed.
861861

862-
A processor inside the foreach processor works in a different context, and the only valid top-level
863-
field is `_value`, which holds the array element value. Under this field other fields may exist.
862+
A processor inside the foreach processor works in the array element context and puts that in the ingest metadata
863+
under the `_ingest._value` key. If the array element is a json object it holds all immediate fields of that json object.
864+
and if the nested object is a value is `_ingest._value` just holds that value. Note that if a processor prior to the
865+
`foreach` processor used `_ingest._value` key then the specified value will not be available to the processor inside
866+
the `foreach` processor. The `foreach` processor does restore the original value, so that value is available to processors
867+
after the `foreach` processor.
868+
869+
Note that any other field from the document are accessible and modifiable like with all other processors. This processor
870+
just puts the current array element being read into `_ingest._value` ingest metadata attribute, so that it may be
871+
pre-processed.
864872

865873
If the `foreach` processor fails to process an element inside the array, and no `on_failure` processor has been specified,
866874
then it aborts the execution and leaves the array unmodified.
@@ -892,7 +900,7 @@ When this `foreach` processor operates on this sample document:
892900
"field" : "values",
893901
"processor" : {
894902
"uppercase" : {
895-
"field" : "_value"
903+
"field" : "_ingest._value"
896904
}
897905
}
898906
}
@@ -936,7 +944,7 @@ so the following `foreach` processor is used:
936944
"field" : "persons",
937945
"processor" : {
938946
"remove" : {
939-
"field" : "_value.id"
947+
"field" : "_ingest._value.id"
940948
}
941949
}
942950
}
@@ -959,9 +967,7 @@ After preprocessing the result is:
959967
}
960968
--------------------------------------------------
961969

962-
As for any processor, you can define `on_failure` processors
963-
in processors that are wrapped inside the `foreach` processor.
964-
970+
The wrapped processor can have a `on_failure` definition.
965971
For example, the `id` field may not exist on all person objects.
966972
Instead of failing the index request, you can use an `on_failure`
967973
block to send the document to the 'failure_index' index for later inspection:

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,12 @@ public void execute(IngestDocument ingestDocument) throws Exception {
6262
List<Object> values = ingestDocument.getFieldValue(field, List.class);
6363
List<Object> newValues = new ArrayList<>(values.size());
6464
for (Object value : values) {
65-
Map<String, Object> innerSource = new HashMap<>(ingestDocument.getSourceAndMetadata());
66-
innerSource.put("_value", value); // scalar value to access the list item being evaluated
67-
IngestDocument innerIngestDocument = new IngestDocument(innerSource, ingestDocument.getIngestMetadata());
68-
processor.execute(innerIngestDocument);
69-
newValues.add(innerSource.get("_value"));
65+
Object previousValue = ingestDocument.getIngestMetadata().put("_value", value);
66+
try {
67+
processor.execute(ingestDocument);
68+
} finally {
69+
newValues.add(ingestDocument.getIngestMetadata().put("_value", previousValue));
70+
}
7071
}
7172
ingestDocument.setFieldValue(field, newValues);
7273
}

0 commit comments

Comments
 (0)