Skip to content

Commit da262be

Browse files
committed
Time series agg experiments.
Typically time_series aggregation is wrapped by a date histogram aggregation. This commit explores idea around making things more efficient for time series agg if this is the case. This commit explores two main ideas: * With time series index searcher docs are emitted in tsid and timestamp order. Because of this within docs of the tsid, the date histogram buckets are also emitted in order to sub aggs. This allows time series aggregator to only keep track of the bucket belonging to the current tsid and bucket ordinal. The removes the need for using BytesKeyedBucketOrds, which in production is very heavy. Also given the fact the tsid is a high cardinality field. For each tsid and buck ordinal combination we keep track of doc count and delegate to sub agg. When the tsid / bucket ordinal combination changes the time series agg on the fly creates a new bucket. Sub aggs of time series agg, only ever contain buckets for a single parent bucket ordinal, this allows to always use a bucket ordinal of value 0. After each bucket has been created the sub agg is cleared. * If the bucket that date histogram creates are contained with the index boundaries of the backing index the shard the search is executed belongs to, then reduction/pipeline aggregation can happen locally only the fly when the time series buckets are created. In order to support this a TimestampBoundsAware interface was added. That can tell a sub agg of a date histogram whether the bounds of parent bucket are within the bounds of the backing index. In this experiment the terms aggregator was hard coded to use min bucket pipeline agg, which gets fed a time series bucket (with sub agg buckets) each time tsid / bucket ordinal combo changes. If buckets are outside backing index boundary then buckets are kept around and pipeline agg is executed in reduce method of InternalTimeSeries response class. This fundamentally changes the time series agg, since the response depends on the pipeline agg used. The `TimeSeriesAggregator3` contains both of these changes. Extra notes: * Date histogram could use `AggregationExecutionContext#getTimestamp()` as source for rounding values into buckets. * I think there is no need for doc count if pipeline aggs reduce on the fly the buckets created by time series agg. * Date agg's filter by filter optimization has been disabled when agg requires in order execution. The time series index searcher doesn't work with filter by filter optimization. Relates to elastic#74660
1 parent d7c0b37 commit da262be

15 files changed

+443
-7
lines changed

modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeries.java

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@
1717
import org.elasticsearch.search.aggregations.InternalAggregation;
1818
import org.elasticsearch.search.aggregations.InternalAggregations;
1919
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
20+
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
2021
import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent;
22+
import org.elasticsearch.search.aggregations.pipeline.BucketMetricsPipelineAggregationBuilder;
23+
import org.elasticsearch.search.aggregations.pipeline.BucketMetricsPipelineAggregator;
2124
import org.elasticsearch.xcontent.ObjectParser;
2225
import org.elasticsearch.xcontent.XContentBuilder;
2326

@@ -138,13 +141,28 @@ public int hashCode() {
138141
}
139142

140143
private final List<InternalTimeSeries.InternalBucket> buckets;
144+
private final BucketMetricsPipelineAggregationBuilder<?> pipelineAggregationBuilder;
141145
private final boolean keyed;
142146
// bucketMap gets lazily initialized from buckets in getBucketByKey()
143147
private transient Map<String, InternalTimeSeries.InternalBucket> bucketMap;
144148

145149
public InternalTimeSeries(String name, List<InternalTimeSeries.InternalBucket> buckets, boolean keyed, Map<String, Object> metadata) {
146150
super(name, metadata);
147-
this.buckets = buckets;
151+
this.pipelineAggregationBuilder = null;
152+
this.buckets = Objects.requireNonNull(buckets);
153+
this.keyed = keyed;
154+
}
155+
156+
public InternalTimeSeries(
157+
String name,
158+
BucketMetricsPipelineAggregationBuilder<?> pipelineAggregationBuilder,
159+
List<InternalTimeSeries.InternalBucket> buckets,
160+
boolean keyed,
161+
Map<String, Object> metadata
162+
) {
163+
super(name, metadata);
164+
this.pipelineAggregationBuilder = pipelineAggregationBuilder;
165+
this.buckets = Objects.requireNonNull(buckets);
148166
this.keyed = keyed;
149167
}
150168

@@ -153,6 +171,9 @@ public InternalTimeSeries(String name, List<InternalTimeSeries.InternalBucket> b
153171
*/
154172
public InternalTimeSeries(StreamInput in) throws IOException {
155173
super(in);
174+
pipelineAggregationBuilder = (BucketMetricsPipelineAggregationBuilder<?>) in.readOptionalNamedWriteable(
175+
PipelineAggregationBuilder.class
176+
);
156177
keyed = in.readBoolean();
157178
int size = in.readVInt();
158179
List<InternalTimeSeries.InternalBucket> buckets = new ArrayList<>(size);
@@ -194,6 +215,17 @@ protected void doWriteTo(StreamOutput out) throws IOException {
194215

195216
@Override
196217
public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
218+
if (pipelineAggregationBuilder != null) {
219+
// This agg response class is shared now between 3 versions of time series aggregator,
220+
// but if TimeSeriesAggregator3 was the only implementation the reduce method would only do what happens in the if block.
221+
BucketMetricsPipelineAggregator pipelineAggregator = (BucketMetricsPipelineAggregator) pipelineAggregationBuilder.create();
222+
pipelineAggregator.start();
223+
for (InternalBucket bucket : buckets) {
224+
pipelineAggregator.collect(this, bucket);
225+
}
226+
return pipelineAggregator.end();
227+
}
228+
197229
// TODO: optimize single result case either by having a if check here and return aggregations.get(0) or
198230
// by overwriting the mustReduceOnSingleInternalAgg() method
199231
final int initialCapacity = aggregations.stream()
@@ -244,7 +276,8 @@ protected boolean lessThan(IteratorAndCurrent<InternalBucket> a, IteratorAndCurr
244276
reducedBucket = reduceBucket(bucketsWithSameKey, reduceContext);
245277
}
246278
BytesRef tsid = reducedBucket.key;
247-
assert prevTsid == null || tsid.compareTo(prevTsid) > 0;
279+
assert prevTsid == null || tsid.compareTo(prevTsid) > 0
280+
: "prevTsid=" + TimeSeriesIdFieldMapper.decodeTsid(prevTsid) + " tsid=" + TimeSeriesIdFieldMapper.decodeTsid(tsid);
248281
reduced.buckets.add(reducedBucket);
249282
prevTsid = tsid;
250283
}

modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregationFactory.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.search.aggregations.AggregatorFactories;
1313
import org.elasticsearch.search.aggregations.AggregatorFactory;
1414
import org.elasticsearch.search.aggregations.CardinalityUpperBound;
15+
import org.elasticsearch.search.aggregations.bucket.histogram.TimestampBoundsAware;
1516
import org.elasticsearch.search.aggregations.support.AggregationContext;
1617

1718
import java.io.IOException;
@@ -36,6 +37,10 @@ public TimeSeriesAggregationFactory(
3637
@Override
3738
protected Aggregator createInternal(Aggregator parent, CardinalityUpperBound cardinality, Map<String, Object> metadata)
3839
throws IOException {
39-
return new TimeSeriesAggregator(name, factories, keyed, context, parent, cardinality, metadata);
40+
if (parent instanceof TimestampBoundsAware) {
41+
return new TimeSeriesAggregator3(name, factories, keyed, context, parent, metadata);
42+
} else {
43+
return new TimeSeriesAggregator(name, factories, keyed, context, parent, cardinality, metadata);
44+
}
4045
}
4146
}

modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregator.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,15 +92,29 @@ protected void doClose() {
9292
protected LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx, LeafBucketCollector sub) throws IOException {
9393
return new LeafBucketCollectorBase(sub, null) {
9494

95+
// This helps significantly reducing time spent attempting to add bucket + tsid combos that already were added.
96+
long currentTsidOrd = -1;
97+
long currentBucket = -1;
98+
long currentBucketOrdinal;
99+
95100
@Override
96101
public void collect(int doc, long bucket) throws IOException {
102+
if (currentBucket == bucket && currentTsidOrd == aggCtx.getTsidOrd()) {
103+
collectExistingBucket(sub, doc, currentBucketOrdinal);
104+
return;
105+
}
106+
97107
long bucketOrdinal = bucketOrds.add(bucket, aggCtx.getTsid());
98108
if (bucketOrdinal < 0) { // already seen
99109
bucketOrdinal = -1 - bucketOrdinal;
100110
collectExistingBucket(sub, doc, bucketOrdinal);
101111
} else {
102112
collectBucket(sub, doc, bucketOrdinal);
103113
}
114+
115+
currentBucketOrdinal = bucketOrdinal;
116+
currentTsidOrd = aggCtx.getTsidOrd();
117+
currentBucket = bucket;
104118
}
105119
};
106120
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.aggregations.bucket.timeseries;
10+
11+
import org.apache.lucene.util.BytesRef;
12+
import org.elasticsearch.index.TimestampBounds;
13+
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
14+
import org.elasticsearch.search.aggregations.Aggregator;
15+
import org.elasticsearch.search.aggregations.AggregatorFactories;
16+
import org.elasticsearch.search.aggregations.CardinalityUpperBound;
17+
import org.elasticsearch.search.aggregations.InternalAggregation;
18+
import org.elasticsearch.search.aggregations.LeafBucketCollector;
19+
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
20+
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
21+
import org.elasticsearch.search.aggregations.bucket.histogram.TimestampBoundsAware;
22+
import org.elasticsearch.search.aggregations.support.AggregationContext;
23+
24+
import java.io.IOException;
25+
import java.util.ArrayList;
26+
import java.util.HashMap;
27+
import java.util.List;
28+
import java.util.Map;
29+
30+
public class TimeSeriesAggregator2 extends BucketsAggregator {
31+
32+
private final TimestampBounds timestampBounds;
33+
private final TimestampBoundsAware parent;
34+
private final Map<Long, List<InternalTimeSeries.InternalBucket>> results;
35+
private final boolean keyed;
36+
37+
public TimeSeriesAggregator2(
38+
String name,
39+
AggregatorFactories factories,
40+
boolean keyed,
41+
AggregationContext context,
42+
Aggregator parent,
43+
Map<String, Object> metadata
44+
) throws IOException {
45+
super(name, factories, context, parent, CardinalityUpperBound.ONE, metadata);
46+
this.keyed = keyed;
47+
this.timestampBounds = context.getIndexSettings().getTimestampBounds();
48+
this.parent = (TimestampBoundsAware) parent;
49+
this.results = new HashMap<>();
50+
}
51+
52+
@Override
53+
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
54+
// figure out running pipeline aggs here
55+
// context.pipelineTreeRoot().subTree(agg.getName())
56+
completeBucket();
57+
InternalAggregation[] result = new InternalAggregation[owningBucketOrds.length];
58+
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
59+
long owningOrdinal = owningBucketOrds[ordIdx];
60+
List<InternalTimeSeries.InternalBucket> buckets = results.get(owningOrdinal);
61+
if (buckets == null) {
62+
continue;
63+
}
64+
result[ordIdx] = new InternalTimeSeries(name, buckets, keyed, metadata());
65+
}
66+
return result;
67+
}
68+
69+
@Override
70+
public InternalAggregation buildEmptyAggregation() {
71+
return new InternalTimeSeries(name, new ArrayList<>(), false, metadata());
72+
}
73+
74+
BytesRef currentTsid;
75+
int currentTsidOrd = -1;
76+
long currentParentBucket;
77+
long docCount;
78+
79+
@Override
80+
protected LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx, LeafBucketCollector sub) throws IOException {
81+
return new LeafBucketCollectorBase(sub, null) {
82+
83+
@Override
84+
public void collect(int doc, long bucket) throws IOException {
85+
// System.out.println("bucketId=" + bucket);
86+
// System.out.println("tsid=" + TimeSeriesIdFieldMapper.decodeTsid(aggCtx.getTsid()));
87+
88+
if (currentTsidOrd == aggCtx.getTsidOrd() && currentParentBucket == bucket) {
89+
docCount++;
90+
sub.collect(doc, 0L);
91+
return;
92+
}
93+
if (currentTsid != null) {
94+
completeBucket();
95+
}
96+
if (currentTsidOrd != aggCtx.getTsidOrd()) {
97+
currentTsidOrd = aggCtx.getTsidOrd();
98+
currentTsid = BytesRef.deepCopyOf(aggCtx.getTsid());
99+
}
100+
if (currentParentBucket != bucket) {
101+
currentParentBucket = bucket;
102+
}
103+
104+
sub.clear();
105+
docCount = 1;
106+
sub.collect(doc, 0L);
107+
}
108+
};
109+
}
110+
111+
private void completeBucket() throws IOException {
112+
InternalTimeSeries.InternalBucket bucket = new InternalTimeSeries.InternalBucket(
113+
currentTsid,
114+
docCount,
115+
buildSubAggsForBuckets(new long[] { 0L })[0],
116+
keyed
117+
);
118+
// System.out.println("complete bucket=" + currentParentBucket);
119+
List<InternalTimeSeries.InternalBucket> result = results.get(currentParentBucket);
120+
if (result == null) {
121+
result = new ArrayList<>();
122+
results.put(currentParentBucket, result);
123+
}
124+
result.add(bucket);
125+
}
126+
127+
}

0 commit comments

Comments
 (0)