Skip to content

Commit 29287d1

Browse files
committed
Add a time series aggregation implementation to make use of the fact that docids are emitted in tsid and parent bucket ordinal order.
This is true when the parent aggregation is a date histogram (which is typical), due to the fact that TimeSeriesIndexSearcher emits docs in tsid and timestamp order. Relates to #74660
1 parent 2a33538 commit 29287d1

File tree

3 files changed

+165
-4
lines changed

3 files changed

+165
-4
lines changed

modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregationBuilder.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.search.aggregations.AggregationBuilder;
1616
import org.elasticsearch.search.aggregations.AggregatorFactories;
1717
import org.elasticsearch.search.aggregations.AggregatorFactory;
18+
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregatorFactory;
1819
import org.elasticsearch.search.aggregations.support.AggregationContext;
1920
import org.elasticsearch.xcontent.InstantiatingObjectParser;
2021
import org.elasticsearch.xcontent.ParseField;
@@ -79,7 +80,8 @@ protected AggregatorFactory doBuild(
7980
AggregatorFactory parent,
8081
AggregatorFactories.Builder subFactoriesBuilder
8182
) throws IOException {
82-
return new TimeSeriesAggregationFactory(name, keyed, context, parent, subFactoriesBuilder, metadata);
83+
boolean expectTsidBucketInOrder = parent instanceof DateHistogramAggregatorFactory;
84+
return new TimeSeriesAggregationFactory(name, keyed, context, parent, subFactoriesBuilder, metadata, expectTsidBucketInOrder);
8385
}
8486

8587
@Override

modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregationFactory.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,28 @@
2020
public class TimeSeriesAggregationFactory extends AggregatorFactory {
2121

2222
private final boolean keyed;
23+
private final boolean expectTsidBucketInOrder;
2324

2425
public TimeSeriesAggregationFactory(
2526
String name,
2627
boolean keyed,
2728
AggregationContext context,
2829
AggregatorFactory parent,
2930
AggregatorFactories.Builder subFactoriesBuilder,
30-
Map<String, Object> metadata
31-
) throws IOException {
31+
Map<String, Object> metadata,
32+
boolean expectTsidBucketInOrder) throws IOException {
3233
super(name, context, parent, subFactoriesBuilder, metadata);
3334
this.keyed = keyed;
35+
this.expectTsidBucketInOrder = expectTsidBucketInOrder;
3436
}
3537

3638
@Override
3739
protected Aggregator createInternal(Aggregator parent, CardinalityUpperBound cardinality, Map<String, Object> metadata)
3840
throws IOException {
39-
return new TimeSeriesAggregator(name, factories, keyed, context, parent, cardinality, metadata);
41+
if (expectTsidBucketInOrder) {
42+
return new TimeSeriesInOrderAggregator(name, factories, keyed, context, parent, cardinality, metadata);
43+
} else {
44+
return new TimeSeriesAggregator(name, factories, keyed, context, parent, cardinality, metadata);
45+
}
4046
}
4147
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.aggregations.bucket.timeseries;
10+
11+
import org.apache.lucene.util.BytesRef;
12+
import org.elasticsearch.common.util.BytesRefArray;
13+
import org.elasticsearch.common.util.LongObjectPagedHashMap;
14+
import org.elasticsearch.core.Releasables;
15+
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
16+
import org.elasticsearch.search.aggregations.Aggregator;
17+
import org.elasticsearch.search.aggregations.AggregatorFactories;
18+
import org.elasticsearch.search.aggregations.CardinalityUpperBound;
19+
import org.elasticsearch.search.aggregations.InternalAggregation;
20+
import org.elasticsearch.search.aggregations.InternalAggregations;
21+
import org.elasticsearch.search.aggregations.LeafBucketCollector;
22+
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
23+
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
24+
import org.elasticsearch.search.aggregations.support.AggregationContext;
25+
26+
import java.io.IOException;
27+
import java.util.ArrayList;
28+
import java.util.List;
29+
import java.util.Map;
30+
31+
/**
 * A time-series bucket aggregator that relies on documents being collected in
 * (tsid ordinal, parent bucket ordinal) order, so each (tsid, parent bucket)
 * combination forms one contiguous run of docs. Per the commit message this
 * ordering holds when the parent aggregation is a date histogram, because
 * TimeSeriesIndexSearcher emits docs in tsid and timestamp order — confirm at
 * the factory that selects this implementation.
 */
public class TimeSeriesInOrderAggregator extends BucketsAggregator {

    // reuse tsids between owning bucket ordinals:
    private final BytesRefArray collectedTsids;
    // completed buckets, keyed by owning (parent) bucket ordinal
    private final LongObjectPagedHashMap<List<InternalBucket>> results;
    // whether the response renders buckets as a keyed map rather than an array
    private final boolean keyed;

    public TimeSeriesInOrderAggregator(
        String name,
        AggregatorFactories factories,
        boolean keyed,
        AggregationContext context,
        Aggregator parent,
        CardinalityUpperBound cardinality,
        Map<String, Object> metadata
    ) throws IOException {
        super(name, factories, context, parent, cardinality, metadata);
        this.keyed = keyed;
        // both structures are backed by BigArrays and released in doClose()
        this.results = new LongObjectPagedHashMap<>(1, context.bigArrays());
        this.collectedTsids = new BytesRefArray(1, context.bigArrays());
    }

    /**
     * Builds one InternalTimeSeries per owning bucket ordinal from the buckets
     * accumulated in {@link #results} during collection.
     */
    @Override
    public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
        InternalAggregation[] result = new InternalAggregation[owningBucketOrds.length];
        for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
            long owningOrdinal = owningBucketOrds[ordIdx];
            List<InternalBucket> internalBuckets = results.get(owningOrdinal);
            if (internalBuckets != null) {
                // spare is reused across get() calls to avoid per-bucket allocation
                BytesRef spare = new BytesRef();
                List<InternalTimeSeries.InternalBucket> buckets = new ArrayList<>(internalBuckets.size());
                for (InternalBucket internalBucket : internalBuckets) {
                    // resolve the stored offset back to the tsid bytes
                    BytesRef key = collectedTsids.get(internalBucket.tsidOffset, spare);
                    // NOTE(review): sub-aggs are built one bucket at a time; a single batched
                    // buildSubAggsForBuckets call over all ordinals may be cheaper — verify.
                    InternalAggregations internalAggregations = buildSubAggsForBuckets(new long[] { internalBucket.bucketOrd })[0];
                    buckets.add(
                        new InternalTimeSeries.InternalBucket(
                            // deep copy: 'key' points into the reused 'spare' buffer
                            BytesRef.deepCopyOf(key),
                            internalBucket.docCount,
                            internalAggregations,
                            keyed
                        )
                    );
                }
                result[ordIdx] = new InternalTimeSeries(name, buckets, keyed, metadata());
            } else {
                // no docs were collected for this owning ordinal
                result[ordIdx] = buildEmptyAggregation();
            }
        }
        return result;
    }

    @Override
    public InternalAggregation buildEmptyAggregation() {
        return new InternalTimeSeries(name, new ArrayList<>(), false, metadata());
    }

    // --- collection state: tracks the (tsid, parent bucket) run currently being accumulated ---
    private BytesRef currentTsid;
    private int currentTsidOrd = -1;
    private long currentParentBucket = -1;
    // number of docs seen so far in the current run
    private long docCount;
    // TODO use 0L as bucket ordinal and clear sub aggregations after bucket/parent bucket ordinal combination changes
    // Ideally use a constant ordinal (0) here and tsid or parent bucket change reset sub and
    // reuse the same ordinal. This is possible because a tsid / parent bucket ordinal are unique and
    // don't reappear when either one changes.
    private long bucketOrdinalGenerator;

    @Override
    protected LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx, LeafBucketCollector sub) {
        return new LeafBucketCollectorBase(sub, null) {

            @Override
            public void collect(int doc, long bucket) throws IOException {
                // Fast path: still inside the same (tsid, parent bucket) run.
                if (currentTsidOrd == aggCtx.getTsidOrd() && currentParentBucket == bucket) {
                    docCount++;
                    sub.collect(doc, bucketOrdinalGenerator);
                    return;
                }
                // Run boundary: flush the previous bucket (if any) and move to a fresh ordinal.
                if (currentTsid != null) {
                    completeBucket();
                    bucketOrdinalGenerator++;
                }
                if (currentTsidOrd != aggCtx.getTsidOrd()) {
                    currentTsidOrd = aggCtx.getTsidOrd();
                    currentTsid = aggCtx.getTsid();

                    // each distinct tsid is appended exactly once; its offset is collectedTsids.size() - 1
                    collectedTsids.append(currentTsid);
                }
                if (currentParentBucket != bucket) {
                    currentParentBucket = bucket;
                }

                // first doc of the new run
                docCount = 1;
                sub.collect(doc, bucketOrdinalGenerator);
            }
        };
    }

    @Override
    protected void doPostCollection() {
        // flush the final in-progress run; no-op if nothing was collected
        if (currentTsid != null) {
            completeBucket();
        }
    }

    /** Records the current run as a bucket under its owning parent bucket ordinal. */
    private void completeBucket() {
        // the current tsid is always the last one appended to collectedTsids
        InternalBucket bucket = new InternalBucket(collectedTsids.size() - 1, bucketOrdinalGenerator, docCount);
        // TODO: instead of collecting all buckets, perform pipeline aggregations here:
        // (Then we don't need to keep all these buckets in memory)
        List<InternalBucket> result = results.get(currentParentBucket);
        if (result == null) {
            result = new ArrayList<>();
            results.put(currentParentBucket, result);
        }
        result.add(bucket);
    }

    @Override
    protected void doClose() {
        // release BigArrays-backed structures
        Releasables.close(results, collectedTsids);
    }

    // tsidOffset indexes into collectedTsids; bucketOrd is the ordinal used for sub-agg collection
    record InternalBucket(long tsidOffset, long bucketOrd, long docCount) {}
}

0 commit comments

Comments
 (0)