Skip to content

[TRANSFORM] Remove HLRC from single node tests #84220

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 23, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,33 +11,8 @@
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.WarningsHandler;
import org.elasticsearch.client.core.AcknowledgedResponse;
import org.elasticsearch.client.core.PageParams;
import org.elasticsearch.client.transform.DeleteTransformRequest;
import org.elasticsearch.client.transform.GetTransformRequest;
import org.elasticsearch.client.transform.GetTransformResponse;
import org.elasticsearch.client.transform.GetTransformStatsRequest;
import org.elasticsearch.client.transform.GetTransformStatsResponse;
import org.elasticsearch.client.transform.PutTransformRequest;
import org.elasticsearch.client.transform.transforms.DestConfig;
import org.elasticsearch.client.transform.transforms.QueryConfig;
import org.elasticsearch.client.transform.transforms.SourceConfig;
import org.elasticsearch.client.transform.transforms.TransformConfig;
import org.elasticsearch.client.transform.transforms.TransformStats;
import org.elasticsearch.client.transform.transforms.pivot.AggregationConfig;
import org.elasticsearch.client.transform.transforms.pivot.GroupConfig;
import org.elasticsearch.client.transform.transforms.pivot.PivotConfig;
import org.elasticsearch.client.transform.transforms.pivot.TermsGroupSource;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
import org.junit.After;
Expand All @@ -50,7 +25,6 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
Expand Down Expand Up @@ -518,83 +492,84 @@ public void testGetStatsWithContinuous() throws Exception {
}, 120, TimeUnit.SECONDS);
}

public void testManyTranformsUsingHlrc() throws IOException {
AggregatorFactories.Builder aggs = AggregatorFactories.builder()
.addAggregator(AggregationBuilders.avg("review_score.avg").field("stars"))
.addAggregator(AggregationBuilders.max("timestamp.max").field("timestamp"));

TransformConfig.Builder configBuilder = TransformConfig.builder()
.setSource(
SourceConfig.builder().setIndex(REVIEWS_INDEX_NAME).setQueryConfig(new QueryConfig(QueryBuilders.matchAllQuery())).build()
)
.setDest(DestConfig.builder().setIndex("dest").build())
.setFrequency(TimeValue.timeValueSeconds(10))
.setDescription("Test 10000 transform configs")
.setPivotConfig(
PivotConfig.builder()
.setGroups(GroupConfig.builder().groupBy("by-user", TermsGroupSource.builder().setField("user_id").build()).build())
.setAggregationConfig(new AggregationConfig(aggs))
.build()
);

try (RestHighLevelClient restClient = new TestRestHighLevelClient()) {
int numberOfTransforms = randomIntBetween(1_500, 4_000);
for (int i = 0; i < numberOfTransforms; ++i) {
AcknowledgedResponse response = restClient.transform()
.putTransform(
new PutTransformRequest(configBuilder.setId(String.format(Locale.ROOT, "t-%05d", i)).build()),
RequestOptions.DEFAULT
);
assertTrue(response.isAcknowledged());
}

for (int i = 0; i < 3; ++i) {
int from = randomIntBetween(0, numberOfTransforms - 1_000);
int size = randomIntBetween(1, 1000);
@SuppressWarnings("unchecked")
public void testManyTransforms() throws IOException {
String config = transformConfig();

int numberOfTransforms = randomIntBetween(1_500, 4_000);
for (int i = 0; i < numberOfTransforms; ++i) {
String transformId = String.format(Locale.ROOT, "t-%05d", i);
final Request createTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId, null);
createTransformRequest.setJsonEntity(config);
assertOK(client().performRequest(createTransformRequest));
}

GetTransformRequest request = new GetTransformRequest("*");
request.setPageParams(new PageParams(from, size));
GetTransformStatsRequest statsRequest = new GetTransformStatsRequest("*");
statsRequest.setPageParams(new PageParams(from, size));
for (int i = 0; i < 3; ++i) {
int from = randomIntBetween(0, numberOfTransforms - 1_000);
int size = randomIntBetween(1, 1000);

GetTransformResponse response = restClient.transform().getTransform(request, RequestOptions.DEFAULT);
GetTransformStatsResponse statsResponse = restClient.transform().getTransformStats(statsRequest, RequestOptions.DEFAULT);
var transforms = getTransforms(from, size);
var statsResponse = getTransformStateAndStats(from, size);

assertEquals(numberOfTransforms, response.getCount());
assertEquals(numberOfTransforms, statsResponse.getCount());
assertEquals(numberOfTransforms, transforms.get("count"));
assertEquals(numberOfTransforms, statsResponse.get("count"));

List<TransformConfig> configs = response.getTransformConfigurations();
List<TransformStats> stats = statsResponse.getTransformsStats();
var configs = (List<Map<String, Object>>) transforms.get("transforms");
var stats = (List<Map<String, Object>>) statsResponse.get("transforms");

assertEquals(size, configs.size());
assertEquals(size, stats.size());
assertEquals(size, configs.size());
assertEquals(size, stats.size());

assertThat(configs.get(0).getId(), equalTo(String.format(Locale.ROOT, "t-%05d", from)));
assertThat(configs.get(configs.size() - 1).getId(), equalTo(String.format(Locale.ROOT, "t-%05d", from + size - 1)));
assertThat(stats.get(0).getId(), equalTo(String.format(Locale.ROOT, "t-%05d", from)));
assertThat(stats.get(stats.size() - 1).getId(), equalTo(String.format(Locale.ROOT, "t-%05d", from + size - 1)));
assertThat(configs.get(0).get("id"), equalTo(String.format(Locale.ROOT, "t-%05d", from)));
assertThat(configs.get(configs.size() - 1).get("id"), equalTo(String.format(Locale.ROOT, "t-%05d", from + size - 1)));
assertThat(stats.get(0).get("id"), equalTo(String.format(Locale.ROOT, "t-%05d", from)));
assertThat(stats.get(stats.size() - 1).get("id"), equalTo(String.format(Locale.ROOT, "t-%05d", from + size - 1)));

if (size > 2) {
int randomElement = randomIntBetween(1, size - 1);
assertThat(configs.get(randomElement).getId(), equalTo(String.format(Locale.ROOT, "t-%05d", from + randomElement)));
assertThat(stats.get(randomElement).getId(), equalTo(String.format(Locale.ROOT, "t-%05d", from + randomElement)));
}
if (size > 2) {
int randomElement = randomIntBetween(1, size - 1);
assertThat(configs.get(randomElement).get("id"), equalTo(String.format(Locale.ROOT, "t-%05d", from + randomElement)));
assertThat(stats.get(randomElement).get("id"), equalTo(String.format(Locale.ROOT, "t-%05d", from + randomElement)));
}
}

for (int i = 0; i < numberOfTransforms; ++i) {
AcknowledgedResponse response = restClient.transform()
.deleteTransform(new DeleteTransformRequest(String.format(Locale.ROOT, "t-%05d", i)), RequestOptions.DEFAULT);
assertTrue(response.isAcknowledged());
}
for (int i = 0; i < numberOfTransforms; ++i) {
deleteTransform(String.format(Locale.ROOT, "t-%05d", i));
}
}

protected static class TestRestHighLevelClient extends RestHighLevelClient {
private static final List<NamedXContentRegistry.Entry> X_CONTENT_ENTRIES = new SearchModule(Settings.EMPTY, emptyList())
.getNamedXContents();

TestRestHighLevelClient() {
super(client(), restClient -> {}, X_CONTENT_ENTRIES);
}
private String transformConfig() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

static?

return """
{
"description": "Test 10000 transform configs",
"source": {
"index":""" + "\"" + REVIEWS_INDEX_NAME + "\"" + """
},
"pivot": {
"group_by": {
"by-user": {
"terms": {
"field": "user_id"
}
}
},
"aggregations": {
"review_score.avg": {
"avg": {
"field": "stars"
}
},
"timestamp.max": {
"max": {
"field": "timestamp"
}
}
}
},
"dest": {
"index":"dest"
},
"frequency": "10s"
}
""";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1347,8 +1347,8 @@ public void testPivotWithGeoBoundsAgg() throws Exception {
)).get(0);
assertThat(actualObj.get("type"), equalTo("point"));
List<Double> coordinates = (List<Double>) actualObj.get("coordinates");
assertEquals((4 + 10), coordinates.get(1), 0.000001);
assertEquals((4 + 15), coordinates.get(0), 0.000001);
assertEquals(-76.0, coordinates.get(1), 0.000001);
assertEquals(-161.0, coordinates.get(0), 0.000001);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also due to the lat/lon change

}

public void testPivotWithGeoCentroidAgg() throws Exception {
Expand Down Expand Up @@ -1411,8 +1411,8 @@ public void testPivotWithGeoCentroidAgg() throws Exception {
assertEquals(3.878048780, actual.doubleValue(), 0.000001);
String actualString = (String) ((List<?>) XContentMapValues.extractValue("hits.hits._source.location", searchResult)).get(0);
String[] latlon = actualString.split(",");
assertEquals((4 + 10), Double.valueOf(latlon[0]), 0.000001);
assertEquals((4 + 15), Double.valueOf(latlon[1]), 0.000001);
assertEquals(-76.0, Double.valueOf(latlon[0]), 0.000001);
assertEquals(-161.0, Double.valueOf(latlon[1]), 0.000001);
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,8 @@ public void testRestrictiveBucketSelector() throws Exception {
String indexName = "special_pivot_bucket_selector_reviews";
createReviewsIndex(indexName, 1000, 327, "date", false, 5, "affiliate_id");

verifyDestIndexHitsCount(indexName, "special_pivot_bucket_selector-10", 10, 14);
verifyDestIndexHitsCount(indexName, "special_pivot_bucket_selector-10000", 10000, 14);
verifyDestIndexHitsCount(indexName, "special_pivot_bucket_selector-10", 10, 41);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change here is the result of fixing the geo-coordinates that were failing bulk upload

verifyDestIndexHitsCount(indexName, "special_pivot_bucket_selector-10000", 10000, 41);
}

private void verifyDestIndexHitsCount(String sourceIndex, String transformId, int maxPageSearchSize, long expectedDestIndexCount)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

public abstract class TransformRestTestCase extends ESRestTestCase {

Expand Down Expand Up @@ -88,7 +87,7 @@ protected void createReviewsIndex(
min = 10 + (i % 49);
}
int sec = 10 + (i % 49);
String location = (user + 10) + "," + (user + 15);
String location = (((user + 10) % 180) - 90) + "," + (((user + 15) % 360) - 180);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bulk upload was failing for the test TransformPivotRestSpecialCasesIT::testRestrictiveBucketSelector because the lat,lon values were outside of the range.


String date_string = "2017-01-" + day + "T" + hour + ":" + min + ":" + sec;
if (dateType.equals("date_nanos")) {
Expand Down Expand Up @@ -123,21 +122,33 @@ protected void createReviewsIndex(

if (i % 50 == 0) {
bulk.append("\r\n");
final Request bulkRequest = new Request("POST", "/_bulk");
bulkRequest.addParameter("refresh", "true");
bulkRequest.setJsonEntity(bulk.toString());
client().performRequest(bulkRequest);
doBulk(bulk.toString(), true);
// clear the builder
bulk.setLength(0);
day += 1;
}
}

bulk.append("\r\n");
doBulk(bulk.toString(), true);
}

final Request bulkRequest = new Request("POST", "/_bulk");
bulkRequest.addParameter("refresh", "true");
bulkRequest.setJsonEntity(bulk.toString());
client().performRequest(bulkRequest);
@SuppressWarnings("unchecked")
protected void doBulk(String bulkDocuments, boolean refresh) throws IOException {
Request bulkRequest = new Request("POST", "/_bulk");
if (refresh) {
bulkRequest.addParameter("refresh", "true");
}
bulkRequest.setJsonEntity(bulkDocuments);
bulkRequest.setOptions(RequestOptions.DEFAULT);
Response bulkResponse = client().performRequest(bulkRequest);
assertOK(bulkResponse);
var bulkMap = entityAsMap(bulkResponse);
var hasErrors = (boolean) bulkMap.get("errors");
if (hasErrors) {
var items = (List<Map<String, Object>>) bulkMap.get("items");
fail("Bulk item failures: " + items.toString());
}
}

protected void putReviewsIndex(String indexName, String dateType, boolean isDataStream) throws IOException {
Expand Down Expand Up @@ -463,6 +474,13 @@ protected static List<Map<String, Object>> getTransforms(List<Map<String, String
return transformConfigs == null ? Collections.emptyList() : transformConfigs;
}

@SuppressWarnings("unchecked")
protected static Map<String, Object> getTransforms(int from, int size) throws IOException {
Request request = new Request("GET", getTransformEndpoint() + "_all?from=" + from + "&size=" + size);
Response response = adminClient().performRequest(request);
return entityAsMap(response);
}

protected static String getTransformState(String transformId) throws IOException {
Map<?, ?> transformStatsAsMap = getTransformStateAndStats(transformId);
return transformStatsAsMap == null ? null : (String) XContentMapValues.extractValue("state", transformStatsAsMap);
Expand All @@ -477,6 +495,13 @@ protected static String getTransformState(String transformId) throws IOException
return (Map<?, ?>) transforms.get(0);
}

protected static Map<String, Object> getTransformStateAndStats(int from, int size) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it returns a number of transforms, would it make sense to name it getTransformsStateAndStats?

Response statsResponse = client().performRequest(
new Request("GET", getTransformEndpoint() + "_stats?from=" + from + "&size=" + size)
);
return entityAsMap(statsResponse);
}

protected static void deleteTransform(String transformId) throws IOException {
Request request = new Request("DELETE", getTransformEndpoint() + transformId);
request.addParameter("ignore", "404"); // Ignore 404s because they imply someone was racing us to delete this
Expand Down
Loading