Skip to content

TSDB: Basic rolling upgrade test #78028

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Oct 1, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,33 @@
*/
package org.elasticsearch.upgrades;

import io.github.nik9000.mapmatcher.ListMatcher;

import org.apache.http.util.EntityUtils;
import org.elasticsearch.Version;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.core.Booleans;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.core.Booleans;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.hamcrest.Matcher;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static io.github.nik9000.mapmatcher.ListMatcher.matchesList;
import static io.github.nik9000.mapmatcher.MapMatcher.assertMap;
import static io.github.nik9000.mapmatcher.MapMatcher.matchesMap;
import static org.elasticsearch.rest.action.search.RestSearchAction.TOTAL_HITS_AS_INT_PARAM;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -228,12 +238,144 @@ private void bulk(String index, String valueSuffix, int count) throws IOExceptio
b.append("{\"index\": {\"_index\": \"").append(index).append("\"}}\n");
b.append("{\"f1\": \"v").append(i).append(valueSuffix).append("\", \"f2\": ").append(i).append("}\n");
}
bulk(index, b.toString());
}

private static final List<String> TSDB_DIMS = List.of("6a841a21", "947e4ced", "a4c385a1", "b47a2f4e", "df3145b3");
private static final long[] TSDB_TIMES;
static {
String[] times = new String[] {
"2021-01-01T00:00:00Z",
"2021-01-02T00:00:00Z",
"2021-01-02T00:10:00Z",
"2021-01-02T00:20:00Z",
"2021-01-02T00:30:00Z" };
TSDB_TIMES = new long[times.length];
for (int i = 0; i < times.length; i++) {
TSDB_TIMES[i] = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis(times[i]);
}
}

public void testTsdb() throws IOException {
assumeTrue("tsdb added in 8.0.0", UPGRADE_FROM_VERSION.onOrAfter(Version.V_8_0_0));

StringBuilder bulk = new StringBuilder();
switch (CLUSTER_TYPE) {
case OLD:
createTsdbIndex();
tsdbBulk(bulk, TSDB_DIMS.get(0), TSDB_TIMES[0], TSDB_TIMES[1], 0.1);
tsdbBulk(bulk, TSDB_DIMS.get(1), TSDB_TIMES[0], TSDB_TIMES[1], -0.1);
bulk("tsdb", bulk.toString());
assertTsdbAgg(closeTo(215.95, 0.005), closeTo(-215.95, 0.005));
return;
case MIXED:
if (FIRST_MIXED_ROUND) {
tsdbBulk(bulk, TSDB_DIMS.get(0), TSDB_TIMES[1], TSDB_TIMES[2], 0.1);
tsdbBulk(bulk, TSDB_DIMS.get(1), TSDB_TIMES[1], TSDB_TIMES[2], -0.1);
tsdbBulk(bulk, TSDB_DIMS.get(2), TSDB_TIMES[0], TSDB_TIMES[2], 1.1);
bulk("tsdb", bulk.toString());
assertTsdbAgg(closeTo(217.45, 0.005), closeTo(-217.45, 0.005), closeTo(2391.95, 0.005));
return;
}
tsdbBulk(bulk, TSDB_DIMS.get(0), TSDB_TIMES[2], TSDB_TIMES[3], 0.1);
tsdbBulk(bulk, TSDB_DIMS.get(1), TSDB_TIMES[2], TSDB_TIMES[3], -0.1);
tsdbBulk(bulk, TSDB_DIMS.get(2), TSDB_TIMES[2], TSDB_TIMES[3], 1.1);
tsdbBulk(bulk, TSDB_DIMS.get(3), TSDB_TIMES[0], TSDB_TIMES[3], 10);
bulk("tsdb", bulk.toString());
assertTsdbAgg(closeTo(218.95, 0.005), closeTo(-218.95, 0.005), closeTo(2408.45, 0.005), closeTo(21895, 0.5));
return;
case UPGRADED:
tsdbBulk(bulk, TSDB_DIMS.get(0), TSDB_TIMES[3], TSDB_TIMES[4], 0.1);
tsdbBulk(bulk, TSDB_DIMS.get(1), TSDB_TIMES[3], TSDB_TIMES[4], -0.1);
tsdbBulk(bulk, TSDB_DIMS.get(2), TSDB_TIMES[3], TSDB_TIMES[4], 1.1);
tsdbBulk(bulk, TSDB_DIMS.get(3), TSDB_TIMES[3], TSDB_TIMES[4], 10);
tsdbBulk(bulk, TSDB_DIMS.get(4), TSDB_TIMES[0], TSDB_TIMES[4], -5);
bulk("tsdb", bulk.toString());
assertTsdbAgg(
closeTo(220.45, 0.005),
closeTo(-220.45, 0.005),
closeTo(2424.95, 0.005),
closeTo(22045, 0.5),
closeTo(-11022.5, 0.5)
);
return;
}
}

private void bulk(String index, String entity) throws IOException {
Request bulk = new Request("POST", "/_bulk");
bulk.addParameter("refresh", "true");
bulk.setJsonEntity(b.toString());
bulk.setJsonEntity(entity.toString());
client().performRequest(bulk);
}

private void createTsdbIndex() throws IOException {
Request createIndex = new Request("PUT", "/tsdb");
XContentBuilder indexSpec = XContentBuilder.builder(XContentType.JSON.xContent()).startObject();
indexSpec.startObject("mappings").startObject("properties");
{
indexSpec.startObject("@timestamp").field("type", "date").endObject();
indexSpec.startObject("dim").field("type", "keyword").field("time_series_dimension", true).endObject();
}
indexSpec.endObject().endObject();
indexSpec.startObject("settings").field("mode", "time_series").endObject();
createIndex.setJsonEntity(Strings.toString(indexSpec.endObject()));
client().performRequest(createIndex);
}

private void tsdbBulk(StringBuilder bulk, String dim, long timeStart, long timeEnd, double rate) throws IOException {
long delta = TimeUnit.SECONDS.toMillis(20);
double value = (timeStart - TSDB_TIMES[0]) / TimeUnit.SECONDS.toMillis(20) * rate;
for (long t = timeStart; t < timeEnd; t += delta) {
bulk.append("{\"index\": {\"_index\": \"tsdb\"}}\n");
bulk.append("{\"@timestamp\": ").append(t);
bulk.append(", \"dim\": \"").append(dim).append("\"");
bulk.append(", \"value\": ").append(value).append("}\n");
value += rate;
}
}

private void assertTsdbAgg(Matcher<?>... expected) throws IOException {
Request request = new Request("POST", "/tsdb/_search");
request.addParameter("size", "0");
XContentBuilder body = JsonXContent.contentBuilder().startObject();
// TODO replace tsid runtime field with real tsid
body.startObject("runtime_mappings");
{
body.startObject("tsid");
{
body.field("type", "keyword");
body.field("script", "emit('dim:' + doc['dim'].value)");
}
body.endObject();
}
body.endObject();
body.startObject("aggs").startObject("tsids");
{
body.startObject("terms").field("field", "tsid").endObject();
body.startObject("aggs").startObject("avg");
{
body.startObject("avg").field("field", "value").endObject();
}
body.endObject().endObject();
}
body.endObject().endObject();
request.setJsonEntity(Strings.toString(body.endObject()));
ListMatcher tsidsExpected = matchesList();
for (int d = 0; d < expected.length; d++) {
// Object key = Map.of("dim", TSDB_DIMS.get(d)); TODO use this once tsid is real
Object key = "dim:" + TSDB_DIMS.get(d);
tsidsExpected = tsidsExpected.item(
matchesMap().extraOk().entry("key", key).entry("avg", Map.of("value", expected[d]))
);
}
assertMap(
entityAsMap(client().performRequest(request)),
matchesMap().extraOk()
.entry("aggregations", matchesMap().entry("tsids", matchesMap().extraOk().entry("buckets", tsidsExpected)))
);
}

private void assertCount(String index, int count) throws IOException {
Request searchTestIndexRequest = new Request("POST", "/" + index + "/_search");
searchTestIndexRequest.addParameter(TOTAL_HITS_AS_INT_PARAM, "true");
Expand Down