Skip to content

Commit 21c759a

Browse files
authored
[ML] Add an ingest pipeline definition to structure finder (#34350)
The ingest pipeline that is produced is very simple. It contains a grok processor if the format is semi-structured text, a date processor if the format contains a timestamp, and a remove processor if required to remove the interim timestamp field parsed out of semi-structured text. Eventually the UI should offer the option to customize the pipeline with additional processors to perform other data preparation steps before ingesting data to an index.
1 parent 7352f0d commit 21c759a

File tree

10 files changed

+254
-7
lines changed

10 files changed

+254
-7
lines changed

docs/reference/ml/apis/find-file-structure.asciidoc

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -613,6 +613,20 @@ If the request does not encounter errors, you receive the following result:
613613
"type" : "double"
614614
}
615615
},
616+
"ingest_pipeline" : {
617+
"description" : "Ingest pipeline created by file structure finder",
618+
"processors" : [
619+
{
620+
"date" : {
621+
"field" : "tpep_pickup_datetime",
622+
"timezone" : "{{ beat.timezone }}",
623+
"formats" : [
624+
"YYYY-MM-dd HH:mm:ss"
625+
]
626+
}
627+
}
628+
]
629+
},
616630
"field_stats" : {
617631
"DOLocationID" : {
618632
"count" : 19998,
@@ -1366,6 +1380,33 @@ this:
13661380
"type" : "text"
13671381
}
13681382
},
1383+
"ingest_pipeline" : {
1384+
"description" : "Ingest pipeline created by file structure finder",
1385+
"processors" : [
1386+
{
1387+
"grok" : {
1388+
"field" : "message",
1389+
"patterns" : [
1390+
"\\[%{TIMESTAMP_ISO8601:timestamp}\\]\\[%{LOGLEVEL:loglevel}.*"
1391+
]
1392+
}
1393+
},
1394+
{
1395+
"date" : {
1396+
"field" : "timestamp",
1397+
"timezone" : "{{ beat.timezone }}",
1398+
"formats" : [
1399+
"ISO8601"
1400+
]
1401+
}
1402+
},
1403+
{
1404+
"remove" : {
1405+
"field" : "timestamp"
1406+
}
1407+
}
1408+
]
1409+
},
13691410
"field_stats" : {
13701411
"loglevel" : {
13711412
"count" : 53,
@@ -1499,6 +1540,33 @@ this:
14991540
"type" : "keyword"
15001541
}
15011542
},
1543+
"ingest_pipeline" : {
1544+
"description" : "Ingest pipeline created by file structure finder",
1545+
"processors" : [
1546+
{
1547+
"grok" : {
1548+
"field" : "message",
1549+
"patterns" : [
1550+
"\\[%{TIMESTAMP_ISO8601:timestamp}\\]\\[%{LOGLEVEL:loglevel} *\\]\\[%{JAVACLASS:class} *\\] \\[%{HOSTNAME:node}\\] %{JAVALOGMESSAGE:message}"
1551+
]
1552+
}
1553+
},
1554+
{
1555+
"date" : {
1556+
"field" : "timestamp",
1557+
"timezone" : "{{ beat.timezone }}",
1558+
"formats" : [
1559+
"ISO8601"
1560+
]
1561+
}
1562+
},
1563+
{
1564+
"remove" : {
1565+
"field" : "timestamp"
1566+
}
1567+
}
1568+
]
1569+
},
15021570
"field_stats" : { <2>
15031571
"class" : {
15041572
"count" : 53,

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/filestructurefinder/FileStructure.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import java.io.IOException;
1818
import java.util.ArrayList;
1919
import java.util.Collections;
20+
import java.util.LinkedHashMap;
2021
import java.util.List;
2122
import java.util.Locale;
2223
import java.util.Map;
@@ -103,6 +104,7 @@ public String toString() {
103104
public static final ParseField JAVA_TIMESTAMP_FORMATS = new ParseField("java_timestamp_formats");
104105
public static final ParseField NEED_CLIENT_TIMEZONE = new ParseField("need_client_timezone");
105106
public static final ParseField MAPPINGS = new ParseField("mappings");
107+
public static final ParseField INGEST_PIPELINE = new ParseField("ingest_pipeline");
106108
public static final ParseField FIELD_STATS = new ParseField("field_stats");
107109
public static final ParseField EXPLANATION = new ParseField("explanation");
108110

@@ -128,6 +130,7 @@ public String toString() {
128130
PARSER.declareStringArray(Builder::setJavaTimestampFormats, JAVA_TIMESTAMP_FORMATS);
129131
PARSER.declareBoolean(Builder::setNeedClientTimezone, NEED_CLIENT_TIMEZONE);
130132
PARSER.declareObject(Builder::setMappings, (p, c) -> new TreeMap<>(p.map()), MAPPINGS);
133+
PARSER.declareObject(Builder::setIngestPipeline, (p, c) -> p.mapOrdered(), INGEST_PIPELINE);
131134
PARSER.declareObject(Builder::setFieldStats, (p, c) -> {
132135
Map<String, FieldStats> fieldStats = new TreeMap<>();
133136
while (p.nextToken() == XContentParser.Token.FIELD_NAME) {
@@ -157,15 +160,16 @@ public String toString() {
157160
private final String timestampField;
158161
private final boolean needClientTimezone;
159162
private final SortedMap<String, Object> mappings;
163+
private final Map<String, Object> ingestPipeline;
160164
private final SortedMap<String, FieldStats> fieldStats;
161165
private final List<String> explanation;
162166

163167
public FileStructure(int numLinesAnalyzed, int numMessagesAnalyzed, String sampleStart, String charset, Boolean hasByteOrderMarker,
164168
Format format, String multilineStartPattern, String excludeLinesPattern, List<String> columnNames,
165169
Boolean hasHeaderRow, Character delimiter, Character quote, Boolean shouldTrimFields, String grokPattern,
166170
String timestampField, List<String> jodaTimestampFormats, List<String> javaTimestampFormats,
167-
boolean needClientTimezone, Map<String, Object> mappings, Map<String, FieldStats> fieldStats,
168-
List<String> explanation) {
171+
boolean needClientTimezone, Map<String, Object> mappings, Map<String, Object> ingestPipeline,
172+
Map<String, FieldStats> fieldStats, List<String> explanation) {
169173

170174
this.numLinesAnalyzed = numLinesAnalyzed;
171175
this.numMessagesAnalyzed = numMessagesAnalyzed;
@@ -188,6 +192,7 @@ public FileStructure(int numLinesAnalyzed, int numMessagesAnalyzed, String sampl
188192
(javaTimestampFormats == null) ? null : Collections.unmodifiableList(new ArrayList<>(javaTimestampFormats));
189193
this.needClientTimezone = needClientTimezone;
190194
this.mappings = Collections.unmodifiableSortedMap(new TreeMap<>(mappings));
195+
this.ingestPipeline = (ingestPipeline == null) ? null : Collections.unmodifiableMap(new LinkedHashMap<>(ingestPipeline));
191196
this.fieldStats = Collections.unmodifiableSortedMap(new TreeMap<>(fieldStats));
192197
this.explanation = Collections.unmodifiableList(new ArrayList<>(explanation));
193198
}
@@ -212,6 +217,7 @@ public FileStructure(StreamInput in) throws IOException {
212217
timestampField = in.readOptionalString();
213218
needClientTimezone = in.readBoolean();
214219
mappings = Collections.unmodifiableSortedMap(new TreeMap<>(in.readMap()));
220+
ingestPipeline = in.readBoolean() ? Collections.unmodifiableMap(in.readMap()) : null;
215221
fieldStats = Collections.unmodifiableSortedMap(new TreeMap<>(in.readMap(StreamInput::readString, FieldStats::new)));
216222
explanation = Collections.unmodifiableList(in.readList(StreamInput::readString));
217223
}
@@ -262,6 +268,12 @@ public void writeTo(StreamOutput out) throws IOException {
262268
out.writeOptionalString(timestampField);
263269
out.writeBoolean(needClientTimezone);
264270
out.writeMap(mappings);
271+
if (ingestPipeline == null) {
272+
out.writeBoolean(false);
273+
} else {
274+
out.writeBoolean(true);
275+
out.writeMap(ingestPipeline);
276+
}
265277
out.writeMap(fieldStats, StreamOutput::writeString, (out1, value) -> value.writeTo(out1));
266278
out.writeCollection(explanation, StreamOutput::writeString);
267279
}
@@ -342,6 +354,10 @@ public SortedMap<String, Object> getMappings() {
342354
return mappings;
343355
}
344356

357+
public Map<String, Object> getIngestPipeline() {
358+
return ingestPipeline;
359+
}
360+
345361
public SortedMap<String, FieldStats> getFieldStats() {
346362
return fieldStats;
347363
}
@@ -397,6 +413,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
397413
}
398414
builder.field(NEED_CLIENT_TIMEZONE.getPreferredName(), needClientTimezone);
399415
builder.field(MAPPINGS.getPreferredName(), mappings);
416+
if (ingestPipeline != null) {
417+
builder.field(INGEST_PIPELINE.getPreferredName(), ingestPipeline);
418+
}
400419
if (fieldStats.isEmpty() == false) {
401420
builder.startObject(FIELD_STATS.getPreferredName());
402421
for (Map.Entry<String, FieldStats> entry : fieldStats.entrySet()) {
@@ -476,6 +495,7 @@ public static class Builder {
476495
private List<String> javaTimestampFormats;
477496
private boolean needClientTimezone;
478497
private Map<String, Object> mappings;
498+
private Map<String, Object> ingestPipeline;
479499
private Map<String, FieldStats> fieldStats = Collections.emptyMap();
480500
private List<String> explanation;
481501

@@ -582,6 +602,11 @@ public Builder setMappings(Map<String, Object> mappings) {
582602
return this;
583603
}
584604

605+
public Builder setIngestPipeline(Map<String, Object> ingestPipeline) {
606+
this.ingestPipeline = ingestPipeline;
607+
return this;
608+
}
609+
585610
public Builder setFieldStats(Map<String, FieldStats> fieldStats) {
586611
this.fieldStats = Objects.requireNonNull(fieldStats);
587612
return this;
@@ -708,7 +733,8 @@ public FileStructure build() {
708733

709734
return new FileStructure(numLinesAnalyzed, numMessagesAnalyzed, sampleStart, charset, hasByteOrderMarker, format,
710735
multilineStartPattern, excludeLinesPattern, columnNames, hasHeaderRow, delimiter, quote, shouldTrimFields, grokPattern,
711-
timestampField, jodaTimestampFormats, javaTimestampFormats, needClientTimezone, mappings, fieldStats, explanation);
736+
timestampField, jodaTimestampFormats, javaTimestampFormats, needClientTimezone, mappings, ingestPipeline, fieldStats,
737+
explanation);
712738
}
713739
}
714740
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/filestructurefinder/FileStructureTests.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import java.util.Arrays;
1515
import java.util.Collections;
1616
import java.util.EnumSet;
17+
import java.util.LinkedHashMap;
1718
import java.util.Locale;
1819
import java.util.Map;
1920
import java.util.TreeMap;
@@ -74,6 +75,14 @@ public static FileStructure createTestFileStructure() {
7475
}
7576
builder.setMappings(mappings);
7677

78+
if (randomBoolean()) {
79+
Map<String, Object> ingestPipeline = new LinkedHashMap<>();
80+
for (String field : generateRandomStringArray(5, 20, false, false)) {
81+
ingestPipeline.put(field, Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(10)));
82+
}
83+
builder.setMappings(ingestPipeline);
84+
}
85+
7786
if (randomBoolean()) {
7887
Map<String, FieldStats> fieldStats = new TreeMap<>();
7988
for (String field : generateRandomStringArray(5, 20, false, false)) {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/DelimitedFileStructureFinder.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,10 +142,14 @@ static DelimitedFileStructureFinder makeDelimitedFileStructureFinder(List<String
142142
.collect(Collectors.joining(",")));
143143
}
144144

145+
boolean needClientTimeZone = timeField.v2().hasTimezoneDependentParsing();
146+
145147
structureBuilder.setTimestampField(timeField.v1())
146148
.setJodaTimestampFormats(timeField.v2().jodaTimestampFormats)
147149
.setJavaTimestampFormats(timeField.v2().javaTimestampFormats)
148-
.setNeedClientTimezone(timeField.v2().hasTimezoneDependentParsing())
150+
.setNeedClientTimezone(needClientTimeZone)
151+
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, timeField.v1(),
152+
timeField.v2().jodaTimestampFormats, needClientTimeZone))
149153
.setMultilineStartPattern(timeLineRegex);
150154
}
151155

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/FileStructureUtils.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import org.elasticsearch.common.collect.Tuple;
99
import org.elasticsearch.grok.Grok;
10+
import org.elasticsearch.ingest.Pipeline;
1011
import org.elasticsearch.xpack.core.ml.filestructurefinder.FieldStats;
1112
import org.elasticsearch.xpack.ml.filestructurefinder.TimestampFormatFinder.TimestampMatch;
1213

@@ -15,6 +16,7 @@
1516
import java.util.Collection;
1617
import java.util.Collections;
1718
import java.util.Iterator;
19+
import java.util.LinkedHashMap;
1820
import java.util.List;
1921
import java.util.Map;
2022
import java.util.Set;
@@ -37,6 +39,8 @@ public final class FileStructureUtils {
3739
private static final int KEYWORD_MAX_LEN = 256;
3840
private static final int KEYWORD_MAX_SPACES = 5;
3941

42+
private static final String BEAT_TIMEZONE_FIELD = "beat.timezone";
43+
4044
private FileStructureUtils() {
4145
}
4246

@@ -306,4 +310,53 @@ static boolean isMoreLikelyTextThanKeyword(String str) {
306310
int length = str.length();
307311
return length > KEYWORD_MAX_LEN || length - str.replaceAll("\\s", "").length() > KEYWORD_MAX_SPACES;
308312
}
313+
314+
/**
315+
* Create an ingest pipeline definition appropriate for the file structure.
316+
* @param grokPattern The Grok pattern used for parsing semi-structured text formats. <code>null</code> for
317+
* fully structured formats.
318+
* @param timestampField The input field containing the timestamp to be parsed into <code>@timestamp</code>.
319+
* <code>null</code> if there is no timestamp.
320+
* @param timestampFormats Timestamp formats to be used for parsing {@code timestampField}.
321+
* May be <code>null</code> if {@code timestampField} is also <code>null</code>.
322+
* @param needClientTimezone Is the timezone of the client supplying data to ingest required to uniquely parse the timestamp?
323+
* @return The ingest pipeline definition, or <code>null</code> if none is required.
324+
*/
325+
public static Map<String, Object> makeIngestPipelineDefinition(String grokPattern, String timestampField, List<String> timestampFormats,
326+
boolean needClientTimezone) {
327+
328+
if (grokPattern == null && timestampField == null) {
329+
return null;
330+
}
331+
332+
Map<String, Object> pipeline = new LinkedHashMap<>();
333+
pipeline.put(Pipeline.DESCRIPTION_KEY, "Ingest pipeline created by file structure finder");
334+
335+
List<Map<String, Object>> processors = new ArrayList<>();
336+
337+
if (grokPattern != null) {
338+
Map<String, Object> grokProcessorSettings = new LinkedHashMap<>();
339+
grokProcessorSettings.put("field", "message");
340+
grokProcessorSettings.put("patterns", Collections.singletonList(grokPattern));
341+
processors.add(Collections.singletonMap("grok", grokProcessorSettings));
342+
}
343+
344+
if (timestampField != null) {
345+
Map<String, Object> dateProcessorSettings = new LinkedHashMap<>();
346+
dateProcessorSettings.put("field", timestampField);
347+
if (needClientTimezone) {
348+
dateProcessorSettings.put("timezone", "{{ " + BEAT_TIMEZONE_FIELD + " }}");
349+
}
350+
dateProcessorSettings.put("formats", timestampFormats);
351+
processors.add(Collections.singletonMap("date", dateProcessorSettings));
352+
}
353+
354+
// This removes the interim timestamp field used for semi-structured text formats
355+
if (grokPattern != null && timestampField != null) {
356+
processors.add(Collections.singletonMap("remove", Collections.singletonMap("field", timestampField)));
357+
}
358+
359+
pipeline.put(Pipeline.PROCESSORS_KEY, processors);
360+
return pipeline;
361+
}
309362
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/JsonFileStructureFinder.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,14 @@ static JsonFileStructureFinder makeJsonFileStructureFinder(List<String> explanat
5656
Tuple<String, TimestampMatch> timeField =
5757
FileStructureUtils.guessTimestampField(explanation, sampleRecords, overrides, timeoutChecker);
5858
if (timeField != null) {
59+
boolean needClientTimeZone = timeField.v2().hasTimezoneDependentParsing();
60+
5961
structureBuilder.setTimestampField(timeField.v1())
6062
.setJodaTimestampFormats(timeField.v2().jodaTimestampFormats)
6163
.setJavaTimestampFormats(timeField.v2().javaTimestampFormats)
62-
.setNeedClientTimezone(timeField.v2().hasTimezoneDependentParsing());
64+
.setNeedClientTimezone(needClientTimeZone)
65+
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, timeField.v1(),
66+
timeField.v2().jodaTimestampFormats, needClientTimeZone));
6367
}
6468

6569
Tuple<SortedMap<String, Object>, SortedMap<String, FieldStats>> mappingsAndFieldStats =

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/TextLogFileStructureFinder.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,12 +113,16 @@ static TextLogFileStructureFinder makeTextLogFileStructureFinder(List<String> ex
113113
}
114114
}
115115

116+
boolean needClientTimeZone = bestTimestamp.v1().hasTimezoneDependentParsing();
117+
116118
FileStructure structure = structureBuilder
117119
.setTimestampField(interimTimestampField)
118120
.setJodaTimestampFormats(bestTimestamp.v1().jodaTimestampFormats)
119121
.setJavaTimestampFormats(bestTimestamp.v1().javaTimestampFormats)
120-
.setNeedClientTimezone(bestTimestamp.v1().hasTimezoneDependentParsing())
122+
.setNeedClientTimezone(needClientTimeZone)
121123
.setGrokPattern(grokPattern)
124+
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(grokPattern, interimTimestampField,
125+
bestTimestamp.v1().jodaTimestampFormats, needClientTimeZone))
122126
.setMappings(mappings)
123127
.setFieldStats(fieldStats)
124128
.setExplanation(explanation)

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/XmlFileStructureFinder.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,10 +95,14 @@ static XmlFileStructureFinder makeXmlFileStructureFinder(List<String> explanatio
9595
Tuple<String, TimestampMatch> timeField =
9696
FileStructureUtils.guessTimestampField(explanation, sampleRecords, overrides, timeoutChecker);
9797
if (timeField != null) {
98+
boolean needClientTimeZone = timeField.v2().hasTimezoneDependentParsing();
99+
98100
structureBuilder.setTimestampField(timeField.v1())
99101
.setJodaTimestampFormats(timeField.v2().jodaTimestampFormats)
100102
.setJavaTimestampFormats(timeField.v2().javaTimestampFormats)
101-
.setNeedClientTimezone(timeField.v2().hasTimezoneDependentParsing());
103+
.setNeedClientTimezone(needClientTimeZone)
104+
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, topLevelTag + "." + timeField.v1(),
105+
timeField.v2().jodaTimestampFormats, needClientTimeZone));
102106
}
103107

104108
Tuple<SortedMap<String, Object>, SortedMap<String, FieldStats>> mappingsAndFieldStats =

0 commit comments

Comments
 (0)