From 3c9c9f33e2869d801026a966e22cb39ff380db8d Mon Sep 17 00:00:00 2001
From: uboness
Date: Sun, 11 May 2014 05:59:57 +0200
Subject: [PATCH] Aggregations: Added Filters aggregation

A multi-bucket aggregation where multiple filters can be defined (each
filter defines a bucket). The buckets will collect all the documents that
match their associated filter. This aggregation can be very useful when one
wants to compare analytics across different criteria. The same result can
also be achieved with multiple definitions of the single `filter`
aggregation, but here the user only needs to define the sub-aggregations
once.

Closes #6118
---
 .../search/aggregations/bucket.asciidoc       |   2 +
 .../bucket/filters-aggregation.asciidoc       | 113 +++++++++
 .../aggregations/AggregationBuilders.java     |   5 +
 .../aggregations/AggregationModule.java       |   2 +
 .../TransportAggregationModule.java           |   2 +
 .../aggregations/bucket/filters/Filters.java  |  43 ++++
 .../filters/FiltersAggregationBuilder.java    |  90 +++++++
 .../bucket/filters/FiltersAggregator.java     | 132 ++++++++++
 .../bucket/filters/FiltersParser.java         |  89 +++++++
 .../bucket/filters/InternalFilters.java       | 220 +++++++++++++++++
 .../aggregations/bucket/FiltersTests.java     | 231 ++++++++++++++++++
 11 files changed, 929 insertions(+)
 create mode 100644 docs/reference/search/aggregations/bucket/filters-aggregation.asciidoc
 create mode 100644 src/main/java/org/elasticsearch/search/aggregations/bucket/filters/Filters.java
 create mode 100644 src/main/java/org/elasticsearch/search/aggregations/bucket/filters/FiltersAggregationBuilder.java
 create mode 100644 src/main/java/org/elasticsearch/search/aggregations/bucket/filters/FiltersAggregator.java
 create mode 100644 src/main/java/org/elasticsearch/search/aggregations/bucket/filters/FiltersParser.java
 create mode 100644 src/main/java/org/elasticsearch/search/aggregations/bucket/filters/InternalFilters.java
 create mode 100644 src/test/java/org/elasticsearch/search/aggregations/bucket/FiltersTests.java

diff --git a/docs/reference/search/aggregations/bucket.asciidoc b/docs/reference/search/aggregations/bucket.asciidoc
index ceb8e15ee190d..e9d163a67e72c 100644
--- a/docs/reference/search/aggregations/bucket.asciidoc
+++ b/docs/reference/search/aggregations/bucket.asciidoc
@@ -4,6 +4,8 @@ include::bucket/global-aggregation.asciidoc[]
 
 include::bucket/filter-aggregation.asciidoc[]
 
+include::bucket/filters-aggregation.asciidoc[]
+
 include::bucket/missing-aggregation.asciidoc[]
 
 include::bucket/nested-aggregation.asciidoc[]
diff --git a/docs/reference/search/aggregations/bucket/filters-aggregation.asciidoc b/docs/reference/search/aggregations/bucket/filters-aggregation.asciidoc
new file mode 100644
index 0000000000000..7b83318033085
--- /dev/null
+++ b/docs/reference/search/aggregations/bucket/filters-aggregation.asciidoc
@@ -0,0 +1,113 @@
+[[search-aggregations-bucket-filters-aggregation]]
+=== Filters
+
+Defines a multi-bucket aggregation where each bucket is associated with a filter. Each bucket will collect all
+documents that match its associated filter.
+
+Example:
+
+[source,js]
+--------------------------------------------------
+{
+  "aggs" : {
+    "messages" : {
+      "filters" : {
+        "filters" : {
+          "errors"   : { "query" : { "match" : { "body" : "error"   } } },
+          "warnings" : { "query" : { "match" : { "body" : "warning" } } }
+        }
+      },
+      "aggs" : { "monthly" : { "date_histogram" : { "field" : "timestamp", "interval" : "1M" } } }
+    }
+  }
+}
+--------------------------------------------------
+
+In the above example, we analyze log messages.
+The aggregation will build two collections (buckets) of log messages - one
+for all messages containing an error and another for all messages containing a warning. Each of these buckets is then
+broken down by month.
+
+Response:
+
+[source,js]
+--------------------------------------------------
+{
+  ...
+
+  "aggregations" : {
+    "messages" : {
+      "buckets" : {
+        "errors" : {
+          "doc_count" : 34,
+          "monthly" : {
+            "buckets" : [
+              ... // the monthly histogram breakdown
+            ]
+          }
+        },
+        "warnings" : {
+          "doc_count" : 439,
+          "monthly" : {
+            "buckets" : [
+              ... // the monthly histogram breakdown
+            ]
+          }
+        }
+      }
+    }
+  }
+}
+--------------------------------------------------
+
+==== Anonymous filters
+
+The `filters` field can also be provided as an array of filters, as in the following request:
+
+[source,js]
+--------------------------------------------------
+{
+  "aggs" : {
+    "messages" : {
+      "filters" : {
+        "filters" : [
+          { "query" : { "match" : { "body" : "error"   } } },
+          { "query" : { "match" : { "body" : "warning" } } }
+        ]
+      },
+      "aggs" : { "monthly" : { "date_histogram" : { "field" : "timestamp", "interval" : "1M" } } }
+    }
+  }
+}
+--------------------------------------------------
+
+The filtered buckets are returned in the same order as provided in the request. The response for this example would be:
+
+[source,js]
+--------------------------------------------------
+{
+  ...
+
+  "aggregations" : {
+    "messages" : {
+      "buckets" : [
+        {
+          "doc_count" : 34,
+          "monthly" : {
+            "buckets" : [
+              ... // the monthly histogram breakdown
+            ]
+          }
+        },
+        {
+          "doc_count" : 439,
+          "monthly" : {
+            "buckets" : [
+              ... // the monthly histogram breakdown
+            ]
+          }
+        }
+      ]
+    }
+  }
+}
+--------------------------------------------------
\ No newline at end of file
diff --git a/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilders.java b/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilders.java index 6449ddbe0d219..d71212001f217 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilders.java +++ b/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilders.java @@ -19,6 +19,7 @@ package org.elasticsearch.search.aggregations; import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.filters.FiltersAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.geogrid.GeoHashGridBuilder; import org.elasticsearch.search.aggregations.bucket.global.GlobalBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramBuilder; @@ -85,6 +86,10 @@ public static FilterAggregationBuilder filter(String name) { return new FilterAggregationBuilder(name); } + public static FiltersAggregationBuilder filters(String name) { + return new FiltersAggregationBuilder(name); + } + public static GlobalBuilder global(String name) { return new GlobalBuilder(name); }
diff --git a/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java b/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java index 6af5c320a2a08..b8c8c7d4ea275 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java +++ b/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.multibindings.Multibinder; import org.elasticsearch.search.aggregations.bucket.filter.FilterParser; +import
org.elasticsearch.search.aggregations.bucket.filters.FiltersParser; import org.elasticsearch.search.aggregations.bucket.geogrid.GeoHashGridParser; import org.elasticsearch.search.aggregations.bucket.global.GlobalParser; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramParser; @@ -72,6 +73,7 @@ public AggregationModule() { parsers.add(GlobalParser.class); parsers.add(MissingParser.class); parsers.add(FilterParser.class); + parsers.add(FiltersParser.class); parsers.add(TermsParser.class); parsers.add(SignificantTermsParser.class); parsers.add(RangeParser.class); diff --git a/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java b/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java index abc39e70f035b..e37e68df25d71 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java +++ b/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java @@ -20,6 +20,7 @@ import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter; +import org.elasticsearch.search.aggregations.bucket.filters.InternalFilters; import org.elasticsearch.search.aggregations.bucket.geogrid.InternalGeoHashGrid; import org.elasticsearch.search.aggregations.bucket.global.InternalGlobal; import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram; @@ -74,6 +75,7 @@ protected void configure() { // buckets InternalGlobal.registerStreams(); InternalFilter.registerStreams(); + InternalFilters.registerStream(); InternalMissing.registerStreams(); StringTerms.registerStreams(); LongTerms.registerStreams(); diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/filters/Filters.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/filters/Filters.java new file mode 100644 index 0000000000000..eff68f3880a85 --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/filters/Filters.java @@ -0,0 +1,43 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.bucket.filters; + +import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; + +import java.util.Collection; + +/** + * A multi bucket aggregation where the buckets are defined by a set of filters (a bucket per filter). Each bucket + * will collect all documents matching its filter. 
+ */ +public interface Filters extends MultiBucketsAggregation { + + /** + * A bucket associated with a specific filter (identified by its key) + */ + public static interface Bucket extends MultiBucketsAggregation.Bucket { + } + + Collection getBuckets(); + + @Override + Bucket getBucketByKey(String key); + +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/filters/FiltersAggregationBuilder.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/filters/FiltersAggregationBuilder.java new file mode 100644 index 0000000000000..478129c8f84ef --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/filters/FiltersAggregationBuilder.java @@ -0,0 +1,90 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.bucket.filters; + +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.query.FilterBuilder; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.builder.SearchSourceBuilderException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * + */ +public class FiltersAggregationBuilder extends AggregationBuilder { + + private Map keyedFilters = null; + private List nonKeyedFilters = null; + + public FiltersAggregationBuilder(String name) { + super(name, InternalFilters.TYPE.name()); + } + + public FiltersAggregationBuilder filter(String key, FilterBuilder filter) { + if (keyedFilters == null) { + keyedFilters = new LinkedHashMap<>(); + } + keyedFilters.put(key, filter); + return this; + } + + public FiltersAggregationBuilder filter(FilterBuilder filter) { + if (nonKeyedFilters == null) { + nonKeyedFilters = new ArrayList<>(); + } + nonKeyedFilters.add(filter); + return this; + } + + + @Override + protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + if (keyedFilters == null && nonKeyedFilters == null) { + throw new SearchSourceBuilderException("At least one filter must be set on filter aggregation [" + name + "]"); + } + if (keyedFilters != null && nonKeyedFilters != null) { + throw new SearchSourceBuilderException("Cannot add both keyed and non-keyed filters to filters aggregation"); + } + + if (keyedFilters != null) { + builder.startObject("filters"); + for (Map.Entry entry : keyedFilters.entrySet()) { + builder.field(entry.getKey()); + entry.getValue().toXContent(builder, params); + } + builder.endObject(); + } + if (nonKeyedFilters != null) { + builder.startArray("filters"); + for (FilterBuilder filterBuilder : nonKeyedFilters) { + filterBuilder.toXContent(builder, params); + } + builder.endArray(); + 
+ } + return builder.endObject(); + } +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/filters/FiltersAggregator.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/filters/FiltersAggregator.java new file mode 100644 index 0000000000000..e56b95159a12a --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/filters/FiltersAggregator.java @@ -0,0 +1,132 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.bucket.filters; + +import com.google.common.collect.Lists; +import org.apache.lucene.index.AtomicReaderContext; +import org.apache.lucene.search.Filter; +import org.apache.lucene.util.Bits; +import org.elasticsearch.common.lucene.docset.DocIdSets; +import org.elasticsearch.search.aggregations.*; +import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; +import org.elasticsearch.search.aggregations.support.AggregationContext; + +import java.io.IOException; +import java.util.List; + +/** + * + */ +public class FiltersAggregator extends BucketsAggregator { + + static class KeyedFilter { + + final String key; + final Filter filter; + + KeyedFilter(String key, Filter filter) { + this.key = key; + this.filter = filter; + } + } + + private final KeyedFilter[] filters; + private final Bits[] bits; + private boolean keyed; + + public FiltersAggregator(String name, AggregatorFactories factories, List filters, boolean keyed, AggregationContext aggregationContext, + Aggregator parent) { + super(name, BucketAggregationMode.MULTI_BUCKETS, factories, filters.size() * (parent == null ? 
1 : parent.estimatedBucketCount()), + aggregationContext, parent); + this.keyed = keyed; + this.filters = filters.toArray(new KeyedFilter[filters.size()]); + this.bits = new Bits[this.filters.length]; + } + + @Override + public boolean shouldCollect() { + return true; + } + + @Override + public void setNextReader(AtomicReaderContext reader) { + try { + for (int i = 0; i < filters.length; i++) { + bits[i] = DocIdSets.toSafeBits(reader.reader(), filters[i].filter.getDocIdSet(reader, reader.reader().getLiveDocs())); + } + } catch (IOException ioe) { + throw new AggregationExecutionException("Failed to aggregate filter aggregator [" + name + "]", ioe); + } + } + + @Override + public void collect(int doc, long owningBucketOrdinal) throws IOException { + for (int i = 0; i < bits.length; i++) { + if (bits[i].get(doc)) { + collectBucket(doc, bucketOrd(owningBucketOrdinal, i)); + } + } + } + + @Override + public InternalAggregation buildAggregation(long owningBucketOrdinal) { + List buckets = Lists.newArrayListWithCapacity(filters.length); + for (int i = 0; i < filters.length; i++) { + KeyedFilter filter = filters[i]; + long bucketOrd = bucketOrd(owningBucketOrdinal, i); + InternalFilters.Bucket bucket = new InternalFilters.Bucket(filter.key, bucketDocCount(bucketOrd), bucketAggregations(bucketOrd)); + buckets.add(bucket); + } + return new InternalFilters(name, buckets, keyed); + } + + @Override + public InternalAggregation buildEmptyAggregation() { + InternalAggregations subAggs = buildEmptySubAggregations(); + List buckets = Lists.newArrayListWithCapacity(filters.length); + for (int i = 0; i < filters.length; i++) { + InternalFilters.Bucket bucket = new InternalFilters.Bucket(filters[i].key, 0, subAggs); + buckets.add(bucket); + } + return new InternalFilters(name, buckets, keyed); + } + + private final long bucketOrd(long owningBucketOrdinal, int filterOrd) { + return owningBucketOrdinal * filters.length + filterOrd; + } + + public static class Factory extends AggregatorFactory { + + private final List filters; + private boolean keyed; + + public Factory(String name, List filters, boolean keyed) { + super(name, InternalFilters.TYPE.name()); + this.filters = filters; + this.keyed = keyed; + } + + @Override + public Aggregator create(AggregationContext context, Aggregator parent, long expectedBucketsCount) { + return new FiltersAggregator(name, factories, filters, keyed, context, parent); + } + } + +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/filters/FiltersParser.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/filters/FiltersParser.java new file mode 100644 index 0000000000000..87d115260187f --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/filters/FiltersParser.java @@ -0,0 +1,89 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.bucket.filters; + +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.query.ParsedFilter; +import org.elasticsearch.search.SearchParseException; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactory; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * + */ +public class FiltersParser implements Aggregator.Parser { + + @Override + public String type() { + return InternalFilters.TYPE.name(); + } + + @Override + public AggregatorFactory parse(String aggregationName, XContentParser parser, SearchContext context) throws IOException { + + List filters = new ArrayList<>(); + + XContentParser.Token token = null; + String currentFieldName = null; + Boolean keyed = null; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token == XContentParser.Token.START_OBJECT) { + if ("filters".equals(currentFieldName)) { + keyed = true; + String key = null; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + key = parser.currentName(); + } else { + ParsedFilter filter = context.queryParserService().parseInnerFilter(parser); + filters.add(new FiltersAggregator.KeyedFilter(key, filter.filter())); + } + } + } else { + throw new SearchParseException(context, "Unknown key for a " + token + " in [" + aggregationName + "]: [" + currentFieldName + "]."); + } + } else if (token == XContentParser.Token.START_ARRAY) { + if ("filters".equals(currentFieldName)) { + keyed = false; + int idx = 0; + while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { + ParsedFilter filter = context.queryParserService().parseInnerFilter(parser); + filters.add(new FiltersAggregator.KeyedFilter(String.valueOf(idx), filter.filter())); + idx++; + } + } else { + throw new SearchParseException(context, "Unknown key for a " + token + " in [" + aggregationName + "]: [" + currentFieldName + "]."); + } + } else { + throw new SearchParseException(context, "Unknown key for a " + token + " in [" + aggregationName + "]: [" + currentFieldName + "]."); + } + } + + return new FiltersAggregator.Factory(aggregationName, filters, keyed); + } + +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/filters/InternalFilters.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/filters/InternalFilters.java new file mode 100644 index 0000000000000..cbd203a2f7787 --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/filters/InternalFilters.java @@ -0,0 +1,220 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.bucket.filters; + +import com.google.common.collect.Lists; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.text.StringText; +import org.elasticsearch.common.text.Text; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.search.aggregations.AggregationStreams; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalAggregations; + +import java.io.IOException; +import java.util.*; + +/** + * + */ +public class InternalFilters extends InternalAggregation implements Filters { + + public final static Type TYPE = new Type("filters"); + + private final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() { + @Override + public InternalFilters readResult(StreamInput in) throws IOException { + InternalFilters filters = new InternalFilters(); + filters.readFrom(in); + return filters; + } + }; + + public static void registerStream() { + AggregationStreams.registerStream(STREAM, TYPE.stream()); + } + + public static class Bucket implements Filters.Bucket { + + private String key; + private long docCount; + InternalAggregations aggregations; + + public Bucket(String key, long docCount, InternalAggregations aggregations) { + this.key = key; + this.docCount = docCount; + this.aggregations = aggregations; + } + + public String getKey() { + return key; + } + + @Override + public Text getKeyAsText() { + return new StringText(getKey()); + } + + @Override + public long getDocCount() { + return docCount; + } + + @Override + public Aggregations getAggregations() { + return aggregations; + } + + Bucket reduce(List buckets, BigArrays bigArrays) { + Bucket reduced = null; + List aggregationsList = Lists.newArrayListWithCapacity(buckets.size()); + for (Bucket bucket : buckets) { + if (reduced == null) { + reduced = new Bucket(bucket.key, bucket.docCount, bucket.aggregations); + } else { + reduced.docCount += bucket.docCount; + } + aggregationsList.add(bucket.aggregations); + } + reduced.aggregations = InternalAggregations.reduce(aggregationsList, bigArrays); + return reduced; + } + + void toXContent(XContentBuilder builder, Params params, boolean keyed) throws IOException { + if (keyed) { + builder.startObject(key); + } else { + builder.startObject(); + } + builder.field(CommonFields.DOC_COUNT, docCount); + aggregations.toXContentInternal(builder, params); + builder.endObject(); + } + } + + private List buckets; + private Map bucketMap; + private boolean keyed; + + public InternalFilters() {} // for serialization + + public InternalFilters(String name, List buckets, boolean keyed) { + super(name); + this.buckets = buckets; + this.keyed = keyed; + } + + @Override + public Type type() { + return TYPE; + } + + @Override + public Collection getBuckets() { + return buckets; + } + + @Override + public Bucket 
getBucketByKey(String key) { + if (bucketMap == null) { + bucketMap = new HashMap<>(buckets.size()); + for (Bucket bucket : buckets) { + bucketMap.put(bucket.getKey(), bucket); + } + } + return bucketMap.get(key); + } + + @Override + public InternalAggregation reduce(ReduceContext reduceContext) { + List aggregations = reduceContext.aggregations(); + List> bucketsList = null; + for (InternalAggregation aggregation : aggregations) { + InternalFilters filters = (InternalFilters) aggregation; + if (bucketsList == null) { + bucketsList = new ArrayList<>(filters.buckets.size()); + for (Bucket bucket : filters.buckets) { + List sameRangeList = new ArrayList<>(aggregations.size()); + sameRangeList.add(bucket); + bucketsList.add(sameRangeList); + } + } else { + int i = 0; + for (Bucket bucket : filters.buckets) { + bucketsList.get(i++).add(bucket); + } + } + } + + InternalFilters reduced = new InternalFilters(name, new ArrayList(bucketsList.size()), keyed); + for (List sameRangeList : bucketsList) { + reduced.buckets.add((sameRangeList.get(0)).reduce(sameRangeList, reduceContext.bigArrays())); + } + return reduced; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + name = in.readString(); + keyed = in.readBoolean(); + int size = in.readVInt(); + List buckets = Lists.newArrayListWithCapacity(size); + for (int i = 0; i < size; i++) { + String key = in.readOptionalString(); + buckets.add(new Bucket(key, in.readVLong(), InternalAggregations.readAggregations(in))); + } + this.buckets = buckets; + this.bucketMap = null; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(name); + out.writeBoolean(keyed); + out.writeVInt(buckets.size()); + for (Bucket bucket : buckets) { + out.writeOptionalString(bucket.key); + out.writeVLong(bucket.docCount); + bucket.aggregations.writeTo(out); + } + } + + @Override + public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { + if (keyed) { + builder.startObject(CommonFields.BUCKETS); + } else { + builder.startArray(CommonFields.BUCKETS); + } + for (Bucket bucket : buckets) { + bucket.toXContent(builder, params, keyed); + } + if (keyed) { + builder.endObject(); + } else { + builder.endArray(); + } + return builder; + } + +} diff --git a/src/test/java/org/elasticsearch/search/aggregations/bucket/FiltersTests.java b/src/test/java/org/elasticsearch/search/aggregations/bucket/FiltersTests.java new file mode 100644 index 0000000000000..b0df05bc97c38 --- /dev/null +++ b/src/test/java/org/elasticsearch/search/aggregations/bucket/FiltersTests.java @@ -0,0 +1,231 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.search.aggregations.bucket; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.search.aggregations.bucket.filters.Filters; +import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; +import org.elasticsearch.search.aggregations.metrics.avg.Avg; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.hamcrest.Matchers; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.index.query.FilterBuilders.matchAllFilter; +import static org.elasticsearch.index.query.FilterBuilders.termFilter; +import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; +import static org.elasticsearch.search.aggregations.AggregationBuilders.*; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.core.IsNull.notNullValue; + +/** + * + */ +@ElasticsearchIntegrationTest.SuiteScopeTest +public class FiltersTests extends ElasticsearchIntegrationTest { + + static int numDocs, numTag1Docs, numTag2Docs; + + @Override + public void setupSuiteScopeCluster() throws Exception { + createIndex("idx"); + createIndex("idx2"); + numDocs = randomIntBetween(5, 20); + numTag1Docs = randomIntBetween(1, numDocs - 1); + List builders = new ArrayList<>(); + for (int i = 0; i < numTag1Docs; i++) { + builders.add(client().prepareIndex("idx", "type", ""+i).setSource(jsonBuilder() + .startObject() + .field("value", i + 1) + .field("tag", "tag1") + .endObject())); + } + for (int i = numTag1Docs; i < numDocs; i++) { + numTag2Docs++; + builders.add(client().prepareIndex("idx", "type", ""+i).setSource(jsonBuilder() + .startObject() + .field("value", i) + .field("tag", "tag2") + .field("name", "name" + i) + .endObject())); + } + prepareCreate("empty_bucket_idx").addMapping("type", "value", "type=integer").execute().actionGet(); + for (int i = 0; i < 2; i++) { + builders.add(client().prepareIndex("empty_bucket_idx", "type", ""+i).setSource(jsonBuilder() + .startObject() + .field("value", i*2) + .endObject())); + } + indexRandom(true, builders); + ensureSearchable(); + } + + @Test + public void simple() throws Exception { + SearchResponse response = client().prepareSearch("idx") + .addAggregation( + filters("tags") + .filter("tag1", termFilter("tag", "tag1")) + .filter("tag2", termFilter("tag", "tag2"))) + .execute().actionGet(); + + assertSearchResponse(response); + + Filters filters = response.getAggregations().get("tags"); + assertThat(filters, notNullValue()); + assertThat(filters.getName(), equalTo("tags")); + + assertThat(filters.getBuckets().size(), equalTo(2)); + + Filters.Bucket bucket = filters.getBucketByKey("tag1"); + assertThat(bucket, Matchers.notNullValue()); + assertThat(bucket.getDocCount(), equalTo((long) numTag1Docs)); + + bucket = filters.getBucketByKey("tag2"); + assertThat(bucket, Matchers.notNullValue()); + assertThat(bucket.getDocCount(), equalTo((long) numTag2Docs)); + } + + @Test + public void withSubAggregation() throws Exception { + SearchResponse response = client().prepareSearch("idx") + .addAggregation( + filters("tags") + .filter("tag1", termFilter("tag", 
"tag1")) + .filter("tag2", termFilter("tag", "tag2")) + .subAggregation(avg("avg_value").field("value"))) + .execute().actionGet(); + + assertSearchResponse(response); + + Filters filters = response.getAggregations().get("tags"); + assertThat(filters, notNullValue()); + assertThat(filters.getName(), equalTo("tags")); + + assertThat(filters.getBuckets().size(), equalTo(2)); + + Filters.Bucket bucket = filters.getBucketByKey("tag1"); + assertThat(bucket, Matchers.notNullValue()); + assertThat(bucket.getDocCount(), equalTo((long) numTag1Docs)); + long sum = 0; + for (int i = 0; i < numTag1Docs; ++i) { + sum += i + 1; + } + assertThat(bucket.getAggregations().asList().isEmpty(), is(false)); + Avg avgValue = bucket.getAggregations().get("avg_value"); + assertThat(avgValue, notNullValue()); + assertThat(avgValue.getName(), equalTo("avg_value")); + assertThat(avgValue.getValue(), equalTo((double) sum / numTag1Docs)); + + bucket = filters.getBucketByKey("tag2"); + assertThat(bucket, Matchers.notNullValue()); + assertThat(bucket.getDocCount(), equalTo((long) numTag2Docs)); + sum = 0; + for (int i = numTag1Docs; i < numDocs; ++i) { + sum += i; + } + assertThat(bucket.getAggregations().asList().isEmpty(), is(false)); + avgValue = bucket.getAggregations().get("avg_value"); + assertThat(avgValue, notNullValue()); + assertThat(avgValue.getName(), equalTo("avg_value")); + assertThat(avgValue.getValue(), equalTo((double) sum / numTag2Docs)); + } + + @Test + public void withContextBasedSubAggregation() throws Exception { + + try { + client().prepareSearch("idx") + .addAggregation( + filters("tags") + .filter("tag1", termFilter("tag", "tag1")) + .filter("tag2", termFilter("tag", "tag2")) + .subAggregation(avg("avg_value")) + ) + .execute().actionGet(); + + fail("expected execution to fail - an attempt to have a context based numeric sub-aggregation, but there is not value source" + + "context which the sub-aggregation can inherit"); + + } catch (ElasticsearchException ese) { + } + } + + @Test + public void emptyAggregation() throws Exception { + SearchResponse searchResponse = client().prepareSearch("empty_bucket_idx") + .setQuery(matchAllQuery()) + .addAggregation(histogram("histo").field("value").interval(1l).minDocCount(0) + .subAggregation(filters("filters").filter("all", matchAllFilter()))) + .execute().actionGet(); + + assertThat(searchResponse.getHits().getTotalHits(), equalTo(2l)); + Histogram histo = searchResponse.getAggregations().get("histo"); + assertThat(histo, Matchers.notNullValue()); + Histogram.Bucket bucket = histo.getBucketByKey(1l); + assertThat(bucket, Matchers.notNullValue()); + + Filters filters = bucket.getAggregations().get("filters"); + assertThat(filters, notNullValue()); + Filters.Bucket all = filters.getBucketByKey("all"); + assertThat(all, Matchers.notNullValue()); + assertThat(all.getKey(), equalTo("all")); + assertThat(all.getDocCount(), is(0l)); + } + + @Test + public void simple_nonKeyed() throws Exception { + SearchResponse response = client().prepareSearch("idx") + .addAggregation( + filters("tags") + .filter(termFilter("tag", "tag1")) + .filter(termFilter("tag", "tag2"))) + .execute().actionGet(); + + assertSearchResponse(response); + + Filters filters = response.getAggregations().get("tags"); + assertThat(filters, notNullValue()); + assertThat(filters.getName(), equalTo("tags")); + + assertThat(filters.getBuckets().size(), equalTo(2)); + + Collection buckets = filters.getBuckets(); + Iterator itr = buckets.iterator(); + + Filters.Bucket bucket = itr.next(); + 
assertThat(bucket, Matchers.notNullValue()); + assertThat(bucket.getDocCount(), equalTo((long) numTag1Docs)); + + bucket = itr.next(); + assertThat(bucket, Matchers.notNullValue()); + assertThat(bucket.getDocCount(), equalTo((long) numTag2Docs)); + } + +}
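For reference, below is a minimal usage sketch of the Java builder API introduced by this patch, mirroring the keyed `errors`/`warnings` example from the new documentation and the calls exercised in `FiltersTests`. The index name (`logs`), the field names, and the `client` setup are illustrative assumptions, not part of the patch.

[source,java]
--------------------------------------------------
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.filters.Filters;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogram;

public class FiltersAggregationExample {

    // Assumes a connected Client and a (hypothetical) "logs" index with "body" and "timestamp" fields.
    public static void printErrorCount(Client client) {
        SearchResponse response = client.prepareSearch("logs")
                .addAggregation(AggregationBuilders.filters("messages")
                        // one bucket per keyed filter
                        .filter("errors", FilterBuilders.queryFilter(QueryBuilders.matchQuery("body", "error")))
                        .filter("warnings", FilterBuilders.queryFilter(QueryBuilders.matchQuery("body", "warning")))
                        // break each bucket down by month
                        .subAggregation(AggregationBuilders.dateHistogram("monthly")
                                .field("timestamp")
                                .interval(DateHistogram.Interval.MONTH)))
                .execute().actionGet();

        Filters messages = response.getAggregations().get("messages");
        Filters.Bucket errors = messages.getBucketByKey("errors");
        System.out.println("error messages: " + errors.getDocCount());
    }
}
--------------------------------------------------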