Skip to content

Added filters aggregation #6119

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/reference/search/aggregations/bucket.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ include::bucket/global-aggregation.asciidoc[]

include::bucket/filter-aggregation.asciidoc[]

include::bucket/filters-aggregation.asciidoc[]

include::bucket/missing-aggregation.asciidoc[]

include::bucket/nested-aggregation.asciidoc[]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
[[search-aggregations-bucket-filters-aggregation]]
=== Filters

Defines a multi bucket aggregations where each bucket is associated with a filter. Each bucket will collect all
documents that match its associated filter.

Example:

[source,js]
--------------------------------------------------
{
"aggs" : {
"messages" : {
"filters" : {
"filters" : {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it be consistent with the filter aggregation and expect filters to be directly defined in the body of the aggregation definition instead of repeating filters twice?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

problematic... you can also specify addition settings, such as keyed... this requires additional level. I couldn't come up with a proper naming for the filters object...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need keyed here? Maybe it would make sense to enforce a keyed output given that it perfectly matches the way the request is expressed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could potentially, though it won't be consistent with other buckets aggs that we have, and also, I'm not really sure what's the preferred way ppl would like the response... @rashidkpc any feedback?

"errors" : { "query" : { "match" : { "body" : "error" } } },
"warnings" : { "query" : { "match" : { "body" : "error" } } }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the 2nd query should look for warning in the body instead of error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indeed

}
},
"aggs" : { "monthly" : { "histogram" : { "field" : "timestamp", "interval" : "1M" } } }
}
}
}
--------------------------------------------------

In the above example, we analyze log messages. The aggregation wll build two collection (buckets) of log messages - one
for all those containing an error, and another for all those containing a warning. And for each of these buckets it will
break them down by month.

Response:

[source,js]
--------------------------------------------------
{
...

"aggs" : {
"messages" : {
"buckets" : {
"errors" : {
"doc_count" : 34,
"monthly" : {
"buckets : [
... // the histogram monthly breakdown
]
}
},
"warnings" : {
"doc_count" : 439,
"monthly" : {
"buckets : [
... // the histogram monthly breakdown
]
}
}
}
}
}
}
--------------------------------------------------
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.elasticsearch.search.aggregations;

import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.filters.FiltersAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.geogrid.GeoHashGridBuilder;
import org.elasticsearch.search.aggregations.bucket.global.GlobalBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramBuilder;
Expand Down Expand Up @@ -82,6 +83,10 @@ public static FilterAggregationBuilder filter(String name) {
return new FilterAggregationBuilder(name);
}

public static FiltersAggregationBuilder filters(String name) {
return new FiltersAggregationBuilder(name);
}

public static GlobalBuilder global(String name) {
return new GlobalBuilder(name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.search.aggregations.bucket.filter.FilterParser;
import org.elasticsearch.search.aggregations.bucket.filters.FiltersParser;
import org.elasticsearch.search.aggregations.bucket.geogrid.GeoHashGridParser;
import org.elasticsearch.search.aggregations.bucket.global.GlobalParser;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramParser;
Expand Down Expand Up @@ -68,6 +69,7 @@ public AggregationModule() {
parsers.add(GlobalParser.class);
parsers.add(MissingParser.class);
parsers.add(FilterParser.class);
parsers.add(FiltersParser.class);
parsers.add(TermsParser.class);
parsers.add(SignificantTermsParser.class);
parsers.add(RangeParser.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter;
import org.elasticsearch.search.aggregations.bucket.filters.InternalFilters;
import org.elasticsearch.search.aggregations.bucket.geogrid.InternalGeoHashGrid;
import org.elasticsearch.search.aggregations.bucket.global.InternalGlobal;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram;
Expand Down Expand Up @@ -70,6 +71,7 @@ protected void configure() {
// buckets
InternalGlobal.registerStreams();
InternalFilter.registerStreams();
InternalFilters.registerStream();
InternalMissing.registerStreams();
StringTerms.registerStreams();
LongTerms.registerStreams();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.search.aggregations.bucket.filters;

import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;

import java.util.Collection;

/**
* A multi bucket aggregation where the buckets are defined by a set of filters (a bucket per filter). Each bucket
* will collect all documents matching its filter.
*/
public interface Filters extends MultiBucketsAggregation {

/**
* A bucket associated with a specific filter (identified by its key)
*/
public static interface Bucket extends MultiBucketsAggregation.Bucket {
}

Collection<? extends Bucket> getBuckets();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be just Collection<Bucket>.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indeed


@Override
Bucket getBucketByKey(String key);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.search.aggregations.bucket.filters;

import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.FilterBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilder;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
*
*/
public class FiltersAggregationBuilder extends AggregationBuilder<FiltersAggregationBuilder> {

private Map<String, FilterBuilder> filters = new HashMap<>();

public FiltersAggregationBuilder(String name) {
super(name, InternalFilters.TYPE.name());
}

public FiltersAggregationBuilder filter(String key, FilterBuilder filter) {
filters.put(key, filter);
return this;
}

@Override
protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.startObject("filters");
for (Map.Entry<String, FilterBuilder> entry : filters.entrySet()) {
builder.field(entry.getKey());
entry.getValue().toXContent(builder, params);
}
builder.endObject();
return builder.endObject();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.search.aggregations.bucket.filters;

import com.google.common.collect.Lists;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.search.Filter;
import org.apache.lucene.util.Bits;
import org.elasticsearch.common.lucene.docset.DocIdSets;
import org.elasticsearch.search.aggregations.*;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.aggregations.support.AggregationContext;

import java.io.IOException;
import java.util.List;

/**
*
*/
public class FiltersAggregator extends BucketsAggregator {

static class KeyedFilter {

final String key;
final Filter filter;

KeyedFilter(String key, Filter filter) {
this.key = key;
this.filter = filter;
}
}

private final KeyedFilter[] filters;
private final Bits[] bits;
private final boolean keyed;

public FiltersAggregator(String name, AggregatorFactories factories, List<KeyedFilter> filters,
boolean keyed, AggregationContext aggregationContext, Aggregator parent) {

super(name, BucketAggregationMode.MULTI_BUCKETS, factories, filters.size() * (parent == null ? 1 : parent.estimatedBucketCount()), aggregationContext, parent);
this.keyed = keyed;
this.filters = filters.toArray(new KeyedFilter[filters.size()]);
this.bits = new Bits[this.filters.length];
}

@Override
public boolean shouldCollect() {
return true;
}

@Override
public void setNextReader(AtomicReaderContext reader) {
try {
for (int i = 0; i < filters.length; i++) {
bits[i] = DocIdSets.toSafeBits(reader.reader(), filters[i].filter.getDocIdSet(reader, reader.reader().getLiveDocs()));
}
} catch (IOException ioe) {
throw new AggregationExecutionException("Failed to aggregate filter aggregator [" + name + "]", ioe);
}
}

@Override
public void collect(int doc, long owningBucketOrdinal) throws IOException {
for (int i = 0; i < bits.length; i++) {
if (bits[i].get(doc)) {
collectBucket(doc, bucketOrd(owningBucketOrdinal, i));
}
}
}

@Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) {
List<InternalFilters.Bucket> buckets = Lists.newArrayListWithCapacity(filters.length);
for (int i = 0; i < filters.length; i++) {
KeyedFilter filter = filters[i];
long bucketOrd = bucketOrd(owningBucketOrdinal, i);
InternalFilters.Bucket bucket = new InternalFilters.Bucket(filter.key, bucketDocCount(bucketOrd), bucketAggregations(bucketOrd));
buckets.add(bucket);
}
return new InternalFilters(name, buckets, keyed);
}

@Override
public InternalAggregation buildEmptyAggregation() {
InternalAggregations subAggs = buildEmptySubAggregations();
List<InternalFilters.Bucket> buckets = Lists.newArrayListWithCapacity(filters.length);
for (int i = 0; i < filters.length; i++) {
InternalFilters.Bucket bucket = new InternalFilters.Bucket(filters[i].key, 0, subAggs);
buckets.add(bucket);
}
return new InternalFilters(name, buckets, keyed);
}

private final long bucketOrd(long owningBucketOrdinal, int filterOrd) {
return owningBucketOrdinal * filters.length + filterOrd;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


public static class Factory extends AggregatorFactory {

private final List<KeyedFilter> filters;
private final boolean keyed;

public Factory(String name, List<KeyedFilter> filters, boolean keyed) {
super(name, InternalFilters.TYPE.name());
this.filters = filters;
this.keyed = keyed;
}

@Override
public Aggregator create(AggregationContext context, Aggregator parent, long expectedBucketsCount) {
return new FiltersAggregator(name, factories, filters, keyed, context, parent);
}
}

}
Loading