Skip to content

Commit 5a449db

Browse files
committed
rework
1 parent c8ab28a commit 5a449db

File tree

8 files changed

+44
-64
lines changed

8 files changed

+44
-64
lines changed

modules/ingest-geoip/build.gradle

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ dependencies {
2626

2727
testImplementation 'org.elasticsearch:geolite2-databases:20191119'
2828
internalClusterTestImplementation project(path: ":modules:reindex")
29-
internalClusterTestImplementation project(path: ":modules:lang-painless")
3029
}
3130

3231
restResources {
@@ -72,9 +71,6 @@ tasks.named("bundlePlugin").configure {
7271
}
7372
}
7473

75-
tasks.named("processTestResources").configure {
76-
((Copy)it).duplicatesStrategy(DuplicatesStrategy.EXCLUDE)
77-
}
7874
tasks.named("thirdPartyAudit").configure {
7975
ignoreMissingClasses(
8076
// geoip WebServiceClient needs apache http client, but we're not using WebServiceClient:

modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,18 @@
1515
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
1616
import org.elasticsearch.action.ingest.SimulatePipelineResponse;
1717
import org.elasticsearch.action.search.SearchResponse;
18-
import org.elasticsearch.core.SuppressForbidden;
1918
import org.elasticsearch.common.bytes.BytesReference;
2019
import org.elasticsearch.common.settings.Settings;
2120
import org.elasticsearch.common.xcontent.XContentBuilder;
2221
import org.elasticsearch.common.xcontent.XContentType;
2322
import org.elasticsearch.common.xcontent.json.JsonXContent;
23+
import org.elasticsearch.core.SuppressForbidden;
2424
import org.elasticsearch.core.TimeValue;
2525
import org.elasticsearch.env.Environment;
2626
import org.elasticsearch.index.query.BoolQueryBuilder;
2727
import org.elasticsearch.index.query.MatchQueryBuilder;
2828
import org.elasticsearch.index.query.RangeQueryBuilder;
2929
import org.elasticsearch.index.reindex.ReindexPlugin;
30-
import org.elasticsearch.painless.PainlessPlugin;
3130
import org.elasticsearch.persistent.PersistentTaskParams;
3231
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
3332
import org.elasticsearch.plugins.Plugin;
@@ -70,8 +69,7 @@ public class GeoIpDownloaderIT extends AbstractGeoIpIT {
7069

7170
@Override
7271
protected Collection<Class<? extends Plugin>> nodePlugins() {
73-
return Arrays.asList(PainlessPlugin.class, ReindexPlugin.class, IngestGeoIpPlugin.class,
74-
GeoIpProcessorNonIngestNodeIT.IngestGeoIpSettingsPlugin.class);
72+
return Arrays.asList(ReindexPlugin.class, IngestGeoIpPlugin.class, GeoIpProcessorNonIngestNodeIT.IngestGeoIpSettingsPlugin.class);
7573
}
7674

7775
@Override

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,13 @@
3030
import org.elasticsearch.index.query.BoolQueryBuilder;
3131
import org.elasticsearch.index.query.MatchQueryBuilder;
3232
import org.elasticsearch.index.query.RangeQueryBuilder;
33-
import org.elasticsearch.index.query.TermQueryBuilder;
3433
import org.elasticsearch.index.reindex.DeleteByQueryAction;
3534
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
36-
import org.elasticsearch.index.reindex.UpdateByQueryAction;
37-
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
3835
import org.elasticsearch.ingest.IngestService;
3936
import org.elasticsearch.ingest.geoip.GeoIpTaskState.Metadata;
4037
import org.elasticsearch.ingest.geoip.stats.GeoIpDownloaderStats;
4138
import org.elasticsearch.persistent.AllocatedPersistentTask;
4239
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask;
43-
import org.elasticsearch.script.Script;
4440
import org.elasticsearch.tasks.TaskId;
4541
import org.elasticsearch.threadpool.Scheduler;
4642
import org.elasticsearch.threadpool.ThreadPool;
@@ -67,8 +63,6 @@ public class GeoIpDownloader extends AllocatedPersistentTask {
6763
TimeValue.timeValueDays(3), TimeValue.timeValueDays(1), Property.Dynamic, Property.NodeScope);
6864
public static final Setting<String> ENDPOINT_SETTING = Setting.simpleString("ingest.geoip.downloader.endpoint",
6965
"https://geoip.elastic.co/v1/database", Property.NodeScope);
70-
public static final Setting<TimeValue> DATABASE_VALIDITY = Setting.timeSetting("ingest.geoip.database.validity",
71-
TimeValue.timeValueDays(3), Property.Dynamic, Property.NodeScope);
7266

7367
public static final String GEOIP_DOWNLOADER = "geoip-downloader";
7468
static final String DATABASES_INDEX = ".geoip_databases";
@@ -147,7 +141,7 @@ void processDatabase(Map<String, Object> databaseInfo) {
147141
int firstChunk = state.contains(name) ? state.get(name).getLastChunk() + 1 : 0;
148142
int lastChunk = indexChunks(name, is, firstChunk, md5, start);
149143
if (lastChunk > firstChunk) {
150-
state = state.put(name, new Metadata(start, firstChunk, lastChunk - 1, md5));
144+
state = state.put(name, new Metadata(start, firstChunk, lastChunk - 1, md5, start));
151145
updateTaskState();
152146
stats = stats.successfulDownload(System.currentTimeMillis() - start).count(state.getDatabases().size());
153147
logger.info("updated geoip database [" + name + "]");
@@ -174,17 +168,8 @@ void deleteOldChunks(String name, int firstChunk) {
174168
//visible for testing
175169
protected void updateTimestamp(String name, Metadata old) {
176170
logger.info("geoip database [" + name + "] is up to date, updated timestamp");
177-
long timestamp = System.currentTimeMillis();
178-
UpdateByQueryRequest request = new UpdateByQueryRequest(DATABASES_INDEX).setRefresh(true);
179-
request.setScript(new Script("ctx._source.timestamp=" + timestamp + "L"));
180-
request.setQuery(new BoolQueryBuilder()
181-
.filter(new TermQueryBuilder("name", name))
182-
.filter(new TermQueryBuilder("timestamp", old.getLastUpdate()))
183-
.filter(new RangeQueryBuilder("chunk")
184-
.from(old.getFirstChunk(), true)
185-
.to(old.getLastChunk(), true)));
186-
client.execute(UpdateByQueryAction.INSTANCE, request).actionGet();
187-
state = state.put(name, new Metadata(timestamp, old.getFirstChunk(), old.getLastChunk(), old.getMd5()));
171+
state = state.put(name, new Metadata(old.getLastUpdate(), old.getFirstChunk(), old.getLastChunk(), old.getMd5(),
172+
System.currentTimeMillis()));
188173
stats = stats.skippedDownload();
189174
updateTaskState();
190175
}
@@ -268,7 +253,8 @@ private void cleanDatabases() {
268253
String name = e.getKey();
269254
Metadata meta = e.getValue();
270255
deleteOldChunks(name, meta.getLastChunk() + 1);
271-
state = state.put(name, new Metadata(meta.getLastUpdate() - 1, meta.getFirstChunk(), meta.getLastChunk(), meta.getMd5()));
256+
state = state.put(name, new Metadata(meta.getLastUpdate(), meta.getFirstChunk(), meta.getLastChunk(), meta.getMd5(),
257+
meta.getLastCheck() - 1));
272258
updateTaskState();
273259
}).count();
274260
stats = stats.expiredDatabases((int) expiredDatabases);

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpTaskState.java

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,17 @@
99
package org.elasticsearch.ingest.geoip;
1010

1111
import org.elasticsearch.Version;
12-
import org.elasticsearch.common.xcontent.ParseField;
13-
import org.elasticsearch.core.TimeValue;
14-
import org.elasticsearch.core.Tuple;
1512
import org.elasticsearch.common.io.stream.StreamInput;
1613
import org.elasticsearch.common.io.stream.StreamOutput;
1714
import org.elasticsearch.common.io.stream.VersionedNamedWriteable;
1815
import org.elasticsearch.common.settings.Settings;
1916
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
17+
import org.elasticsearch.common.xcontent.ParseField;
2018
import org.elasticsearch.common.xcontent.ToXContentObject;
2119
import org.elasticsearch.common.xcontent.XContentBuilder;
2220
import org.elasticsearch.common.xcontent.XContentParser;
21+
import org.elasticsearch.core.TimeValue;
22+
import org.elasticsearch.core.Tuple;
2323
import org.elasticsearch.persistent.PersistentTaskState;
2424

2525
import java.io.IOException;
@@ -33,6 +33,7 @@
3333
import java.util.stream.Collectors;
3434

3535
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
36+
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
3637
import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER;
3738

3839
class GeoIpTaskState implements PersistentTaskState, VersionedNamedWriteable {
@@ -65,7 +66,11 @@ public static GeoIpTaskState fromXContent(XContentParser parser) throws IOExcept
6566

6667
GeoIpTaskState(StreamInput input) throws IOException {
6768
databases = Collections.unmodifiableMap(input.readMap(StreamInput::readString,
68-
in -> new Metadata(in.readLong(), in.readVInt(), in.readVInt(), in.readString())));
69+
in -> {
70+
long lastUpdate = in.readLong();
71+
return new Metadata(lastUpdate, in.readVInt(), in.readVInt(), in.readString(),
72+
in.getVersion().onOrAfter(Version.V_8_0_0) ? in.readLong() : lastUpdate);
73+
}));
6974
}
7075

7176
public GeoIpTaskState put(String name, Metadata metadata) {
@@ -130,26 +135,32 @@ public void writeTo(StreamOutput out) throws IOException {
130135
o.writeVInt(v.firstChunk);
131136
o.writeVInt(v.lastChunk);
132137
o.writeString(v.md5);
138+
if (o.getVersion().onOrAfter(Version.V_8_0_0)) {
139+
o.writeLong(v.lastCheck);
140+
}
133141
});
134142
}
135143

136144
static class Metadata implements ToXContentObject {
137145

138146
static final String NAME = GEOIP_DOWNLOADER + "-metadata";
147+
private static final ParseField LAST_CHECK = new ParseField("last_check");
139148
private static final ParseField LAST_UPDATE = new ParseField("last_update");
140149
private static final ParseField FIRST_CHUNK = new ParseField("first_chunk");
141150
private static final ParseField LAST_CHUNK = new ParseField("last_chunk");
142151
private static final ParseField MD5 = new ParseField("md5");
143152

144153
private static final ConstructingObjectParser<Metadata, Void> PARSER =
145154
new ConstructingObjectParser<>(NAME, true,
146-
args -> new Metadata((long) args[0], (int) args[1], (int) args[2], (String) args[3]));
155+
args -> new Metadata((long) args[0], (int) args[1], (int) args[2], (String) args[3], (long) (args[4] == null ? args[0] :
156+
args[4])));
147157

148158
static {
149159
PARSER.declareLong(constructorArg(), LAST_UPDATE);
150160
PARSER.declareInt(constructorArg(), FIRST_CHUNK);
151161
PARSER.declareInt(constructorArg(), LAST_CHUNK);
152162
PARSER.declareString(constructorArg(), MD5);
163+
PARSER.declareLong(optionalConstructorArg(), LAST_CHECK);
153164
}
154165

155166
public static Metadata fromXContent(XContentParser parser) {
@@ -164,12 +175,14 @@ public static Metadata fromXContent(XContentParser parser) {
164175
private final int firstChunk;
165176
private final int lastChunk;
166177
private final String md5;
178+
private final long lastCheck;
167179

168-
Metadata(long lastUpdate, int firstChunk, int lastChunk, String md5) {
180+
Metadata(long lastUpdate, int firstChunk, int lastChunk, String md5, long lastCheck) {
169181
this.lastUpdate = lastUpdate;
170182
this.firstChunk = firstChunk;
171183
this.lastChunk = lastChunk;
172184
this.md5 = Objects.requireNonNull(md5);
185+
this.lastCheck = lastCheck;
173186
}
174187

175188
public long getLastUpdate() {
@@ -178,7 +191,7 @@ public long getLastUpdate() {
178191

179192
public boolean isValid(Settings settings) {
180193
TimeValue valid = settings.getAsTime("ingest.geoip.database_validity", TimeValue.timeValueDays(30));
181-
return Instant.ofEpochMilli(lastUpdate).isAfter(Instant.now().minus(valid.getMillis(), ChronoUnit.MILLIS));
194+
return Instant.ofEpochMilli(lastCheck).isAfter(Instant.now().minus(valid.getMillis(), ChronoUnit.MILLIS));
182195
}
183196

184197
public int getFirstChunk() {
@@ -193,6 +206,10 @@ public String getMd5() {
193206
return md5;
194207
}
195208

209+
public long getLastCheck() {
210+
return lastCheck;
211+
}
212+
196213
@Override
197214
public boolean equals(Object o) {
198215
if (this == o) return true;
@@ -201,19 +218,21 @@ public boolean equals(Object o) {
201218
return lastUpdate == metadata.lastUpdate
202219
&& firstChunk == metadata.firstChunk
203220
&& lastChunk == metadata.lastChunk
221+
&& lastCheck == metadata.lastCheck
204222
&& md5.equals(metadata.md5);
205223
}
206224

207225
@Override
208226
public int hashCode() {
209-
return Objects.hash(lastUpdate, firstChunk, lastChunk, md5);
227+
return Objects.hash(lastUpdate, firstChunk, lastChunk, md5, lastCheck);
210228
}
211229

212230
@Override
213231
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
214232
builder.startObject();
215233
{
216234
builder.field(LAST_UPDATE.getPreferredName(), lastUpdate);
235+
builder.field(LAST_CHECK.getPreferredName(), lastCheck);
217236
builder.field(FIRST_CHUNK.getPreferredName(), firstChunk);
218237
builder.field(LAST_CHUNK.getPreferredName(), lastChunk);
219238
builder.field(MD5.getPreferredName(), md5);

modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/DatabaseRegistryTests.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ public void testCheckDatabases() throws Exception {
128128
String taskId = GeoIpDownloader.GEOIP_DOWNLOADER;
129129
PersistentTask<?> task = new PersistentTask<>(taskId, GeoIpDownloader.GEOIP_DOWNLOADER, new GeoIpTaskParams(), 1, null);
130130
task = new PersistentTask<>(task, new GeoIpTaskState(Map.of("GeoIP2-City.mmdb",
131-
new GeoIpTaskState.Metadata(10, 5, 14, md5))));
131+
new GeoIpTaskState.Metadata(10, 5, 14, md5, 10))));
132132
PersistentTasksCustomMetadata tasksCustomMetadata = new PersistentTasksCustomMetadata(1L, Map.of(taskId, task));
133133

134134
ClusterState state = ClusterState.builder(new ClusterName("name"))
@@ -149,7 +149,7 @@ public void testCheckDatabases() throws Exception {
149149
}
150150

151151
task = new PersistentTask<>(task, new GeoIpTaskState(Map.of("GeoIP2-City.mmdb",
152-
new GeoIpTaskState.Metadata(System.currentTimeMillis(), 5, 14, md5))));
152+
new GeoIpTaskState.Metadata(10, 5, 14, md5, System.currentTimeMillis()))));
153153
tasksCustomMetadata = new PersistentTasksCustomMetadata(1L, Map.of(taskId, task));
154154

155155
state = ClusterState.builder(new ClusterName("name"))
@@ -171,7 +171,7 @@ public void testCheckDatabases_dontCheckDatabaseOnNonIngestNode() throws Excepti
171171
String md5 = mockSearches("GeoIP2-City.mmdb", 0, 9);
172172
String taskId = GeoIpDownloader.GEOIP_DOWNLOADER;
173173
PersistentTask<?> task = new PersistentTask<>(taskId, GeoIpDownloader.GEOIP_DOWNLOADER, new GeoIpTaskParams(), 1, null);
174-
task = new PersistentTask<>(task, new GeoIpTaskState(Map.of("GeoIP2-City.mmdb", new GeoIpTaskState.Metadata(0L, 0, 9, md5))));
174+
task = new PersistentTask<>(task, new GeoIpTaskState(Map.of("GeoIP2-City.mmdb", new GeoIpTaskState.Metadata(0L, 0, 9, md5, 10))));
175175
PersistentTasksCustomMetadata tasksCustomMetadata = new PersistentTasksCustomMetadata(1L, Map.of(taskId, task));
176176

177177
ClusterState state = ClusterState.builder(new ClusterName("name"))
@@ -195,7 +195,7 @@ public void testCheckDatabases_dontCheckDatabaseWhenNoDatabasesIndex() throws Ex
195195
String md5 = mockSearches("GeoIP2-City.mmdb", 0, 9);
196196
String taskId = GeoIpDownloader.GEOIP_DOWNLOADER;
197197
PersistentTask<?> task = new PersistentTask<>(taskId, GeoIpDownloader.GEOIP_DOWNLOADER, new GeoIpTaskParams(), 1, null);
198-
task = new PersistentTask<>(task, new GeoIpTaskState(Map.of("GeoIP2-City.mmdb", new GeoIpTaskState.Metadata(0L, 0, 9, md5))));
198+
task = new PersistentTask<>(task, new GeoIpTaskState(Map.of("GeoIP2-City.mmdb", new GeoIpTaskState.Metadata(0L, 0, 9, md5, 10))));
199199
PersistentTasksCustomMetadata tasksCustomMetadata = new PersistentTasksCustomMetadata(1L, Map.of(taskId, task));
200200

201201
ClusterState state = ClusterState.builder(new ClusterName("name"))
@@ -236,7 +236,7 @@ public void testCheckDatabases_dontCheckDatabaseWhenGeoIpDownloadTask() throws E
236236

237237
public void testRetrieveDatabase() throws Exception {
238238
String md5 = mockSearches("_name", 0, 29);
239-
GeoIpTaskState.Metadata metadata = new GeoIpTaskState.Metadata(-1, 0, 29, md5);
239+
GeoIpTaskState.Metadata metadata = new GeoIpTaskState.Metadata(-1, 0, 29, md5, 10);
240240

241241
@SuppressWarnings("unchecked")
242242
CheckedConsumer<byte[], IOException> chunkConsumer = mock(CheckedConsumer.class);
@@ -254,7 +254,7 @@ public void testRetrieveDatabase() throws Exception {
254254
public void testRetrieveDatabaseCorruption() throws Exception {
255255
String md5 = mockSearches("_name", 0, 9);
256256
String incorrectMd5 = "different";
257-
GeoIpTaskState.Metadata metadata = new GeoIpTaskState.Metadata(-1, 0, 9, incorrectMd5);
257+
GeoIpTaskState.Metadata metadata = new GeoIpTaskState.Metadata(-1, 0, 9, incorrectMd5, 10);
258258

259259
@SuppressWarnings("unchecked")
260260
CheckedConsumer<byte[], IOException> chunkConsumer = mock(CheckedConsumer.class);

modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -279,13 +279,13 @@ void deleteOldChunks(String name, int firstChunk) {
279279
}
280280
};
281281

282-
geoIpDownloader.setState(GeoIpTaskState.EMPTY.put("test.mmdb", new GeoIpTaskState.Metadata(0, 5, 8, "0")));
282+
geoIpDownloader.setState(GeoIpTaskState.EMPTY.put("test.mmdb", new GeoIpTaskState.Metadata(0, 5, 8, "0", 0)));
283283
geoIpDownloader.processDatabase(Map.of("name", "test.tgz", "url", "http://a.b/t1", "md5_hash", "1"));
284284
}
285285

286286

287287
public void testProcessDatabaseSame() throws IOException {
288-
GeoIpTaskState.Metadata metadata = new GeoIpTaskState.Metadata(0, 4, 10, "1");
288+
GeoIpTaskState.Metadata metadata = new GeoIpTaskState.Metadata(0, 4, 10, "1", 0);
289289
GeoIpTaskState taskState = GeoIpTaskState.EMPTY.put("test.mmdb", metadata);
290290
ByteArrayInputStream bais = new ByteArrayInputStream(new byte[0]);
291291
when(httpClient.get("a.b/t1")).thenReturn(bais);

modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpTaskStateSerializationTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ protected GeoIpTaskState createTestInstance() {
3030
GeoIpTaskState state = GeoIpTaskState.EMPTY;
3131
int databaseCount = randomInt(20);
3232
for (int i = 0; i < databaseCount; i++) {
33-
GeoIpTaskState.Metadata metadata = new GeoIpTaskState.Metadata(randomLong(), randomInt(), randomInt(), randomAlphaOfLength(32));
33+
GeoIpTaskState.Metadata metadata = new GeoIpTaskState.Metadata(randomLong(), randomInt(), randomInt(),
34+
randomAlphaOfLength(32), randomLong());
3435
state = state.put(randomAlphaOfLengthBetween(5, 10), metadata);
3536
}
3637
return state;

modules/ingest-geoip/src/test/resources/plugin-security.policy

Lines changed: 0 additions & 20 deletions
This file was deleted.

0 commit comments

Comments
 (0)