diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformGetAndGetStatsIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformGetAndGetStatsIT.java index 89042a05a7bf1..e3f4d91fdd759 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformGetAndGetStatsIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformGetAndGetStatsIT.java @@ -11,33 +11,8 @@ import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; -import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.WarningsHandler; -import org.elasticsearch.client.core.AcknowledgedResponse; -import org.elasticsearch.client.core.PageParams; -import org.elasticsearch.client.transform.DeleteTransformRequest; -import org.elasticsearch.client.transform.GetTransformRequest; -import org.elasticsearch.client.transform.GetTransformResponse; -import org.elasticsearch.client.transform.GetTransformStatsRequest; -import org.elasticsearch.client.transform.GetTransformStatsResponse; -import org.elasticsearch.client.transform.PutTransformRequest; -import org.elasticsearch.client.transform.transforms.DestConfig; -import org.elasticsearch.client.transform.transforms.QueryConfig; -import org.elasticsearch.client.transform.transforms.SourceConfig; -import org.elasticsearch.client.transform.transforms.TransformConfig; -import org.elasticsearch.client.transform.transforms.TransformStats; -import org.elasticsearch.client.transform.transforms.pivot.AggregationConfig; -import org.elasticsearch.client.transform.transforms.pivot.GroupConfig; -import org.elasticsearch.client.transform.transforms.pivot.PivotConfig; -import org.elasticsearch.client.transform.transforms.pivot.TermsGroupSource; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.support.XContentMapValues; -import org.elasticsearch.core.TimeValue; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.SearchModule; -import org.elasticsearch.search.aggregations.AggregationBuilders; -import org.elasticsearch.search.aggregations.AggregatorFactories; -import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants; import org.junit.After; @@ -50,7 +25,6 @@ import java.util.Map; import java.util.concurrent.TimeUnit; -import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -518,83 +492,84 @@ public void testGetStatsWithContinuous() throws Exception { }, 120, TimeUnit.SECONDS); } - public void testManyTranformsUsingHlrc() throws IOException { - AggregatorFactories.Builder aggs = AggregatorFactories.builder() - .addAggregator(AggregationBuilders.avg("review_score.avg").field("stars")) - .addAggregator(AggregationBuilders.max("timestamp.max").field("timestamp")); - - TransformConfig.Builder configBuilder = TransformConfig.builder() - .setSource( - SourceConfig.builder().setIndex(REVIEWS_INDEX_NAME).setQueryConfig(new QueryConfig(QueryBuilders.matchAllQuery())).build() - ) - .setDest(DestConfig.builder().setIndex("dest").build()) - .setFrequency(TimeValue.timeValueSeconds(10)) - .setDescription("Test 10000 transform configs") - .setPivotConfig( - PivotConfig.builder() - .setGroups(GroupConfig.builder().groupBy("by-user", TermsGroupSource.builder().setField("user_id").build()).build()) - .setAggregationConfig(new AggregationConfig(aggs)) - .build() - ); - - try (RestHighLevelClient restClient = new TestRestHighLevelClient()) { - int numberOfTransforms = randomIntBetween(1_500, 4_000); - for (int i = 0; i < numberOfTransforms; ++i) { - AcknowledgedResponse response = restClient.transform() - .putTransform( - new PutTransformRequest(configBuilder.setId(String.format(Locale.ROOT, "t-%05d", i)).build()), - RequestOptions.DEFAULT - ); - assertTrue(response.isAcknowledged()); - } - - for (int i = 0; i < 3; ++i) { - int from = randomIntBetween(0, numberOfTransforms - 1_000); - int size = randomIntBetween(1, 1000); + @SuppressWarnings("unchecked") + public void testManyTransforms() throws IOException { + String config = transformConfig(); + + int numberOfTransforms = randomIntBetween(1_500, 4_000); + for (int i = 0; i < numberOfTransforms; ++i) { + String transformId = String.format(Locale.ROOT, "t-%05d", i); + final Request createTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId, null); + createTransformRequest.setJsonEntity(config); + assertOK(client().performRequest(createTransformRequest)); + } - GetTransformRequest request = new GetTransformRequest("*"); - request.setPageParams(new PageParams(from, size)); - GetTransformStatsRequest statsRequest = new GetTransformStatsRequest("*"); - statsRequest.setPageParams(new PageParams(from, size)); + for (int i = 0; i < 3; ++i) { + int from = randomIntBetween(0, numberOfTransforms - 1_000); + int size = randomIntBetween(1, 1000); - GetTransformResponse response = restClient.transform().getTransform(request, RequestOptions.DEFAULT); - GetTransformStatsResponse statsResponse = restClient.transform().getTransformStats(statsRequest, RequestOptions.DEFAULT); + var transforms = getTransforms(from, size); + var statsResponse = getTransformsStateAndStats(from, size); - assertEquals(numberOfTransforms, response.getCount()); - assertEquals(numberOfTransforms, statsResponse.getCount()); + assertEquals(numberOfTransforms, transforms.get("count")); + assertEquals(numberOfTransforms, statsResponse.get("count")); - List configs = response.getTransformConfigurations(); - List stats = statsResponse.getTransformsStats(); + var configs = (List>) transforms.get("transforms"); + var stats = (List>) statsResponse.get("transforms"); - assertEquals(size, configs.size()); - assertEquals(size, stats.size()); + assertEquals(size, configs.size()); + assertEquals(size, stats.size()); - assertThat(configs.get(0).getId(), equalTo(String.format(Locale.ROOT, "t-%05d", from))); - assertThat(configs.get(configs.size() - 1).getId(), equalTo(String.format(Locale.ROOT, "t-%05d", from + size - 1))); - assertThat(stats.get(0).getId(), equalTo(String.format(Locale.ROOT, "t-%05d", from))); - assertThat(stats.get(stats.size() - 1).getId(), equalTo(String.format(Locale.ROOT, "t-%05d", from + size - 1))); + assertThat(configs.get(0).get("id"), equalTo(String.format(Locale.ROOT, "t-%05d", from))); + assertThat(configs.get(configs.size() - 1).get("id"), equalTo(String.format(Locale.ROOT, "t-%05d", from + size - 1))); + assertThat(stats.get(0).get("id"), equalTo(String.format(Locale.ROOT, "t-%05d", from))); + assertThat(stats.get(stats.size() - 1).get("id"), equalTo(String.format(Locale.ROOT, "t-%05d", from + size - 1))); - if (size > 2) { - int randomElement = randomIntBetween(1, size - 1); - assertThat(configs.get(randomElement).getId(), equalTo(String.format(Locale.ROOT, "t-%05d", from + randomElement))); - assertThat(stats.get(randomElement).getId(), equalTo(String.format(Locale.ROOT, "t-%05d", from + randomElement))); - } + if (size > 2) { + int randomElement = randomIntBetween(1, size - 1); + assertThat(configs.get(randomElement).get("id"), equalTo(String.format(Locale.ROOT, "t-%05d", from + randomElement))); + assertThat(stats.get(randomElement).get("id"), equalTo(String.format(Locale.ROOT, "t-%05d", from + randomElement))); } + } - for (int i = 0; i < numberOfTransforms; ++i) { - AcknowledgedResponse response = restClient.transform() - .deleteTransform(new DeleteTransformRequest(String.format(Locale.ROOT, "t-%05d", i)), RequestOptions.DEFAULT); - assertTrue(response.isAcknowledged()); - } + for (int i = 0; i < numberOfTransforms; ++i) { + deleteTransform(String.format(Locale.ROOT, "t-%05d", i)); } } - protected static class TestRestHighLevelClient extends RestHighLevelClient { - private static final List X_CONTENT_ENTRIES = new SearchModule(Settings.EMPTY, emptyList()) - .getNamedXContents(); - - TestRestHighLevelClient() { - super(client(), restClient -> {}, X_CONTENT_ENTRIES); - } + private static String transformConfig() { + return """ + { + "description": "Test 10000 transform configs", + "source": { + "index":""" + "\"" + REVIEWS_INDEX_NAME + "\"" + """ + }, + "pivot": { + "group_by": { + "by-user": { + "terms": { + "field": "user_id" + } + } + }, + "aggregations": { + "review_score.avg": { + "avg": { + "field": "stars" + } + }, + "timestamp.max": { + "max": { + "field": "timestamp" + } + } + } + }, + "dest": { + "index":"dest" + }, + "frequency": "10s" + } + """; } } diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java index 25f912fc52f8d..7b7b432127ec9 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java @@ -1347,8 +1347,8 @@ public void testPivotWithGeoBoundsAgg() throws Exception { )).get(0); assertThat(actualObj.get("type"), equalTo("point")); List coordinates = (List) actualObj.get("coordinates"); - assertEquals((4 + 10), coordinates.get(1), 0.000001); - assertEquals((4 + 15), coordinates.get(0), 0.000001); + assertEquals(-76.0, coordinates.get(1), 0.000001); + assertEquals(-161.0, coordinates.get(0), 0.000001); } public void testPivotWithGeoCentroidAgg() throws Exception { @@ -1411,8 +1411,8 @@ public void testPivotWithGeoCentroidAgg() throws Exception { assertEquals(3.878048780, actual.doubleValue(), 0.000001); String actualString = (String) ((List) XContentMapValues.extractValue("hits.hits._source.location", searchResult)).get(0); String[] latlon = actualString.split(","); - assertEquals((4 + 10), Double.valueOf(latlon[0]), 0.000001); - assertEquals((4 + 15), Double.valueOf(latlon[1]), 0.000001); + assertEquals(-76.0, Double.valueOf(latlon[0]), 0.000001); + assertEquals(-161.0, Double.valueOf(latlon[1]), 0.000001); } @SuppressWarnings("unchecked") diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestSpecialCasesIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestSpecialCasesIT.java index 15d556998ab3d..5fa20bb954ba6 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestSpecialCasesIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestSpecialCasesIT.java @@ -248,8 +248,8 @@ public void testRestrictiveBucketSelector() throws Exception { String indexName = "special_pivot_bucket_selector_reviews"; createReviewsIndex(indexName, 1000, 327, "date", false, 5, "affiliate_id"); - verifyDestIndexHitsCount(indexName, "special_pivot_bucket_selector-10", 10, 14); - verifyDestIndexHitsCount(indexName, "special_pivot_bucket_selector-10000", 10000, 14); + verifyDestIndexHitsCount(indexName, "special_pivot_bucket_selector-10", 10, 41); + verifyDestIndexHitsCount(indexName, "special_pivot_bucket_selector-10000", 10000, 41); } private void verifyDestIndexHitsCount(String sourceIndex, String transformId, int maxPageSearchSize, long expectedDestIndexCount) diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java index a3b87ac0f0caf..1190ea420abd9 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java @@ -39,7 +39,6 @@ import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; public abstract class TransformRestTestCase extends ESRestTestCase { @@ -88,7 +87,7 @@ protected void createReviewsIndex( min = 10 + (i % 49); } int sec = 10 + (i % 49); - String location = (user + 10) + "," + (user + 15); + String location = (((user + 10) % 180) - 90) + "," + (((user + 15) % 360) - 180); String date_string = "2017-01-" + day + "T" + hour + ":" + min + ":" + sec; if (dateType.equals("date_nanos")) { @@ -123,21 +122,33 @@ protected void createReviewsIndex( if (i % 50 == 0) { bulk.append("\r\n"); - final Request bulkRequest = new Request("POST", "/_bulk"); - bulkRequest.addParameter("refresh", "true"); - bulkRequest.setJsonEntity(bulk.toString()); - client().performRequest(bulkRequest); + doBulk(bulk.toString(), true); // clear the builder bulk.setLength(0); day += 1; } } + bulk.append("\r\n"); + doBulk(bulk.toString(), true); + } - final Request bulkRequest = new Request("POST", "/_bulk"); - bulkRequest.addParameter("refresh", "true"); - bulkRequest.setJsonEntity(bulk.toString()); - client().performRequest(bulkRequest); + @SuppressWarnings("unchecked") + protected void doBulk(String bulkDocuments, boolean refresh) throws IOException { + Request bulkRequest = new Request("POST", "/_bulk"); + if (refresh) { + bulkRequest.addParameter("refresh", "true"); + } + bulkRequest.setJsonEntity(bulkDocuments); + bulkRequest.setOptions(RequestOptions.DEFAULT); + Response bulkResponse = client().performRequest(bulkRequest); + assertOK(bulkResponse); + var bulkMap = entityAsMap(bulkResponse); + var hasErrors = (boolean) bulkMap.get("errors"); + if (hasErrors) { + var items = (List>) bulkMap.get("items"); + fail("Bulk item failures: " + items.toString()); + } } protected void putReviewsIndex(String indexName, String dateType, boolean isDataStream) throws IOException { @@ -463,6 +474,13 @@ protected static List> getTransforms(List getTransforms(int from, int size) throws IOException { + Request request = new Request("GET", getTransformEndpoint() + "_all?from=" + from + "&size=" + size); + Response response = adminClient().performRequest(request); + return entityAsMap(response); + } + protected static String getTransformState(String transformId) throws IOException { Map transformStatsAsMap = getTransformStateAndStats(transformId); return transformStatsAsMap == null ? null : (String) XContentMapValues.extractValue("state", transformStatsAsMap); @@ -477,6 +495,13 @@ protected static String getTransformState(String transformId) throws IOException return (Map) transforms.get(0); } + protected static Map getTransformsStateAndStats(int from, int size) throws IOException { + Response statsResponse = client().performRequest( + new Request("GET", getTransformEndpoint() + "_stats?from=" + from + "&size=" + size) + ); + return entityAsMap(statsResponse); + } + protected static void deleteTransform(String transformId) throws IOException { Request request = new Request("DELETE", getTransformEndpoint() + transformId); request.addParameter("ignore", "404"); // Ignore 404s because they imply someone was racing us to delete this diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformProgressIT.java b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/integration/TransformProgressIT.java similarity index 81% rename from x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformProgressIT.java rename to x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/integration/TransformProgressIT.java index 854d45d5e2517..3a4847ed02427 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformProgressIT.java +++ b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/integration/TransformProgressIT.java @@ -10,24 +10,16 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.LatchedActionListener; -import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.client.indices.CreateIndexRequest; -import org.elasticsearch.client.indices.CreateIndexResponse; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xpack.core.transform.transforms.DestConfig; @@ -39,26 +31,25 @@ import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfig; import org.elasticsearch.xpack.core.transform.transforms.pivot.HistogramGroupSource; import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig; +import org.elasticsearch.xpack.transform.TransformSingleNodeTestCase; import org.elasticsearch.xpack.transform.transforms.Function; import org.elasticsearch.xpack.transform.transforms.pivot.Pivot; -import java.nio.charset.StandardCharsets; -import java.util.Base64; import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; -import static org.elasticsearch.xpack.transform.integration.TransformRestTestCase.REVIEWS_INDEX_NAME; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @SuppressWarnings("removal") -public class TransformProgressIT extends ESRestTestCase { +public class TransformProgressIT extends TransformSingleNodeTestCase { + private static final String REVIEWS_INDEX_NAME = "reviews"; + protected void createReviewsIndex(int userWithMissingBuckets) throws Exception { final int numDocs = 1000; - final RestHighLevelClient restClient = new TestRestHighLevelClient(); // create mapping try (XContentBuilder builder = jsonBuilder()) { @@ -83,8 +74,8 @@ protected void createReviewsIndex(int userWithMissingBuckets) throws Exception { .endObject(); } builder.endObject(); - CreateIndexResponse response = restClient.indices() - .create(new CreateIndexRequest(REVIEWS_INDEX_NAME).mapping(builder), RequestOptions.DEFAULT); + + var response = client().admin().indices().prepareCreate(REVIEWS_INDEX_NAME).setMapping(builder).get(); assertThat(response.isAcknowledged(), is(true)); } @@ -120,15 +111,15 @@ protected void createReviewsIndex(int userWithMissingBuckets) throws Exception { bulk.add(new IndexRequest().source(sourceBuilder.toString(), XContentType.JSON)); if (i % 50 == 0) { - BulkResponse response = restClient.bulk(bulk, RequestOptions.DEFAULT); + BulkResponse response = client().bulk(bulk).actionGet(); assertThat(response.buildFailureMessage(), response.hasFailures(), is(false)); bulk = new BulkRequest(REVIEWS_INDEX_NAME); day += 1; } } - BulkResponse bulkResponse = restClient.bulk(bulk, RequestOptions.DEFAULT); + BulkResponse bulkResponse = client().bulk(bulk).actionGet(); assertFalse(bulkResponse.hasFailures()); - restClient.indices().refresh(new RefreshRequest(REVIEWS_INDEX_NAME), RequestOptions.DEFAULT); + client().admin().indices().prepareRefresh(REVIEWS_INDEX_NAME).get(); } public void testGetProgress() throws Exception { @@ -214,14 +205,8 @@ public void assertGetProgress(int userWithMissingBuckets) throws Exception { assertThat(progress.getPercentComplete(), equalTo(100.0)); } - deleteIndex(REVIEWS_INDEX_NAME); - } - - @Override - protected Settings restClientSettings() { - final String token = "Basic " - + Base64.getEncoder().encodeToString(("x_pack_rest_user:x-pack-test-password").getBytes(StandardCharsets.UTF_8)); - return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build(); + var ackResponse = client().admin().indices().prepareDelete(REVIEWS_INDEX_NAME).get(); + assertTrue(ackResponse.isAcknowledged()); } private TransformProgress getProgress(Function function, SearchRequest searchRequest) throws Exception { @@ -229,14 +214,11 @@ private TransformProgress getProgress(Function function, SearchRequest searchReq final AtomicReference progressHolder = new AtomicReference<>(); final AtomicReference exceptionHolder = new AtomicReference<>(); - try (RestHighLevelClient restClient = new TestRestHighLevelClient()) { - SearchResponse response = restClient.search(searchRequest, RequestOptions.DEFAULT); - - function.getInitialProgressFromResponse( - response, - new LatchedActionListener<>(ActionListener.wrap(progressHolder::set, e -> { exceptionHolder.set(e); }), latch) - ); - } + SearchResponse response = client().search(searchRequest).actionGet(); + function.getInitialProgressFromResponse( + response, + new LatchedActionListener<>(ActionListener.wrap(progressHolder::set, exceptionHolder::set), latch) + ); assertTrue("timed out after 20s", latch.await(20, TimeUnit.SECONDS)); if (exceptionHolder.get() != null) { @@ -246,12 +228,6 @@ private TransformProgress getProgress(Function function, SearchRequest searchReq return progressHolder.get(); } - private class TestRestHighLevelClient extends RestHighLevelClient { - TestRestHighLevelClient() { - super(client(), restClient -> {}, Collections.emptyList()); - } - } - private static SearchRequest getProgressQuery(Function function, String[] source, QueryBuilder query) { SearchRequest searchRequest = new SearchRequest(source); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/TransformSurvivesUpgradeIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/TransformSurvivesUpgradeIT.java index 6f72a0b01651e..30404af3ec328 100644 --- a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/TransformSurvivesUpgradeIT.java +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/TransformSurvivesUpgradeIT.java @@ -15,27 +15,12 @@ import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.core.IndexerState; -import org.elasticsearch.client.transform.GetTransformStatsResponse; -import org.elasticsearch.client.transform.transforms.DestConfig; -import org.elasticsearch.client.transform.transforms.SourceConfig; -import org.elasticsearch.client.transform.transforms.TimeSyncConfig; -import org.elasticsearch.client.transform.transforms.TransformConfig; -import org.elasticsearch.client.transform.transforms.TransformStats; -import org.elasticsearch.client.transform.transforms.pivot.GroupConfig; -import org.elasticsearch.client.transform.transforms.pivot.PivotConfig; -import org.elasticsearch.client.transform.transforms.pivot.TermsGroupSource; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.core.Booleans; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.search.aggregations.AggregationBuilders; -import org.elasticsearch.search.aggregations.AggregatorFactories; -import org.elasticsearch.xcontent.DeprecationHandler; -import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xcontent.XContentBuilder; -import org.elasticsearch.xcontent.XContentParser; -import org.elasticsearch.xcontent.XContentType; import java.io.IOException; import java.time.Instant; @@ -138,48 +123,75 @@ private void createAndStartContinuousTransform() throws Exception { totalDocsWrittenSum += docs * ENTITIES.size(); } long totalDocsWritten = totalDocsWrittenSum; - TransformConfig config = TransformConfig.builder() - .setSyncConfig(TimeSyncConfig.builder().setField("timestamp").setDelay(TimeValue.timeValueSeconds(1)).build()) - .setPivotConfig( - PivotConfig.builder() - .setAggregations(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.avg("stars").field("stars"))) - .setGroups(GroupConfig.builder().groupBy("user_id", TermsGroupSource.builder().setField("user_id").build()).build()) - .build() - ) - .setDest(DestConfig.builder().setIndex(CONTINUOUS_TRANSFORM_ID + "_idx").build()) - .setSource(SourceConfig.builder().setIndex(CONTINUOUS_TRANSFORM_SOURCE).build()) - .setId(CONTINUOUS_TRANSFORM_ID) - .setFrequency(TimeValue.timeValueSeconds(1)) - .build(); - putTransform(CONTINUOUS_TRANSFORM_ID, config); + + putTransform(CONTINUOUS_TRANSFORM_ID, transformConfig()); startTransform(CONTINUOUS_TRANSFORM_ID); waitUntilAfterCheckpoint(CONTINUOUS_TRANSFORM_ID, 0L); assertBusy(() -> { - TransformStats stateAndStats = getTransformStats(CONTINUOUS_TRANSFORM_ID); - assertThat(stateAndStats.getIndexerStats().getDocumentsIndexed(), equalTo((long) ENTITIES.size())); - assertThat(stateAndStats.getIndexerStats().getDocumentsProcessed(), equalTo(totalDocsWritten)); + var stateAndStats = getTransformStats(CONTINUOUS_TRANSFORM_ID); + assertThat( + ((Integer) XContentMapValues.extractValue("stats.documents_indexed", stateAndStats)).longValue(), + equalTo(totalDocsWritten) + ); + assertThat((Integer) XContentMapValues.extractValue("stats.documents_processed", stateAndStats), equalTo(ENTITIES.size())); // Even if we get back to started, we may periodically get set back to `indexing` when triggered. // Though short lived due to no changes on the source indices, it could result in flaky test behavior - assertThat(stateAndStats.getState(), oneOf(TransformStats.State.STARTED, TransformStats.State.INDEXING)); + assertThat(stateAndStats.get("state"), oneOf("started", "indexing")); }, 120, TimeUnit.SECONDS); // We want to make sure our latest state is written before we turn the node off, this makes the testing more reliable awaitWrittenIndexerState(CONTINUOUS_TRANSFORM_ID, IndexerState.STARTED.value()); } + private static String transformConfig() { + return """ + { + "source": { + "index":""" + "\"" + CONTINUOUS_TRANSFORM_SOURCE + "\"" + """ + }, + "pivot": { + "group_by": { + "user_id": { + "terms": { + "field": "user_id" + } + } + }, + "aggregations": { + "stars": { + "avg": { + "field": "stars" + } + } + } + }, + "dest": { + "index":""" + "\"" + CONTINUOUS_TRANSFORM_ID + "_idx" + "\"" + """ + }, + "frequency": "1s", + "sync": { + "time": { + "field": "timestamp", + "delay": "1s" + } + } + } + """; + } + @SuppressWarnings("unchecked") private void verifyContinuousTransformHandlesData(long expectedLastCheckpoint) throws Exception { // A continuous transform should automatically become started when it gets assigned to a node // if it was assigned to the node that was removed from the cluster assertBusy(() -> { - TransformStats stateAndStats = getTransformStats(CONTINUOUS_TRANSFORM_ID); - assertThat(stateAndStats.getState(), oneOf(TransformStats.State.STARTED, TransformStats.State.INDEXING)); + var stateAndStats = getTransformStats(CONTINUOUS_TRANSFORM_ID); + assertThat(stateAndStats.get("state"), oneOf("started", "indexing")); }, 120, TimeUnit.SECONDS); - TransformStats previousStateAndStats = getTransformStats(CONTINUOUS_TRANSFORM_ID); + var previousStateAndStats = getTransformStats(CONTINUOUS_TRANSFORM_ID); // Add a new user and write data to it // This is so we can have more reliable data counts, as writing to existing entities requires @@ -193,17 +205,17 @@ private void verifyContinuousTransformHandlesData(long expectedLastCheckpoint) t waitUntilAfterCheckpoint(CONTINUOUS_TRANSFORM_ID, expectedLastCheckpoint); - assertBusy( - () -> assertThat( - getTransformStats(CONTINUOUS_TRANSFORM_ID).getIndexerStats().getDocumentsProcessed(), - greaterThanOrEqualTo(docs + previousStateAndStats.getIndexerStats().getDocumentsProcessed()) - ), - 120, - TimeUnit.SECONDS - ); - TransformStats stateAndStats = getTransformStats(CONTINUOUS_TRANSFORM_ID); + assertBusy(() -> { + Map currentStats = getTransformStats(CONTINUOUS_TRANSFORM_ID); + assertThat( + (Integer) XContentMapValues.extractValue("stats.documents_processed", currentStats), + greaterThanOrEqualTo(docs + (Integer) XContentMapValues.extractValue("stats.documents_processed", previousStateAndStats)) + ); + }, 120, TimeUnit.SECONDS); + + var stateAndStats = getTransformStats(CONTINUOUS_TRANSFORM_ID); - assertThat(stateAndStats.getState(), oneOf(TransformStats.State.STARTED, TransformStats.State.INDEXING)); + assertThat(stateAndStats.get("state"), oneOf("started", "indexing")); awaitWrittenIndexerState(CONTINUOUS_TRANSFORM_ID, (responseBody) -> { Map indexerStats = (Map) ((List) XContentMapValues.extractValue( "hits.hits._source.stats", @@ -211,11 +223,11 @@ private void verifyContinuousTransformHandlesData(long expectedLastCheckpoint) t )).get(0); assertThat( (Integer) indexerStats.get("documents_indexed"), - greaterThan(Long.valueOf(previousStateAndStats.getIndexerStats().getDocumentsIndexed()).intValue()) + greaterThan((Integer) XContentMapValues.extractValue("stats.documents_indexed", previousStateAndStats)) ); assertThat( (Integer) indexerStats.get("documents_processed"), - greaterThan(Long.valueOf(previousStateAndStats.getIndexerStats().getDocumentsProcessed()).intValue()) + greaterThan((Integer) XContentMapValues.extractValue("stats.documents_processed", previousStateAndStats)) ); }); } @@ -281,9 +293,9 @@ private String getTransformEndpoint() { return TRANSFORM_ENDPOINT; } - private void putTransform(String id, TransformConfig config) throws IOException { + private void putTransform(String id, String config) throws IOException { final Request createDataframeTransformRequest = new Request("PUT", getTransformEndpoint() + id); - createDataframeTransformRequest.setJsonEntity(Strings.toString(config)); + createDataframeTransformRequest.setJsonEntity(config); Response response = client().performRequest(createDataframeTransformRequest); assertEquals(200, response.getStatusLine().getStatusCode()); } @@ -305,31 +317,25 @@ private void stopTransform(String id) throws IOException { assertEquals(200, response.getStatusLine().getStatusCode()); } - private TransformStats getTransformStats(String id) throws IOException { + @SuppressWarnings("unchecked") + private Map getTransformStats(String id) throws IOException { final Request getStats = new Request("GET", getTransformEndpoint() + id + "/_stats"); Response response = client().performRequest(getStats); assertEquals(200, response.getStatusLine().getStatusCode()); - XContentType xContentType = XContentType.fromMediaType(response.getEntity().getContentType().getValue()); - try ( - XContentParser parser = xContentType.xContent() - .createParser( - NamedXContentRegistry.EMPTY, - DeprecationHandler.THROW_UNSUPPORTED_OPERATION, - response.getEntity().getContent() - ) - ) { - GetTransformStatsResponse resp = GetTransformStatsResponse.fromXContent(parser); - assertThat(resp.getTransformsStats(), hasSize(1)); - return resp.getTransformsStats().get(0); - } + var responseMap = entityAsMap(response); + var stats = (List>) responseMap.get("transforms"); + assertThat(stats, hasSize(1)); + return stats.get(0); } private void waitUntilAfterCheckpoint(String id, long currentCheckpoint) throws Exception { - assertBusy( - () -> assertThat(getTransformStats(id).getCheckpointingInfo().getLast().getCheckpoint(), greaterThan(currentCheckpoint)), - 60, - TimeUnit.SECONDS - ); + assertBusy(() -> { + var statsMap = getTransformStats(id); + assertThat( + ((Integer) XContentMapValues.extractValue("checkpointing.last.checkpoint", statsMap)).longValue(), + greaterThan(currentCheckpoint) + ); + }, 60, TimeUnit.SECONDS); } private void createIndex(String indexName) throws IOException {