41
41
import java .net .InetAddress ;
42
42
import java .security .AccessController ;
43
43
import java .security .PrivilegedAction ;
44
+ import java .util .ArrayList ;
44
45
import java .util .Arrays ;
45
46
import java .util .Collections ;
46
47
import java .util .EnumSet ;
@@ -68,6 +69,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
68
69
private final Set <Property > properties ;
69
70
private final boolean ignoreMissing ;
70
71
private final GeoIpCache cache ;
72
+ private final boolean firstOnly ;
71
73
72
74
/**
73
75
* Construct a geo-IP processor.
@@ -79,22 +81,25 @@ public final class GeoIpProcessor extends AbstractProcessor {
79
81
* @param properties the properties; ideally this is lazily-loaded once on first use
80
82
* @param ignoreMissing true if documents with a missing value for the field should be ignored
81
83
* @param cache a geo-IP cache
84
+ * @param firstOnly true if only first result should be returned in case of array
82
85
*/
83
86
GeoIpProcessor (
84
- final String tag ,
85
- final String field ,
86
- final DatabaseReaderLazyLoader lazyLoader ,
87
- final String targetField ,
88
- final Set <Property > properties ,
89
- final boolean ignoreMissing ,
90
- final GeoIpCache cache ) {
87
+ final String tag ,
88
+ final String field ,
89
+ final DatabaseReaderLazyLoader lazyLoader ,
90
+ final String targetField ,
91
+ final Set <Property > properties ,
92
+ final boolean ignoreMissing ,
93
+ final GeoIpCache cache ,
94
+ boolean firstOnly ) {
91
95
super (tag );
92
96
this .field = field ;
93
97
this .targetField = targetField ;
94
98
this .lazyLoader = lazyLoader ;
95
99
this .properties = properties ;
96
100
this .ignoreMissing = ignoreMissing ;
97
101
this .cache = cache ;
102
+ this .firstOnly = firstOnly ;
98
103
}
99
104
100
105
boolean isIgnoreMissing () {
@@ -103,19 +108,51 @@ boolean isIgnoreMissing() {
103
108
104
109
@ Override
105
110
public IngestDocument execute (IngestDocument ingestDocument ) throws IOException {
106
- String ip = ingestDocument .getFieldValue (field , String .class , ignoreMissing );
111
+ Object ip = ingestDocument .getFieldValue (field , Object .class , ignoreMissing );
107
112
108
113
if (ip == null && ignoreMissing ) {
109
114
return ingestDocument ;
110
115
} else if (ip == null ) {
111
116
throw new IllegalArgumentException ("field [" + field + "] is null, cannot extract geoip information." );
112
117
}
113
118
114
- final InetAddress ipAddress = InetAddresses .forString (ip );
119
+ if (ip instanceof String ) {
120
+ Map <String , Object > geoData = getGeoData ((String ) ip );
121
+ if (geoData .isEmpty () == false ) {
122
+ ingestDocument .setFieldValue (targetField , geoData );
123
+ }
124
+ } else if (ip instanceof List ) {
125
+ boolean match = false ;
126
+ List <Map <String , Object >> geoDataList = new ArrayList <>(((List ) ip ).size ());
127
+ for (Object ipAddr : (List ) ip ) {
128
+ if (ipAddr instanceof String == false ) {
129
+ throw new IllegalArgumentException ("array in field [" + field + "] should only contain strings" );
130
+ }
131
+ Map <String , Object > geoData = getGeoData ((String ) ipAddr );
132
+ if (geoData .isEmpty ()) {
133
+ geoDataList .add (null );
134
+ continue ;
135
+ }
136
+ if (firstOnly ) {
137
+ ingestDocument .setFieldValue (targetField , geoData );
138
+ return ingestDocument ;
139
+ }
140
+ match = true ;
141
+ geoDataList .add (geoData );
142
+ }
143
+ if (match ) {
144
+ ingestDocument .setFieldValue (targetField , geoDataList );
145
+ }
146
+ } else {
147
+ throw new IllegalArgumentException ("field [" + field + "] should contain only string or array of strings" );
148
+ }
149
+ return ingestDocument ;
150
+ }
115
151
116
- Map <String , Object > geoData ;
152
+ private Map <String , Object > getGeoData ( String ip ) throws IOException {
117
153
String databaseType = lazyLoader .getDatabaseType ();
118
-
154
+ final InetAddress ipAddress = InetAddresses .forString (ip );
155
+ Map <String , Object > geoData ;
119
156
if (databaseType .endsWith (CITY_DB_SUFFIX )) {
120
157
try {
121
158
geoData = retrieveCityGeoData (ipAddress );
@@ -136,12 +173,9 @@ public IngestDocument execute(IngestDocument ingestDocument) throws IOException
136
173
}
137
174
} else {
138
175
throw new ElasticsearchParseException ("Unsupported database type [" + lazyLoader .getDatabaseType ()
139
- + "]" , new IllegalStateException ());
140
- }
141
- if (geoData .isEmpty () == false ) {
142
- ingestDocument .setFieldValue (targetField , geoData );
176
+ + "]" , new IllegalStateException ());
143
177
}
144
- return ingestDocument ;
178
+ return geoData ;
145
179
}
146
180
147
181
@ Override
@@ -360,14 +394,15 @@ public Factory(Map<String, DatabaseReaderLazyLoader> databaseReaders, GeoIpCache
360
394
361
395
@ Override
362
396
public GeoIpProcessor create (
363
- final Map <String , Processor .Factory > registry ,
364
- final String processorTag ,
365
- final Map <String , Object > config ) throws IOException {
397
+ final Map <String , Processor .Factory > registry ,
398
+ final String processorTag ,
399
+ final Map <String , Object > config ) throws IOException {
366
400
String ipField = readStringProperty (TYPE , processorTag , config , "field" );
367
401
String targetField = readStringProperty (TYPE , processorTag , config , "target_field" , "geoip" );
368
402
String databaseFile = readStringProperty (TYPE , processorTag , config , "database_file" , "GeoLite2-City.mmdb" );
369
403
List <String > propertyNames = readOptionalList (TYPE , processorTag , config , "properties" );
370
404
boolean ignoreMissing = readBooleanProperty (TYPE , processorTag , config , "ignore_missing" , false );
405
+ boolean firstOnly = readBooleanProperty (TYPE , processorTag , config , "first_only" , true );
371
406
372
407
DatabaseReaderLazyLoader lazyLoader = databaseReaders .get (databaseFile );
373
408
if (lazyLoader == null ) {
@@ -397,11 +432,11 @@ public GeoIpProcessor create(
397
432
properties = DEFAULT_ASN_PROPERTIES ;
398
433
} else {
399
434
throw newConfigurationException (TYPE , processorTag , "database_file" , "Unsupported database type ["
400
- + databaseType + "]" );
435
+ + databaseType + "]" );
401
436
}
402
437
}
403
438
404
- return new GeoIpProcessor (processorTag , ipField , lazyLoader , targetField , properties , ignoreMissing , cache );
439
+ return new GeoIpProcessor (processorTag , ipField , lazyLoader , targetField , properties , ignoreMissing , cache , firstOnly );
405
440
}
406
441
}
407
442
@@ -460,7 +495,7 @@ public static Property parseProperty(String databaseType, String value) {
460
495
return property ;
461
496
} catch (IllegalArgumentException e ) {
462
497
throw new IllegalArgumentException ("illegal property value [" + value + "]. valid values are " +
463
- Arrays .toString (validProperties .toArray ()));
498
+ Arrays .toString (validProperties .toArray ()));
464
499
}
465
500
}
466
501
}
0 commit comments