Skip to content

Commit 45ec73d

Browse files
committed
SQL: Fix ORDER BY on aggregates and GROUPed BY fields (#51894)
Previously, in the in-memory sorting module `LocalAggregationSorterListener` only the aggregate functions where used (grabbed by the `sortingColumns`). As a consequence, if the ORDER BY was also using columns of the GROUP BY clause, (especially in the case of higher priority - before the aggregate functions) wrong results were produced. E.g.: ``` SELECT gender, MAX(salary) AS max FROM test_emp GROUP BY gender ORDER BY gender, max ``` Add all columns of the ORDER BY to the `sortingColumns` so that the `LocalAggregationSorterListener` can use the correct comparators in the underlying PriorityQueue used to implement the in-memory sorting. Fixes: #50355 (cherry picked from commit be680af)
1 parent 6075a77 commit 45ec73d

File tree

8 files changed

+242
-84
lines changed

8 files changed

+242
-84
lines changed

x-pack/plugin/sql/qa/src/main/resources/agg-ordering.csv-spec

+47-1
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,50 @@ g:s | gender:s | s3:i | SUM(salary):i | s5:i
2323
M |M |2671054|2671054 |2671054
2424
F |F |1666196|1666196 |1666196
2525
null |null |487605 |487605 |487605
26-
;
26+
;
27+
28+
histogramDateTimeWithCountAndOrder_1
29+
schema::h:ts|c:l
30+
SELECT HISTOGRAM(birth_date, INTERVAL 1 YEAR) AS h, COUNT(*) as c FROM test_emp GROUP BY h ORDER BY h DESC, c ASC;
31+
32+
h | c
33+
------------------------+---------------
34+
1965-01-01T00:00:00.000Z|1
35+
1964-01-01T00:00:00.000Z|4
36+
1963-01-01T00:00:00.000Z|7
37+
1962-01-01T00:00:00.000Z|6
38+
1961-01-01T00:00:00.000Z|8
39+
1960-01-01T00:00:00.000Z|8
40+
1959-01-01T00:00:00.000Z|9
41+
1958-01-01T00:00:00.000Z|7
42+
1957-01-01T00:00:00.000Z|4
43+
1956-01-01T00:00:00.000Z|5
44+
1955-01-01T00:00:00.000Z|4
45+
1954-01-01T00:00:00.000Z|8
46+
1953-01-01T00:00:00.000Z|11
47+
1952-01-01T00:00:00.000Z|8
48+
null |10
49+
;
50+
51+
histogramDateTimeWithCountAndOrder_2
52+
schema::h:ts|c:l
53+
SELECT HISTOGRAM(birth_date, INTERVAL 1 YEAR) AS h, COUNT(*) as c FROM test_emp GROUP BY h ORDER BY c DESC, h ASC;
54+
55+
h | c
56+
------------------------+---------------
57+
1953-01-01T00:00:00.000Z|11
58+
null |10
59+
1959-01-01T00:00:00.000Z|9
60+
1952-01-01T00:00:00.000Z|8
61+
1954-01-01T00:00:00.000Z|8
62+
1960-01-01T00:00:00.000Z|8
63+
1961-01-01T00:00:00.000Z|8
64+
1958-01-01T00:00:00.000Z|7
65+
1963-01-01T00:00:00.000Z|7
66+
1962-01-01T00:00:00.000Z|6
67+
1956-01-01T00:00:00.000Z|5
68+
1955-01-01T00:00:00.000Z|4
69+
1957-01-01T00:00:00.000Z|4
70+
1964-01-01T00:00:00.000Z|4
71+
1965-01-01T00:00:00.000Z|1
72+
;

x-pack/plugin/sql/qa/src/main/resources/agg-ordering.sql-spec

+24-3
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ aggNotSpecifiedInTheAggregateAndGroupWithHavingWithLimitAndDirection
7878
SELECT gender, MIN(salary) AS min, COUNT(*) AS c FROM test_emp GROUP BY gender HAVING c > 1 ORDER BY MAX(salary) ASC, c DESC LIMIT 5;
7979

8080
groupAndAggNotSpecifiedInTheAggregateWithHaving
81-
SELECT gender, MIN(salary) AS min, COUNT(*) AS c FROM test_emp GROUP BY gender HAVING c > 1 ORDER BY gender, MAX(salary);
81+
SELECT gender, MIN(salary) AS min, COUNT(*) AS c FROM test_emp GROUP BY gender HAVING c > 1 ORDER BY gender NULLS FIRST, MAX(salary);
8282

8383
multipleAggsThatGetRewrittenWithAliasOnAMediumGroupBy
8484
SELECT languages, MAX(salary) AS max, MIN(salary) AS min FROM test_emp GROUP BY languages ORDER BY max;
@@ -134,5 +134,26 @@ SELECT gender AS g, first_name AS f, last_name AS l FROM test_emp GROUP BY f, ge
134134
multipleGroupingsAndOrderingByGroups_8
135135
SELECT gender AS g, first_name, last_name FROM test_emp GROUP BY g, last_name, first_name ORDER BY gender ASC, first_name DESC, last_name ASC;
136136

137-
multipleGroupingsAndOrderingByGroupsWithFunctions
138-
SELECT first_name f, last_name l, gender g, CONCAT(first_name, last_name) c FROM test_emp GROUP BY gender, l, f, c ORDER BY gender, c DESC, first_name, last_name ASC;
137+
multipleGroupingsAndOrderingByGroupsAndAggs_1
138+
SELECT gender, MIN(salary) AS min, COUNT(*) AS c, MAX(salary) AS max FROM test_emp GROUP BY gender HAVING c > 1 ORDER BY gender ASC NULLS FIRST, MAX(salary) DESC;
139+
140+
multipleGroupingsAndOrderingByGroupsAndAggs_2
141+
SELECT gender, MIN(salary) AS min, COUNT(*) AS c, MAX(salary) AS max FROM test_emp GROUP BY gender HAVING c > 1 ORDER BY gender DESC NULLS LAST, MAX(salary) ASC;
142+
143+
multipleGroupingsAndOrderingByGroupsWithFunctions_1
144+
SELECT first_name f, last_name l, gender g, CONCAT(first_name, last_name) c FROM test_emp GROUP BY gender, l, f, c ORDER BY gender NULLS FIRST, c DESC, first_name, last_name ASC;
145+
146+
multipleGroupingsAndOrderingByGroupsWithFunctions_2
147+
SELECT first_name f, last_name l, gender g, CONCAT(first_name, last_name) c FROM test_emp GROUP BY gender, l, f, c ORDER BY c DESC, gender DESC NULLS LAST, first_name, last_name ASC;
148+
149+
multipleGroupingsAndOrderingByGroupsAndAggregatesWithFunctions_1
150+
SELECT CONCAT('foo', gender) g, MAX(salary) AS max, MIN(salary) AS min FROM test_emp GROUP BY g ORDER BY 1 NULLS FIRST, 2, 3;
151+
152+
multipleGroupingsAndOrderingByGroupsAndAggregatesWithFunctions_2
153+
SELECT CONCAT('foo', gender) g, MAX(salary) AS max, MIN(salary) AS min FROM test_emp GROUP BY g ORDER BY 1 DESC NULLS LAST, 2, 3;
154+
155+
multipleGroupingsAndOrderingByGroupsAndAggregatesWithFunctions_3
156+
SELECT CONCAT('foo', gender) g, MAX(salary) AS max, MIN(salary) AS min FROM test_emp GROUP BY g ORDER BY 2, 1 NULLS FIRST, 3;
157+
158+
multipleGroupingsAndOrderingByGroupsAndAggregatesWithFunctions_4
159+
SELECT CONCAT('foo', gender) g, MAX(salary) AS max, MIN(salary) AS min FROM test_emp GROUP BY g ORDER BY 3 DESC, 1 NULLS FIRST, 2;

x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java

+31-16
Original file line numberDiff line numberDiff line change
@@ -578,36 +578,51 @@ static class AggSortingQueue extends PriorityQueue<Tuple<List<?>, Integer>> {
578578
this.sortingColumns = sortingColumns;
579579
}
580580

581-
// compare row based on the received attribute sort
582-
// if a sort item is not in the list, it is assumed the sorting happened in ES
583-
// and the results are left as is (by using the row ordering), otherwise it is sorted based on the given criteria.
584-
//
585-
// Take for example ORDER BY a, x, b, y
586-
// a, b - are sorted in ES
587-
// x, y - need to be sorted client-side
588-
// sorting on x kicks in, only if the values for a are equal.
589-
581+
/**
582+
* Compare row based on the received attribute sort
583+
* <ul>
584+
* <li>
585+
* If a tuple in {@code sortingColumns} has a null comparator, it is assumed the sorting
586+
* happened in ES and the results are left as is (by using the row ordering), otherwise it is
587+
* sorted based on the given criteria.
588+
* </li>
589+
* <li>
590+
* If no tuple exists in {@code sortingColumns} for an output column, it means this column
591+
* is not included at all in the ORDER BY
592+
* </li>
593+
*</ul>
594+
*
595+
* Take for example ORDER BY a, x, b, y
596+
* a, b - are sorted in ES
597+
* x, y - need to be sorted client-side
598+
* sorting on x kicks in only if the values for a are equal.
599+
* sorting on y kicks in only if the values for a, x and b are all equal
600+
*
601+
*/
590602
// thanks to @jpountz for the row ordering idea as a way to preserve ordering
591603
@SuppressWarnings("unchecked")
592604
@Override
593605
protected boolean lessThan(Tuple<List<?>, Integer> l, Tuple<List<?>, Integer> r) {
594606
for (Tuple<Integer, Comparator> tuple : sortingColumns) {
595-
int i = tuple.v1().intValue();
607+
int columnIdx = tuple.v1().intValue();
596608
Comparator comparator = tuple.v2();
597609

598-
Object vl = l.v1().get(i);
599-
Object vr = r.v1().get(i);
610+
// Get the values for left and right rows at the current column index
611+
Object vl = l.v1().get(columnIdx);
612+
Object vr = r.v1().get(columnIdx);
600613
if (comparator != null) {
601614
int result = comparator.compare(vl, vr);
602-
// if things are equals, move to the next comparator
615+
// if things are not equal: return the comparison result,
616+
// otherwise: move to the next comparator to solve the tie.
603617
if (result != 0) {
604618
return result > 0;
605619
}
606620
}
607-
// no comparator means the existing order needs to be preserved
621+
// no comparator means the rows are pre-ordered by ES for the column at
622+
// the current index and the existing order needs to be preserved
608623
else {
609-
// check the values - if they are equal move to the next comparator
610-
// otherwise return the row order
624+
// check the values - if they are not equal return the row order
625+
// otherwise: move to the next comparator to solve the tie.
611626
if (Objects.equals(vl, vr) == false) {
612627
return l.v2().compareTo(r.v2()) > 0;
613628
}

x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SourceGenerator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ private static void sorting(QueryContainer container, SearchSourceBuilder source
105105
source.sort("_doc");
106106
return;
107107
}
108-
for (Sort sortable : container.sort()) {
108+
for (Sort sortable : container.sort().values()) {
109109
SortBuilder<?> sortBuilder = null;
110110

111111
if (sortable instanceof AttributeSort) {

x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/planner/QueryFolder.java

+20-12
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.xpack.sql.expression.function.Functions;
2323
import org.elasticsearch.xpack.sql.expression.function.ScoreAttribute;
2424
import org.elasticsearch.xpack.sql.expression.function.aggregate.AggregateFunction;
25+
import org.elasticsearch.xpack.sql.expression.function.aggregate.AggregateFunctionAttribute;
2526
import org.elasticsearch.xpack.sql.expression.function.aggregate.CompoundNumericAggregate;
2627
import org.elasticsearch.xpack.sql.expression.function.aggregate.Count;
2728
import org.elasticsearch.xpack.sql.expression.function.aggregate.InnerAggregate;
@@ -461,20 +462,27 @@ protected PhysicalPlan rule(OrderExec plan) {
461462
GroupByKey group = qContainer.findGroupForAgg(attr);
462463

463464
// TODO: might need to validate whether the target field or group actually exist
464-
if (group != null && group != Aggs.IMPLICIT_GROUP_KEY) {
465+
if (group!=null && group!=Aggs.IMPLICIT_GROUP_KEY) {
465466
qContainer = qContainer.updateGroup(group.with(direction));
466467
}
467-
else {
468-
// scalar functions typically require script ordering
469-
if (attr instanceof ScalarFunctionAttribute) {
470-
// nope, use scripted sorting
471-
qContainer = qContainer.addSort(
472-
new ScriptSort(((ScalarFunctionAttribute) attr).script(), direction, missing));
473-
} else if (attr instanceof ScoreAttribute) {
474-
qContainer = qContainer.addSort(new ScoreSort(direction, missing));
475-
} else {
476-
qContainer = qContainer.addSort(new AttributeSort(attr, direction, missing));
477-
}
468+
469+
// scalar functions typically require script ordering
470+
if (attr instanceof ScalarFunctionAttribute) {
471+
ScalarFunctionAttribute sf = (ScalarFunctionAttribute) attr;
472+
// nope, use scripted sorting
473+
qContainer = qContainer.addSort(sf.id(), new ScriptSort(sf.script(), direction, missing));
474+
}
475+
// score
476+
else if (attr instanceof ScoreAttribute) {
477+
qContainer = qContainer.addSort(attr.id(), new ScoreSort(direction, missing));
478+
}
479+
// agg function
480+
else if (attr instanceof AggregateFunctionAttribute) {
481+
AggregateFunctionAttribute afa = (AggregateFunctionAttribute) attr;
482+
qContainer = qContainer.addSort(afa.innerId(), new AttributeSort(attr, direction, missing));
483+
// field, histogram
484+
} else {
485+
qContainer = qContainer.addSort(attr.id(), new AttributeSort(attr, direction, missing));
478486
}
479487
}
480488

x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/querydsl/container/QueryContainer.java

+47-44
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,12 @@
4040
import java.util.Collections;
4141
import java.util.Comparator;
4242
import java.util.LinkedHashMap;
43-
import java.util.LinkedHashSet;
4443
import java.util.List;
4544
import java.util.Map;
4645
import java.util.Objects;
47-
import java.util.Set;
4846

4947
import static java.util.Collections.emptyList;
5048
import static java.util.Collections.emptyMap;
51-
import static java.util.Collections.emptySet;
5249
import static java.util.Collections.singletonMap;
5350
import static org.elasticsearch.xpack.sql.util.CollectionUtils.combine;
5451

@@ -79,7 +76,7 @@ public class QueryContainer {
7976
// at scrolling, their inputs (leaves) get updated
8077
private final AttributeMap<Pipe> scalarFunctions;
8178

82-
private final Set<Sort> sort;
79+
private final Map<ExpressionId, Sort> sort;
8380
private final int limit;
8481

8582
// computed
@@ -96,15 +93,15 @@ public QueryContainer(Query query,
9693
Map<ExpressionId, Attribute> aliases,
9794
Map<String, GroupByKey> pseudoFunctions,
9895
AttributeMap<Pipe> scalarFunctions,
99-
Set<Sort> sort,
96+
Map<ExpressionId, Sort> sort,
10097
int limit) {
10198
this.query = query;
10299
this.aggs = aggs == null ? Aggs.EMPTY : aggs;
103100
this.fields = fields == null || fields.isEmpty() ? emptyList() : fields;
104101
this.aliases = aliases == null || aliases.isEmpty() ? Collections.emptyMap() : aliases;
105102
this.pseudoFunctions = pseudoFunctions == null || pseudoFunctions.isEmpty() ? emptyMap() : pseudoFunctions;
106103
this.scalarFunctions = scalarFunctions == null || scalarFunctions.isEmpty() ? AttributeMap.emptyAttributeMap() : scalarFunctions;
107-
this.sort = sort == null || sort.isEmpty() ? emptySet() : sort;
104+
this.sort = sort == null || sort.isEmpty() ? emptyMap() : sort;
108105
this.limit = limit;
109106
}
110107

@@ -118,46 +115,52 @@ public List<Tuple<Integer, Comparator>> sortingColumns() {
118115
return emptyList();
119116
}
120117

118+
for (Sort s : sort.values()) {
119+
if (isAggregateSort(s)) {
120+
customSort = Boolean.TRUE;
121+
break;
122+
}
123+
}
124+
125+
// If no custom sort is used break early
126+
if (customSort == null) {
127+
customSort = Boolean.FALSE;
128+
return emptyList();
129+
}
130+
121131
List<Tuple<Integer, Comparator>> sortingColumns = new ArrayList<>(sort.size());
132+
for (Map.Entry<ExpressionId, Sort> entry : sort.entrySet()) {
133+
ExpressionId expressionId = entry.getKey();
134+
Sort s = entry.getValue();
122135

123-
boolean aggSort = false;
124-
for (Sort s : sort) {
125-
Tuple<Integer, Comparator> tuple = new Tuple<>(Integer.valueOf(-1), null);
126-
127-
if (s instanceof AttributeSort) {
128-
AttributeSort as = (AttributeSort) s;
129-
// find the relevant column of each aggregate function
130-
if (as.attribute() instanceof AggregateFunctionAttribute) {
131-
aggSort = true;
132-
AggregateFunctionAttribute afa = (AggregateFunctionAttribute) as.attribute();
133-
afa = (AggregateFunctionAttribute) aliases.getOrDefault(afa.innerId(), afa);
134-
int atIndex = -1;
135-
for (int i = 0; i < fields.size(); i++) {
136-
Tuple<FieldExtraction, ExpressionId> field = fields.get(i);
137-
if (field.v2().equals(afa.innerId())) {
138-
atIndex = i;
139-
break;
140-
}
141-
}
142-
143-
if (atIndex == -1) {
144-
throw new SqlIllegalArgumentException("Cannot find backing column for ordering aggregation [{}]", afa.name());
145-
}
146-
// assemble a comparator for it
147-
Comparator comp = s.direction() == Sort.Direction.ASC ? Comparator.naturalOrder() : Comparator.reverseOrder();
148-
comp = s.missing() == Sort.Missing.FIRST ? Comparator.nullsFirst(comp) : Comparator.nullsLast(comp);
149-
150-
tuple = new Tuple<>(Integer.valueOf(atIndex), comp);
136+
int atIndex = -1;
137+
for (int i = 0; i < fields.size(); i++) {
138+
Tuple<FieldExtraction, ExpressionId> field = fields.get(i);
139+
if (field.v2().equals(expressionId)) {
140+
atIndex = i;
141+
break;
151142
}
152143
}
153-
sortingColumns.add(tuple);
154-
}
144+
if (atIndex == -1) {
145+
throw new SqlIllegalArgumentException("Cannot find backing column for ordering aggregation [{}]", s);
146+
}
155147

156-
if (customSort == null) {
157-
customSort = Boolean.valueOf(aggSort);
148+
// assemble a comparator for it, if it's not an AggregateSort
149+
// then it's pre-sorted by ES so use null
150+
Comparator comp = null;
151+
if (isAggregateSort(s)) {
152+
comp = s.direction() == Sort.Direction.ASC ? Comparator.naturalOrder() : Comparator.reverseOrder();
153+
comp = s.missing() == Sort.Missing.FIRST ? Comparator.nullsFirst(comp) : Comparator.nullsLast(comp);
154+
}
155+
156+
sortingColumns.add(new Tuple<>(Integer.valueOf(atIndex), comp));
158157
}
159158

160-
return aggSort ? sortingColumns : emptyList();
159+
return sortingColumns;
160+
}
161+
162+
private boolean isAggregateSort(Sort s) {
163+
return s instanceof AttributeSort && ((AttributeSort) s).attribute() instanceof AggregateFunctionAttribute;
161164
}
162165

163166
/**
@@ -212,7 +215,7 @@ public Map<String, GroupByKey> pseudoFunctions() {
212215
return pseudoFunctions;
213216
}
214217

215-
public Set<Sort> sort() {
218+
public Map<ExpressionId, Sort> sort() {
216219
return sort;
217220
}
218221

@@ -260,10 +263,10 @@ public QueryContainer withScalarProcessors(AttributeMap<Pipe> procs) {
260263
return new QueryContainer(query, aggs, fields, aliases, pseudoFunctions, procs, sort, limit);
261264
}
262265

263-
public QueryContainer addSort(Sort sortable) {
264-
Set<Sort> sort = new LinkedHashSet<>(this.sort);
265-
sort.add(sortable);
266-
return new QueryContainer(query, aggs, fields, aliases, pseudoFunctions, scalarFunctions, sort, limit);
266+
public QueryContainer addSort(ExpressionId expressionId, Sort sortable) {
267+
Map<ExpressionId, Sort> newSort = new LinkedHashMap<>(this.sort);
268+
newSort.put(expressionId, sortable);
269+
return new QueryContainer(query, aggs, fields, aliases, pseudoFunctions, scalarFunctions, newSort, limit);
267270
}
268271

269272
private String aliasName(Attribute attr) {

0 commit comments

Comments
 (0)