Skip to content

Commit 70ec917

Browse files
committed
[Test] Add full cluster restart test for Rollup (#31533)
This pull request adds a full cluster restart test for a Rollup job. The test creates and starts a Rollup job on the cluster and checks that the job already exists and is correctly started on the upgraded cluster. This test allows to test that the persistent task state is correctly parsed from the cluster state after the upgrade, as the status field has been renamed to state in #31031. The test undercovers a ClassCastException that can be thrown in the RollupIndexer when the timestamp as a very low value that fits into an integer. When it's the case, the value is parsed back as an Integer instead of Long object and (long) position.get(rollupFieldName) fails.
1 parent 7e032d9 commit 70ec917

File tree

2 files changed

+117
-1
lines changed

2 files changed

+117
-1
lines changed

x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,11 @@ private QueryBuilder createBoundaryQuery(Map<String, Object> position) {
415415
DateHistoGroupConfig dateHisto = job.getConfig().getGroupConfig().getDateHisto();
416416
String fieldName = dateHisto.getField();
417417
String rollupFieldName = fieldName + "." + DateHistogramAggregationBuilder.NAME;
418-
long lowerBound = position != null ? (long) position.get(rollupFieldName) : 0;
418+
long lowerBound = 0L;
419+
if (position != null) {
420+
Number value = (Number) position.get(rollupFieldName);
421+
lowerBound = value.longValue();
422+
}
419423
assert lowerBound <= maxBoundary;
420424
final RangeQueryBuilder query = new RangeQueryBuilder(fieldName)
421425
.gte(lowerBound)

x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.apache.http.entity.StringEntity;
1010
import org.apache.http.util.EntityUtils;
1111
import org.elasticsearch.Version;
12+
import org.elasticsearch.client.Request;
1213
import org.elasticsearch.client.Response;
1314
import org.elasticsearch.client.ResponseException;
1415
import org.elasticsearch.common.Booleans;
@@ -30,6 +31,7 @@
3031
import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition;
3132
import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule;
3233
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger;
34+
import org.hamcrest.Matcher;
3335
import org.junit.Before;
3436

3537
import java.io.IOException;
@@ -38,6 +40,7 @@
3840
import java.util.Collections;
3941
import java.util.HashMap;
4042
import java.util.List;
43+
import java.util.Locale;
4144
import java.util.Map;
4245
import java.util.concurrent.TimeUnit;
4346
import java.util.stream.Collectors;
@@ -274,6 +277,71 @@ public void testWatcher() throws Exception {
274277
}
275278
}
276279

280+
/**
281+
* Tests that a RollUp job created on a old cluster is correctly restarted after the upgrade.
282+
*/
283+
public void testRollupAfterRestart() throws Exception {
284+
assumeTrue("Rollup can be tested with 6.3.0 and onwards", oldClusterVersion.onOrAfter(Version.V_6_3_0));
285+
if (runningAgainstOldCluster) {
286+
final int numDocs = 59;
287+
final int year = randomIntBetween(1970, 2018);
288+
289+
// index documents for the rollup job
290+
final StringBuilder bulk = new StringBuilder();
291+
for (int i = 0; i < numDocs; i++) {
292+
bulk.append("{\"index\":{\"_index\":\"rollup-docs\",\"_type\":\"doc\"}}\n");
293+
String date = String.format(Locale.ROOT, "%04d-01-01T00:%02d:00Z", year, i);
294+
bulk.append("{\"timestamp\":\"").append(date).append("\",\"value\":").append(i).append("}\n");
295+
}
296+
bulk.append("\r\n");
297+
298+
final Request bulkRequest = new Request("POST", "/_bulk");
299+
bulkRequest.setJsonEntity(bulk.toString());
300+
client().performRequest(bulkRequest);
301+
302+
// create the rollup job
303+
final Request createRollupJobRequest = new Request("PUT", "/_xpack/rollup/job/rollup-job-test");
304+
createRollupJobRequest.setJsonEntity("{"
305+
+ "\"index_pattern\":\"rollup-*\","
306+
+ "\"rollup_index\":\"results-rollup\","
307+
+ "\"cron\":\"*/30 * * * * ?\","
308+
+ "\"page_size\":100,"
309+
+ "\"groups\":{"
310+
+ " \"date_histogram\":{"
311+
+ " \"field\":\"timestamp\","
312+
+ " \"interval\":\"5m\""
313+
+ " }"
314+
+ "},"
315+
+ "\"metrics\":["
316+
+ " {\"field\":\"value\",\"metrics\":[\"min\",\"max\",\"sum\"]}"
317+
+ "]"
318+
+ "}");
319+
320+
Map<String, Object> createRollupJobResponse = toMap(client().performRequest(createRollupJobRequest));
321+
assertThat(createRollupJobResponse.get("acknowledged"), equalTo(Boolean.TRUE));
322+
323+
// start the rollup job
324+
final Request startRollupJobRequest = new Request("POST", "_xpack/rollup/job/rollup-job-test/_start");
325+
Map<String, Object> startRollupJobResponse = toMap(client().performRequest(startRollupJobRequest));
326+
assertThat(startRollupJobResponse.get("started"), equalTo(Boolean.TRUE));
327+
328+
assertRollUpJob("rollup-job-test");
329+
330+
} else {
331+
332+
final Request clusterHealthRequest = new Request("GET", "/_cluster/health");
333+
clusterHealthRequest.addParameter("wait_for_status", "yellow");
334+
clusterHealthRequest.addParameter("wait_for_no_relocating_shards", "true");
335+
if (oldClusterVersion.onOrAfter(Version.V_6_2_0)) {
336+
clusterHealthRequest.addParameter("wait_for_no_initializing_shards", "true");
337+
}
338+
Map<String, Object> clusterHealthResponse = toMap(client().performRequest(clusterHealthRequest));
339+
assertThat(clusterHealthResponse.get("timed_out"), equalTo(Boolean.FALSE));
340+
341+
assertRollUpJob("rollup-job-test");
342+
}
343+
}
344+
277345
public void testSqlFailsOnIndexWithTwoTypes() throws IOException {
278346
// TODO this isn't going to trigger until we backport to 6.1
279347
assumeTrue("It is only possible to build an index that sql doesn't like before 6.0.0",
@@ -526,4 +594,48 @@ private void assertRoleInfo(final String role) throws Exception {
526594
assertNotNull(response.get("cluster"));
527595
assertNotNull(response.get("indices"));
528596
}
597+
598+
@SuppressWarnings("unchecked")
599+
private void assertRollUpJob(final String rollupJob) throws Exception {
600+
final Matcher<?> expectedStates = anyOf(equalTo("indexing"), equalTo("started"));
601+
waitForRollUpJob(rollupJob, expectedStates);
602+
603+
// check that the rollup job is started using the RollUp API
604+
final Request getRollupJobRequest = new Request("GET", "_xpack/rollup/job/" + rollupJob);
605+
Map<String, Object> getRollupJobResponse = toMap(client().performRequest(getRollupJobRequest));
606+
assertThat(ObjectPath.eval("jobs.0.status.job_state", getRollupJobResponse), expectedStates);
607+
608+
// check that the rollup job is started using the Tasks API
609+
final Request taskRequest = new Request("GET", "_tasks");
610+
taskRequest.addParameter("detailed", "true");
611+
taskRequest.addParameter("actions", "xpack/rollup/*");
612+
Map<String, Object> taskResponse = toMap(client().performRequest(taskRequest));
613+
Map<String, Object> taskResponseNodes = (Map<String, Object>) taskResponse.get("nodes");
614+
Map<String, Object> taskResponseNode = (Map<String, Object>) taskResponseNodes.values().iterator().next();
615+
Map<String, Object> taskResponseTasks = (Map<String, Object>) taskResponseNode.get("tasks");
616+
Map<String, Object> taskResponseStatus = (Map<String, Object>) taskResponseTasks.values().iterator().next();
617+
assertThat(ObjectPath.eval("status.job_state", taskResponseStatus), expectedStates);
618+
619+
// check that the rollup job is started using the Cluster State API
620+
final Request clusterStateRequest = new Request("GET", "_cluster/state/metadata");
621+
Map<String, Object> clusterStateResponse = toMap(client().performRequest(clusterStateRequest));
622+
Map<String, Object> rollupJobTask = ObjectPath.eval("metadata.persistent_tasks.tasks.0", clusterStateResponse);
623+
assertThat(ObjectPath.eval("id", rollupJobTask), equalTo("rollup-job-test"));
624+
625+
// Persistent task state field has been renamed in 6.4.0 from "status" to "state"
626+
final String stateFieldName = (runningAgainstOldCluster && oldClusterVersion.before(Version.V_6_4_0)) ? "status" : "state";
627+
628+
final String jobStateField = "task.xpack/rollup/job." + stateFieldName + ".job_state";
629+
assertThat("Expected field [" + jobStateField + "] to be started or indexing in " + rollupJobTask,
630+
ObjectPath.eval(jobStateField, rollupJobTask), expectedStates);
631+
}
632+
633+
private void waitForRollUpJob(final String rollupJob, final Matcher<?> expectedStates) throws Exception {
634+
assertBusy(() -> {
635+
final Request getRollupJobRequest = new Request("GET", "_xpack/rollup/job/" + rollupJob);
636+
Response getRollupJobResponse = client().performRequest(getRollupJobRequest);
637+
assertThat(getRollupJobResponse.getStatusLine().getStatusCode(), equalTo(RestStatus.OK.getStatus()));
638+
assertThat(ObjectPath.eval("jobs.0.status.job_state", toMap(getRollupJobResponse)), expectedStates);
639+
}, 30L, TimeUnit.SECONDS);
640+
}
529641
}

0 commit comments

Comments
 (0)