[Rollup] Use composite's missing_bucket #31402

Merged: 8 commits, Jul 13, 2018
Changes from 1 commit
239 changes: 15 additions & 224 deletions x-pack/docs/en/rollup/understanding-groups.asciidoc
@@ -121,16 +121,15 @@ if a field is useful for aggregating later, and how you might wish to use it (te

=== Grouping Limitations with heterogeneous indices

There is a known limitation to Rollup groups, due to some internal implementation details at this time. The Rollup feature leverages
the `composite` aggregation from Elasticsearch. At the moment, the composite agg only returns buckets when all keys in the tuple are non-null.
Put another way, if you request keys `[A,B,C]` in the composite aggregation, the only documents that are aggregated are those that have
_all_ of the keys `A`, `B` and `C`.
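For illustration, a minimal sketch of how the composite aggregation's `missing_bucket` option, which this PR adopts, lifts that restriction. This is not part of the diff; the source name `rollup_groups` and the field name `node` are assumptions:

[source,java]
--------------------------------------------------
import java.util.Arrays;

import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder;
import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder;

// With missingBucket(true), documents lacking the "node" field are still
// aggregated, landing in a bucket whose key for "node" is null, instead of
// being silently dropped from the composite response.
TermsValuesSourceBuilder node = new TermsValuesSourceBuilder("node")
        .field("node")
        .missingBucket(true);
CompositeAggregationBuilder composite = new CompositeAggregationBuilder(
        "rollup_groups", Arrays.<CompositeValuesSourceBuilder<?>>asList(node));
--------------------------------------------------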
There was previously a limitation in how Rollup could handle indices that had heterogeneous mappings (multiple, unrelated/non-overlapping
mappings). The recommendation at the time was to configure a separate job per data "type". For example, you might configure a separate
job for each Beats module that you had enabled (one for `process`, another for `filesystem`, etc).
Contributor comment: nit: I think it should be Beats module?
Because Rollup uses the composite agg during the indexing process, it inherits this behavior. Practically speaking, if all of the documents
in your index are homogeneous (they have the same mapping), you can ignore this limitation and stop reading now.
This recommendation was driven by internal implementation details that caused document counts to be potentially incorrect if a single "merged"
job was used.

However, if you have a heterogeneous collection of documents that you wish to roll up, you may need to configure two or more jobs to
accurately cover the original data.
This limitation has since been alleviated. As of 6.4.0, it is now considered best practice to combine all rollup configurations
into a single job.

As an example, if your index has two types of documents:

@@ -157,7 +156,7 @@ and
--------------------------------------------------
// NOTCONSOLE

it may be tempting to create a single, combined rollup job which covers both of these document types, something like this:
the best practice is to combine them into a single rollup job which covers both of these document types, like this:

[source,js]
--------------------------------------------------
@@ -191,222 +190,14 @@ PUT _xpack/rollup/job/combined
--------------------------------------------------
// NOTCONSOLE

You can see that it includes a `terms` grouping on both "node" and "title", fields that are mutually exclusive in the document types.
*This will not work.* Because the `composite` aggregation (and by extension, Rollup) only returns buckets when all keys are non-null,
and there are no documents that have both a "node" field and a "title" field, this rollup job will not produce any rollups.

Instead, you should configure two independent jobs (sharing the same index, or going to separate indices):

[source,js]
--------------------------------------------------
PUT _xpack/rollup/job/sensor
{
"index_pattern": "data-*",
"rollup_index": "data_rollup",
"cron": "*/30 * * * * ?",
"page_size" :1000,
"groups" : {
"date_histogram": {
"field": "timestamp",
"interval": "1h",
"delay": "7d"
},
"terms": {
"fields": ["node"]
}
},
"metrics": [
{
"field": "temperature",
"metrics": ["min", "max", "sum"]
}
]
}
--------------------------------------------------
// NOTCONSOLE

[source,js]
--------------------------------------------------
PUT _xpack/rollup/job/purchases
{
"index_pattern": "data-*",
"rollup_index": "data_rollup",
"cron": "*/30 * * * * ?",
"page_size" :1000,
"groups" : {
"date_histogram": {
"field": "timestamp",
"interval": "1h",
"delay": "7d"
},
"terms": {
"fields": ["title"]
}
},
"metrics": [
{
"field": "price",
"metrics": ["avg"]
}
]
}
--------------------------------------------------
// NOTCONSOLE

Notice that each job now deals with a single "document type", and will not run into the limitations described above. We are working on changes
in core Elasticsearch to remove this limitation from the `composite` aggregation, and the documentation will be updated accordingly
when this particular scenario is fixed.
The rollup job will automatically use a placeholder term (`__ROLLUP_NULL_VALUE_PLACEHOLDER__`) as the `null_value` for keyword fields,
which allows it to handle documents that may be missing some of the grouping fields. This placeholder is then removed from search
results, resulting in correct doc counts in a manner that is invisible to the user.
Contributor comment: This part should be updated since we don't use the placeholder term anymore.
=== Doc counts and overlapping jobs

There is an issue with doc counts, related to the above grouping limitation. Imagine you have two Rollup jobs saving to the same index, where
one job is a "subset" of another job.

For example, you might have jobs with these two groupings:

[source,js]
--------------------------------------------------
PUT _xpack/rollup/job/sensor-all
{
"groups" : {
"date_histogram": {
"field": "timestamp",
"interval": "1h",
"delay": "7d"
},
"terms": {
"fields": ["node"]
}
},
"metrics": [
{
"field": "price",
"metrics": ["avg"]
}
]
...
}
--------------------------------------------------
// NOTCONSOLE

and

[source,js]
--------------------------------------------------
PUT _xpack/rollup/job/sensor-building
{
"groups" : {
"date_histogram": {
"field": "timestamp",
"interval": "1h",
"delay": "7d"
},
"terms": {
"fields": ["node", "building"]
}
}
...
}
--------------------------------------------------
// NOTCONSOLE


The first job `sensor-all` contains the groupings and metrics that apply to all data in the index. The second job is rolling up a subset
of data (in different buildings) which also includes a building identifier. You did this because combining them would run into the limitation
described in the previous section.

This _mostly_ works, but can sometimes return incorrect `doc_counts` when you search. All metrics will be valid, however.

The issue arises from the composite agg limitation described before, combined with search-time optimization. Imagine you try to run the
following aggregation:

[source,js]
--------------------------------------------------
"aggs" : {
"nodes": {
"terms": {
"field": "node"
}
}
}
--------------------------------------------------
// NOTCONSOLE

This aggregation could be serviced by either the `sensor-all` or the `sensor-building` job, since they both group on the node field. So the RollupSearch
API will search both of them and merge results. This will result in *correct* doc_counts and *correct* metrics. No problem here.

The issue arises from an aggregation that can _only_ be serviced by `sensor-building`, like this one:

[source,js]
--------------------------------------------------
"aggs" : {
"nodes": {
"terms": {
"field": "node"
},
"aggs": {
"building": {
"terms": {
"field": "building"
}
}
}
}
}
--------------------------------------------------
// NOTCONSOLE

Now we run into a problem. The RollupSearch API will correctly identify that only the `sensor-building` job has all the required components
to answer the aggregation, and will search it exclusively. Unfortunately, due to the composite aggregation limitation, that job only
rolled up documents that have both a "node" and a "building" field, meaning that the doc_counts for the `"nodes"` aggregation will not
include counts for any document that doesn't have both `[node, building]` fields.

- The `doc_count` for the `"nodes"` aggregation will be incorrect because it only contains counts for nodes that also have buildings
- The `doc_count` for the `"building"` aggregation will be correct
- Any metrics, on any level, will be correct

==== Workarounds

There are two main workarounds if you find yourself with a schema like the above.

The easiest and most robust method is to use separate indices to store your rollups. The limitations arise because you have several document
schemas cohabiting in a single index, which makes it difficult for rollups to correctly summarize them. If you create several rollup
jobs and store their output in separate indices, these sorts of difficulties do not arise. It does, however, keep you from searching across several
different rollup indices at the same time.

The other workaround is to include an "off-target" aggregation in the query, which pulls in the "superset" job and corrects the doc counts.
The RollupSearch API determines the best job to search for each "leaf node" in the aggregation tree. So if we include a metric agg on `price`,
which was only defined in the `sensor-all` job, that will "pull in" the other job:

[source,js]
--------------------------------------------------
"aggs" : {
"nodes": {
"terms": {
"field": "node"
},
"aggs": {
"building": {
"terms": {
"field": "building"
}
},
"avg_price": {
"avg": { "field": "price" } <1>
}
}
}
}
--------------------------------------------------
// NOTCONSOLE
<1> Adding an avg aggregation here will fix the doc counts

Because only the `sensor-all` job had an `avg` on the price field, the RollupSearch API is forced to pull in that additional job for searching,
and will merge/correct the doc_counts as appropriate. This sort of workaround applies to any additional aggregation -- metric or bucketing --
although it can be tedious to look through the jobs and determine the right one to add.

==== Status
There was previously an issue with document counts on "overlapping" job configurations, driven by the same internal implementation detail.
If there were two Rollup jobs saving to the same index, where one job is a "subset" of another job, it was possible that document counts
could be incorrect for certain aggregation arrangements.

We realize this is an onerous limitation, and somewhat breaks the rollup contract of "pick the fields to rollup, we do the rest". We are
actively working to get the limitation to `composite` agg fixed, and the related issues in Rollup. The documentation will be updated when
the fix is implemented.
This issue has also since been eliminated in 6.4.0.
@@ -159,7 +159,7 @@ public List<CompositeValuesSourceBuilder<?>> toBuilders() {
vsBuilder.dateHistogramInterval(interval);
vsBuilder.field(field);
vsBuilder.timeZone(timeZone);

vsBuilder.missingBucket(true);
Contributor comment: We shouldn't allow missing bucket here. We use this config for the timestamp field, which is a required field for rollup documents. Moreover, since it's the first field in the composite, enabling missing buckets would disable the composite optimization when the field is indexed. IMO it's ok to require the timestamp field to be present, since otherwise we have no way to know if the document should be part of the rollup.

Contributor (author) comment: Ah yeah, silly me. Absolutely shouldn't allow missing on the timestamp field :) Good catch, thanks.

return Collections.singletonList(vsBuilder);
}
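Given the review outcome above, a hedged sketch of what the corrected date-histogram source presumably ends up as (the `missingBucket` call simply dropped, since the timestamp field is mandatory for rollups; the constructor line is reconstructed by analogy with the histogram builder below):

[source,java]
--------------------------------------------------
DateHistogramValuesSourceBuilder vsBuilder = new DateHistogramValuesSourceBuilder(
        RollupField.formatIndexerAggName(field, DateHistogramAggregationBuilder.NAME));
vsBuilder.dateHistogramInterval(interval);
vsBuilder.field(field);
vsBuilder.timeZone(timeZone);
// Deliberately no missingBucket(true) here: a document without a timestamp
// cannot be placed in any rollup bucket, and enabling missing buckets on the
// first composite source would also defeat the composite index-sort optimization.
return Collections.singletonList(vsBuilder);
--------------------------------------------------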

@@ -96,6 +96,7 @@ public List<CompositeValuesSourceBuilder<?>> toBuilders() {
= new HistogramValuesSourceBuilder(RollupField.formatIndexerAggName(f, HistogramAggregationBuilder.NAME));
vsBuilder.interval(interval);
vsBuilder.field(f);
vsBuilder.missingBucket(true);
return vsBuilder;
}).collect(Collectors.toList());
}
@@ -80,6 +80,7 @@ public List<CompositeValuesSourceBuilder<?>> toBuilders() {
TermsValuesSourceBuilder vsBuilder
= new TermsValuesSourceBuilder(RollupField.formatIndexerAggName(f, TermsAggregationBuilder.NAME));
vsBuilder.field(f);
vsBuilder.missingBucket(true);
return vsBuilder;
}).collect(Collectors.toList());
}
@@ -10,7 +10,8 @@
"strings": {
"match_mapping_type": "string",
"mapping": {
"type": "keyword"
"type": "keyword",
"null_value": "ROLLUP_NULL_VALUE_PLACEHOLDER"
}
}
},
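In effect (one reading of the template change above, not spelled out in this diff): every dynamically mapped string field in a rollup index becomes a `keyword` whose explicit `null` values are indexed as the placeholder string, which is what lets the search side recognize and hide the null buckets later in this PR.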
@@ -85,6 +85,8 @@ public class Rollup extends Plugin implements ActionPlugin, PersistentTaskPlugin
public static final String ROLLUP_TEMPLATE_VERSION_FIELD = "rollup-version";
public static final String ROLLUP_TEMPLATE_VERSION_PATTERN =
Pattern.quote("${rollup.dynamic_template.version}");
public static final String ROLLUP_NULL_VALUE_PLACEHOLDER = "ROLLUP_NULL_VALUE_PLACEHOLDER";
public static final String ROLLUP_NULL_VALUE = "__ROLLUP_NULL_VALUE_PLACEHOLDER__";
Contributor comment: These look like they might be for the same purpose, maybe we should add comments to them to explain why they are both needed and what for?
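One plausible answer to the question above, based on the `createIndex` change later in this diff: the first constant is the literal token that appears in `rollup-dynamic-template.json`, while the second is the runtime value substituted for it when the rollup index is created:

[source,java]
--------------------------------------------------
// From createIndex() further down in this PR: the template token is replaced
// with the actual (underscore-wrapped) null_value marker at index-creation time.
String mapping = Rollup.DYNAMIC_MAPPING_TEMPLATE
    .replace(Rollup.MAPPING_METADATA_PLACEHOLDER, jobMetadata)
    .replace(Rollup.ROLLUP_NULL_VALUE_PLACEHOLDER, Rollup.ROLLUP_NULL_VALUE);
--------------------------------------------------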
private static final String ROLLUP_TEMPLATE_NAME = "/rollup-dynamic-template.json";
public static final String DYNAMIC_MAPPING_TEMPLATE = TemplateUtils.loadTemplate(ROLLUP_TEMPLATE_NAME,
@@ -382,6 +382,12 @@ private static InternalAggregation unrollMultiBucket(InternalMultiBucketAggregat
});
} else if (rolled instanceof StringTerms) {
return unrollMultiBucket(rolled, original, currentTree, (bucket, bucketCount, subAggs) -> {

// Hide our `null_value` placeholder so it doesn't show up in the terms list
if (bucket.getKeyAsString().equals(Rollup.ROLLUP_NULL_VALUE)) {
return null;
}
Contributor comment: Why is this not needed for LongTerms and the histograms? Do they not have the concept of a null bucket as well?

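A likely answer, inferable from the dynamic-template change earlier in this diff: only string fields are mapped with the `null_value` placeholder (the template matches `"match_mapping_type": "string"`), so only `StringTerms` buckets can ever contain it; numeric terms and histograms never receive a placeholder value that would need hiding.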
BytesRef key = new BytesRef(bucket.getKeyAsString().getBytes(StandardCharsets.UTF_8));
assert bucketCount >= 0;
//TODO expose getFormatter(), keyed upstream in Core
@@ -9,6 +9,7 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
@@ -132,7 +133,8 @@ static void createIndex(RollupJob job, ActionListener<PutRollupJobAction.Respons
String jobMetadata = "\"" + job.getConfig().getId() + "\":" + job.getConfig().toJSONString();

String mapping = Rollup.DYNAMIC_MAPPING_TEMPLATE
.replace(Rollup.MAPPING_METADATA_PLACEHOLDER, jobMetadata);
.replace(Rollup.MAPPING_METADATA_PLACEHOLDER, jobMetadata)
.replace(Rollup.ROLLUP_NULL_VALUE_PLACEHOLDER, Rollup.ROLLUP_NULL_VALUE);

CreateIndexRequest request = new CreateIndexRequest(job.getConfig().getRollupIndex());
request.mapping(RollupField.TYPE_NAME, mapping, XContentType.JSON);
@@ -176,6 +178,21 @@ static void updateMapping(RollupJob job, ActionListener<PutRollupJobAction.Respo
}

Map<String, Object> rollupMeta = (Map<String, Object>)((Map<String, Object>) m).get(RollupField.ROLLUP_META);

String stringVersion = (String)((Map<String, Object>) m).get(Rollup.ROLLUP_TEMPLATE_VERSION_FIELD);
if (stringVersion == null) {
logger.warn("Could not determine version of existing rollup metadata for index [" + indexName + "]");
Contributor comment: Is a warning enough here? I am wondering, when the stringVersion is null: a) will we end up throwing an NPE below, such as in the Version.fromString() method, and b) will we be able to "do the right thing"?
}
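// Hypothetical hardening for the reviewer concern above (not part of this diff):
// failing fast here would avoid a potential NullPointerException from
// Version.fromString(null) on the next line, for example:
//   listener.onFailure(new ElasticsearchStatusException(
//           "Rollup index [" + indexName + "] has no version metadata", RestStatus.CONFLICT));
//   return;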
Version parsedVersion = Version.fromString(stringVersion);
if (parsedVersion.before(Version.V_6_4_0)) {
String msg = "Cannot create rollup job [" + job.getConfig().getId() + "] because the rollup index contains " +
"jobs from pre-6.4.0. The mappings for these jobs are not compatible with 6.4.0+. Please specify a new rollup " +
"index.";
Contributor comment: Does this mean that users are going to need to recreate their existing rollup jobs and rollup indexes when they upgrade from 6.3 to 6.4+?
logger.error(msg);
listener.onFailure(new ElasticsearchStatusException(msg, RestStatus.CONFLICT));
return;
}

if (rollupMeta.get(job.getConfig().getId()) != null) {
String msg = "Cannot create rollup job [" + job.getConfig().getId()
+ "] because job was previously created (existing metadata).";
@@ -45,7 +45,7 @@ class IndexerUtils {
* @param rollupIndex The index that holds rollups for this job
* @return A list of rolled documents derived from the response
*/
static List<IndexRequest> processBuckets(CompositeAggregation agg, String rollupIndex, RollupJobStats stats,
static List<IndexRequest> processBuckets(CompositeAggregation agg, String rollupIndex, RollupJobStats stats,
GroupConfig groupConfig, String jobId) {

logger.debug("Buckets: [" + agg.getBuckets().size() + "][" + jobId + "]");
@@ -90,7 +90,10 @@ private static CRC32 processKeys(Map<String, Object> keys, Map<String, Object> d
docID.update(Numbers.doubleToBytes((Double)v), 0, 8);
} else if (k.endsWith("." + TermsAggregationBuilder.NAME)) {
doc.put(k + "." + RollupField.VALUE, v);
if (v instanceof String) {
if (v == null) {
// Arbitrary value to update the doc ID with for nulls
docID.update(19);
Contributor comment: disappointed you didn't make this value 42 😃
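// Design note (inferred, not stated in this diff): feeding a fixed byte into the
// CRC32 for null grouping keys keeps rollup document IDs deterministic, so
// re-running the job overwrites, rather than duplicates, buckets whose key is missing.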
} else if (v instanceof String) {
byte[] vs = ((String) v).getBytes(StandardCharsets.UTF_8);
docID.update(vs, 0, vs.length);
} else if (v instanceof Long) {
@@ -401,6 +401,7 @@ private CompositeAggregationBuilder createCompositeBuilder(RollupJobConfig confi
composite.setMetaData(metadata);
}
composite.size(config.getPageSize());

return composite;
}
