Metrics aggregate map is generic over temporality #2530

Open · wants to merge 2 commits into main

24 changes: 24 additions & 0 deletions opentelemetry-sdk/src/metrics/data/mod.rs
@@ -53,6 +53,14 @@
fn as_mut(&mut self) -> &mut dyn any::Any;
}

/// Allows access to the data points of an [Aggregation].
pub(crate) trait AggregationDataPoints {
/// The type of data point in the aggregation.
type DataPoint;
/// The data points of the aggregation.
fn points(&mut self) -> &mut Vec<Self::DataPoint>;
}

/// DataPoint is a single data point in a time series.
#[derive(Debug, PartialEq)]
pub struct GaugeDataPoint<T> {
@@ -142,6 +150,14 @@
}
}

impl<T> AggregationDataPoints for Sum<T> {
type DataPoint = SumDataPoint<T>;

fn points(&mut self) -> &mut Vec<Self::DataPoint> {
&mut self.data_points
}
}

/// Represents the histogram of all measurements of values from an instrument.
#[derive(Debug)]
pub struct Histogram<T> {
@@ -228,6 +244,14 @@
}
}

impl<T> AggregationDataPoints for ExponentialHistogram<T> {
type DataPoint = ExponentialHistogramDataPoint<T>;

fn points(&mut self) -> &mut Vec<Self::DataPoint> {
&mut self.data_points
}

}

/// A single exponential histogram data point in a time series.
#[derive(Debug, PartialEq)]
pub struct ExponentialHistogramDataPoint<T> {
27 changes: 24 additions & 3 deletions opentelemetry-sdk/src/metrics/internal/aggregate.rs
@@ -11,8 +11,13 @@
use crate::metrics::{data::Aggregation, Temporality};

use super::{
exponential_histogram::ExpoHistogram, histogram::Histogram, last_value::LastValue,
precomputed_sum::PrecomputedSum, sum::Sum, Number,
aggregate_impl::{create_aggregation, CumulativeValueMap, DeltaValueMap},
exponential_histogram::ExpoHistogram,
histogram::Histogram,
last_value::LastValue,
precomputed_sum::PrecomputedSum,
sum::{Sum, SumNew},
Number,
};

pub(crate) const STREAM_CARDINALITY_LIMIT: usize = 2000;
@@ -157,7 +162,23 @@

/// Builds a sum aggregate function input and output.
pub(crate) fn sum(&self, monotonic: bool) -> AggregateFns<T> {
Sum::new(self.temporality, self.filter.clone(), monotonic).into()
// this if statement does nothing, but it allows preserving the old code without compile warnings
if true {
match self.temporality {
Temporality::Delta => create_aggregation(
SumNew { monotonic },
DeltaValueMap::new(()),
self.filter.clone(),
),
_ => create_aggregation(
SumNew { monotonic },
CumulativeValueMap::new(()),
self.filter.clone(),
),
}
} else {
Sum::new(self.temporality, self.filter.clone(), monotonic).into()

}
}

/// Builds a histogram aggregate function input and output.
207 changes: 207 additions & 0 deletions opentelemetry-sdk/src/metrics/internal/aggregate_impl.rs
@@ -0,0 +1,207 @@
use std::{marker::PhantomData, sync::Arc};

use opentelemetry::KeyValue;

use crate::metrics::{
data::{Aggregation, AggregationDataPoints},
Temporality,
};

use super::{
aggregate::{AggregateTime, AttributeSetFilter},
AggregateFns, AggregateTimeInitiator, Aggregator, ComputeAggregation, Measure, Number,
ValueMap,
};

/// Aggregate measurements for attribute sets and collect these aggregates into data points for a specific temporality.
pub(crate) trait AggregateMap: Send + Sync + 'static {
const TEMPORALITY: Temporality;
type Aggr: Aggregator;

fn measure(&self, value: <Self::Aggr as Aggregator>::PreComputedValue, attributes: &[KeyValue]);

fn collect_data_points<DP, MapFn>(&self, dest: &mut Vec<DP>, map_fn: MapFn)
where
MapFn: FnMut(Vec<KeyValue>, &Self::Aggr) -> DP;
}

/// This trait provides aggregation-specific functionality.
pub(crate) trait AggregationImpl<T>: Send + Sync + 'static {
// an implementation that knows how to aggregate a measurement
type Aggr: Aggregator;
// an implementation that stores collected aggregation data
type AggrData: Aggregation + AggregationDataPoints;

fn precompute(&self, value: T) -> <Self::Aggr as Aggregator>::PreComputedValue;
fn new_aggregation_data(&self, temporality: Temporality, time: AggregateTime)
-> Self::AggrData;
fn reset_aggregation_data(
&self,
existing: &mut Self::AggrData,
temporality: Temporality,
time: AggregateTime,
);
fn build_create_points_fn(
&self,
) -> impl FnMut(Vec<KeyValue>, &Self::Aggr) -> <Self::AggrData as AggregationDataPoints>::DataPoint;
}

pub(crate) fn create_aggregation<A, AM, T>(
aggregation: A,
aggregate_map: AM,
filter: AttributeSetFilter,
) -> AggregateFns<T>
where
AM: AggregateMap,
A: AggregationImpl<T, Aggr = AM::Aggr>,
T: Number,
{
let fns = Arc::new(AggregationFnsImpl {
filter,
aggregation,
aggregate_map,
time: AggregateTimeInitiator::default(),
_marker: Default::default(),
});
AggregateFns {
collect: fns.clone(),
measure: fns,
}
}

struct AggregationFnsImpl<A, AM, T> {
filter: AttributeSetFilter,
aggregation: A,
aggregate_map: AM,
time: AggregateTimeInitiator,
_marker: PhantomData<T>,
}

impl<A, AM, T> Measure<T> for AggregationFnsImpl<A, AM, T>
where
A: AggregationImpl<T>,
AM: AggregateMap<Aggr = A::Aggr>,
T: Number,
{
fn call(&self, measurement: T, attrs: &[KeyValue]) {
self.filter.apply(attrs, |filtered_attrs| {
let precomputed = self.aggregation.precompute(measurement);
self.aggregate_map.measure(precomputed, filtered_attrs);
});
}
}

impl<A, AM, T> ComputeAggregation for AggregationFnsImpl<A, AM, T>
where
A: AggregationImpl<T>,
AM: AggregateMap<Aggr = A::Aggr>,
T: Number,
{
fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>) {
let time = if let Temporality::Delta = AM::TEMPORALITY {
self.time.delta()
} else {
self.time.cumulative()
};
let mut s_data = dest.and_then(|d| d.as_mut().downcast_mut::<A::AggrData>());
let mut new_agg = match s_data.as_mut() {
Some(existing) => {
self.aggregation
.reset_aggregation_data(existing, AM::TEMPORALITY, time);
None
}
None => Some(self.aggregation.new_aggregation_data(AM::TEMPORALITY, time)),
};
let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));

let create_points_fn = self.aggregation.build_create_points_fn();
self.aggregate_map
.collect_data_points(s_data.points(), create_points_fn);

(
s_data.points().len(),
new_agg.map(|a| Box::new(a) as Box<dyn Aggregation>),
)
}
}

/// For now this uses [`ValueMap`] under the hood (which supports both Delta and Cumulative) to implement `AggregateMap` for Delta temporality.
/// Later this could be specialized to support only Delta temporality.
pub(crate) struct DeltaValueMap<A>(ValueMap<A>)
where
A: Aggregator;

impl<A> DeltaValueMap<A>
where
A: Aggregator,
{
pub(crate) fn new(config: A::InitConfig) -> Self {
Self(ValueMap::new(config))
}
}

impl<A> AggregateMap for DeltaValueMap<A>
where
A: Aggregator,
<A as Aggregator>::InitConfig: Send + Sync,
{
const TEMPORALITY: Temporality = Temporality::Delta;

type Aggr = A;

fn measure(
&self,
value: <Self::Aggr as Aggregator>::PreComputedValue,
attributes: &[KeyValue],
) {
self.0.measure(value, attributes);
}

fn collect_data_points<DP, MapFn>(&self, dest: &mut Vec<DP>, mut map_fn: MapFn)
where
MapFn: FnMut(Vec<KeyValue>, &Self::Aggr) -> DP,
{
self.0
.collect_and_reset(dest, |attributes, aggr| map_fn(attributes, &aggr));
}
}

/// For now this uses [`ValueMap`] under the hood (which supports both Delta and Cumulative) to implement `AggregateMap` for Cumulative temporality.
/// Later this could be specialized to support only Cumulative temporality.
pub(crate) struct CumulativeValueMap<A>(ValueMap<A>)
where
A: Aggregator;

impl<A> CumulativeValueMap<A>
where
A: Aggregator,
{
pub(crate) fn new(config: A::InitConfig) -> Self {
Self(ValueMap::new(config))
}
}

impl<A> AggregateMap for CumulativeValueMap<A>
where
A: Aggregator,
<A as Aggregator>::InitConfig: Send + Sync,
{
const TEMPORALITY: Temporality = Temporality::Cumulative;

type Aggr = A;

fn measure(
&self,
value: <Self::Aggr as Aggregator>::PreComputedValue,
attributes: &[KeyValue],
) {
self.0.measure(value, attributes);
}

fn collect_data_points<DP, MapFn>(&self, dest: &mut Vec<DP>, map_fn: MapFn)
where
MapFn: FnMut(Vec<KeyValue>, &Self::Aggr) -> DP,
{
self.0.collect_readonly(dest, map_fn);
}
}
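
The new module splits responsibilities cleanly: `AggregationImpl` describes what is being aggregated (sum, histogram, ...), while `AggregateMap` decides how per-attribute state is kept and drained for a given temporality, and `create_aggregation` glues a pair of them into an `AggregateFns`. As a reviewer aid, here is a minimal, hypothetical sketch (not part of this diff) of driving the cumulative sum path end to end; it assumes the code lives inside `opentelemetry-sdk/src/metrics/internal` (for example as a unit test), since all of these items are `pub(crate)`, and that an `AttributeSetFilter` is obtained the same way `AggregateBuilder::sum` obtains it.

use opentelemetry::KeyValue;

// Hypothetical test-style helper, not in this PR: wire the same pieces the builder
// uses and drive them through the Measure / ComputeAggregation traits.
fn cumulative_sum_roundtrip(filter: AttributeSetFilter) {
    let fns = create_aggregation::<_, _, u64>(
        SumNew { monotonic: true },
        CumulativeValueMap::new(()),
        filter,
    );

    fns.measure.call(5, &[KeyValue::new("key", "a")]);
    fns.measure.call(7, &[KeyValue::new("key", "a")]);

    // Passing `None` means there is no reusable destination, so a fresh
    // data::Sum<u64> is allocated; its single point for {key="a"} holds the
    // cumulative total 12.
    let (point_count, aggregation) = fns.collect.call(None);
    debug_assert_eq!(point_count, 1);
    let _ = aggregation;
}
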
8 changes: 6 additions & 2 deletions opentelemetry-sdk/src/metrics/internal/mod.rs
@@ -1,4 +1,5 @@
mod aggregate;
mod aggregate_impl;
mod exponential_histogram;
mod histogram;
mod last_value;
@@ -12,7 +13,10 @@ use std::ops::{Add, AddAssign, DerefMut, Sub};
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock, RwLock};

use aggregate::{is_under_cardinality_limit, STREAM_CARDINALITY_LIMIT};
use aggregate::{is_under_cardinality_limit, AggregateTimeInitiator, STREAM_CARDINALITY_LIMIT};

pub(crate) use aggregate_impl::AggregationImpl;

pub(crate) use aggregate::{AggregateBuilder, AggregateFns, ComputeAggregation, Measure};
pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE};
use opentelemetry::{otel_warn, KeyValue};
@@ -25,7 +29,7 @@ fn stream_overflow_attributes() -> &'static Vec<KeyValue> {
STREAM_OVERFLOW_ATTRIBUTES.get_or_init(|| vec![KeyValue::new("otel.metric.overflow", "true")])
}

pub(crate) trait Aggregator {
pub(crate) trait Aggregator: Send + Sync + 'static {
/// A static configuration that is needed in order to initialize the aggregator.
/// E.g. bucket_size at creation time.
type InitConfig;
55 changes: 52 additions & 3 deletions opentelemetry-sdk/src/metrics/internal/sum.rs
@@ -2,11 +2,11 @@ use crate::metrics::data::{self, Aggregation, SumDataPoint};
use crate::metrics::Temporality;
use opentelemetry::KeyValue;

use super::aggregate::{AggregateTimeInitiator, AttributeSetFilter};
use super::{Aggregator, AtomicTracker, ComputeAggregation, Measure, Number};
use super::aggregate::{AggregateTime, AggregateTimeInitiator, AttributeSetFilter};
use super::{AggregationImpl, Aggregator, AtomicTracker, ComputeAggregation, Measure, Number};
use super::{AtomicallyUpdate, ValueMap};

struct Increment<T>
pub(crate) struct Increment<T>
where
T: AtomicallyUpdate<T>,
{
@@ -162,3 +162,52 @@ where
}
}
}

pub(crate) struct SumNew {
pub(crate) monotonic: bool,
}

impl<T> AggregationImpl<T> for SumNew
where
T: Number,
{
type Aggr = Increment<T>;
type AggrData = data::Sum<T>;

fn precompute(&self, value: T) -> T {
value
}

fn new_aggregation_data(&self, temporality: Temporality, time: AggregateTime) -> data::Sum<T> {
data::Sum {
data_points: vec![],
start_time: time.start,
time: time.current,
temporality,
is_monotonic: self.monotonic,
}
}

fn reset_aggregation_data(
&self,
existing: &mut data::Sum<T>,
temporality: Temporality,
time: AggregateTime,
) {
existing.data_points.clear();
existing.start_time = time.start;
existing.time = time.current;
existing.temporality = temporality;
existing.is_monotonic = self.monotonic;
}

fn build_create_points_fn(
&self,
) -> impl FnMut(Vec<KeyValue>, &Increment<T>) -> SumDataPoint<T> {
|attributes, aggr| SumDataPoint {
attributes,
value: aggr.value.get_value(),
exemplars: vec![],
}
}
}
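
`SumNew` is the first aggregation ported to the new traits; note that nothing in it is temporality-specific. That choice is made entirely by the `AggregateMap` passed to `create_aggregation`. A hedged delta counterpart of the earlier cumulative sketch (again hypothetical, assumed to live in the same `internal` module as a test) would look like:

use opentelemetry::KeyValue;

// Hypothetical delta counterpart of the cumulative sketch above: with DeltaValueMap,
// collect_data_points drains the trackers, so each collection reports only what was
// measured since the previous one.
fn delta_sum_roundtrip(filter: AttributeSetFilter) {
    let fns = create_aggregation::<_, _, u64>(
        SumNew { monotonic: true },
        DeltaValueMap::new(()),
        filter,
    );

    fns.measure.call(5, &[KeyValue::new("key", "a")]);
    let (_, first) = fns.collect.call(None); // one point with value 5

    fns.measure.call(3, &[KeyValue::new("key", "a")]);
    let (_, second) = fns.collect.call(None); // one point with value 3, not 8

    let _ = (first, second);
}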