Skip to content

Fixes GeoLineAggregator bugs #65521

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Dec 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
Expand All @@ -27,8 +29,10 @@ final class GeoLineAggregator extends MetricsAggregator {
/** Multiple ValuesSource with field names */
private final GeoLineMultiValuesSource valuesSources;

private final BigArrays bigArrays;
private final GeoLineBucketedSort sort;
private final GeoLineBucketedSort.Extra extra;
private LongArray counts;
private final boolean includeSorts;
private final SortOrder sortOrder;
private final int size;
Expand All @@ -38,9 +42,11 @@ final class GeoLineAggregator extends MetricsAggregator {
int size) throws IOException {
super(name, context, parent, metaData);
this.valuesSources = valuesSources;
this.bigArrays = context.bigArrays();
if (valuesSources != null) {
this.extra = new GeoLineBucketedSort.Extra(context.bigArrays(), valuesSources);
this.sort = new GeoLineBucketedSort(context.bigArrays(), sortOrder, null, size, valuesSources, extra);
this.extra = new GeoLineBucketedSort.Extra(bigArrays, valuesSources);
this.sort = new GeoLineBucketedSort(bigArrays, sortOrder, null, size, valuesSources, extra);
this.counts = bigArrays.newLongArray(1, true);
} else {
this.extra = null;
this.sort = null;
Expand Down Expand Up @@ -70,6 +76,8 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
/**
 * Collects one document into the given bucket: forwards it to the leaf sort and
 * bumps the per-bucket document counter kept in {@code counts}.
 *
 * @param doc    the document id passed through to {@code leafSort}
 * @param bucket the bucket ordinal the document belongs to
 * @throws IOException declared by the collector contract
 */
@Override
public void collect(int doc, long bucket) throws IOException {
leafSort.collect(doc, bucket);
// make sure index `bucket` is addressable before touching it; grow may hand back a new array
counts = bigArrays.grow(counts, bucket + 1);
// track how many documents were collected for this bucket
counts.increment(bucket, 1);
}
};
}
Expand All @@ -79,7 +87,7 @@ public InternalAggregation buildAggregation(long bucket) {
if (valuesSources == null) {
return buildEmptyAggregation();
}
boolean complete = sort.inHeapMode(bucket) == false;
boolean complete = counts.get(bucket) <= size;
addRequestCircuitBreakerBytes((Double.SIZE + Long.SIZE) * sort.sizeOf(bucket));
double[] sortVals = sort.getSortValues(bucket);
long[] bucketLine = sort.getPoints(bucket);
Expand All @@ -94,6 +102,6 @@ public InternalAggregation buildEmptyAggregation() {

@Override
public void doClose() {
Releasables.close(sort, extra);
Releasables.close(sort, extra, counts);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public Loader loader(LeafReaderContext ctx) throws IOException {
"single document. Use a script to combine multiple geo_point-values-per-doc into a single value.");
}

if (index > values.size()) {
if (index >= values.size()) {
values = bigArrays.grow(values, index + 1);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public double[] sortVals() {
}

public int length() {
return line.length;
return line == null ? 0 : line.length;
}

public boolean isComplete() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public double[] getFinalSortValues() {
public void merge() {
// 1. add first element of each sub line to heap
for (int i = 0; i < geoLines.size(); i++) {
if (geoLines.size() > 0) {
if (geoLines.get(i).length() > 0) {
add(i, 0);
}
}
Expand All @@ -66,7 +66,7 @@ public void merge() {
finalSortValues[i] = getTopSortValue();
removeTop();
InternalGeoLine lineChosen = geoLines.get(lineIdx);
if (idxInLine + 1 < lineChosen.line().length) {
if (idxInLine + 1 < lineChosen.length()) {
add(lineIdx, idxInLine + 1);
}
i++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import org.apache.lucene.search.Query;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.NumericUtils;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.geo.GeometryTestUtils;
Expand All @@ -44,7 +43,6 @@

import static org.hamcrest.CoreMatchers.equalTo;

@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/65473")
public class GeoLineAggregatorTests extends AggregatorTestCase {

@Override
Expand Down Expand Up @@ -112,6 +110,96 @@ public void testDescending() throws IOException {
testAggregator(SortOrder.DESC);
}

/**
 * Exercises the "complete" flag of the geo_line result for the three possible
 * relations between the configured maximum size and the number of indexed points.
 */
public void testComplete() throws IOException {
    final int numPoints = 10;
    // size equal to the number of points: the line is expected to be complete
    testCompleteForSizeAndNumDocuments(numPoints, numPoints, true);
    // size larger than the number of points: still expected to be complete
    testCompleteForSizeAndNumDocuments(numPoints + 1, numPoints, true);
    // size smaller than the number of points: expected to be reported as incomplete
    testCompleteForSizeAndNumDocuments(numPoints - 1, numPoints, false);
}

/**
 * Indexes {@code numPoints} random points under a single {@code group_id}, runs a
 * geo_line aggregation limited to {@code size} points inside a terms aggregation,
 * and checks length, completeness, sort values and points of the resulting line.
 *
 * @param size      maximum number of points configured on the geo_line aggregation
 * @param numPoints number of documents (one point each) to index
 * @param complete  the completeness flag the resulting line is expected to report
 */
public void testCompleteForSizeAndNumDocuments(int size, int numPoints, boolean complete) throws IOException {
    // --- aggregation under test: terms(group_id) > geo_line(value_field sorted by sort_field) ---
    MultiValuesSourceFieldConfig pointFieldConfig = new MultiValuesSourceFieldConfig.Builder().setFieldName("value_field").build();
    MultiValuesSourceFieldConfig sortFieldConfig = new MultiValuesSourceFieldConfig.Builder().setFieldName("sort_field").build();
    GeoLineAggregationBuilder geoLineBuilder = new GeoLineAggregationBuilder("_name")
        .point(pointFieldConfig)
        .sortOrder(SortOrder.ASC)
        .sort(sortFieldConfig)
        .size(size);
    TermsAggregationBuilder termsBuilder = new TermsAggregationBuilder("_name").field("group_id").subAggregation(geoLineBuilder);

    // --- input data: one random point per document, sort value equal to its position ---
    final String groupKey = "0";
    final long[] encodedPoints = new long[numPoints];
    final double[] sortKeys = new double[numPoints];
    for (int idx = 0; idx < numPoints; idx++) {
        Point randomPoint = GeometryTestUtils.randomPoint(false);
        int latEncoded = GeoEncodingUtils.encodeLatitude(randomPoint.getLat());
        int lonEncoded = GeoEncodingUtils.encodeLongitude(randomPoint.getLon());
        // pack as [encoded lon | encoded lat]: longitude in the high 32 bits, latitude in the low 32 bits
        long highBits = ((long) lonEncoded) << 32;
        long lowBits = latEncoded & 0xffffffffL;
        encodedPoints[idx] = highBits | lowBits;
        sortKeys[idx] = idx;
    }

    // --- expected result: the first min(numPoints, size) points, sorted ascending ---
    final int expectedLength = Math.min(numPoints, size);
    long[] expectedPoints = Arrays.copyOf(encodedPoints, expectedLength);
    double[] expectedSorts = Arrays.copyOf(sortKeys, expectedLength);
    new PathArraySorter(expectedPoints, expectedSorts, SortOrder.ASC).sort();

    final Map<String, InternalGeoLine> expectedByGroup = new HashMap<>(1);
    InternalGeoLine expectedLine = new InternalGeoLine("_name", expectedPoints, expectedSorts, null, complete, true, SortOrder.ASC, size);
    expectedByGroup.put(groupKey, expectedLine);

    testCase(new MatchAllDocsQuery(), termsBuilder, iw -> {
        for (int docIdx = 0; docIdx < encodedPoints.length; docIdx++) {
            final long packed = encodedPoints[docIdx];
            // unpack the two 32-bit halves written above
            final int lonPart = (int) (packed >> 32);
            final int latPart = (int) packed;
            iw.addDocument(Arrays.asList(
                new LatLonDocValuesField("value_field",
                    GeoEncodingUtils.decodeLatitude(latPart),
                    GeoEncodingUtils.decodeLongitude(lonPart)),
                new SortedNumericDocValuesField("sort_field", NumericUtils.doubleToSortableLong(sortKeys[docIdx])),
                new SortedDocValuesField("group_id", new BytesRef(groupKey))));
        }
    }, terms -> {
        for (Terms.Bucket bucket : terms.getBuckets()) {
            InternalGeoLine wanted = expectedByGroup.get(bucket.getKeyAsString());
            InternalGeoLine actual = bucket.getAggregations().get("_name");
            assertThat(actual.length(), equalTo(wanted.length()));
            assertThat(actual.isComplete(), equalTo(wanted.isComplete()));
            // the returned sort values are in sortable-long form; convert them in place before comparing
            double[] actualSorts = actual.sortVals();
            for (int pos = 0; pos < actualSorts.length; pos++) {
                actualSorts[pos] = NumericUtils.sortableLongToDouble((long) actualSorts[pos]);
            }
            assertArrayEquals(wanted.sortVals(), actual.sortVals(), 0d);
            assertArrayEquals(wanted.line(), actual.line());
        }
    });
}

/**
 * Runs the geo_line aggregation (nested in a terms aggregation) against an index
 * with no documents and verifies that no buckets are produced.
 */
public void testEmpty() throws IOException {
    final int maxPathSize = randomIntBetween(1, GeoLineAggregationBuilder.MAX_PATH_SIZE);
    MultiValuesSourceFieldConfig pointFieldConfig = new MultiValuesSourceFieldConfig.Builder().setFieldName("value_field").build();
    MultiValuesSourceFieldConfig sortFieldConfig = new MultiValuesSourceFieldConfig.Builder().setFieldName("sort_field").build();
    GeoLineAggregationBuilder geoLineBuilder = new GeoLineAggregationBuilder("_name")
        .point(pointFieldConfig)
        .sortOrder(SortOrder.ASC)
        .sort(sortFieldConfig)
        .size(maxPathSize);
    TermsAggregationBuilder termsBuilder = new TermsAggregationBuilder("_name").field("group_id").subAggregation(geoLineBuilder);
    testCase(
        new MatchAllDocsQuery(),
        termsBuilder,
        iw -> {
            // intentionally index nothing
        },
        terms -> assertTrue(terms.getBuckets().isEmpty())
    );
}

private void testAggregator(SortOrder sortOrder) throws IOException {
int size = randomIntBetween(1, GeoLineAggregationBuilder.MAX_PATH_SIZE);
MultiValuesSourceFieldConfig valueConfig = new MultiValuesSourceFieldConfig.Builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class MergedGeoLinesTests extends ESTestCase {
public InternalGeoLine randomLine(SortOrder sortOrder, int maxLength, double magicDecimal) {
String name = randomAlphaOfLength(5);
int length = randomBoolean() ? maxLength : randomIntBetween(1, maxLength);
boolean complete = length < maxLength;
boolean complete = length <= maxLength;
long[] points = new long[length];
double[] sortValues = new double[length];
for (int i = 0; i < length; i++) {
Expand Down Expand Up @@ -49,4 +49,22 @@ public void testSimpleMerge() {
assertArrayEquals(sortedValues, mergedGeoLines.getFinalSortValues(), 0d);
assertArrayEquals(sortedPoints, mergedGeoLines.getFinalPoints());
}

/**
 * Merges a randomly generated line with a zero-length line and checks that the
 * merged output is already in sorted order.
 */
public void testMergeWithEmptyGeoLine() {
    final int maxLength = 10;
    final SortOrder order = SortOrder.ASC;
    InternalGeoLine populated = randomLine(order, maxLength, 0.0);
    InternalGeoLine empty = new InternalGeoLine(
        "name",
        new long[]{},
        new double[]{},
        Collections.emptyMap(),
        true,
        randomBoolean(),
        order,
        maxLength
    );
    MergedGeoLines merged = new MergedGeoLines(List.of(populated, empty), populated.length(), order);
    merged.merge();

    // Sorting a copy of the merged output must leave it unchanged. This shows the output is
    // ordered; it is a heuristic and does not by itself prove the merge is correct.
    long[] resortedPoints = merged.getFinalPoints().clone();
    double[] resortedValues = merged.getFinalSortValues().clone();
    new PathArraySorter(resortedPoints, resortedValues, order).sort();
    assertArrayEquals(resortedValues, merged.getFinalSortValues(), 0d);
    assertArrayEquals(resortedPoints, merged.getFinalPoints());
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
---
"Test geo_line aggregation on geo points":
- skip:
version: "all"
reason: "AwaitsFix https://github.com/elastic/elasticsearch/issues/65473"
- do:
indices.create:
index: races
Expand Down