Skip to content

Commit e016808

Browse files
authored
Merge branch 'main' into re-export-tonic
2 parents a46b881 + bc82d4f commit e016808

File tree

10 files changed

+80
-47
lines changed

10 files changed

+80
-47
lines changed

opentelemetry-sdk/CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ also modified to suppress telemetry before invoking exporters.
2121
- TODO/Placeholder: Add ability to configure cardinality limits via Instrument
2222
advisory.
2323

24+
- *Breaking* change for custom `MetricReader` authors.
25+
The `shutdown_with_timeout` method is added to `MetricReader` trait.
26+
`collect` method on `MetricReader` modified to return `OTelSdkResult`.
27+
2428
## 0.29.0
2529

2630
Released 2025-Mar-21

opentelemetry-sdk/benches/metric.rs

+6-7
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,3 @@
1-
use rand::Rng;
2-
use std::sync::{Arc, Weak};
3-
41
use criterion::{criterion_group, criterion_main, Bencher, Criterion};
52
use opentelemetry::{
63
metrics::{Counter, Histogram, MeterProvider as _},
@@ -10,11 +7,13 @@ use opentelemetry_sdk::{
107
error::OTelSdkResult,
118
metrics::{
129
data::ResourceMetrics, new_view, reader::MetricReader, Aggregation, Instrument,
13-
InstrumentKind, ManualReader, MetricResult, Pipeline, SdkMeterProvider, Stream,
14-
Temporality, View,
10+
InstrumentKind, ManualReader, Pipeline, SdkMeterProvider, Stream, Temporality, View,
1511
},
1612
Resource,
1713
};
14+
use rand::Rng;
15+
use std::sync::{Arc, Weak};
16+
use std::time::Duration;
1817

1918
#[derive(Clone, Debug)]
2019
struct SharedReader(Arc<dyn MetricReader>);
@@ -24,15 +23,15 @@ impl MetricReader for SharedReader {
2423
self.0.register_pipeline(pipeline)
2524
}
2625

27-
fn collect(&self, rm: &mut ResourceMetrics) -> MetricResult<()> {
26+
fn collect(&self, rm: &mut ResourceMetrics) -> OTelSdkResult {
2827
self.0.collect(rm)
2928
}
3029

3130
fn force_flush(&self) -> OTelSdkResult {
3231
self.0.force_flush()
3332
}
3433

35-
fn shutdown(&self) -> OTelSdkResult {
34+
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
3635
self.0.shutdown()
3736
}
3837

opentelemetry-sdk/src/metrics/manual_reader.rs

+11-7
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
1+
use opentelemetry::otel_debug;
2+
use std::time::Duration;
13
use std::{
24
fmt,
35
sync::{Mutex, Weak},
46
};
57

6-
use opentelemetry::otel_debug;
7-
88
use crate::{
99
error::{OTelSdkError, OTelSdkResult},
10-
metrics::{MetricError, MetricResult, Temporality},
10+
metrics::Temporality,
1111
};
1212

1313
use super::{
@@ -90,12 +90,16 @@ impl MetricReader for ManualReader {
9090
/// callbacks necessary and returning the results.
9191
///
9292
/// Returns an error if called after shutdown.
93-
fn collect(&self, rm: &mut ResourceMetrics) -> MetricResult<()> {
94-
let inner = self.inner.lock()?;
93+
fn collect(&self, rm: &mut ResourceMetrics) -> OTelSdkResult {
94+
let inner = self
95+
.inner
96+
.lock()
97+
.map_err(|_| OTelSdkError::InternalFailure("Failed to lock pipeline".into()))?;
98+
9599
match &inner.sdk_producer.as_ref().and_then(|w| w.upgrade()) {
96100
Some(producer) => producer.produce(rm)?,
97101
None => {
98-
return Err(MetricError::Other(
102+
return Err(OTelSdkError::InternalFailure(
99103
"reader is shut down or not registered".into(),
100104
))
101105
}
@@ -110,7 +114,7 @@ impl MetricReader for ManualReader {
110114
}
111115

112116
/// Closes any connections and frees any resources used by the reader.
113-
fn shutdown(&self) -> OTelSdkResult {
117+
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
114118
let mut inner = self
115119
.inner
116120
.lock()

opentelemetry-sdk/src/metrics/meter_provider.rs

+16-7
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
11
use core::fmt;
2+
use opentelemetry::{
3+
metrics::{Meter, MeterProvider},
4+
otel_debug, otel_error, otel_info, InstrumentationScope,
5+
};
6+
use std::time::Duration;
27
use std::{
38
collections::HashMap,
49
sync::{
@@ -7,11 +12,6 @@ use std::{
712
},
813
};
914

10-
use opentelemetry::{
11-
metrics::{Meter, MeterProvider},
12-
otel_debug, otel_error, otel_info, InstrumentationScope,
13-
};
14-
1515
use crate::error::OTelSdkResult;
1616
use crate::Resource;
1717

@@ -109,13 +109,18 @@ impl SdkMeterProvider {
109109
///
110110
/// There is no guaranteed that all telemetry be flushed or all resources have
111111
/// been released on error.
112-
pub fn shutdown(&self) -> OTelSdkResult {
112+
pub fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
113113
otel_debug!(
114114
name: "MeterProvider.Shutdown",
115115
message = "User initiated shutdown of MeterProvider."
116116
);
117117
self.inner.shutdown()
118118
}
119+
120+
/// shutdown with default timeout
121+
pub fn shutdown(&self) -> OTelSdkResult {
122+
self.shutdown_with_timeout(Duration::from_secs(5))
123+
}
119124
}
120125

121126
impl SdkMeterProviderInner {
@@ -130,7 +135,7 @@ impl SdkMeterProviderInner {
130135
}
131136
}
132137

133-
fn shutdown(&self) -> OTelSdkResult {
138+
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
134139
if self
135140
.shutdown_invoked
136141
.swap(true, std::sync::atomic::Ordering::SeqCst)
@@ -141,6 +146,10 @@ impl SdkMeterProviderInner {
141146
self.pipes.shutdown()
142147
}
143148
}
149+
150+
fn shutdown(&self) -> OTelSdkResult {
151+
self.shutdown_with_timeout(Duration::from_secs(5))
152+
}
144153
}
145154

146155
impl Drop for SdkMeterProviderInner {

opentelemetry-sdk/src/metrics/periodic_reader.rs

+8-6
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use opentelemetry::{otel_debug, otel_error, otel_info, otel_warn, Context};
1212

1313
use crate::{
1414
error::{OTelSdkError, OTelSdkResult},
15-
metrics::{exporter::PushMetricExporter, reader::SdkProducer, MetricError, MetricResult},
15+
metrics::{exporter::PushMetricExporter, reader::SdkProducer},
1616
Resource,
1717
};
1818

@@ -357,11 +357,11 @@ impl<E: PushMetricExporter> PeriodicReaderInner<E> {
357357
self.exporter.temporality()
358358
}
359359

360-
fn collect(&self, rm: &mut ResourceMetrics) -> MetricResult<()> {
360+
fn collect(&self, rm: &mut ResourceMetrics) -> OTelSdkResult {
361361
let producer = self.producer.lock().expect("lock poisoned");
362362
if let Some(p) = producer.as_ref() {
363363
p.upgrade()
364-
.ok_or_else(|| MetricError::Other("pipeline is dropped".into()))?
364+
.ok_or(OTelSdkError::AlreadyShutdown)?
365365
.produce(rm)?;
366366
Ok(())
367367
} else {
@@ -371,7 +371,9 @@ impl<E: PushMetricExporter> PeriodicReaderInner<E> {
371371
This occurs when a periodic reader is created but not associated with a MeterProvider \
372372
by calling `.with_reader(reader)` on MeterProviderBuilder."
373373
);
374-
Err(MetricError::Other("MeterProvider is not registered".into()))
374+
Err(OTelSdkError::InternalFailure(
375+
"MeterProvider is not registered".into(),
376+
))
375377
}
376378
}
377379

@@ -479,7 +481,7 @@ impl<E: PushMetricExporter> MetricReader for PeriodicReader<E> {
479481
self.inner.register_pipeline(pipeline);
480482
}
481483

482-
fn collect(&self, rm: &mut ResourceMetrics) -> MetricResult<()> {
484+
fn collect(&self, rm: &mut ResourceMetrics) -> OTelSdkResult {
483485
self.inner.collect(rm)
484486
}
485487

@@ -491,7 +493,7 @@ impl<E: PushMetricExporter> MetricReader for PeriodicReader<E> {
491493
// completion, and avoid blocking the thread. The default shutdown on drop
492494
// can still use blocking call. If user already explicitly called shutdown,
493495
// drop won't call shutdown again.
494-
fn shutdown(&self) -> OTelSdkResult {
496+
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
495497
self.inner.shutdown()
496498
}
497499

opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs

+14-8
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use opentelemetry::{otel_debug, otel_error};
1616
use crate::runtime::{to_interval_stream, Runtime};
1717
use crate::{
1818
error::{OTelSdkError, OTelSdkResult},
19-
metrics::{exporter::PushMetricExporter, reader::SdkProducer, MetricError, MetricResult},
19+
metrics::{exporter::PushMetricExporter, reader::SdkProducer},
2020
Resource,
2121
};
2222

@@ -351,10 +351,14 @@ impl<E: PushMetricExporter> MetricReader for PeriodicReader<E> {
351351
worker(self);
352352
}
353353

354-
fn collect(&self, rm: &mut ResourceMetrics) -> MetricResult<()> {
355-
let inner = self.inner.lock()?;
354+
fn collect(&self, rm: &mut ResourceMetrics) -> OTelSdkResult {
355+
let inner = self
356+
.inner
357+
.lock()
358+
.map_err(|_| OTelSdkError::InternalFailure("Failed to lock pipeline".into()))?;
359+
356360
if inner.is_shutdown {
357-
return Err(MetricError::Other("reader is shut down".into()));
361+
return Err(OTelSdkError::AlreadyShutdown);
358362
}
359363

360364
if let Some(producer) = match &inner.sdk_producer_or_worker {
@@ -363,7 +367,9 @@ impl<E: PushMetricExporter> MetricReader for PeriodicReader<E> {
363367
} {
364368
producer.produce(rm)?;
365369
} else {
366-
return Err(MetricError::Other("reader is not registered".into()));
370+
return Err(OTelSdkError::InternalFailure(
371+
"reader is not registered".into(),
372+
));
367373
}
368374

369375
Ok(())
@@ -390,7 +396,7 @@ impl<E: PushMetricExporter> MetricReader for PeriodicReader<E> {
390396
.and_then(|res| res)
391397
}
392398

393-
fn shutdown(&self) -> OTelSdkResult {
399+
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
394400
let mut inner = self
395401
.inner
396402
.lock()
@@ -434,8 +440,8 @@ impl<E: PushMetricExporter> MetricReader for PeriodicReader<E> {
434440
#[cfg(all(test, feature = "testing"))]
435441
mod tests {
436442
use super::PeriodicReader;
443+
use crate::error::OTelSdkError;
437444
use crate::metrics::reader::MetricReader;
438-
use crate::metrics::MetricError;
439445
use crate::{
440446
metrics::data::ResourceMetrics, metrics::InMemoryMetricExporter, metrics::SdkMeterProvider,
441447
runtime, Resource,
@@ -496,7 +502,7 @@ mod tests {
496502

497503
// Assert
498504
assert!(
499-
matches!(result.unwrap_err(), MetricError::Other(err) if err == "reader is not registered")
505+
matches!(result.unwrap_err(), OTelSdkError::InternalFailure(err) if err == "reader is not registered")
500506
);
501507
}
502508

opentelemetry-sdk/src/metrics/pipeline.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,11 @@ impl Pipeline {
100100

101101
impl SdkProducer for Pipeline {
102102
/// Returns aggregated metrics from a single collection.
103-
fn produce(&self, rm: &mut ResourceMetrics) -> MetricResult<()> {
104-
let inner = self.inner.lock()?;
103+
fn produce(&self, rm: &mut ResourceMetrics) -> OTelSdkResult {
104+
let inner = self
105+
.inner
106+
.lock()
107+
.map_err(|_| OTelSdkError::InternalFailure("Failed to lock pipeline".into()))?;
105108
otel_debug!(
106109
name: "MeterProviderInvokingObservableCallbacks",
107110
count = inner.callbacks.len(),

opentelemetry-sdk/src/metrics/reader.rs

+10-5
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
//! Interfaces for reading and producing metrics
2+
use crate::error::OTelSdkResult;
3+
use std::time::Duration;
24
use std::{fmt, sync::Weak};
35

4-
use crate::{error::OTelSdkResult, metrics::MetricResult};
5-
66
use super::{data::ResourceMetrics, pipeline::Pipeline, InstrumentKind, Temporality};
77

88
/// The interface used between the SDK and an exporter.
@@ -30,7 +30,7 @@ pub trait MetricReader: fmt::Debug + Send + Sync + 'static {
3030
/// SDK and stores it in the provided [ResourceMetrics] reference.
3131
///
3232
/// An error is returned if this is called after shutdown.
33-
fn collect(&self, rm: &mut ResourceMetrics) -> MetricResult<()>;
33+
fn collect(&self, rm: &mut ResourceMetrics) -> OTelSdkResult;
3434

3535
/// Flushes all metric measurements held in an export pipeline.
3636
///
@@ -46,7 +46,12 @@ pub trait MetricReader: fmt::Debug + Send + Sync + 'static {
4646
///
4747
/// After `shutdown` is called, calls to `collect` will perform no operation and
4848
/// instead will return an error indicating the shutdown state.
49-
fn shutdown(&self) -> OTelSdkResult;
49+
fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult;
50+
51+
/// shutdown with default timeout
52+
fn shutdown(&self) -> OTelSdkResult {
53+
self.shutdown_with_timeout(Duration::from_secs(5))
54+
}
5055

5156
/// The output temporality, a function of instrument kind.
5257
/// This SHOULD be obtained from the exporter.
@@ -58,5 +63,5 @@ pub trait MetricReader: fmt::Debug + Send + Sync + 'static {
5863
/// Produces metrics for a [MetricReader].
5964
pub(crate) trait SdkProducer: fmt::Debug + Send + Sync {
6065
/// Returns aggregated metrics from a single collection.
61-
fn produce(&self, rm: &mut ResourceMetrics) -> MetricResult<()>;
66+
fn produce(&self, rm: &mut ResourceMetrics) -> OTelSdkResult;
6267
}

opentelemetry-sdk/src/metrics/view.rs

+1
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ pub fn new_view(criteria: Instrument, mask: Stream) -> MetricResult<Box<dyn View
170170
}
171171

172172
#[cfg(test)]
173+
#[cfg(feature = "spec_unstable_metrics_views")]
173174
mod tests {
174175
use super::*;
175176
#[test]

opentelemetry-sdk/src/testing/metrics/metric_reader.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
use std::sync::{Arc, Mutex, Weak};
2-
31
use crate::error::{OTelSdkError, OTelSdkResult};
2+
use crate::metrics::Temporality;
43
use crate::metrics::{
54
data::ResourceMetrics, pipeline::Pipeline, reader::MetricReader, InstrumentKind,
65
};
7-
use crate::metrics::{MetricResult, Temporality};
6+
use std::sync::{Arc, Mutex, Weak};
7+
use std::time::Duration;
88

99
#[derive(Debug, Clone)]
1010
pub struct TestMetricReader {
@@ -34,15 +34,15 @@ impl Default for TestMetricReader {
3434
impl MetricReader for TestMetricReader {
3535
fn register_pipeline(&self, _pipeline: Weak<Pipeline>) {}
3636

37-
fn collect(&self, _rm: &mut ResourceMetrics) -> MetricResult<()> {
37+
fn collect(&self, _rm: &mut ResourceMetrics) -> OTelSdkResult {
3838
Ok(())
3939
}
4040

4141
fn force_flush(&self) -> OTelSdkResult {
4242
Ok(())
4343
}
4444

45-
fn shutdown(&self) -> OTelSdkResult {
45+
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
4646
let result = self.force_flush();
4747
{
4848
let mut is_shutdown = self.is_shutdown.lock().unwrap();

0 commit comments

Comments
 (0)