Skip to content

Change GeoIP downloader policy after 30 days of no updates #74099

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Jun 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.StreamsUtils;
Expand All @@ -21,7 +22,6 @@
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public abstract class AbstractGeoIpIT extends ESIntegTestCase {
Expand Down Expand Up @@ -58,7 +58,9 @@ public static class IngestGeoIpSettingsPlugin extends Plugin {

@Override
public List<Setting<?>> getSettings() {
return Collections.singletonList(Setting.simpleString("ingest.geoip.database_path", Setting.Property.NodeScope));
return List.of(Setting.simpleString("ingest.geoip.database_path", Setting.Property.NodeScope),
Setting.timeSetting("ingest.geoip.database_validity", TimeValue.timeValueDays(3), Setting.Property.NodeScope,
Setting.Property.Dynamic));
}
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.network.InetAddresses;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.core.internal.io.IOUtils;
Expand Down Expand Up @@ -41,6 +43,7 @@
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@LuceneTestCase.SuppressFileSystems(value = "ExtrasFS") // Don't randomly add 'extra' files to directory.
public class ReloadingDatabasesWhilePerformingGeoLookupsIT extends ESTestCase {
Expand All @@ -57,13 +60,15 @@ public void test() throws Exception {
Path geoIpConfigDir = createTempDir();
Path geoIpTmpDir = createTempDir();
DatabaseRegistry databaseRegistry = createRegistry(geoIpModulesDir, geoIpConfigDir, geoIpTmpDir);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry);
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService);
Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"),
geoIpTmpDir.resolve("GeoLite2-City.mmdb"));
Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"),
geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"));
databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb"), 0);
databaseRegistry.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"), 0);
databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb"));
databaseRegistry.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"));
lazyLoadReaders(databaseRegistry);

final GeoIpProcessor processor1 = factory.create(null, "_tag", null, new HashMap<>(Map.of("field", "_field")));
Expand Down Expand Up @@ -116,13 +121,13 @@ public void test() throws Exception {
} else {
Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"),
geoIpTmpDir.resolve("GeoLite2-City.mmdb"), StandardCopyOption.REPLACE_EXISTING);
databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb"), 0);
databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb"));
}
DatabaseReaderLazyLoader previous2 = databaseRegistry.get("GeoLite2-City-Test.mmdb");
InputStream source = LocalDatabases.class.getResourceAsStream(i % 2 == 0 ? "/GeoIP2-City-Test.mmdb" :
"/GeoLite2-City-Test.mmdb");
Files.copy(source, geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"), StandardCopyOption.REPLACE_EXISTING);
databaseRegistry.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"), 0);
databaseRegistry.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"));

DatabaseReaderLazyLoader current1 = databaseRegistry.get("GeoLite2-City.mmdb");
DatabaseReaderLazyLoader current2 = databaseRegistry.get("GeoLite2-City-Test.mmdb");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.common.CheckedBiFunction;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.common.logging.HeaderWarning;
import org.elasticsearch.core.internal.io.IOUtils;

import java.io.Closeable;
Expand All @@ -36,7 +35,6 @@
import java.nio.file.Path;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -55,7 +53,6 @@ class DatabaseReaderLazyLoader implements Closeable {
private final GeoIpCache cache;
private final Path databasePath;
private final CheckedSupplier<DatabaseReader, IOException> loader;
private volatile long lastUpdate;
final SetOnce<DatabaseReader> databaseReader;

// cache the database type so that we do not re-read it on every pipeline execution
Expand Down Expand Up @@ -197,16 +194,6 @@ private <T extends AbstractResponse> T getResponse(InetAddress ipAddress,
}

DatabaseReader get() throws IOException {
//only downloaded databases will have lastUpdate != 0, we never update it for default databases or databases from config dir
if (lastUpdate != 0) {
Path fileName = databasePath.getFileName();
if (System.currentTimeMillis() - lastUpdate > Duration.ofDays(30).toMillis()) {
throw new IllegalStateException("database [" + fileName + "] was not updated for 30 days and is disabled");
} else if (System.currentTimeMillis() - lastUpdate > Duration.ofDays(25).toMillis()) {
HeaderWarning.addWarning(
"database [{}] was not updated for over 25 days, ingestion will fail if there is no update for 30 days", fileName);
}
}
if (databaseReader.get() == null) {
synchronized (databaseReader) {
if (databaseReader.get() == null) {
Expand Down Expand Up @@ -261,7 +248,4 @@ private static DatabaseReader.Builder createDatabaseBuilder(Path databasePath) {
return new DatabaseReader.Builder(databasePath.toFile());
}

void setLastUpdate(long lastUpdate) {
this.lastUpdate = lastUpdate;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -202,25 +203,31 @@ void checkDatabases(ClusterState state) {
// Empty state will purge stale entries in databases map.
GeoIpTaskState taskState = task == null || task.getState() == null ? GeoIpTaskState.EMPTY : (GeoIpTaskState) task.getState();

taskState.getDatabases().forEach((name, metadata) -> {
DatabaseReaderLazyLoader reference = databases.get(name);
String remoteMd5 = metadata.getMd5();
String localMd5 = reference != null ? reference.getMd5() : null;
if (Objects.equals(localMd5, remoteMd5)) {
reference.setLastUpdate(metadata.getLastUpdate());
LOGGER.debug("Current reference of [{}] is up to date [{}] with was recorded in CS [{}]", name, localMd5, remoteMd5);
return;
}
taskState.getDatabases().entrySet().stream()
.filter(e -> e.getValue().isValid(state.getMetadata().settings()))
.forEach(e -> {
String name = e.getKey();
GeoIpTaskState.Metadata metadata = e.getValue();
DatabaseReaderLazyLoader reference = databases.get(name);
String remoteMd5 = metadata.getMd5();
String localMd5 = reference != null ? reference.getMd5() : null;
if (Objects.equals(localMd5, remoteMd5)) {
LOGGER.debug("Current reference of [{}] is up to date [{}] with was recorded in CS [{}]", name, localMd5, remoteMd5);
return;
}

try {
retrieveAndUpdateDatabase(name, metadata);
} catch (Exception e) {
LOGGER.error((Supplier<?>) () -> new ParameterizedMessage("attempt to download database [{}] failed", name), e);
}
});
try {
retrieveAndUpdateDatabase(name, metadata);
} catch (Exception ex) {
LOGGER.error((Supplier<?>) () -> new ParameterizedMessage("attempt to download database [{}] failed", name), ex);
}
});

List<String> staleEntries = new ArrayList<>(databases.keySet());
staleEntries.removeAll(taskState.getDatabases().keySet());
staleEntries.removeAll(taskState.getDatabases().entrySet().stream()
.filter(e->e.getValue().isValid(state.getMetadata().settings()))
.map(Map.Entry::getKey)
.collect(Collectors.toSet()));
removeStaleEntries(staleEntries);
}

Expand Down Expand Up @@ -284,7 +291,7 @@ void retrieveAndUpdateDatabase(String databaseName, GeoIpTaskState.Metadata meta

LOGGER.debug("moving database from [{}] to [{}]", databaseTmpFile, databaseFile);
Files.move(databaseTmpFile, databaseFile, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
updateDatabase(databaseName, recordedMd5, databaseFile, metadata.getLastUpdate());
updateDatabase(databaseName, recordedMd5, databaseFile);
Files.delete(databaseTmpGzFile);
},
failure -> {
Expand All @@ -299,11 +306,10 @@ void retrieveAndUpdateDatabase(String databaseName, GeoIpTaskState.Metadata meta
});
}

void updateDatabase(String databaseFileName, String recordedMd5, Path file, long lastUpdate) {
void updateDatabase(String databaseFileName, String recordedMd5, Path file) {
try {
LOGGER.info("database file changed [{}], reload database...", file);
DatabaseReaderLazyLoader loader = new DatabaseReaderLazyLoader(cache, file, recordedMd5);
loader.setLastUpdate(lastUpdate);
DatabaseReaderLazyLoader existing = databases.put(databaseFileName, loader);
if (existing != null) {
existing.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
Expand Down Expand Up @@ -70,6 +70,7 @@ public class GeoIpDownloader extends AllocatedPersistentTask {

private final Client client;
private final HttpClient httpClient;
private final ClusterService clusterService;
private final ThreadPool threadPool;
private final String endpoint;

Expand All @@ -84,6 +85,7 @@ public class GeoIpDownloader extends AllocatedPersistentTask {
super(id, type, action, description, parentTask, headers);
this.httpClient = httpClient;
this.client = new OriginSettingClient(client, IngestService.INGEST_ORIGIN);
this.clusterService = clusterService;
this.threadPool = threadPool;
endpoint = ENDPOINT_SETTING.get(settings);
pollInterval = POLL_INTERVAL_SETTING.get(settings);
Expand Down Expand Up @@ -139,7 +141,7 @@ void processDatabase(Map<String, Object> databaseInfo) {
int firstChunk = state.contains(name) ? state.get(name).getLastChunk() + 1 : 0;
int lastChunk = indexChunks(name, is, firstChunk, md5, start);
if (lastChunk > firstChunk) {
state = state.put(name, new Metadata(start, firstChunk, lastChunk - 1, md5));
state = state.put(name, new Metadata(start, firstChunk, lastChunk - 1, md5, start));
updateTaskState();
stats = stats.successfulDownload(System.currentTimeMillis() - start).count(state.getDatabases().size());
logger.info("updated geoip database [" + name + "]");
Expand All @@ -166,7 +168,8 @@ void deleteOldChunks(String name, int firstChunk) {
//visible for testing
protected void updateTimestamp(String name, Metadata old) {
logger.info("geoip database [" + name + "] is up to date, updated timestamp");
state = state.put(name, new Metadata(System.currentTimeMillis(), old.getFirstChunk(), old.getLastChunk(), old.getMd5()));
state = state.put(name, new Metadata(old.getLastUpdate(), old.getFirstChunk(), old.getLastChunk(), old.getMd5(),
System.currentTimeMillis()));
stats = stats.skippedDownload();
updateTaskState();
}
Expand Down Expand Up @@ -235,9 +238,28 @@ void runDownloader() {
} catch (Exception e) {
logger.error("exception during geoip databases update", e);
}
try {
cleanDatabases();
} catch (Exception e) {
logger.error("exception during geoip databases cleanup", e);
}
scheduleNextRun(pollInterval);
}

/**
 * Expires every database in the current task state whose metadata is no longer valid
 * according to the cluster settings, then records how many were expired in {@code stats}.
 *
 * <p>For each expired database this deletes its chunks via {@link #deleteOldChunks}, stores a
 * replacement {@code Metadata} entry in {@code state}, and publishes the task state.
 *
 * <p>The work is done in a plain loop rather than inside {@code Stream.peek(...).count()}:
 * {@code peek} is intended for debugging, and {@code count()} is allowed to skip the pipeline
 * when it can compute the size directly, so side effects placed there are not guaranteed to run.
 */
private void cleanDatabases() {
    // Take one settings snapshot so that every database is judged against the same validity setting.
    Settings clusterSettings = clusterService.state().metadata().settings();
    int expiredDatabases = 0;
    // The for-each evaluates state.getDatabases() once, so reassigning `state` inside the loop
    // does not change what is being iterated.
    for (Map.Entry<String, Metadata> entry : state.getDatabases().entrySet()) {
        Metadata meta = entry.getValue();
        if (meta.isValid(clusterSettings)) {
            continue;
        }
        String name = entry.getKey();
        // NOTE(review): passing lastChunk + 1 presumably removes every chunk of this database - confirm
        // against deleteOldChunks.
        deleteOldChunks(name, meta.getLastChunk() + 1);
        // NOTE(review): lastCheck is decremented by one, presumably so the stored metadata differs from
        // the previous entry and the change is observed by consumers of the task state - confirm.
        state = state.put(name, new Metadata(meta.getLastUpdate(), meta.getFirstChunk(), meta.getLastChunk(), meta.getMd5(),
            meta.getLastCheck() - 1));
        updateTaskState();
        expiredDatabases++;
    }
    stats = stats.expiredDatabases(expiredDatabases);
}

@Override
protected void onCancelled() {
if (scheduled != null) {
Expand All @@ -251,6 +273,8 @@ public GeoIpDownloaderStats getStatus() {
}

private void scheduleNextRun(TimeValue time) {
scheduled = threadPool.schedule(this::runDownloader, time, ThreadPool.Names.GENERIC);
if (threadPool.scheduler().isShutdown() == false) {
scheduled = threadPool.schedule(this::runDownloader, time, ThreadPool.Names.GENERIC);
}
}
}
Loading