From 430e7e0f5d1c9baaa68822eaf995eaa8b48e55bd Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Thu, 14 Jun 2018 21:00:04 +0000 Subject: [PATCH 1/6] [Rollup] Use composite's missing_bucket We can leverage the composite agg's new `missing_bucket` feature on terms groupings. This means the aggregation criteria used in the indexer will now return null buckets for missing keys. We then index these and rely on a default `null_value` on the Rollup's mapping to inject a placeholder. On the search side of the house, we can remove the placeholder when unrolling the response. By indexing null values, we can guarantee correct doc counts with "combined" jobs (where a job rolls up multiple schemas). This was previously impossible since composite would ignore documents that didn't have _all_ the keys, meaning non-overlapping schemas would cause composite to return no buckets. The docs have been adjusted to recommend a single, combined job. It also makes reference to the previous issue to help users that are upgrading (rather than just deleting the sections). Because the mapping change is incompatible with prior versions, this PR forbids 6.4.0+ jobs from being created in a pre-6.4.0 index. 
--- .../en/rollup/understanding-groups.asciidoc | 239 ++---------------- .../core/rollup/job/DateHistoGroupConfig.java | 2 +- .../core/rollup/job/HistoGroupConfig.java | 1 + .../core/rollup/job/TermsGroupConfig.java | 1 + .../resources/rollup-dynamic-template.json | 3 +- .../elasticsearch/xpack/rollup/Rollup.java | 2 + .../rollup/RollupResponseTranslator.java | 6 + .../action/TransportPutRollupJobAction.java | 19 +- .../xpack/rollup/job/IndexerUtils.java | 7 +- .../xpack/rollup/job/RollupIndexer.java | 1 + .../RollupResponseTranslationTests.java | 50 ++++ .../action/PutJobStateMachineTests.java | 40 +++ .../xpack/rollup/job/IndexerUtilsTests.java | 72 ++++++ 13 files changed, 214 insertions(+), 229 deletions(-) diff --git a/x-pack/docs/en/rollup/understanding-groups.asciidoc b/x-pack/docs/en/rollup/understanding-groups.asciidoc index f57f905ae04c8..064a501b769a8 100644 --- a/x-pack/docs/en/rollup/understanding-groups.asciidoc +++ b/x-pack/docs/en/rollup/understanding-groups.asciidoc @@ -121,16 +121,15 @@ if a field is useful for aggregating later, and how you might wish to use it (te === Grouping Limitations with heterogeneous indices -There is a known limitation to Rollup groups, due to some internal implementation details at this time. The Rollup feature leverages -the `composite` aggregation from Elasticsearch. At the moment, the composite agg only returns buckets when all keys in the tuple are non-null. -Put another way, if the you request keys `[A,B,C]` in the composite aggregation, the only documents that are aggregated are those that have -_all_ of the keys `A, B` and `C`. +There was previously a limitation in how Rollup could handle indices that had heterogeneous mappings (multiple, unrelated/non-overlapping +mappings). The recommendation at the time was to configure a separate job per data "type". For example, you might configure a separate +job for each Beat module that you had enabled (one for `process`, another for `filesystem`, etc). 
-Because Rollup uses the composite agg during the indexing process, it inherits this behavior. Practically speaking, if all of the documents -in your index are homogeneous (they have the same mapping), you can ignore this limitation and stop reading now. +This recommendation was driven by internal implementation details that caused document counts to be potentially incorrect if a single "merged" +job was used. -However, if you have a heterogeneous collection of documents that you wish to roll up, you may need to configure two or more jobs to -accurately cover the original data. +This limitation has since been alleviated. As of 6.4.0, it is now considered best practice to combine all rollup configurations +into a single job. As an example, if your index has two types of documents: @@ -157,7 +156,7 @@ and -------------------------------------------------- // NOTCONSOLE -it may be tempting to create a single, combined rollup job which covers both of these document types, something like this: +the best practice is to combine them into a single rollup job which covers both of these document types, like this: [source,js] -------------------------------------------------- @@ -191,222 +190,14 @@ PUT _xpack/rollup/job/combined -------------------------------------------------- // NOTCONSOLE -You can see that it includes a `terms` grouping on both "node" and "title", fields that are mutually exclusive in the document types. -*This will not work.* Because the `composite` aggregation (and by extension, Rollup) only returns buckets when all keys are non-null, -and there are no documents that have both a "node" field and a "title" field, this rollup job will not produce any rollups. 
- -Instead, you should configure two independent jobs (sharing the same index, or going to separate indices): - -[source,js] --------------------------------------------------- -PUT _xpack/rollup/job/sensor -{ - "index_pattern": "data-*", - "rollup_index": "data_rollup", - "cron": "*/30 * * * * ?", - "page_size" :1000, - "groups" : { - "date_histogram": { - "field": "timestamp", - "interval": "1h", - "delay": "7d" - }, - "terms": { - "fields": ["node"] - } - }, - "metrics": [ - { - "field": "temperature", - "metrics": ["min", "max", "sum"] - } - ] -} --------------------------------------------------- -// NOTCONSOLE - -[source,js] --------------------------------------------------- -PUT _xpack/rollup/job/purchases -{ - "index_pattern": "data-*", - "rollup_index": "data_rollup", - "cron": "*/30 * * * * ?", - "page_size" :1000, - "groups" : { - "date_histogram": { - "field": "timestamp", - "interval": "1h", - "delay": "7d" - }, - "terms": { - "fields": ["title"] - } - }, - "metrics": [ - { - "field": "price", - "metrics": ["avg"] - } - ] -} --------------------------------------------------- -// NOTCONSOLE - -Notice that each job now deals with a single "document type", and will not run into the limitations described above. We are working on changes -in core Elasticsearch to remove this limitation from the `composite` aggregation, and the documentation will be updated accordingly -when this particular scenario is fixed. +The rollup job will automatically use a placeholder term (`__ROLLUP_NULL_VALUE_PLACEHOLDER__`) as the `null_value` for keyword fields, +which allows it to handle documents that may be missing some of the grouping fields. This placeholder is then removed from search +results, resulting in correct doc counts in a manner that is invisible to the user. === Doc counts and overlapping jobs -There is an issue with doc counts, related to the above grouping limitation. 
Imagine you have two Rollup jobs saving to the same index, where -one job is a "subset" of another job. - -For example, you might have jobs with these two groupings: - -[source,js] --------------------------------------------------- -PUT _xpack/rollup/job/sensor-all -{ - "groups" : { - "date_histogram": { - "field": "timestamp", - "interval": "1h", - "delay": "7d" - }, - "terms": { - "fields": ["node"] - } - }, - "metrics": [ - { - "field": "price", - "metrics": ["avg"] - } - ] - ... -} --------------------------------------------------- -// NOTCONSOLE - -and - -[source,js] --------------------------------------------------- -PUT _xpack/rollup/job/sensor-building -{ - "groups" : { - "date_histogram": { - "field": "timestamp", - "interval": "1h", - "delay": "7d" - }, - "terms": { - "fields": ["node", "building"] - } - } - ... -} --------------------------------------------------- -// NOTCONSOLE - - -The first job `sensor-all` contains the groupings and metrics that apply to all data in the index. The second job is rolling up a subset -of data (in different buildings) which also include a building identifier. You did this because combining them would run into the limitation -described in the previous section. - -This _mostly_ works, but can sometimes return incorrect `doc_counts` when you search. All metrics will be valid however. - -The issue arises from the composite agg limitation described before, combined with search-time optimization. Imagine you try to run the -following aggregation: - -[source,js] --------------------------------------------------- -"aggs" : { - "nodes": { - "terms": { - "field": "node" - } - } -} --------------------------------------------------- -// NOTCONSOLE - -This aggregation could be serviced by either `sensor-all` or `sensor-building` job, since they both group on the node field. So the RollupSearch -API will search both of them and merge results. This will result in *correct* doc_counts and *correct* metrics. No problem here. 
- -The issue arises from an aggregation that can _only_ be serviced by `sensor-building`, like this one: - -[source,js] --------------------------------------------------- -"aggs" : { - "nodes": { - "terms": { - "field": "node" - }, - "aggs": { - "building": { - "terms": { - "field": "building" - } - } - } - } -} --------------------------------------------------- -// NOTCONSOLE - -Now we run into a problem. The RollupSearch API will correctly identify that only `sensor-building` job has all the required components -to answer the aggregation, and will search it exclusively. Unfortunately, due to the composite aggregation limitation, that job only -rolled up documents that have both a "node" and a "building" field. Meaning that the doc_counts for the `"nodes"` aggregation will not -include counts for any document that doesn't have `[node, building]` fields. - -- The `doc_count` for `"nodes"` aggregation will be incorrect because it only contains counts for `nodes` that also have buildings -- The `doc_count` for `"buildings"` aggregation will be correct -- Any metrics, on any level, will be correct - -==== Workarounds - -There are two main workarounds if you find yourself with a schema like the above. - -Easiest and most robust method: use separate indices to store your rollups. The limitations arise because you have several document -schemas co-habitating in a single index, which makes it difficult for rollups to correctly summarize. If you make several rollup -jobs and store them in separate indices, these sorts of difficulties do not arise. It does, however, keep you from searching across several -different rollup indices at the same time. - -The other workaround is to include an "off-target" aggregation in the query, which pulls in the "superset" job and corrects the doc counts. -The RollupSearch API determines the best job to search for each "leaf node" in the aggregation tree. 
So if we include a metric agg on `price`, -which was only defined in the `sensor-all` job, that will "pull in" the other job: - -[source,js] --------------------------------------------------- -"aggs" : { - "nodes": { - "terms": { - "field": "node" - }, - "aggs": { - "building": { - "terms": { - "field": "building" - } - }, - "avg_price": { - "avg": { "field": "price" } <1> - } - } - } -} --------------------------------------------------- -// NOTCONSOLE -<1> Adding an avg aggregation here will fix the doc counts - -Because only `sensor-all` job had an `avg` on the price field, the RollupSearch API is forced to pull in that additional job for searching, -and will merge/correct the doc_counts as appropriate. This sort of workaround applies to any additional aggregation -- metric or bucketing -- -although it can be tedious to look through the jobs and determine the right one to add. - -==== Status +There was previously an issue with document counts on "overlapping" job configurations, driven by the same internal implementation detail. +If there were two Rollup jobs saving to the same index, where one job is a "subset" of another job, it was possible that document counts +could be incorrect for certain aggregation arrangements. -We realize this is an onerous limitation, and somewhat breaks the rollup contract of "pick the fields to rollup, we do the rest". We are -actively working to get the limitation to `composite` agg fixed, and the related issues in Rollup. The documentation will be updated when -the fix is implemented. \ No newline at end of file +This issue has also since been eliminated in 6.4.0. 
\ No newline at end of file diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/DateHistoGroupConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/DateHistoGroupConfig.java index 4b4e4cf7b7c81..a1fa81d67a92c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/DateHistoGroupConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/DateHistoGroupConfig.java @@ -159,7 +159,7 @@ public List> toBuilders() { vsBuilder.dateHistogramInterval(interval); vsBuilder.field(field); vsBuilder.timeZone(timeZone); - + vsBuilder.missingBucket(true); return Collections.singletonList(vsBuilder); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/HistoGroupConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/HistoGroupConfig.java index 8b8d53b4ce9af..2b1511077d955 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/HistoGroupConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/HistoGroupConfig.java @@ -96,6 +96,7 @@ public List> toBuilders() { = new HistogramValuesSourceBuilder(RollupField.formatIndexerAggName(f, HistogramAggregationBuilder.NAME)); vsBuilder.interval(interval); vsBuilder.field(f); + vsBuilder.missingBucket(true); return vsBuilder; }).collect(Collectors.toList()); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/TermsGroupConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/TermsGroupConfig.java index 2f1c35a73edb4..da73020f0087f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/TermsGroupConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/TermsGroupConfig.java @@ -80,6 +80,7 @@ public List> toBuilders() { TermsValuesSourceBuilder vsBuilder = new 
TermsValuesSourceBuilder(RollupField.formatIndexerAggName(f, TermsAggregationBuilder.NAME)); vsBuilder.field(f); + vsBuilder.missingBucket(true); return vsBuilder; }).collect(Collectors.toList()); } diff --git a/x-pack/plugin/core/src/main/resources/rollup-dynamic-template.json b/x-pack/plugin/core/src/main/resources/rollup-dynamic-template.json index 94336c60c4d68..6c2a1ee15bed5 100644 --- a/x-pack/plugin/core/src/main/resources/rollup-dynamic-template.json +++ b/x-pack/plugin/core/src/main/resources/rollup-dynamic-template.json @@ -10,7 +10,8 @@ "strings": { "match_mapping_type": "string", "mapping": { - "type": "keyword" + "type": "keyword", + "null_value": "ROLLUP_NULL_VALUE_PLACEHOLDER" } } }, diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java index cc24a0b4ab944..48ea20d2bf76a 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java @@ -85,6 +85,8 @@ public class Rollup extends Plugin implements ActionPlugin, PersistentTaskPlugin public static final String ROLLUP_TEMPLATE_VERSION_FIELD = "rollup-version"; public static final String ROLLUP_TEMPLATE_VERSION_PATTERN = Pattern.quote("${rollup.dynamic_template.version}"); + public static final String ROLLUP_NULL_VALUE_PLACEHOLDER = "ROLLUP_NULL_VALUE_PLACEHOLDER"; + public static final String ROLLUP_NULL_VALUE = "__ROLLUP_NULL_VALUE_PLACEHOLDER__"; private static final String ROLLUP_TEMPLATE_NAME = "/rollup-dynamic-template.json"; public static final String DYNAMIC_MAPPING_TEMPLATE = TemplateUtils.loadTemplate(ROLLUP_TEMPLATE_NAME, diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupResponseTranslator.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupResponseTranslator.java index ba1002896c041..466d3afc5a7f5 100644 --- 
a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupResponseTranslator.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupResponseTranslator.java @@ -382,6 +382,12 @@ private static InternalAggregation unrollMultiBucket(InternalMultiBucketAggregat }); } else if (rolled instanceof StringTerms) { return unrollMultiBucket(rolled, original, currentTree, (bucket, bucketCount, subAggs) -> { + + // Hide our `null_value` placeholder so it doesn't show up in the terms list + if (bucket.getKeyAsString().equals(Rollup.ROLLUP_NULL_VALUE)) { + return null; + } + BytesRef key = new BytesRef(bucket.getKeyAsString().getBytes(StandardCharsets.UTF_8)); assert bucketCount >= 0; //TODO expose getFormatter(), keyed upstream in Core diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportPutRollupJobAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportPutRollupJobAction.java index 819a8dfa3fe9f..20b365b89d2e2 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportPutRollupJobAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportPutRollupJobAction.java @@ -9,6 +9,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.admin.indices.create.CreateIndexAction; @@ -132,7 +133,8 @@ static void createIndex(RollupJob job, ActionListener rollupMeta = (Map)((Map) m).get(RollupField.ROLLUP_META); + + String stringVersion = (String)((Map) m).get(Rollup.ROLLUP_TEMPLATE_VERSION_FIELD); + if (stringVersion == null) { + logger.warn("Could not determine version of existing rollup metadata for index 
[" + indexName + "]"); + } + Version parsedVersion = Version.fromString(stringVersion); + if (parsedVersion.before(Version.V_6_4_0)) { + String msg = "Cannot create rollup job [" + job.getConfig().getId() + "] because the rollup index contains " + + "jobs from pre-6.4.0. The mappings for these jobs are not compatible with 6.4.0+. Please specify a new rollup " + + "index."; + logger.error(msg); + listener.onFailure(new ElasticsearchStatusException(msg, RestStatus.CONFLICT)); + return; + } + if (rollupMeta.get(job.getConfig().getId()) != null) { String msg = "Cannot create rollup job [" + job.getConfig().getId() + "] because job was previously created (existing metadata)."; diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/IndexerUtils.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/IndexerUtils.java index e180e34c4cc26..8d0c598070489 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/IndexerUtils.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/IndexerUtils.java @@ -45,7 +45,7 @@ class IndexerUtils { * @param rollupIndex The index that holds rollups for this job * @return A list of rolled documents derived from the response */ - static List processBuckets(CompositeAggregation agg, String rollupIndex, RollupJobStats stats, + static List processBuckets(CompositeAggregation agg, String rollupIndex, RollupJobStats stats, GroupConfig groupConfig, String jobId) { logger.debug("Buckets: [" + agg.getBuckets().size() + "][" + jobId + "]"); @@ -90,7 +90,10 @@ private static CRC32 processKeys(Map keys, Map d docID.update(Numbers.doubleToBytes((Double)v), 0, 8); } else if (k.endsWith("." + TermsAggregationBuilder.NAME)) { doc.put(k + "." 
+ RollupField.VALUE, v); - if (v instanceof String) { + if (v == null) { + // Arbitrary value to update the doc ID with for nulls + docID.update(19); + } else if (v instanceof String) { byte[] vs = ((String) v).getBytes(StandardCharsets.UTF_8); docID.update(vs, 0, vs.length); } else if (v instanceof Long) { diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java index 3ba3611293fdc..9ddc98a69e062 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java @@ -401,6 +401,7 @@ private CompositeAggregationBuilder createCompositeBuilder(RollupJobConfig confi composite.setMetaData(metadata); } composite.size(config.getPageSize()); + return composite; } diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupResponseTranslationTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupResponseTranslationTests.java index 7b03d8e8d038d..28ceef5537571 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupResponseTranslationTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupResponseTranslationTests.java @@ -1082,6 +1082,56 @@ public void testStringTerms() throws IOException { assertThat(unrolled.toString(), not(equalTo(responses.get(1).toString()))); } + public void testStringTermsNullValuePlaceholder() throws IOException { + TermsAggregationBuilder nonRollupTerms = new TermsAggregationBuilder("terms", ValueType.STRING) + .field("stringField"); + + TermsAggregationBuilder rollupTerms = new TermsAggregationBuilder("terms", ValueType.STRING) + .field("stringfield.terms." + RollupField.VALUE) + .subAggregation(new SumAggregationBuilder("terms." 
+ COUNT_FIELD) + .field("stringfield.terms." + RollupField.COUNT_FIELD)); + + KeywordFieldMapper.Builder nrBuilder = new KeywordFieldMapper.Builder("terms"); + KeywordFieldMapper.KeywordFieldType nrFTterm = nrBuilder.fieldType(); + nrFTterm.setHasDocValues(true); + nrFTterm.setName(nonRollupTerms.field()); + + KeywordFieldMapper.Builder rBuilder = new KeywordFieldMapper.Builder("terms"); + KeywordFieldMapper.KeywordFieldType rFTterm = rBuilder.fieldType(); + rFTterm.setHasDocValues(true); + rFTterm.setName(rollupTerms.field()); + + NumberFieldMapper.Builder valueBuilder = new NumberFieldMapper.Builder("terms." + RollupField.COUNT_FIELD, + NumberFieldMapper.NumberType.LONG); + MappedFieldType rFTvalue = valueBuilder.fieldType(); + rFTvalue.setHasDocValues(true); + rFTvalue.setName("stringfield.terms." + RollupField.COUNT_FIELD); + + List responses = doQueries(new MatchAllDocsQuery(), + iw -> { + iw.addDocument(stringValueDoc("abc")); + iw.addDocument(stringValueDoc("abc")); + iw.addDocument(stringValueDoc("abc")); + + // off target + Document doc = new Document(); + doc.add(new SortedSetDocValuesField("otherField", new BytesRef("other"))); + iw.addDocument(doc); + }, nonRollupTerms, + iw -> { + iw.addDocument(stringValueRollupDoc("abc", 3)); + // off target + iw.addDocument(stringValueRollupDoc(Rollup.ROLLUP_NULL_VALUE, 1)); + }, rollupTerms, + new MappedFieldType[]{nrFTterm}, new MappedFieldType[]{rFTterm, rFTvalue}); + + InternalAggregation unrolled = RollupResponseTranslator.unrollAgg(responses.get(1), null, null, 0); + + // The null_value placeholder should be removed from the response and not visible here + assertThat(unrolled.toString(), equalTo(responses.get(0).toString())); + assertThat(unrolled.toString(), not(equalTo(responses.get(1).toString()))); + } + public void testLongTerms() throws IOException { TermsAggregationBuilder nonRollupTerms = new TermsAggregationBuilder("terms", ValueType.LONG) .field("longField"); diff --git 
a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/PutJobStateMachineTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/PutJobStateMachineTests.java index 64cf9d2e3fe21..fa7415e042d4c 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/PutJobStateMachineTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/PutJobStateMachineTests.java @@ -28,9 +28,12 @@ import org.elasticsearch.xpack.core.rollup.job.RollupJob; import org.elasticsearch.xpack.core.rollup.job.RollupJobConfig; import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers; +import org.elasticsearch.xpack.rollup.Rollup; import org.mockito.ArgumentCaptor; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -203,6 +206,43 @@ public void testNoMetadataInMapping() { verify(client).execute(eq(GetMappingsAction.INSTANCE), any(GetMappingsRequest.class), any()); } + public void testIncompatibleMappingVersion() { + RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob("foo").build(), Collections.emptyMap()); + + ActionListener testListener = ActionListener.wrap(response -> { + fail("Listener success should not have been triggered."); + }, e -> { + assertThat(e.getMessage(), equalTo("Cannot create rollup job [foo] because the rollup index contains jobs from pre-6.4.0. " + + "The mappings for these jobs are not compatible with 6.4.0+. 
Please specify a new rollup index.")); + }); + + Logger logger = mock(Logger.class); + Client client = mock(Client.class); + + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(ActionListener.class); + doAnswer(invocation -> { + GetMappingsResponse response = mock(GetMappingsResponse.class); + Map m = new HashMap<>(2); + m.put(Rollup.ROLLUP_TEMPLATE_VERSION_FIELD, Version.V_6_3_0); + m.put(RollupField.ROLLUP_META, + Collections.singletonMap(job.getConfig().getId(), job.getConfig())); + MappingMetaData meta = new MappingMetaData(RollupField.TYPE_NAME, + Collections.singletonMap("_meta", m)); + ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(1); + builder.put(RollupField.TYPE_NAME, meta); + + ImmutableOpenMap.Builder> builder2 = ImmutableOpenMap.builder(1); + builder2.put(job.getConfig().getRollupIndex(), builder.build()); + + when(response.getMappings()).thenReturn(builder2.build()); + requestCaptor.getValue().onResponse(response); + return null; + }).when(client).execute(eq(GetMappingsAction.INSTANCE), any(GetMappingsRequest.class), requestCaptor.capture()); + + TransportPutRollupJobAction.updateMapping(job, testListener, mock(PersistentTasksService.class), client, logger); + verify(client).execute(eq(GetMappingsAction.INSTANCE), any(GetMappingsRequest.class), any()); + } + @SuppressWarnings("unchecked") public void testJobAlreadyInMapping() { RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob("foo").build(), Collections.emptyMap()); diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/IndexerUtilsTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/IndexerUtilsTests.java index 53421faa9bc38..51ab5e71184ae 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/IndexerUtilsTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/IndexerUtilsTests.java @@ -39,6 +39,7 @@ import 
org.elasticsearch.xpack.core.rollup.job.MetricConfig; import org.elasticsearch.xpack.core.rollup.job.RollupJobStats; import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers; +import org.elasticsearch.xpack.core.rollup.job.TermsGroupConfig; import org.joda.time.DateTime; import org.mockito.stubbing.Answer; @@ -359,6 +360,77 @@ public void testKeyOrdering() { assertThat(docs.get(0).id(), equalTo("1237859798")); } + public void testMissingBuckets() throws IOException { + String indexName = randomAlphaOfLengthBetween(1, 10); + RollupJobStats stats= new RollupJobStats(0, 0, 0, 0); + + String metricField = "metric_field"; + String valueField = "value_field"; + + Directory directory = newDirectory(); + RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory); + + int numDocs = 10; + + for (int i = 0; i < numDocs; i++) { + Document document = new Document(); + + // Every other doc omit the valueField, so that we get some null buckets + if (i % 2 == 0) { + document.add(new SortedNumericDocValuesField(valueField, i)); + document.add(new LongPoint(valueField, i)); + } + document.add(new SortedNumericDocValuesField(metricField, i)); + document.add(new LongPoint(metricField, i)); + indexWriter.addDocument(document); + } + + indexWriter.close(); + + IndexReader indexReader = DirectoryReader.open(directory); + IndexSearcher indexSearcher = newIndexSearcher(indexReader); + + MappedFieldType valueFieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG); + valueFieldType.setName(valueField); + valueFieldType.setHasDocValues(true); + valueFieldType.setName(valueField); + + MappedFieldType metricFieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG); + metricFieldType.setName(metricField); + metricFieldType.setHasDocValues(true); + metricFieldType.setName(metricField); + + // Setup the composite agg + TermsGroupConfig termsGroupConfig = new 
TermsGroupConfig.Builder().setFields(Collections.singletonList(valueField)).build(); + CompositeAggregationBuilder compositeBuilder = new CompositeAggregationBuilder(RollupIndexer.AGGREGATION_NAME, + termsGroupConfig.toBuilders()).size(numDocs*2); + + MetricConfig metricConfig = new MetricConfig.Builder().setField(metricField).setMetrics(Collections.singletonList("max")).build(); + metricConfig.toBuilders().forEach(compositeBuilder::subAggregation); + + Aggregator aggregator = createAggregator(compositeBuilder, indexSearcher, valueFieldType, metricFieldType); + aggregator.preCollection(); + indexSearcher.search(new MatchAllDocsQuery(), aggregator); + aggregator.postCollection(); + CompositeAggregation composite = (CompositeAggregation) aggregator.buildAggregation(0L); + indexReader.close(); + directory.close(); + + List docs = IndexerUtils.processBuckets(composite, indexName, stats, + ConfigTestHelpers.getGroupConfig().build(), "foo"); + + assertThat(docs.size(), equalTo(6)); + for (IndexRequest doc : docs) { + Map map = doc.sourceAsMap(); + Object value = map.get(valueField + "." + TermsAggregationBuilder.NAME + "." + RollupField.VALUE); + if (value == null) { + assertThat(map.get(valueField + "." + TermsAggregationBuilder.NAME + "." + RollupField.COUNT_FIELD), equalTo(5)); + } else { + assertThat(map.get(valueField + "." + TermsAggregationBuilder.NAME + "." 
+ RollupField.COUNT_FIELD), equalTo(1)); + } + } + } + interface Mock { List getBuckets(); } From d29f681e16fe12d52979bf8b72a1c60b91765774 Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Fri, 22 Jun 2018 17:39:31 +0000 Subject: [PATCH 2/6] Review cleanup --- .../en/rollup/understanding-groups.asciidoc | 2 +- .../elasticsearch/xpack/rollup/Rollup.java | 5 ++ .../rollup/RollupResponseTranslator.java | 2 + .../action/TransportPutRollupJobAction.java | 4 +- .../action/PutJobStateMachineTests.java | 53 +++++++++++++++++-- 5 files changed, 59 insertions(+), 7 deletions(-) diff --git a/x-pack/docs/en/rollup/understanding-groups.asciidoc b/x-pack/docs/en/rollup/understanding-groups.asciidoc index 064a501b769a8..a963f8b27de7d 100644 --- a/x-pack/docs/en/rollup/understanding-groups.asciidoc +++ b/x-pack/docs/en/rollup/understanding-groups.asciidoc @@ -123,7 +123,7 @@ if a field is useful for aggregating later, and how you might wish to use it (te There was previously a limitation in how Rollup could handle indices that had heterogeneous mappings (multiple, unrelated/non-overlapping mappings). The recommendation at the time was to configure a separate job per data "type". For example, you might configure a separate -job for each Beat module that you had enabled (one for `process`, another for `filesystem`, etc). +job for each Beats module that you had enabled (one for `process`, another for `filesystem`, etc). This recommendation was driven by internal implementation details that caused document counts to be potentially incorrect if a single "merged" job was used. 
diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java index 48ea20d2bf76a..bfc32ec350cd1 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java @@ -85,7 +85,12 @@ public class Rollup extends Plugin implements ActionPlugin, PersistentTaskPlugin public static final String ROLLUP_TEMPLATE_VERSION_FIELD = "rollup-version"; public static final String ROLLUP_TEMPLATE_VERSION_PATTERN = Pattern.quote("${rollup.dynamic_template.version}"); + + // This is the value in the template that we look for and replace. Done this way so it can be easily configurable + // in the future, rather than hard-coding in the template now public static final String ROLLUP_NULL_VALUE_PLACEHOLDER = "ROLLUP_NULL_VALUE_PLACEHOLDER"; + + // This is the actual value that is inserted into the template, which will be used as the `null_value` public static final String ROLLUP_NULL_VALUE = "__ROLLUP_NULL_VALUE_PLACEHOLDER__"; private static final String ROLLUP_TEMPLATE_NAME = "/rollup-dynamic-template.json"; diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupResponseTranslator.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupResponseTranslator.java index 466d3afc5a7f5..0f27423b27931 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupResponseTranslator.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupResponseTranslator.java @@ -384,6 +384,8 @@ private static InternalAggregation unrollMultiBucket(InternalMultiBucketAggregat return unrollMultiBucket(rolled, original, currentTree, (bucket, bucketCount, subAggs) -> { // Hide our `null_value` placeholder so it doesn't show up in the terms list + // Note: this only applies to string terms right now, 
because we only configure a `null_value` + // on keywords in the template. Other fields won't have a `null_value` placeholder to replace if (bucket.getKeyAsString().equals(Rollup.ROLLUP_NULL_VALUE)) { return null; } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportPutRollupJobAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportPutRollupJobAction.java index 20b365b89d2e2..0b8eb3182bfe5 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportPutRollupJobAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportPutRollupJobAction.java @@ -181,7 +181,9 @@ static void updateMapping(RollupJob job, ActionListener) m).get(Rollup.ROLLUP_TEMPLATE_VERSION_FIELD); if (stringVersion == null) { - logger.warn("Could not determine version of existing rollup metadata for index [" + indexName + "]"); + listener.onFailure(new IllegalStateException("Could not determine version of existing rollup metadata for index [" + + indexName + "]")); + return; } Version parsedVersion = Version.fromString(stringVersion); if (parsedVersion.before(Version.V_6_4_0)) { diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/PutJobStateMachineTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/PutJobStateMachineTests.java index fa7415e042d4c..7774a1d4bc262 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/PutJobStateMachineTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/PutJobStateMachineTests.java @@ -206,6 +206,7 @@ public void testNoMetadataInMapping() { verify(client).execute(eq(GetMappingsAction.INSTANCE), any(GetMappingsRequest.class), any()); } + @SuppressWarnings("unchecked") public void testIncompatibleMappingVersion() { RollupJob job = new 
RollupJob(ConfigTestHelpers.getRollupJob("foo").build(), Collections.emptyMap()); @@ -243,6 +244,43 @@ public void testIncompatibleMappingVersion() { verify(client).execute(eq(GetMappingsAction.INSTANCE), any(GetMappingsRequest.class), any()); } + @SuppressWarnings("unchecked") + public void testNoMappingVersion() { + RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob("foo").build(), Collections.emptyMap()); + + ActionListener testListener = ActionListener.wrap(response -> { + fail("Listener success should not have been triggered."); + }, e -> { + assertThat(e.getMessage(), equalTo("Could not determine version of existing rollup metadata for index [" + + job.getConfig().getRollupIndex() + "]")); + }); + + Logger logger = mock(Logger.class); + Client client = mock(Client.class); + + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(ActionListener.class); + doAnswer(invocation -> { + GetMappingsResponse response = mock(GetMappingsResponse.class); + Map m = new HashMap<>(2); + m.put(RollupField.ROLLUP_META, + Collections.singletonMap(job.getConfig().getId(), job.getConfig())); + MappingMetaData meta = new MappingMetaData(RollupField.TYPE_NAME, + Collections.singletonMap("_meta", m)); + ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(1); + builder.put(RollupField.TYPE_NAME, meta); + + ImmutableOpenMap.Builder> builder2 = ImmutableOpenMap.builder(1); + builder2.put(job.getConfig().getRollupIndex(), builder.build()); + + when(response.getMappings()).thenReturn(builder2.build()); + requestCaptor.getValue().onResponse(response); + return null; + }).when(client).execute(eq(GetMappingsAction.INSTANCE), any(GetMappingsRequest.class), requestCaptor.capture()); + + TransportPutRollupJobAction.updateMapping(job, testListener, mock(PersistentTasksService.class), client, logger); + verify(client).execute(eq(GetMappingsAction.INSTANCE), any(GetMappingsRequest.class), any()); + } + @SuppressWarnings("unchecked") public void testJobAlreadyInMapping() { 
RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob("foo").build(), Collections.emptyMap()); @@ -259,10 +297,12 @@ public void testJobAlreadyInMapping() { ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(ActionListener.class); doAnswer(invocation -> { GetMappingsResponse response = mock(GetMappingsResponse.class); + Map m = new HashMap<>(2); + m.put(Rollup.ROLLUP_TEMPLATE_VERSION_FIELD, Version.V_6_4_0); + m.put(RollupField.ROLLUP_META, + Collections.singletonMap(job.getConfig().getId(), job.getConfig())); MappingMetaData meta = new MappingMetaData(RollupField.TYPE_NAME, - Collections.singletonMap("_meta", - Collections.singletonMap(RollupField.ROLLUP_META, - Collections.singletonMap(job.getConfig().getId(), job.getConfig())))); + Collections.singletonMap("_meta", m)); ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(1); builder.put(RollupField.TYPE_NAME, meta); @@ -298,9 +338,12 @@ public void testAddJobToMapping() { ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(ActionListener.class); doAnswer(invocation -> { GetMappingsResponse response = mock(GetMappingsResponse.class); + Map m = new HashMap<>(2); + m.put(Rollup.ROLLUP_TEMPLATE_VERSION_FIELD, Version.V_6_4_0); + m.put(RollupField.ROLLUP_META, + Collections.singletonMap(unrelatedJob.getId(), unrelatedJob)); MappingMetaData meta = new MappingMetaData(RollupField.TYPE_NAME, - Collections.singletonMap("_meta", Collections.singletonMap(RollupField.ROLLUP_META, - Collections.singletonMap(unrelatedJob.getId(), unrelatedJob)))); + Collections.singletonMap("_meta", m)); ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(1); builder.put(RollupField.TYPE_NAME, meta); From d75d2b0c71eb5557f948da8ca92fe1471b58f171 Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Wed, 27 Jun 2018 20:22:06 +0000 Subject: [PATCH 3/6] Add upgrade/restart tests --- .../xpack/restart/FullClusterRestartIT.java | 119 ++++++++++++++++++ 1 file changed, 119 insertions(+) diff --git 
a/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java b/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java index 5276abdbfb1d8..3048238095444 100644 --- a/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java +++ b/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java @@ -321,6 +321,125 @@ public void testRollupAfterRestart() throws Exception { } } + /** + * Tests that a RollUp job created on an old cluster is correctly restarted after the upgrade. + */ + public void testRollupIndexIncompatibilityAfterRestart() throws Exception { + assumeTrue("Rollup 6.3 index not compatible with 6.4+", + oldClusterVersion.onOrAfter(Version.V_6_3_0) && oldClusterVersion.before(Version.V_6_4_0)); + + if (runningAgainstOldCluster) { + final int numDocs = 59; + final int year = randomIntBetween(1970, 2018); + + // index documents for the rollup job + final StringBuilder bulk = new StringBuilder(); + for (int i = 0; i < numDocs; i++) { + bulk.append("{\"index\":{\"_index\":\"rollup-docs\",\"_type\":\"doc\"}}\n"); + String date = String.format(Locale.ROOT, "%04d-01-01T00:%02d:00Z", year, i); + bulk.append("{\"timestamp\":\"").append(date).append("\",\"value\":").append(i).append("}\n"); + } + bulk.append("\r\n"); + + final Request bulkRequest = new Request("POST", "/_bulk"); + bulkRequest.setJsonEntity(bulk.toString()); + client().performRequest(bulkRequest); + + // create the rollup job + final Request createRollupJobRequest = new Request("PUT", "/_xpack/rollup/job/rollup-job-test"); + createRollupJobRequest.setJsonEntity("{" + + "\"index_pattern\":\"rollup-*\"," + + "\"rollup_index\":\"results-rollup\"," + + "\"cron\":\"*/30 * * * * ?\"," + + "\"page_size\":100," + + "\"groups\":{" + + " \"date_histogram\":{" + + " \"field\":\"timestamp\"," + + " \"interval\":\"5m\"" + + " }" + + "}," + 
"\"metrics\":[" + + " {\"field\":\"value\",\"metrics\":[\"min\",\"max\",\"sum\"]}" + + "]" + + "}"); + + Map createRollupJobResponse = toMap(client().performRequest(createRollupJobRequest)); + assertThat(createRollupJobResponse.get("acknowledged"), equalTo(Boolean.TRUE)); + + // start the rollup job + final Request startRollupJobRequest = new Request("POST", "_xpack/rollup/job/rollup-job-test/_start"); + Map startRollupJobResponse = toMap(client().performRequest(startRollupJobRequest)); + assertThat(startRollupJobResponse.get("started"), equalTo(Boolean.TRUE)); + + assertRollUpJob("rollup-job-test"); + + } else { + + final Request clusterHealthRequest = new Request("GET", "/_cluster/health"); + clusterHealthRequest.addParameter("wait_for_status", "yellow"); + clusterHealthRequest.addParameter("wait_for_no_relocating_shards", "true"); + if (oldClusterVersion.onOrAfter(Version.V_6_2_0)) { + clusterHealthRequest.addParameter("wait_for_no_initializing_shards", "true"); + } + Map clusterHealthResponse = toMap(client().performRequest(clusterHealthRequest)); + assertThat(clusterHealthResponse.get("timed_out"), equalTo(Boolean.FALSE)); + + assertRollUpJob("rollup-job-test"); + + // Attempt to create a new rollup job in old index, should fail + final Request createRollupJobRequest = new Request("PUT", "/_xpack/rollup/job/rollup-job-test2"); + createRollupJobRequest.setJsonEntity("{" + + "\"index_pattern\":\"rollup-*\"," + + "\"rollup_index\":\"results-rollup\"," + + "\"cron\":\"*/30 * * * * ?\"," + + "\"page_size\":100," + + "\"groups\":{" + + " \"date_histogram\":{" + + " \"field\":\"timestamp\"," + + " \"interval\":\"5m\"" + + " }" + + "}," + + "\"metrics\":[" + + " {\"field\":\"value\",\"metrics\":[\"min\",\"max\",\"sum\"]}" + + "]" + + "}"); + + ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(createRollupJobRequest)); + assertThat(EntityUtils.toString(e.getResponse().getEntity()), + containsString("Cannot create rollup job 
[rollup-job-test2] because the rollup index contains jobs from pre-6.4.0. " + + "The mappings for these jobs are not compatible with 6.4.0+. Please specify a new rollup index.")); + + + // Now try again with a new index, should work + final Request createRollupJobRequest2 = new Request("PUT", "/_xpack/rollup/job/rollup-job-test2"); + createRollupJobRequest2.setJsonEntity("{" + + "\"index_pattern\":\"rollup-*\"," + + "\"rollup_index\":\"results-rollup2\"," + + "\"cron\":\"*/30 * * * * ?\"," + + "\"page_size\":100," + + "\"groups\":{" + + " \"date_histogram\":{" + + " \"field\":\"timestamp\"," + + " \"interval\":\"5m\"" + + " }" + + "}," + + "\"metrics\":[" + + " {\"field\":\"value\",\"metrics\":[\"min\",\"max\",\"sum\"]}" + + "]" + + "}"); + + Map createRollupJobResponse = toMap(client().performRequest(createRollupJobRequest2)); + assertThat(createRollupJobResponse.get("acknowledged"), equalTo(Boolean.TRUE)); + + // start the rollup job + final Request startRollupJobRequest = new Request("POST", "_xpack/rollup/job/rollup-job-test2/_start"); + Map startRollupJobResponse = toMap(client().performRequest(startRollupJobRequest)); + assertThat(startRollupJobResponse.get("started"), equalTo(Boolean.TRUE)); + + assertRollUpJob("rollup-job-test"); + } + } + public void testSqlFailsOnIndexWithTwoTypes() throws IOException { // TODO this isn't going to trigger until we backport to 6.1 assumeTrue("It is only possible to build an index that sql doesn't like before 6.0.0", From bae94c5fd698751b406856ecd8f6136406c95d42 Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Mon, 2 Jul 2018 17:24:45 -0400 Subject: [PATCH 4/6] Make Restart tests play nicely with multiple jobs --- .../rollup/RollupRestTestStateCleaner.java | 24 ++--- .../xpack/test/rest/XPackRestIT.java | 2 +- .../xpack/restart/FullClusterRestartIT.java | 88 ++++++++++++++----- 3 files changed, 74 insertions(+), 40 deletions(-) diff --git 
a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/RollupRestTestStateCleaner.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/RollupRestTestStateCleaner.java index 9938f3a41962b..ae171f138cf46 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/RollupRestTestStateCleaner.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/RollupRestTestStateCleaner.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.core.rollup; import org.apache.http.HttpStatus; -import org.apache.logging.log4j.Logger; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; import org.elasticsearch.common.xcontent.support.XContentMapValues; @@ -27,21 +26,13 @@ public class RollupRestTestStateCleaner { - private final Logger logger; - private final RestClient adminClient; - - public RollupRestTestStateCleaner(Logger logger, RestClient adminClient) { - this.logger = logger; - this.adminClient = adminClient; - } - - public void clearRollupMetadata() throws Exception { - deleteAllJobs(); - waitForPendingTasks(); + public static void clearRollupMetadata(RestClient adminClient) throws Exception { + deleteAllJobs(adminClient); + waitForPendingTasks(adminClient); // indices will be deleted by the ESRestTestCase class } - private void waitForPendingTasks() throws Exception { + private static void waitForPendingTasks(RestClient adminClient) throws Exception { ESTestCase.assertBusy(() -> { try { Response response = adminClient.performRequest("GET", "/_cat/tasks", @@ -71,7 +62,7 @@ private void waitForPendingTasks() throws Exception { } @SuppressWarnings("unchecked") - private void deleteAllJobs() throws Exception { + private static void deleteAllJobs(RestClient adminClient) throws Exception { Response response = adminClient.performRequest("GET", "/_xpack/rollup/job/_all"); Map jobs = ESRestTestCase.entityAsMap(response); @SuppressWarnings("unchecked") @@ -83,9 +74,7 @@ 
private void deleteAllJobs() throws Exception { } for (Map jobConfig : jobConfigs) { - logger.debug(jobConfig); String jobId = (String) ((Map) jobConfig.get("config")).get("id"); - logger.debug("Deleting job " + jobId); try { response = adminClient.performRequest("DELETE", "/_xpack/rollup/job/" + jobId); } catch (Exception e) { @@ -95,7 +84,8 @@ private void deleteAllJobs() throws Exception { } private static String responseEntityToString(Response response) throws Exception { - try (BufferedReader reader = new BufferedReader(new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8))) { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(response.getEntity().getContent(), + StandardCharsets.UTF_8))) { return reader.lines().collect(Collectors.joining("\n")); } } diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java index 412c75f0e639c..f1d9eb1fb3f24 100644 --- a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java @@ -263,7 +263,7 @@ private void clearMlState() throws Exception { */ private void clearRollupState() throws Exception { if (isRollupTest()) { - new RollupRestTestStateCleaner(logger, adminClient()).clearRollupMetadata(); + RollupRestTestStateCleaner.clearRollupMetadata(adminClient()); } } diff --git a/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java b/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java index 3048238095444..42d462417e309 100644 --- a/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java +++ b/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java @@ -18,6 +18,7 @@ import 
org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.StreamsUtils; import org.elasticsearch.test.rest.ESRestTestCase; @@ -346,10 +347,10 @@ public void testRollupIndexIncompatibilityAfterRestart() throws Exception { client().performRequest(bulkRequest); // create the rollup job - final Request createRollupJobRequest = new Request("PUT", "/_xpack/rollup/job/rollup-job-test"); + final Request createRollupJobRequest = new Request("PUT", "/_xpack/rollup/job/rollup-incompat-test"); createRollupJobRequest.setJsonEntity("{" + "\"index_pattern\":\"rollup-*\"," - + "\"rollup_index\":\"results-rollup\"," + + "\"rollup_index\":\"results-rollup-incompat-test\"," + "\"cron\":\"*/30 * * * * ?\"," + "\"page_size\":100," + "\"groups\":{" @@ -367,11 +368,11 @@ public void testRollupIndexIncompatibilityAfterRestart() throws Exception { assertThat(createRollupJobResponse.get("acknowledged"), equalTo(Boolean.TRUE)); // start the rollup job - final Request startRollupJobRequest = new Request("POST", "_xpack/rollup/job/rollup-job-test/_start"); + final Request startRollupJobRequest = new Request("POST", "_xpack/rollup/job/rollup-incompat-test/_start"); Map startRollupJobResponse = toMap(client().performRequest(startRollupJobRequest)); assertThat(startRollupJobResponse.get("started"), equalTo(Boolean.TRUE)); - assertRollUpJob("rollup-job-test"); + assertRollUpJob("rollup-incompat-test"); } else { @@ -384,13 +385,13 @@ public void testRollupIndexIncompatibilityAfterRestart() throws Exception { Map clusterHealthResponse = toMap(client().performRequest(clusterHealthRequest)); assertThat(clusterHealthResponse.get("timed_out"), equalTo(Boolean.FALSE)); - assertRollUpJob("rollup-job-test"); + assertRollUpJob("rollup-incompat-test"); // Attempt 
to create a new rollup job in old index, should fail - final Request createRollupJobRequest = new Request("PUT", "/_xpack/rollup/job/rollup-job-test2"); + final Request createRollupJobRequest = new Request("PUT", "/_xpack/rollup/job/rollup-incompat-test2"); createRollupJobRequest.setJsonEntity("{" + "\"index_pattern\":\"rollup-*\"," - + "\"rollup_index\":\"results-rollup\"," + + "\"rollup_index\":\"results-rollup-incompat-test\"," + "\"cron\":\"*/30 * * * * ?\"," + "\"page_size\":100," + "\"groups\":{" @@ -406,15 +407,16 @@ public void testRollupIndexIncompatibilityAfterRestart() throws Exception { ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(createRollupJobRequest)); assertThat(EntityUtils.toString(e.getResponse().getEntity()), - containsString("Cannot create rollup job [rollup-job-test2] because the rollup index contains jobs from pre-6.4.0. " + - "The mappings for these jobs are not compatible with 6.4.0+. Please specify a new rollup index.")); + containsString("Cannot create rollup job [rollup-incompat-test2] because the rollup index contains " + + "jobs from pre-6.4.0. The mappings for these jobs are not compatible with 6.4.0+. 
" + + "Please specify a new rollup index.")); // Now try again with a new index, should work - final Request createRollupJobRequest2 = new Request("PUT", "/_xpack/rollup/job/rollup-job-test2"); + final Request createRollupJobRequest2 = new Request("PUT", "/_xpack/rollup/job/rollup-incompat-test2"); createRollupJobRequest2.setJsonEntity("{" + "\"index_pattern\":\"rollup-*\"," - + "\"rollup_index\":\"results-rollup2\"," + + "\"rollup_index\":\"results-rollup-incompat-test2\"," + "\"cron\":\"*/30 * * * * ?\"," + "\"page_size\":100," + "\"groups\":{" @@ -432,11 +434,11 @@ public void testRollupIndexIncompatibilityAfterRestart() throws Exception { assertThat(createRollupJobResponse.get("acknowledged"), equalTo(Boolean.TRUE)); // start the rollup job - final Request startRollupJobRequest = new Request("POST", "_xpack/rollup/job/rollup-job-test2/_start"); + final Request startRollupJobRequest = new Request("POST", "_xpack/rollup/job/rollup-incompat-test2/_start"); Map startRollupJobResponse = toMap(client().performRequest(startRollupJobRequest)); assertThat(startRollupJobResponse.get("started"), equalTo(Boolean.TRUE)); - assertRollUpJob("rollup-job-test"); + assertRollUpJob("rollup-incompat-test2"); } } @@ -650,7 +652,10 @@ private void assertRollUpJob(final String rollupJob) throws Exception { // check that the rollup job is started using the RollUp API final Request getRollupJobRequest = new Request("GET", "_xpack/rollup/job/" + rollupJob); Map getRollupJobResponse = toMap(client().performRequest(getRollupJobRequest)); - assertThat(ObjectPath.eval("jobs.0.status.job_state", getRollupJobResponse), expectedStates); + Map job = getJob(getRollupJobResponse, rollupJob); + if (job != null) { + assertThat(ObjectPath.eval("status.job_state", job), expectedStates); + } // check that the rollup job is started using the Tasks API final Request taskRequest = new Request("GET", "_tasks"); @@ -666,15 +671,27 @@ private void assertRollUpJob(final String rollupJob) throws Exception { 
// check that the rollup job is started using the Cluster State API final Request clusterStateRequest = new Request("GET", "_cluster/state/metadata"); Map clusterStateResponse = toMap(client().performRequest(clusterStateRequest)); - Map rollupJobTask = ObjectPath.eval("metadata.persistent_tasks.tasks.0", clusterStateResponse); - assertThat(ObjectPath.eval("id", rollupJobTask), equalTo("rollup-job-test")); + List> rollupJobTasks = ObjectPath.eval("metadata.persistent_tasks.tasks", clusterStateResponse); - // Persistent task state field has been renamed in 6.4.0 from "status" to "state" - final String stateFieldName = (runningAgainstOldCluster && oldClusterVersion.before(Version.V_6_4_0)) ? "status" : "state"; + boolean hasRollupTask = false; + for (Map task : rollupJobTasks) { + if (ObjectPath.eval("id", task).equals(rollupJob)) { + hasRollupTask = true; + + // Persistent task state field has been renamed in 6.4.0 from "status" to "state" + final String stateFieldName + = (runningAgainstOldCluster && oldClusterVersion.before(Version.V_6_4_0)) ? "status" : "state"; + + final String jobStateField = "task.xpack/rollup/job." + stateFieldName + ".job_state"; + assertThat("Expected field [" + jobStateField + "] to be started or indexing in " + task.get("id"), + ObjectPath.eval(jobStateField, task), expectedStates); + break; + } + } + if (hasRollupTask == false) { + fail("Expected persistent task for [" + rollupJob + "] but none found."); + } - final String jobStateField = "task.xpack/rollup/job." 
+ stateFieldName + ".job_state"; - assertThat("Expected field [" + jobStateField + "] to be started or indexing in " + rollupJobTask, - ObjectPath.eval(jobStateField, rollupJobTask), expectedStates); } private void waitForRollUpJob(final String rollupJob, final Matcher expectedStates) throws Exception { @@ -682,7 +699,34 @@ private void waitForRollUpJob(final String rollupJob, final Matcher expectedS final Request getRollupJobRequest = new Request("GET", "_xpack/rollup/job/" + rollupJob); Response getRollupJobResponse = client().performRequest(getRollupJobRequest); assertThat(getRollupJobResponse.getStatusLine().getStatusCode(), equalTo(RestStatus.OK.getStatus())); - assertThat(ObjectPath.eval("jobs.0.status.job_state", toMap(getRollupJobResponse)), expectedStates); + + Map job = getJob(getRollupJobResponse, rollupJob); + if (job != null) { + assertThat(ObjectPath.eval("status.job_state", job), expectedStates); + } }, 30L, TimeUnit.SECONDS); } + + private Map getJob(Response response, String targetJobId) throws IOException { + return getJob(ESRestTestCase.entityAsMap(response), targetJobId); + } + + @SuppressWarnings("unchecked") + private Map getJob(Map jobsMap, String targetJobId) throws IOException { + + List> jobs = + (List>) XContentMapValues.extractValue("jobs", jobsMap); + + if (jobs == null) { + return null; + } + + for (Map job : jobs) { + String jobId = (String) ((Map) job.get("config")).get("id"); + if (jobId.equals(targetJobId)) { + return job; + } + } + return null; + } } From e43a9755df9e0932c5aa62d6d2813fb77a60cd81 Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Mon, 9 Jul 2018 14:37:49 -0400 Subject: [PATCH 5/6] Review: no need for `null_value` Which also means no need for compatibility check or restart test. Non-keyword keys needed a null check for doc ID generation however, plus a test for null keys. 
--- .../resources/rollup-dynamic-template.json | 3 +- .../elasticsearch/xpack/rollup/Rollup.java | 7 - .../rollup/RollupResponseTranslator.java | 7 - .../action/TransportPutRollupJobAction.java | 13 +- .../xpack/rollup/job/IndexerUtils.java | 14 +- .../RollupResponseTranslationTests.java | 4 +- .../action/PutJobStateMachineTests.java | 38 ------ .../xpack/rollup/job/IndexerUtilsTests.java | 35 ++++- .../xpack/restart/FullClusterRestartIT.java | 120 ------------------ 9 files changed, 48 insertions(+), 193 deletions(-) diff --git a/x-pack/plugin/core/src/main/resources/rollup-dynamic-template.json b/x-pack/plugin/core/src/main/resources/rollup-dynamic-template.json index 6c2a1ee15bed5..94336c60c4d68 100644 --- a/x-pack/plugin/core/src/main/resources/rollup-dynamic-template.json +++ b/x-pack/plugin/core/src/main/resources/rollup-dynamic-template.json @@ -10,8 +10,7 @@ "strings": { "match_mapping_type": "string", "mapping": { - "type": "keyword", - "null_value": "ROLLUP_NULL_VALUE_PLACEHOLDER" + "type": "keyword" } } }, diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java index bfc32ec350cd1..cc24a0b4ab944 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java @@ -86,13 +86,6 @@ public class Rollup extends Plugin implements ActionPlugin, PersistentTaskPlugin public static final String ROLLUP_TEMPLATE_VERSION_PATTERN = Pattern.quote("${rollup.dynamic_template.version}"); - // This is the value in the template that we look for and replace. 
Done this way so it can be easily configurable - // in the future, rather than hard-coding in the template now - public static final String ROLLUP_NULL_VALUE_PLACEHOLDER = "ROLLUP_NULL_VALUE_PLACEHOLDER"; - - // This is the actual value that is inserted into the template, which will be used as the `null_value` - public static final String ROLLUP_NULL_VALUE = "__ROLLUP_NULL_VALUE_PLACEHOLDER__"; - private static final String ROLLUP_TEMPLATE_NAME = "/rollup-dynamic-template.json"; public static final String DYNAMIC_MAPPING_TEMPLATE = TemplateUtils.loadTemplate(ROLLUP_TEMPLATE_NAME, Version.CURRENT.toString(), Rollup.ROLLUP_TEMPLATE_VERSION_PATTERN); diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupResponseTranslator.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupResponseTranslator.java index 0f27423b27931..4042e98ef93fb 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupResponseTranslator.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupResponseTranslator.java @@ -383,13 +383,6 @@ private static InternalAggregation unrollMultiBucket(InternalMultiBucketAggregat } else if (rolled instanceof StringTerms) { return unrollMultiBucket(rolled, original, currentTree, (bucket, bucketCount, subAggs) -> { - // Hide our `null_value` placeholder so it doesn't show up in the terms list - // Note: this only applies to string terms right now, because we only configure a `null_value` - // on keywords in the template. 
Other fields won't have a `null_value` placeholder to replace - if (bucket.getKeyAsString().equals(Rollup.ROLLUP_NULL_VALUE)) { - return null; - } - BytesRef key = new BytesRef(bucket.getKeyAsString().getBytes(StandardCharsets.UTF_8)); assert bucketCount >= 0; //TODO expose getFormatter(), keyed upstream in Core diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportPutRollupJobAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportPutRollupJobAction.java index 5de175f6cf59d..889dfa3ac8efc 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportPutRollupJobAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportPutRollupJobAction.java @@ -9,7 +9,6 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceAlreadyExistsException; -import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.admin.indices.create.CreateIndexAction; @@ -130,8 +129,7 @@ static void createIndex(RollupJob job, ActionListener keys, Map d doc.put(k + "." + RollupField.TIMESTAMP, v); doc.put(k + "." + RollupField.INTERVAL, groupConfig.getDateHisto().getInterval()); doc.put(k + "." + DateHistoGroupConfig.TIME_ZONE, groupConfig.getDateHisto().getTimeZone().toString()); - docID.update(Numbers.longToBytes((Long)v), 0, 8); + if (v == null) { + // Arbitrary value to update the doc ID with for nulls + docID.update(19); + } else { + docID.update(Numbers.longToBytes((Long)v), 0, 8); + } } else if (k.endsWith("." + HistogramAggregationBuilder.NAME)) { doc.put(k + "." + RollupField.VALUE, v); doc.put(k + "." 
+ RollupField.INTERVAL, groupConfig.getHisto().getInterval()); - docID.update(Numbers.doubleToBytes((Double)v), 0, 8); + if (v == null) { + // Arbitrary value to update the doc ID with for nulls + docID.update(19); + } else { + docID.update(Numbers.doubleToBytes((Double) v), 0, 8); + } } else if (k.endsWith("." + TermsAggregationBuilder.NAME)) { doc.put(k + "." + RollupField.VALUE, v); if (v == null) { diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupResponseTranslationTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupResponseTranslationTests.java index 28ceef5537571..98e3ad8197a51 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupResponseTranslationTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupResponseTranslationTests.java @@ -1082,7 +1082,7 @@ public void testStringTerms() throws IOException { assertThat(unrolled.toString(), not(equalTo(responses.get(1).toString()))); } - public void testStringTermsNullValuePlaceholder() throws IOException { + public void testStringTermsNullValue() throws IOException { TermsAggregationBuilder nonRollupTerms = new TermsAggregationBuilder("terms", ValueType.STRING) .field("stringField"); @@ -1120,8 +1120,6 @@ public void testStringTermsNullValuePlaceholder() throws IOException { }, nonRollupTerms, iw -> { iw.addDocument(stringValueRollupDoc("abc", 3)); - // off target - iw.addDocument(stringValueRollupDoc(Rollup.ROLLUP_NULL_VALUE, 1)); }, rollupTerms, new MappedFieldType[]{nrFTterm}, new MappedFieldType[]{rFTterm, rFTvalue}); diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/PutJobStateMachineTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/PutJobStateMachineTests.java index 7774a1d4bc262..58fa9d4533bc3 100644 --- 
a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/PutJobStateMachineTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/PutJobStateMachineTests.java @@ -206,44 +206,6 @@ public void testNoMetadataInMapping() { verify(client).execute(eq(GetMappingsAction.INSTANCE), any(GetMappingsRequest.class), any()); } - @SuppressWarnings("unchecked") - public void testIncompatibleMappingVersion() { - RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob("foo").build(), Collections.emptyMap()); - - ActionListener testListener = ActionListener.wrap(response -> { - fail("Listener success should not have been triggered."); - }, e -> { - assertThat(e.getMessage(), equalTo("Cannot create rollup job [foo] because the rollup index contains jobs from pre-6.4.0. " + - "The mappings for these jobs are not compatible with 6.4.0+. Please specify a new rollup index.")); - }); - - Logger logger = mock(Logger.class); - Client client = mock(Client.class); - - ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(ActionListener.class); - doAnswer(invocation -> { - GetMappingsResponse response = mock(GetMappingsResponse.class); - Map m = new HashMap<>(2); - m.put(Rollup.ROLLUP_TEMPLATE_VERSION_FIELD, Version.V_6_3_0); - m.put(RollupField.ROLLUP_META, - Collections.singletonMap(job.getConfig().getId(), job.getConfig())); - MappingMetaData meta = new MappingMetaData(RollupField.TYPE_NAME, - Collections.singletonMap("_meta", m)); - ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(1); - builder.put(RollupField.TYPE_NAME, meta); - - ImmutableOpenMap.Builder> builder2 = ImmutableOpenMap.builder(1); - builder2.put(job.getConfig().getRollupIndex(), builder.build()); - - when(response.getMappings()).thenReturn(builder2.build()); - requestCaptor.getValue().onResponse(response); - return null; - }).when(client).execute(eq(GetMappingsAction.INSTANCE), any(GetMappingsRequest.class), requestCaptor.capture()); - - 
TransportPutRollupJobAction.updateMapping(job, testListener, mock(PersistentTasksService.class), client, logger); - verify(client).execute(eq(GetMappingsAction.INSTANCE), any(GetMappingsRequest.class), any()); - } - @SuppressWarnings("unchecked") public void testNoMappingVersion() { RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob("foo").build(), Collections.emptyMap()); diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/IndexerUtilsTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/IndexerUtilsTests.java index 51ab5e71184ae..93234d6647c10 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/IndexerUtilsTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/IndexerUtilsTests.java @@ -15,6 +15,7 @@ import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.store.Directory; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.common.Strings; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.NumberFieldMapper; @@ -33,12 +34,12 @@ import org.elasticsearch.search.aggregations.metrics.avg.AvgAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder; +import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers; import org.elasticsearch.xpack.core.rollup.RollupField; import org.elasticsearch.xpack.core.rollup.job.DateHistoGroupConfig; import org.elasticsearch.xpack.core.rollup.job.GroupConfig; import org.elasticsearch.xpack.core.rollup.job.MetricConfig; import org.elasticsearch.xpack.core.rollup.job.RollupJobStats; -import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers; import org.elasticsearch.xpack.core.rollup.job.TermsGroupConfig; import org.joda.time.DateTime; import 
org.mockito.stubbing.Answer; @@ -51,8 +52,8 @@ import java.util.List; import java.util.Map; -import static org.mockito.Mockito.mock; import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class IndexerUtilsTests extends AggregatorTestCase { @@ -360,6 +361,36 @@ public void testKeyOrdering() { assertThat(docs.get(0).id(), equalTo("1237859798")); } + public void testNullKeys() { + CompositeAggregation composite = mock(CompositeAggregation.class); + + when(composite.getBuckets()).thenAnswer((Answer>) invocationOnMock -> { + List foos = new ArrayList<>(); + + CompositeAggregation.Bucket bucket = mock(CompositeAggregation.Bucket.class); + LinkedHashMap keys = new LinkedHashMap<>(3); + keys.put("foo.date_histogram", null); + keys.put("bar.terms", null); + keys.put("abc.histogram", null); + when(bucket.getKey()).thenReturn(keys); + + Aggregations aggs = new Aggregations(Collections.emptyList()); + when(bucket.getAggregations()).thenReturn(aggs); + when(bucket.getDocCount()).thenReturn(1L); + + foos.add(bucket); + + return foos; + }); + + GroupConfig.Builder groupConfig = ConfigTestHelpers.getGroupConfig(); + groupConfig.setHisto(ConfigTestHelpers.getHisto().setFields(Collections.singletonList("abc")).build()); + + List docs = IndexerUtils.processBuckets(composite, "foo", new RollupJobStats(), groupConfig.build(), "foo"); + assertThat(docs.size(), equalTo(1)); + assertFalse(Strings.isNullOrEmpty(docs.get(0).id())); + } + public void testMissingBuckets() throws IOException { String indexName = randomAlphaOfLengthBetween(1, 10); RollupJobStats stats= new RollupJobStats(0, 0, 0, 0); diff --git a/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java b/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java index 42d462417e309..ba6f9e9167821 100644 --- 
a/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java +++ b/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java @@ -322,126 +322,6 @@ public void testRollupAfterRestart() throws Exception { } } - /** - * Tests that a RollUp job created on a old cluster is correctly restarted after the upgrade. - */ - public void testRollupIndexIncompatibilityAfterRestart() throws Exception { - assumeTrue("Rollup 6.3 index not compatible with 6.4+", - oldClusterVersion.onOrAfter(Version.V_6_3_0) && oldClusterVersion.before(Version.V_6_4_0)); - - if (runningAgainstOldCluster) { - final int numDocs = 59; - final int year = randomIntBetween(1970, 2018); - - // index documents for the rollup job - final StringBuilder bulk = new StringBuilder(); - for (int i = 0; i < numDocs; i++) { - bulk.append("{\"index\":{\"_index\":\"rollup-docs\",\"_type\":\"doc\"}}\n"); - String date = String.format(Locale.ROOT, "%04d-01-01T00:%02d:00Z", year, i); - bulk.append("{\"timestamp\":\"").append(date).append("\",\"value\":").append(i).append("}\n"); - } - bulk.append("\r\n"); - - final Request bulkRequest = new Request("POST", "/_bulk"); - bulkRequest.setJsonEntity(bulk.toString()); - client().performRequest(bulkRequest); - - // create the rollup job - final Request createRollupJobRequest = new Request("PUT", "/_xpack/rollup/job/rollup-incompat-test"); - createRollupJobRequest.setJsonEntity("{" - + "\"index_pattern\":\"rollup-*\"," - + "\"rollup_index\":\"results-rollup-incompat-test\"," - + "\"cron\":\"*/30 * * * * ?\"," - + "\"page_size\":100," - + "\"groups\":{" - + " \"date_histogram\":{" - + " \"field\":\"timestamp\"," - + " \"interval\":\"5m\"" - + " }" - + "}," - + "\"metrics\":[" - + " {\"field\":\"value\",\"metrics\":[\"min\",\"max\",\"sum\"]}" - + "]" - + "}"); - - Map createRollupJobResponse = toMap(client().performRequest(createRollupJobRequest)); - 
assertThat(createRollupJobResponse.get("acknowledged"), equalTo(Boolean.TRUE)); - - // start the rollup job - final Request startRollupJobRequest = new Request("POST", "_xpack/rollup/job/rollup-incompat-test/_start"); - Map startRollupJobResponse = toMap(client().performRequest(startRollupJobRequest)); - assertThat(startRollupJobResponse.get("started"), equalTo(Boolean.TRUE)); - - assertRollUpJob("rollup-incompat-test"); - - } else { - - final Request clusterHealthRequest = new Request("GET", "/_cluster/health"); - clusterHealthRequest.addParameter("wait_for_status", "yellow"); - clusterHealthRequest.addParameter("wait_for_no_relocating_shards", "true"); - if (oldClusterVersion.onOrAfter(Version.V_6_2_0)) { - clusterHealthRequest.addParameter("wait_for_no_initializing_shards", "true"); - } - Map clusterHealthResponse = toMap(client().performRequest(clusterHealthRequest)); - assertThat(clusterHealthResponse.get("timed_out"), equalTo(Boolean.FALSE)); - - assertRollUpJob("rollup-incompat-test"); - - // Attempt to create a new rollup job in old index, should fail - final Request createRollupJobRequest = new Request("PUT", "/_xpack/rollup/job/rollup-incompat-test2"); - createRollupJobRequest.setJsonEntity("{" - + "\"index_pattern\":\"rollup-*\"," - + "\"rollup_index\":\"results-rollup-incompat-test\"," - + "\"cron\":\"*/30 * * * * ?\"," - + "\"page_size\":100," - + "\"groups\":{" - + " \"date_histogram\":{" - + " \"field\":\"timestamp\"," - + " \"interval\":\"5m\"" - + " }" - + "}," - + "\"metrics\":[" - + " {\"field\":\"value\",\"metrics\":[\"min\",\"max\",\"sum\"]}" - + "]" - + "}"); - - ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(createRollupJobRequest)); - assertThat(EntityUtils.toString(e.getResponse().getEntity()), - containsString("Cannot create rollup job [rollup-incompat-test2] because the rollup index contains " + - "jobs from pre-6.4.0. The mappings for these jobs are not compatible with 6.4.0+. 
" + - "Please specify a new rollup index.")); - - - // Now try again with a new index, should work - final Request createRollupJobRequest2 = new Request("PUT", "/_xpack/rollup/job/rollup-incompat-test2"); - createRollupJobRequest2.setJsonEntity("{" - + "\"index_pattern\":\"rollup-*\"," - + "\"rollup_index\":\"results-rollup-incompat-test2\"," - + "\"cron\":\"*/30 * * * * ?\"," - + "\"page_size\":100," - + "\"groups\":{" - + " \"date_histogram\":{" - + " \"field\":\"timestamp\"," - + " \"interval\":\"5m\"" - + " }" - + "}," - + "\"metrics\":[" - + " {\"field\":\"value\",\"metrics\":[\"min\",\"max\",\"sum\"]}" - + "]" - + "}"); - - Map createRollupJobResponse = toMap(client().performRequest(createRollupJobRequest2)); - assertThat(createRollupJobResponse.get("acknowledged"), equalTo(Boolean.TRUE)); - - // start the rollup job - final Request startRollupJobRequest = new Request("POST", "_xpack/rollup/job/rollup-incompat-test2/_start"); - Map startRollupJobResponse = toMap(client().performRequest(startRollupJobRequest)); - assertThat(startRollupJobResponse.get("started"), equalTo(Boolean.TRUE)); - - assertRollUpJob("rollup-incompat-test2"); - } - } - public void testSqlFailsOnIndexWithTwoTypes() throws IOException { // TODO this isn't going to trigger until we backport to 6.1 assumeTrue("It is only possible to build an index that sql doesn't like before 6.0.0", From 38ed7270c28c2d048000dd55644a75eb83d84de9 Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Wed, 11 Jul 2018 17:49:39 -0400 Subject: [PATCH 6/6] Review cleanup --- x-pack/docs/en/rollup/understanding-groups.asciidoc | 4 ---- .../xpack/core/rollup/job/DateHistoGroupConfig.java | 1 - .../org/elasticsearch/xpack/rollup/job/IndexerUtils.java | 8 ++------ .../elasticsearch/xpack/rollup/job/IndexerUtilsTests.java | 1 - 4 files changed, 2 insertions(+), 12 deletions(-) diff --git a/x-pack/docs/en/rollup/understanding-groups.asciidoc b/x-pack/docs/en/rollup/understanding-groups.asciidoc index 
a963f8b27de7d..803555b2d73f7 100644 --- a/x-pack/docs/en/rollup/understanding-groups.asciidoc +++ b/x-pack/docs/en/rollup/understanding-groups.asciidoc @@ -190,10 +190,6 @@ PUT _xpack/rollup/job/combined -------------------------------------------------- // NOTCONSOLE -The rollup job will automatically use a placeholder term (`__ROLLUP_NULL_VALUE_PLACEHOLDER__`) as the `null_value` for keyword fields, -which allows it to handle documents that may be missing some of the grouping fields. This placeholder is then removed from search -results, resulting in correct doc counts in a manner that is invisible to the user. - === Doc counts and overlapping jobs There was previously an issue with document counts on "overlapping" job configurations, driven by the same internal implementation detail. diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/DateHistoGroupConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/DateHistoGroupConfig.java index a1fa81d67a92c..4a9fbde61d6be 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/DateHistoGroupConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/DateHistoGroupConfig.java @@ -159,7 +159,6 @@ public List> toBuilders() { vsBuilder.dateHistogramInterval(interval); vsBuilder.field(field); vsBuilder.timeZone(timeZone); - vsBuilder.missingBucket(true); return Collections.singletonList(vsBuilder); } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/IndexerUtils.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/IndexerUtils.java index ba0d5991e4dc9..efac4c2d61b98 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/IndexerUtils.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/IndexerUtils.java @@ -80,15 +80,11 @@ private static CRC32 processKeys(Map keys, Map d doc.put(k + "." 
+ RollupField.COUNT_FIELD, count); if (k.endsWith("." + DateHistogramAggregationBuilder.NAME)) { + assert v != null; doc.put(k + "." + RollupField.TIMESTAMP, v); doc.put(k + "." + RollupField.INTERVAL, groupConfig.getDateHisto().getInterval()); doc.put(k + "." + DateHistoGroupConfig.TIME_ZONE, groupConfig.getDateHisto().getTimeZone().toString()); - if (v == null) { - // Arbitrary value to update the doc ID with for nulls - docID.update(19); - } else { - docID.update(Numbers.longToBytes((Long)v), 0, 8); - } + docID.update(Numbers.longToBytes((Long)v), 0, 8); } else if (k.endsWith("." + HistogramAggregationBuilder.NAME)) { doc.put(k + "." + RollupField.VALUE, v); doc.put(k + "." + RollupField.INTERVAL, groupConfig.getHisto().getInterval()); diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/IndexerUtilsTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/IndexerUtilsTests.java index 93234d6647c10..07ad0af7f1c38 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/IndexerUtilsTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/IndexerUtilsTests.java @@ -369,7 +369,6 @@ public void testNullKeys() { CompositeAggregation.Bucket bucket = mock(CompositeAggregation.Bucket.class); LinkedHashMap keys = new LinkedHashMap<>(3); - keys.put("foo.date_histogram", null); keys.put("bar.terms", null); keys.put("abc.histogram", null); when(bucket.getKey()).thenReturn(keys);