Skip to content

Commit 4eea602

Browse files
authored
Add a snapshot test module to delay shard aggregations (elastic#62082) (elastic#62359)
This change adds an aggregation that can be used to delay the query phase execution on shards with a configurable time: { "aggs": { "delay": { "shard_delay": { "value": "30s" }, "aggs": { "host": { "terms": { "field": "hostname" } } } } } } This test module is built on top of elastic#61954 so the aggregation will be available only within snapshots since this module is not meant to be used in production. Closes elastic#54159
1 parent 3ed60df commit 4eea602

File tree

7 files changed

+387
-0
lines changed

7 files changed

+387
-0
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
apply plugin: 'elasticsearch.esplugin'
21+
apply plugin: 'elasticsearch.yaml-rest-test'
22+
23+
esplugin {
24+
description 'A test module that allows to delay aggregations on shards with a configurable time'
25+
classname 'org.elasticsearch.search.aggregations.DelayedShardAggregationPlugin'
26+
}
27+
28+
restResources {
29+
restApi {
30+
includeCore '_common', 'indices', 'index', 'cluster', 'search'
31+
}
32+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.search.aggregations;
20+
21+
import org.elasticsearch.action.index.IndexRequestBuilder;
22+
import org.elasticsearch.action.search.SearchResponse;
23+
import org.elasticsearch.common.unit.TimeValue;
24+
import org.elasticsearch.plugins.Plugin;
25+
import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter;
26+
import org.elasticsearch.search.aggregations.metrics.InternalMax;
27+
import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
28+
import org.elasticsearch.test.ESIntegTestCase;
29+
30+
import java.util.ArrayList;
31+
import java.util.Arrays;
32+
import java.util.Collection;
33+
import java.util.List;
34+
35+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
36+
import static org.hamcrest.Matchers.equalTo;
37+
import static org.hamcrest.Matchers.instanceOf;
38+
39+
public class DelayedShardAggregationIT extends ESIntegTestCase {
40+
41+
@Override
42+
protected Collection<Class<? extends Plugin>> nodePlugins() {
43+
return Arrays.asList(DelayedShardAggregationPlugin.class);
44+
}
45+
46+
public void testSimple() throws Exception {
47+
assertAcked(client().admin().indices().prepareCreate("index"));
48+
float expectedMax = Float.MIN_VALUE;
49+
List<IndexRequestBuilder> reqs = new ArrayList<>();
50+
for (int i = 0; i < 5; i++) {
51+
float rand = randomFloat();
52+
expectedMax = Math.max(rand, expectedMax);
53+
reqs.add(client().prepareIndex("index").setSource("number", rand));
54+
}
55+
indexRandom(true, reqs);
56+
SearchResponse response = client().prepareSearch("index")
57+
.addAggregation(
58+
new DelayedShardAggregationBuilder("delay", TimeValue.timeValueMillis(10)).subAggregation(
59+
new MaxAggregationBuilder("max").field("number")
60+
)
61+
)
62+
.get();
63+
Aggregations aggs = response.getAggregations();
64+
assertThat(aggs.get("delay"), instanceOf(InternalFilter.class));
65+
InternalFilter filter = aggs.get("delay");
66+
InternalMax max = filter.getAggregations().get("max");
67+
assertThat((float) max.getValue(), equalTo(expectedMax));
68+
}
69+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.search.aggregations;
21+
22+
import org.elasticsearch.common.ParseField;
23+
import org.elasticsearch.common.io.stream.StreamInput;
24+
import org.elasticsearch.common.io.stream.StreamOutput;
25+
import org.elasticsearch.common.unit.TimeValue;
26+
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
27+
import org.elasticsearch.common.xcontent.XContentBuilder;
28+
import org.elasticsearch.index.query.QueryBuilders;
29+
import org.elasticsearch.index.query.QueryShardContext;
30+
import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder;
31+
import org.elasticsearch.search.internal.SearchContext;
32+
33+
import java.io.IOException;
34+
import java.util.Map;
35+
import java.util.Objects;
36+
37+
public class DelayedShardAggregationBuilder extends AbstractAggregationBuilder<DelayedShardAggregationBuilder> {
38+
public static final String NAME = "shard_delay";
39+
40+
private TimeValue delay;
41+
42+
public DelayedShardAggregationBuilder(String name, TimeValue delay) {
43+
super(name);
44+
this.delay = delay;
45+
}
46+
47+
public DelayedShardAggregationBuilder(StreamInput in) throws IOException {
48+
super(in);
49+
this.delay = in.readTimeValue();
50+
}
51+
52+
@Override
53+
protected AggregationBuilder shallowCopy(AggregatorFactories.Builder factoriesBuilder, Map<String, Object> metaData) {
54+
return new DelayedShardAggregationBuilder(name, delay);
55+
}
56+
57+
@Override
58+
public BucketCardinality bucketCardinality() {
59+
return BucketCardinality.ONE;
60+
}
61+
62+
@Override
63+
public String getType() {
64+
return NAME;
65+
}
66+
67+
@Override
68+
protected void doWriteTo(StreamOutput out) throws IOException {
69+
out.writeTimeValue(delay);
70+
}
71+
72+
@Override
73+
protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
74+
builder.startObject();
75+
builder.field("value", delay.toString());
76+
builder.endObject();
77+
return builder;
78+
}
79+
80+
static final ConstructingObjectParser<DelayedShardAggregationBuilder, String> PARSER = new ConstructingObjectParser<>(
81+
NAME,
82+
false,
83+
(args, name) -> new DelayedShardAggregationBuilder(name, TimeValue.parseTimeValue((String) args[0], "value"))
84+
);
85+
86+
static {
87+
PARSER.declareString(ConstructingObjectParser.constructorArg(), new ParseField("value"));
88+
}
89+
90+
@Override
91+
@SuppressWarnings("unchecked")
92+
protected AggregatorFactory doBuild(
93+
QueryShardContext queryShardContext,
94+
AggregatorFactory parent,
95+
AggregatorFactories.Builder subfactoriesBuilder
96+
) throws IOException {
97+
98+
// Disable the request cache
99+
queryShardContext.nowInMillis();
100+
101+
final FilterAggregationBuilder filterAgg = new FilterAggregationBuilder(name, QueryBuilders.matchAllQuery()).subAggregations(
102+
subfactoriesBuilder
103+
);
104+
final AggregatorFactory factory = filterAgg.build(queryShardContext, parent);
105+
return new AggregatorFactory(name, queryShardContext, parent, subfactoriesBuilder, metadata) {
106+
@Override
107+
protected Aggregator createInternal(
108+
SearchContext searchContext,
109+
Aggregator parent,
110+
CardinalityUpperBound cardinality,
111+
Map<String, Object> metadata
112+
) throws IOException {
113+
long start = searchContext.getRelativeTimeInMillis();
114+
long sleepTime = Math.min(delay.getMillis(), 100);
115+
do {
116+
if (searchContext.isCancelled()) {
117+
break;
118+
}
119+
try {
120+
Thread.sleep(sleepTime);
121+
} catch (InterruptedException e) {
122+
throw new IOException(e);
123+
}
124+
} while (searchContext.getRelativeTimeInMillis() - start < delay.getMillis());
125+
return factory.create(searchContext, parent, cardinality);
126+
}
127+
};
128+
}
129+
130+
@Override
131+
public boolean equals(Object o) {
132+
if (this == o) return true;
133+
if (o == null || getClass() != o.getClass()) return false;
134+
if (!super.equals(o)) return false;
135+
DelayedShardAggregationBuilder that = (DelayedShardAggregationBuilder) o;
136+
return Objects.equals(delay, that.delay);
137+
}
138+
139+
@Override
140+
public int hashCode() {
141+
return Objects.hash(super.hashCode(), delay);
142+
}
143+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.search.aggregations;
21+
22+
import java.util.List;
23+
24+
import org.elasticsearch.plugins.Plugin;
25+
import org.elasticsearch.plugins.SearchPlugin;
26+
import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter;
27+
28+
import static java.util.Collections.singletonList;
29+
30+
/**
31+
* Test plugin that allows to delay aggregations on shards with a configurable time
32+
*/
33+
public class DelayedShardAggregationPlugin extends Plugin implements SearchPlugin {
34+
public DelayedShardAggregationPlugin() {}
35+
36+
@Override
37+
public List<AggregationSpec> getAggregations() {
38+
return singletonList(
39+
new AggregationSpec(
40+
DelayedShardAggregationBuilder.NAME,
41+
DelayedShardAggregationBuilder::new,
42+
DelayedShardAggregationBuilder.PARSER
43+
).addResultReader(InternalFilter::new)
44+
);
45+
}
46+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.search.aggregations;
20+
21+
import org.elasticsearch.common.unit.TimeValue;
22+
import org.elasticsearch.plugins.Plugin;
23+
import org.elasticsearch.test.TestGeoShapeFieldMapperPlugin;
24+
25+
import java.util.Arrays;
26+
import java.util.Collection;
27+
28+
public class DelayedShardAggregationBuilderTests extends BaseAggregationTestCase<DelayedShardAggregationBuilder> {
29+
@Override
30+
protected Collection<Class<? extends Plugin>> getPlugins() {
31+
return Arrays.asList(DelayedShardAggregationPlugin.class, TestGeoShapeFieldMapperPlugin.class);
32+
}
33+
34+
@Override
35+
protected DelayedShardAggregationBuilder createTestAggregatorBuilder() {
36+
return new DelayedShardAggregationBuilder(randomAlphaOfLength(10), TimeValue.timeValueMillis(100));
37+
}
38+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.search.aggregations;
21+
22+
import com.carrotsearch.randomizedtesting.annotations.Name;
23+
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
24+
25+
import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
26+
import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase;
27+
28+
public class DelayedShardAggregationClientYamlTestSuiteIT extends ESClientYamlSuiteTestCase {
29+
public DelayedShardAggregationClientYamlTestSuiteIT(@Name("yaml") ClientYamlTestCandidate testCandidate) {
30+
super(testCandidate);
31+
}
32+
33+
@ParametersFactory
34+
public static Iterable<Object[]> parameters() throws Exception {
35+
return ESClientYamlSuiteTestCase.createParameters();
36+
}
37+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# Integration tests for DelayAggregation components
2+
#
3+
4+
---
5+
"Delayed Aggs":
6+
- do:
7+
indices.create:
8+
index: test
9+
10+
- do:
11+
search:
12+
index: test
13+
body:
14+
aggs:
15+
delay:
16+
shard_delay:
17+
value: "200ms"
18+
19+
20+
- match: { hits.total.value: 0 }
21+
- match: { aggregations.delay.doc_count: 0 }
22+
- gt: { took: 100 }

0 commit comments

Comments
 (0)