Skip to content

Commit 2ba3e92

Browse files
authored
GeoIP database downloader (#68424)
This change adds a component that downloads new GeoIP databases from the infra service. New databases are downloaded in chunks and stored in the .geoip_databases index. Downloads are verified against the MD5 checksum provided by the server. The current state of all stored databases is kept in the cluster state, in the persistent task state. Relates to #68920
1 parent 2f570c4 commit 2ba3e92

File tree

20 files changed

+1651
-63
lines changed

20 files changed

+1651
-63
lines changed

modules/ingest-geoip/build.gradle

Lines changed: 43 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import org.apache.tools.ant.taskdefs.condition.Os
1111
apply plugin: 'elasticsearch.yaml-rest-test'
1212
apply plugin: 'elasticsearch.internal-cluster-test'
1313

14+
final Project fixture = project(':test:fixtures:geoip-fixture')
15+
1416
esplugin {
1517
description 'Ingest processor that uses lookup geo data based on IP adresses using the MaxMind geo database'
1618
classname 'org.elasticsearch.ingest.geoip.IngestGeoIpPlugin'
@@ -24,6 +26,7 @@ dependencies {
2426
api('com.maxmind.db:maxmind-db:1.3.1')
2527

2628
testImplementation 'org.elasticsearch:geolite2-databases:20191119'
29+
internalClusterTestImplementation project(path: ":modules:reindex")
2730
}
2831

2932
restResources {
@@ -32,6 +35,31 @@ restResources {
3235
}
3336
}
3437

38+
// The local geoip fixture is used unless the environment variable
// "geoip_use_service" is set to "true".
def useFixture = System.getenv("geoip_use_service") != "true"
39+
40+
// Fixture mode: apply the test-fixtures plugin and register the 'geoip-fixture'
// service of the fixture project declared above.
if (useFixture) {
41+
apply plugin: 'elasticsearch.test.fixtures'
42+
testFixtures.useFixture(fixture.path, 'geoip-fixture')
43+
44+
// Helper task that runs after the fixture's postProcessFixture task. Its action
// reads the port published under "test.fixtures.geoip-fixture.tcp.80" and hands
// it to internalClusterTest as the "geoip_endpoint" system property.
task "beforeInternalClusterTest" {
45+
dependsOn ':test:fixtures:geoip-fixture:postProcessFixture'
46+
// The port is read in doLast (task execution), not while the build is configured.
doLast {
47+
int ephemeralPort = fixture.postProcessFixture.ext."test.fixtures.geoip-fixture.tcp.80"
48+
assert ephemeralPort > 0
49+
internalClusterTest {
50+
// NOTE(review): nonInputProperties presumably keeps the changing port out of
// the test task's tracked inputs -- confirm against the build plugin.
nonInputProperties.systemProperty "geoip_endpoint", "http://127.0.0.1:" + ephemeralPort
51+
}
52+
}
53+
}
54+
}
55+
56+
// Always set the GeoIP v2 feature-flag system property for the internal cluster
// tests; in fixture mode, also make them depend on the helper task above.
internalClusterTest {
57+
systemProperty "es.geoip_v2_feature_flag_enabled", "true"
58+
if (useFixture) {
59+
dependsOn "beforeInternalClusterTest"
60+
}
61+
}
62+
3563
tasks.register("copyDefaultGeoIp2DatabaseFiles", Copy) {
3664
from { zipTree(configurations.testCompileClasspath.files.find { it.name.contains('geolite2-databases') }) }
3765
into "${project.buildDir}/ingest-geoip"
@@ -47,21 +75,21 @@ tasks.named("bundlePlugin").configure {
4775

4876
tasks.named("thirdPartyAudit").configure {
4977
ignoreMissingClasses(
50-
// geoip WebServiceClient needs apache http client, but we're not using WebServiceClient:
51-
'org.apache.http.HttpEntity',
52-
'org.apache.http.HttpHost',
53-
'org.apache.http.HttpResponse',
54-
'org.apache.http.StatusLine',
55-
'org.apache.http.auth.UsernamePasswordCredentials',
56-
'org.apache.http.client.config.RequestConfig$Builder',
57-
'org.apache.http.client.config.RequestConfig',
58-
'org.apache.http.client.methods.CloseableHttpResponse',
59-
'org.apache.http.client.methods.HttpGet',
60-
'org.apache.http.client.utils.URIBuilder',
61-
'org.apache.http.impl.auth.BasicScheme',
62-
'org.apache.http.impl.client.CloseableHttpClient',
63-
'org.apache.http.impl.client.HttpClientBuilder',
64-
'org.apache.http.util.EntityUtils'
78+
// geoip WebServiceClient needs apache http client, but we're not using WebServiceClient:
79+
'org.apache.http.HttpEntity',
80+
'org.apache.http.HttpHost',
81+
'org.apache.http.HttpResponse',
82+
'org.apache.http.StatusLine',
83+
'org.apache.http.auth.UsernamePasswordCredentials',
84+
'org.apache.http.client.config.RequestConfig$Builder',
85+
'org.apache.http.client.config.RequestConfig',
86+
'org.apache.http.client.methods.CloseableHttpResponse',
87+
'org.apache.http.client.methods.HttpGet',
88+
'org.apache.http.client.utils.URIBuilder',
89+
'org.apache.http.impl.auth.BasicScheme',
90+
'org.apache.http.impl.client.CloseableHttpClient',
91+
'org.apache.http.impl.client.HttpClientBuilder',
92+
'org.apache.http.util.EntityUtils'
6593
)
6694
}
6795

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.ingest.geoip;
10+
11+
import org.elasticsearch.common.settings.Setting;
12+
import org.elasticsearch.common.settings.Settings;
13+
import org.elasticsearch.plugins.Plugin;
14+
import org.elasticsearch.test.ESIntegTestCase;
15+
import org.elasticsearch.test.StreamsUtils;
16+
17+
import java.io.ByteArrayInputStream;
18+
import java.io.IOException;
19+
import java.io.UncheckedIOException;
20+
import java.nio.file.Files;
21+
import java.nio.file.Path;
22+
import java.util.Arrays;
23+
import java.util.Collection;
24+
import java.util.Collections;
25+
import java.util.List;
26+
27+
public abstract class AbstractGeoIpIT extends ESIntegTestCase {
28+
@Override
29+
protected Collection<Class<? extends Plugin>> nodePlugins() {
30+
return Arrays.asList(IngestGeoIpPlugin.class, IngestGeoIpSettingsPlugin.class);
31+
}
32+
33+
@Override
34+
protected Settings nodeSettings(final int nodeOrdinal) {
35+
final Path databasePath = createTempDir();
36+
try {
37+
Files.createDirectories(databasePath);
38+
Files.copy(
39+
new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-City.mmdb")),
40+
databasePath.resolve("GeoLite2-City.mmdb"));
41+
Files.copy(
42+
new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb")),
43+
databasePath.resolve("GeoLite2-Country.mmdb"));
44+
Files.copy(
45+
new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-ASN.mmdb")),
46+
databasePath.resolve("GeoLite2-ASN.mmdb"));
47+
} catch (final IOException e) {
48+
throw new UncheckedIOException(e);
49+
}
50+
return Settings.builder()
51+
.put("ingest.geoip.database_path", databasePath)
52+
.put(super.nodeSettings(nodeOrdinal))
53+
.build();
54+
}
55+
56+
public static class IngestGeoIpSettingsPlugin extends Plugin {
57+
58+
@Override
59+
public List<Setting<?>> getSettings() {
60+
return Collections.singletonList(Setting.simpleString("ingest.geoip.database_path", Setting.Property.NodeScope));
61+
}
62+
}
63+
}
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.ingest.geoip;
10+
11+
import com.maxmind.geoip2.DatabaseReader;
12+
import org.apache.lucene.search.TotalHits;
13+
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
14+
import org.elasticsearch.action.search.SearchResponse;
15+
import org.elasticsearch.common.SuppressForbidden;
16+
import org.elasticsearch.common.settings.Settings;
17+
import org.elasticsearch.index.query.BoolQueryBuilder;
18+
import org.elasticsearch.index.query.MatchQueryBuilder;
19+
import org.elasticsearch.index.query.RangeQueryBuilder;
20+
import org.elasticsearch.index.reindex.ReindexPlugin;
21+
import org.elasticsearch.persistent.PersistentTaskParams;
22+
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
23+
import org.elasticsearch.plugins.Plugin;
24+
import org.elasticsearch.search.SearchHit;
25+
import org.elasticsearch.search.sort.SortOrder;
26+
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
27+
import org.elasticsearch.test.ESIntegTestCase.Scope;
28+
29+
import java.io.BufferedOutputStream;
30+
import java.io.ByteArrayInputStream;
31+
import java.io.IOException;
32+
import java.io.InputStream;
33+
import java.io.OutputStream;
34+
import java.nio.file.Files;
35+
import java.nio.file.Path;
36+
import java.util.ArrayList;
37+
import java.util.Arrays;
38+
import java.util.Collection;
39+
import java.util.Iterator;
40+
import java.util.List;
41+
import java.util.Set;
42+
import java.util.concurrent.TimeUnit;
43+
import java.util.zip.GZIPInputStream;
44+
45+
import static java.nio.file.StandardOpenOption.CREATE;
46+
import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
47+
import static java.nio.file.StandardOpenOption.WRITE;
48+
49+
@ClusterScope(scope = Scope.TEST, maxNumDataNodes = 1)
50+
public class GeoIpDownloaderIT extends AbstractGeoIpIT {
51+
52+
@Override
53+
protected Collection<Class<? extends Plugin>> nodePlugins() {
54+
return Arrays.asList(ReindexPlugin.class, IngestGeoIpPlugin.class, GeoIpProcessorNonIngestNodeIT.IngestGeoIpSettingsPlugin.class);
55+
}
56+
57+
@Override
58+
protected Settings nodeSettings(int nodeOrdinal) {
59+
Settings.Builder settings = Settings.builder()
60+
.put(super.nodeSettings(nodeOrdinal))
61+
.put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), false);
62+
String endpoint = System.getProperty("geoip_endpoint");
63+
if (endpoint != null) {
64+
settings.put(GeoIpDownloader.ENDPOINT_SETTING.getKey(), endpoint);
65+
}
66+
return settings.build();
67+
}
68+
69+
public void testGeoIpDatabasesDownload() throws Exception {
70+
ClusterUpdateSettingsResponse settingsResponse = client().admin().cluster()
71+
.prepareUpdateSettings()
72+
.setPersistentSettings(Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true))
73+
.get();
74+
assertTrue(settingsResponse.isAcknowledged());
75+
assertBusy(() -> {
76+
PersistentTasksCustomMetadata.PersistentTask<PersistentTaskParams> task = getTask();
77+
assertNotNull(task);
78+
GeoIpTaskState state = (GeoIpTaskState) task.getState();
79+
assertNotNull(state);
80+
assertEquals(Set.of("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb"), state.getDatabases().keySet());
81+
}, 2, TimeUnit.MINUTES);
82+
83+
GeoIpTaskState state = (GeoIpTaskState) getTask().getState();
84+
for (String id : List.of("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb")) {
85+
assertBusy(() -> {
86+
GeoIpTaskState.Metadata metadata = state.get(id);
87+
BoolQueryBuilder queryBuilder = new BoolQueryBuilder()
88+
.filter(new MatchQueryBuilder("name", id))
89+
.filter(new RangeQueryBuilder("chunk")
90+
.from(metadata.getFirstChunk())
91+
.to(metadata.getLastChunk(), true));
92+
int size = metadata.getLastChunk() - metadata.getFirstChunk() + 1;
93+
SearchResponse res = client().prepareSearch(GeoIpDownloader.DATABASES_INDEX)
94+
.setSize(size)
95+
.setQuery(queryBuilder)
96+
.addSort("chunk", SortOrder.ASC)
97+
.get();
98+
TotalHits totalHits = res.getHits().getTotalHits();
99+
assertEquals(TotalHits.Relation.EQUAL_TO, totalHits.relation);
100+
assertEquals(size, totalHits.value);
101+
assertEquals(size, res.getHits().getHits().length);
102+
103+
List<byte[]> data = new ArrayList<>();
104+
105+
for (SearchHit hit : res.getHits().getHits()) {
106+
data.add((byte[]) hit.getSourceAsMap().get("data"));
107+
}
108+
109+
GZIPInputStream stream = new GZIPInputStream(new MultiByteArrayInputStream(data));
110+
Path tempFile = createTempFile();
111+
try (OutputStream os = new BufferedOutputStream(Files.newOutputStream(tempFile, TRUNCATE_EXISTING, WRITE, CREATE))) {
112+
stream.transferTo(os);
113+
}
114+
115+
parseDatabase(tempFile);
116+
});
117+
}
118+
}
119+
120+
@SuppressForbidden(reason = "Maxmind API requires java.io.File")
121+
private void parseDatabase(Path tempFile) throws IOException {
122+
try (DatabaseReader databaseReader = new DatabaseReader.Builder(tempFile.toFile()).build()) {
123+
assertNotNull(databaseReader.getMetadata());
124+
}
125+
}
126+
127+
private PersistentTasksCustomMetadata.PersistentTask<PersistentTaskParams> getTask() {
128+
return PersistentTasksCustomMetadata.getTaskWithId(clusterService().state(), GeoIpDownloader.GEOIP_DOWNLOADER);
129+
}
130+
131+
private static class MultiByteArrayInputStream extends InputStream {
132+
133+
private final Iterator<byte[]> data;
134+
private ByteArrayInputStream current;
135+
136+
private MultiByteArrayInputStream(List<byte[]> data) {
137+
this.data = data.iterator();
138+
}
139+
140+
@Override
141+
public int read() {
142+
if (current == null) {
143+
if (data.hasNext() == false) {
144+
return -1;
145+
}
146+
147+
current = new ByteArrayInputStream(data.next());
148+
}
149+
int read = current.read();
150+
if (read == -1) {
151+
current = null;
152+
return read();
153+
}
154+
return read;
155+
}
156+
157+
@Override
158+
public int read(byte[] b, int off, int len) throws IOException {
159+
if (current == null) {
160+
if (data.hasNext() == false) {
161+
return -1;
162+
}
163+
164+
current = new ByteArrayInputStream(data.next());
165+
}
166+
int read = current.read(b, off, len);
167+
if (read == -1) {
168+
current = null;
169+
return read(b, off, len);
170+
}
171+
return read;
172+
}
173+
}
174+
}

modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpProcessorNonIngestNodeIT.java

Lines changed: 3 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -13,69 +13,27 @@
1313
import org.elasticsearch.action.index.IndexResponse;
1414
import org.elasticsearch.action.ingest.PutPipelineRequest;
1515
import org.elasticsearch.common.bytes.BytesReference;
16-
import org.elasticsearch.common.settings.Setting;
1716
import org.elasticsearch.common.settings.Settings;
1817
import org.elasticsearch.common.xcontent.XContentBuilder;
1918
import org.elasticsearch.common.xcontent.XContentType;
2019
import org.elasticsearch.common.xcontent.json.JsonXContent;
2120
import org.elasticsearch.ingest.IngestService;
22-
import org.elasticsearch.plugins.Plugin;
2321
import org.elasticsearch.rest.RestStatus;
24-
import org.elasticsearch.test.ESIntegTestCase;
2522
import org.elasticsearch.test.NodeRoles;
26-
import org.elasticsearch.test.StreamsUtils;
2723

28-
import java.io.ByteArrayInputStream;
2924
import java.io.IOException;
30-
import java.io.UncheckedIOException;
31-
import java.nio.file.Files;
32-
import java.nio.file.Path;
3325
import java.util.Arrays;
34-
import java.util.Collection;
3526
import java.util.Collections;
36-
import java.util.List;
3727

3828
import static org.elasticsearch.test.NodeRoles.nonIngestNode;
3929
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
4030
import static org.hamcrest.Matchers.equalTo;
4131

42-
public class GeoIpProcessorNonIngestNodeIT extends ESIntegTestCase {
43-
44-
public static class IngestGeoIpSettingsPlugin extends Plugin {
45-
46-
@Override
47-
public List<Setting<?>> getSettings() {
48-
return Collections.singletonList(Setting.simpleString("ingest.geoip.database_path", Setting.Property.NodeScope));
49-
}
50-
}
32+
public class GeoIpProcessorNonIngestNodeIT extends AbstractGeoIpIT {
5133

5234
@Override
53-
protected Collection<Class<? extends Plugin>> nodePlugins() {
54-
return Arrays.asList(IngestGeoIpPlugin.class, IngestGeoIpSettingsPlugin.class);
55-
}
56-
57-
@Override
58-
protected Settings nodeSettings(final int nodeOrdinal) {
59-
final Path databasePath = createTempDir();
60-
try {
61-
Files.createDirectories(databasePath);
62-
Files.copy(
63-
new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-City.mmdb")),
64-
databasePath.resolve("GeoLite2-City.mmdb"));
65-
Files.copy(
66-
new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb")),
67-
databasePath.resolve("GeoLite2-Country.mmdb"));
68-
Files.copy(
69-
new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-ASN.mmdb")),
70-
databasePath.resolve("GeoLite2-ASN.mmdb"));
71-
} catch (final IOException e) {
72-
throw new UncheckedIOException(e);
73-
}
74-
return Settings.builder()
75-
.put("ingest.geoip.database_path", databasePath)
76-
.put(nonIngestNode())
77-
.put(super.nodeSettings(nodeOrdinal))
78-
.build();
35+
protected Settings nodeSettings(int nodeOrdinal) {
36+
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(nonIngestNode()).build();
7937
}
8038

8139
/**

0 commit comments

Comments
 (0)