diff --git a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/AbstractGeoIpIT.java b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/AbstractGeoIpIT.java index 72dac4ee2b299..04e87b5031acf 100644 --- a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/AbstractGeoIpIT.java +++ b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/AbstractGeoIpIT.java @@ -10,6 +10,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.StreamsUtils; @@ -21,7 +22,6 @@ import java.nio.file.Path; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.List; public abstract class AbstractGeoIpIT extends ESIntegTestCase { @@ -58,7 +58,9 @@ public static class IngestGeoIpSettingsPlugin extends Plugin { @Override public List> getSettings() { - return Collections.singletonList(Setting.simpleString("ingest.geoip.database_path", Setting.Property.NodeScope)); + return List.of(Setting.simpleString("ingest.geoip.database_path", Setting.Property.NodeScope), + Setting.timeSetting("ingest.geoip.database_validity", TimeValue.timeValueDays(3), Setting.Property.NodeScope, + Setting.Property.Dynamic)); } } } diff --git a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java index bb5fde301137c..456e69a90e340 100644 --- a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java +++ b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java @@ -9,18 +9,20 @@ package org.elasticsearch.ingest.geoip; import com.maxmind.geoip2.DatabaseReader; + import org.apache.lucene.search.TotalHits; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; import org.elasticsearch.action.ingest.SimulateDocumentBaseResult; import org.elasticsearch.action.ingest.SimulatePipelineRequest; import org.elasticsearch.action.ingest.SimulatePipelineResponse; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.core.SuppressForbidden; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.env.Environment; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.MatchQueryBuilder; @@ -53,10 +55,13 @@ import java.util.zip.GZIPInputStream; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; public class GeoIpDownloaderIT extends AbstractGeoIpIT { @@ -78,14 +83,85 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { } @After - public void disableDownloader() { + public void cleanUp() { ClusterUpdateSettingsResponse settingsResponse = client().admin().cluster() .prepareUpdateSettings() - .setPersistentSettings(Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), (String) null)) + .setPersistentSettings(Settings.builder() + .put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), (String) null) + .put(GeoIpDownloader.POLL_INTERVAL_SETTING.getKey(), (String) null) + .put("ingest.geoip.database_validity", (String) null)) .get(); assertTrue(settingsResponse.isAcknowledged()); } + public void testInvalidTimestamp() throws Exception { + assumeTrue("only test with fixture to have stable results", ENDPOINT != null); + ClusterUpdateSettingsResponse settingsResponse = + client().admin().cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder() + .put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true)) + .get(); + assertTrue(settingsResponse.isAcknowledged()); + assertBusy(() -> { + GeoIpTaskState state = getGeoIpTaskState(); + assertEquals(Set.of("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb"), state.getDatabases().keySet()); + }, 2, TimeUnit.MINUTES); + + putPipeline(); + verifyUpdatedDatabase(); + + settingsResponse = + client().admin().cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder() + .put("ingest.geoip.database_validity", TimeValue.timeValueMillis(1))) + .get(); + assertTrue(settingsResponse.isAcknowledged()); + Thread.sleep(10); + settingsResponse = client().admin().cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put(GeoIpDownloader.POLL_INTERVAL_SETTING.getKey(), TimeValue.timeValueDays(2))) + .get(); + assertTrue(settingsResponse.isAcknowledged()); + List geoIpTmpDirs = getGeoIpTmpDirs(); + assertBusy(() -> { + for (Path geoIpTmpDir : geoIpTmpDirs) { + try (Stream files = Files.list(geoIpTmpDir)) { + Set names = files.map(f -> f.getFileName().toString()).collect(Collectors.toSet()); + assertThat(names, not(hasItem("GeoLite2-ASN.mmdb"))); + assertThat(names, not(hasItem("GeoLite2-City.mmdb"))); + assertThat(names, not(hasItem("GeoLite2-Country.mmdb"))); + } + } + }); + putPipeline(); + assertBusy(() -> { + SimulateDocumentBaseResult result = simulatePipeline(); + assertThat(result.getFailure(), nullValue()); + assertTrue(result.getIngestDocument().hasField("tags")); + @SuppressWarnings("unchecked") + List tags = result.getIngestDocument().getFieldValue("tags", List.class); + assertThat(tags, contains("_geoip_expired_database")); + assertFalse(result.getIngestDocument().hasField("ip-city")); + assertFalse(result.getIngestDocument().hasField("ip-asn")); + assertFalse(result.getIngestDocument().hasField("ip-country")); + }); + } + + public void testUpdatedTimestamp() throws Exception { + assumeTrue("only test with fixture to have stable results", ENDPOINT != null); + testGeoIpDatabasesDownload(); + long lastCheck = getGeoIpTaskState().getDatabases().get("GeoLite2-ASN.mmdb").getLastCheck(); + ClusterUpdateSettingsResponse settingsResponse = client().admin().cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put(GeoIpDownloader.POLL_INTERVAL_SETTING.getKey(), TimeValue.timeValueDays(2))) + .get(); + assertTrue(settingsResponse.isAcknowledged()); + assertBusy(() -> assertNotEquals(lastCheck, getGeoIpTaskState().getDatabases().get("GeoLite2-ASN.mmdb").getLastCheck())); + testGeoIpDatabasesDownload(); + } + public void testGeoIpDatabasesDownload() throws Exception { ClusterUpdateSettingsResponse settingsResponse = client().admin().cluster() .prepareUpdateSettings() @@ -93,10 +169,7 @@ public void testGeoIpDatabasesDownload() throws Exception { .get(); assertTrue(settingsResponse.isAcknowledged()); assertBusy(() -> { - PersistentTasksCustomMetadata.PersistentTask task = getTask(); - assertNotNull(task); - GeoIpTaskState state = (GeoIpTaskState) task.getState(); - assertNotNull(state); + GeoIpTaskState state = getGeoIpTaskState(); assertEquals(Set.of("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb"), state.getDatabases().keySet()); }, 2, TimeUnit.MINUTES); @@ -150,6 +223,95 @@ public void testGeoIpDatabasesDownload() throws Exception { public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception { assumeTrue("only test with fixture to have stable results", ENDPOINT != null); // setup: + putPipeline(); + + // verify before updating dbs + { + SimulateDocumentBaseResult result = simulatePipeline(); + assertThat(result.getIngestDocument().getFieldValue("ip-city.city_name", String.class), equalTo("Tumba")); + assertThat(result.getIngestDocument().getFieldValue("ip-asn.organization_name", String.class), equalTo("Bredband2 AB")); + assertThat(result.getIngestDocument().getFieldValue("ip-country.country_name", String.class), equalTo("Sweden")); + } + + // Enable downloader: + Settings.Builder settings = Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true); + assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings)); + + final List geoipTmpDirs = getGeoIpTmpDirs(); + assertBusy(() -> { + for (Path geoipTmpDir : geoipTmpDirs) { + try (Stream list = Files.list(geoipTmpDir)) { + List files = list.map(Path::getFileName).map(Path::toString).collect(Collectors.toList()); + assertThat(files, containsInAnyOrder("GeoLite2-City.mmdb", "GeoLite2-Country.mmdb", "GeoLite2-ASN.mmdb", + "GeoLite2-City.mmdb_COPYRIGHT.txt", "GeoLite2-Country.mmdb_COPYRIGHT.txt", "GeoLite2-ASN.mmdb_COPYRIGHT.txt", + "GeoLite2-City.mmdb_LICENSE.txt", "GeoLite2-Country.mmdb_LICENSE.txt", "GeoLite2-ASN.mmdb_LICENSE.txt", + "GeoLite2-ASN.mmdb_README.txt")); + } + } + }); + + verifyUpdatedDatabase(); + + // Disable downloader: + settings = Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), false); + assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings)); + + assertBusy(() -> { + for (Path geoipTmpDir : geoipTmpDirs) { + try (Stream list = Files.list(geoipTmpDir)) { + List files = list.map(Path::toString).filter(p -> p.endsWith(".mmdb")).collect(Collectors.toList()); + assertThat(files, empty()); + } + } + }); + } + + private void verifyUpdatedDatabase() throws Exception { + assertBusy(() -> { + SimulateDocumentBaseResult result = simulatePipeline(); + assertThat(result.getFailure(), nullValue()); + assertThat(result.getIngestDocument().getFieldValue("ip-city.city_name", String.class), equalTo("Linköping")); + assertThat(result.getIngestDocument().getFieldValue("ip-asn.organization_name", String.class), equalTo("Bredband2 AB")); + assertThat(result.getIngestDocument().getFieldValue("ip-country.country_name", String.class), equalTo("Sweden")); + }); + } + + private GeoIpTaskState getGeoIpTaskState() { + PersistentTasksCustomMetadata.PersistentTask task = getTask(); + assertNotNull(task); + GeoIpTaskState state = (GeoIpTaskState) task.getState(); + assertNotNull(state); + return state; + } + + private SimulateDocumentBaseResult simulatePipeline() throws IOException { + BytesReference bytes; + try (XContentBuilder builder = JsonXContent.contentBuilder()) { + builder.startObject(); + builder.startArray("docs"); + { + builder.startObject(); + builder.field("_index", "my-index"); + { + builder.startObject("_source"); + builder.field("ip", "89.160.20.128"); + builder.endObject(); + } + builder.endObject(); + } + builder.endArray(); + builder.endObject(); + bytes = BytesReference.bytes(builder); + } + SimulatePipelineRequest simulateRequest = new SimulatePipelineRequest(bytes, XContentType.JSON); + simulateRequest.setId("_id"); + SimulatePipelineResponse simulateResponse = client().admin().cluster().simulatePipeline(simulateRequest).actionGet(); + assertThat(simulateResponse.getPipelineId(), equalTo("_id")); + assertThat(simulateResponse.getResults().size(), equalTo(1)); + return (SimulateDocumentBaseResult) simulateResponse.getResults().get(0); + } + + private void putPipeline() throws IOException { BytesReference bytes; try (XContentBuilder builder = JsonXContent.contentBuilder()) { builder.startObject(); @@ -196,41 +358,9 @@ public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception { bytes = BytesReference.bytes(builder); } assertAcked(client().admin().cluster().preparePutPipeline("_id", bytes, XContentType.JSON).get()); + } - // verify before updating dbs - try (XContentBuilder builder = JsonXContent.contentBuilder()) { - builder.startObject(); - builder.startArray("docs"); - { - builder.startObject(); - builder.field("_index", "my-index"); - { - builder.startObject("_source"); - builder.field("ip", "89.160.20.128"); - builder.endObject(); - } - builder.endObject(); - } - builder.endArray(); - builder.endObject(); - bytes = BytesReference.bytes(builder); - } - SimulatePipelineRequest simulateRequest = new SimulatePipelineRequest(bytes, XContentType.JSON); - simulateRequest.setId("_id"); - { - SimulatePipelineResponse simulateResponse = client().admin().cluster().simulatePipeline(simulateRequest).actionGet(); - assertThat(simulateResponse.getPipelineId(), equalTo("_id")); - assertThat(simulateResponse.getResults().size(), equalTo(1)); - SimulateDocumentBaseResult result = (SimulateDocumentBaseResult) simulateResponse.getResults().get(0); - assertThat(result.getIngestDocument().getFieldValue("ip-city.city_name", String.class), equalTo("Tumba")); - assertThat(result.getIngestDocument().getFieldValue("ip-asn.organization_name", String.class), equalTo("Bredband2 AB")); - assertThat(result.getIngestDocument().getFieldValue("ip-country.country_name", String.class), equalTo("Sweden")); - } - - // Enable downloader: - Settings.Builder settings = Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true); - assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings)); - + private List getGeoIpTmpDirs() throws IOException { final Set ids = StreamSupport.stream(clusterService().state().nodes().getDataNodes().values().spliterator(), false) .map(c -> c.value.getId()) .collect(Collectors.toSet()); @@ -242,42 +372,7 @@ public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception { geoipTmpDirs = files.filter(path -> ids.contains(path.getFileName().toString())).collect(Collectors.toList()); } assertThat(geoipTmpDirs.size(), equalTo(internalCluster().numDataNodes())); - assertBusy(() -> { - for (Path geoipTmpDir : geoipTmpDirs) { - try (Stream list = Files.list(geoipTmpDir)) { - List files = list.map(Path::getFileName).map(Path::toString).collect(Collectors.toList()); - assertThat(files, containsInAnyOrder("GeoLite2-City.mmdb", "GeoLite2-Country.mmdb", "GeoLite2-ASN.mmdb", - "GeoLite2-City.mmdb_COPYRIGHT.txt", "GeoLite2-Country.mmdb_COPYRIGHT.txt", "GeoLite2-ASN.mmdb_COPYRIGHT.txt", - "GeoLite2-City.mmdb_LICENSE.txt", "GeoLite2-Country.mmdb_LICENSE.txt", "GeoLite2-ASN.mmdb_LICENSE.txt", - "GeoLite2-ASN.mmdb_README.txt")); - } - } - }); - - // Verify after updating dbs: - assertBusy(() -> { - SimulatePipelineResponse simulateResponse = client().admin().cluster().simulatePipeline(simulateRequest).actionGet(); - assertThat(simulateResponse.getPipelineId(), equalTo("_id")); - assertThat(simulateResponse.getResults().size(), equalTo(1)); - SimulateDocumentBaseResult result = (SimulateDocumentBaseResult) simulateResponse.getResults().get(0); - assertThat(result.getFailure(), nullValue()); - assertThat(result.getIngestDocument().getFieldValue("ip-city.city_name", String.class), equalTo("Linköping")); - assertThat(result.getIngestDocument().getFieldValue("ip-asn.organization_name", String.class), equalTo("Bredband2 AB")); - assertThat(result.getIngestDocument().getFieldValue("ip-country.country_name", String.class), equalTo("Sweden")); - }); - - // Disable downloader: - settings = Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), false); - assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings)); - - assertBusy(() -> { - for (Path geoipTmpDir : geoipTmpDirs) { - try (Stream list = Files.list(geoipTmpDir)) { - List files = list.map(Path::toString).filter(p -> p.endsWith(".mmdb")).collect(Collectors.toList()); - assertThat(files, empty()); - } - } - }); + return geoipTmpDirs; } @SuppressForbidden(reason = "Maxmind API requires java.io.File") diff --git a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/ReloadingDatabasesWhilePerformingGeoLookupsIT.java b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/ReloadingDatabasesWhilePerformingGeoLookupsIT.java index 6ac46fea36d15..afa40c241fd64 100644 --- a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/ReloadingDatabasesWhilePerformingGeoLookupsIT.java +++ b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/ReloadingDatabasesWhilePerformingGeoLookupsIT.java @@ -10,6 +10,8 @@ import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.network.InetAddresses; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.core.internal.io.IOUtils; @@ -41,6 +43,7 @@ import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; @LuceneTestCase.SuppressFileSystems(value = "ExtrasFS") // Don't randomly add 'extra' files to directory. public class ReloadingDatabasesWhilePerformingGeoLookupsIT extends ESTestCase { @@ -57,13 +60,15 @@ public void test() throws Exception { Path geoIpConfigDir = createTempDir(); Path geoIpTmpDir = createTempDir(); DatabaseRegistry databaseRegistry = createRegistry(geoIpModulesDir, geoIpConfigDir, geoIpTmpDir); - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry); + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService); Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"), geoIpTmpDir.resolve("GeoLite2-City.mmdb")); Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"), geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb")); - databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb"), 0); - databaseRegistry.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"), 0); + databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb")); + databaseRegistry.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb")); lazyLoadReaders(databaseRegistry); final GeoIpProcessor processor1 = factory.create(null, "_tag", null, new HashMap<>(Map.of("field", "_field"))); @@ -116,13 +121,13 @@ public void test() throws Exception { } else { Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"), geoIpTmpDir.resolve("GeoLite2-City.mmdb"), StandardCopyOption.REPLACE_EXISTING); - databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb"), 0); + databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb")); } DatabaseReaderLazyLoader previous2 = databaseRegistry.get("GeoLite2-City-Test.mmdb"); InputStream source = LocalDatabases.class.getResourceAsStream(i % 2 == 0 ? "/GeoIP2-City-Test.mmdb" : "/GeoLite2-City-Test.mmdb"); Files.copy(source, geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"), StandardCopyOption.REPLACE_EXISTING); - databaseRegistry.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"), 0); + databaseRegistry.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb")); DatabaseReaderLazyLoader current1 = databaseRegistry.get("GeoLite2-City.mmdb"); DatabaseReaderLazyLoader current2 = databaseRegistry.get("GeoLite2-City-Test.mmdb"); diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java index 31b9560676cc2..891437ebedae4 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java @@ -24,7 +24,6 @@ import org.elasticsearch.common.CheckedBiFunction; import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.core.SuppressForbidden; -import org.elasticsearch.common.logging.HeaderWarning; import org.elasticsearch.core.internal.io.IOUtils; import java.io.Closeable; @@ -36,7 +35,6 @@ import java.nio.file.Path; import java.security.AccessController; import java.security.PrivilegedAction; -import java.time.Duration; import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; @@ -55,7 +53,6 @@ class DatabaseReaderLazyLoader implements Closeable { private final GeoIpCache cache; private final Path databasePath; private final CheckedSupplier loader; - private volatile long lastUpdate; final SetOnce databaseReader; // cache the database type so that we do not re-read it on every pipeline execution @@ -197,16 +194,6 @@ private T getResponse(InetAddress ipAddress, } DatabaseReader get() throws IOException { - //only downloaded databases will have lastUpdate != 0, we never update it for default databases or databases from config dir - if (lastUpdate != 0) { - Path fileName = databasePath.getFileName(); - if (System.currentTimeMillis() - lastUpdate > Duration.ofDays(30).toMillis()) { - throw new IllegalStateException("database [" + fileName + "] was not updated for 30 days and is disabled"); - } else if (System.currentTimeMillis() - lastUpdate > Duration.ofDays(25).toMillis()) { - HeaderWarning.addWarning( - "database [{}] was not updated for over 25 days, ingestion will fail if there is no update for 30 days", fileName); - } - } if (databaseReader.get() == null) { synchronized (databaseReader) { if (databaseReader.get() == null) { @@ -261,7 +248,4 @@ private static DatabaseReader.Builder createDatabaseBuilder(Path databasePath) { return new DatabaseReader.Builder(databasePath.toFile()); } - void setLastUpdate(long lastUpdate) { - this.lastUpdate = lastUpdate; - } } diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseRegistry.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseRegistry.java index 820c99da92be1..caf8436dd9ef8 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseRegistry.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseRegistry.java @@ -48,6 +48,7 @@ import java.util.Collection; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -202,25 +203,31 @@ void checkDatabases(ClusterState state) { // Empty state will purge stale entries in databases map. GeoIpTaskState taskState = task == null || task.getState() == null ? GeoIpTaskState.EMPTY : (GeoIpTaskState) task.getState(); - taskState.getDatabases().forEach((name, metadata) -> { - DatabaseReaderLazyLoader reference = databases.get(name); - String remoteMd5 = metadata.getMd5(); - String localMd5 = reference != null ? reference.getMd5() : null; - if (Objects.equals(localMd5, remoteMd5)) { - reference.setLastUpdate(metadata.getLastUpdate()); - LOGGER.debug("Current reference of [{}] is up to date [{}] with was recorded in CS [{}]", name, localMd5, remoteMd5); - return; - } + taskState.getDatabases().entrySet().stream() + .filter(e -> e.getValue().isValid(state.getMetadata().settings())) + .forEach(e -> { + String name = e.getKey(); + GeoIpTaskState.Metadata metadata = e.getValue(); + DatabaseReaderLazyLoader reference = databases.get(name); + String remoteMd5 = metadata.getMd5(); + String localMd5 = reference != null ? reference.getMd5() : null; + if (Objects.equals(localMd5, remoteMd5)) { + LOGGER.debug("Current reference of [{}] is up to date [{}] with was recorded in CS [{}]", name, localMd5, remoteMd5); + return; + } - try { - retrieveAndUpdateDatabase(name, metadata); - } catch (Exception e) { - LOGGER.error((Supplier) () -> new ParameterizedMessage("attempt to download database [{}] failed", name), e); - } - }); + try { + retrieveAndUpdateDatabase(name, metadata); + } catch (Exception ex) { + LOGGER.error((Supplier) () -> new ParameterizedMessage("attempt to download database [{}] failed", name), ex); + } + }); List staleEntries = new ArrayList<>(databases.keySet()); - staleEntries.removeAll(taskState.getDatabases().keySet()); + staleEntries.removeAll(taskState.getDatabases().entrySet().stream() + .filter(e->e.getValue().isValid(state.getMetadata().settings())) + .map(Map.Entry::getKey) + .collect(Collectors.toSet())); removeStaleEntries(staleEntries); } @@ -284,7 +291,7 @@ void retrieveAndUpdateDatabase(String databaseName, GeoIpTaskState.Metadata meta LOGGER.debug("moving database from [{}] to [{}]", databaseTmpFile, databaseFile); Files.move(databaseTmpFile, databaseFile, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING); - updateDatabase(databaseName, recordedMd5, databaseFile, metadata.getLastUpdate()); + updateDatabase(databaseName, recordedMd5, databaseFile); Files.delete(databaseTmpGzFile); }, failure -> { @@ -299,11 +306,10 @@ void retrieveAndUpdateDatabase(String databaseName, GeoIpTaskState.Metadata meta }); } - void updateDatabase(String databaseFileName, String recordedMd5, Path file, long lastUpdate) { + void updateDatabase(String databaseFileName, String recordedMd5, Path file) { try { LOGGER.info("database file changed [{}], reload database...", file); DatabaseReaderLazyLoader loader = new DatabaseReaderLazyLoader(cache, file, recordedMd5); - loader.setLastUpdate(lastUpdate); DatabaseReaderLazyLoader existing = databases.put(databaseFileName, loader); if (existing != null) { existing.close(); diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java index b5eb472c30970..b8ab92c3c65ee 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java @@ -22,11 +22,11 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.common.xcontent.DeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.MatchQueryBuilder; import org.elasticsearch.index.query.RangeQueryBuilder; @@ -70,6 +70,7 @@ public class GeoIpDownloader extends AllocatedPersistentTask { private final Client client; private final HttpClient httpClient; + private final ClusterService clusterService; private final ThreadPool threadPool; private final String endpoint; @@ -84,6 +85,7 @@ public class GeoIpDownloader extends AllocatedPersistentTask { super(id, type, action, description, parentTask, headers); this.httpClient = httpClient; this.client = new OriginSettingClient(client, IngestService.INGEST_ORIGIN); + this.clusterService = clusterService; this.threadPool = threadPool; endpoint = ENDPOINT_SETTING.get(settings); pollInterval = POLL_INTERVAL_SETTING.get(settings); @@ -139,7 +141,7 @@ void processDatabase(Map databaseInfo) { int firstChunk = state.contains(name) ? state.get(name).getLastChunk() + 1 : 0; int lastChunk = indexChunks(name, is, firstChunk, md5, start); if (lastChunk > firstChunk) { - state = state.put(name, new Metadata(start, firstChunk, lastChunk - 1, md5)); + state = state.put(name, new Metadata(start, firstChunk, lastChunk - 1, md5, start)); updateTaskState(); stats = stats.successfulDownload(System.currentTimeMillis() - start).count(state.getDatabases().size()); logger.info("updated geoip database [" + name + "]"); @@ -166,7 +168,8 @@ void deleteOldChunks(String name, int firstChunk) { //visible for testing protected void updateTimestamp(String name, Metadata old) { logger.info("geoip database [" + name + "] is up to date, updated timestamp"); - state = state.put(name, new Metadata(System.currentTimeMillis(), old.getFirstChunk(), old.getLastChunk(), old.getMd5())); + state = state.put(name, new Metadata(old.getLastUpdate(), old.getFirstChunk(), old.getLastChunk(), old.getMd5(), + System.currentTimeMillis())); stats = stats.skippedDownload(); updateTaskState(); } @@ -235,9 +238,28 @@ void runDownloader() { } catch (Exception e) { logger.error("exception during geoip databases update", e); } + try { + cleanDatabases(); + } catch (Exception e) { + logger.error("exception during geoip databases cleanup", e); + } scheduleNextRun(pollInterval); } + private void cleanDatabases() { + long expiredDatabases = state.getDatabases().entrySet().stream() + .filter(e -> e.getValue().isValid(clusterService.state().metadata().settings()) == false) + .peek(e -> { + String name = e.getKey(); + Metadata meta = e.getValue(); + deleteOldChunks(name, meta.getLastChunk() + 1); + state = state.put(name, new Metadata(meta.getLastUpdate(), meta.getFirstChunk(), meta.getLastChunk(), meta.getMd5(), + meta.getLastCheck() - 1)); + updateTaskState(); + }).count(); + stats = stats.expiredDatabases((int) expiredDatabases); + } + @Override protected void onCancelled() { if (scheduled != null) { @@ -251,6 +273,8 @@ public GeoIpDownloaderStats getStatus() { } private void scheduleNextRun(TimeValue time) { - scheduled = threadPool.schedule(this::runDownloader, time, ThreadPool.Names.GENERIC); + if (threadPool.scheduler().isShutdown() == false) { + scheduled = threadPool.schedule(this::runDownloader, time, ThreadPool.Names.GENERIC); + } } } diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java index d842e3490a15f..61b4ad367c146 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java @@ -19,12 +19,15 @@ import com.maxmind.geoip2.record.Subdivision; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.common.network.InetAddresses; import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.ingest.AbstractProcessor; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Processor; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask; import java.io.IOException; import java.net.InetAddress; @@ -37,11 +40,13 @@ import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.function.Supplier; import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException; import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty; import static org.elasticsearch.ingest.ConfigurationUtils.readOptionalList; import static org.elasticsearch.ingest.ConfigurationUtils.readStringProperty; +import static org.elasticsearch.persistent.PersistentTasksCustomMetadata.getTaskWithId; public final class GeoIpProcessor extends AbstractProcessor { @@ -51,6 +56,7 @@ public final class GeoIpProcessor extends AbstractProcessor { private static final String ASN_DB_SUFFIX = "-ASN"; private final String field; + private final Supplier isValid; private final String targetField; private final CheckedSupplier supplier; private final Set properties; @@ -59,10 +65,11 @@ public final class GeoIpProcessor extends AbstractProcessor { /** * Construct a geo-IP processor. - * @param tag the processor tag + * @param tag the processor tag * @param description the processor description * @param field the source field to geo-IP map * @param supplier a supplier of a geo-IP database reader; ideally this is lazily-loaded once on first use + * @param isValid * @param targetField the target field * @param properties the properties; ideally this is lazily-loaded once on first use * @param ignoreMissing true if documents with a missing value for the field should be ignored @@ -73,12 +80,14 @@ public final class GeoIpProcessor extends AbstractProcessor { final String description, final String field, final CheckedSupplier supplier, + final Supplier isValid, final String targetField, final Set properties, final boolean ignoreMissing, final boolean firstOnly) { super(tag, description); this.field = field; + this.isValid = isValid; this.targetField = targetField; this.supplier = supplier; this.properties = properties; @@ -94,7 +103,10 @@ boolean isIgnoreMissing() { public IngestDocument execute(IngestDocument ingestDocument) throws IOException { Object ip = ingestDocument.getFieldValue(field, Object.class, ignoreMissing); - if (ip == null && ignoreMissing) { + if (isValid.get() == false) { + ingestDocument.appendFieldValue("tags","_geoip_expired_database", false); + return ingestDocument; + } else if (ip == null && ignoreMissing) { return ingestDocument; } else if (ip == null) { throw new IllegalArgumentException("field [" + field + "] is null, cannot extract geoip information."); @@ -342,13 +354,15 @@ public static final class Factory implements Processor.Factory { )); private final DatabaseRegistry databaseRegistry; + private final ClusterService clusterService; List getAllDatabases() { return databaseRegistry.getAllDatabases(); } - public Factory(DatabaseRegistry databaseRegistry) { + public Factory(DatabaseRegistry databaseRegistry, ClusterService clusterService) { this.databaseRegistry = databaseRegistry; + this.clusterService = clusterService; } @Override @@ -413,7 +427,19 @@ public GeoIpProcessor create( "] doesn't match with expected suffix [" + expectedSuffix + "]"; return loader; }; - return new GeoIpProcessor(processorTag, description, ipField, supplier, targetField, properties, ignoreMissing, firstOnly); + Supplier isValid = () -> { + ClusterState currentState = clusterService.state(); + assert currentState != null; + + PersistentTask task = getTaskWithId(currentState, GeoIpDownloader.GEOIP_DOWNLOADER); + if (task == null || task.getState() == null) { + return true; + } + GeoIpTaskState state = (GeoIpTaskState) task.getState(); + return state.getDatabases().get(databaseFile).isValid(currentState.metadata().settings()); + }; + return new GeoIpProcessor(processorTag, description, ipField, supplier, isValid, targetField, properties, ignoreMissing, + firstOnly); } } diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpTaskState.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpTaskState.java index a394289ca722b..c9062a3443e87 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpTaskState.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpTaskState.java @@ -9,18 +9,22 @@ package org.elasticsearch.ingest.geoip; import org.elasticsearch.Version; -import org.elasticsearch.common.xcontent.ParseField; -import org.elasticsearch.core.Tuple; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.VersionedNamedWriteable; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ParseField; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.Tuple; import org.elasticsearch.persistent.PersistentTaskState; import java.io.IOException; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -29,6 +33,7 @@ import java.util.stream.Collectors; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER; class GeoIpTaskState implements PersistentTaskState, VersionedNamedWriteable { @@ -61,7 +66,11 @@ public static GeoIpTaskState fromXContent(XContentParser parser) throws IOExcept GeoIpTaskState(StreamInput input) throws IOException { databases = Collections.unmodifiableMap(input.readMap(StreamInput::readString, - in -> new Metadata(in.readLong(), in.readVInt(), in.readVInt(), in.readString()))); + in -> { + long lastUpdate = in.readLong(); + return new Metadata(lastUpdate, in.readVInt(), in.readVInt(), in.readString(), + in.getVersion().onOrAfter(Version.V_8_0_0) ? in.readLong() : lastUpdate); + })); } public GeoIpTaskState put(String name, Metadata metadata) { @@ -126,12 +135,16 @@ public void writeTo(StreamOutput out) throws IOException { o.writeVInt(v.firstChunk); o.writeVInt(v.lastChunk); o.writeString(v.md5); + if (o.getVersion().onOrAfter(Version.V_8_0_0)) { + o.writeLong(v.lastCheck); + } }); } static class Metadata implements ToXContentObject { static final String NAME = GEOIP_DOWNLOADER + "-metadata"; + private static final ParseField LAST_CHECK = new ParseField("last_check"); private static final ParseField LAST_UPDATE = new ParseField("last_update"); private static final ParseField FIRST_CHUNK = new ParseField("first_chunk"); private static final ParseField LAST_CHUNK = new ParseField("last_chunk"); @@ -139,13 +152,15 @@ static class Metadata implements ToXContentObject { private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, true, - args -> new Metadata((long) args[0], (int) args[1], (int) args[2], (String) args[3])); + args -> new Metadata((long) args[0], (int) args[1], (int) args[2], (String) args[3], (long) (args[4] == null ? args[0] : + args[4]))); static { PARSER.declareLong(constructorArg(), LAST_UPDATE); PARSER.declareInt(constructorArg(), FIRST_CHUNK); PARSER.declareInt(constructorArg(), LAST_CHUNK); PARSER.declareString(constructorArg(), MD5); + PARSER.declareLong(optionalConstructorArg(), LAST_CHECK); } public static Metadata fromXContent(XContentParser parser) { @@ -160,18 +175,25 @@ public static Metadata fromXContent(XContentParser parser) { private final int firstChunk; private final int lastChunk; private final String md5; + private final long lastCheck; - Metadata(long lastUpdate, int firstChunk, int lastChunk, String md5) { + Metadata(long lastUpdate, int firstChunk, int lastChunk, String md5, long lastCheck) { this.lastUpdate = lastUpdate; this.firstChunk = firstChunk; this.lastChunk = lastChunk; this.md5 = Objects.requireNonNull(md5); + this.lastCheck = lastCheck; } public long getLastUpdate() { return lastUpdate; } + public boolean isValid(Settings settings) { + TimeValue valid = settings.getAsTime("ingest.geoip.database_validity", TimeValue.timeValueDays(30)); + return Instant.ofEpochMilli(lastCheck).isAfter(Instant.now().minus(valid.getMillis(), ChronoUnit.MILLIS)); + } + public int getFirstChunk() { return firstChunk; } @@ -184,6 +206,10 @@ public String getMd5() { return md5; } + public long getLastCheck() { + return lastCheck; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -192,12 +218,13 @@ public boolean equals(Object o) { return lastUpdate == metadata.lastUpdate && firstChunk == metadata.firstChunk && lastChunk == metadata.lastChunk + && lastCheck == metadata.lastCheck && md5.equals(metadata.md5); } @Override public int hashCode() { - return Objects.hash(lastUpdate, firstChunk, lastChunk, md5); + return Objects.hash(lastUpdate, firstChunk, lastChunk, md5, lastCheck); } @Override @@ -205,6 +232,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject(); { builder.field(LAST_UPDATE.getPreferredName(), lastUpdate); + builder.field(LAST_CHECK.getPreferredName(), lastCheck); builder.field(FIRST_CHUNK.getPreferredName(), firstChunk); builder.field(LAST_CHUNK.getPreferredName(), lastChunk); builder.field(MD5.getPreferredName(), md5); diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java index 29740f752601a..f5698d7ab01f5 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java @@ -91,7 +91,7 @@ public Map getProcessors(Processor.Parameters paramet GeoIpCache geoIpCache = new GeoIpCache(cacheSize); DatabaseRegistry registry = new DatabaseRegistry(parameters.env, parameters.client, geoIpCache, parameters.genericExecutor); databaseRegistry.set(registry); - return Map.of(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory(registry)); + return Map.of(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory(registry, parameters.ingestService.getClusterService())); } @Override @@ -113,7 +113,7 @@ public Collection createComponents(Client client, throw new UncheckedIOException(e); } - if(GeoIpDownloaderTaskExecutor.ENABLED_DEFAULT == false){ + if (GeoIpDownloaderTaskExecutor.ENABLED_DEFAULT == false) { return List.of(databaseRegistry.get()); } diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStats.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStats.java index 653e902e4eb26..d368127b30cd9 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStats.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStats.java @@ -8,6 +8,7 @@ package org.elasticsearch.ingest.geoip.stats; +import org.elasticsearch.Version; import org.elasticsearch.common.xcontent.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; @@ -23,16 +24,18 @@ public class GeoIpDownloaderStats implements Task.Status { - public static final GeoIpDownloaderStats EMPTY = new GeoIpDownloaderStats(0, 0, 0, 0, 0); + public static final GeoIpDownloaderStats EMPTY = new GeoIpDownloaderStats(0, 0, 0, 0, 0, 0); public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( - "geoip_downloader_stats", a -> new GeoIpDownloaderStats((int) a[0], (int) a[1], (long) a[2], (int) a[3], (int) a[4])); + "geoip_downloader_stats", a -> new GeoIpDownloaderStats((int) a[0], (int) a[1], (long) a[2], (int) a[3], (int) a[4], + a[5] == null ? 0 : (int) a[5])); private static final ParseField SUCCESSFUL_DOWNLOADS = new ParseField("successful_downloads"); private static final ParseField FAILED_DOWNLOADS = new ParseField("failed_downloads"); private static final ParseField TOTAL_DOWNLOAD_TIME = new ParseField("total_download_time"); private static final ParseField DATABASES_COUNT = new ParseField("databases_count"); private static final ParseField SKIPPED_DOWNLOADS = new ParseField("skipped_updates"); + private static final ParseField EXPIRED_DATABASES = new ParseField("expired_databases"); static { PARSER.declareInt(ConstructingObjectParser.constructorArg(), SUCCESSFUL_DOWNLOADS); @@ -40,6 +43,7 @@ public class GeoIpDownloaderStats implements Task.Status { PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_DOWNLOAD_TIME); PARSER.declareInt(ConstructingObjectParser.constructorArg(), DATABASES_COUNT); PARSER.declareInt(ConstructingObjectParser.constructorArg(), SKIPPED_DOWNLOADS); + PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), EXPIRED_DATABASES); } private final int successfulDownloads; @@ -47,6 +51,7 @@ public class GeoIpDownloaderStats implements Task.Status { private final long totalDownloadTime; private final int databasesCount; private final int skippedDownloads; + private final int expiredDatabases; public GeoIpDownloaderStats(StreamInput in) throws IOException { successfulDownloads = in.readVInt(); @@ -54,15 +59,21 @@ public GeoIpDownloaderStats(StreamInput in) throws IOException { totalDownloadTime = in.readVLong(); databasesCount = in.readVInt(); skippedDownloads = in.readVInt(); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + expiredDatabases = in.readVInt(); + } else { + expiredDatabases = 0; + } } private GeoIpDownloaderStats(int successfulDownloads, int failedDownloads, long totalDownloadTime, int databasesCount, - int skippedDownloads) { + int skippedDownloads, int expiredDatabases) { this.successfulDownloads = successfulDownloads; this.failedDownloads = failedDownloads; this.totalDownloadTime = totalDownloadTime; this.databasesCount = databasesCount; this.skippedDownloads = skippedDownloads; + this.expiredDatabases = expiredDatabases; } public int getSuccessfulDownloads() { @@ -85,21 +96,33 @@ public int getSkippedDownloads() { return skippedDownloads; } + public int getExpiredDatabases() { + return expiredDatabases; + } + public GeoIpDownloaderStats skippedDownload() { - return new GeoIpDownloaderStats(successfulDownloads, failedDownloads, totalDownloadTime, databasesCount, skippedDownloads + 1); + return new GeoIpDownloaderStats(successfulDownloads, failedDownloads, totalDownloadTime, databasesCount, skippedDownloads + 1, + expiredDatabases); } public GeoIpDownloaderStats successfulDownload(long downloadTime) { return new GeoIpDownloaderStats(successfulDownloads + 1, failedDownloads, totalDownloadTime + Math.max(downloadTime, 0), - databasesCount, skippedDownloads); + databasesCount, skippedDownloads, expiredDatabases); } public GeoIpDownloaderStats failedDownload() { - return new GeoIpDownloaderStats(successfulDownloads, failedDownloads + 1, totalDownloadTime, databasesCount, skippedDownloads); + return new GeoIpDownloaderStats(successfulDownloads, failedDownloads + 1, totalDownloadTime, databasesCount, skippedDownloads, + expiredDatabases); } public GeoIpDownloaderStats count(int databasesCount) { - return new GeoIpDownloaderStats(successfulDownloads, failedDownloads, totalDownloadTime, databasesCount, skippedDownloads); + return new GeoIpDownloaderStats(successfulDownloads, failedDownloads, totalDownloadTime, databasesCount, skippedDownloads, + expiredDatabases); + } + + public GeoIpDownloaderStats expiredDatabases(int expiredDatabases) { + return new GeoIpDownloaderStats(successfulDownloads, failedDownloads, totalDownloadTime, databasesCount, skippedDownloads, + expiredDatabases); } @Override @@ -110,6 +133,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(TOTAL_DOWNLOAD_TIME.getPreferredName(), totalDownloadTime); builder.field(DATABASES_COUNT.getPreferredName(), databasesCount); builder.field(SKIPPED_DOWNLOADS.getPreferredName(), skippedDownloads); + builder.field(EXPIRED_DATABASES.getPreferredName(), expiredDatabases); builder.endObject(); return builder; } @@ -125,6 +149,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVLong(totalDownloadTime); out.writeVInt(databasesCount); out.writeVInt(skippedDownloads); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeVInt(expiredDatabases); + } } @Override @@ -136,12 +163,13 @@ public boolean equals(Object o) { failedDownloads == that.failedDownloads && totalDownloadTime == that.totalDownloadTime && databasesCount == that.databasesCount && - skippedDownloads == that.skippedDownloads; + skippedDownloads == that.skippedDownloads && + expiredDatabases == that.expiredDatabases; } @Override public int hashCode() { - return Objects.hash(successfulDownloads, failedDownloads, totalDownloadTime, databasesCount, skippedDownloads); + return Objects.hash(successfulDownloads, failedDownloads, totalDownloadTime, databasesCount, skippedDownloads, expiredDatabases); } @Override diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/DatabaseRegistryTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/DatabaseRegistryTests.java index c43fecec61c05..73803f5b78d19 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/DatabaseRegistryTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/DatabaseRegistryTests.java @@ -80,7 +80,6 @@ import static org.elasticsearch.persistent.PersistentTasksCustomMetadata.TYPE; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.any; @@ -128,7 +127,8 @@ public void testCheckDatabases() throws Exception { String md5 = mockSearches("GeoIP2-City.mmdb", 5, 14); String taskId = GeoIpDownloader.GEOIP_DOWNLOADER; PersistentTask task = new PersistentTask<>(taskId, GeoIpDownloader.GEOIP_DOWNLOADER, new GeoIpTaskParams(), 1, null); - task = new PersistentTask<>(task, new GeoIpTaskState(Map.of("GeoIP2-City.mmdb", new GeoIpTaskState.Metadata(10L, 5, 14, md5)))); + task = new PersistentTask<>(task, new GeoIpTaskState(Map.of("GeoIP2-City.mmdb", + new GeoIpTaskState.Metadata(10, 5, 14, md5, 10)))); PersistentTasksCustomMetadata tasksCustomMetadata = new PersistentTasksCustomMetadata(1L, Map.of(taskId, task)); ClusterState state = ClusterState.builder(new ClusterName("name")) @@ -142,16 +142,14 @@ public void testCheckDatabases() throws Exception { assertThat(databaseRegistry.getDatabase("GeoIP2-City.mmdb", false), nullValue()); databaseRegistry.checkDatabases(state); DatabaseReaderLazyLoader database = databaseRegistry.getDatabase("GeoIP2-City.mmdb", false); - assertThat(database, notNullValue()); - verify(client, times(10)).search(any()); + assertThat(database, nullValue()); + verify(client, times(0)).search(any()); try (Stream files = Files.list(geoIpTmpDir.resolve("geoip-databases").resolve("nodeId"))) { - assertThat(files.collect(Collectors.toList()), hasSize(1)); + assertEquals(0, files.count()); } - IllegalStateException e = expectThrows(IllegalStateException.class, database::get); - assertEquals("database [GeoIP2-City.mmdb] was not updated for 30 days and is disabled", e.getMessage()); task = new PersistentTask<>(task, new GeoIpTaskState(Map.of("GeoIP2-City.mmdb", - new GeoIpTaskState.Metadata(System.currentTimeMillis(), 5, 14, md5)))); + new GeoIpTaskState.Metadata(10, 5, 14, md5, System.currentTimeMillis())))); tasksCustomMetadata = new PersistentTasksCustomMetadata(1L, Map.of(taskId, task)); state = ClusterState.builder(new ClusterName("name")) @@ -163,6 +161,8 @@ public void testCheckDatabases() throws Exception { .build(); databaseRegistry.checkDatabases(state); database = databaseRegistry.getDatabase("GeoIP2-City.mmdb", false); + assertThat(database, notNullValue()); + verify(client, times(10)).search(any()); //30 days check passed but we mocked mmdb data so parsing will fail expectThrows(InvalidDatabaseException.class, database::get); } @@ -171,7 +171,7 @@ public void testCheckDatabases_dontCheckDatabaseOnNonIngestNode() throws Excepti String md5 = mockSearches("GeoIP2-City.mmdb", 0, 9); String taskId = GeoIpDownloader.GEOIP_DOWNLOADER; PersistentTask task = new PersistentTask<>(taskId, GeoIpDownloader.GEOIP_DOWNLOADER, new GeoIpTaskParams(), 1, null); - task = new PersistentTask<>(task, new GeoIpTaskState(Map.of("GeoIP2-City.mmdb", new GeoIpTaskState.Metadata(0L, 0, 9, md5)))); + task = new PersistentTask<>(task, new GeoIpTaskState(Map.of("GeoIP2-City.mmdb", new GeoIpTaskState.Metadata(0L, 0, 9, md5, 10)))); PersistentTasksCustomMetadata tasksCustomMetadata = new PersistentTasksCustomMetadata(1L, Map.of(taskId, task)); ClusterState state = ClusterState.builder(new ClusterName("name")) @@ -195,7 +195,7 @@ public void testCheckDatabases_dontCheckDatabaseWhenNoDatabasesIndex() throws Ex String md5 = mockSearches("GeoIP2-City.mmdb", 0, 9); String taskId = GeoIpDownloader.GEOIP_DOWNLOADER; PersistentTask task = new PersistentTask<>(taskId, GeoIpDownloader.GEOIP_DOWNLOADER, new GeoIpTaskParams(), 1, null); - task = new PersistentTask<>(task, new GeoIpTaskState(Map.of("GeoIP2-City.mmdb", new GeoIpTaskState.Metadata(0L, 0, 9, md5)))); + task = new PersistentTask<>(task, new GeoIpTaskState(Map.of("GeoIP2-City.mmdb", new GeoIpTaskState.Metadata(0L, 0, 9, md5, 10)))); PersistentTasksCustomMetadata tasksCustomMetadata = new PersistentTasksCustomMetadata(1L, Map.of(taskId, task)); ClusterState state = ClusterState.builder(new ClusterName("name")) @@ -236,7 +236,7 @@ public void testCheckDatabases_dontCheckDatabaseWhenGeoIpDownloadTask() throws E public void testRetrieveDatabase() throws Exception { String md5 = mockSearches("_name", 0, 29); - GeoIpTaskState.Metadata metadata = new GeoIpTaskState.Metadata(-1, 0, 29, md5); + GeoIpTaskState.Metadata metadata = new GeoIpTaskState.Metadata(-1, 0, 29, md5, 10); @SuppressWarnings("unchecked") CheckedConsumer chunkConsumer = mock(CheckedConsumer.class); @@ -254,7 +254,7 @@ public void testRetrieveDatabase() throws Exception { public void testRetrieveDatabaseCorruption() throws Exception { String md5 = mockSearches("_name", 0, 9); String incorrectMd5 = "different"; - GeoIpTaskState.Metadata metadata = new GeoIpTaskState.Metadata(-1, 0, 9, incorrectMd5); + GeoIpTaskState.Metadata metadata = new GeoIpTaskState.Metadata(-1, 0, 9, incorrectMd5, 10); @SuppressWarnings("unchecked") CheckedConsumer chunkConsumer = mock(CheckedConsumer.class); diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java index 6fd28db01a63f..4b4342e978522 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java @@ -279,13 +279,13 @@ void deleteOldChunks(String name, int firstChunk) { } }; - geoIpDownloader.setState(GeoIpTaskState.EMPTY.put("test.mmdb", new GeoIpTaskState.Metadata(0, 5, 8, "0"))); + geoIpDownloader.setState(GeoIpTaskState.EMPTY.put("test.mmdb", new GeoIpTaskState.Metadata(0, 5, 8, "0", 0))); geoIpDownloader.processDatabase(Map.of("name", "test.tgz", "url", "http://a.b/t1", "md5_hash", "1")); } public void testProcessDatabaseSame() throws IOException { - GeoIpTaskState.Metadata metadata = new GeoIpTaskState.Metadata(0, 4, 10, "1"); + GeoIpTaskState.Metadata metadata = new GeoIpTaskState.Metadata(0, 4, 10, "1", 0); GeoIpTaskState taskState = GeoIpTaskState.EMPTY.put("test.mmdb", metadata); ByteArrayInputStream bais = new ByteArrayInputStream(new byte[0]); when(httpClient.get("a.b/t1")).thenReturn(bais); diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java index 8ff76903229ec..e43e37a23e305 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java @@ -12,6 +12,8 @@ import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.VersionType; @@ -43,11 +45,13 @@ import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class GeoIpProcessorFactoryTests extends ESTestCase { private Path geoipTmpDir; private DatabaseRegistry databaseRegistry; + private ClusterService clusterService; @Before public void loadDatabaseReaders() throws IOException { @@ -62,6 +66,8 @@ public void loadDatabaseReaders() throws IOException { LocalDatabases localDatabases = new LocalDatabases(geoIpDir, geoIpConfigDir, new GeoIpCache(1000)); geoipTmpDir = createTempDir(); databaseRegistry = new DatabaseRegistry(geoipTmpDir, client, cache, localDatabases, Runnable::run); + clusterService = mock(ClusterService.class); + when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE); } @After @@ -71,7 +77,7 @@ public void closeDatabaseReaders() throws IOException { } public void testBuildDefaults() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService); Map config = new HashMap<>(); config.put("field", "_field"); @@ -87,7 +93,7 @@ public void testBuildDefaults() throws Exception { } public void testSetIgnoreMissing() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService); Map config = new HashMap<>(); config.put("field", "_field"); @@ -104,7 +110,7 @@ public void testSetIgnoreMissing() throws Exception { } public void testCountryBuildDefaults() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService); Map config = new HashMap<>(); config.put("field", "_field"); @@ -122,7 +128,7 @@ public void testCountryBuildDefaults() throws Exception { } public void testAsnBuildDefaults() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService); Map config = new HashMap<>(); config.put("field", "_field"); @@ -140,7 +146,7 @@ public void testAsnBuildDefaults() throws Exception { } public void testBuildTargetField() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService); Map config = new HashMap<>(); config.put("field", "_field"); config.put("target_field", "_field"); @@ -151,7 +157,7 @@ public void testBuildTargetField() throws Exception { } public void testBuildDbFile() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService); Map config = new HashMap<>(); config.put("field", "_field"); config.put("database_file", "GeoLite2-Country.mmdb"); @@ -164,7 +170,7 @@ public void testBuildDbFile() throws Exception { } public void testBuildWithCountryDbAndAsnFields() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService); Map config = new HashMap<>(); config.put("field", "_field"); config.put("database_file", "GeoLite2-Country.mmdb"); @@ -178,7 +184,7 @@ public void testBuildWithCountryDbAndAsnFields() throws Exception { } public void testBuildWithAsnDbAndCityFields() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService); Map config = new HashMap<>(); config.put("field", "_field"); config.put("database_file", "GeoLite2-ASN.mmdb"); @@ -192,7 +198,7 @@ public void testBuildWithAsnDbAndCityFields() throws Exception { } public void testBuildNonExistingDbFile() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService); Map config = new HashMap<>(); config.put("field", "_field"); @@ -202,7 +208,7 @@ public void testBuildNonExistingDbFile() throws Exception { } public void testBuildFields() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService); Set properties = EnumSet.noneOf(GeoIpProcessor.Property.class); List fieldNames = new ArrayList<>(); @@ -226,7 +232,7 @@ public void testBuildFields() throws Exception { } public void testBuildIllegalFieldOption() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService); Map config1 = new HashMap<>(); config1.put("field", "_field"); @@ -256,7 +262,7 @@ public void testLazyLoading() throws Exception { GeoIpCache cache = new GeoIpCache(1000); LocalDatabases localDatabases = new LocalDatabases(geoIpDir, geoIpConfigDir, cache); DatabaseRegistry databaseRegistry = new DatabaseRegistry(createTempDir(), client, cache, localDatabases, Runnable::run); - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService); for (DatabaseReaderLazyLoader lazyLoader : localDatabases.getAllDatabases()) { assertNull(lazyLoader.databaseReader.get()); } @@ -319,7 +325,7 @@ public void testLoadingCustomDatabase() throws IOException { GeoIpCache cache = new GeoIpCache(1000); DatabaseRegistry databaseRegistry = new DatabaseRegistry(createTempDir(), client, cache, localDatabases, Runnable::run); databaseRegistry.initialize("nodeId", resourceWatcherService, mock(IngestService.class)); - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService); for (DatabaseReaderLazyLoader lazyLoader : localDatabases.getAllDatabases()) { assertNull(lazyLoader.databaseReader.get()); } @@ -342,7 +348,7 @@ public void testLoadingCustomDatabase() throws IOException { } public void testFallbackUsingDefaultDatabases() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService); { Map config = new HashMap<>(); config.put("field", "source_field"); @@ -363,7 +369,7 @@ public void testFallbackUsingDefaultDatabases() throws Exception { public void testFallbackUsingDefaultDatabasesWhileIngesting() throws Exception { copyDatabaseFile(geoipTmpDir, "GeoLite2-City-Test.mmdb"); - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService); // fallback_to_default_databases=true, first use default city db then a custom city db: { Map config = new HashMap<>(); @@ -379,7 +385,7 @@ public void testFallbackUsingDefaultDatabasesWhileIngesting() throws Exception { Map geoData = (Map) ingestDocument.getSourceAndMetadata().get("geoip"); assertThat(geoData.get("city_name"), equalTo("Tumba")); - databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoipTmpDir.resolve("GeoLite2-City-Test.mmdb"), 0); + databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoipTmpDir.resolve("GeoLite2-City-Test.mmdb")); ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); processor.execute(ingestDocument); geoData = (Map) ingestDocument.getSourceAndMetadata().get("geoip"); diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java index 7806ebc529cfe..c99672aa30eb9 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java @@ -9,6 +9,7 @@ package org.elasticsearch.ingest.geoip; import com.maxmind.geoip2.DatabaseReader; + import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.core.PathUtils; import org.elasticsearch.ingest.IngestDocument; @@ -28,6 +29,7 @@ import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; @@ -35,7 +37,7 @@ public class GeoIpProcessorTests extends ESTestCase { public void testCity() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field", - loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false); + loader("/GeoLite2-City.mmdb"), () -> true, "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false); Map document = new HashMap<>(); document.put("source_field", "8.8.8.8"); @@ -59,7 +61,7 @@ public void testCity() throws Exception { public void testNullValueWithIgnoreMissing() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field", - loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), true, false); + loader("/GeoLite2-City.mmdb"), () -> true, "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), true, false); IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.singletonMap("source_field", null)); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); @@ -69,7 +71,7 @@ public void testNullValueWithIgnoreMissing() throws Exception { public void testNonExistentWithIgnoreMissing() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field", - loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), true, false); + loader("/GeoLite2-City.mmdb"), () -> true, "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), true, false); IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap()); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); processor.execute(ingestDocument); @@ -78,7 +80,7 @@ public void testNonExistentWithIgnoreMissing() throws Exception { public void testNullWithoutIgnoreMissing() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field", - loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false); + loader("/GeoLite2-City.mmdb"), () -> true, "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false); IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.singletonMap("source_field", null)); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); @@ -88,7 +90,7 @@ public void testNullWithoutIgnoreMissing() throws Exception { public void testNonExistentWithoutIgnoreMissing() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field", - loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false); + loader("/GeoLite2-City.mmdb"), () -> true, "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false); IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap()); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); Exception exception = expectThrows(Exception.class, () -> processor.execute(ingestDocument)); @@ -97,7 +99,7 @@ public void testNonExistentWithoutIgnoreMissing() throws Exception { public void testCity_withIpV6() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field", - loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false); + loader("/GeoLite2-City.mmdb"), () -> true, "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false); String address = "2602:306:33d3:8000::3257:9652"; Map document = new HashMap<>(); @@ -125,7 +127,7 @@ public void testCity_withIpV6() throws Exception { public void testCityWithMissingLocation() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field", - loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false); + loader("/GeoLite2-City.mmdb"), () -> true, "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false); Map document = new HashMap<>(); document.put("source_field", "80.231.5.0"); @@ -141,7 +143,7 @@ public void testCityWithMissingLocation() throws Exception { public void testCountry() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field", - loader("/GeoLite2-Country.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false); + loader("/GeoLite2-Country.mmdb"), () -> true, "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false); Map document = new HashMap<>(); document.put("source_field", "82.170.213.79"); @@ -160,7 +162,7 @@ public void testCountry() throws Exception { public void testCountryWithMissingLocation() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field", - loader("/GeoLite2-Country.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false); + loader("/GeoLite2-Country.mmdb"), () -> true, "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false); Map document = new HashMap<>(); document.put("source_field", "80.231.5.0"); @@ -177,7 +179,7 @@ public void testCountryWithMissingLocation() throws Exception { public void testAsn() throws Exception { String ip = "82.171.64.0"; GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field", - loader("/GeoLite2-ASN.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false); + loader("/GeoLite2-ASN.mmdb"), () -> true, "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false); Map document = new HashMap<>(); document.put("source_field", ip); @@ -196,7 +198,7 @@ public void testAsn() throws Exception { public void testAddressIsNotInTheDatabase() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field", - loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false); + loader("/GeoLite2-City.mmdb"), () -> true, "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false); Map document = new HashMap<>(); document.put("source_field", "127.0.0.1"); @@ -205,10 +207,12 @@ public void testAddressIsNotInTheDatabase() throws Exception { assertThat(ingestDocument.getSourceAndMetadata().containsKey("target_field"), is(false)); } - /** Don't silently do DNS lookups or anything trappy on bogus data */ + /** + * Don't silently do DNS lookups or anything trappy on bogus data + */ public void testInvalid() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field", - loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false); + loader("/GeoLite2-City.mmdb"), () -> true, "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false); Map document = new HashMap<>(); document.put("source_field", "www.google.com"); @@ -219,7 +223,7 @@ public void testInvalid() throws Exception { public void testListAllValid() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field", - loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false); + loader("/GeoLite2-City.mmdb"), () -> true, "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false); Map document = new HashMap<>(); document.put("source_field", Arrays.asList("8.8.8.8", "82.171.64.0")); @@ -239,7 +243,7 @@ public void testListAllValid() throws Exception { public void testListPartiallyValid() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field", - loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false); + loader("/GeoLite2-City.mmdb"), () -> true, "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false); Map document = new HashMap<>(); document.put("source_field", Arrays.asList("8.8.8.8", "127.0.0.1")); @@ -259,7 +263,7 @@ public void testListPartiallyValid() throws Exception { public void testListNoMatches() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field", - loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false); + loader("/GeoLite2-City.mmdb"), () -> true, "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false); Map document = new HashMap<>(); document.put("source_field", Arrays.asList("127.0.0.1", "127.0.0.1")); @@ -271,7 +275,7 @@ public void testListNoMatches() throws Exception { public void testListFirstOnly() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field", - loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, true); + loader("/GeoLite2-City.mmdb"), () -> true, "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, true); Map document = new HashMap<>(); document.put("source_field", Arrays.asList("8.8.8.8", "127.0.0.1")); @@ -289,7 +293,19 @@ public void testListFirstOnly() throws Exception { public void testListFirstOnlyNoMatches() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field", - loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, true); + loader("/GeoLite2-City.mmdb"), () -> true, "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, true); + + Map document = new HashMap<>(); + document.put("source_field", Arrays.asList("127.0.0.1", "127.0.0.2")); + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + processor.execute(ingestDocument); + + assertThat(ingestDocument.getSourceAndMetadata().containsKey("target_field"), is(false)); + } + + public void testInvalidDatabase() throws Exception { + GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field", + loader("/GeoLite2-City.mmdb"), () -> false, "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, true); Map document = new HashMap<>(); document.put("source_field", Arrays.asList("127.0.0.1", "127.0.0.2")); @@ -297,6 +313,7 @@ public void testListFirstOnlyNoMatches() throws Exception { processor.execute(ingestDocument); assertThat(ingestDocument.getSourceAndMetadata().containsKey("target_field"), is(false)); + assertThat(ingestDocument.getSourceAndMetadata(), hasEntry("tags", List.of("_geoip_expired_database"))); } private CheckedSupplier loader(final String path) { diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpTaskStateSerializationTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpTaskStateSerializationTests.java index dd5faa9d8fa33..b5f5043d8ae5e 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpTaskStateSerializationTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpTaskStateSerializationTests.java @@ -30,7 +30,8 @@ protected GeoIpTaskState createTestInstance() { GeoIpTaskState state = GeoIpTaskState.EMPTY; int databaseCount = randomInt(20); for (int i = 0; i < databaseCount; i++) { - GeoIpTaskState.Metadata metadata = new GeoIpTaskState.Metadata(randomLong(), randomInt(), randomInt(), randomAlphaOfLength(32)); + GeoIpTaskState.Metadata metadata = new GeoIpTaskState.Metadata(randomLong(), randomInt(), randomInt(), + randomAlphaOfLength(32), randomLong()); state = state.put(randomAlphaOfLengthBetween(5, 10), metadata); } return state;