diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/IndexingIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/IndexingIT.java index ea09cb95f7d62..065e3d02c9034 100644 --- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/IndexingIT.java +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/IndexingIT.java @@ -7,23 +7,33 @@ */ package org.elasticsearch.upgrades; +import io.github.nik9000.mapmatcher.ListMatcher; + import org.apache.http.util.EntityUtils; import org.elasticsearch.Version; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; -import org.elasticsearch.core.Booleans; import org.elasticsearch.common.Strings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.core.Booleans; +import org.elasticsearch.index.mapper.DateFieldMapper; +import org.hamcrest.Matcher; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; +import static io.github.nik9000.mapmatcher.ListMatcher.matchesList; +import static io.github.nik9000.mapmatcher.MapMatcher.assertMap; +import static io.github.nik9000.mapmatcher.MapMatcher.matchesMap; import static org.elasticsearch.rest.action.search.RestSearchAction.TOTAL_HITS_AS_INT_PARAM; +import static org.hamcrest.Matchers.closeTo; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.equalTo; @@ -228,12 +238,144 @@ private void bulk(String index, String valueSuffix, int count) throws IOExceptio b.append("{\"index\": {\"_index\": \"").append(index).append("\"}}\n"); b.append("{\"f1\": \"v").append(i).append(valueSuffix).append("\", \"f2\": ").append(i).append("}\n"); } + bulk(index, b.toString()); + } + + private static final List TSDB_DIMS = List.of("6a841a21", "947e4ced", "a4c385a1", "b47a2f4e", "df3145b3"); + private static final long[] TSDB_TIMES; + static { + String[] times = new String[] { + "2021-01-01T00:00:00Z", + "2021-01-02T00:00:00Z", + "2021-01-02T00:10:00Z", + "2021-01-02T00:20:00Z", + "2021-01-02T00:30:00Z" }; + TSDB_TIMES = new long[times.length]; + for (int i = 0; i < times.length; i++) { + TSDB_TIMES[i] = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis(times[i]); + } + } + + public void testTsdb() throws IOException { + assumeTrue("tsdb added in 8.0.0", UPGRADE_FROM_VERSION.onOrAfter(Version.V_8_0_0)); + + StringBuilder bulk = new StringBuilder(); + switch (CLUSTER_TYPE) { + case OLD: + createTsdbIndex(); + tsdbBulk(bulk, TSDB_DIMS.get(0), TSDB_TIMES[0], TSDB_TIMES[1], 0.1); + tsdbBulk(bulk, TSDB_DIMS.get(1), TSDB_TIMES[0], TSDB_TIMES[1], -0.1); + bulk("tsdb", bulk.toString()); + assertTsdbAgg(closeTo(215.95, 0.005), closeTo(-215.95, 0.005)); + return; + case MIXED: + if (FIRST_MIXED_ROUND) { + tsdbBulk(bulk, TSDB_DIMS.get(0), TSDB_TIMES[1], TSDB_TIMES[2], 0.1); + tsdbBulk(bulk, TSDB_DIMS.get(1), TSDB_TIMES[1], TSDB_TIMES[2], -0.1); + tsdbBulk(bulk, TSDB_DIMS.get(2), TSDB_TIMES[0], TSDB_TIMES[2], 1.1); + bulk("tsdb", bulk.toString()); + assertTsdbAgg(closeTo(217.45, 0.005), closeTo(-217.45, 0.005), closeTo(2391.95, 0.005)); + return; + } + tsdbBulk(bulk, TSDB_DIMS.get(0), TSDB_TIMES[2], TSDB_TIMES[3], 0.1); + tsdbBulk(bulk, TSDB_DIMS.get(1), TSDB_TIMES[2], TSDB_TIMES[3], -0.1); + tsdbBulk(bulk, TSDB_DIMS.get(2), TSDB_TIMES[2], TSDB_TIMES[3], 1.1); + tsdbBulk(bulk, TSDB_DIMS.get(3), TSDB_TIMES[0], TSDB_TIMES[3], 10); + bulk("tsdb", bulk.toString()); + assertTsdbAgg(closeTo(218.95, 0.005), closeTo(-218.95, 0.005), closeTo(2408.45, 0.005), closeTo(21895, 0.5)); + return; + case UPGRADED: + tsdbBulk(bulk, TSDB_DIMS.get(0), TSDB_TIMES[3], TSDB_TIMES[4], 0.1); + tsdbBulk(bulk, TSDB_DIMS.get(1), TSDB_TIMES[3], TSDB_TIMES[4], -0.1); + tsdbBulk(bulk, TSDB_DIMS.get(2), TSDB_TIMES[3], TSDB_TIMES[4], 1.1); + tsdbBulk(bulk, TSDB_DIMS.get(3), TSDB_TIMES[3], TSDB_TIMES[4], 10); + tsdbBulk(bulk, TSDB_DIMS.get(4), TSDB_TIMES[0], TSDB_TIMES[4], -5); + bulk("tsdb", bulk.toString()); + assertTsdbAgg( + closeTo(220.45, 0.005), + closeTo(-220.45, 0.005), + closeTo(2424.95, 0.005), + closeTo(22045, 0.5), + closeTo(-11022.5, 0.5) + ); + return; + } + } + + private void bulk(String index, String entity) throws IOException { Request bulk = new Request("POST", "/_bulk"); bulk.addParameter("refresh", "true"); - bulk.setJsonEntity(b.toString()); + bulk.setJsonEntity(entity.toString()); client().performRequest(bulk); } + private void createTsdbIndex() throws IOException { + Request createIndex = new Request("PUT", "/tsdb"); + XContentBuilder indexSpec = XContentBuilder.builder(XContentType.JSON.xContent()).startObject(); + indexSpec.startObject("mappings").startObject("properties"); + { + indexSpec.startObject("@timestamp").field("type", "date").endObject(); + indexSpec.startObject("dim").field("type", "keyword").field("time_series_dimension", true).endObject(); + } + indexSpec.endObject().endObject(); + indexSpec.startObject("settings").field("mode", "time_series").endObject(); + createIndex.setJsonEntity(Strings.toString(indexSpec.endObject())); + client().performRequest(createIndex); + } + + private void tsdbBulk(StringBuilder bulk, String dim, long timeStart, long timeEnd, double rate) throws IOException { + long delta = TimeUnit.SECONDS.toMillis(20); + double value = (timeStart - TSDB_TIMES[0]) / TimeUnit.SECONDS.toMillis(20) * rate; + for (long t = timeStart; t < timeEnd; t += delta) { + bulk.append("{\"index\": {\"_index\": \"tsdb\"}}\n"); + bulk.append("{\"@timestamp\": ").append(t); + bulk.append(", \"dim\": \"").append(dim).append("\""); + bulk.append(", \"value\": ").append(value).append("}\n"); + value += rate; + } + } + + private void assertTsdbAgg(Matcher... expected) throws IOException { + Request request = new Request("POST", "/tsdb/_search"); + request.addParameter("size", "0"); + XContentBuilder body = JsonXContent.contentBuilder().startObject(); + // TODO replace tsid runtime field with real tsid + body.startObject("runtime_mappings"); + { + body.startObject("tsid"); + { + body.field("type", "keyword"); + body.field("script", "emit('dim:' + doc['dim'].value)"); + } + body.endObject(); + } + body.endObject(); + body.startObject("aggs").startObject("tsids"); + { + body.startObject("terms").field("field", "tsid").endObject(); + body.startObject("aggs").startObject("avg"); + { + body.startObject("avg").field("field", "value").endObject(); + } + body.endObject().endObject(); + } + body.endObject().endObject(); + request.setJsonEntity(Strings.toString(body.endObject())); + ListMatcher tsidsExpected = matchesList(); + for (int d = 0; d < expected.length; d++) { +// Object key = Map.of("dim", TSDB_DIMS.get(d)); TODO use this once tsid is real + Object key = "dim:" + TSDB_DIMS.get(d); + tsidsExpected = tsidsExpected.item( + matchesMap().extraOk().entry("key", key).entry("avg", Map.of("value", expected[d])) + ); + } + assertMap( + entityAsMap(client().performRequest(request)), + matchesMap().extraOk() + .entry("aggregations", matchesMap().entry("tsids", matchesMap().extraOk().entry("buckets", tsidsExpected))) + ); + } + private void assertCount(String index, int count) throws IOException { Request searchTestIndexRequest = new Request("POST", "/" + index + "/_search"); searchTestIndexRequest.addParameter(TOTAL_HITS_AS_INT_PARAM, "true");