Skip to content

Commit 1adf4f1

Browse files
committed
[Test] Add full cluster restart test for Rollup (#31533)
This pull request adds a full cluster restart test for a Rollup job. The test creates and starts a Rollup job on the cluster and checks that the job already exists and is correctly started on the upgraded cluster. This test allows to test that the persistent task state is correctly parsed from the cluster state after the upgrade, as the status field has been renamed to state in #31031. The test undercovers a ClassCastException that can be thrown in the RollupIndexer when the timestamp as a very low value that fits into an integer. When it's the case, the value is parsed back as an Integer instead of Long object and (long) position.get(rollupFieldName) fails.
1 parent 7d7304d commit 1adf4f1

File tree

2 files changed

+107
-54
lines changed

2 files changed

+107
-54
lines changed

x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,11 @@ private QueryBuilder createBoundaryQuery(Map<String, Object> position) {
415415
DateHistoGroupConfig dateHisto = job.getConfig().getGroupConfig().getDateHisto();
416416
String fieldName = dateHisto.getField();
417417
String rollupFieldName = fieldName + "." + DateHistogramAggregationBuilder.NAME;
418-
long lowerBound = position != null ? (long) position.get(rollupFieldName) : 0;
418+
long lowerBound = 0L;
419+
if (position != null) {
420+
Number value = (Number) position.get(rollupFieldName);
421+
lowerBound = value.longValue();
422+
}
419423
assert lowerBound <= maxBoundary;
420424
final RangeQueryBuilder query = new RangeQueryBuilder(fieldName)
421425
.gte(lowerBound)

x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java

Lines changed: 102 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.elasticsearch.rest.RestStatus;
2121
import org.elasticsearch.test.StreamsUtils;
2222
import org.elasticsearch.test.rest.ESRestTestCase;
23-
import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils;
2423
import org.elasticsearch.xpack.core.watcher.client.WatchSourceBuilder;
2524
import org.elasticsearch.xpack.core.watcher.support.xcontent.ObjectPath;
2625
import org.elasticsearch.xpack.security.support.IndexLifecycleManager;
@@ -30,6 +29,7 @@
3029
import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition;
3130
import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule;
3231
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger;
32+
import org.hamcrest.Matcher;
3333
import org.junit.Before;
3434

3535
import java.io.IOException;
@@ -38,6 +38,7 @@
3838
import java.util.Collections;
3939
import java.util.HashMap;
4040
import java.util.List;
41+
import java.util.Locale;
4142
import java.util.Map;
4243
import java.util.concurrent.TimeUnit;
4344
import java.util.stream.Collectors;
@@ -53,7 +54,6 @@
5354
import static org.hamcrest.Matchers.hasEntry;
5455
import static org.hamcrest.Matchers.hasItems;
5556
import static org.hamcrest.Matchers.hasKey;
56-
import static org.hamcrest.Matchers.hasSize;
5757
import static org.hamcrest.Matchers.is;
5858
import static org.hamcrest.Matchers.not;
5959
import static org.hamcrest.Matchers.notNullValue;
@@ -274,6 +274,68 @@ public void testWatcher() throws Exception {
274274
}
275275
}
276276

277+
/**
278+
* Tests that a RollUp job created on a old cluster is correctly restarted after the upgrade.
279+
*/
280+
public void testRollupAfterRestart() throws Exception {
281+
assumeTrue("Rollup can be tested with 6.3.0 and onwards", oldClusterVersion.onOrAfter(Version.V_6_3_0));
282+
if (runningAgainstOldCluster) {
283+
final int numDocs = 59;
284+
final int year = randomIntBetween(1970, 2018);
285+
286+
// index documents for the rollup job
287+
final StringBuilder bulk = new StringBuilder();
288+
for (int i = 0; i < numDocs; i++) {
289+
bulk.append("{\"index\":{\"_index\":\"rollup-docs\",\"_type\":\"doc\"}}\n");
290+
String date = String.format(Locale.ROOT, "%04d-01-01T00:%02d:00Z", year, i);
291+
bulk.append("{\"timestamp\":\"").append(date).append("\",\"value\":").append(i).append("}\n");
292+
}
293+
bulk.append("\r\n");
294+
295+
client().performRequest("POST", "/_bulk", emptyMap(), new StringEntity(bulk.toString(), ContentType.APPLICATION_JSON));
296+
297+
// create the rollup job
298+
final String rollupJob = "{"
299+
+ "\"index_pattern\":\"rollup-*\","
300+
+ "\"rollup_index\":\"results-rollup\","
301+
+ "\"cron\":\"*/30 * * * * ?\","
302+
+ "\"page_size\":100,"
303+
+ "\"groups\":{"
304+
+ " \"date_histogram\":{"
305+
+ " \"field\":\"timestamp\","
306+
+ " \"interval\":\"5m\""
307+
+ " }"
308+
+ "},"
309+
+ "\"metrics\":["
310+
+ " {\"field\":\"value\",\"metrics\":[\"min\",\"max\",\"sum\"]}"
311+
+ "]"
312+
+ "}";
313+
314+
Map<String, Object> createRollupJobResponse = toMap(client().performRequest("PUT", "/_xpack/rollup/job/rollup-job-test",
315+
emptyMap(), new StringEntity(rollupJob.toString(), ContentType.APPLICATION_JSON)));
316+
assertThat(createRollupJobResponse.get("acknowledged"), equalTo(Boolean.TRUE));
317+
318+
// start the rollup job
319+
Map<String, Object> startRollupJobResponse = toMap(client().performRequest("POST", "_xpack/rollup/job/rollup-job-test/_start"));
320+
assertThat(startRollupJobResponse.get("started"), equalTo(Boolean.TRUE));
321+
322+
assertRollUpJob("rollup-job-test");
323+
324+
} else {
325+
326+
final Map<String, String> params = new HashMap<>();
327+
params.put("wait_for_status", "yellow");
328+
params.put("wait_for_no_relocating_shards", "true");
329+
if (oldClusterVersion.onOrAfter(Version.V_6_2_0)) {
330+
params.put("wait_for_no_initializing_shards", "true");
331+
}
332+
Map<String, Object> clusterHealthResponse = toMap(client().performRequest("GET", "/_cluster/health", params));
333+
assertThat(clusterHealthResponse.get("timed_out"), equalTo(Boolean.FALSE));
334+
335+
assertRollUpJob("rollup-job-test");
336+
}
337+
}
338+
277339
public void testSqlFailsOnIndexWithTwoTypes() throws IOException {
278340
// TODO this isn't going to trigger until we backport to 6.1
279341
assumeTrue("It is only possible to build an index that sql doesn't like before 6.0.0",
@@ -413,57 +475,6 @@ private void waitForHits(String indexName, int expectedHits) throws Exception {
413475
}, 30, TimeUnit.SECONDS);
414476
}
415477

416-
@SuppressWarnings("unchecked")
417-
private void waitForMonitoringTemplates() throws Exception {
418-
assertBusy(() -> {
419-
final Map<String, Object> templates = toMap(client().performRequest("GET", "/_template/.monitoring-*"));
420-
421-
// in earlier versions, we published legacy templates in addition to the current ones to support transitioning
422-
assertThat(templates.size(), greaterThanOrEqualTo(MonitoringTemplateUtils.TEMPLATE_IDS.length));
423-
424-
// every template should be updated to whatever the current version is
425-
for (final String templateId : MonitoringTemplateUtils.TEMPLATE_IDS) {
426-
final String templateName = MonitoringTemplateUtils.templateName(templateId);
427-
final Map<String, Object> template = (Map<String, Object>) templates.get(templateName);
428-
429-
assertThat(template.get("version"), is(MonitoringTemplateUtils.LAST_UPDATED_VERSION));
430-
}
431-
}, 30, TimeUnit.SECONDS);
432-
}
433-
434-
@SuppressWarnings("unchecked")
435-
private void waitForClusterStats(final String expectedVersion) throws Exception {
436-
assertBusy(() -> {
437-
final Map<String, String> params = new HashMap<>(3);
438-
params.put("q", "type:cluster_stats _type:cluster_stats");
439-
params.put("size", "1");
440-
params.put("sort", "timestamp:desc");
441-
442-
final Map<String, Object> response = toMap(client().performRequest("GET", "/.monitoring-es-*/_search", params));
443-
final Map<String, Object> hits = (Map<String, Object>) response.get("hits");
444-
445-
assertThat("No cluster_stats documents found.", (int)hits.get("total"), greaterThanOrEqualTo(1));
446-
447-
final Map<String, Object> hit = (Map<String, Object>) ((List<Object>) hits.get("hits")).get(0);
448-
final Map<String, Object> source = (Map<String, Object>) hit.get("_source");
449-
450-
// 5.5+ shares the same index semantics as 6.0+, and we can verify the version in the cluster_stats
451-
if (Version.fromString(expectedVersion).onOrAfter(Version.V_5_5_0)) {
452-
assertThat(source.get("version"), is(expectedVersion));
453-
}
454-
455-
// 5.0 - 5.4 do not have the "version" field in the same index (it's in the .monitoring-data-2 index)
456-
// 5.0+ have cluster_stats: { versions: [ "1.2.3" ] }
457-
// This allows us to verify that it properly recorded the document in 5.0 and we can detect it moving forward
458-
final Map<String, Object> clusterStats = (Map<String, Object>) source.get("cluster_stats");
459-
final Map<String, Object> nodes = (Map<String, Object>) clusterStats.get("nodes");
460-
final List<String> versions = (List<String>) nodes.get("versions");
461-
462-
assertThat(versions, hasSize(1));
463-
assertThat(versions.get(0), is(expectedVersion));
464-
}, 30, TimeUnit.SECONDS);
465-
}
466-
467478
static Map<String, Object> toMap(Response response) throws IOException {
468479
return toMap(EntityUtils.toString(response.getEntity()));
469480
}
@@ -526,4 +537,42 @@ private void assertRoleInfo(final String role) throws Exception {
526537
assertNotNull(response.get("cluster"));
527538
assertNotNull(response.get("indices"));
528539
}
540+
541+
@SuppressWarnings("unchecked")
542+
private void assertRollUpJob(final String rollupJob) throws Exception {
543+
final Matcher<?> expectedStates = anyOf(equalTo("indexing"), equalTo("started"));
544+
waitForRollUpJob(rollupJob, expectedStates);
545+
546+
// check that the rollup job is started using the RollUp API
547+
Map<String, Object> getRollupJobResponse = toMap(client().performRequest("GET", "_xpack/rollup/job/" + rollupJob));
548+
assertThat(ObjectPath.eval("jobs.0.status.job_state", getRollupJobResponse), expectedStates);
549+
550+
// check that the rollup job is started using the Tasks API
551+
final Map<String, String> params = new HashMap<>();
552+
params.put("detailed", "true");
553+
params.put("actions", "xpack/rollup/*");
554+
Map<String, Object> taskResponse = toMap(client().performRequest("GET", "_tasks", params));
555+
Map<String, Object> taskResponseNodes = (Map<String, Object>) taskResponse.get("nodes");
556+
Map<String, Object> taskResponseNode = (Map<String, Object>) taskResponseNodes.values().iterator().next();
557+
Map<String, Object> taskResponseTasks = (Map<String, Object>) taskResponseNode.get("tasks");
558+
Map<String, Object> taskResponseStatus = (Map<String, Object>) taskResponseTasks.values().iterator().next();
559+
assertThat(ObjectPath.eval("status.job_state", taskResponseStatus), expectedStates);
560+
561+
// check that the rollup job is started using the Cluster State API
562+
Map<String, Object> clusterStateResponse = toMap(client().performRequest("GET", "_cluster/state/metadata"));
563+
Map<String, Object> rollupJobTask = ObjectPath.eval("metadata.persistent_tasks.tasks.0", clusterStateResponse);
564+
assertThat(ObjectPath.eval("id", rollupJobTask), equalTo("rollup-job-test"));
565+
566+
final String jobStateField = "task.xpack/rollup/job.status.job_state";
567+
assertThat("Expected field [" + jobStateField + "] to be started or indexing in " + rollupJobTask,
568+
ObjectPath.eval(jobStateField, rollupJobTask), expectedStates);
569+
}
570+
571+
private void waitForRollUpJob(final String rollupJob, final Matcher<?> expectedStates) throws Exception {
572+
assertBusy(() -> {
573+
Response getRollupJobResponse = client().performRequest("GET", "_xpack/rollup/job/" + rollupJob);
574+
assertThat(getRollupJobResponse.getStatusLine().getStatusCode(), equalTo(RestStatus.OK.getStatus()));
575+
assertThat(ObjectPath.eval("jobs.0.status.job_state", toMap(getRollupJobResponse)), expectedStates);
576+
}, 30L, TimeUnit.SECONDS);
577+
}
529578
}

0 commit comments

Comments
 (0)