Skip to content

Commit e33d107

Browse files
authored
Add missing_bucket option in the composite agg (#29465)
This change adds a new option to the composite aggregation named `missing_bucket`. This option can be set per source and dictates whether documents without a value for the source should be ignored. When set to true, documents without a value for a field emit an explicit `null` value, which is then added to the composite bucket. The `missing` option, which allows setting an explicit value (instead of `null`), is deprecated in this change and will be removed in a follow-up (only in 7.x). This commit also changes how the big arrays are allocated: instead of reserving the provided `size` for all sources, they are created with a small initial size and grow depending on the number of buckets created by the aggregation. Closes #29380
1 parent 67905c8 commit e33d107

25 files changed

+890
-224
lines changed

docs/reference/aggregations/bucket/composite-aggregation.asciidoc

+28
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,34 @@ GET /_search
348348
\... will sort the composite bucket in descending order when comparing values from the `date_histogram` source
349349
and in ascending order when comparing values from the `terms` source.
350350

351+
====== Missing bucket
352+
353+
By default, documents without a value for a given source are ignored.
354+
It is possible to include them in the response by setting `missing_bucket` to
355+
`true` (defaults to `false`):
356+
357+
[source,js]
358+
--------------------------------------------------
359+
GET /_search
360+
{
361+
"aggs" : {
362+
"my_buckets": {
363+
"composite" : {
364+
"sources" : [
365+
{ "product_name": { "terms" : { "field": "product", "missing_bucket": true } } }
366+
]
367+
}
368+
}
369+
}
370+
}
371+
--------------------------------------------------
372+
// CONSOLE
373+
374+
In the example above the source `product_name` will emit an explicit `null` value
375+
for documents without a value for the field `product`.
376+
The `order` specified in the source dictates whether the `null` values should rank
377+
first (ascending order, `asc`) or last (descending order, `desc`).
378+
351379
==== Size
352380

353381
The `size` parameter can be set to define how many composite buckets should be returned.

rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/230_composite.yml

+29
Original file line numberDiff line numberDiff line change
@@ -323,3 +323,32 @@ setup:
323323
- length: { aggregations.test.buckets: 2 }
324324
- length: { aggregations.test.after_key: 1 }
325325
- match: { aggregations.test.after_key.keyword: "foo" }
326+
327+
---
328+
"Composite aggregation and array size":
329+
- skip:
330+
version: " - 6.99.99"
331+
reason: starting in 7.0 the composite sources do not allocate arrays eagerly.
332+
333+
- do:
334+
search:
335+
index: test
336+
body:
337+
aggregations:
338+
test:
339+
composite:
340+
size: 1000000000
341+
sources: [
342+
{
343+
"keyword": {
344+
"terms": {
345+
"field": "keyword",
346+
}
347+
}
348+
}
349+
]
350+
351+
- match: {hits.total: 6}
352+
- length: { aggregations.test.buckets: 2 }
353+
- length: { aggregations.test.after_key: 1 }
354+
- match: { aggregations.test.after_key.keyword: "foo" }

server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/BinaryValuesSource.java

+66-15
Original file line numberDiff line numberDiff line change
@@ -24,49 +24,93 @@
2424
import org.apache.lucene.search.MatchAllDocsQuery;
2525
import org.apache.lucene.search.Query;
2626
import org.apache.lucene.util.BytesRef;
27+
import org.apache.lucene.util.BytesRefBuilder;
2728
import org.elasticsearch.common.CheckedFunction;
29+
import org.elasticsearch.common.lease.Releasables;
30+
import org.elasticsearch.common.util.BigArrays;
31+
import org.elasticsearch.common.util.ObjectArray;
2832
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
29-
import org.elasticsearch.index.mapper.KeywordFieldMapper;
3033
import org.elasticsearch.index.mapper.MappedFieldType;
3134
import org.elasticsearch.index.mapper.StringFieldType;
32-
import org.elasticsearch.index.mapper.TextFieldMapper;
3335
import org.elasticsearch.search.DocValueFormat;
3436
import org.elasticsearch.search.aggregations.LeafBucketCollector;
3537

3638
import java.io.IOException;
39+
import java.util.function.LongConsumer;
3740

3841
/**
3942
* A {@link SingleDimensionValuesSource} for binary source ({@link BytesRef}).
4043
*/
4144
class BinaryValuesSource extends SingleDimensionValuesSource<BytesRef> {
45+
private final LongConsumer breakerConsumer;
4246
private final CheckedFunction<LeafReaderContext, SortedBinaryDocValues, IOException> docValuesFunc;
43-
private final BytesRef[] values;
47+
private ObjectArray<BytesRef> values;
48+
private ObjectArray<BytesRefBuilder> valueBuilders;
4449
private BytesRef currentValue;
4550

46-
BinaryValuesSource(MappedFieldType fieldType, CheckedFunction<LeafReaderContext, SortedBinaryDocValues, IOException> docValuesFunc,
47-
DocValueFormat format, Object missing, int size, int reverseMul) {
48-
super(format, fieldType, missing, size, reverseMul);
51+
BinaryValuesSource(BigArrays bigArrays, LongConsumer breakerConsumer,
52+
MappedFieldType fieldType, CheckedFunction<LeafReaderContext, SortedBinaryDocValues, IOException> docValuesFunc,
53+
DocValueFormat format, boolean missingBucket, Object missing, int size, int reverseMul) {
54+
super(bigArrays, format, fieldType, missingBucket, missing, size, reverseMul);
55+
this.breakerConsumer = breakerConsumer;
4956
this.docValuesFunc = docValuesFunc;
50-
this.values = new BytesRef[size];
57+
this.values = bigArrays.newObjectArray(Math.min(size, 100));
58+
this.valueBuilders = bigArrays.newObjectArray(Math.min(size, 100));
5159
}
5260

5361
@Override
54-
public void copyCurrent(int slot) {
55-
values[slot] = BytesRef.deepCopyOf(currentValue);
62+
void copyCurrent(int slot) {
63+
values = bigArrays.grow(values, slot+1);
64+
valueBuilders = bigArrays.grow(valueBuilders, slot+1);
65+
BytesRefBuilder builder = valueBuilders.get(slot);
66+
int byteSize = builder == null ? 0 : builder.bytes().length;
67+
if (builder == null) {
68+
builder = new BytesRefBuilder();
69+
valueBuilders.set(slot, builder);
70+
}
71+
if (missingBucket && currentValue == null) {
72+
values.set(slot, null);
73+
} else {
74+
assert currentValue != null;
75+
builder.copyBytes(currentValue);
76+
breakerConsumer.accept(builder.bytes().length - byteSize);
77+
values.set(slot, builder.get());
78+
}
5679
}
5780

5881
@Override
59-
public int compare(int from, int to) {
60-
return compareValues(values[from], values[to]);
82+
int compare(int from, int to) {
83+
if (missingBucket) {
84+
if (values.get(from) == null) {
85+
return values.get(to) == null ? 0 : -1 * reverseMul;
86+
} else if (values.get(to) == null) {
87+
return reverseMul;
88+
}
89+
}
90+
return compareValues(values.get(from), values.get(to));
6191
}
6292

6393
@Override
6494
int compareCurrent(int slot) {
65-
return compareValues(currentValue, values[slot]);
95+
if (missingBucket) {
96+
if (currentValue == null) {
97+
return values.get(slot) == null ? 0 : -1 * reverseMul;
98+
} else if (values.get(slot) == null) {
99+
return reverseMul;
100+
}
101+
}
102+
return compareValues(currentValue, values.get(slot));
66103
}
67104

68105
@Override
69106
int compareCurrentWithAfter() {
107+
if (missingBucket) {
108+
if (currentValue == null) {
109+
return afterValue == null ? 0 : -1 * reverseMul;
110+
} else if (afterValue == null) {
111+
return reverseMul;
112+
}
113+
}
70114
return compareValues(currentValue, afterValue);
71115
}
72116

@@ -76,7 +120,9 @@ int compareValues(BytesRef v1, BytesRef v2) {
76120

77121
@Override
78122
void setAfter(Comparable<?> value) {
79-
if (value.getClass() == String.class) {
123+
if (missingBucket && value == null) {
124+
afterValue = null;
125+
} else if (value.getClass() == String.class) {
80126
afterValue = format.parseBytesRef(value.toString());
81127
} else {
82128
throw new IllegalArgumentException("invalid value, expected string, got " + value.getClass().getSimpleName());
@@ -85,7 +131,7 @@ void setAfter(Comparable<?> value) {
85131

86132
@Override
87133
BytesRef toComparable(int slot) {
88-
return values[slot];
134+
return values.get(slot);
89135
}
90136

91137
@Override
@@ -100,6 +146,9 @@ public void collect(int doc, long bucket) throws IOException {
100146
currentValue = dvs.nextValue();
101147
next.collect(doc, bucket);
102148
}
149+
} else if (missingBucket) {
150+
currentValue = null;
151+
next.collect(doc, bucket);
103152
}
104153
}
105154
};
@@ -130,5 +179,7 @@ SortedDocsProducer createSortedDocsProducerOrNull(IndexReader reader, Query quer
130179
}
131180

132181
@Override
133-
public void close() {}
182+
public void close() {
183+
Releasables.close(values, valueBuilders);
184+
}
134185
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.search.aggregations.bucket.composite;
21+
22+
import org.elasticsearch.common.lease.Releasable;
23+
import org.elasticsearch.common.lease.Releasables;
24+
import org.elasticsearch.common.util.BigArrays;
25+
import org.elasticsearch.common.util.LongArray;
26+
27+
/**
28+
* A bit array that is implemented using a growing {@link LongArray}
29+
* created from {@link BigArrays}.
30+
* The underlying long array grows lazily based on the biggest index
31+
* that needs to be set.
32+
*/
33+
final class BitArray implements Releasable {
34+
private final BigArrays bigArrays;
35+
private LongArray bits;
36+
37+
BitArray(BigArrays bigArrays, int initialSize) {
38+
this.bigArrays = bigArrays;
39+
this.bits = bigArrays.newLongArray(initialSize, true);
40+
}
41+
42+
public void set(int index) {
43+
fill(index, true);
44+
}
45+
46+
public void clear(int index) {
47+
fill(index, false);
48+
}
49+
50+
public boolean get(int index) {
51+
int wordNum = index >> 6;
52+
long bitmask = 1L << index;
53+
return (bits.get(wordNum) & bitmask) != 0;
54+
}
55+
56+
private void fill(int index, boolean bit) {
57+
int wordNum = index >> 6;
58+
bits = bigArrays.grow(bits,wordNum+1);
59+
long bitmask = 1L << index;
60+
long value = bit ? bits.get(wordNum) | bitmask : bits.get(wordNum) & ~bitmask;
61+
bits.set(wordNum, value);
62+
}
63+
64+
@Override
65+
public void close() {
66+
Releasables.close(bits);
67+
}
68+
}

server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregation.java

+1-6
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
package org.elasticsearch.search.aggregations.bucket.composite;
2121

22-
import org.apache.lucene.util.BytesRef;
2322
import org.elasticsearch.common.xcontent.XContentBuilder;
2423
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
2524

@@ -66,11 +65,7 @@ static XContentBuilder toXContentFragment(CompositeAggregation aggregation, XCon
6665
static void buildCompositeMap(String fieldName, Map<String, Object> composite, XContentBuilder builder) throws IOException {
6766
builder.startObject(fieldName);
6867
for (Map.Entry<String, Object> entry : composite.entrySet()) {
69-
if (entry.getValue().getClass() == BytesRef.class) {
70-
builder.field(entry.getKey(), ((BytesRef) entry.getValue()).utf8ToString());
71-
} else {
72-
builder.field(entry.getKey(), entry.getValue());
73-
}
68+
builder.field(entry.getKey(), entry.getValue());
7469
}
7570
builder.endObject();
7671
}

server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregationBuilder.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,9 @@ protected AggregatorFactory<?> doBuild(SearchContext context, AggregatorFactory<
170170
throw new IllegalArgumentException("Missing value for [after." + sources.get(i).name() + "]");
171171
}
172172
Object obj = after.get(sourceName);
173-
if (obj instanceof Comparable) {
173+
if (configs[i].missingBucket() && obj == null) {
174+
values[i] = null;
175+
} else if (obj instanceof Comparable) {
174176
values[i] = (Comparable<?>) obj;
175177
} else {
176178
throw new IllegalArgumentException("Invalid value for [after." + sources.get(i).name() +

0 commit comments

Comments
 (0)