Skip to content

Commit 331a44b

Browse files
authored
Change GeoIP downloader policy after 30 days of no updates (elastic#74099)
This PR changes the way GeoIpDownloader and GeoIpProcessor handle the situation in which we are unable to update databases for 30 days. In that case: GeoIpDownloader will delete all chunks from the .geoip_databases index; DatabaseRegistry will delete all files on ingest nodes; GeoIpProcessor will tag documents with a tags: ["_geoip_expired_database"] field (the same way Logstash does). This change also fixes a bug that breaks DatabaseRegistry when it tries to download databases after only the timestamp has been updated (GeoIpDownloader checks whether there are new databases and updates just the timestamp because the local databases are already up to date).
1 parent c9ad768 commit 331a44b

15 files changed

+415
-193
lines changed

modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/AbstractGeoIpIT.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import org.elasticsearch.common.settings.Setting;
1212
import org.elasticsearch.common.settings.Settings;
13+
import org.elasticsearch.core.TimeValue;
1314
import org.elasticsearch.plugins.Plugin;
1415
import org.elasticsearch.test.ESIntegTestCase;
1516
import org.elasticsearch.test.StreamsUtils;
@@ -21,7 +22,6 @@
2122
import java.nio.file.Path;
2223
import java.util.Arrays;
2324
import java.util.Collection;
24-
import java.util.Collections;
2525
import java.util.List;
2626

2727
public abstract class AbstractGeoIpIT extends ESIntegTestCase {
@@ -58,7 +58,9 @@ public static class IngestGeoIpSettingsPlugin extends Plugin {
5858

5959
@Override
6060
public List<Setting<?>> getSettings() {
61-
return Collections.singletonList(Setting.simpleString("ingest.geoip.database_path", Setting.Property.NodeScope));
61+
return List.of(Setting.simpleString("ingest.geoip.database_path", Setting.Property.NodeScope),
62+
Setting.timeSetting("ingest.geoip.database_validity", TimeValue.timeValueDays(3), Setting.Property.NodeScope,
63+
Setting.Property.Dynamic));
6264
}
6365
}
6466
}

modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java

Lines changed: 172 additions & 77 deletions
Large diffs are not rendered by default.

modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/ReloadingDatabasesWhilePerformingGeoLookupsIT.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010

1111
import org.apache.lucene.util.LuceneTestCase;
1212
import org.elasticsearch.client.Client;
13+
import org.elasticsearch.cluster.ClusterState;
14+
import org.elasticsearch.cluster.service.ClusterService;
1315
import org.elasticsearch.common.network.InetAddresses;
1416
import org.elasticsearch.common.util.concurrent.AtomicArray;
1517
import org.elasticsearch.core.internal.io.IOUtils;
@@ -41,6 +43,7 @@
4143
import static org.hamcrest.Matchers.nullValue;
4244
import static org.hamcrest.Matchers.sameInstance;
4345
import static org.mockito.Mockito.mock;
46+
import static org.mockito.Mockito.when;
4447

4548
@LuceneTestCase.SuppressFileSystems(value = "ExtrasFS") // Don't randomly add 'extra' files to directory.
4649
public class ReloadingDatabasesWhilePerformingGeoLookupsIT extends ESTestCase {
@@ -57,13 +60,15 @@ public void test() throws Exception {
5760
Path geoIpConfigDir = createTempDir();
5861
Path geoIpTmpDir = createTempDir();
5962
DatabaseRegistry databaseRegistry = createRegistry(geoIpModulesDir, geoIpConfigDir, geoIpTmpDir);
60-
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry);
63+
ClusterService clusterService = mock(ClusterService.class);
64+
when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);
65+
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService);
6166
Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"),
6267
geoIpTmpDir.resolve("GeoLite2-City.mmdb"));
6368
Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"),
6469
geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"));
65-
databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb"), 0);
66-
databaseRegistry.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"), 0);
70+
databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb"));
71+
databaseRegistry.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"));
6772
lazyLoadReaders(databaseRegistry);
6873

6974
final GeoIpProcessor processor1 = factory.create(null, "_tag", null, new HashMap<>(Map.of("field", "_field")));
@@ -116,13 +121,13 @@ public void test() throws Exception {
116121
} else {
117122
Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"),
118123
geoIpTmpDir.resolve("GeoLite2-City.mmdb"), StandardCopyOption.REPLACE_EXISTING);
119-
databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb"), 0);
124+
databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb"));
120125
}
121126
DatabaseReaderLazyLoader previous2 = databaseRegistry.get("GeoLite2-City-Test.mmdb");
122127
InputStream source = LocalDatabases.class.getResourceAsStream(i % 2 == 0 ? "/GeoIP2-City-Test.mmdb" :
123128
"/GeoLite2-City-Test.mmdb");
124129
Files.copy(source, geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"), StandardCopyOption.REPLACE_EXISTING);
125-
databaseRegistry.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"), 0);
130+
databaseRegistry.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"));
126131

127132
DatabaseReaderLazyLoader current1 = databaseRegistry.get("GeoLite2-City.mmdb");
128133
DatabaseReaderLazyLoader current2 = databaseRegistry.get("GeoLite2-City-Test.mmdb");

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.elasticsearch.common.CheckedBiFunction;
2525
import org.elasticsearch.common.CheckedSupplier;
2626
import org.elasticsearch.core.SuppressForbidden;
27-
import org.elasticsearch.common.logging.HeaderWarning;
2827
import org.elasticsearch.core.internal.io.IOUtils;
2928

3029
import java.io.Closeable;
@@ -36,7 +35,6 @@
3635
import java.nio.file.Path;
3736
import java.security.AccessController;
3837
import java.security.PrivilegedAction;
39-
import java.time.Duration;
4038
import java.util.Objects;
4139
import java.util.concurrent.atomic.AtomicInteger;
4240

@@ -55,7 +53,6 @@ class DatabaseReaderLazyLoader implements Closeable {
5553
private final GeoIpCache cache;
5654
private final Path databasePath;
5755
private final CheckedSupplier<DatabaseReader, IOException> loader;
58-
private volatile long lastUpdate;
5956
final SetOnce<DatabaseReader> databaseReader;
6057

6158
// cache the database type so that we do not re-read it on every pipeline execution
@@ -197,16 +194,6 @@ private <T extends AbstractResponse> T getResponse(InetAddress ipAddress,
197194
}
198195

199196
DatabaseReader get() throws IOException {
200-
//only downloaded databases will have lastUpdate != 0, we never update it for default databases or databases from config dir
201-
if (lastUpdate != 0) {
202-
Path fileName = databasePath.getFileName();
203-
if (System.currentTimeMillis() - lastUpdate > Duration.ofDays(30).toMillis()) {
204-
throw new IllegalStateException("database [" + fileName + "] was not updated for 30 days and is disabled");
205-
} else if (System.currentTimeMillis() - lastUpdate > Duration.ofDays(25).toMillis()) {
206-
HeaderWarning.addWarning(
207-
"database [{}] was not updated for over 25 days, ingestion will fail if there is no update for 30 days", fileName);
208-
}
209-
}
210197
if (databaseReader.get() == null) {
211198
synchronized (databaseReader) {
212199
if (databaseReader.get() == null) {
@@ -261,7 +248,4 @@ private static DatabaseReader.Builder createDatabaseBuilder(Path databasePath) {
261248
return new DatabaseReader.Builder(databasePath.toFile());
262249
}
263250

264-
void setLastUpdate(long lastUpdate) {
265-
this.lastUpdate = lastUpdate;
266-
}
267251
}

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseRegistry.java

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import java.util.Collection;
4949
import java.util.List;
5050
import java.util.Locale;
51+
import java.util.Map;
5152
import java.util.Objects;
5253
import java.util.Set;
5354
import java.util.concurrent.ConcurrentHashMap;
@@ -202,25 +203,31 @@ void checkDatabases(ClusterState state) {
202203
// Empty state will purge stale entries in databases map.
203204
GeoIpTaskState taskState = task == null || task.getState() == null ? GeoIpTaskState.EMPTY : (GeoIpTaskState) task.getState();
204205

205-
taskState.getDatabases().forEach((name, metadata) -> {
206-
DatabaseReaderLazyLoader reference = databases.get(name);
207-
String remoteMd5 = metadata.getMd5();
208-
String localMd5 = reference != null ? reference.getMd5() : null;
209-
if (Objects.equals(localMd5, remoteMd5)) {
210-
reference.setLastUpdate(metadata.getLastUpdate());
211-
LOGGER.debug("Current reference of [{}] is up to date [{}] with was recorded in CS [{}]", name, localMd5, remoteMd5);
212-
return;
213-
}
206+
taskState.getDatabases().entrySet().stream()
207+
.filter(e -> e.getValue().isValid(state.getMetadata().settings()))
208+
.forEach(e -> {
209+
String name = e.getKey();
210+
GeoIpTaskState.Metadata metadata = e.getValue();
211+
DatabaseReaderLazyLoader reference = databases.get(name);
212+
String remoteMd5 = metadata.getMd5();
213+
String localMd5 = reference != null ? reference.getMd5() : null;
214+
if (Objects.equals(localMd5, remoteMd5)) {
215+
LOGGER.debug("Current reference of [{}] is up to date [{}] with was recorded in CS [{}]", name, localMd5, remoteMd5);
216+
return;
217+
}
214218

215-
try {
216-
retrieveAndUpdateDatabase(name, metadata);
217-
} catch (Exception e) {
218-
LOGGER.error((Supplier<?>) () -> new ParameterizedMessage("attempt to download database [{}] failed", name), e);
219-
}
220-
});
219+
try {
220+
retrieveAndUpdateDatabase(name, metadata);
221+
} catch (Exception ex) {
222+
LOGGER.error((Supplier<?>) () -> new ParameterizedMessage("attempt to download database [{}] failed", name), ex);
223+
}
224+
});
221225

222226
List<String> staleEntries = new ArrayList<>(databases.keySet());
223-
staleEntries.removeAll(taskState.getDatabases().keySet());
227+
staleEntries.removeAll(taskState.getDatabases().entrySet().stream()
228+
.filter(e->e.getValue().isValid(state.getMetadata().settings()))
229+
.map(Map.Entry::getKey)
230+
.collect(Collectors.toSet()));
224231
removeStaleEntries(staleEntries);
225232
}
226233

@@ -284,7 +291,7 @@ void retrieveAndUpdateDatabase(String databaseName, GeoIpTaskState.Metadata meta
284291

285292
LOGGER.debug("moving database from [{}] to [{}]", databaseTmpFile, databaseFile);
286293
Files.move(databaseTmpFile, databaseFile, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
287-
updateDatabase(databaseName, recordedMd5, databaseFile, metadata.getLastUpdate());
294+
updateDatabase(databaseName, recordedMd5, databaseFile);
288295
Files.delete(databaseTmpGzFile);
289296
},
290297
failure -> {
@@ -299,11 +306,10 @@ void retrieveAndUpdateDatabase(String databaseName, GeoIpTaskState.Metadata meta
299306
});
300307
}
301308

302-
void updateDatabase(String databaseFileName, String recordedMd5, Path file, long lastUpdate) {
309+
void updateDatabase(String databaseFileName, String recordedMd5, Path file) {
303310
try {
304311
LOGGER.info("database file changed [{}], reload database...", file);
305312
DatabaseReaderLazyLoader loader = new DatabaseReaderLazyLoader(cache, file, recordedMd5);
306-
loader.setLastUpdate(lastUpdate);
307313
DatabaseReaderLazyLoader existing = databases.put(databaseFileName, loader);
308314
if (existing != null) {
309315
existing.close();

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@
2222
import org.elasticsearch.common.settings.Setting;
2323
import org.elasticsearch.common.settings.Setting.Property;
2424
import org.elasticsearch.common.settings.Settings;
25-
import org.elasticsearch.core.TimeValue;
2625
import org.elasticsearch.common.xcontent.DeprecationHandler;
2726
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
2827
import org.elasticsearch.common.xcontent.XContentParser;
2928
import org.elasticsearch.common.xcontent.XContentType;
29+
import org.elasticsearch.core.TimeValue;
3030
import org.elasticsearch.index.query.BoolQueryBuilder;
3131
import org.elasticsearch.index.query.MatchQueryBuilder;
3232
import org.elasticsearch.index.query.RangeQueryBuilder;
@@ -70,6 +70,7 @@ public class GeoIpDownloader extends AllocatedPersistentTask {
7070

7171
private final Client client;
7272
private final HttpClient httpClient;
73+
private final ClusterService clusterService;
7374
private final ThreadPool threadPool;
7475
private final String endpoint;
7576

@@ -84,6 +85,7 @@ public class GeoIpDownloader extends AllocatedPersistentTask {
8485
super(id, type, action, description, parentTask, headers);
8586
this.httpClient = httpClient;
8687
this.client = new OriginSettingClient(client, IngestService.INGEST_ORIGIN);
88+
this.clusterService = clusterService;
8789
this.threadPool = threadPool;
8890
endpoint = ENDPOINT_SETTING.get(settings);
8991
pollInterval = POLL_INTERVAL_SETTING.get(settings);
@@ -139,7 +141,7 @@ void processDatabase(Map<String, Object> databaseInfo) {
139141
int firstChunk = state.contains(name) ? state.get(name).getLastChunk() + 1 : 0;
140142
int lastChunk = indexChunks(name, is, firstChunk, md5, start);
141143
if (lastChunk > firstChunk) {
142-
state = state.put(name, new Metadata(start, firstChunk, lastChunk - 1, md5));
144+
state = state.put(name, new Metadata(start, firstChunk, lastChunk - 1, md5, start));
143145
updateTaskState();
144146
stats = stats.successfulDownload(System.currentTimeMillis() - start).count(state.getDatabases().size());
145147
logger.info("updated geoip database [" + name + "]");
@@ -166,7 +168,8 @@ void deleteOldChunks(String name, int firstChunk) {
166168
//visible for testing
167169
protected void updateTimestamp(String name, Metadata old) {
168170
logger.info("geoip database [" + name + "] is up to date, updated timestamp");
169-
state = state.put(name, new Metadata(System.currentTimeMillis(), old.getFirstChunk(), old.getLastChunk(), old.getMd5()));
171+
state = state.put(name, new Metadata(old.getLastUpdate(), old.getFirstChunk(), old.getLastChunk(), old.getMd5(),
172+
System.currentTimeMillis()));
170173
stats = stats.skippedDownload();
171174
updateTaskState();
172175
}
@@ -235,9 +238,28 @@ void runDownloader() {
235238
} catch (Exception e) {
236239
logger.error("exception during geoip databases update", e);
237240
}
241+
try {
242+
cleanDatabases();
243+
} catch (Exception e) {
244+
logger.error("exception during geoip databases cleanup", e);
245+
}
238246
scheduleNextRun(pollInterval);
239247
}
240248

249+
private void cleanDatabases() {
250+
long expiredDatabases = state.getDatabases().entrySet().stream()
251+
.filter(e -> e.getValue().isValid(clusterService.state().metadata().settings()) == false)
252+
.peek(e -> {
253+
String name = e.getKey();
254+
Metadata meta = e.getValue();
255+
deleteOldChunks(name, meta.getLastChunk() + 1);
256+
state = state.put(name, new Metadata(meta.getLastUpdate(), meta.getFirstChunk(), meta.getLastChunk(), meta.getMd5(),
257+
meta.getLastCheck() - 1));
258+
updateTaskState();
259+
}).count();
260+
stats = stats.expiredDatabases((int) expiredDatabases);
261+
}
262+
241263
@Override
242264
protected void onCancelled() {
243265
if (scheduled != null) {
@@ -251,6 +273,8 @@ public GeoIpDownloaderStats getStatus() {
251273
}
252274

253275
private void scheduleNextRun(TimeValue time) {
254-
scheduled = threadPool.schedule(this::runDownloader, time, ThreadPool.Names.GENERIC);
276+
if (threadPool.scheduler().isShutdown() == false) {
277+
scheduled = threadPool.schedule(this::runDownloader, time, ThreadPool.Names.GENERIC);
278+
}
255279
}
256280
}

0 commit comments

Comments
 (0)