25
25
import org .elasticsearch .ingest .IngestDocument ;
26
26
import org .elasticsearch .ingest .Processor ;
27
27
28
- import java .util .Arrays ;
29
28
import java .util .Collections ;
30
29
import java .util .List ;
31
30
import java .util .Map ;
32
31
import java .util .Set ;
32
+ import java .util .function .Consumer ;
33
+ import java .util .function .Function ;
34
+ import java .util .function .Predicate ;
35
+ import java .util .regex .Pattern ;
33
36
34
37
/**
35
38
* The KeyValueProcessor parses and extracts messages of the `key=value` variety into fields with values of the keys.
@@ -38,16 +41,20 @@ public final class KeyValueProcessor extends AbstractProcessor {
38
41
39
42
public static final String TYPE = "kv" ;
40
43
44
+ private static final Pattern STRIP_BRACKETS = Pattern .compile ("(^[\\ (\\ [<\" '])|([\\ ]\\ )>\" ']$)" );
45
+
41
46
private final String field ;
42
47
private final String fieldSplit ;
43
48
private final String valueSplit ;
44
49
private final Set <String > includeKeys ;
45
50
private final Set <String > excludeKeys ;
46
51
private final String targetField ;
47
52
private final boolean ignoreMissing ;
53
+ private final Consumer <IngestDocument > execution ;
48
54
49
55
KeyValueProcessor (String tag , String field , String fieldSplit , String valueSplit , Set <String > includeKeys ,
50
- Set <String > excludeKeys , String targetField , boolean ignoreMissing ) {
56
+ Set <String > excludeKeys , String targetField , boolean ignoreMissing ,
57
+ String trimKey , String trimValue , boolean stripBrackets , String prefix ) {
51
58
super (tag );
52
59
this .field = field ;
53
60
this .targetField = targetField ;
@@ -56,6 +63,92 @@ public final class KeyValueProcessor extends AbstractProcessor {
56
63
this .includeKeys = includeKeys ;
57
64
this .excludeKeys = excludeKeys ;
58
65
this .ignoreMissing = ignoreMissing ;
66
+ this .execution = buildExecution (
67
+ fieldSplit , valueSplit , field , includeKeys , excludeKeys , targetField , ignoreMissing , trimKey , trimValue ,
68
+ stripBrackets , prefix
69
+ );
70
+ }
71
+
72
+ private static Consumer <IngestDocument > buildExecution (String fieldSplit , String valueSplit , String field ,
73
+ Set <String > includeKeys , Set <String > excludeKeys ,
74
+ String targetField , boolean ignoreMissing ,
75
+ String trimKey , String trimValue , boolean stripBrackets ,
76
+ String prefix ) {
77
+ final Predicate <String > keyFilter ;
78
+ if (includeKeys == null ) {
79
+ if (excludeKeys == null ) {
80
+ keyFilter = key -> true ;
81
+ } else {
82
+ keyFilter = key -> excludeKeys .contains (key ) == false ;
83
+ }
84
+ } else {
85
+ if (excludeKeys == null ) {
86
+ keyFilter = includeKeys ::contains ;
87
+ } else {
88
+ keyFilter = key -> includeKeys .contains (key ) && excludeKeys .contains (key ) == false ;
89
+ }
90
+ }
91
+ final String fieldPathPrefix ;
92
+ String keyPrefix = prefix == null ? "" : prefix ;
93
+ if (targetField == null ) {
94
+ fieldPathPrefix = keyPrefix ;
95
+ } else {
96
+ fieldPathPrefix = targetField + "." + keyPrefix ;
97
+ }
98
+ final Function <String , String > keyPrefixer ;
99
+ if (fieldPathPrefix .isEmpty ()) {
100
+ keyPrefixer = val -> val ;
101
+ } else {
102
+ keyPrefixer = val -> fieldPathPrefix + val ;
103
+ }
104
+ final Function <String , String []> fieldSplitter = buildSplitter (fieldSplit , true );
105
+ Function <String , String []> valueSplitter = buildSplitter (valueSplit , false );
106
+ final Function <String , String > keyTrimmer = buildTrimmer (trimKey );
107
+ final Function <String , String > bracketStrip ;
108
+ if (stripBrackets ) {
109
+ bracketStrip = val -> STRIP_BRACKETS .matcher (val ).replaceAll ("" );
110
+ } else {
111
+ bracketStrip = val -> val ;
112
+ }
113
+ final Function <String , String > valueTrimmer = buildTrimmer (trimValue );
114
+ return document -> {
115
+ String value = document .getFieldValue (field , String .class , ignoreMissing );
116
+ if (value == null ) {
117
+ if (ignoreMissing ) {
118
+ return ;
119
+ }
120
+ throw new IllegalArgumentException ("field [" + field + "] is null, cannot extract key-value pairs." );
121
+ }
122
+ for (String part : fieldSplitter .apply (value )) {
123
+ String [] kv = valueSplitter .apply (part );
124
+ if (kv .length != 2 ) {
125
+ throw new IllegalArgumentException ("field [" + field + "] does not contain value_split [" + valueSplit + "]" );
126
+ }
127
+ String key = keyTrimmer .apply (kv [0 ]);
128
+ if (keyFilter .test (key )) {
129
+ append (document , keyPrefixer .apply (key ), valueTrimmer .apply (bracketStrip .apply (kv [1 ])));
130
+ }
131
+ }
132
+ };
133
+ }
134
+
135
+ private static Function <String , String > buildTrimmer (String trim ) {
136
+ if (trim == null ) {
137
+ return val -> val ;
138
+ } else {
139
+ Pattern pattern = Pattern .compile ("(^([" + trim + "]+))|([" + trim + "]+$)" );
140
+ return val -> pattern .matcher (val ).replaceAll ("" );
141
+ }
142
+ }
143
+
144
+ private static Function <String , String []> buildSplitter (String split , boolean fields ) {
145
+ int limit = fields ? 0 : 2 ;
146
+ if (split .length () > 2 || split .length () == 2 && split .charAt (0 ) != '\\' ) {
147
+ Pattern splitPattern = Pattern .compile (split );
148
+ return val -> splitPattern .split (val , limit );
149
+ } else {
150
+ return val -> val .split (split , limit );
151
+ }
59
152
}
60
153
61
154
String getField () {
@@ -86,7 +179,7 @@ boolean isIgnoreMissing() {
86
179
return ignoreMissing ;
87
180
}
88
181
89
- public void append (IngestDocument document , String targetField , String value ) {
182
+ private static void append (IngestDocument document , String targetField , String value ) {
90
183
if (document .hasField (targetField )) {
91
184
document .appendFieldValue (targetField , value );
92
185
} else {
@@ -96,27 +189,7 @@ public void append(IngestDocument document, String targetField, String value) {
96
189
97
190
@ Override
98
191
public void execute (IngestDocument document ) {
99
- String oldVal = document .getFieldValue (field , String .class , ignoreMissing );
100
-
101
- if (oldVal == null && ignoreMissing ) {
102
- return ;
103
- } else if (oldVal == null ) {
104
- throw new IllegalArgumentException ("field [" + field + "] is null, cannot extract key-value pairs." );
105
- }
106
-
107
- String fieldPathPrefix = (targetField == null ) ? "" : targetField + "." ;
108
- Arrays .stream (oldVal .split (fieldSplit ))
109
- .map ((f ) -> {
110
- String [] kv = f .split (valueSplit , 2 );
111
- if (kv .length != 2 ) {
112
- throw new IllegalArgumentException ("field [" + field + "] does not contain value_split [" + valueSplit + "]" );
113
- }
114
- return kv ;
115
- })
116
- .filter ((p ) ->
117
- (includeKeys == null || includeKeys .contains (p [0 ])) &&
118
- (excludeKeys == null || excludeKeys .contains (p [0 ]) == false ))
119
- .forEach ((p ) -> append (document , fieldPathPrefix + p [0 ], p [1 ]));
192
+ execution .accept (document );
120
193
}
121
194
122
195
@ Override
@@ -132,6 +205,11 @@ public KeyValueProcessor create(Map<String, Processor.Factory> registry, String
132
205
String targetField = ConfigurationUtils .readOptionalStringProperty (TYPE , processorTag , config , "target_field" );
133
206
String fieldSplit = ConfigurationUtils .readStringProperty (TYPE , processorTag , config , "field_split" );
134
207
String valueSplit = ConfigurationUtils .readStringProperty (TYPE , processorTag , config , "value_split" );
208
+ String trimKey = ConfigurationUtils .readOptionalStringProperty (TYPE , processorTag , config , "trim_key" );
209
+ String trimValue = ConfigurationUtils .readOptionalStringProperty (TYPE , processorTag , config , "trim_value" );
210
+ String prefix = ConfigurationUtils .readOptionalStringProperty (TYPE , processorTag , config , "prefix" );
211
+ boolean stripBrackets =
212
+ ConfigurationUtils .readBooleanProperty (TYPE , processorTag , config , "strip_brackets" , false );
135
213
Set <String > includeKeys = null ;
136
214
Set <String > excludeKeys = null ;
137
215
List <String > includeKeysList = ConfigurationUtils .readOptionalList (TYPE , processorTag , config , "include_keys" );
@@ -143,7 +221,10 @@ public KeyValueProcessor create(Map<String, Processor.Factory> registry, String
143
221
excludeKeys = Collections .unmodifiableSet (Sets .newHashSet (excludeKeysList ));
144
222
}
145
223
boolean ignoreMissing = ConfigurationUtils .readBooleanProperty (TYPE , processorTag , config , "ignore_missing" , false );
146
- return new KeyValueProcessor (processorTag , field , fieldSplit , valueSplit , includeKeys , excludeKeys , targetField , ignoreMissing );
224
+ return new KeyValueProcessor (
225
+ processorTag , field , fieldSplit , valueSplit , includeKeys , excludeKeys , targetField , ignoreMissing ,
226
+ trimKey , trimValue , stripBrackets , prefix
227
+ );
147
228
}
148
229
}
149
230
}
0 commit comments