Skip to content

Commit 2e81bcc

Browse files
authored
Support dotted field notations in the reroute processor (#96243)
1 parent d3a7163 commit 2e81bcc

File tree

3 files changed

+75
-9
lines changed

3 files changed

+75
-9
lines changed

docs/changelog/96243.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 96243
2+
summary: Support dotted field notations in the reroute processor
3+
area: Ingest Node
4+
type: bug
5+
issues: []

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RerouteProcessor.java

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -91,17 +91,27 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
9191
String namespace = determineDataStreamField(ingestDocument, this.namespace, currentNamespace);
9292
String newTarget = type + "-" + dataset + "-" + namespace;
9393
ingestDocument.reroute(newTarget);
94-
ingestDocument.setFieldValue(DATA_STREAM_TYPE, type);
95-
ingestDocument.setFieldValue(DATA_STREAM_DATASET, dataset);
96-
ingestDocument.setFieldValue(DATA_STREAM_NAMESPACE, namespace);
97-
if (ingestDocument.hasField(EVENT_DATASET)) {
94+
setFieldValue(ingestDocument, DATA_STREAM_TYPE, type);
95+
setFieldValue(ingestDocument, DATA_STREAM_DATASET, dataset);
96+
setFieldValue(ingestDocument, DATA_STREAM_NAMESPACE, namespace);
97+
if (ingestDocument.getCtxMap().containsKey(EVENT_DATASET) || ingestDocument.hasField(EVENT_DATASET)) {
9898
// ECS specifies that "event.dataset should have the same value as data_stream.dataset"
9999
// not eagerly set event.dataset but only if the doc contains it already to ensure it's consistent with data_stream.dataset
100-
ingestDocument.setFieldValue(EVENT_DATASET, dataset);
100+
setFieldValue(ingestDocument, EVENT_DATASET, dataset);
101101
}
102102
return ingestDocument;
103103
}
104104

105+
/* sets a field value in either dotted or nested notation, preserving the notation used in the document */
106+
private static void setFieldValue(IngestDocument doc, String path, String value) {
107+
Map<String, Object> source = doc.getSourceAndMetadata();
108+
if (source.containsKey(path)) {
109+
source.put(path, value);
110+
} else {
111+
doc.setFieldValue(path, value);
112+
}
113+
}
114+
105115
private static String parseDataStreamType(String dataStreamName, int indexOfFirstDash) {
106116
return dataStreamName.substring(0, indexOfFirstDash);
107117
}
@@ -253,10 +263,33 @@ private DataStreamValueSource(String value, Function<String, String> sanitizer)
253263
@Nullable
254264
public String resolve(IngestDocument ingestDocument) {
255265
if (fieldReference != null) {
256-
return sanitizer.apply(ingestDocument.getFieldValue(fieldReference, String.class, true));
266+
String value = ingestDocument.getFieldValue(fieldReference, String.class, true);
267+
if (value == null) {
268+
value = getStringFieldValueInDottedNotation(ingestDocument);
269+
}
270+
return sanitizer.apply(value);
257271
} else {
258272
return value;
259273
}
260274
}
275+
276+
private String getStringFieldValueInDottedNotation(IngestDocument ingestDocument) {
277+
String value = null;
278+
Object valueObject = ingestDocument.getCtxMap().get(fieldReference);
279+
if (valueObject instanceof String) {
280+
value = (String) valueObject;
281+
} else if (valueObject != null) {
282+
throw new IllegalArgumentException(
283+
"field ["
284+
+ fieldReference
285+
+ "] of type ["
286+
+ valueObject.getClass().getName()
287+
+ "] cannot be cast to ["
288+
+ String.class.getName()
289+
+ "]"
290+
);
291+
}
292+
return value;
293+
}
261294
}
262295
}

modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorTests.java

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,17 @@ public void testEventDataset() throws Exception {
4141
assertThat(ingestDocument.getFieldValue("event.dataset", String.class), equalTo("foo"));
4242
}
4343

44+
public void testEventDatasetDottedFieldName() throws Exception {
45+
IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
46+
ingestDocument.getCtxMap().put("event.dataset", "foo");
47+
48+
RerouteProcessor processor = createRerouteProcessor(List.of("{{event.dataset}}"), List.of());
49+
processor.execute(ingestDocument);
50+
assertDataSetFields(ingestDocument, "logs", "foo", "default");
51+
assertThat(ingestDocument.getCtxMap().get("event.dataset"), equalTo("foo"));
52+
assertFalse(ingestDocument.getCtxMap().containsKey("event"));
53+
}
54+
4455
public void testNoDataset() throws Exception {
4556
IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
4657
ingestDocument.setFieldValue("ds", "foo");
@@ -81,6 +92,17 @@ public void testDataStreamFieldsFromDocument() throws Exception {
8192
assertDataSetFields(ingestDocument, "logs", "foo", "bar");
8293
}
8394

95+
public void testDataStreamFieldsFromDocumentDottedNotation() throws Exception {
96+
IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
97+
ingestDocument.getCtxMap().put("data_stream.type", "logs");
98+
ingestDocument.getCtxMap().put("data_stream.dataset", "foo");
99+
ingestDocument.getCtxMap().put("data_stream.namespace", "bar");
100+
101+
RerouteProcessor processor = createRerouteProcessor(List.of(), List.of());
102+
processor.execute(ingestDocument);
103+
assertDataSetFields(ingestDocument, "logs", "foo", "bar");
104+
}
105+
84106
public void testInvalidDataStreamFieldsFromDocument() throws Exception {
85107
IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
86108
ingestDocument.setFieldValue("data_stream.dataset", "foo-bar");
@@ -250,9 +272,15 @@ private RerouteProcessor createRerouteProcessor(String destination) {
250272
}
251273

252274
private void assertDataSetFields(IngestDocument ingestDocument, String type, String dataset, String namespace) {
253-
assertThat(ingestDocument.getFieldValue("data_stream.type", String.class), equalTo(type));
254-
assertThat(ingestDocument.getFieldValue("data_stream.dataset", String.class), equalTo(dataset));
255-
assertThat(ingestDocument.getFieldValue("data_stream.namespace", String.class), equalTo(namespace));
275+
if (ingestDocument.hasField("data_stream")) {
276+
assertThat(ingestDocument.getFieldValue("data_stream.type", String.class), equalTo(type));
277+
assertThat(ingestDocument.getFieldValue("data_stream.dataset", String.class), equalTo(dataset));
278+
assertThat(ingestDocument.getFieldValue("data_stream.namespace", String.class), equalTo(namespace));
279+
} else {
280+
assertThat(ingestDocument.getCtxMap().get("data_stream.type"), equalTo(type));
281+
assertThat(ingestDocument.getCtxMap().get("data_stream.dataset"), equalTo(dataset));
282+
assertThat(ingestDocument.getCtxMap().get("data_stream.namespace"), equalTo(namespace));
283+
}
256284
assertThat(ingestDocument.getFieldValue("_index", String.class), equalTo(type + "-" + dataset + "-" + namespace));
257285
if (ingestDocument.hasField("event.dataset")) {
258286
assertThat(

0 commit comments

Comments
 (0)