Skip to content

FORK - allow EVAL/DISSECT/STATS in branches #125937

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Apr 10, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,86 @@ fork1 | 5.603396578413904E18 | 2 | all we have to decide is w
fork2 | 2.3447541759648727E18 | 3 | be excellent to each other
fork2 | 6.093784261960139E18 | 2 | all we have to decide is what to do with the time that is given to us
;

forkWithEvals
required_capability: fork

FROM employees
| FORK (WHERE emp_no == 10048 OR emp_no == 10081 | EVAL x = "abc" | EVAL y = 1)
(WHERE emp_no == 10081 OR emp_no == 10087 | EVAL x = "def" | EVAL z = 2)
| KEEP _fork, emp_no, x, y, z
| SORT _fork, emp_no
;

_fork:keyword | emp_no:integer | x:keyword | y:integer | z:integer
fork1 | 10048 | abc | 1 | null
fork1 | 10081 | abc | 1 | null
fork2 | 10081 | def | null | 2
fork2 | 10087 | def | null | 2
;

forkWithStats
required_capability: fork

FROM employees
| FORK (WHERE emp_no == 10048 OR emp_no == 10081)
(WHERE emp_no == 10081 OR emp_no == 10087)
(STATS x = COUNT(*), y = MAX(emp_no), z = MIN(emp_no))
(STATS x = COUNT(*), y = MIN(emp_no))
| KEEP _fork, emp_no, x, y, z
| SORT _fork, emp_no
;

_fork:keyword | emp_no:integer | x:long | y:integer | z:integer
fork1 | 10048 | null | null | null
fork1 | 10081 | null | null | null
fork2 | 10081 | null | null | null
fork2 | 10087 | null | null | null
fork3 | null | 100 | 10100 | 10001
fork4 | null | 100 | 10001 | null
;

forkWithDissect
required_capability: fork

FROM employees
| WHERE emp_no == 10048 OR emp_no == 10081
| FORK (EVAL a = CONCAT(first_name, " ", emp_no::keyword, " ", last_name)
| DISSECT a "%{x} %{y} %{z}" )
(EVAL b = CONCAT(last_name, " ", emp_no::keyword, " ", first_name)
| DISSECT b "%{x} %{y} %{w}" )
| KEEP _fork, emp_no, x, y, z, w
| SORT _fork, emp_no
;

_fork:keyword | emp_no:integer | x:keyword | y:keyword | z:keyword | w:keyword
fork1 | 10048 | Florian | 10048 | Syrotiuk | null
fork1 | 10081 | Zhongwei | 10081 | Rosen | null
fork2 | 10048 | Syrotiuk | 10048 | null | Florian
fork2 | 10081 | Rosen | 10081 | null | Zhongwei
;

forkWithMixOfCommands
required_capability: fork

FROM employees
| WHERE emp_no == 10048 OR emp_no == 10081
| FORK ( EVAL a = CONCAT(first_name, " ", emp_no::keyword, " ", last_name)
| DISSECT a "%{x} %{y} %{z}"
| EVAL y = y::keyword )
( STATS x = COUNT(*)::keyword, y = MAX(emp_no)::keyword, z = MIN(emp_no)::keyword )
( SORT emp_no ASC | LIMIT 2 | EVAL x = last_name )
( EVAL x = "abc" | EVAL y = "aaa" )
| KEEP _fork, emp_no, x, y, z, a
| SORT _fork, emp_no
;

_fork:keyword | emp_no:integer | x:keyword | y:keyword | z:keyword | a:keyword
fork1 | 10048 | Florian | 10048 | Syrotiuk | Florian 10048 Syrotiuk
fork1 | 10081 | Zhongwei | 10081 | Rosen | Zhongwei 10081 Rosen
fork2 | null | 2 | 10081 | 10048 | null
fork3 | 10048 | Syrotiuk | null | null | null
fork3 | 10081 | Rosen | null | null | null
fork4 | 10048 | abc | aaa | null | null
fork4 | 10081 | abc | aaa | null | null
;
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.xpack.esql.parser.ParsingException;
import org.junit.Before;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;
Expand Down Expand Up @@ -501,6 +502,75 @@ public void testSubqueryWithoutLimitOnly() { // this should
}
}

// Verifies that EVAL is accepted inside FORK branches: each branch computes its
// own value for column "a" and rows are tagged with the branch discriminator _fork.
public void testWithEvalSimple() {
    var query = """
        FROM test
        | WHERE content:"cat"
        | FORK ( EVAL a = 1 )
               ( EVAL a = 2 )
        | KEEP a, _fork, id, content
        """;

    try (var response = run(query)) {
        assertColumnNames(response.columns(), List.of("a", "_fork", "id", "content"));

        // The single matching document is emitted once per branch, with the
        // branch-specific value of "a".
        Iterable<Iterable<Object>> expected = List.of(
            List.of(1, "fork1", 5, "There is also a white cat"),
            List.of(2, "fork2", 5, "There is also a white cat")
        );
        assertValues(response.values(), expected);
    }
}

// Verifies that FORK branches with different output columns are aligned: a column
// produced by only one branch is null-padded in the rows of the other branch.
public void testWithEvalDifferentOutputs() {
    var query = """
        FROM test
        | WHERE id == 2
        | FORK ( EVAL a = 1 )
               ( EVAL b = 2 )
        | KEEP a, b, _fork
        | SORT _fork, a
        """;
    try (var resp = run(query)) {
        assertColumnNames(resp.columns(), List.of("a", "b", "_fork"));
        // Arrays.asList (not List.of) because expected rows contain nulls for the
        // column the branch does not produce, and List.of rejects null elements.
        Iterable<Iterable<Object>> expectedValues = List.of(
            Arrays.asList(1, null, "fork1"),
            Arrays.asList(null, 2, "fork2")
        );
        assertValues(resp.values(), expectedValues);
    }
}

// Verifies that STATS is accepted inside a FORK branch: the aggregating branch
// emits a single summary row while the filtering branch emits per-document rows,
// each side null-padded for the columns it does not produce.
public void testWithStatsSimple() {
    var query = """
        FROM test
        | FORK (STATS x=COUNT(*), y=VALUES(id))
               (WHERE id == 2)
        | KEEP _fork, x, y, id
        | SORT _fork, id
        """;
    try (var resp = run(query)) {
        assertColumnNames(resp.columns(), List.of("_fork", "x", "y", "id"));
        // Arrays.asList (not List.of) because expected rows contain nulls for the
        // columns the branch does not produce, and List.of rejects null elements.
        Iterable<Iterable<Object>> expectedValues = List.of(
            Arrays.asList("fork1", 6L, List.of(1, 2, 3, 4, 5, 6), null),
            Arrays.asList("fork2", null, null, 2)
        );
        assertValues(resp.values(), expectedValues);
    }
}

// Verifies that FORK branches assigning incompatible types (integer vs keyword)
// to the same column are rejected at verification time.
public void testWithEvalWithConflictingTypes() {
    var query = """
        FROM test
        | FORK ( EVAL a = 1 )
               ( EVAL a = "aaaa" )
        | KEEP a, _fork
        """;

    var exception = expectThrows(VerificationException.class, () -> run(query));
    assertTrue(exception.getMessage().contains("Column [a] has conflicting data types"));
}

public void testSubqueryWithUnknownField() {
var query = """
FROM test
Expand Down Expand Up @@ -565,6 +635,19 @@ public void testSubqueryWithUnknownFieldInSort() {
assertTrue(e.getMessage().contains("Unknown column [bar]"));
}

// Verifies that referencing an unknown column inside an EVAL within a FORK branch
// fails verification with the standard "Unknown column" error.
public void testSubqueryWithUnknownFieldInEval() {
    var query = """
        FROM test
        | FORK
           ( EVAL x = baz + 1)
           ( WHERE content:"cat" )
        | KEEP _fork, id, content
        | SORT _fork, id
        """;
    var error = expectThrows(VerificationException.class, () -> run(query));
    assertTrue(error.getMessage().contains("Unknown column [baz]"));
}

public void testOneSubQuery() {
var query = """
FROM test
Expand Down
7 changes: 5 additions & 2 deletions x-pack/plugin/esql/src/main/antlr/EsqlBaseParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,12 @@ forkSubQueryCommand
;

// Commands allowed inside a FORK branch. This is intentionally a subset of
// processingCommand: commands in other lexer modes (MV_EXPAND, KEEP, RENAME,
// DROP, GROK, ...) currently fail to parse inside FORK and will be added in a
// follow-up once the mode handling is sorted out.
forkSubQueryProcessingCommand
    : evalCommand
    | whereCommand
    | limitCommand
    | statsCommand
    | sortCommand
    | dissectCommand
    ;

rrfCommand
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,9 @@
public class Analyzer extends ParameterizedRuleExecutor<LogicalPlan, AnalyzerContext> {
// marker list of attributes for plans that do not have any concrete fields to return, but have other computed columns to return
// ie from test | stats c = count(*)
public static final String NO_FIELDS_NAME = "<no-fields>";
public static final List<Attribute> NO_FIELDS = List.of(
new ReferenceAttribute(Source.EMPTY, "<no-fields>", DataType.NULL, Nullability.TRUE, null, true)
new ReferenceAttribute(Source.EMPTY, NO_FIELDS_NAME, DataType.NULL, Nullability.TRUE, null, true)
);

private static final List<Batch<LogicalPlan>> RULES = List.of(
Expand Down Expand Up @@ -499,6 +500,10 @@ protected LogicalPlan rule(LogicalPlan plan, AnalyzerContext context) {
return resolveKeep(p, childrenOutput);
}

if (plan instanceof Fork f) {
return resolveFork(f, context);
}

if (plan instanceof Eval p) {
return resolveEval(p, childrenOutput);
}
Expand Down Expand Up @@ -714,6 +719,62 @@ private Join resolveLookupJoin(LookupJoin join) {
return join;
}

private LogicalPlan resolveFork(Fork fork, AnalyzerContext context) {
// we align the outputs of the sub plans such that they have the same columns
boolean changed = false;
List<LogicalPlan> newSubPlans = new ArrayList<>();
Set<String> forkColumns = fork.outputSet().names();

for (LogicalPlan logicalPlan : fork.children()) {
Source source = logicalPlan.source();

// find the missing columns
List<Attribute> missing = new ArrayList<>();
Set<String> currentNames = logicalPlan.outputSet().names();
for (Attribute attr : fork.outputSet()) {
if (currentNames.contains(attr.name()) == false) {
missing.add(attr);
}
}

List<Alias> aliases = missing.stream()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would use missing.forEach() instead of heavier streams, but optional / preference.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kept this as it is since I felt using map is more natural here - happy to change it if you have a strong preference to use forEach.

.map(attr -> new Alias(source, attr.name(), Literal.of(attr, null)))
.collect(Collectors.toList());
;

// add the missing columns
if (aliases.size() > 0) {
logicalPlan = new Eval(source, logicalPlan, aliases);
changed = true;
}

List<String> subPlanColumns = logicalPlan.output().stream().map(Attribute::name).collect(Collectors.toList());
// We need to add an explicit Keep even if the outputs align
// This is because at the moment the sub plans are executed and optimized separately and the output might change
// during optimizations. Once we add streaming we might not need to add a Keep when the outputs already align.
if (logicalPlan instanceof Keep == false || subPlanColumns.equals(forkColumns) == false) {
changed = true;
List<Attribute> newOutput = new ArrayList<>();
for (String attrName : forkColumns) {
for (Attribute subAttr : logicalPlan.output()) {
if (attrName.equals(subAttr.name())) {
newOutput.add(subAttr);
}
}
}
logicalPlan = new Keep(logicalPlan.source(), logicalPlan, newOutput);
}

newSubPlans.add(logicalPlan);
}

if (changed == false) {
return fork;
}

return new Fork(fork.source(), newSubPlans);
}

private LogicalPlan resolveRerank(Rerank rerank, List<Attribute> childrenOutput) {
List<Alias> newFields = new ArrayList<>();
boolean changed = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
import org.elasticsearch.xpack.esql.core.expression.EmptyAttribute;
import org.elasticsearch.xpack.esql.core.expression.Expressions;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.util.Holder;
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.Fork;
import org.elasticsearch.xpack.esql.plan.logical.Limit;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
Expand All @@ -36,6 +38,8 @@ public final class PruneColumns extends Rule<LogicalPlan, LogicalPlan> {
public LogicalPlan apply(LogicalPlan plan) {
// track used references
var used = plan.outputSet().asBuilder();
Holder<Boolean> forkPresent = new Holder<>(false);

// while going top-to-bottom (upstream)
var pl = plan.transformDown(p -> {
// Note: It is NOT required to do anything special for binary plans like JOINs. It is perfectly fine that transformDown descends
Expand All @@ -50,6 +54,14 @@ public LogicalPlan apply(LogicalPlan plan) {
return p;
}

if (p instanceof Fork) {
forkPresent.set(true);
}
// pruning columns for Fork branches can have the side effect of having misaligned outputs
if (forkPresent.get()) {
return p;
}

// remember used
boolean recheck;
// analyze the unused items against dedicated 'producer' nodes such as Eval and Aggregate
Expand Down

Large diffs are not rendered by default.

Loading