Skip to content

Commit d48b102

Browse files
INGEST: Extend KV Processor (#31789) (#32232) (#32262)
* INGEST: Extend KV Processor (#31789)

  Added more capabilities supported by Logstash (LS) to the KV processor:
  * Stripping of brackets and quotes from values (`include_brackets` in the corresponding LS filter)
  * Adding key prefixes
  * Trimming specified chars from keys and values

  Refactored the way the filter is configured to avoid conditionals during execution. Refactored the tests a little so that no more redundant getters have to be added for the new parameters.

  Relates #31786

* Add documentation
1 parent f98ebb2 commit d48b102

File tree

3 files changed

+214
-37
lines changed

3 files changed

+214
-37
lines changed

docs/reference/ingest/ingest-node.asciidoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1732,6 +1732,10 @@ For example, if you have a log message which contains `ip=1.2.3.4 error=REFUSED`
17321732
| `include_keys` | no | `null` | List of keys to filter and insert into document. Defaults to including all keys
17331733
| `exclude_keys` | no | `null` | List of keys to exclude from document
17341734
| `ignore_missing` | no | `false` | If `true` and `field` does not exist or is `null`, the processor quietly exits without modifying the document
1735+
| `prefix` | no | `null` | Prefix to be added to extracted keys
1736+
| `trim_key` | no | `null` | String of characters to trim from extracted keys
1737+
| `trim_value` | no | `null` | String of characters to trim from extracted values
1738+
| `strip_brackets` | no | `false` | If `true`, strip brackets `()`, `<>`, `[]` as well as quotes `'` and `"` from the start and end of extracted values
17351739
|======
17361740

17371741

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/KeyValueProcessor.java

Lines changed: 106 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,14 @@
2525
import org.elasticsearch.ingest.IngestDocument;
2626
import org.elasticsearch.ingest.Processor;
2727

28-
import java.util.Arrays;
2928
import java.util.Collections;
3029
import java.util.List;
3130
import java.util.Map;
3231
import java.util.Set;
32+
import java.util.function.Consumer;
33+
import java.util.function.Function;
34+
import java.util.function.Predicate;
35+
import java.util.regex.Pattern;
3336

3437
/**
3538
* The KeyValueProcessor parses and extracts messages of the `key=value` variety into fields with values of the keys.
@@ -38,16 +41,20 @@ public final class KeyValueProcessor extends AbstractProcessor {
3841

3942
public static final String TYPE = "kv";
4043

44+
private static final Pattern STRIP_BRACKETS = Pattern.compile("(^[\\(\\[<\"'])|([\\]\\)>\"']$)");
45+
4146
private final String field;
4247
private final String fieldSplit;
4348
private final String valueSplit;
4449
private final Set<String> includeKeys;
4550
private final Set<String> excludeKeys;
4651
private final String targetField;
4752
private final boolean ignoreMissing;
53+
private final Consumer<IngestDocument> execution;
4854

4955
KeyValueProcessor(String tag, String field, String fieldSplit, String valueSplit, Set<String> includeKeys,
50-
Set<String> excludeKeys, String targetField, boolean ignoreMissing) {
56+
Set<String> excludeKeys, String targetField, boolean ignoreMissing,
57+
String trimKey, String trimValue, boolean stripBrackets, String prefix) {
5158
super(tag);
5259
this.field = field;
5360
this.targetField = targetField;
@@ -56,6 +63,92 @@ public final class KeyValueProcessor extends AbstractProcessor {
5663
this.includeKeys = includeKeys;
5764
this.excludeKeys = excludeKeys;
5865
this.ignoreMissing = ignoreMissing;
66+
this.execution = buildExecution(
67+
fieldSplit, valueSplit, field, includeKeys, excludeKeys, targetField, ignoreMissing, trimKey, trimValue,
68+
stripBrackets, prefix
69+
);
70+
}
71+
72+
private static Consumer<IngestDocument> buildExecution(String fieldSplit, String valueSplit, String field,
73+
Set<String> includeKeys, Set<String> excludeKeys,
74+
String targetField, boolean ignoreMissing,
75+
String trimKey, String trimValue, boolean stripBrackets,
76+
String prefix) {
77+
final Predicate<String> keyFilter;
78+
if (includeKeys == null) {
79+
if (excludeKeys == null) {
80+
keyFilter = key -> true;
81+
} else {
82+
keyFilter = key -> excludeKeys.contains(key) == false;
83+
}
84+
} else {
85+
if (excludeKeys == null) {
86+
keyFilter = includeKeys::contains;
87+
} else {
88+
keyFilter = key -> includeKeys.contains(key) && excludeKeys.contains(key) == false;
89+
}
90+
}
91+
final String fieldPathPrefix;
92+
String keyPrefix = prefix == null ? "" : prefix;
93+
if (targetField == null) {
94+
fieldPathPrefix = keyPrefix;
95+
} else {
96+
fieldPathPrefix = targetField + "." + keyPrefix;
97+
}
98+
final Function<String, String> keyPrefixer;
99+
if (fieldPathPrefix.isEmpty()) {
100+
keyPrefixer = val -> val;
101+
} else {
102+
keyPrefixer = val -> fieldPathPrefix + val;
103+
}
104+
final Function<String, String[]> fieldSplitter = buildSplitter(fieldSplit, true);
105+
Function<String, String[]> valueSplitter = buildSplitter(valueSplit, false);
106+
final Function<String, String> keyTrimmer = buildTrimmer(trimKey);
107+
final Function<String, String> bracketStrip;
108+
if (stripBrackets) {
109+
bracketStrip = val -> STRIP_BRACKETS.matcher(val).replaceAll("");
110+
} else {
111+
bracketStrip = val -> val;
112+
}
113+
final Function<String, String> valueTrimmer = buildTrimmer(trimValue);
114+
return document -> {
115+
String value = document.getFieldValue(field, String.class, ignoreMissing);
116+
if (value == null) {
117+
if (ignoreMissing) {
118+
return;
119+
}
120+
throw new IllegalArgumentException("field [" + field + "] is null, cannot extract key-value pairs.");
121+
}
122+
for (String part : fieldSplitter.apply(value)) {
123+
String[] kv = valueSplitter.apply(part);
124+
if (kv.length != 2) {
125+
throw new IllegalArgumentException("field [" + field + "] does not contain value_split [" + valueSplit + "]");
126+
}
127+
String key = keyTrimmer.apply(kv[0]);
128+
if (keyFilter.test(key)) {
129+
append(document, keyPrefixer.apply(key), valueTrimmer.apply(bracketStrip.apply(kv[1])));
130+
}
131+
}
132+
};
133+
}
134+
135+
private static Function<String, String> buildTrimmer(String trim) {
136+
if (trim == null) {
137+
return val -> val;
138+
} else {
139+
Pattern pattern = Pattern.compile("(^([" + trim + "]+))|([" + trim + "]+$)");
140+
return val -> pattern.matcher(val).replaceAll("");
141+
}
142+
}
143+
144+
private static Function<String, String[]> buildSplitter(String split, boolean fields) {
145+
int limit = fields ? 0 : 2;
146+
if (split.length() > 2 || split.length() == 2 && split.charAt(0) != '\\') {
147+
Pattern splitPattern = Pattern.compile(split);
148+
return val -> splitPattern.split(val, limit);
149+
} else {
150+
return val -> val.split(split, limit);
151+
}
59152
}
60153

61154
String getField() {
@@ -86,7 +179,7 @@ boolean isIgnoreMissing() {
86179
return ignoreMissing;
87180
}
88181

89-
public void append(IngestDocument document, String targetField, String value) {
182+
private static void append(IngestDocument document, String targetField, String value) {
90183
if (document.hasField(targetField)) {
91184
document.appendFieldValue(targetField, value);
92185
} else {
@@ -96,27 +189,7 @@ public void append(IngestDocument document, String targetField, String value) {
96189

97190
@Override
98191
public void execute(IngestDocument document) {
99-
String oldVal = document.getFieldValue(field, String.class, ignoreMissing);
100-
101-
if (oldVal == null && ignoreMissing) {
102-
return;
103-
} else if (oldVal == null) {
104-
throw new IllegalArgumentException("field [" + field + "] is null, cannot extract key-value pairs.");
105-
}
106-
107-
String fieldPathPrefix = (targetField == null) ? "" : targetField + ".";
108-
Arrays.stream(oldVal.split(fieldSplit))
109-
.map((f) -> {
110-
String[] kv = f.split(valueSplit, 2);
111-
if (kv.length != 2) {
112-
throw new IllegalArgumentException("field [" + field + "] does not contain value_split [" + valueSplit + "]");
113-
}
114-
return kv;
115-
})
116-
.filter((p) ->
117-
(includeKeys == null || includeKeys.contains(p[0])) &&
118-
(excludeKeys == null || excludeKeys.contains(p[0]) == false))
119-
.forEach((p) -> append(document, fieldPathPrefix + p[0], p[1]));
192+
execution.accept(document);
120193
}
121194

122195
@Override
@@ -132,6 +205,11 @@ public KeyValueProcessor create(Map<String, Processor.Factory> registry, String
132205
String targetField = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "target_field");
133206
String fieldSplit = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field_split");
134207
String valueSplit = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "value_split");
208+
String trimKey = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "trim_key");
209+
String trimValue = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "trim_value");
210+
String prefix = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "prefix");
211+
boolean stripBrackets =
212+
ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "strip_brackets", false);
135213
Set<String> includeKeys = null;
136214
Set<String> excludeKeys = null;
137215
List<String> includeKeysList = ConfigurationUtils.readOptionalList(TYPE, processorTag, config, "include_keys");
@@ -143,7 +221,10 @@ public KeyValueProcessor create(Map<String, Processor.Factory> registry, String
143221
excludeKeys = Collections.unmodifiableSet(Sets.newHashSet(excludeKeysList));
144222
}
145223
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
146-
return new KeyValueProcessor(processorTag, field, fieldSplit, valueSplit, includeKeys, excludeKeys, targetField, ignoreMissing);
224+
return new KeyValueProcessor(
225+
processorTag, field, fieldSplit, valueSplit, includeKeys, excludeKeys, targetField, ignoreMissing,
226+
trimKey, trimValue, stripBrackets, prefix
227+
);
147228
}
148229
}
149230
}

0 commit comments

Comments
 (0)