Skip to content

Commit f5d2ca2

Browse files
Fix Project after TopN (ESQL-541)
Fixes elastic/elasticsearch-internal#497 Fixes ESQL-560 A query like `from test | sort data | limit 2 | project count` fails because `LocalToGlobalLimitAndTopNExec` planning rule adds a collecting `TopNExec` after last GATHER exchange, to perform last reduce, see ``` TopNExec[[Order[data{f}#6,ASC,LAST]],2[INTEGER]] \_ExchangeExec[GATHER,SINGLE_DISTRIBUTION] \_ProjectExec[[count{f}#4]] // <- `data` is projected away but still used by the TopN node above \_FieldExtractExec[count{f}#4] \_TopNExec[[Order[data{f}#6,ASC,LAST]],2[INTEGER]] \_FieldExtractExec[data{f}#6] \_ExchangeExec[REPARTITION,FIXED_ARBITRARY_DISTRIBUTION] \_EsQueryExec[test], query[][_doc_id{f}#9, _segment_id{f}#10, _shard_id{f}#11] ``` Unfortunately, at that stage the inputs needed by the TopNExec could have been projected away by a ProjectExec, so they could be no longer available. This PR adapts the plan as follows: - add all the projections used by the `TopNExec` to the existing `ProjectExec`, so that they are available when needed - add another ProjectExec on top of the plan, to project away the originally removed projections and preserve the query semantics This approach is a bit dangerous, because it bypasses the mechanism of input/output resolution and validation that happens on the logical plan. The alternative would be to do this manipulation on the logical plan, but it's probably hard to do, because there is no concept of Exchange at that level.
1 parent 8be14da commit f5d2ca2

File tree

3 files changed

+155
-20
lines changed

3 files changed

+155
-20
lines changed

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java

+43
Original file line numberDiff line numberDiff line change
@@ -850,6 +850,49 @@ public void testFromLimit() {
850850
assertThat(results.values(), contains(anyOf(contains(1L), contains(2L)), anyOf(contains(1L), contains(2L))));
851851
}
852852

853+
public void testProjectAfterTopN() {
854+
EsqlQueryResponse results = run("from test | sort time | limit 2 | project count");
855+
logger.info(results);
856+
assertEquals(1, results.columns().size());
857+
assertEquals(new ColumnInfo("count", "long"), results.columns().get(0));
858+
assertEquals(2, results.values().size());
859+
assertEquals(40L, results.values().get(0).get(0));
860+
assertEquals(42L, results.values().get(1).get(0));
861+
}
862+
863+
public void testProjectAfterTopNDesc() {
864+
EsqlQueryResponse results = run("from test | sort time desc | limit 2 | project count");
865+
logger.info(results);
866+
assertEquals(1, results.columns().size());
867+
assertEquals(new ColumnInfo("count", "long"), results.columns().get(0));
868+
assertEquals(2, results.values().size());
869+
assertEquals(46L, results.values().get(0).get(0));
870+
assertEquals(44L, results.values().get(1).get(0));
871+
}
872+
873+
public void testTopNProjectEval() {
874+
EsqlQueryResponse results = run("from test | sort time | limit 2 | project count | eval x = count + 1");
875+
logger.info(results);
876+
assertEquals(2, results.columns().size());
877+
assertEquals(new ColumnInfo("count", "long"), results.columns().get(0));
878+
assertEquals(new ColumnInfo("x", "long"), results.columns().get(1));
879+
assertEquals(2, results.values().size());
880+
assertEquals(40L, results.values().get(0).get(0));
881+
assertEquals(41L, results.values().get(0).get(1));
882+
assertEquals(42L, results.values().get(1).get(0));
883+
assertEquals(43L, results.values().get(1).get(1));
884+
}
885+
886+
public void testTopNProjectEvalProject() {
887+
EsqlQueryResponse results = run("from test | sort time | limit 2 | project count | eval x = count + 1 | project x");
888+
logger.info(results);
889+
assertEquals(1, results.columns().size());
890+
assertEquals(new ColumnInfo("x", "long"), results.columns().get(0));
891+
assertEquals(2, results.values().size());
892+
assertEquals(41L, results.values().get(0).get(0));
893+
assertEquals(43L, results.values().get(1).get(0));
894+
}
895+
853896
public void testEmptyIndex() {
854897
ElasticsearchAssertions.assertAcked(
855898
client().admin().indices().prepareCreate("test_empty").setMapping("k", "type=keyword", "v", "type=long").get()

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizer.java

+88-19
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.xpack.ql.expression.Expression;
2828
import org.elasticsearch.xpack.ql.expression.Expressions;
2929
import org.elasticsearch.xpack.ql.expression.FieldAttribute;
30+
import org.elasticsearch.xpack.ql.expression.NamedExpression;
3031
import org.elasticsearch.xpack.ql.expression.predicate.Predicates;
3132
import org.elasticsearch.xpack.ql.expression.predicate.logical.BinaryLogic;
3233
import org.elasticsearch.xpack.ql.expression.predicate.operator.comparison.BinaryComparison;
@@ -126,32 +127,100 @@ protected PhysicalPlan rule(LocalPlanExec plan) {
126127
* Copy any limit/sort/topN in the local plan (before the exchange) after it so after gathering the data,
127128
* the limit still applies.
128129
*/
129-
private static class LocalToGlobalLimitAndTopNExec extends Rule<PhysicalPlan, PhysicalPlan> {
130+
private static class LocalToGlobalLimitAndTopNExec extends OptimizerRule<ExchangeExec> {
130131

131-
public PhysicalPlan apply(PhysicalPlan plan) {
132-
return plan.transformUp(UnaryExec.class, u -> {
133-
PhysicalPlan pl = u;
134-
if (u.child()instanceof ExchangeExec exchange) {
135-
var localLimit = findLocalLimitOrTopN(exchange);
136-
if (localLimit != null) {
137-
pl = localLimit.replaceChild(u);
138-
}
139-
}
140-
return pl;
141-
});
132+
private LocalToGlobalLimitAndTopNExec() {
133+
super(OptimizerRules.TransformDirection.UP);
134+
}
135+
136+
@Override
137+
protected PhysicalPlan rule(ExchangeExec exchange) {
138+
if (exchange.getType() == ExchangeExec.Type.GATHER) {
139+
return maybeAddGlobalLimitOrTopN(exchange);
140+
}
141+
return exchange;
142142
}
143143

144-
private UnaryExec findLocalLimitOrTopN(UnaryExec localPlan) {
145-
for (var plan = localPlan.child();;) {
146-
if (plan instanceof LimitExec || plan instanceof TopNExec) {
147-
return (UnaryExec) plan;
144+
/**
145+
* This method copies any Limit/Sort/TopN in the local plan (before the exchange) after it,
146+
* ensuring that all the inputs are available at that point
147+
* eg. if between the exchange and the TopN there is a <code>project</code> that filters out
148+
* some inputs needed by the topN (i.e. the sorting fields), this method also modifies
149+
* the existing <code>project</code> to make these inputs available to the global TopN, and then adds
150+
* another <code>project</code> at the end of the plan, to ensure that the original semantics
151+
* are preserved.
152+
*
153+
* In detail:
154+
* <ol>
155+
* <li>Traverse the plan down starting from the exchange, looking for the first Limit/Sort/TopN</li>
156+
* <li>If a Limit is found, copy it after the Exchange to make it global limit</li>
157+
* <li>If a TopN is found, copy it after the Exchange and ensure that it has all the inputs needed:
158+
* <ol>
159+
* <li>Starting from the TopN, traverse the plan backwards and check that all the nodes propagate
160+
* the inputs needed by the TopN</li>
161+
* <li>If a Project node filters out some of the inputs needed by the TopN,
162+
* replace it with another one that includes those inputs</li>
163+
* <li>Copy the TopN after the exchange, to make it global</li>
164+
* <li>If the outputs of the new global TopN are different from the outputs of the original Exchange,
165+
* add another Project that filters out the unneeded outputs and preserves the original semantics</li>
166+
* </ol>
167+
* </li>
168+
* </ol>
169+
* @param exchange
170+
* @return
171+
*/
172+
private PhysicalPlan maybeAddGlobalLimitOrTopN(ExchangeExec exchange) {
173+
List<UnaryExec> visitedNodes = new ArrayList<>();
174+
visitedNodes.add(exchange);
175+
AttributeSet exchangeOutputSet = exchange.outputSet();
176+
// step 1: traverse the plan and find Limit/TopN
177+
for (var plan = exchange.child();;) {
178+
if (plan instanceof LimitExec limit) {
179+
// Step 2: just add a global Limit
180+
return limit.replaceChild(exchange);
181+
}
182+
if (plan instanceof TopNExec topN) {
183+
// Step 3: copy the TopN after the Exchange and ensure that it has all the inputs needed
184+
Set<Attribute> requiredAttributes = Expressions.references(topN.order()).combine(topN.inputSet());
185+
if (exchangeOutputSet.containsAll(requiredAttributes)) {
186+
return topN.replaceChild(exchange);
187+
}
188+
189+
PhysicalPlan subPlan = topN;
190+
// Step 3.1: Traverse the plan backwards to check inputs available
191+
for (int i = visitedNodes.size() - 1; i >= 0; i--) {
192+
UnaryExec node = visitedNodes.get(i);
193+
if (node instanceof ProjectExec proj && node.outputSet().containsAll(requiredAttributes) == false) {
194+
// Step 3.2: a Project is filtering out some inputs needed by the global TopN,
195+
// replace it with another one that preserves these inputs
196+
List<NamedExpression> newProjections = new ArrayList<>(proj.projections());
197+
for (Attribute attr : requiredAttributes) {
198+
if (newProjections.contains(attr) == false) {
199+
newProjections.add(attr);
200+
}
201+
}
202+
node = new ProjectExec(proj.source(), proj.child(), newProjections);
203+
}
204+
subPlan = node.replaceChild(subPlan);
205+
}
206+
207+
// Step 3.3: add the global TopN right after the exchange
208+
topN = topN.replaceChild(subPlan);
209+
if (exchangeOutputSet.containsAll(topN.output())) {
210+
return topN;
211+
} else {
212+
// Step 3.4: the output propagation is leaking at the end of the plan,
213+
// add one more Project to preserve the original query semantics
214+
return new ProjectExec(topN.source(), topN, new ArrayList<>(exchangeOutputSet));
215+
}
148216
}
149-
// possible to go deeper
150217
if (plan instanceof ProjectExec || plan instanceof EvalExec) {
218+
visitedNodes.add((UnaryExec) plan);
219+
// go deeper with step 1
151220
plan = ((UnaryExec) plan).child();
152221
} else {
153-
// no limit specified
154-
return null;
222+
// no limit specified, return the original plan
223+
return exchange;
155224
}
156225
}
157226
}

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java

+24-1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.elasticsearch.xpack.esql.session.EsqlConfiguration;
3838
import org.elasticsearch.xpack.ql.expression.Expressions;
3939
import org.elasticsearch.xpack.ql.expression.FieldAttribute;
40+
import org.elasticsearch.xpack.ql.expression.NamedExpression;
4041
import org.elasticsearch.xpack.ql.expression.predicate.operator.comparison.BinaryComparison;
4142
import org.elasticsearch.xpack.ql.expression.predicate.operator.comparison.GreaterThan;
4243
import org.elasticsearch.xpack.ql.index.EsIndex;
@@ -48,6 +49,7 @@
4849
import java.util.List;
4950
import java.util.Map;
5051
import java.util.Set;
52+
import java.util.stream.Collectors;
5153

5254
import static java.util.Arrays.asList;
5355
import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;
@@ -639,14 +641,35 @@ public void testExtractorForEvalWithoutProject() throws Exception {
639641
| sort nullsum
640642
| limit 1
641643
"""));
642-
var topN = as(optimized, TopNExec.class);
644+
var topProject = as(optimized, ProjectExec.class);
645+
var topN = as(topProject.child(), TopNExec.class);
643646
var exchange = as(topN.child(), ExchangeExec.class);
644647
var project = as(exchange.child(), ProjectExec.class);
645648
var extract = as(project.child(), FieldExtractExec.class);
646649
var topNLocal = as(extract.child(), TopNExec.class);
647650
var eval = as(topNLocal.child(), EvalExec.class);
648651
}
649652

653+
public void testProjectAfterTopN() throws Exception {
654+
var optimized = optimizedPlan(physicalPlan("""
655+
from test
656+
| sort emp_no
657+
| project first_name
658+
| limit 2
659+
"""));
660+
var topProject = as(optimized, ProjectExec.class);
661+
assertEquals(1, topProject.projections().size());
662+
assertEquals("first_name", topProject.projections().get(0).name());
663+
var topN = as(topProject.child(), TopNExec.class);
664+
var exchange = as(topN.child(), ExchangeExec.class);
665+
var project = as(exchange.child(), ProjectExec.class);
666+
List<String> projectionNames = project.projections().stream().map(NamedExpression::name).collect(Collectors.toList());
667+
assertTrue(projectionNames.containsAll(List.of("first_name", "emp_no")));
668+
var extract = as(project.child(), FieldExtractExec.class);
669+
var topNLocal = as(extract.child(), TopNExec.class);
670+
var fieldExtract = as(topNLocal.child(), FieldExtractExec.class);
671+
}
672+
650673
private static EsQueryExec source(PhysicalPlan plan) {
651674
if (plan instanceof ExchangeExec exchange) {
652675
assertThat(exchange.getPartitioning(), is(ExchangeExec.Partitioning.FIXED_ARBITRARY_DISTRIBUTION));

0 commit comments

Comments
 (0)