Skip to content

Commit 46475c0

Browse files
committed
ObservableGauge collect data points since previous collection
1 parent 0e751b4 commit 46475c0

File tree

3 files changed

+46
-14
lines changed

3 files changed

+46
-14
lines changed

opentelemetry-sdk/src/metrics/internal/aggregate.rs

+7-3
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,12 @@ impl<T: Number> AggregateBuilder<T> {
146146
}
147147

148148
/// Builds a last-value aggregate function input and output.
149-
pub(crate) fn last_value(&self) -> AggregateFns<T> {
150-
LastValue::new(self.temporality, self.filter.clone()).into()
149+
pub(crate) fn last_value(&self, overwrite_temporality: Option<Temporality>) -> AggregateFns<T> {
150+
LastValue::new(
151+
overwrite_temporality.unwrap_or(self.temporality),
152+
self.filter.clone(),
153+
)
154+
.into()
151155
}
152156

153157
/// Builds a precomputed sum aggregate function input and output.
@@ -210,7 +214,7 @@ mod tests {
210214
#[test]
211215
fn last_value_aggregation() {
212216
let AggregateFns { measure, collect } =
213-
AggregateBuilder::<u64>::new(Temporality::Cumulative, None).last_value();
217+
AggregateBuilder::<u64>::new(Temporality::Cumulative, None).last_value(None);
214218
let mut a = Gauge {
215219
data_points: vec![GaugeDataPoint {
216220
attributes: vec![KeyValue::new("a", 1)],

opentelemetry-sdk/src/metrics/mod.rs

+23-9
Original file line numberDiff line numberDiff line change
@@ -1394,22 +1394,32 @@ mod tests {
13941394
}
13951395

13961396
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1397-
async fn asynchronous_instruments_cumulative_with_gap_in_measurements() {
1397+
async fn asynchronous_instruments_cumulative_data_points_only_from_last_measurement() {
13981398
// Run this test with stdout enabled to see output.
1399-
// cargo test asynchronous_instruments_cumulative_with_gap_in_measurements --features=testing -- --nocapture
1399+
// cargo test asynchronous_instruments_cumulative_data_points_only_from_last_measurement --features=testing -- --nocapture
14001400

1401-
asynchronous_instruments_cumulative_with_gap_in_measurements_helper("counter");
1402-
asynchronous_instruments_cumulative_with_gap_in_measurements_helper("updown_counter");
1403-
asynchronous_instruments_cumulative_with_gap_in_measurements_helper("gauge");
1401+
asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
1402+
"gauge", true,
1403+
);
1404+
// TODO fix: all asynchronous instruments should not emit data points if not measured
1405+
// but these implementations are still buggy
1406+
asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
1407+
"counter", false,
1408+
);
1409+
asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
1410+
"updown_counter",
1411+
false,
1412+
);
14041413
}
14051414

1406-
fn asynchronous_instruments_cumulative_with_gap_in_measurements_helper(
1415+
fn asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
14071416
instrument_name: &'static str,
1417+
should_not_emit: bool,
14081418
) {
14091419
let mut test_context = TestContext::new(Temporality::Cumulative);
14101420
let attributes = Arc::new([KeyValue::new("key1", "value1")]);
14111421

1412-
// Create instrument and emit measurements
1422+
// Create instrument and emit measurements once
14131423
match instrument_name {
14141424
"counter" => {
14151425
let has_run = AtomicBool::new(false);
@@ -1466,8 +1476,12 @@ mod tests {
14661476

14671477
test_context.flush_metrics();
14681478

1469-
// Test that latest export has the same data as the previous one
1470-
assert_correct_export(&mut test_context, instrument_name);
1479+
if should_not_emit {
1480+
test_context.check_no_metrics();
1481+
} else {
1482+
// Test that latest export has the same data as the previous one
1483+
assert_correct_export(&mut test_context, instrument_name);
1484+
}
14711485

14721486
fn assert_correct_export(test_context: &mut TestContext, instrument_name: &'static str) {
14731487
match instrument_name {

opentelemetry-sdk/src/metrics/pipeline.rs

+16-2
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use crate::{
2323

2424
use self::internal::AggregateFns;
2525

26-
use super::Aggregation;
26+
use super::{Aggregation, Temporality};
2727

2828
/// Connects all of the instruments created by a meter provider to a [MetricReader].
2929
///
@@ -488,9 +488,20 @@ fn aggregate_fn<T: Number>(
488488
match agg {
489489
Aggregation::Default => aggregate_fn(b, &default_aggregation_selector(kind), kind),
490490
Aggregation::Drop => Ok(None),
491-
Aggregation::LastValue => Ok(Some(b.last_value())),
491+
Aggregation::LastValue => {
492+
match kind {
493+
InstrumentKind::Gauge => Ok(Some(b.last_value(None))),
494+
// temporality for LastValue only affects how data points are reported, so we can always use
495+
// delta temporality, because observable instruments should report data points only since previous collection
496+
InstrumentKind::ObservableGauge => Ok(Some(b.last_value(Some(Temporality::Delta)))),
497+
_ => Err(MetricError::Other(format!("LastValue aggregation is only available for Gauge or ObservableGauge, but not for {kind:?}")))
498+
}
499+
}
492500
Aggregation::Sum => {
493501
let fns = match kind {
502+
// TODO implement: observable instruments should not report data points on every collect
503+
// from SDK: For asynchronous instruments with Delta or Cumulative aggregation temporality,
504+
// MetricReader.Collect MUST only receive data points with measurements recorded since the previous collection
494505
InstrumentKind::ObservableCounter => b.precomputed_sum(true),
495506
InstrumentKind::ObservableUpDownCounter => b.precomputed_sum(false),
496507
InstrumentKind::Counter | InstrumentKind::Histogram => b.sum(true),
@@ -508,6 +519,9 @@ fn aggregate_fn<T: Number>(
508519
| InstrumentKind::ObservableUpDownCounter
509520
| InstrumentKind::ObservableGauge
510521
);
522+
// TODO implement: observable instruments should not report data points on every collect
523+
// from SDK: For asynchronous instruments with Delta or Cumulative aggregation temporality,
524+
// MetricReader.Collect MUST only receive data points with measurements recorded since the previous collection
511525
Ok(Some(b.explicit_bucket_histogram(
512526
boundaries.to_vec(),
513527
*record_min_max,

0 commit comments

Comments
 (0)