Skip to content

Commit 126ec9e

Browse files
authored
Fix sorting terms by cardinality agg (#67839)
The cardinality agg delays calculating stuff until just before it is needed. Before #64016 it used the `postCollect` phase to do this work, which was perfect for the `terms` agg, but we decided that `postCollect` was dangerous because some aggs, notably the `parent` and `child` aggs, need to know which children to build and they *can't* during `postCollect`. After #64016 we built the cardinality agg results when we built the buckets. But if you sort on the cardinality agg then you need to do the `postCollect` stuff in order to know which buckets to build! So you have a chicken-and-egg problem. Sort of. This change splits the difference by running the delayed cardinality agg stuff as soon as you *either* try to build the buckets *or* read the cardinality for use with sorting. This works, but is a little janky and feels wrong. It feels like we could make a structural fix to the way we read metric values from aggs before building the buckets that would make this sort of bug much more difficult to cause. But any such solution is a larger structural change. So this fixes the bug in the quick and janky way, and we hope to do a more structural fix to the way we read metrics soon. Closes #67782
1 parent 48d7902 commit 126ec9e

File tree

4 files changed

+109
-3
lines changed

4 files changed

+109
-3
lines changed

server/src/main/java/org/elasticsearch/search/aggregations/metrics/CardinalityAggregator.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.elasticsearch.common.util.ObjectArray;
3939
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
4040
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
41+
import org.elasticsearch.search.aggregations.AggregationExecutionException;
4142
import org.elasticsearch.search.aggregations.Aggregator;
4243
import org.elasticsearch.search.aggregations.InternalAggregation;
4344
import org.elasticsearch.search.aggregations.LeafBucketCollector;
@@ -152,6 +153,12 @@ protected void beforeBuildingResults(long[] ordsToCollect) throws IOException {
152153

153154
@Override
154155
public double metric(long owningBucketOrd) {
156+
try {
157+
// Make sure all outstanding data has been synced down to the counts.
158+
postCollectLastCollector();
159+
} catch (IOException e) {
160+
throw new AggregationExecutionException("error collecting data in last segment", e);
161+
}
155162
return counts == null ? 0 : counts.cardinality(owningBucketOrd);
156163
}
157164

server/src/main/java/org/elasticsearch/search/aggregations/metrics/GlobalOrdCardinalityAggregator.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.common.util.BitArray;
3232
import org.elasticsearch.common.util.LongArray;
3333
import org.elasticsearch.common.util.ObjectArray;
34+
import org.elasticsearch.search.aggregations.AggregationExecutionException;
3435
import org.elasticsearch.search.aggregations.Aggregator;
3536
import org.elasticsearch.search.aggregations.InternalAggregation;
3637
import org.elasticsearch.search.aggregations.LeafBucketCollector;
@@ -103,6 +104,13 @@ public void collect(int doc, long bucketOrd) throws IOException {
103104

104105
@Override
105106
protected void beforeBuildingResults(long[] ordsToCollect) throws IOException {
107+
buildCountIfNeeded();
108+
}
109+
110+
private void buildCountIfNeeded() throws IOException {
111+
if (counts != null) {
112+
return;
113+
}
106114
counts = new HyperLogLogPlusPlusSparse(precision, bigArrays, visitedOrds.size());
107115
try (LongArray hashes = bigArrays.newLongArray(maxOrd, false)) {
108116
try (BitArray allVisitedOrds = new BitArray(maxOrd, bigArrays)) {
@@ -141,12 +149,18 @@ protected void beforeBuildingResults(long[] ordsToCollect) throws IOException {
141149

142150
@Override
143151
public double metric(long owningBucketOrd) {
144-
return counts == null ? 0 : counts.cardinality(owningBucketOrd);
152+
try {
153+
// Make sure all outstanding data has been synced down to the counts.
154+
buildCountIfNeeded();
155+
} catch (IOException e) {
156+
throw new AggregationExecutionException("error collecting data in last segment", e);
157+
}
158+
return counts.cardinality(owningBucketOrd);
145159
}
146160

147161
@Override
148162
public InternalAggregation buildAggregation(long owningBucketOrdinal) {
149-
if (counts == null || owningBucketOrdinal >= counts.maxOrd() || counts.cardinality(owningBucketOrdinal) == 0) {
163+
if (owningBucketOrdinal >= counts.maxOrd() || counts.cardinality(owningBucketOrdinal) == 0) {
150164
return buildEmptyAggregation();
151165
}
152166
// We need to build a copy because the returned Aggregation needs remain usable after

server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
import org.elasticsearch.search.aggregations.bucket.nested.InternalNested;
8787
import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregationBuilder;
8888
import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregatorTests;
89+
import org.elasticsearch.search.aggregations.metrics.CardinalityAggregationBuilder;
8990
import org.elasticsearch.search.aggregations.metrics.InternalTopHits;
9091
import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder;
9192
import org.elasticsearch.search.aggregations.pipeline.BucketScriptPipelineAggregationBuilder;
@@ -113,6 +114,7 @@
113114
import java.util.function.Function;
114115

115116
import static java.util.Collections.singleton;
117+
import static java.util.stream.Collectors.toList;
116118
import static org.elasticsearch.index.mapper.SeqNoFieldMapper.PRIMARY_TERM_NAME;
117119
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
118120
import static org.elasticsearch.search.aggregations.PipelineAggregatorBuilders.bucketScript;
@@ -1419,6 +1421,71 @@ public void testOrderByPipelineAggregation() throws Exception {
14191421
}
14201422
}
14211423

1424+
public void testOrderByCardinality() throws IOException {
1425+
boolean bIsString = randomBoolean();
1426+
TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("a").field("a")
1427+
.size(3)
1428+
.shardSize(3)
1429+
.subAggregation(new CardinalityAggregationBuilder("b").field("b"))
1430+
.order(BucketOrder.aggregation("b", false));
1431+
1432+
/*
1433+
* Build documents where larger "a"s obviously have more distinct "b"s
1434+
* associated with them. But insert them into Lucene in a random
1435+
* order using Lucene's randomizeWriter so we'll bump into situations
1436+
* where documents in the last segment change the outcome of the
1437+
* cardinality agg. At least, right now the bug has to do with
1438+
* documents in the last segment. But randomize so we can catch
1439+
* new and strange bugs in the future. Finally, its important that
1440+
* we have few enough values that cardinality can be exact.
1441+
*/
1442+
List<List<IndexableField>> docs = new ArrayList<>();
1443+
for (int a = 0; a < 10; a++) {
1444+
for (int b = 0; b <= a; b++) {
1445+
docs.add(
1446+
List.of(
1447+
new NumericDocValuesField("a", a),
1448+
bIsString ? new SortedSetDocValuesField("b", new BytesRef(Integer.toString(b))) : new NumericDocValuesField("b", b)
1449+
)
1450+
);
1451+
}
1452+
}
1453+
Collections.shuffle(docs, random());
1454+
try (Directory directory = newDirectory()) {
1455+
RandomIndexWriter iw = new RandomIndexWriter(random(), directory);
1456+
for (List<IndexableField> doc : docs) {
1457+
iw.addDocument(doc);
1458+
}
1459+
iw.close();
1460+
1461+
try (DirectoryReader unwrapped = DirectoryReader.open(directory);
1462+
IndexReader indexReader = wrapDirectoryReader(unwrapped)) {
1463+
IndexSearcher indexSearcher = newIndexSearcher(indexReader);
1464+
1465+
LongTerms terms = searchAndReduce(
1466+
createIndexSettings(),
1467+
indexSearcher,
1468+
new MatchAllDocsQuery(),
1469+
aggregationBuilder,
1470+
Integer.MAX_VALUE,
1471+
false,
1472+
new NumberFieldMapper.NumberFieldType("a", NumberFieldMapper.NumberType.INTEGER),
1473+
bIsString
1474+
? new KeywordFieldMapper.KeywordFieldType("b")
1475+
: new NumberFieldMapper.NumberFieldType("b", NumberFieldMapper.NumberType.INTEGER)
1476+
);
1477+
assertThat(
1478+
terms.getBuckets().stream().map(MultiBucketsAggregation.Bucket::getKey).collect(toList()),
1479+
equalTo(List.of(9L, 8L, 7L))
1480+
);
1481+
assertThat(
1482+
terms.getBuckets().stream().map(MultiBucketsAggregation.Bucket::getDocCount).collect(toList()),
1483+
equalTo(List.of(10L, 9L, 8L))
1484+
);
1485+
}
1486+
}
1487+
}
1488+
14221489
private final SeqNoFieldMapper.SequenceIDFields sequenceIDFields = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
14231490
private List<Document> generateDocsWithNested(String id, int value, int[] nestedValues) {
14241491
List<Document> documents = new ArrayList<>();

test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,24 @@ protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduc
429429
AggregationBuilder builder,
430430
int maxBucket,
431431
MappedFieldType... fieldTypes) throws IOException {
432+
return searchAndReduce(indexSettings, searcher, query, builder, maxBucket, randomBoolean(), fieldTypes);
433+
}
434+
435+
/**
436+
* Collects all documents that match the provided query {@link Query} and
437+
* returns the reduced {@link InternalAggregation}.
438+
* <p>
439+
* @param splitLeavesIntoSeparateAggregators If true this creates a new {@link Aggregator}
440+
* for each leaf as though it were a separate index. If false this aggregates
441+
* all leaves together, like we do in production.
442+
*/
443+
protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(IndexSettings indexSettings,
444+
IndexSearcher searcher,
445+
Query query,
446+
AggregationBuilder builder,
447+
int maxBucket,
448+
boolean splitLeavesIntoSeparateAggregators,
449+
MappedFieldType... fieldTypes) throws IOException {
432450
final IndexReaderContext ctx = searcher.getTopReaderContext();
433451
final PipelineTree pipelines = builder.buildPipelineTree();
434452
List<InternalAggregation> aggs = new ArrayList<>();
@@ -445,7 +463,7 @@ protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduc
445463
);
446464
C root = createAggregator(builder, context);
447465

448-
if (randomBoolean() && searcher.getIndexReader().leaves().size() > 0) {
466+
if (splitLeavesIntoSeparateAggregators && searcher.getIndexReader().leaves().size() > 0) {
449467
assertThat(ctx, instanceOf(CompositeReaderContext.class));
450468
final CompositeReaderContext compCTX = (CompositeReaderContext) ctx;
451469
final int size = compCTX.leaves().size();

0 commit comments

Comments
 (0)