Skip to content

Commit 838a832

Browse files
committed
Fix rollup on date fields that don't support epoch_millis (#31890)
The rollup indexer uses a range query to select the next page of results based on the last time bucket of the previous round and the `delay` configured on the rollup job. This query uses the `epoch_millis` format implicitly but doesn't set the `format`. This result in errors during the rollup job if the field definition doesn't allow this format. It can also miss documents if the format is not accepted but another format in the field definition is able to parse the query (e.g.: `epoch_second`). This change ensures that we use `epoch_millis` as the only format to parse the rollup range query.
1 parent 161f5ae commit 838a832

File tree

3 files changed

+43
-13
lines changed

3 files changed

+43
-13
lines changed

x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,8 @@ private QueryBuilder createBoundaryQuery(Map<String, Object> position) {
425425
assert lowerBound <= maxBoundary;
426426
final RangeQueryBuilder query = new RangeQueryBuilder(fieldName)
427427
.gte(lowerBound)
428-
.lt(maxBoundary);
428+
.lt(maxBoundary)
429+
.format("epoch_millis");
429430
return query;
430431
}
431432
}

x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import org.elasticsearch.action.search.SearchResponse;
3030
import org.elasticsearch.action.search.SearchResponseSections;
3131
import org.elasticsearch.action.search.ShardSearchFailure;
32+
import org.elasticsearch.common.joda.DateMathParser;
33+
import org.elasticsearch.common.joda.Joda;
3234
import org.elasticsearch.common.rounding.Rounding;
3335
import org.elasticsearch.common.unit.TimeValue;
3436
import org.elasticsearch.index.IndexSettings;
@@ -506,6 +508,7 @@ private void executeTestCase(List<Map<String, Object>> docs, RollupJobConfig con
506508
private Map<String, MappedFieldType> createFieldTypes(RollupJobConfig job) {
507509
Map<String, MappedFieldType> fieldTypes = new HashMap<>();
508510
MappedFieldType fieldType = new DateFieldMapper.Builder(job.getGroupConfig().getDateHisto().getField())
511+
.dateTimeFormatter(Joda.forPattern(randomFrom("basic_date", "date_optional_time", "epoch_second")))
509512
.build(new Mapper.BuilderContext(settings.getSettings(), new ContentPath(0)))
510513
.fieldType();
511514
fieldTypes.put(fieldType.name(), fieldType);
@@ -618,7 +621,7 @@ protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse
618621
RangeQueryBuilder range = (RangeQueryBuilder) request.source().query();
619622
final DateTimeZone timeZone = range.timeZone() != null ? DateTimeZone.forID(range.timeZone()) : null;
620623
Query query = timestampField.rangeQuery(range.from(), range.to(), range.includeLower(), range.includeUpper(),
621-
null, timeZone, null, queryShardContext);
624+
null, timeZone, new DateMathParser(Joda.forPattern(range.format())), queryShardContext);
622625

623626
// extract composite agg
624627
assertThat(request.source().aggregations().getAggregatorFactories().size(), equalTo(1));

x-pack/qa/multi-node/src/test/java/org/elasticsearch/multi_node/RollupIT.java

+37-11
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,16 @@
66
package org.elasticsearch.multi_node;
77

88
import org.apache.http.HttpStatus;
9+
import org.apache.http.entity.ContentType;
10+
import org.apache.http.entity.StringEntity;
911
import org.apache.http.util.EntityUtils;
1012
import org.elasticsearch.client.Request;
1113
import org.elasticsearch.client.Response;
14+
import org.elasticsearch.common.Strings;
1215
import org.elasticsearch.common.settings.SecureString;
1316
import org.elasticsearch.common.settings.Settings;
1417
import org.elasticsearch.common.util.concurrent.ThreadContext;
18+
import org.elasticsearch.common.xcontent.XContentBuilder;
1519
import org.elasticsearch.common.xcontent.XContentHelper;
1620
import org.elasticsearch.common.xcontent.json.JsonXContent;
1721
import org.elasticsearch.common.xcontent.support.XContentMapValues;
@@ -33,8 +37,8 @@
3337
import java.util.List;
3438
import java.util.Map;
3539
import java.util.concurrent.TimeUnit;
36-
import java.util.stream.Collectors;
3740

41+
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
3842
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
3943
import static org.hamcrest.Matchers.equalTo;
4044
import static org.hamcrest.Matchers.isOneOf;
@@ -73,6 +77,31 @@ public void clearRollupMetadata() throws Exception {
7377

7478
public void testBigRollup() throws Exception {
7579
final int numDocs = 200;
80+
String dateFormat = "strict_date_optional_time";
81+
82+
// create the test-index index
83+
try (XContentBuilder builder = jsonBuilder()) {
84+
builder.startObject();
85+
{
86+
builder.startObject("mappings").startObject("_doc")
87+
.startObject("properties")
88+
.startObject("timestamp")
89+
.field("type", "date")
90+
.field("format", dateFormat)
91+
.endObject()
92+
.startObject("value")
93+
.field("type", "integer")
94+
.endObject()
95+
.endObject()
96+
.endObject().endObject();
97+
}
98+
builder.endObject();
99+
final StringEntity entity = new StringEntity(Strings.toString(builder), ContentType.APPLICATION_JSON);
100+
Request req = new Request("PUT", "rollup-docs");
101+
req.setEntity(entity);
102+
client().performRequest(req);
103+
}
104+
76105

77106
// index documents for the rollup job
78107
final StringBuilder bulk = new StringBuilder();
@@ -88,13 +117,15 @@ public void testBigRollup() throws Exception {
88117
bulkRequest.addParameter("refresh", "true");
89118
bulkRequest.setJsonEntity(bulk.toString());
90119
client().performRequest(bulkRequest);
120+
91121
// create the rollup job
92122
final Request createRollupJobRequest = new Request("PUT", "/_xpack/rollup/job/rollup-job-test");
123+
int pageSize = randomIntBetween(2, 50);
93124
createRollupJobRequest.setJsonEntity("{"
94125
+ "\"index_pattern\":\"rollup-*\","
95126
+ "\"rollup_index\":\"results-rollup\","
96-
+ "\"cron\":\"*/1 * * * * ?\"," // fast cron and big page size so test runs quickly
97-
+ "\"page_size\":20,"
127+
+ "\"cron\":\"*/1 * * * * ?\"," // fast cron so test runs quickly
128+
+ "\"page_size\":" + pageSize + ","
98129
+ "\"groups\":{"
99130
+ " \"date_histogram\":{"
100131
+ " \"field\":\"timestamp\","
@@ -142,7 +173,8 @@ public void testBigRollup() throws Exception {
142173
" \"date_histo\": {\n" +
143174
" \"date_histogram\": {\n" +
144175
" \"field\": \"timestamp\",\n" +
145-
" \"interval\": \"1h\"\n" +
176+
" \"interval\": \"1h\",\n" +
177+
" \"format\": \"date_time\"\n" +
146178
" },\n" +
147179
" \"aggs\": {\n" +
148180
" \"the_max\": {\n" +
@@ -226,7 +258,7 @@ private void assertRollUpJob(final String rollupJob) throws Exception {
226258

227259
}
228260

229-
private void waitForRollUpJob(final String rollupJob,String[] expectedStates) throws Exception {
261+
private void waitForRollUpJob(final String rollupJob, String[] expectedStates) throws Exception {
230262
assertBusy(() -> {
231263
final Request getRollupJobRequest = new Request("GET", "_xpack/rollup/job/" + rollupJob);
232264
Response getRollupJobResponse = client().performRequest(getRollupJobRequest);
@@ -317,10 +349,4 @@ private void deleteAllJobs() throws Exception {
317349
}
318350
}
319351
}
320-
321-
private static String responseEntityToString(Response response) throws Exception {
322-
try (BufferedReader reader = new BufferedReader(new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8))) {
323-
return reader.lines().collect(Collectors.joining("\n"));
324-
}
325-
}
326352
}

0 commit comments

Comments
 (0)