Skip to content

Commit e4e8905

Browse files
authored
Fixes GeoLineAggregator bugs (#65521)
This commit unmutes and fixes tests around some GeoLineAggregator edge cases. - MergedGeoLines had a silly bug where it was accepting InternalGeoLines that were empty - "complete" is measured by the heap-mode of the BucketedSort, which is a problem since if the length of the data equals the max-size, then it is difficult to know whether any values were discarded. - GeoLineBucketSort had an array-resizing bug s/>/>= Fixes #65473.
1 parent ae69472 commit e4e8905

File tree

7 files changed

+125
-14
lines changed

7 files changed

+125
-14
lines changed

x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregator.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
import org.apache.lucene.index.LeafReaderContext;
99
import org.apache.lucene.search.ScoreMode;
1010
import org.elasticsearch.common.lease.Releasables;
11+
import org.elasticsearch.common.util.BigArrays;
12+
import org.elasticsearch.common.util.LongArray;
1113
import org.elasticsearch.search.aggregations.Aggregator;
1214
import org.elasticsearch.search.aggregations.InternalAggregation;
1315
import org.elasticsearch.search.aggregations.LeafBucketCollector;
@@ -27,8 +29,10 @@ final class GeoLineAggregator extends MetricsAggregator {
2729
/** Multiple ValuesSource with field names */
2830
private final GeoLineMultiValuesSource valuesSources;
2931

32+
private final BigArrays bigArrays;
3033
private final GeoLineBucketedSort sort;
3134
private final GeoLineBucketedSort.Extra extra;
35+
private LongArray counts;
3236
private final boolean includeSorts;
3337
private final SortOrder sortOrder;
3438
private final int size;
@@ -38,9 +42,11 @@ final class GeoLineAggregator extends MetricsAggregator {
3842
int size) throws IOException {
3943
super(name, context, parent, metaData);
4044
this.valuesSources = valuesSources;
45+
this.bigArrays = context.bigArrays();
4146
if (valuesSources != null) {
42-
this.extra = new GeoLineBucketedSort.Extra(context.bigArrays(), valuesSources);
43-
this.sort = new GeoLineBucketedSort(context.bigArrays(), sortOrder, null, size, valuesSources, extra);
47+
this.extra = new GeoLineBucketedSort.Extra(bigArrays, valuesSources);
48+
this.sort = new GeoLineBucketedSort(bigArrays, sortOrder, null, size, valuesSources, extra);
49+
this.counts = bigArrays.newLongArray(1, true);
4450
} else {
4551
this.extra = null;
4652
this.sort = null;
@@ -70,6 +76,8 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
7076
@Override
7177
public void collect(int doc, long bucket) throws IOException {
7278
leafSort.collect(doc, bucket);
79+
counts = bigArrays.grow(counts, bucket + 1);
80+
counts.increment(bucket, 1);
7381
}
7482
};
7583
}
@@ -79,7 +87,7 @@ public InternalAggregation buildAggregation(long bucket) {
7987
if (valuesSources == null) {
8088
return buildEmptyAggregation();
8189
}
82-
boolean complete = sort.inHeapMode(bucket) == false;
90+
boolean complete = counts.get(bucket) <= size;
8391
addRequestCircuitBreakerBytes((Double.SIZE + Long.SIZE) * sort.sizeOf(bucket));
8492
double[] sortVals = sort.getSortValues(bucket);
8593
long[] bucketLine = sort.getPoints(bucket);
@@ -94,6 +102,6 @@ public InternalAggregation buildEmptyAggregation() {
94102

95103
@Override
96104
public void doClose() {
97-
Releasables.close(sort, extra);
105+
Releasables.close(sort, extra, counts);
98106
}
99107
}

x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineBucketedSort.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ public Loader loader(LeafReaderContext ctx) throws IOException {
173173
"single document. Use a script to combine multiple geo_point-values-per-doc into a single value.");
174174
}
175175

176-
if (index > values.size()) {
176+
if (index >= values.size()) {
177177
values = bigArrays.grow(values, index + 1);
178178
}
179179

x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/InternalGeoLine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public double[] sortVals() {
125125
}
126126

127127
public int length() {
128-
return line.length;
128+
return line == null ? 0 : line.length;
129129
}
130130

131131
public boolean isComplete() {

x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/MergedGeoLines.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public double[] getFinalSortValues() {
5050
public void merge() {
5151
// 1. add first element of each sub line to heap
5252
for (int i = 0; i < geoLines.size(); i++) {
53-
if (geoLines.size() > 0) {
53+
if (geoLines.get(i).length() > 0) {
5454
add(i, 0);
5555
}
5656
}
@@ -66,7 +66,7 @@ public void merge() {
6666
finalSortValues[i] = getTopSortValue();
6767
removeTop();
6868
InternalGeoLine lineChosen = geoLines.get(lineIdx);
69-
if (idxInLine + 1 < lineChosen.line().length) {
69+
if (idxInLine + 1 < lineChosen.length()) {
7070
add(lineIdx, idxInLine + 1);
7171
}
7272
i++;

x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregatorTests.java

Lines changed: 90 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import org.apache.lucene.search.Query;
1818
import org.apache.lucene.store.Directory;
1919
import org.apache.lucene.util.BytesRef;
20-
import org.apache.lucene.util.LuceneTestCase;
2120
import org.apache.lucene.util.NumericUtils;
2221
import org.elasticsearch.common.CheckedConsumer;
2322
import org.elasticsearch.geo.GeometryTestUtils;
@@ -44,7 +43,6 @@
4443

4544
import static org.hamcrest.CoreMatchers.equalTo;
4645

47-
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/65473")
4846
public class GeoLineAggregatorTests extends AggregatorTestCase {
4947

5048
@Override
@@ -112,6 +110,96 @@ public void testDescending() throws IOException {
112110
testAggregator(SortOrder.DESC);
113111
}
114112

113+
public void testComplete() throws IOException {
114+
// max size is the same as the number of points
115+
testCompleteForSizeAndNumDocuments(10, 10, true);
116+
// max size is more than the number of points
117+
testCompleteForSizeAndNumDocuments(11, 10, true);
118+
// max size is less than the number of points
119+
testCompleteForSizeAndNumDocuments(9, 10, false);
120+
}
121+
122+
public void testCompleteForSizeAndNumDocuments(int size, int numPoints, boolean complete) throws IOException {
123+
MultiValuesSourceFieldConfig valueConfig = new MultiValuesSourceFieldConfig.Builder()
124+
.setFieldName("value_field")
125+
.build();
126+
MultiValuesSourceFieldConfig sortConfig = new MultiValuesSourceFieldConfig.Builder().setFieldName("sort_field").build();
127+
GeoLineAggregationBuilder lineAggregationBuilder = new GeoLineAggregationBuilder("_name")
128+
.point(valueConfig)
129+
.sortOrder(SortOrder.ASC)
130+
.sort(sortConfig)
131+
.size(size);
132+
TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("_name")
133+
.field("group_id")
134+
.subAggregation(lineAggregationBuilder);
135+
136+
Map<String, InternalGeoLine> lines = new HashMap<>(1);
137+
String groupOrd = "0";
138+
long[] points = new long[numPoints];
139+
double[] sortValues = new double[numPoints];
140+
for (int i = 0; i < numPoints; i++) {
141+
Point point = GeometryTestUtils.randomPoint(false);
142+
int encodedLat = GeoEncodingUtils.encodeLatitude(point.getLat());
143+
int encodedLon = GeoEncodingUtils.encodeLongitude(point.getLon());
144+
long lonLat = (((long) encodedLon) << 32) | encodedLat & 0xffffffffL;
145+
points[i] = lonLat;
146+
sortValues[i] = i;
147+
}
148+
149+
int lineSize = Math.min(numPoints, size);
150+
// re-sort line to be ascending
151+
long[] linePoints = Arrays.copyOf(points, lineSize);
152+
double[] lineSorts = Arrays.copyOf(sortValues, lineSize);
153+
new PathArraySorter(linePoints, lineSorts, SortOrder.ASC).sort();
154+
155+
lines.put(groupOrd, new InternalGeoLine("_name",
156+
linePoints, lineSorts, null, complete, true, SortOrder.ASC, size));
157+
158+
testCase(new MatchAllDocsQuery(), aggregationBuilder, iw -> {
159+
for (int i = 0; i < points.length; i++) {
160+
int x = (int) (points[i] >> 32);
161+
int y = (int) points[i];
162+
iw.addDocument(Arrays.asList(new LatLonDocValuesField("value_field",
163+
GeoEncodingUtils.decodeLatitude(y),
164+
GeoEncodingUtils.decodeLongitude(x)),
165+
new SortedNumericDocValuesField("sort_field", NumericUtils.doubleToSortableLong(sortValues[i])),
166+
new SortedDocValuesField("group_id", new BytesRef(groupOrd))));
167+
}
168+
}, terms -> {
169+
for (Terms.Bucket bucket : terms.getBuckets()) {
170+
InternalGeoLine expectedGeoLine = lines.get(bucket.getKeyAsString());
171+
InternalGeoLine geoLine = bucket.getAggregations().get("_name");
172+
assertThat(geoLine.length(), equalTo(expectedGeoLine.length()));
173+
assertThat(geoLine.isComplete(), equalTo(expectedGeoLine.isComplete()));
174+
for (int i = 0; i < geoLine.sortVals().length; i++) {
175+
geoLine.sortVals()[i] = NumericUtils.sortableLongToDouble((long) geoLine.sortVals()[i]);
176+
}
177+
assertArrayEquals(expectedGeoLine.sortVals(), geoLine.sortVals(), 0d);
178+
assertArrayEquals(expectedGeoLine.line(), geoLine.line());
179+
}
180+
});
181+
}
182+
183+
public void testEmpty() throws IOException {
184+
int size = randomIntBetween(1, GeoLineAggregationBuilder.MAX_PATH_SIZE);
185+
MultiValuesSourceFieldConfig valueConfig = new MultiValuesSourceFieldConfig.Builder()
186+
.setFieldName("value_field")
187+
.build();
188+
MultiValuesSourceFieldConfig sortConfig = new MultiValuesSourceFieldConfig.Builder().setFieldName("sort_field").build();
189+
GeoLineAggregationBuilder lineAggregationBuilder = new GeoLineAggregationBuilder("_name")
190+
.point(valueConfig)
191+
.sortOrder(SortOrder.ASC)
192+
.sort(sortConfig)
193+
.size(size);
194+
TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("_name")
195+
.field("group_id")
196+
.subAggregation(lineAggregationBuilder);
197+
testCase(new MatchAllDocsQuery(), aggregationBuilder, iw -> {
198+
}, terms -> {
199+
assertTrue(terms.getBuckets().isEmpty());
200+
});
201+
}
202+
115203
private void testAggregator(SortOrder sortOrder) throws IOException {
116204
int size = randomIntBetween(1, GeoLineAggregationBuilder.MAX_PATH_SIZE);
117205
MultiValuesSourceFieldConfig valueConfig = new MultiValuesSourceFieldConfig.Builder()

x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/search/aggregations/MergedGeoLinesTests.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public class MergedGeoLinesTests extends ESTestCase {
1818
public InternalGeoLine randomLine(SortOrder sortOrder, int maxLength, double magicDecimal) {
1919
String name = randomAlphaOfLength(5);
2020
int length = randomBoolean() ? maxLength : randomIntBetween(1, maxLength);
21-
boolean complete = length < maxLength;
21+
boolean complete = length <= maxLength;
2222
long[] points = new long[length];
2323
double[] sortValues = new double[length];
2424
for (int i = 0; i < length; i++) {
@@ -49,4 +49,22 @@ public void testSimpleMerge() {
4949
assertArrayEquals(sortedValues, mergedGeoLines.getFinalSortValues(), 0d);
5050
assertArrayEquals(sortedPoints, mergedGeoLines.getFinalPoints());
5151
}
52+
53+
public void testMergeWithEmptyGeoLine() {
54+
int maxLength = 10;
55+
SortOrder sortOrder = SortOrder.ASC;
56+
InternalGeoLine lineWithPoints = randomLine(sortOrder, maxLength, 0.0);
57+
InternalGeoLine emptyLine = new InternalGeoLine("name", new long[]{}, new double[]{}, Collections.emptyMap(),
58+
true, randomBoolean(), sortOrder, maxLength);
59+
List<InternalGeoLine> geoLines = List.of(lineWithPoints, emptyLine);
60+
MergedGeoLines mergedGeoLines = new MergedGeoLines(geoLines, lineWithPoints.length(), sortOrder);
61+
mergedGeoLines.merge();
62+
63+
// assert that the mergedGeoLines are sorted (does not necessarily validate correctness, but it is a good heuristic)
64+
long[] sortedPoints = Arrays.copyOf(mergedGeoLines.getFinalPoints(), mergedGeoLines.getFinalPoints().length);
65+
double[] sortedValues = Arrays.copyOf(mergedGeoLines.getFinalSortValues(), mergedGeoLines.getFinalSortValues().length);
66+
new PathArraySorter(sortedPoints, sortedValues, sortOrder).sort();
67+
assertArrayEquals(sortedValues, mergedGeoLines.getFinalSortValues(), 0d);
68+
assertArrayEquals(sortedPoints, mergedGeoLines.getFinalPoints());
69+
}
5270
}

x-pack/plugin/spatial/src/yamlRestTest/resources/rest-api-spec/test/60_geo_line.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
---
22
"Test geo_line aggregation on geo points":
3-
- skip:
4-
version: "all"
5-
reason: "AwaitsFix https://github.com/elastic/elasticsearch/issues/65473"
63
- do:
74
indices.create:
85
index: races

0 commit comments

Comments
 (0)