Skip to content

Terms aggregation should remap global ordinal buckets when a sub-aggregator is used to sort the terms #24941

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 30, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,22 +53,28 @@ public class GlobalOrdinalsSignificantTermsAggregator extends GlobalOrdinalsStri
protected final SignificantTermsAggregatorFactory termsAggFactory;
private final SignificanceHeuristic significanceHeuristic;

public GlobalOrdinalsSignificantTermsAggregator(String name, AggregatorFactories factories,
ValuesSource.Bytes.WithOrdinals.FieldData valuesSource, DocValueFormat format,
BucketCountThresholds bucketCountThresholds, IncludeExclude.OrdinalsFilter includeExclude,
SearchContext context, Aggregator parent,
SignificanceHeuristic significanceHeuristic, SignificantTermsAggregatorFactory termsAggFactory,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {

public GlobalOrdinalsSignificantTermsAggregator(String name,
AggregatorFactories factories,
ValuesSource.Bytes.WithOrdinals.FieldData valuesSource,
DocValueFormat format,
BucketCountThresholds bucketCountThresholds,
IncludeExclude.OrdinalsFilter includeExclude,
SearchContext context,
Aggregator parent,
boolean forceRemapGlobalOrds,
SignificanceHeuristic significanceHeuristic,
SignificantTermsAggregatorFactory termsAggFactory,
List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException {
super(name, factories, valuesSource, null, format, bucketCountThresholds, includeExclude, context, parent,
SubAggCollectionMode.DEPTH_FIRST, false, pipelineAggregators, metaData);
forceRemapGlobalOrds, SubAggCollectionMode.DEPTH_FIRST, false, pipelineAggregators, metaData);
this.significanceHeuristic = significanceHeuristic;
this.termsAggFactory = termsAggFactory;
this.numCollectedDocs = 0;
}

@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
final LeafBucketCollector sub) throws IOException {
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException {
return new LeafBucketCollectorBase(super.getLeafCollector(ctx, sub), null) {
@Override
public void collect(int doc, long bucket) throws IOException {
Expand All @@ -78,18 +84,17 @@ public void collect(int doc, long bucket) throws IOException {
};
}


@Override
public SignificantStringTerms buildAggregation(long owningBucketOrdinal) throws IOException {
assert owningBucketOrdinal == 0;
if (globalOrds == null) { // no context in this reader
if (valueCount == 0) { // no context in this reader
return buildEmptyAggregation();
}

final int size;
if (bucketCountThresholds.getMinDocCount() == 0) {
// if minDocCount == 0 then we can end up with more buckets than maxBucketOrd() returns
size = (int) Math.min(globalOrds.getValueCount(), bucketCountThresholds.getShardSize());
size = (int) Math.min(valueCount, bucketCountThresholds.getShardSize());
} else {
size = (int) Math.min(maxBucketOrd(), bucketCountThresholds.getShardSize());
}
Expand All @@ -98,7 +103,7 @@ public SignificantStringTerms buildAggregation(long owningBucketOrdinal) throws

BucketSignificancePriorityQueue<SignificantStringTerms.Bucket> ordered = new BucketSignificancePriorityQueue<>(size);
SignificantStringTerms.Bucket spare = null;
for (long globalTermOrd = 0; globalTermOrd < globalOrds.getValueCount(); ++globalTermOrd) {
for (long globalTermOrd = 0; globalTermOrd < valueCount; ++globalTermOrd) {
if (includeExclude != null && !acceptedGlobalOrdinals.get(globalTermOrd)) {
continue;
}
Expand All @@ -115,7 +120,7 @@ public SignificantStringTerms buildAggregation(long owningBucketOrdinal) throws
spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format);
}
spare.bucketOrd = bucketOrd;
copy(globalOrds.lookupOrd(globalTermOrd), spare.termBytes);
copy(lookupGlobalOrd.apply(globalTermOrd), spare.termBytes);
spare.subsetDf = bucketDocCount;
spare.subsetSize = subsetSize;
spare.supersetDf = termsAggFactory.getBackgroundFrequency(spare.termBytes);
Expand Down Expand Up @@ -148,63 +153,13 @@ public SignificantStringTerms buildEmptyAggregation() {
IndexReader topReader = searcher.getIndexReader();
int supersetSize = topReader.numDocs();
return new SignificantStringTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
pipelineAggregators(), metaData(), format, 0, supersetSize, significanceHeuristic, emptyList());
pipelineAggregators(), metaData(), format, numCollectedDocs, supersetSize, significanceHeuristic, emptyList());
}

@Override
protected void doClose() {
super.doClose();
Releasables.close(termsAggFactory);
}

public static class WithHash extends GlobalOrdinalsSignificantTermsAggregator {

private final LongHash bucketOrds;

public WithHash(String name, AggregatorFactories factories, ValuesSource.Bytes.WithOrdinals.FieldData valuesSource,
DocValueFormat format, BucketCountThresholds bucketCountThresholds, IncludeExclude.OrdinalsFilter includeExclude,
SearchContext context, Aggregator parent, SignificanceHeuristic significanceHeuristic,
SignificantTermsAggregatorFactory termsAggFactory, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException {
super(name, factories, valuesSource, format, bucketCountThresholds, includeExclude, context, parent, significanceHeuristic,
termsAggFactory, pipelineAggregators, metaData);
bucketOrds = new LongHash(1, context.bigArrays());
}

@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
final LeafBucketCollector sub) throws IOException {
return new LeafBucketCollectorBase(super.getLeafCollector(ctx, sub), null) {
@Override
public void collect(int doc, long bucket) throws IOException {
assert bucket == 0;
numCollectedDocs++;
if (globalOrds.advanceExact(doc)) {
for (long globalOrd = globalOrds.nextOrd();
globalOrd != SortedSetDocValues.NO_MORE_ORDS;
globalOrd = globalOrds.nextOrd()) {
long bucketOrd = bucketOrds.add(globalOrd);
if (bucketOrd < 0) {
bucketOrd = -1 - bucketOrd;
collectExistingBucket(sub, doc, bucketOrd);
} else {
collectBucket(sub, doc, bucketOrd);
}
}
}
}
};
}

@Override
protected long getBucketOrd(long termOrd) {
return bucketOrds.find(termOrd);
}

@Override
protected void doClose() {
Releasables.close(termsAggFactory, bucketOrds);
}
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,17 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
private final TermsAggregator.BucketCountThresholds bucketCountThresholds;
private final SignificanceHeuristic significanceHeuristic;

public SignificantTermsAggregatorFactory(String name, ValuesSourceConfig<ValuesSource> config, IncludeExclude includeExclude,
String executionHint, QueryBuilder filterBuilder, TermsAggregator.BucketCountThresholds bucketCountThresholds,
SignificanceHeuristic significanceHeuristic, SearchContext context, AggregatorFactory<?> parent,
AggregatorFactories.Builder subFactoriesBuilder, Map<String, Object> metaData) throws IOException {
public SignificantTermsAggregatorFactory(String name,
ValuesSourceConfig<ValuesSource> config,
IncludeExclude includeExclude,
String executionHint,
QueryBuilder filterBuilder,
TermsAggregator.BucketCountThresholds bucketCountThresholds,
SignificanceHeuristic significanceHeuristic,
SearchContext context,
AggregatorFactory<?> parent,
AggregatorFactories.Builder subFactoriesBuilder,
Map<String, Object> metaData) throws IOException {
super(name, config, context, parent, subFactoriesBuilder, metaData);
this.includeExclude = includeExclude;
this.executionHint = executionHint;
Expand Down Expand Up @@ -246,44 +253,71 @@ public enum ExecutionMode {
MAP(new ParseField("map")) {

@Override
Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource, DocValueFormat format,
TermsAggregator.BucketCountThresholds bucketCountThresholds, IncludeExclude includeExclude,
SearchContext aggregationContext, Aggregator parent, SignificanceHeuristic significanceHeuristic,
SignificantTermsAggregatorFactory termsAggregatorFactory, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException {
Aggregator create(String name,
AggregatorFactories factories,
ValuesSource valuesSource,
DocValueFormat format,
TermsAggregator.BucketCountThresholds bucketCountThresholds,
IncludeExclude includeExclude,
SearchContext aggregationContext,
Aggregator parent,
SignificanceHeuristic significanceHeuristic,
SignificantTermsAggregatorFactory termsAggregatorFactory,
List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException {

final IncludeExclude.StringFilter filter = includeExclude == null ? null : includeExclude.convertToStringFilter(format);
return new SignificantStringTermsAggregator(name, factories, valuesSource, format, bucketCountThresholds, filter,
aggregationContext, parent, significanceHeuristic, termsAggregatorFactory, pipelineAggregators, metaData);

}

},
GLOBAL_ORDINALS(new ParseField("global_ordinals")) {

@Override
Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource, DocValueFormat format,
TermsAggregator.BucketCountThresholds bucketCountThresholds, IncludeExclude includeExclude,
SearchContext aggregationContext, Aggregator parent, SignificanceHeuristic significanceHeuristic,
SignificantTermsAggregatorFactory termsAggregatorFactory, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException {
Aggregator create(String name,
AggregatorFactories factories,
ValuesSource valuesSource,
DocValueFormat format,
TermsAggregator.BucketCountThresholds bucketCountThresholds,
IncludeExclude includeExclude,
SearchContext aggregationContext,
Aggregator parent,
SignificanceHeuristic significanceHeuristic,
SignificantTermsAggregatorFactory termsAggregatorFactory,
List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException {

final IncludeExclude.OrdinalsFilter filter = includeExclude == null ? null : includeExclude.convertToOrdinalsFilter(format);
return new GlobalOrdinalsSignificantTermsAggregator(name, factories,
(ValuesSource.Bytes.WithOrdinals.FieldData) valuesSource, format, bucketCountThresholds, filter,
aggregationContext, parent, significanceHeuristic, termsAggregatorFactory, pipelineAggregators, metaData);
aggregationContext, parent, false, significanceHeuristic, termsAggregatorFactory, pipelineAggregators, metaData);

}

},
GLOBAL_ORDINALS_HASH(new ParseField("global_ordinals_hash")) {

@Override
Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource, DocValueFormat format,
TermsAggregator.BucketCountThresholds bucketCountThresholds, IncludeExclude includeExclude,
SearchContext aggregationContext, Aggregator parent, SignificanceHeuristic significanceHeuristic,
SignificantTermsAggregatorFactory termsAggregatorFactory, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException {
Aggregator create(String name,
AggregatorFactories factories,
ValuesSource valuesSource,
DocValueFormat format,
TermsAggregator.BucketCountThresholds bucketCountThresholds,
IncludeExclude includeExclude,
SearchContext aggregationContext,
Aggregator parent,
SignificanceHeuristic significanceHeuristic,
SignificantTermsAggregatorFactory termsAggregatorFactory,
List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException {

final IncludeExclude.OrdinalsFilter filter = includeExclude == null ? null : includeExclude.convertToOrdinalsFilter(format);
return new GlobalOrdinalsSignificantTermsAggregator.WithHash(name, factories,
(ValuesSource.Bytes.WithOrdinals.FieldData) valuesSource, format, bucketCountThresholds, filter,
aggregationContext, parent, significanceHeuristic, termsAggregatorFactory, pipelineAggregators, metaData);
return new GlobalOrdinalsSignificantTermsAggregator(name, factories,
(ValuesSource.Bytes.WithOrdinals.FieldData) valuesSource, format, bucketCountThresholds, filter, aggregationContext, parent,
true, significanceHeuristic, termsAggregatorFactory, pipelineAggregators, metaData);

}
};

Expand All @@ -302,11 +336,18 @@ public static ExecutionMode fromString(String value) {
this.parseField = parseField;
}

abstract Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource, DocValueFormat format,
TermsAggregator.BucketCountThresholds bucketCountThresholds, IncludeExclude includeExclude,
SearchContext aggregationContext, Aggregator parent, SignificanceHeuristic significanceHeuristic,
SignificantTermsAggregatorFactory termsAggregatorFactory, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException;
abstract Aggregator create(String name,
AggregatorFactories factories,
ValuesSource valuesSource,
DocValueFormat format,
TermsAggregator.BucketCountThresholds bucketCountThresholds,
IncludeExclude includeExclude,
SearchContext aggregationContext,
Aggregator parent,
SignificanceHeuristic significanceHeuristic,
SignificantTermsAggregatorFactory termsAggregatorFactory,
List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException;

@Override
public String toString() {
Expand Down
Loading