
Buckets can now be serialized outside of an Aggregation #8113


Merged (1 commit, Oct 17, 2014)
@@ -0,0 +1,80 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.search.aggregations.bucket;

import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;

import java.io.IOException;
import java.util.Map;

public class BucketStreamContext implements Streamable {

@Nullable
private ValueFormatter formatter;
private boolean keyed;
private Map<String, Object> attributes;

public BucketStreamContext() {
}

public void formatter(@Nullable ValueFormatter formatter) {
this.formatter = formatter;
}

public ValueFormatter formatter() {
return formatter;
}

public void keyed(boolean keyed) {
this.keyed = keyed;
}

public boolean keyed() {
return keyed;
}

public void attributes(Map<String, Object> attributes) {
this.attributes = attributes;
}

public Map<String, Object> attributes() {
return attributes;
}

@Override
public void readFrom(StreamInput in) throws IOException {
formatter = ValueFormatterStreams.readOptional(in);
keyed = in.readBoolean();
attributes = in.readMap();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
ValueFormatterStreams.writeOptional(formatter, out);
out.writeBoolean(keyed);
out.writeMap(attributes);
}

}
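BucketStreamContext is a small Streamable bag of hints that a bucket reader may need but that the bucket itself does not write: an optional ValueFormatter, the keyed flag, and free-form attributes. A minimal sketch (not part of this PR) of populating one; the "interval" attribute is a made-up illustration, not something the PR defines:

    // Hints captured by whoever serializes the bucket, consumed by whoever reads it back.
    BucketStreamContext context = new BucketStreamContext();
    context.keyed(true);          // the bucket will render itself under its key
    context.formatter(null);      // optional, hence @Nullable
    context.attributes(java.util.Collections.<String, Object>singletonMap("interval", 60L));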
@@ -0,0 +1,65 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.search.aggregations.bucket;

import com.google.common.collect.ImmutableMap;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.io.stream.StreamInput;

import java.io.IOException;

public class BucketStreams {

private static ImmutableMap<BytesReference, Stream> STREAMS = ImmutableMap.of();

/**
* A stream that knows how to read a bucket from the input.
*/
public static interface Stream<B extends MultiBucketsAggregation.Bucket> {
B readResult(StreamInput in, BucketStreamContext context) throws IOException;
BucketStreamContext getBucketStreamContext(B bucket);
}

/**
* Registers the given stream and associates it with the given types.
*
* @param stream The stream to register
* @param types The types associated with the stream
*/
public static synchronized void registerStream(Stream stream, BytesReference... types) {
MapBuilder<BytesReference, Stream> uStreams = MapBuilder.newMapBuilder(STREAMS);
for (BytesReference type : types) {
uStreams.put(type, stream);
}
STREAMS = uStreams.immutableMap();
}

/**
* Returns the stream that is registered for the given type
*
* @param type The given type
* @return The associated stream
*/
public static Stream stream(BytesReference type) {
return STREAMS.get(type);
}

}
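Together with BucketStreamContext above, this forms a registry analogous to AggregationStreams, keyed by the aggregation type's bytes. A sketch of the write/read round trip it enables, not code from this PR: type, bucket, out and in are caller-supplied placeholders, the raw Stream type is used just as registerStream() declares it, and the usual org.elasticsearch.common.io.stream imports are assumed.

    // Writer side: capture the hints for this bucket, ship them, then ship the bucket itself.
    static void writeBucket(BytesReference type, MultiBucketsAggregation.Bucket bucket, StreamOutput out) throws IOException {
        BucketStreams.Stream stream = BucketStreams.stream(type);            // registered via registerStream(...)
        BucketStreamContext context = stream.getBucketStreamContext(bucket);
        context.writeTo(out);
        bucket.writeTo(out);                                                 // Bucket is Streamable after this PR
    }

    // Reader side: restore the hints first, then let the registered stream rebuild the bucket.
    static MultiBucketsAggregation.Bucket readBucket(BytesReference type, StreamInput in) throws IOException {
        BucketStreamContext context = new BucketStreamContext();
        context.readFrom(in);
        return BucketStreams.stream(type).readResult(in, context);
    }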
@@ -19,14 +19,17 @@

package org.elasticsearch.search.aggregations.bucket;

import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.util.Comparators;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.HasAggregations;
import org.elasticsearch.search.aggregations.support.OrderPath;

import java.util.Collection;
import java.util.List;

/**
* An aggregation that returns multiple buckets
@@ -38,7 +41,7 @@ public interface MultiBucketsAggregation extends Aggregation {
* A bucket represents a criterion to which all documents that fall in it adhere. It is also uniquely identified
* by a key, and can potentially hold sub-aggregations computed over all documents in it.
*/
- public interface Bucket extends HasAggregations {
public interface Bucket extends HasAggregations, ToXContent, Streamable {

/**
* @return The key associated with the bucket as a string
@@ -90,7 +93,7 @@ public int compare(B b1, B b2) {
/**
* @return The buckets of this aggregation.
*/
- Collection<? extends Bucket> getBuckets();
List<? extends Bucket> getBuckets();

/**
* The bucket that is associated with the given key.
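Two contract changes land in this interface: Bucket now extends ToXContent and Streamable, and getBuckets() returns a List rather than a Collection. A small sketch (not from the PR; agg and out are placeholders) of what that buys generic code:

    static void writeFirstBucket(MultiBucketsAggregation agg, StreamOutput out) throws IOException {
        MultiBucketsAggregation.Bucket first = agg.getBuckets().get(0);  // List guarantees positional access
        first.writeTo(out);                                              // Streamable: the bucket serializes itself
    }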
@@ -22,6 +22,7 @@
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;

import java.util.Collection;
import java.util.List;

/**
* A multi bucket aggregation where the buckets are defined by a set of filters (a bucket per filter). Each bucket
@@ -38,7 +39,7 @@ public static interface Bucket extends MultiBucketsAggregation.Bucket {
/**
* The buckets created by this aggregation.
*/
- Collection<? extends Bucket> getBuckets();
List<? extends Bucket> getBuckets();

@Override
Bucket getBucketByKey(String key);
@@ -91,7 +91,7 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) {
for (int i = 0; i < filters.length; i++) {
KeyedFilter filter = filters[i];
long bucketOrd = bucketOrd(owningBucketOrdinal, i);
- InternalFilters.Bucket bucket = new InternalFilters.Bucket(filter.key, bucketDocCount(bucketOrd), bucketAggregations(bucketOrd));
InternalFilters.Bucket bucket = new InternalFilters.Bucket(filter.key, bucketDocCount(bucketOrd), bucketAggregations(bucketOrd), keyed);
buckets.add(bucket);
}
return new InternalFilters(name, buckets, keyed);
@@ -102,7 +102,7 @@ public InternalAggregation buildEmptyAggregation() {
InternalAggregations subAggs = buildEmptySubAggregations();
List<InternalFilters.Bucket> buckets = Lists.newArrayListWithCapacity(filters.length);
for (int i = 0; i < filters.length; i++) {
- InternalFilters.Bucket bucket = new InternalFilters.Bucket(filters[i].key, 0, subAggs);
InternalFilters.Bucket bucket = new InternalFilters.Bucket(filters[i].key, 0, subAggs, keyed);
buckets.add(bucket);
}
return new InternalFilters(name, buckets, keyed);
@@ -29,9 +29,14 @@
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.BucketStreamContext;
import org.elasticsearch.search.aggregations.bucket.BucketStreams;

import java.io.IOException;
- import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
*
@@ -49,20 +54,44 @@ public InternalFilters readResult(StreamInput in) throws IOException {
}
};

private final static BucketStreams.Stream<Bucket> BUCKET_STREAM = new BucketStreams.Stream<Bucket>() {
@Override
public Bucket readResult(StreamInput in, BucketStreamContext context) throws IOException {
Bucket filters = new Bucket(context.keyed());
filters.readFrom(in);
return filters;
}

@Override
public BucketStreamContext getBucketStreamContext(Bucket bucket) {
BucketStreamContext context = new BucketStreamContext();
context.keyed(bucket.keyed);
return context;
}
};

public static void registerStream() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
BucketStreams.registerStream(BUCKET_STREAM, TYPE.stream());
}

public static class Bucket implements Filters.Bucket {

private final boolean keyed;
private String key;
private long docCount;
InternalAggregations aggregations;

- public Bucket(String key, long docCount, InternalAggregations aggregations) {
private Bucket(boolean keyed) {
// for serialization
this.keyed = keyed;
}

public Bucket(String key, long docCount, InternalAggregations aggregations, boolean keyed) {
this.key = key;
this.docCount = docCount;
this.aggregations = aggregations;
this.keyed = keyed;
}

public String getKey() {
@@ -89,7 +118,7 @@ Bucket reduce(List<Bucket> buckets, ReduceContext context) {
List<InternalAggregations> aggregationsList = Lists.newArrayListWithCapacity(buckets.size());
for (Bucket bucket : buckets) {
if (reduced == null) {
- reduced = new Bucket(bucket.key, bucket.docCount, bucket.aggregations);
reduced = new Bucket(bucket.key, bucket.docCount, bucket.aggregations, bucket.keyed);
} else {
reduced.docCount += bucket.docCount;
}
@@ -99,7 +128,8 @@ Bucket reduce(List<Bucket> buckets, ReduceContext context) {
return reduced;
}

- void toXContent(XContentBuilder builder, Params params, boolean keyed) throws IOException {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
if (keyed) {
builder.startObject(key);
} else {
@@ -108,6 +138,21 @@ void toXContent(XContentBuilder builder, Params params, boolean keyed) throws IOException {
builder.field(CommonFields.DOC_COUNT, docCount);
aggregations.toXContentInternal(builder, params);
builder.endObject();
return builder;
}

@Override
public void readFrom(StreamInput in) throws IOException {
key = in.readOptionalString();
docCount = in.readVLong();
aggregations = InternalAggregations.readAggregations(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(key);
out.writeVLong(docCount);
aggregations.writeTo(out);
}
}

@@ -129,7 +174,7 @@ public Type type() {
}

@Override
- public Collection<Bucket> getBuckets() {
public List<Bucket> getBuckets() {
return buckets;
}

@@ -179,8 +224,9 @@ public void readFrom(StreamInput in) throws IOException {
int size = in.readVInt();
List<Bucket> buckets = Lists.newArrayListWithCapacity(size);
for (int i = 0; i < size; i++) {
- String key = in.readOptionalString();
- buckets.add(new Bucket(key, in.readVLong(), InternalAggregations.readAggregations(in)));
Bucket bucket = new Bucket(keyed);
bucket.readFrom(in);
buckets.add(bucket);
}
this.buckets = buckets;
this.bucketMap = null;
@@ -192,9 +238,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(keyed);
out.writeVInt(buckets.size());
for (Bucket bucket : buckets) {
- out.writeOptionalString(bucket.key);
- out.writeVLong(bucket.docCount);
- bucket.aggregations.writeTo(out);
bucket.writeTo(out);
}
}

@@ -206,7 +250,7 @@ public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.startArray(CommonFields.BUCKETS);
}
for (Bucket bucket : buckets) {
- bucket.toXContent(builder, params, keyed);
bucket.toXContent(builder, params);
}
if (keyed) {
builder.endObject();
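InternalFilters is the first aggregation wired into the new registry: registerStream() now also registers BUCKET_STREAM, and the context carries the one flag (keyed) that cannot be recovered from the bucket's own bytes because it only affects XContent rendering. A sketch of reading a single filters bucket on its own, assuming InternalFilters exposes the TYPE constant used in registerStream() above; in is a caller-supplied StreamInput:

    static MultiBucketsAggregation.Bucket readFiltersBucket(StreamInput in, boolean keyed) throws IOException {
        BucketStreamContext context = new BucketStreamContext();
        context.keyed(keyed);   // only influences how the bucket later renders itself as XContent
        return BucketStreams.stream(InternalFilters.TYPE.stream()).readResult(in, context);
    }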
@@ -23,6 +23,7 @@
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;

import java.util.Collection;
import java.util.List;

/**
* A {@code geohash_grid} aggregation. Defines multiple buckets, each representing a cell in a geo-grid of a specific
@@ -51,7 +52,7 @@ public static interface Bucket extends MultiBucketsAggregation.Bucket {
* @return The buckets of this aggregation (each bucket representing a geohash grid cell)
*/
@Override
- Collection<Bucket> getBuckets();
List<Bucket> getBuckets();

@Override
Bucket getBucketByKey(String key);