Skip to content

Commit 40f532b

Browse files
committed
Refactor children aggregator into a generic ParentJoinAggregator (#34845)
This commit adds a new ParentJoinAggregator that implements a join using global ordinals in a way that can be reused by the `children` and the upcoming `parent` aggregation. This new aggregator is a refactor of the existing ParentToChildrenAggregator, with two main changes: * It uses a dense bit array instead of a long array when the aggregation does not have any parent. * It uses a single aggregator per bucket if it is nested under another aggregation. For the latter case we use a `MultiBucketAggregatorWrapper` in the factory in order to ensure that each instance of the aggregator handles a single bucket. This is more in line with the strategy we use for other aggregations, such as the `terms` aggregation, since the number of buckets to handle should be low (thanks to the breadth_first strategy). This change is also required for #34210, which adds the `parent` aggregation in the parent-join module. Relates #34508
1 parent e30bee7 commit 40f532b

File tree

7 files changed

+198
-152
lines changed

7 files changed

+198
-152
lines changed

modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ChildrenAggregatorFactory.java

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,39 +35,49 @@
3535
import java.util.List;
3636
import java.util.Map;
3737

38-
public class ChildrenAggregatorFactory
39-
extends ValuesSourceAggregatorFactory<WithOrdinals, ChildrenAggregatorFactory> {
38+
public class ChildrenAggregatorFactory extends ValuesSourceAggregatorFactory<WithOrdinals, ChildrenAggregatorFactory> {
4039

4140
private final Query parentFilter;
4241
private final Query childFilter;
4342

44-
public ChildrenAggregatorFactory(String name, ValuesSourceConfig<WithOrdinals> config,
45-
Query childFilter, Query parentFilter, SearchContext context, AggregatorFactory<?> parent,
46-
AggregatorFactories.Builder subFactoriesBuilder, Map<String, Object> metaData) throws IOException {
43+
public ChildrenAggregatorFactory(String name,
44+
ValuesSourceConfig<WithOrdinals> config,
45+
Query childFilter,
46+
Query parentFilter,
47+
SearchContext context,
48+
AggregatorFactory<?> parent,
49+
AggregatorFactories.Builder subFactoriesBuilder,
50+
Map<String, Object> metaData) throws IOException {
4751
super(name, config, context, parent, subFactoriesBuilder, metaData);
52+
4853
this.childFilter = childFilter;
4954
this.parentFilter = parentFilter;
5055
}
5156

5257
@Override
53-
protected Aggregator createUnmapped(Aggregator parent, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData)
54-
throws IOException {
58+
protected Aggregator createUnmapped(Aggregator parent,
59+
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
5560
return new NonCollectingAggregator(name, context, parent, pipelineAggregators, metaData) {
56-
5761
@Override
5862
public InternalAggregation buildEmptyAggregation() {
5963
return new InternalChildren(name, 0, buildEmptySubAggregations(), pipelineAggregators(), metaData());
6064
}
61-
6265
};
6366
}
6467

6568
@Override
66-
protected Aggregator doCreateInternal(WithOrdinals valuesSource, Aggregator parent,
67-
boolean collectsFromSingleBucket, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData)
68-
throws IOException {
69+
protected Aggregator doCreateInternal(WithOrdinals valuesSource,
70+
Aggregator parent,
71+
boolean collectsFromSingleBucket,
72+
List<PipelineAggregator> pipelineAggregators,
73+
Map<String, Object> metaData) throws IOException {
74+
6975
long maxOrd = valuesSource.globalMaxOrd(context.searcher());
70-
return new ParentToChildrenAggregator(name, factories, context, parent, childFilter,
71-
parentFilter, valuesSource, maxOrd, pipelineAggregators, metaData);
76+
if (collectsFromSingleBucket) {
77+
return new ParentToChildrenAggregator(name, factories, context, parent, childFilter,
78+
parentFilter, valuesSource, maxOrd, pipelineAggregators, metaData);
79+
} else {
80+
return asMultiBucketAggregator(this, context, parent);
81+
}
7282
}
7383
}
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.elasticsearch.join.aggregations;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.search.ConstantScoreScorer;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.Bits;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.util.BitArray;
import org.elasticsearch.common.util.LongHash;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
 * An aggregator that joins documents based on global ordinals.
 * Global ordinals that match the main query and the <code>inFilter</code> query are replayed
 * with documents matching the <code>outFilter</code> query.
 */
public abstract class ParentJoinAggregator extends BucketsAggregator implements SingleBucketAggregator {
    private final Weight inFilter;
    private final Weight outFilter;
    private final ValuesSource.Bytes.WithOrdinals valuesSource;
    // true when this aggregator handles a single bucket (no parent aggregator)
    private final boolean singleAggregator;

    /**
     * If this aggregator is nested under another aggregator we allocate a long hash per bucket.
     */
    private final LongHash ordsHash;
    /**
     * Otherwise we use a dense bit array to record the global ordinals.
     */
    private final BitArray ordsBit;

    /**
     * @param name                the name of the aggregation
     * @param factories           sub-aggregation factories
     * @param context             the search context
     * @param parent              the parent aggregator, or {@code null} at the top level
     * @param inFilter            query selecting the documents whose ordinals are recorded
     * @param outFilter           query selecting the documents replayed in post-collection
     * @param valuesSource        global-ordinal values source for the join field
     * @param maxOrd              the maximum global ordinal, used to size the dense bit array;
     *                            must not exceed {@link Integer#MAX_VALUE}
     * @param pipelineAggregators pipeline aggregators
     * @param metaData            aggregation metadata
     * @throws IllegalStateException if {@code maxOrd} exceeds {@link Integer#MAX_VALUE}
     */
    public ParentJoinAggregator(String name,
                                AggregatorFactories factories,
                                SearchContext context,
                                Aggregator parent,
                                Query inFilter,
                                Query outFilter,
                                ValuesSource.Bytes.WithOrdinals valuesSource,
                                long maxOrd,
                                List<PipelineAggregator> pipelineAggregators,
                                Map<String, Object> metaData) throws IOException {
        super(name, factories, context, parent, pipelineAggregators, metaData);

        // Ordinals are recorded as ints (dense bit array), so reject segments whose
        // global ordinal space does not fit in an int.
        if (maxOrd > Integer.MAX_VALUE) {
            throw new IllegalStateException("the number of parent [" + maxOrd + "] is greater than the allowed limit " +
                "for this aggregation: " + Integer.MAX_VALUE);
        }

        // these two filters are cached in the parser
        this.inFilter = context.searcher().createWeight(context.searcher().rewrite(inFilter), false, 1f);
        this.outFilter = context.searcher().createWeight(context.searcher().rewrite(outFilter), false, 1f);
        this.valuesSource = valuesSource;
        this.singleAggregator = parent == null;
        // Exactly one of the two structures is allocated, depending on the bucket mode.
        this.ordsBit = singleAggregator ? new BitArray((int) maxOrd, context.bigArrays()) : null;
        this.ordsHash = singleAggregator ? null : new LongHash(1, context.bigArrays());
    }

    /** Records a matching global ordinal in whichever structure is active. */
    private void addGlobalOrdinal(int globalOrdinal) {
        if (singleAggregator) {
            ordsBit.set(globalOrdinal);
        } else {
            ordsHash.add(globalOrdinal);
        }
    }

    /** Returns true if the given global ordinal was recorded during collection. */
    private boolean existsGlobalOrdinal(int globalOrdinal) {
        return singleAggregator ? ordsBit.get(globalOrdinal) : ordsHash.find(globalOrdinal) >= 0;
    }

    @Override
    public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
                                                      final LeafBucketCollector sub) throws IOException {
        if (valuesSource == null) {
            return LeafBucketCollector.NO_OP_COLLECTOR;
        }
        final SortedSetDocValues globalOrdinals = valuesSource.globalOrdinalsValues(ctx);
        final Bits parentDocs = Lucene.asSequentialAccessBits(ctx.reader().maxDoc(), inFilter.scorerSupplier(ctx));
        return new LeafBucketCollector() {
            @Override
            public void collect(int docId, long bucket) throws IOException {
                // Each instance handles a single bucket (see MultiBucketAggregatorWrapper).
                assert bucket == 0;
                if (parentDocs.get(docId) && globalOrdinals.advanceExact(docId)) {
                    int globalOrdinal = (int) globalOrdinals.nextOrd();
                    // The join field is single-valued, so there must be exactly one ordinal.
                    assert globalOrdinal != -1 && globalOrdinals.nextOrd() == SortedSetDocValues.NO_MORE_ORDS;
                    addGlobalOrdinal(globalOrdinal);
                }
            }
        };
    }

    @Override
    protected final void doPostCollection() throws IOException {
        // Replay the documents matching outFilter whose global ordinal was recorded
        // during the main collection, feeding them to the sub-aggregators.
        IndexReader indexReader = context().searcher().getIndexReader();
        for (LeafReaderContext ctx : indexReader.leaves()) {
            Scorer childDocsScorer = outFilter.scorer(ctx);
            if (childDocsScorer == null) {
                continue;
            }
            DocIdSetIterator childDocsIter = childDocsScorer.iterator();

            final LeafBucketCollector sub = collectableSubAggregators.getLeafCollector(ctx);

            final SortedSetDocValues globalOrdinals = valuesSource.globalOrdinalsValues(ctx);
            // Set the scorer, since we now replay only the child docIds
            sub.setScorer(new ConstantScoreScorer(null, 1f, childDocsIter));

            final Bits liveDocs = ctx.reader().getLiveDocs();
            for (int docId = childDocsIter.nextDoc(); docId != DocIdSetIterator.NO_MORE_DOCS; docId = childDocsIter.nextDoc()) {
                if (liveDocs != null && liveDocs.get(docId) == false) {
                    continue;
                }
                if (globalOrdinals.advanceExact(docId)) {
                    int globalOrdinal = (int) globalOrdinals.nextOrd();
                    assert globalOrdinal != -1 && globalOrdinals.nextOrd() == SortedSetDocValues.NO_MORE_ORDS;
                    if (existsGlobalOrdinal(globalOrdinal)) {
                        collectBucket(sub, docId, 0);
                    }
                }
            }
        }
    }

    @Override
    protected void doClose() {
        // Releasables.close tolerates nulls, so closing both is safe in either mode.
        Releasables.close(ordsBit, ordsHash);
    }
}

modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentToChildrenAggregator.java

Lines changed: 3 additions & 130 deletions
Original file line numberDiff line numberDiff line change
@@ -18,72 +18,28 @@
1818
*/
1919
package org.elasticsearch.join.aggregations;
2020

21-
import org.apache.lucene.index.IndexReader;
22-
import org.apache.lucene.index.LeafReaderContext;
23-
import org.apache.lucene.index.SortedSetDocValues;
24-
import org.apache.lucene.search.ConstantScoreScorer;
25-
import org.apache.lucene.search.DocIdSetIterator;
2621
import org.apache.lucene.search.Query;
27-
import org.apache.lucene.search.Scorer;
28-
import org.apache.lucene.search.Weight;
29-
import org.apache.lucene.util.Bits;
3022
import org.elasticsearch.common.ParseField;
31-
import org.elasticsearch.common.lease.Releasables;
32-
import org.elasticsearch.common.lucene.Lucene;
33-
import org.elasticsearch.common.util.LongArray;
34-
import org.elasticsearch.common.util.LongObjectPagedHashMap;
3523
import org.elasticsearch.search.aggregations.Aggregator;
3624
import org.elasticsearch.search.aggregations.AggregatorFactories;
3725
import org.elasticsearch.search.aggregations.InternalAggregation;
38-
import org.elasticsearch.search.aggregations.LeafBucketCollector;
39-
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
40-
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator;
4126
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
4227
import org.elasticsearch.search.aggregations.support.ValuesSource;
4328
import org.elasticsearch.search.internal.SearchContext;
4429

4530
import java.io.IOException;
46-
import java.util.Arrays;
4731
import java.util.List;
4832
import java.util.Map;
4933

50-
// The RecordingPerReaderBucketCollector assumes per segment recording which isn't the case for this
51-
// aggregation, for this reason that collector can't be used
52-
public class ParentToChildrenAggregator extends BucketsAggregator implements SingleBucketAggregator {
34+
public class ParentToChildrenAggregator extends ParentJoinAggregator {
5335

5436
static final ParseField TYPE_FIELD = new ParseField("type");
5537

56-
private final Weight childFilter;
57-
private final Weight parentFilter;
58-
private final ValuesSource.Bytes.WithOrdinals valuesSource;
59-
60-
// Maybe use PagedGrowableWriter? This will be less wasteful than LongArray,
61-
// but then we don't have the reuse feature of BigArrays.
62-
// Also if we know the highest possible value that a parent agg will create
63-
// then we store multiple values into one slot
64-
private final LongArray parentOrdToBuckets;
65-
66-
// Only pay the extra storage price if the a parentOrd has multiple buckets
67-
// Most of the times a parent doesn't have multiple buckets, since there is
68-
// only one document per parent ord,
69-
// only in the case of terms agg if a parent doc has multiple terms per
70-
// field this is needed:
71-
private final LongObjectPagedHashMap<long[]> parentOrdToOtherBuckets;
72-
private boolean multipleBucketsPerParentOrd = false;
73-
7438
public ParentToChildrenAggregator(String name, AggregatorFactories factories,
7539
SearchContext context, Aggregator parent, Query childFilter,
7640
Query parentFilter, ValuesSource.Bytes.WithOrdinals valuesSource,
77-
long maxOrd, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData)
78-
throws IOException {
79-
super(name, factories, context, parent, pipelineAggregators, metaData);
80-
// these two filters are cached in the parser
81-
this.childFilter = context.searcher().createNormalizedWeight(childFilter, false);
82-
this.parentFilter = context.searcher().createNormalizedWeight(parentFilter, false);
83-
this.parentOrdToBuckets = context.bigArrays().newLongArray(maxOrd, false);
84-
this.parentOrdToBuckets.fill(0, maxOrd, -1);
85-
this.parentOrdToOtherBuckets = new LongObjectPagedHashMap<>(context.bigArrays());
86-
this.valuesSource = valuesSource;
41+
long maxOrd, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
42+
super(name, factories, context, parent, parentFilter, childFilter, valuesSource, maxOrd, pipelineAggregators, metaData);
8743
}
8844

8945
@Override
@@ -97,87 +53,4 @@ public InternalAggregation buildEmptyAggregation() {
9753
return new InternalChildren(name, 0, buildEmptySubAggregations(), pipelineAggregators(),
9854
metaData());
9955
}
100-
101-
@Override
102-
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
103-
final LeafBucketCollector sub) throws IOException {
104-
if (valuesSource == null) {
105-
return LeafBucketCollector.NO_OP_COLLECTOR;
106-
}
107-
final SortedSetDocValues globalOrdinals = valuesSource.globalOrdinalsValues(ctx);
108-
final Bits parentDocs = Lucene.asSequentialAccessBits(ctx.reader().maxDoc(), parentFilter.scorerSupplier(ctx));
109-
return new LeafBucketCollector() {
110-
111-
@Override
112-
public void collect(int docId, long bucket) throws IOException {
113-
if (parentDocs.get(docId) && globalOrdinals.advanceExact(docId)) {
114-
long globalOrdinal = globalOrdinals.nextOrd();
115-
assert globalOrdinals.nextOrd() == SortedSetDocValues.NO_MORE_ORDS;
116-
if (globalOrdinal != -1) {
117-
if (parentOrdToBuckets.get(globalOrdinal) == -1) {
118-
parentOrdToBuckets.set(globalOrdinal, bucket);
119-
} else {
120-
long[] bucketOrds = parentOrdToOtherBuckets.get(globalOrdinal);
121-
if (bucketOrds != null) {
122-
bucketOrds = Arrays.copyOf(bucketOrds, bucketOrds.length + 1);
123-
bucketOrds[bucketOrds.length - 1] = bucket;
124-
parentOrdToOtherBuckets.put(globalOrdinal, bucketOrds);
125-
} else {
126-
parentOrdToOtherBuckets.put(globalOrdinal, new long[] { bucket });
127-
}
128-
multipleBucketsPerParentOrd = true;
129-
}
130-
}
131-
}
132-
}
133-
};
134-
}
135-
136-
@Override
137-
protected void doPostCollection() throws IOException {
138-
IndexReader indexReader = context().searcher().getIndexReader();
139-
for (LeafReaderContext ctx : indexReader.leaves()) {
140-
Scorer childDocsScorer = childFilter.scorer(ctx);
141-
if (childDocsScorer == null) {
142-
continue;
143-
}
144-
DocIdSetIterator childDocsIter = childDocsScorer.iterator();
145-
146-
final LeafBucketCollector sub = collectableSubAggregators.getLeafCollector(ctx);
147-
148-
final SortedSetDocValues globalOrdinals = valuesSource.globalOrdinalsValues(ctx);
149-
// Set the scorer, since we now replay only the child docIds
150-
sub.setScorer(new ConstantScoreScorer(null, 1f, childDocsIter));
151-
152-
final Bits liveDocs = ctx.reader().getLiveDocs();
153-
for (int docId = childDocsIter
154-
.nextDoc(); docId != DocIdSetIterator.NO_MORE_DOCS; docId = childDocsIter
155-
.nextDoc()) {
156-
if (liveDocs != null && liveDocs.get(docId) == false) {
157-
continue;
158-
}
159-
if (globalOrdinals.advanceExact(docId)) {
160-
long globalOrdinal = globalOrdinals.nextOrd();
161-
assert globalOrdinals.nextOrd() == SortedSetDocValues.NO_MORE_ORDS;
162-
long bucketOrd = parentOrdToBuckets.get(globalOrdinal);
163-
if (bucketOrd != -1) {
164-
collectBucket(sub, docId, bucketOrd);
165-
if (multipleBucketsPerParentOrd) {
166-
long[] otherBucketOrds = parentOrdToOtherBuckets.get(globalOrdinal);
167-
if (otherBucketOrds != null) {
168-
for (long otherBucketOrd : otherBucketOrds) {
169-
collectBucket(sub, docId, otherBucketOrd);
170-
}
171-
}
172-
}
173-
}
174-
}
175-
}
176-
}
177-
}
178-
179-
@Override
180-
protected void doClose() {
181-
Releasables.close(parentOrdToBuckets, parentOrdToOtherBuckets);
182-
}
18356
}

0 commit comments

Comments
 (0)