Skip to content

Commit b0d7c7d

Browse files
authored
Add logical/physical plans time-series aggregate (#126178)
We need to store extra information for Aggregate and AggregateExec for time-series aggregations. Previously, I added a type (standard or time_series), but this was not enough. This PR removes it and replaces it with extensions of Aggregate and AggregateExec. I considered adding an extra map of metadata to Aggregate and AggregateExec, but this approach seems simpler.
1 parent 496c62d commit b0d7c7d

22 files changed

+342
-220
lines changed

Diff for: server/src/main/java/org/elasticsearch/TransportVersions.java

+1
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,7 @@ static TransportVersion def(int id) {
211211
public static final TransportVersion REPOSITORIES_METADATA_AS_PROJECT_CUSTOM = def(9_042_0_00);
212212
public static final TransportVersion BATCHED_QUERY_PHASE_VERSION = def(9_043_0_00);
213213
public static final TransportVersion REMOTE_EXCEPTION = def(9_044_0_00);
214+
public static final TransportVersion ESQL_REMOVE_AGGREGATE_TYPE = def(9_045_0_00);
214215

215216
/*
216217
* STOP! READ THIS FIRST! No, really,

Diff for: x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperator.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,18 @@
2323
public class TimeSeriesAggregationOperator extends HashAggregationOperator {
2424

2525
public record Factory(
26-
BlockHash.GroupSpec tsidGroup,
27-
BlockHash.GroupSpec timestampGroup,
26+
List<BlockHash.GroupSpec> groups,
2827
AggregatorMode aggregatorMode,
2928
List<GroupingAggregator.Factory> aggregators,
3029
int maxPageSize
3130
) implements OperatorFactory {
3231
@Override
3332
public Operator get(DriverContext driverContext) {
3433
// TODO: use TimeSeriesBlockHash when possible
35-
return new HashAggregationOperator(
34+
return new TimeSeriesAggregationOperator(
3635
aggregators,
3736
() -> BlockHash.build(
38-
List.of(tsidGroup, timestampGroup),
37+
groups,
3938
driverContext.blockFactory(),
4039
maxPageSize,
4140
true // we can enable optimizations as the inputs are vectors

Diff for: x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/CombineProjections.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ protected LogicalPlan rule(UnaryPlan plan) {
5353
// project can be fully removed
5454
if (newAggs != null) {
5555
var newGroups = replacePrunedAliasesUsedInGroupBy(a.groupings(), aggs, newAggs);
56-
plan = new Aggregate(a.source(), a.child(), a.aggregateType(), newGroups, newAggs);
56+
plan = a.with(newGroups, newAggs);
5757
}
5858
}
5959
return plan;
@@ -75,10 +75,8 @@ protected LogicalPlan rule(UnaryPlan plan) {
7575
throw new EsqlIllegalArgumentException("Expected an Attribute, got {}", grouping);
7676
}
7777
}
78-
plan = new Aggregate(
79-
a.source(),
78+
plan = a.with(
8079
p.child(),
81-
a.aggregateType(),
8280
combineUpperGroupingsAndLowerProjections(groupingAttrs, p.projections()),
8381
combineProjections(a.aggregates(), p.projections())
8482
);

Diff for: x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ExtractAggregateCommonFilter.java

+1-7
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,6 @@ protected LogicalPlan rule(Aggregate aggregate) {
6767
}
6868

6969
// build the new agg on top of extracted filter
70-
return new Aggregate(
71-
aggregate.source(),
72-
new Filter(aggregate.source(), aggregate.child(), common.v1()),
73-
aggregate.aggregateType(),
74-
aggregate.groupings(),
75-
newAggs
76-
);
70+
return aggregate.with(new Filter(aggregate.source(), aggregate.child(), common.v1()), aggregate.groupings(), newAggs);
7771
}
7872
}

Diff for: x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java

+2-14
Original file line numberDiff line numberDiff line change
@@ -73,22 +73,10 @@ public LogicalPlan apply(LogicalPlan plan) {
7373
} else {
7474
// Aggs cannot produce pages with 0 columns, so retain one grouping.
7575
remaining = List.of(Expressions.attribute(aggregate.groupings().get(0)));
76-
p = new Aggregate(
77-
aggregate.source(),
78-
aggregate.child(),
79-
aggregate.aggregateType(),
80-
aggregate.groupings(),
81-
remaining
82-
);
76+
p = aggregate.with(aggregate.groupings(), remaining);
8377
}
8478
} else {
85-
p = new Aggregate(
86-
aggregate.source(),
87-
aggregate.child(),
88-
aggregate.aggregateType(),
89-
aggregate.groupings(),
90-
remaining
91-
);
79+
p = aggregate.with(aggregate.groupings(), remaining);
9280
}
9381
}
9482
} else if (p instanceof Eval eval) {

Diff for: x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/SubstituteSurrogates.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ protected LogicalPlan rule(Aggregate aggregate) {
107107
if (changed) {
108108
var source = aggregate.source();
109109
if (newAggs.isEmpty() == false) {
110-
plan = new Aggregate(source, aggregate.child(), aggregate.aggregateType(), aggregate.groupings(), newAggs);
110+
plan = aggregate.with(aggregate.child(), aggregate.groupings(), newAggs);
111111
} else {
112112
// All aggs actually have been surrogates for (foldable) expressions, e.g.
113113
// \_Aggregate[[],[AVG([1, 2][INTEGER]) AS s]]

Diff for: x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java

+21-20
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,12 @@
2626
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
2727
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
2828
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
29+
import org.elasticsearch.xpack.esql.plan.logical.TimeSeriesAggregate;
2930

3031
import java.util.ArrayList;
3132
import java.util.HashMap;
3233
import java.util.List;
3334
import java.util.Map;
34-
import java.util.stream.Stream;
3535

3636
/**
3737
* Rate aggregation is special because it must be computed per time series, regardless of the grouping keys.
@@ -123,10 +123,14 @@ public TranslateTimeSeriesAggregate() {
123123

124124
@Override
125125
protected LogicalPlan rule(Aggregate aggregate) {
126-
return translate(aggregate);
126+
if (aggregate instanceof TimeSeriesAggregate ts) {
127+
return translate(ts);
128+
} else {
129+
return aggregate;
130+
}
127131
}
128132

129-
LogicalPlan translate(Aggregate aggregate) {
133+
LogicalPlan translate(TimeSeriesAggregate aggregate) {
130134
Map<Rate, Alias> rateAggs = new HashMap<>();
131135
List<NamedExpression> firstPassAggs = new ArrayList<>();
132136
List<NamedExpression> secondPassAggs = new ArrayList<>();
@@ -153,7 +157,8 @@ LogicalPlan translate(Aggregate aggregate) {
153157
}
154158
}
155159
if (rateAggs.isEmpty()) {
156-
return aggregate;
160+
// no time-series aggregations, run a regular aggregation instead.
161+
return new Aggregate(aggregate.source(), aggregate.child(), aggregate.groupings(), aggregate.aggregates());
157162
}
158163
Holder<Attribute> tsid = new Holder<>();
159164
Holder<Attribute> timestamp = new Holder<>();
@@ -204,7 +209,7 @@ LogicalPlan translate(Aggregate aggregate) {
204209
}
205210
secondPassGroupings.add(new Alias(g.source(), g.name(), newFinalGroup.toAttribute(), g.id()));
206211
}
207-
LogicalPlan relation = aggregate.child().transformUp(EsRelation.class, r -> {
212+
LogicalPlan newChild = aggregate.child().transformUp(EsRelation.class, r -> {
208213
if (r.output().contains(tsid.get()) == false) {
209214
return new EsRelation(
210215
r.source(),
@@ -217,26 +222,22 @@ LogicalPlan translate(Aggregate aggregate) {
217222
return r;
218223
}
219224
});
220-
return newAggregate(
221-
newAggregate(relation, Aggregate.AggregateType.TIME_SERIES, firstPassAggs, firstPassGroupings),
222-
Aggregate.AggregateType.STANDARD,
223-
secondPassAggs,
224-
secondPassGroupings
225+
final var firstPhase = new TimeSeriesAggregate(
226+
newChild.source(),
227+
newChild,
228+
firstPassGroupings,
229+
mergeExpressions(firstPassAggs, firstPassGroupings)
225230
);
231+
return new Aggregate(firstPhase.source(), firstPhase, secondPassGroupings, mergeExpressions(secondPassAggs, secondPassGroupings));
226232
}
227233

228-
private static Aggregate newAggregate(
229-
LogicalPlan child,
230-
Aggregate.AggregateType type,
234+
private static List<? extends NamedExpression> mergeExpressions(
231235
List<? extends NamedExpression> aggregates,
232236
List<Expression> groupings
233237
) {
234-
return new Aggregate(
235-
child.source(),
236-
child,
237-
type,
238-
groupings,
239-
Stream.concat(aggregates.stream(), groupings.stream().map(Expressions::attribute)).toList()
240-
);
238+
List<NamedExpression> merged = new ArrayList<>(aggregates.size() + groupings.size());
239+
merged.addAll(aggregates);
240+
groupings.forEach(g -> merged.add(Expressions.attribute(g)));
241+
return merged;
241242
}
242243
}

Diff for: x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java

+9-5
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import org.elasticsearch.xpack.esql.plan.logical.Rename;
6767
import org.elasticsearch.xpack.esql.plan.logical.Row;
6868
import org.elasticsearch.xpack.esql.plan.logical.RrfScoreEval;
69+
import org.elasticsearch.xpack.esql.plan.logical.TimeSeriesAggregate;
6970
import org.elasticsearch.xpack.esql.plan.logical.UnresolvedRelation;
7071
import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;
7172
import org.elasticsearch.xpack.esql.plan.logical.show.ShowInfo;
@@ -312,7 +313,13 @@ public PlanFactory visitInsistCommand(EsqlBaseParser.InsistCommandContext ctx) {
312313
@Override
313314
public PlanFactory visitStatsCommand(EsqlBaseParser.StatsCommandContext ctx) {
314315
final Stats stats = stats(source(ctx), ctx.grouping, ctx.stats);
315-
return input -> new Aggregate(source(ctx), input, Aggregate.AggregateType.STANDARD, stats.groupings, stats.aggregates);
316+
return input -> {
317+
if (input.anyMatch(p -> p instanceof UnresolvedRelation ur && ur.indexMode() == IndexMode.TIME_SERIES)) {
318+
return new TimeSeriesAggregate(source(ctx), input, stats.groupings, stats.aggregates);
319+
} else {
320+
return new Aggregate(source(ctx), input, stats.groupings, stats.aggregates);
321+
}
322+
};
316323
}
317324

318325
private record Stats(List<Expression> groupings, List<? extends NamedExpression> aggregates) {}
@@ -362,10 +369,7 @@ public PlanFactory visitInlinestatsCommand(EsqlBaseParser.InlinestatsCommandCont
362369
List<NamedExpression> groupings = visitGrouping(ctx.grouping);
363370
aggregates.addAll(groupings);
364371
// TODO: add support for filters
365-
return input -> new InlineStats(
366-
source(ctx),
367-
new Aggregate(source(ctx), input, Aggregate.AggregateType.STANDARD, new ArrayList<>(groupings), aggregates)
368-
);
372+
return input -> new InlineStats(source(ctx), new Aggregate(source(ctx), input, new ArrayList<>(groupings), aggregates));
369373
}
370374

371375
@Override

Diff for: x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/PlanWritables.java

+4
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.xpack.esql.plan.logical.MvExpand;
2222
import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
2323
import org.elasticsearch.xpack.esql.plan.logical.Project;
24+
import org.elasticsearch.xpack.esql.plan.logical.TimeSeriesAggregate;
2425
import org.elasticsearch.xpack.esql.plan.logical.TopN;
2526
import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
2627
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
@@ -46,6 +47,7 @@
4647
import org.elasticsearch.xpack.esql.plan.physical.ProjectExec;
4748
import org.elasticsearch.xpack.esql.plan.physical.ShowExec;
4849
import org.elasticsearch.xpack.esql.plan.physical.SubqueryExec;
50+
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec;
4951
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
5052

5153
import java.util.ArrayList;
@@ -79,6 +81,7 @@ public static List<NamedWriteableRegistry.Entry> logical() {
7981
MvExpand.ENTRY,
8082
OrderBy.ENTRY,
8183
Project.ENTRY,
84+
TimeSeriesAggregate.ENTRY,
8285
TopN.ENTRY
8386
);
8487
}
@@ -105,6 +108,7 @@ public static List<NamedWriteableRegistry.Entry> physical() {
105108
ProjectExec.ENTRY,
106109
ShowExec.ENTRY,
107110
SubqueryExec.ENTRY,
111+
TimeSeriesAggregateExec.ENTRY,
108112
TopNExec.ENTRY
109113
);
110114
}

Diff for: x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Aggregate.java

+22-56
Original file line numberDiff line numberDiff line change
@@ -48,61 +48,35 @@ public class Aggregate extends UnaryPlan implements PostAnalysisVerificationAwar
4848
Aggregate::new
4949
);
5050

51-
public enum AggregateType {
52-
STANDARD,
53-
TIME_SERIES;
54-
55-
static void writeType(StreamOutput out, AggregateType type) throws IOException {
56-
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0)) {
57-
out.writeString(type.name());
58-
} else if (type != STANDARD) {
59-
throw new IllegalStateException("cluster is not ready to support aggregate type [" + type + "]");
60-
}
61-
}
62-
63-
static AggregateType readType(StreamInput in) throws IOException {
64-
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0)) {
65-
return AggregateType.valueOf(in.readString());
66-
} else {
67-
return STANDARD;
68-
}
69-
}
70-
}
71-
72-
private final AggregateType aggregateType;
73-
private final List<Expression> groupings;
74-
private final List<? extends NamedExpression> aggregates;
51+
protected final List<Expression> groupings;
52+
protected final List<? extends NamedExpression> aggregates;
7553

76-
private List<Attribute> lazyOutput;
54+
protected List<Attribute> lazyOutput;
7755

78-
public Aggregate(
79-
Source source,
80-
LogicalPlan child,
81-
AggregateType aggregateType,
82-
List<Expression> groupings,
83-
List<? extends NamedExpression> aggregates
84-
) {
56+
public Aggregate(Source source, LogicalPlan child, List<Expression> groupings, List<? extends NamedExpression> aggregates) {
8557
super(source, child);
86-
this.aggregateType = aggregateType;
8758
this.groupings = groupings;
8859
this.aggregates = aggregates;
8960
}
9061

9162
public Aggregate(StreamInput in) throws IOException {
92-
this(
93-
Source.readFrom((PlanStreamInput) in),
94-
in.readNamedWriteable(LogicalPlan.class),
95-
AggregateType.readType(in),
96-
in.readNamedWriteableCollectionAsList(Expression.class),
97-
in.readNamedWriteableCollectionAsList(NamedExpression.class)
98-
);
63+
super(Source.readFrom((PlanStreamInput) in), in.readNamedWriteable(LogicalPlan.class));
64+
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0)
65+
&& in.getTransportVersion().before(TransportVersions.ESQL_REMOVE_AGGREGATE_TYPE)) {
66+
in.readString();
67+
}
68+
this.groupings = in.readNamedWriteableCollectionAsList(Expression.class);
69+
this.aggregates = in.readNamedWriteableCollectionAsList(NamedExpression.class);
9970
}
10071

10172
@Override
10273
public void writeTo(StreamOutput out) throws IOException {
10374
Source.EMPTY.writeTo(out);
10475
out.writeNamedWriteable(child());
105-
AggregateType.writeType(out, aggregateType());
76+
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0)
77+
&& out.getTransportVersion().before(TransportVersions.ESQL_REMOVE_AGGREGATE_TYPE)) {
78+
out.writeString("STANDARD");
79+
}
10680
out.writeNamedWriteableCollection(groupings);
10781
out.writeNamedWriteableCollection(aggregates());
10882
}
@@ -113,25 +87,21 @@ public String getWriteableName() {
11387
}
11488

11589
@Override
116-
protected NodeInfo<Aggregate> info() {
117-
return NodeInfo.create(this, Aggregate::new, child(), aggregateType, groupings, aggregates);
90+
protected NodeInfo<? extends Aggregate> info() {
91+
return NodeInfo.create(this, Aggregate::new, child(), groupings, aggregates);
11892
}
11993

12094
@Override
12195
public Aggregate replaceChild(LogicalPlan newChild) {
122-
return new Aggregate(source(), newChild, aggregateType, groupings, aggregates);
96+
return new Aggregate(source(), newChild, groupings, aggregates);
12397
}
12498

12599
public Aggregate with(List<Expression> newGroupings, List<? extends NamedExpression> newAggregates) {
126100
return with(child(), newGroupings, newAggregates);
127101
}
128102

129103
public Aggregate with(LogicalPlan child, List<Expression> newGroupings, List<? extends NamedExpression> newAggregates) {
130-
return new Aggregate(source(), child, aggregateType(), newGroupings, newAggregates);
131-
}
132-
133-
public AggregateType aggregateType() {
134-
return aggregateType;
104+
return new Aggregate(source(), child, newGroupings, newAggregates);
135105
}
136106

137107
public List<Expression> groupings() {
@@ -144,10 +114,7 @@ public List<? extends NamedExpression> aggregates() {
144114

145115
@Override
146116
public String telemetryLabel() {
147-
return switch (aggregateType) {
148-
case STANDARD -> "STATS";
149-
case TIME_SERIES -> "TIME_SERIES";
150-
};
117+
return "STATS";
151118
}
152119

153120
@Override
@@ -184,7 +151,7 @@ public static AttributeSet computeReferences(List<? extends NamedExpression> agg
184151

185152
@Override
186153
public int hashCode() {
187-
return Objects.hash(aggregateType, groupings, aggregates, child());
154+
return Objects.hash(groupings, aggregates, child());
188155
}
189156

190157
@Override
@@ -198,8 +165,7 @@ public boolean equals(Object obj) {
198165
}
199166

200167
Aggregate other = (Aggregate) obj;
201-
return aggregateType == other.aggregateType
202-
&& Objects.equals(groupings, other.groupings)
168+
return Objects.equals(groupings, other.groupings)
203169
&& Objects.equals(aggregates, other.aggregates)
204170
&& Objects.equals(child(), other.child());
205171
}

0 commit comments

Comments
 (0)