Skip to content

Use dedicated ShutdownResult for Metric SDK shutdown #2573

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions examples/metrics-basic/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ fn init_meter_provider() -> opentelemetry_sdk::metrics::SdkMeterProvider {
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
async fn main() {
// Initialize the MeterProvider with the stdout Exporter.
let meter_provider = init_meter_provider();

Expand Down Expand Up @@ -140,6 +140,32 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
// Metrics are exported by default every 30 seconds when using stdout exporter,
// however shutting down the MeterProvider here instantly flushes
// the metrics, instead of waiting for the 30 sec interval.
meter_provider.shutdown()?;
Ok(())
let shutdown_result = meter_provider.shutdown();

// Demonstrate handling the shutdown result.
match shutdown_result {
Ok(_) => println!("MeterProvider shutdown successfully"),
Err(e) => {
match e {
opentelemetry_sdk::error::ShutdownError::InternalFailure(e) => {
// This indicates some failure during shutdown.
// There is not much to do here other than log the error,
// so that users at least know something went wrong, which
// may also explain why some metrics were not exported.
println!("MeterProvider shutdown failed: {}", e)
}
opentelemetry_sdk::error::ShutdownError::AlreadyShutdown => {
// This indicates some user code tried to shut down elsewhere.
// Users need to review their code to ensure shutdown is called only once.
println!("MeterProvider already shutdown")
}
opentelemetry_sdk::error::ShutdownError::Timeout(e) => {
// This indicates the shutdown timed out, and is a good
// hint to the user to increase the timeout or even retry.
// (The shutdown method does not allow a custom timeout today, but that is temporary.)
println!("MeterProvider shutdown timed out after {:?}", e)
}
}
}
}
}
13 changes: 11 additions & 2 deletions opentelemetry-otlp/src/exporter/http/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use async_trait::async_trait;
use http::{header::CONTENT_TYPE, Method};
use opentelemetry::otel_debug;
use opentelemetry_sdk::error::{ShutdownError, ShutdownResult};
use opentelemetry_sdk::metrics::data::ResourceMetrics;
use opentelemetry_sdk::metrics::{MetricError, MetricResult};

Expand Down Expand Up @@ -43,8 +44,16 @@
Ok(())
}

fn shutdown(&self) -> MetricResult<()> {
let _ = self.client.lock()?.take();
fn shutdown(&self) -> ShutdownResult {
self.client
.lock()
.map_err(|e| {
ShutdownError::InternalFailure(format!(
"Internal Error. Failed to acquire lock: {}",
e
))
})?
.take();

Check warning on line 56 in opentelemetry-otlp/src/exporter/http/metrics.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/metrics.rs#L47-L56

Added lines #L47 - L56 were not covered by tests

Ok(())
}
Expand Down
13 changes: 11 additions & 2 deletions opentelemetry-otlp/src/exporter/tonic/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use opentelemetry_proto::tonic::collector::metrics::v1::{
metrics_service_client::MetricsServiceClient, ExportMetricsServiceRequest,
};
use opentelemetry_sdk::error::{ShutdownError, ShutdownResult};
use opentelemetry_sdk::metrics::data::ResourceMetrics;
use opentelemetry_sdk::metrics::{MetricError, MetricResult};
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};
Expand Down Expand Up @@ -89,8 +90,16 @@
Ok(())
}

fn shutdown(&self) -> MetricResult<()> {
let _ = self.inner.lock()?.take();
fn shutdown(&self) -> ShutdownResult {
self.inner
.lock()
.map_err(|e| {
ShutdownError::InternalFailure(format!(
"Internal Error. Failed to acquire lock: {}",
e
))
})?
.take();

Check warning on line 102 in opentelemetry-otlp/src/exporter/tonic/metrics.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/metrics.rs#L93-L102

Added lines #L93 - L102 were not covered by tests

Ok(())
}
Expand Down
5 changes: 3 additions & 2 deletions opentelemetry-otlp/src/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

use async_trait::async_trait;
use core::fmt;
use opentelemetry_sdk::error::ShutdownResult;
use opentelemetry_sdk::metrics::MetricResult;

use opentelemetry_sdk::metrics::{
Expand Down Expand Up @@ -123,7 +124,7 @@
#[async_trait]
pub(crate) trait MetricsClient: fmt::Debug + Send + Sync + 'static {
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()>;
fn shutdown(&self) -> MetricResult<()>;
fn shutdown(&self) -> ShutdownResult;
}

/// Export metrics in OTEL format.
Expand All @@ -149,7 +150,7 @@
Ok(())
}

fn shutdown(&self) -> MetricResult<()> {
fn shutdown(&self) -> ShutdownResult {

Check warning on line 153 in opentelemetry-otlp/src/metric.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/metric.rs#L153

Added line #L153 was not covered by tests
self.client.shutdown()
}

Expand Down
3 changes: 2 additions & 1 deletion opentelemetry-sdk/benches/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use opentelemetry::{
Key, KeyValue,
};
use opentelemetry_sdk::{
error::ShutdownResult,
metrics::{
data::ResourceMetrics, new_view, reader::MetricReader, Aggregation, Instrument,
InstrumentKind, ManualReader, MetricResult, Pipeline, SdkMeterProvider, Stream,
Expand All @@ -31,7 +32,7 @@ impl MetricReader for SharedReader {
self.0.force_flush()
}

fn shutdown(&self) -> MetricResult<()> {
fn shutdown(&self) -> ShutdownResult {
self.0.shutdown()
}

Expand Down
38 changes: 38 additions & 0 deletions opentelemetry-sdk/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,45 @@
//! Wrappers for errors from the trace, logs, and metrics parts of OpenTelemetry.

use std::{result::Result, time::Duration};

use thiserror::Error;

/// Trait for errors returned by exporters.
///
/// Implementors must also implement [`std::error::Error`] and be
/// `Send + Sync + 'static`, as required by the supertrait bounds.
pub trait ExportError: std::error::Error + Send + Sync + 'static {
/// Returns the name of the exporter that returned this error.
fn exporter_name(&self) -> &'static str;
}

#[derive(Error, Debug)]
/// Errors that can occur during shutdown.
///
/// This is the error type carried by [`ShutdownResult`].
pub enum ShutdownError {
/// Shutdown has already been invoked.
///
/// While shutdown is idempotent and calling it multiple times has no
/// impact, this error suggests that another part of the application is
/// invoking `shutdown` earlier than intended. Users should review their
/// code to identify unintended or duplicate shutdown calls and ensure it is
/// only triggered once at the correct place.
#[error("Shutdown already invoked")]
AlreadyShutdown,

/// Shutdown timed out before completing.
///
/// This does not necessarily indicate a failure—shutdown may still be
/// complete. If this occurs frequently, consider increasing the timeout
/// duration to allow more time for completion.
///
/// The wrapped [`Duration`] is the timeout reported in the error message
/// ("Shutdown timed out after ...").
#[error("Shutdown timed out after {0:?}")]
Timeout(Duration),

/// Shutdown failed due to an internal error.
///
/// The wrapped `String` is the error message, which is appended to the
/// "Shutdown failed: " prefix when this error is displayed.
///
/// The error message is intended for logging purposes only and should not
/// be used to make programmatic decisions. It is implementation-specific
/// and subject to change without notice. Consumers of this error should not
/// rely on its content beyond logging.
#[error("Shutdown failed: {0}")]
InternalFailure(String),
}

/// A specialized `Result` type for Shutdown operations.
///
/// The `Ok` variant carries no value (`()`); the `Err` variant carries a
/// [`ShutdownError`] describing why shutdown did not succeed.
pub type ShutdownResult = Result<(), ShutdownError>;
3 changes: 2 additions & 1 deletion opentelemetry-sdk/src/metrics/exporter.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Interfaces for exporting metrics
use async_trait::async_trait;

use crate::error::ShutdownResult;
use crate::metrics::MetricResult;

use crate::metrics::data::ResourceMetrics;
Expand All @@ -27,7 +28,7 @@ pub trait PushMetricExporter: Send + Sync + 'static {
///
/// After Shutdown is called, calls to Export will perform no operation and
/// instead will return an error indicating the shutdown state.
fn shutdown(&self) -> MetricResult<()>;
fn shutdown(&self) -> ShutdownResult;

/// Access the [Temporality] of the MetricExporter.
fn temporality(&self) -> Temporality;
Expand Down
3 changes: 2 additions & 1 deletion opentelemetry-sdk/src/metrics/in_memory_exporter.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::error::ShutdownResult;
use crate::metrics::data::{self, Gauge, Sum};
use crate::metrics::data::{Histogram, Metric, ResourceMetrics, ScopeMetrics};
use crate::metrics::exporter::PushMetricExporter;
Expand Down Expand Up @@ -277,7 +278,7 @@ impl PushMetricExporter for InMemoryMetricExporter {
Ok(()) // In this implementation, flush does nothing
}

fn shutdown(&self) -> MetricResult<()> {
fn shutdown(&self) -> ShutdownResult {
Ok(())
}

Expand Down
11 changes: 8 additions & 3 deletions opentelemetry-sdk/src/metrics/manual_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@

use opentelemetry::otel_debug;

use crate::metrics::{MetricError, MetricResult, Temporality};
use crate::{
error::{ShutdownError, ShutdownResult},
metrics::{MetricError, MetricResult, Temporality},
};

use super::{
data::ResourceMetrics,
Expand Down Expand Up @@ -107,8 +110,10 @@
}

/// Closes any connections and frees any resources used by the reader.
fn shutdown(&self) -> MetricResult<()> {
let mut inner = self.inner.lock()?;
fn shutdown(&self) -> ShutdownResult {
let mut inner = self.inner.lock().map_err(|e| {
ShutdownError::InternalFailure(format!("Internal Error. Failed to acquire lock: {}", e))
})?;

Check warning on line 116 in opentelemetry-sdk/src/metrics/manual_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/manual_reader.rs#L113-L116

Added lines #L113 - L116 were not covered by tests

// Any future call to collect will now return an error.
inner.sdk_producer = None;
Expand Down
13 changes: 7 additions & 6 deletions opentelemetry-sdk/src/metrics/meter_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ use opentelemetry::{
otel_debug, otel_error, otel_info, InstrumentationScope,
};

use crate::metrics::{MetricError, MetricResult};
use crate::Resource;
use crate::{
error::ShutdownResult,
metrics::{MetricError, MetricResult},
};

use super::{
meter::SdkMeter, noop::NoopMeter, pipeline::Pipelines, reader::MetricReader, view::View,
Expand Down Expand Up @@ -108,7 +111,7 @@ impl SdkMeterProvider {
///
/// There is no guarantee that all telemetry will be flushed or that all
/// resources will have been released on error.
pub fn shutdown(&self) -> MetricResult<()> {
pub fn shutdown(&self) -> ShutdownResult {
otel_info!(
name: "MeterProvider.Shutdown",
message = "User initiated shutdown of MeterProvider."
Expand All @@ -131,15 +134,13 @@ impl SdkMeterProviderInner {
}
}

fn shutdown(&self) -> MetricResult<()> {
fn shutdown(&self) -> ShutdownResult {
if self
.shutdown_invoked
.swap(true, std::sync::atomic::Ordering::SeqCst)
{
// If the previous value was true, shutdown was already invoked.
Err(MetricError::Other(
"MeterProvider shutdown already invoked.".into(),
))
Err(crate::error::ShutdownError::AlreadyShutdown)
} else {
self.pipes.shutdown()
}
Expand Down
27 changes: 14 additions & 13 deletions opentelemetry-sdk/src/metrics/periodic_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use opentelemetry::{otel_debug, otel_error, otel_info, otel_warn};

use crate::{
error::{ShutdownError, ShutdownResult},
metrics::{exporter::PushMetricExporter, reader::SdkProducer, MetricError, MetricResult},
Resource,
};
Expand Down Expand Up @@ -399,27 +400,27 @@
}
}

fn shutdown(&self) -> MetricResult<()> {
fn shutdown(&self) -> ShutdownResult {
// TODO: See if this is better to be created upfront.
let (response_tx, response_rx) = mpsc::channel();
self.message_sender
.send(Message::Shutdown(response_tx))
.map_err(|e| MetricError::Other(e.to_string()))?;
.map_err(|e| ShutdownError::InternalFailure(e.to_string()))?;

// TODO: Make this timeout configurable.
match response_rx.recv_timeout(Duration::from_secs(5)) {
Ok(response) => {
if response {
Ok(())
} else {
Err(MetricError::Other("Failed to shutdown".into()))
Err(ShutdownError::InternalFailure("Failed to shutdown".into()))

Check warning on line 416 in opentelemetry-sdk/src/metrics/periodic_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader.rs#L416

Added line #L416 was not covered by tests
}
}
Err(mpsc::RecvTimeoutError::Timeout) => Err(MetricError::Other(
"Failed to shutdown due to Timeout".into(),
)),
Err(mpsc::RecvTimeoutError::Timeout) => {
Err(ShutdownError::Timeout(Duration::from_secs(5)))

Check warning on line 420 in opentelemetry-sdk/src/metrics/periodic_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader.rs#L420

Added line #L420 was not covered by tests
}
Err(mpsc::RecvTimeoutError::Disconnected) => {
Err(MetricError::Other("Failed to shutdown".into()))
Err(ShutdownError::InternalFailure("Failed to shutdown".into()))

Check warning on line 423 in opentelemetry-sdk/src/metrics/periodic_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader.rs#L423

Added line #L423 was not covered by tests
}
}
}
Expand Down Expand Up @@ -448,7 +449,7 @@
// completion, and avoid blocking the thread. The default shutdown on drop
// can still use blocking call. If user already explicitly called shutdown,
// drop won't call shutdown again.
fn shutdown(&self) -> MetricResult<()> {
fn shutdown(&self) -> ShutdownResult {
self.inner.shutdown()
}

Expand All @@ -468,10 +469,10 @@
mod tests {
use super::PeriodicReader;
use crate::{
metrics::InMemoryMetricExporter,
error::ShutdownResult,
metrics::{
data::ResourceMetrics, exporter::PushMetricExporter, reader::MetricReader, MetricError,
MetricResult, SdkMeterProvider, Temporality,
data::ResourceMetrics, exporter::PushMetricExporter, reader::MetricReader,
InMemoryMetricExporter, MetricError, MetricResult, SdkMeterProvider, Temporality,
},
Resource,
};
Expand Down Expand Up @@ -521,7 +522,7 @@
Ok(())
}

fn shutdown(&self) -> MetricResult<()> {
fn shutdown(&self) -> ShutdownResult {
Ok(())
}

Expand All @@ -545,7 +546,7 @@
Ok(())
}

fn shutdown(&self) -> MetricResult<()> {
fn shutdown(&self) -> ShutdownResult {
self.is_shutdown.store(true, Ordering::Relaxed);
Ok(())
}
Expand Down
Loading