Commit 687ad17

[Rollup] Use composite's missing_bucket (#31402)
We can leverage the composite agg's new `missing_bucket` feature on terms groupings. This means the aggregation criteria used in the indexer will now return null buckets for missing keys.

Because all buckets are now returned (even if a key is null), we can guarantee correct doc counts with "combined" jobs (where a job rolls up multiple schemas). This was previously impossible, since composite would ignore documents that didn't have _all_ the keys, meaning non-overlapping schemas would cause composite to return no buckets.

Note: date_histo does not use `missing_bucket`, since a timestamp is always required.

The docs have been adjusted to recommend a single, combined job. They also reference the previous issue to help users who are upgrading (rather than just deleting the sections).
1 parent ff97e6e commit 687ad17
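
The behavior change described in the commit message can be illustrated with a minimal, self-contained sketch in plain Java. It does not use the Elasticsearch API: `MissingBucketSketch`, `doc`, `sampleDocs`, and `compositeCounts` are hypothetical names invented for this illustration, and the grouping logic only loosely imitates how a composite aggregation treats documents with missing keys. The timestamp key is left out because, per the commit message, date_histo always requires one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: a plain-Java imitation of composite-style grouping.
// None of these names come from Elasticsearch or from this commit.
public class MissingBucketSketch {

    /** Builds a document with a single field (hypothetical helper). */
    public static Map<String, String> doc(String field, String value) {
        Map<String, String> d = new HashMap<>();
        d.put(field, value);
        return d;
    }

    /** Two non-overlapping schemas: sensor docs have "node", purchase docs have "title". */
    public static List<Map<String, String>> sampleDocs() {
        List<Map<String, String>> docs = new ArrayList<>();
        docs.add(doc("node", "a"));
        docs.add(doc("node", "a"));
        docs.add(doc("node", "b"));
        docs.add(doc("title", "Foo"));
        docs.add(doc("title", "Bar"));
        return docs;
    }

    /**
     * Counts documents per tuple of key values. When missingBucket is false,
     * a document lacking any key is skipped (the old behavior described above).
     * When true, the missing key is recorded as null and the document is counted.
     */
    public static Map<List<String>, Long> compositeCounts(List<Map<String, String>> docs,
                                                          List<String> keys,
                                                          boolean missingBucket) {
        Map<List<String>, Long> buckets = new LinkedHashMap<>();
        for (Map<String, String> d : docs) {
            List<String> tuple = new ArrayList<>();
            boolean complete = true;
            for (String key : keys) {
                String value = d.get(key);
                if (value == null) {
                    complete = false;
                }
                tuple.add(value); // null stands in for the "missing" bucket key
            }
            if (!complete && !missingBucket) {
                continue; // document is ignored entirely
            }
            buckets.merge(tuple, 1L, Long::sum);
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("node", "title");
        System.out.println("without missing bucket: " + compositeCounts(sampleDocs(), keys, false));
        System.out.println("with missing bucket:    " + compositeCounts(sampleDocs(), keys, true));
    }
}
```

Under these assumptions, grouping on `[node, title]` without missing buckets yields no buckets at all, because no sample document has both fields. With missing buckets enabled, it yields four buckets whose counts sum to all five documents, which is why a single combined job can now report correct doc counts.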

14 files changed, +298 -263 lines changed
x-pack/docs/en/rollup/understanding-groups.asciidoc

+12 -225
@@ -121,16 +121,15 @@ if a field is useful for aggregating later, and how you might wish to use it (te

 === Grouping Limitations with heterogeneous indices

-There is a known limitation to Rollup groups, due to some internal implementation details at this time. The Rollup feature leverages
-the `composite` aggregation from Elasticsearch. At the moment, the composite agg only returns buckets when all keys in the tuple are non-null.
-Put another way, if the you request keys `[A,B,C]` in the composite aggregation, the only documents that are aggregated are those that have
-_all_ of the keys `A, B` and `C`.
+There was previously a limitation in how Rollup could handle indices that had heterogeneous mappings (multiple, unrelated/non-overlapping
+mappings). The recommendation at the time was to configure a separate job per data "type". For example, you might configure a separate
+job for each Beats module that you had enabled (one for `process`, another for `filesystem`, etc).

-Because Rollup uses the composite agg during the indexing process, it inherits this behavior. Practically speaking, if all of the documents
-in your index are homogeneous (they have the same mapping), you can ignore this limitation and stop reading now.
+This recommendation was driven by internal implementation details that caused document counts to be potentially incorrect if a single "merged"
+job was used.

-However, if you have a heterogeneous collection of documents that you wish to roll up, you may need to configure two or more jobs to
-accurately cover the original data.
+This limitation has since been alleviated. As of 6.4.0, it is now considered best practice to combine all rollup configurations
+into a single job.

 As an example, if your index has two types of documents:

@@ -157,7 +156,7 @@ and
 --------------------------------------------------
 // NOTCONSOLE

-it may be tempting to create a single, combined rollup job which covers both of these document types, something like this:
+the best practice is to combine them into a single rollup job which covers both of these document types, like this:

 [source,js]
 --------------------------------------------------
@@ -191,222 +190,10 @@ PUT _xpack/rollup/job/combined
 --------------------------------------------------
 // NOTCONSOLE

-You can see that it includes a `terms` grouping on both "node" and "title", fields that are mutually exclusive in the document types.
-*This will not work.* Because the `composite` aggregation (and by extension, Rollup) only returns buckets when all keys are non-null,
-and there are no documents that have both a "node" field and a "title" field, this rollup job will not produce any rollups.
-
-Instead, you should configure two independent jobs (sharing the same index, or going to separate indices):
-
-[source,js]
---------------------------------------------------
-PUT _xpack/rollup/job/sensor
-{
-    "index_pattern": "data-*",
-    "rollup_index": "data_rollup",
-    "cron": "*/30 * * * * ?",
-    "page_size" :1000,
-    "groups" : {
-        "date_histogram": {
-            "field": "timestamp",
-            "interval": "1h",
-            "delay": "7d"
-        },
-        "terms": {
-            "fields": ["node"]
-        }
-    },
-    "metrics": [
-        {
-            "field": "temperature",
-            "metrics": ["min", "max", "sum"]
-        }
-    ]
-}
---------------------------------------------------
-// NOTCONSOLE
-
-[source,js]
---------------------------------------------------
-PUT _xpack/rollup/job/purchases
-{
-    "index_pattern": "data-*",
-    "rollup_index": "data_rollup",
-    "cron": "*/30 * * * * ?",
-    "page_size" :1000,
-    "groups" : {
-        "date_histogram": {
-            "field": "timestamp",
-            "interval": "1h",
-            "delay": "7d"
-        },
-        "terms": {
-            "fields": ["title"]
-        }
-    },
-    "metrics": [
-        {
-            "field": "price",
-            "metrics": ["avg"]
-        }
-    ]
-}
---------------------------------------------------
-// NOTCONSOLE
-
-Notice that each job now deals with a single "document type", and will not run into the limitations described above. We are working on changes
-in core Elasticsearch to remove this limitation from the `composite` aggregation, and the documentation will be updated accordingly
-when this particular scenario is fixed.
-
 === Doc counts and overlapping jobs

-There is an issue with doc counts, related to the above grouping limitation. Imagine you have two Rollup jobs saving to the same index, where
-one job is a "subset" of another job.
-
-For example, you might have jobs with these two groupings:
-
-[source,js]
---------------------------------------------------
-PUT _xpack/rollup/job/sensor-all
-{
-    "groups" : {
-        "date_histogram": {
-            "field": "timestamp",
-            "interval": "1h",
-            "delay": "7d"
-        },
-        "terms": {
-            "fields": ["node"]
-        }
-    },
-    "metrics": [
-        {
-            "field": "price",
-            "metrics": ["avg"]
-        }
-    ]
-    ...
-}
---------------------------------------------------
-// NOTCONSOLE
-
-and
-
-[source,js]
---------------------------------------------------
-PUT _xpack/rollup/job/sensor-building
-{
-    "groups" : {
-        "date_histogram": {
-            "field": "timestamp",
-            "interval": "1h",
-            "delay": "7d"
-        },
-        "terms": {
-            "fields": ["node", "building"]
-        }
-    }
-    ...
-}
---------------------------------------------------
-// NOTCONSOLE
-
-
-The first job `sensor-all` contains the groupings and metrics that apply to all data in the index. The second job is rolling up a subset
-of data (in different buildings) which also include a building identifier. You did this because combining them would run into the limitation
-described in the previous section.
-
-This _mostly_ works, but can sometimes return incorrect `doc_counts` when you search. All metrics will be valid however.
-
-The issue arises from the composite agg limitation described before, combined with search-time optimization. Imagine you try to run the
-following aggregation:
-
-[source,js]
---------------------------------------------------
-"aggs" : {
-    "nodes": {
-        "terms": {
-            "field": "node"
-        }
-    }
-}
---------------------------------------------------
-// NOTCONSOLE
-
-This aggregation could be serviced by either `sensor-all` or `sensor-building` job, since they both group on the node field. So the RollupSearch
-API will search both of them and merge results. This will result in *correct* doc_counts and *correct* metrics. No problem here.
-
-The issue arises from an aggregation that can _only_ be serviced by `sensor-building`, like this one:
-
-[source,js]
---------------------------------------------------
-"aggs" : {
-    "nodes": {
-        "terms": {
-            "field": "node"
-        },
-        "aggs": {
-            "building": {
-                "terms": {
-                    "field": "building"
-                }
-            }
-        }
-    }
-}
---------------------------------------------------
-// NOTCONSOLE
-
-Now we run into a problem. The RollupSearch API will correctly identify that only `sensor-building` job has all the required components
-to answer the aggregation, and will search it exclusively. Unfortunately, due to the composite aggregation limitation, that job only
-rolled up documents that have both a "node" and a "building" field. Meaning that the doc_counts for the `"nodes"` aggregation will not
-include counts for any document that doesn't have `[node, building]` fields.
-
-- The `doc_count` for `"nodes"` aggregation will be incorrect because it only contains counts for `nodes` that also have buildings
-- The `doc_count` for `"buildings"` aggregation will be correct
-- Any metrics, on any level, will be correct
-
-==== Workarounds
-
-There are two main workarounds if you find yourself with a schema like the above.
-
-Easiest and most robust method: use separate indices to store your rollups. The limitations arise because you have several document
-schemas co-habitating in a single index, which makes it difficult for rollups to correctly summarize. If you make several rollup
-jobs and store them in separate indices, these sorts of difficulties do not arise. It does, however, keep you from searching across several
-different rollup indices at the same time.
-
-The other workaround is to include an "off-target" aggregation in the query, which pulls in the "superset" job and corrects the doc counts.
-The RollupSearch API determines the best job to search for each "leaf node" in the aggregation tree. So if we include a metric agg on `price`,
-which was only defined in the `sensor-all` job, that will "pull in" the other job:
-
-[source,js]
---------------------------------------------------
-"aggs" : {
-    "nodes": {
-        "terms": {
-            "field": "node"
-        },
-        "aggs": {
-            "building": {
-                "terms": {
-                    "field": "building"
-                }
-            },
-            "avg_price": {
-                "avg": { "field": "price" } <1>
-            }
-        }
-    }
-}
---------------------------------------------------
-// NOTCONSOLE
-<1> Adding an avg aggregation here will fix the doc counts
-
-Because only `sensor-all` job had an `avg` on the price field, the RollupSearch API is forced to pull in that additional job for searching,
-and will merge/correct the doc_counts as appropriate. This sort of workaround applies to any additional aggregation -- metric or bucketing --
-although it can be tedious to look through the jobs and determine the right one to add.
-
-==== Status
+There was previously an issue with document counts on "overlapping" job configurations, driven by the same internal implementation detail.
+If there were two Rollup jobs saving to the same index, where one job is a "subset" of another job, it was possible that document counts
+could be incorrect for certain aggregation arrangements.

-We realize this is an onerous limitation, and somewhat breaks the rollup contract of "pick the fields to rollup, we do the rest". We are
-actively working to get the limitation to `composite` agg fixed, and the related issues in Rollup. The documentation will be updated when
-the fix is implemented.
+This issue has also since been eliminated in 6.4.0.

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/DateHistoGroupConfig.java

-1
@@ -159,7 +159,6 @@ public List<CompositeValuesSourceBuilder<?>> toBuilders() {
         vsBuilder.dateHistogramInterval(interval);
         vsBuilder.field(field);
         vsBuilder.timeZone(timeZone);
-
         return Collections.singletonList(vsBuilder);
     }

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/HistoGroupConfig.java

+1
@@ -96,6 +96,7 @@ public List<CompositeValuesSourceBuilder<?>> toBuilders() {
                     = new HistogramValuesSourceBuilder(RollupField.formatIndexerAggName(f, HistogramAggregationBuilder.NAME));
             vsBuilder.interval(interval);
             vsBuilder.field(f);
+            vsBuilder.missingBucket(true);
             return vsBuilder;
         }).collect(Collectors.toList());
     }

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/TermsGroupConfig.java

+1
@@ -80,6 +80,7 @@ public List<CompositeValuesSourceBuilder<?>> toBuilders() {
             TermsValuesSourceBuilder vsBuilder
                     = new TermsValuesSourceBuilder(RollupField.formatIndexerAggName(f, TermsAggregationBuilder.NAME));
             vsBuilder.field(f);
+            vsBuilder.missingBucket(true);
             return vsBuilder;
         }).collect(Collectors.toList());
     }

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/RollupRestTestStateCleaner.java

+7 -17
@@ -6,7 +6,6 @@
 package org.elasticsearch.xpack.core.rollup;

 import org.apache.http.HttpStatus;
-import org.apache.logging.log4j.Logger;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.common.xcontent.support.XContentMapValues;
@@ -27,21 +26,13 @@

 public class RollupRestTestStateCleaner {

-    private final Logger logger;
-    private final RestClient adminClient;
-
-    public RollupRestTestStateCleaner(Logger logger, RestClient adminClient) {
-        this.logger = logger;
-        this.adminClient = adminClient;
-    }
-
-    public void clearRollupMetadata() throws Exception {
-        deleteAllJobs();
-        waitForPendingTasks();
+    public static void clearRollupMetadata(RestClient adminClient) throws Exception {
+        deleteAllJobs(adminClient);
+        waitForPendingTasks(adminClient);
         // indices will be deleted by the ESRestTestCase class
     }

-    private void waitForPendingTasks() throws Exception {
+    private static void waitForPendingTasks(RestClient adminClient) throws Exception {
         ESTestCase.assertBusy(() -> {
             try {
                 Response response = adminClient.performRequest("GET", "/_cat/tasks",
@@ -71,7 +62,7 @@ private void waitForPendingTasks() throws Exception {
     }

     @SuppressWarnings("unchecked")
-    private void deleteAllJobs() throws Exception {
+    private static void deleteAllJobs(RestClient adminClient) throws Exception {
         Response response = adminClient.performRequest("GET", "/_xpack/rollup/job/_all");
         Map<String, Object> jobs = ESRestTestCase.entityAsMap(response);
         @SuppressWarnings("unchecked")
@@ -83,9 +74,7 @@ private void deleteAllJobs() throws Exception {
         }

         for (Map<String, Object> jobConfig : jobConfigs) {
-            logger.debug(jobConfig);
             String jobId = (String) ((Map<String, Object>) jobConfig.get("config")).get("id");
-            logger.debug("Deleting job " + jobId);
             try {
                 response = adminClient.performRequest("DELETE", "/_xpack/rollup/job/" + jobId);
             } catch (Exception e) {
@@ -95,7 +84,8 @@ private void deleteAllJobs() throws Exception {
     }

     private static String responseEntityToString(Response response) throws Exception {
-        try (BufferedReader reader = new BufferedReader(new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8))) {
+        try (BufferedReader reader = new BufferedReader(new InputStreamReader(response.getEntity().getContent(),
+            StandardCharsets.UTF_8))) {
             return reader.lines().collect(Collectors.joining("\n"));
         }
     }

x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupResponseTranslator.java

+1
@@ -382,6 +382,7 @@ private static InternalAggregation unrollMultiBucket(InternalMultiBucketAggregat
             });
         } else if (rolled instanceof StringTerms) {
             return unrollMultiBucket(rolled, original, currentTree, (bucket, bucketCount, subAggs) -> {
+
                 BytesRef key = new BytesRef(bucket.getKeyAsString().getBytes(StandardCharsets.UTF_8));
                 assert bucketCount >= 0;
                 //TODO expose getFormatter(), keyed upstream in Core

x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportPutRollupJobAction.java

+8
@@ -176,6 +176,14 @@ static void updateMapping(RollupJob job, ActionListener<PutRollupJobAction.Respo
                 }

                 Map<String, Object> rollupMeta = (Map<String, Object>)((Map<String, Object>) m).get(RollupField.ROLLUP_META);
+
+                String stringVersion = (String)((Map<String, Object>) m).get(Rollup.ROLLUP_TEMPLATE_VERSION_FIELD);
+                if (stringVersion == null) {
+                    listener.onFailure(new IllegalStateException("Could not determine version of existing rollup metadata for index ["
+                        + indexName + "]"));
+                    return;
+                }
+
                 if (rollupMeta.get(job.getConfig().getId()) != null) {
                     String msg = "Cannot create rollup job [" + job.getConfig().getId()
                         + "] because job was previously created (existing metadata).";