diff --git a/docs/changelog/96243.yaml b/docs/changelog/96243.yaml new file mode 100644 index 0000000000000..fc89aa67ce1c8 --- /dev/null +++ b/docs/changelog/96243.yaml @@ -0,0 +1,5 @@ +pr: 96243 +summary: Support dotted field notations in the reroute processor +area: Ingest Node +type: bug +issues: [] diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RerouteProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RerouteProcessor.java index 6c2b321112821..d1641abf83a80 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RerouteProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RerouteProcessor.java @@ -91,17 +91,27 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { String namespace = determineDataStreamField(ingestDocument, this.namespace, currentNamespace); String newTarget = type + "-" + dataset + "-" + namespace; ingestDocument.reroute(newTarget); - ingestDocument.setFieldValue(DATA_STREAM_TYPE, type); - ingestDocument.setFieldValue(DATA_STREAM_DATASET, dataset); - ingestDocument.setFieldValue(DATA_STREAM_NAMESPACE, namespace); - if (ingestDocument.hasField(EVENT_DATASET)) { + setFieldValue(ingestDocument, DATA_STREAM_TYPE, type); + setFieldValue(ingestDocument, DATA_STREAM_DATASET, dataset); + setFieldValue(ingestDocument, DATA_STREAM_NAMESPACE, namespace); + if (ingestDocument.getCtxMap().containsKey(EVENT_DATASET) || ingestDocument.hasField(EVENT_DATASET)) { // ECS specifies that "event.dataset should have the same value as data_stream.dataset" // not eagerly set event.dataset but only if the doc contains it already to ensure it's consistent with data_stream.dataset - ingestDocument.setFieldValue(EVENT_DATASET, dataset); + setFieldValue(ingestDocument, EVENT_DATASET, dataset); } return ingestDocument; } + /* sets a field value in either dotted or nested notation, preserving the notation used in the document */ + private static void setFieldValue(IngestDocument doc, String path, String value) { + Map source = doc.getSourceAndMetadata(); + if (source.containsKey(path)) { + source.put(path, value); + } else { + doc.setFieldValue(path, value); + } + } + private static String parseDataStreamType(String dataStreamName, int indexOfFirstDash) { return dataStreamName.substring(0, indexOfFirstDash); } @@ -253,10 +263,33 @@ private DataStreamValueSource(String value, Function sanitizer) @Nullable public String resolve(IngestDocument ingestDocument) { if (fieldReference != null) { - return sanitizer.apply(ingestDocument.getFieldValue(fieldReference, String.class, true)); + String value = ingestDocument.getFieldValue(fieldReference, String.class, true); + if (value == null) { + value = getStringFieldValueInDottedNotation(ingestDocument); + } + return sanitizer.apply(value); } else { return value; } } + + private String getStringFieldValueInDottedNotation(IngestDocument ingestDocument) { + String value = null; + Object valueObject = ingestDocument.getCtxMap().get(fieldReference); + if (valueObject instanceof String) { + value = (String) valueObject; + } else if (valueObject != null) { + throw new IllegalArgumentException( + "field [" + + fieldReference + + "] of type [" + + valueObject.getClass().getName() + + "] cannot be cast to [" + + String.class.getName() + + "]" + ); + } + return value; + } } } diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorTests.java index 3da394575d625..d95f3eefcc87d 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorTests.java @@ -41,6 +41,17 @@ public void testEventDataset() throws Exception { assertThat(ingestDocument.getFieldValue("event.dataset", String.class), equalTo("foo")); } + public void testEventDatasetDottedFieldName() throws Exception { + IngestDocument ingestDocument = createIngestDocument("logs-generic-default"); + ingestDocument.getCtxMap().put("event.dataset", "foo"); + + RerouteProcessor processor = createRerouteProcessor(List.of("{{event.dataset}}"), List.of()); + processor.execute(ingestDocument); + assertDataSetFields(ingestDocument, "logs", "foo", "default"); + assertThat(ingestDocument.getCtxMap().get("event.dataset"), equalTo("foo")); + assertFalse(ingestDocument.getCtxMap().containsKey("event")); + } + public void testNoDataset() throws Exception { IngestDocument ingestDocument = createIngestDocument("logs-generic-default"); ingestDocument.setFieldValue("ds", "foo"); @@ -81,6 +92,17 @@ public void testDataStreamFieldsFromDocument() throws Exception { assertDataSetFields(ingestDocument, "logs", "foo", "bar"); } + public void testDataStreamFieldsFromDocumentDottedNotation() throws Exception { + IngestDocument ingestDocument = createIngestDocument("logs-generic-default"); + ingestDocument.getCtxMap().put("data_stream.type", "logs"); + ingestDocument.getCtxMap().put("data_stream.dataset", "foo"); + ingestDocument.getCtxMap().put("data_stream.namespace", "bar"); + + RerouteProcessor processor = createRerouteProcessor(List.of(), List.of()); + processor.execute(ingestDocument); + assertDataSetFields(ingestDocument, "logs", "foo", "bar"); + } + public void testInvalidDataStreamFieldsFromDocument() throws Exception { IngestDocument ingestDocument = createIngestDocument("logs-generic-default"); ingestDocument.setFieldValue("data_stream.dataset", "foo-bar"); @@ -250,9 +272,15 @@ private RerouteProcessor createRerouteProcessor(String destination) { } private void assertDataSetFields(IngestDocument ingestDocument, String type, String dataset, String namespace) { - assertThat(ingestDocument.getFieldValue("data_stream.type", String.class), equalTo(type)); - assertThat(ingestDocument.getFieldValue("data_stream.dataset", String.class), equalTo(dataset)); - assertThat(ingestDocument.getFieldValue("data_stream.namespace", String.class), equalTo(namespace)); + if (ingestDocument.hasField("data_stream")) { + assertThat(ingestDocument.getFieldValue("data_stream.type", String.class), equalTo(type)); + assertThat(ingestDocument.getFieldValue("data_stream.dataset", String.class), equalTo(dataset)); + assertThat(ingestDocument.getFieldValue("data_stream.namespace", String.class), equalTo(namespace)); + } else { + assertThat(ingestDocument.getCtxMap().get("data_stream.type"), equalTo(type)); + assertThat(ingestDocument.getCtxMap().get("data_stream.dataset"), equalTo(dataset)); + assertThat(ingestDocument.getCtxMap().get("data_stream.namespace"), equalTo(namespace)); + } assertThat(ingestDocument.getFieldValue("_index", String.class), equalTo(type + "-" + dataset + "-" + namespace)); if (ingestDocument.hasField("event.dataset")) { assertThat(