Skip to content

Commit bdc6ecd

Browse files
committed
PoC new approach to build aggregations
1 parent 90b0dd4 commit bdc6ecd

File tree

5 files changed

+313
-8
lines changed

5 files changed

+313
-8
lines changed

opentelemetry-sdk/src/metrics/data/mod.rs

+24
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,14 @@ pub trait Aggregation: fmt::Debug + any::Any + Send + Sync {
5353
fn as_mut(&mut self) -> &mut dyn any::Any;
5454
}
5555

56+
/// Allow to access data points of an [Aggregation].
57+
pub(crate) trait AggregationDataPoints {
58+
/// The type of data point in the aggregation.
59+
type DataPoint;
60+
/// The data points of the aggregation.
61+
fn points(&mut self) -> &mut Vec<Self::DataPoint>;
62+
}
63+
5664
/// DataPoint is a single data point in a time series.
5765
#[derive(Debug, PartialEq)]
5866
pub struct GaugeDataPoint<T> {
@@ -142,6 +150,14 @@ impl<T: fmt::Debug + Send + Sync + 'static> Aggregation for Sum<T> {
142150
}
143151
}
144152

153+
impl<T> AggregationDataPoints for Sum<T> {
154+
type DataPoint = SumDataPoint<T>;
155+
156+
fn points(&mut self) -> &mut Vec<Self::DataPoint> {
157+
&mut self.data_points
158+
}
159+
}
160+
145161
/// Represents the histogram of all measurements of values from an instrument.
146162
#[derive(Debug)]
147163
pub struct Histogram<T> {
@@ -228,6 +244,14 @@ impl<T: fmt::Debug + Send + Sync + 'static> Aggregation for ExponentialHistogram
228244
}
229245
}
230246

247+
impl<T> AggregationDataPoints for ExponentialHistogram<T> {
248+
type DataPoint = ExponentialHistogramDataPoint<T>;
249+
250+
fn points(&mut self) -> &mut Vec<Self::DataPoint> {
251+
&mut self.data_points
252+
}
253+
}
254+
231255
/// A single exponential histogram data point in a time series.
232256
#[derive(Debug, PartialEq)]
233257
pub struct ExponentialHistogramDataPoint<T> {

opentelemetry-sdk/src/metrics/internal/aggregate.rs

+24-3
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,13 @@ use opentelemetry::KeyValue;
1111
use crate::metrics::{data::Aggregation, Temporality};
1212

1313
use super::{
14-
exponential_histogram::ExpoHistogram, histogram::Histogram, last_value::LastValue,
15-
precomputed_sum::PrecomputedSum, sum::Sum, Number,
14+
aggregate_impl::{create_aggregation, CumulativeValueMap, DeltaValueMap},
15+
exponential_histogram::ExpoHistogram,
16+
histogram::Histogram,
17+
last_value::LastValue,
18+
precomputed_sum::PrecomputedSum,
19+
sum::{Sum, SumNew},
20+
Number,
1621
};
1722

1823
pub(crate) const STREAM_CARDINALITY_LIMIT: usize = 2000;
@@ -157,7 +162,23 @@ impl<T: Number> AggregateBuilder<T> {
157162

158163
/// Builds a sum aggregate function input and output.
159164
pub(crate) fn sum(&self, monotonic: bool) -> AggregateFns<T> {
160-
Sum::new(self.temporality, self.filter.clone(), monotonic).into()
165+
// this if statement does nothing, but it allows to preserve old code, without compile warnings
166+
if true {
167+
match self.temporality {
168+
Temporality::Delta => create_aggregation(
169+
SumNew { monotonic },
170+
DeltaValueMap::new(()),
171+
self.filter.clone(),
172+
),
173+
_ => create_aggregation(
174+
SumNew { monotonic },
175+
CumulativeValueMap::new(()),
176+
self.filter.clone(),
177+
),
178+
}
179+
} else {
180+
Sum::new(self.temporality, self.filter.clone(), monotonic).into()
181+
}
161182
}
162183

163184
/// Builds a histogram aggregate function input and output.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
use std::{marker::PhantomData, sync::Arc};
2+
3+
use opentelemetry::KeyValue;
4+
5+
use crate::metrics::{
6+
data::{Aggregation, AggregationDataPoints},
7+
Temporality,
8+
};
9+
10+
use super::{
11+
aggregate::{AggregateTime, AttributeSetFilter},
12+
AggregateFns, AggregateTimeInitiator, Aggregator, ComputeAggregation, Measure, Number,
13+
ValueMap,
14+
};
15+
16+
/// Aggregate measurements for attribute sets and collect these aggregates into data points for specific temporality
17+
pub(crate) trait AggregateMap: Send + Sync + 'static {
18+
const TEMPORALITY: Temporality;
19+
type Aggr: Aggregator;
20+
21+
fn measure(&self, value: <Self::Aggr as Aggregator>::PreComputedValue, attributes: &[KeyValue]);
22+
23+
fn collect_data_points<DP, MapFn>(&self, dest: &mut Vec<DP>, map_fn: MapFn)
24+
where
25+
MapFn: FnMut(Vec<KeyValue>, &Self::Aggr) -> DP;
26+
}
27+
28+
/// This trait provides aggregation specific functionality
29+
pub(crate) trait AggregationImpl<T>: Send + Sync + 'static {
30+
// an implementation that knows how to aggregate a measurement
31+
type Aggr: Aggregator;
32+
// an implementation that stores collected aggregation data
33+
type AggrData: Aggregation + AggregationDataPoints;
34+
35+
fn precompute(&self, value: T) -> <Self::Aggr as Aggregator>::PreComputedValue;
36+
fn new_aggregation_data(&self, temporality: Temporality, time: AggregateTime)
37+
-> Self::AggrData;
38+
fn reset_aggregation_data(
39+
&self,
40+
existing: &mut Self::AggrData,
41+
temporality: Temporality,
42+
time: AggregateTime,
43+
);
44+
fn build_create_points_fn(
45+
&self,
46+
) -> impl FnMut(Vec<KeyValue>, &Self::Aggr) -> <Self::AggrData as AggregationDataPoints>::DataPoint;
47+
}
48+
49+
pub(crate) fn create_aggregation<A, AM, T>(
50+
aggregation: A,
51+
aggregate_map: AM,
52+
filter: AttributeSetFilter,
53+
) -> AggregateFns<T>
54+
where
55+
AM: AggregateMap,
56+
A: AggregationImpl<T, Aggr = AM::Aggr>,
57+
T: Number,
58+
{
59+
let fns = Arc::new(AggregionFnsImpl {
60+
filter,
61+
aggregation,
62+
aggregate_map,
63+
time: AggregateTimeInitiator::default(),
64+
_marker: Default::default(),
65+
});
66+
AggregateFns {
67+
collect: fns.clone(),
68+
measure: fns,
69+
}
70+
}
71+
72+
struct AggregionFnsImpl<A, AM, T> {
73+
filter: AttributeSetFilter,
74+
aggregation: A,
75+
aggregate_map: AM,
76+
time: AggregateTimeInitiator,
77+
_marker: PhantomData<T>,
78+
}
79+
80+
impl<A, AM, T> Measure<T> for AggregionFnsImpl<A, AM, T>
81+
where
82+
A: AggregationImpl<T>,
83+
AM: AggregateMap<Aggr = A::Aggr>,
84+
T: Number,
85+
{
86+
fn call(&self, measurement: T, attrs: &[KeyValue]) {
87+
self.filter.apply(attrs, |filtered_attrs| {
88+
let precomputed = self.aggregation.precompute(measurement);
89+
self.aggregate_map.measure(precomputed, filtered_attrs);
90+
});
91+
}
92+
}
93+
94+
impl<A, AM, T> ComputeAggregation for AggregionFnsImpl<A, AM, T>
95+
where
96+
A: AggregationImpl<T>,
97+
AM: AggregateMap<Aggr = A::Aggr>,
98+
T: Number,
99+
{
100+
fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>) {
101+
let time = if let Temporality::Delta = AM::TEMPORALITY {
102+
self.time.delta()
103+
} else {
104+
self.time.cumulative()
105+
};
106+
let mut s_data = dest.and_then(|d| d.as_mut().downcast_mut::<A::AggrData>());
107+
let mut new_agg = match s_data.as_mut() {
108+
Some(existing) => {
109+
self.aggregation
110+
.reset_aggregation_data(existing, AM::TEMPORALITY, time);
111+
None
112+
}
113+
None => Some(self.aggregation.new_aggregation_data(AM::TEMPORALITY, time)),
114+
};
115+
let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
116+
117+
let create_points_fn = self.aggregation.build_create_points_fn();
118+
self.aggregate_map
119+
.collect_data_points(s_data.points(), create_points_fn);
120+
121+
(
122+
s_data.points().len(),
123+
new_agg.map(|a| Box::new(a) as Box<dyn Aggregation>),
124+
)
125+
}
126+
}
127+
128+
/// At the moment use [`ValueMap`] under the hood (which support both Delta and Cumulative), to implement `AggregateMap` for Delta temporality
129+
/// Later this could be improved to support only Delta temporality
130+
pub(crate) struct DeltaValueMap<A>(ValueMap<A>)
131+
where
132+
A: Aggregator;
133+
134+
impl<A> DeltaValueMap<A>
135+
where
136+
A: Aggregator,
137+
{
138+
pub(crate) fn new(config: A::InitConfig) -> Self {
139+
Self(ValueMap::new(config))
140+
}
141+
}
142+
143+
impl<A> AggregateMap for DeltaValueMap<A>
144+
where
145+
A: Aggregator,
146+
<A as Aggregator>::InitConfig: Send + Sync,
147+
{
148+
const TEMPORALITY: Temporality = Temporality::Delta;
149+
150+
type Aggr = A;
151+
152+
fn measure(
153+
&self,
154+
value: <Self::Aggr as Aggregator>::PreComputedValue,
155+
attributes: &[KeyValue],
156+
) {
157+
self.0.measure(value, attributes);
158+
}
159+
160+
fn collect_data_points<DP, MapFn>(&self, dest: &mut Vec<DP>, mut map_fn: MapFn)
161+
where
162+
MapFn: FnMut(Vec<KeyValue>, &Self::Aggr) -> DP,
163+
{
164+
self.0
165+
.collect_and_reset(dest, |attributes, aggr| map_fn(attributes, &aggr));
166+
}
167+
}
168+
169+
/// At the moment use [`ValueMap`] under the hood (which support both Delta and Cumulative), to implement `AggregateMap` for Cumulative temporality
170+
/// Later this could be improved to support only Cumulative temporality
171+
pub(crate) struct CumulativeValueMap<A>(ValueMap<A>)
172+
where
173+
A: Aggregator;
174+
175+
impl<A> CumulativeValueMap<A>
176+
where
177+
A: Aggregator,
178+
{
179+
pub(crate) fn new(config: A::InitConfig) -> Self {
180+
Self(ValueMap::new(config))
181+
}
182+
}
183+
184+
impl<A> AggregateMap for CumulativeValueMap<A>
185+
where
186+
A: Aggregator,
187+
<A as Aggregator>::InitConfig: Send + Sync,
188+
{
189+
const TEMPORALITY: Temporality = Temporality::Cumulative;
190+
191+
type Aggr = A;
192+
193+
fn measure(
194+
&self,
195+
value: <Self::Aggr as Aggregator>::PreComputedValue,
196+
attributes: &[KeyValue],
197+
) {
198+
self.0.measure(value, attributes);
199+
}
200+
201+
fn collect_data_points<DP, MapFn>(&self, dest: &mut Vec<DP>, map_fn: MapFn)
202+
where
203+
MapFn: FnMut(Vec<KeyValue>, &Self::Aggr) -> DP,
204+
{
205+
self.0.collect_readonly(dest, map_fn);
206+
}
207+
}

opentelemetry-sdk/src/metrics/internal/mod.rs

+6-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
mod aggregate;
2+
mod aggregate_impl;
23
mod exponential_histogram;
34
mod histogram;
45
mod last_value;
@@ -12,7 +13,10 @@ use std::ops::{Add, AddAssign, DerefMut, Sub};
1213
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering};
1314
use std::sync::{Arc, OnceLock, RwLock};
1415

15-
use aggregate::{is_under_cardinality_limit, STREAM_CARDINALITY_LIMIT};
16+
use aggregate::{is_under_cardinality_limit, AggregateTimeInitiator, STREAM_CARDINALITY_LIMIT};
17+
18+
pub(crate) use aggregate_impl::AggregationImpl;
19+
1620
pub(crate) use aggregate::{AggregateBuilder, AggregateFns, ComputeAggregation, Measure};
1721
pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE};
1822
use opentelemetry::{otel_warn, KeyValue};
@@ -25,7 +29,7 @@ fn stream_overflow_attributes() -> &'static Vec<KeyValue> {
2529
STREAM_OVERFLOW_ATTRIBUTES.get_or_init(|| vec![KeyValue::new("otel.metric.overflow", "true")])
2630
}
2731

28-
pub(crate) trait Aggregator {
32+
pub(crate) trait Aggregator: Send + Sync + 'static {
2933
/// A static configuration that is needed in order to initialize aggregator.
3034
/// E.g. bucket_size at creation time .
3135
type InitConfig;

opentelemetry-sdk/src/metrics/internal/sum.rs

+52-3
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@ use crate::metrics::data::{self, Aggregation, SumDataPoint};
22
use crate::metrics::Temporality;
33
use opentelemetry::KeyValue;
44

5-
use super::aggregate::{AggregateTimeInitiator, AttributeSetFilter};
6-
use super::{Aggregator, AtomicTracker, ComputeAggregation, Measure, Number};
5+
use super::aggregate::{AggregateTime, AggregateTimeInitiator, AttributeSetFilter};
6+
use super::{AggregationImpl, Aggregator, AtomicTracker, ComputeAggregation, Measure, Number};
77
use super::{AtomicallyUpdate, ValueMap};
88

9-
struct Increment<T>
9+
pub(crate) struct Increment<T>
1010
where
1111
T: AtomicallyUpdate<T>,
1212
{
@@ -162,3 +162,52 @@ where
162162
}
163163
}
164164
}
165+
166+
pub(crate) struct SumNew {
167+
pub(crate) monotonic: bool,
168+
}
169+
170+
impl<T> AggregationImpl<T> for SumNew
171+
where
172+
T: Number,
173+
{
174+
type Aggr = Increment<T>;
175+
type AggrData = data::Sum<T>;
176+
177+
fn precompute(&self, value: T) -> T {
178+
value
179+
}
180+
181+
fn new_aggregation_data(&self, temporality: Temporality, time: AggregateTime) -> data::Sum<T> {
182+
data::Sum {
183+
data_points: vec![],
184+
start_time: time.start,
185+
time: time.current,
186+
temporality,
187+
is_monotonic: self.monotonic,
188+
}
189+
}
190+
191+
fn reset_aggregation_data(
192+
&self,
193+
existing: &mut data::Sum<T>,
194+
temporality: Temporality,
195+
time: AggregateTime,
196+
) {
197+
existing.data_points.clear();
198+
existing.start_time = time.start;
199+
existing.time = time.current;
200+
existing.temporality = temporality;
201+
existing.is_monotonic = self.monotonic;
202+
}
203+
204+
fn build_create_points_fn(
205+
&self,
206+
) -> impl FnMut(Vec<KeyValue>, &Increment<T>) -> SumDataPoint<T> {
207+
|attributes, aggr| SumDataPoint {
208+
attributes,
209+
value: aggr.value.get_value(),
210+
exemplars: vec![],
211+
}
212+
}
213+
}

0 commit comments

Comments
 (0)