diff --git a/opentelemetry-appender-tracing/benches/log-attributes.rs b/opentelemetry-appender-tracing/benches/log-attributes.rs index 397fca0443..ec344d2104 100644 --- a/opentelemetry-appender-tracing/benches/log-attributes.rs +++ b/opentelemetry-appender-tracing/benches/log-attributes.rs @@ -21,7 +21,6 @@ | otel_11_attributes | 625 ns | +106 ns | // vec! initial capacity is 5. 11th attribute causes vec! to be reallocated | otel_12_attributes | 676 ns | +51 ns | */ - use criterion::{criterion_group, criterion_main, Criterion}; use opentelemetry::InstrumentationScope; use opentelemetry_appender_tracing::layer as tracing_layer; @@ -30,6 +29,7 @@ use opentelemetry_sdk::logs::{LogProcessor, SdkLogRecord, SdkLoggerProvider}; use opentelemetry_sdk::Resource; #[cfg(not(target_os = "windows"))] use pprof::criterion::{Output, PProfProfiler}; +use std::time::Duration; use tracing::error; use tracing_subscriber::prelude::*; use tracing_subscriber::Registry; @@ -44,7 +44,7 @@ impl LogProcessor for NoopProcessor { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } } diff --git a/opentelemetry-appender-tracing/benches/logs.rs b/opentelemetry-appender-tracing/benches/logs.rs index ff5b8530f0..2ba92fc0b6 100644 --- a/opentelemetry-appender-tracing/benches/logs.rs +++ b/opentelemetry-appender-tracing/benches/logs.rs @@ -22,7 +22,6 @@ | ot_layer_disabled | 12 ns | | ot_layer_enabled | 186 ns | */ - use criterion::{criterion_group, criterion_main, Criterion}; use opentelemetry::InstrumentationScope; use opentelemetry_appender_tracing::layer as tracing_layer; @@ -31,6 +30,7 @@ use opentelemetry_sdk::logs::{LogProcessor, SdkLogRecord, SdkLoggerProvider}; use opentelemetry_sdk::Resource; #[cfg(not(target_os = "windows"))] use pprof::criterion::{Output, PProfProfiler}; +use std::time::Duration; use tracing::error; use tracing_subscriber::prelude::*; use tracing_subscriber::Layer; @@ -54,7 +54,7 @@ impl LogProcessor for NoopProcessor { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } diff --git a/opentelemetry-appender-tracing/src/layer.rs b/opentelemetry-appender-tracing/src/layer.rs index 6a15bc9dba..b61d86fb75 100644 --- a/opentelemetry-appender-tracing/src/layer.rs +++ b/opentelemetry-appender-tracing/src/layer.rs @@ -232,6 +232,7 @@ mod tests { use opentelemetry_sdk::logs::{LogBatch, LogExporter}; use opentelemetry_sdk::logs::{SdkLogRecord, SdkLoggerProvider}; use opentelemetry_sdk::trace::{Sampler, SdkTracerProvider}; + use std::time::Duration; use tracing::{error, warn}; use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; @@ -821,7 +822,7 @@ mod tests { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } } diff --git a/opentelemetry-proto/src/transform/logs.rs b/opentelemetry-proto/src/transform/logs.rs index f1a992fc9a..a206c18cb2 100644 --- a/opentelemetry-proto/src/transform/logs.rs +++ b/opentelemetry-proto/src/transform/logs.rs @@ -229,6 +229,7 @@ mod tests { use opentelemetry_sdk::logs::LogProcessor; use opentelemetry_sdk::logs::SdkLoggerProvider; use opentelemetry_sdk::{logs::LogBatch, logs::SdkLogRecord, Resource}; + use std::time; #[derive(Debug)] struct MockProcessor; @@ -240,7 +241,7 @@ mod tests { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: time::Duration) -> OTelSdkResult { Ok(()) } } diff --git a/opentelemetry-sdk/benches/log.rs b/opentelemetry-sdk/benches/log.rs index fe78faabdd..39cf35c6db 100644 --- a/opentelemetry-sdk/benches/log.rs +++ b/opentelemetry-sdk/benches/log.rs @@ -17,6 +17,7 @@ RAM: 64.0 GB use opentelemetry::time::now; use std::collections::HashMap; +use std::time::Duration; use criterion::{criterion_group, criterion_main, Criterion}; @@ -38,7 +39,7 @@ impl LogProcessor for NoopProcessor { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } } diff --git a/opentelemetry-sdk/benches/log_exporter.rs b/opentelemetry-sdk/benches/log_exporter.rs index 5fd4ac40dc..520b0f21c0 100644 --- a/opentelemetry-sdk/benches/log_exporter.rs +++ b/opentelemetry-sdk/benches/log_exporter.rs @@ -13,6 +13,7 @@ use opentelemetry::time::now; use opentelemetry_sdk::error::OTelSdkResult; use std::sync::Mutex; +use std::time::Duration; use criterion::{criterion_group, criterion_main, Criterion}; @@ -73,7 +74,7 @@ impl LogProcessor for ExportingProcessorWithFuture { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } } @@ -104,7 +105,7 @@ impl LogProcessor for ExportingProcessorWithoutFuture { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } } diff --git a/opentelemetry-sdk/benches/log_processor.rs b/opentelemetry-sdk/benches/log_processor.rs index b2b53aa013..242e4911d0 100644 --- a/opentelemetry-sdk/benches/log_processor.rs +++ b/opentelemetry-sdk/benches/log_processor.rs @@ -15,6 +15,7 @@ use opentelemetry::time::now; use std::{ sync::{Arc, Mutex}, thread::sleep, + time::Duration, }; use criterion::{criterion_group, criterion_main, Criterion}; @@ -54,7 +55,7 @@ impl LogProcessor for NoopProcessor { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } } @@ -71,7 +72,7 @@ impl LogProcessor for CloningProcessor { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } } @@ -117,7 +118,7 @@ impl LogProcessor for SendToChannelProcessor { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } } diff --git a/opentelemetry-sdk/benches/metric.rs b/opentelemetry-sdk/benches/metric.rs index 3706998e2c..867ca095bb 100644 --- a/opentelemetry-sdk/benches/metric.rs +++ b/opentelemetry-sdk/benches/metric.rs @@ -1,6 +1,3 @@ -use rand::Rng; -use std::sync::{Arc, Weak}; - use criterion::{criterion_group, criterion_main, Bencher, Criterion}; use opentelemetry::{ metrics::{Counter, Histogram, MeterProvider as _}, @@ -15,6 +12,9 @@ use opentelemetry_sdk::{ }, Resource, }; +use rand::Rng; +use std::sync::{Arc, Weak}; +use std::time::Duration; #[derive(Clone, Debug)] struct SharedReader(Arc); @@ -32,8 +32,8 @@ impl MetricReader for SharedReader { self.0.force_flush() } - fn shutdown(&self) -> OTelSdkResult { - self.0.shutdown() + fn shutdown(&self, timeout: Duration) -> OTelSdkResult { + self.0.shutdown(timeout) } fn temporality(&self, kind: InstrumentKind) -> Temporality { diff --git a/opentelemetry-sdk/src/logs/batch_log_processor.rs b/opentelemetry-sdk/src/logs/batch_log_processor.rs index 34825bd775..bf73d46b5e 100644 --- a/opentelemetry-sdk/src/logs/batch_log_processor.rs +++ b/opentelemetry-sdk/src/logs/batch_log_processor.rs @@ -132,7 +132,6 @@ pub struct BatchLogProcessor { message_sender: SyncSender, // Control channel to store control messages for the worker thread handle: Mutex>>, forceflush_timeout: Duration, - shutdown_timeout: Duration, export_log_message_sent: Arc, current_batch_size: Arc, max_export_batch_size: usize, @@ -256,7 +255,7 @@ impl LogProcessor for BatchLogProcessor { } } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, timeout: Duration) -> OTelSdkResult { let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed); let max_queue_size = self.max_queue_size; if dropped_logs > 0 { @@ -272,7 +271,7 @@ impl LogProcessor for BatchLogProcessor { match self.message_sender.try_send(BatchMessage::Shutdown(sender)) { Ok(_) => { receiver - .recv_timeout(self.shutdown_timeout) + .recv_timeout(timeout) .map(|_| { // join the background thread after receiving back the // shutdown signal @@ -287,7 +286,7 @@ impl LogProcessor for BatchLogProcessor { name: "BatchLogProcessor.Shutdown.Timeout", message = "BatchLogProcessor shutdown timing out." ); - OTelSdkError::Timeout(self.shutdown_timeout) + OTelSdkError::Timeout(timeout) } _ => { otel_error!( @@ -488,7 +487,6 @@ impl BatchLogProcessor { message_sender, handle: Mutex::new(Some(handle)), forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable - shutdown_timeout: Duration::from_secs(5), // TODO: make this configurable dropped_logs_count: AtomicUsize::new(0), max_queue_size, export_log_message_sent: Arc::new(AtomicBool::new(false)), @@ -961,7 +959,7 @@ mod tests { processor.emit(&mut record, &instrumentation); processor.force_flush().unwrap(); - processor.shutdown().unwrap(); + processor.shutdown(Duration::from_secs(5)).unwrap(); // todo: expect to see errors here. How should we assert this? processor.emit(&mut record, &instrumentation); assert_eq!(1, exporter.get_emitted_logs().unwrap().len()); @@ -973,27 +971,27 @@ mod tests { let exporter = InMemoryLogExporterBuilder::default().build(); let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default()); - processor.shutdown().unwrap(); + processor.shutdown(Duration::from_secs(5)).unwrap(); } #[tokio::test(flavor = "current_thread")] async fn test_batch_log_processor_shutdown_with_async_runtime_current_flavor_current_thread() { let exporter = InMemoryLogExporterBuilder::default().build(); let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default()); - processor.shutdown().unwrap(); + processor.shutdown(Duration::from_secs(5)).unwrap(); } #[tokio::test(flavor = "multi_thread")] async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_multi_thread() { let exporter = InMemoryLogExporterBuilder::default().build(); let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default()); - processor.shutdown().unwrap(); + processor.shutdown(Duration::from_secs(5)).unwrap(); } #[tokio::test(flavor = "multi_thread")] async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_current_thread() { let exporter = InMemoryLogExporterBuilder::default().build(); let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default()); - processor.shutdown().unwrap(); + processor.shutdown(Duration::from_secs(5)).unwrap(); } } diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index bffbbb0bb5..900b752c1d 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -34,6 +34,7 @@ use opentelemetry::logs::Severity; use opentelemetry::InstrumentationScope; use std::fmt::Debug; +use std::time::Duration; /// The interface for plugging into a [`SdkLogger`]. /// @@ -56,7 +57,7 @@ pub trait LogProcessor: Send + Sync + Debug { /// Shuts down the processor. /// After shutdown returns the log processor should stop processing any logs. /// It's up to the implementation on when to drop the LogProcessor. - fn shutdown(&self) -> OTelSdkResult; + fn shutdown(&self, timeout: Duration) -> OTelSdkResult; #[cfg(feature = "spec_unstable_logs_enabled")] /// Check if logging is enabled fn event_enabled(&self, _level: Severity, _target: &str, _name: Option<&str>) -> bool { @@ -81,6 +82,7 @@ pub(crate) mod tests { use opentelemetry::logs::{Logger, LoggerProvider}; use opentelemetry::{InstrumentationScope, Key}; use std::sync::{Arc, Mutex}; + use std::time::Duration; #[derive(Debug, Clone)] pub(crate) struct MockLogExporter { @@ -138,7 +140,7 @@ pub(crate) mod tests { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } } @@ -168,7 +170,7 @@ pub(crate) mod tests { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } } diff --git a/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs b/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs index f86a0d27af..1a83f4f4c9 100644 --- a/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs @@ -87,7 +87,7 @@ impl LogProcessor for BatchLogProcessor { .and_then(std::convert::identity) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed); let max_queue_size = self.max_queue_size; if dropped_logs > 0 { @@ -546,7 +546,7 @@ mod tests { processor.emit(&mut record, &instrumentation); processor.force_flush().unwrap(); - processor.shutdown().unwrap(); + processor.shutdown(Duration::from_secs(5)).unwrap(); // todo: expect to see errors here. How should we assert this? processor.emit(&mut record, &instrumentation); assert_eq!(1, exporter.get_emitted_logs().unwrap().len()) @@ -561,7 +561,7 @@ mod tests { runtime::TokioCurrentThread, ); - processor.shutdown().unwrap(); + processor.shutdown(Duration::from_secs(5)).unwrap(); } #[tokio::test(flavor = "current_thread")] @@ -578,7 +578,7 @@ mod tests { // // deadlock happens in shutdown with tokio current_thread runtime // - processor.shutdown().unwrap(); + processor.shutdown(Duration::from_secs(5)).unwrap(); } #[tokio::test(flavor = "current_thread")] @@ -589,7 +589,7 @@ mod tests { BatchConfig::default(), runtime::TokioCurrentThread, ); - processor.shutdown().unwrap(); + processor.shutdown(Duration::from_secs(5)).unwrap(); } #[tokio::test(flavor = "multi_thread")] @@ -597,7 +597,7 @@ mod tests { let exporter = InMemoryLogExporterBuilder::default().build(); let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio); - processor.shutdown().unwrap(); + processor.shutdown(Duration::from_secs(5)).unwrap(); } #[tokio::test(flavor = "multi_thread")] @@ -605,7 +605,7 @@ mod tests { let exporter = InMemoryLogExporterBuilder::default().build(); let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio); - processor.shutdown().unwrap(); + processor.shutdown(Duration::from_secs(5)).unwrap(); } #[derive(Debug)] @@ -633,7 +633,7 @@ mod tests { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } } @@ -663,7 +663,7 @@ mod tests { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } } @@ -808,7 +808,7 @@ mod tests { processor.emit(&mut record, &instrumentation); processor.force_flush().unwrap(); - processor.shutdown().unwrap(); + processor.shutdown(Duration::from_secs(5)).unwrap(); // todo: expect to see errors here. How should we assert this? processor.emit(&mut record, &instrumentation); assert_eq!(1, exporter.get_emitted_logs().unwrap().len()) @@ -824,7 +824,7 @@ mod tests { // // deadlock happens in shutdown with tokio current_thread runtime // - processor.shutdown().unwrap(); + processor.shutdown(Duration::from_secs(5)).unwrap(); } #[tokio::test(flavor = "current_thread")] @@ -837,7 +837,7 @@ mod tests { runtime::TokioCurrentThread, ); - processor.shutdown().unwrap(); + processor.shutdown(Duration::from_secs(5)).unwrap(); } #[tokio::test(flavor = "multi_thread")] @@ -846,7 +846,7 @@ mod tests { let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio); - processor.shutdown().unwrap(); + processor.shutdown(Duration::from_secs(5)).unwrap(); } #[tokio::test(flavor = "multi_thread")] @@ -858,6 +858,6 @@ mod tests { runtime::TokioCurrentThread, ); - processor.shutdown().unwrap(); + processor.shutdown(Duration::from_secs(5)).unwrap(); } } diff --git a/opentelemetry-sdk/src/logs/logger_provider.rs b/opentelemetry-sdk/src/logs/logger_provider.rs index 1bc61bd65b..56f255913c 100644 --- a/opentelemetry-sdk/src/logs/logger_provider.rs +++ b/opentelemetry-sdk/src/logs/logger_provider.rs @@ -3,6 +3,7 @@ use crate::error::{OTelSdkError, OTelSdkResult}; use crate::logs::LogExporter; use crate::Resource; use opentelemetry::{otel_debug, otel_info, InstrumentationScope}; +use std::time::Duration; use std::{ borrow::Cow, sync::{ @@ -100,8 +101,13 @@ impl SdkLoggerProvider { } } - /// Shuts down this `LoggerProvider` + /// Shuts down this `LoggerProvider` with default timeout. pub fn shutdown(&self) -> OTelSdkResult { + self.shutdown_with_timeout(Duration::from_secs(5)) // TODO: make this configurable + } + + /// Shuts down this `LoggerProvider` with a timeout. + pub fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult { otel_debug!( name: "LoggerProvider.ShutdownInvokedByUser", ); @@ -112,7 +118,7 @@ impl SdkLoggerProvider { .is_ok() { // propagate the shutdown signal to processors - let result = self.inner.shutdown(); + let result = self.inner.shutdown(timeout); if result.iter().all(|res| res.is_ok()) { Ok(()) } else { @@ -139,10 +145,10 @@ struct LoggerProviderInner { impl LoggerProviderInner { /// Shuts down the `LoggerProviderInner` and returns any errors. - pub(crate) fn shutdown(&self) -> Vec { + pub(crate) fn shutdown(&self, timeout: Duration) -> Vec { let mut results = vec![]; for processor in &self.processors { - let result = processor.shutdown(); + let result = processor.shutdown(timeout); if let Err(err) = &result { // Log at debug level because: // - The error is also returned to the user for handling (if applicable) @@ -164,7 +170,7 @@ impl Drop for LoggerProviderInner { name: "LoggerProvider.Drop", message = "Last reference of LoggerProvider dropped, initiating shutdown." ); - let _ = self.shutdown(); // errors are handled within shutdown + let _ = self.shutdown(Duration::from_secs(5)); // errors are handled within shutdown } else { otel_debug!( name: "LoggerProvider.Drop.AlreadyShutdown", @@ -338,7 +344,7 @@ mod tests { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { self.is_shutdown .lock() .map(|mut is_shutdown| *is_shutdown = true) @@ -783,7 +789,7 @@ mod tests { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { *self.shutdown_called.lock().unwrap() = true; Ok(()) } @@ -814,7 +820,7 @@ mod tests { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { let mut count = self.shutdown_count.lock().unwrap(); *count += 1; Ok(()) diff --git a/opentelemetry-sdk/src/logs/mod.rs b/opentelemetry-sdk/src/logs/mod.rs index 0da96bb730..6ff036aa64 100644 --- a/opentelemetry-sdk/src/logs/mod.rs +++ b/opentelemetry-sdk/src/logs/mod.rs @@ -42,6 +42,7 @@ mod tests { use opentelemetry::{Context, InstrumentationScope}; use std::borrow::Borrow; use std::collections::HashMap; + use std::time::Duration; #[test] fn logging_sdk_test() { @@ -167,7 +168,7 @@ mod tests { Ok(()) } - fn shutdown(&self) -> crate::error::OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> crate::error::OTelSdkResult { Ok(()) } } diff --git a/opentelemetry-sdk/src/logs/simple_log_processor.rs b/opentelemetry-sdk/src/logs/simple_log_processor.rs index 3ff9a59287..587905c615 100644 --- a/opentelemetry-sdk/src/logs/simple_log_processor.rs +++ b/opentelemetry-sdk/src/logs/simple_log_processor.rs @@ -28,6 +28,7 @@ use opentelemetry::{otel_debug, otel_error, otel_warn, InstrumentationScope}; use std::fmt::Debug; use std::sync::atomic::AtomicBool; use std::sync::Mutex; +use std::time::Duration; /// A [`LogProcessor`] designed for testing and debugging purpose, that immediately /// exports log records as they are emitted. Log records are exported synchronously @@ -114,7 +115,7 @@ impl LogProcessor for SimpleLogProcessor { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { self.is_shutdown .store(true, std::sync::atomic::Ordering::Relaxed); if let Ok(exporter) = self.exporter.lock() { @@ -214,7 +215,7 @@ mod tests { processor.emit(&mut record, &instrumentation); - processor.shutdown().unwrap(); + processor.shutdown(Duration::from_secs(5)).unwrap(); let is_shutdown = processor .is_shutdown diff --git a/opentelemetry-sdk/src/metrics/manual_reader.rs b/opentelemetry-sdk/src/metrics/manual_reader.rs index 9a9f8915ae..ecb1585d0c 100644 --- a/opentelemetry-sdk/src/metrics/manual_reader.rs +++ b/opentelemetry-sdk/src/metrics/manual_reader.rs @@ -1,10 +1,10 @@ +use opentelemetry::otel_debug; +use std::time::Duration; use std::{ fmt, sync::{Mutex, Weak}, }; -use opentelemetry::otel_debug; - use crate::{ error::{OTelSdkError, OTelSdkResult}, metrics::{MetricError, MetricResult, Temporality}, @@ -110,7 +110,7 @@ impl MetricReader for ManualReader { } /// Closes any connections and frees any resources used by the reader. - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { let mut inner = self .inner .lock() diff --git a/opentelemetry-sdk/src/metrics/meter_provider.rs b/opentelemetry-sdk/src/metrics/meter_provider.rs index a1cf2f9dec..c56a3f2297 100644 --- a/opentelemetry-sdk/src/metrics/meter_provider.rs +++ b/opentelemetry-sdk/src/metrics/meter_provider.rs @@ -1,4 +1,9 @@ use core::fmt; +use opentelemetry::{ + metrics::{Meter, MeterProvider}, + otel_debug, otel_error, otel_info, InstrumentationScope, +}; +use std::time::Duration; use std::{ collections::HashMap, sync::{ @@ -7,11 +12,6 @@ use std::{ }, }; -use opentelemetry::{ - metrics::{Meter, MeterProvider}, - otel_debug, otel_error, otel_info, InstrumentationScope, -}; - use crate::error::OTelSdkResult; use crate::Resource; @@ -109,12 +109,16 @@ impl SdkMeterProvider { /// /// There is no guaranteed that all telemetry be flushed or all resources have /// been released on error. - pub fn shutdown(&self) -> OTelSdkResult { + pub fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult { otel_debug!( name: "MeterProvider.Shutdown", message = "User initiated shutdown of MeterProvider." ); - self.inner.shutdown() + self.inner.shutdown(timeout) + } + /// Shuts down with Default timeout of 5 seconds. + pub fn shutdown(&self) -> OTelSdkResult { + self.shutdown_with_timeout(Duration::from_secs(5)) } } @@ -130,7 +134,7 @@ impl SdkMeterProviderInner { } } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, timeout: Duration) -> OTelSdkResult { if self .shutdown_invoked .swap(true, std::sync::atomic::Ordering::SeqCst) @@ -138,7 +142,7 @@ impl SdkMeterProviderInner { // If the previous value was true, shutdown was already invoked. Err(crate::error::OTelSdkError::AlreadyShutdown) } else { - self.pipes.shutdown() + self.pipes.shutdown(timeout) } } } @@ -157,7 +161,7 @@ impl Drop for SdkMeterProviderInner { name: "MeterProvider.Drop", message = "Last reference of MeterProvider dropped, initiating shutdown." ); - if let Err(err) = self.shutdown() { + if let Err(err) = self.shutdown(Duration::from_secs(5)) { otel_error!( name: "MeterProvider.Drop.ShutdownFailed", message = "Shutdown attempt failed during drop of MeterProvider.", diff --git a/opentelemetry-sdk/src/metrics/periodic_reader.rs b/opentelemetry-sdk/src/metrics/periodic_reader.rs index 1e9f5bd16f..b50737b603 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader.rs @@ -441,7 +441,7 @@ impl PeriodicReaderInner { } } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, timeout: Duration) -> OTelSdkResult { // TODO: See if this is better to be created upfront. let (response_tx, response_rx) = mpsc::channel(); self.message_sender @@ -449,7 +449,7 @@ impl PeriodicReaderInner { .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?; // TODO: Make this timeout configurable. - match response_rx.recv_timeout(Duration::from_secs(5)) { + match response_rx.recv_timeout(timeout) { Ok(response) => { if response { Ok(()) @@ -457,9 +457,7 @@ impl PeriodicReaderInner { Err(OTelSdkError::InternalFailure("Failed to shutdown".into())) } } - Err(mpsc::RecvTimeoutError::Timeout) => { - Err(OTelSdkError::Timeout(Duration::from_secs(5))) - } + Err(mpsc::RecvTimeoutError::Timeout) => Err(OTelSdkError::Timeout(timeout)), Err(mpsc::RecvTimeoutError::Disconnected) => { Err(OTelSdkError::InternalFailure("Failed to shutdown".into())) } @@ -490,8 +488,8 @@ impl MetricReader for PeriodicReader { // completion, and avoid blocking the thread. The default shutdown on drop // can still use blocking call. If user already explicitly called shutdown, // drop won't call shutdown again. - fn shutdown(&self) -> OTelSdkResult { - self.inner.shutdown() + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { + self.inner.shutdown(_timeout) } /// To construct a [MetricReader][metric-reader] when setting up an SDK, diff --git a/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs b/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs index 77c18f76e8..58d9937e6f 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs @@ -390,7 +390,7 @@ impl MetricReader for PeriodicReader { .and_then(|res| res) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { let mut inner = self .inner .lock() diff --git a/opentelemetry-sdk/src/metrics/pipeline.rs b/opentelemetry-sdk/src/metrics/pipeline.rs index d8c9429c51..f3f28cb708 100644 --- a/opentelemetry-sdk/src/metrics/pipeline.rs +++ b/opentelemetry-sdk/src/metrics/pipeline.rs @@ -1,12 +1,12 @@ use core::fmt; +use opentelemetry::{otel_debug, InstrumentationScope, KeyValue}; +use std::time::Duration; use std::{ borrow::Cow, collections::{HashMap, HashSet}, sync::{Arc, Mutex}, }; -use opentelemetry::{otel_debug, InstrumentationScope, KeyValue}; - use crate::{ error::{OTelSdkError, OTelSdkResult}, metrics::{ @@ -95,8 +95,8 @@ impl Pipeline { } /// Shut down pipeline - fn shutdown(&self) -> OTelSdkResult { - self.reader.shutdown() + fn shutdown(&self, timeout: Duration) -> OTelSdkResult { + self.reader.shutdown(timeout) } } @@ -664,10 +664,10 @@ impl Pipelines { } /// Shut down all pipelines - pub(crate) fn shutdown(&self) -> OTelSdkResult { + pub(crate) fn shutdown(&self, timeout: Duration) -> OTelSdkResult { let mut errs = vec![]; for pipeline in &self.0 { - if let Err(err) = pipeline.shutdown() { + if let Err(err) = pipeline.shutdown(timeout) { errs.push(err); } } diff --git a/opentelemetry-sdk/src/metrics/reader.rs b/opentelemetry-sdk/src/metrics/reader.rs index 04710bdd41..ba4f6056f9 100644 --- a/opentelemetry-sdk/src/metrics/reader.rs +++ b/opentelemetry-sdk/src/metrics/reader.rs @@ -1,7 +1,7 @@ //! Interfaces for reading and producing metrics -use std::{fmt, sync::Weak}; - use crate::{error::OTelSdkResult, metrics::MetricResult}; +use std::time::Duration; +use std::{fmt, sync::Weak}; use super::{data::ResourceMetrics, pipeline::Pipeline, InstrumentKind, Temporality}; @@ -46,7 +46,7 @@ pub trait MetricReader: fmt::Debug + Send + Sync + 'static { /// /// After `shutdown` is called, calls to `collect` will perform no operation and /// instead will return an error indicating the shutdown state. - fn shutdown(&self) -> OTelSdkResult; + fn shutdown(&self, timeout: Duration) -> OTelSdkResult; /// The output temporality, a function of instrument kind. /// This SHOULD be obtained from the exporter. diff --git a/opentelemetry-sdk/src/testing/metrics/metric_reader.rs b/opentelemetry-sdk/src/testing/metrics/metric_reader.rs index 041eebfddb..ef62e41f03 100644 --- a/opentelemetry-sdk/src/testing/metrics/metric_reader.rs +++ b/opentelemetry-sdk/src/testing/metrics/metric_reader.rs @@ -1,10 +1,10 @@ -use std::sync::{Arc, Mutex, Weak}; - use crate::error::{OTelSdkError, OTelSdkResult}; use crate::metrics::{ data::ResourceMetrics, pipeline::Pipeline, reader::MetricReader, InstrumentKind, }; use crate::metrics::{MetricResult, Temporality}; +use std::sync::{Arc, Mutex, Weak}; +use std::time::Duration; #[derive(Debug, Clone)] pub struct TestMetricReader { @@ -42,7 +42,7 @@ impl MetricReader for TestMetricReader { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { let result = self.force_flush(); { let mut is_shutdown = self.is_shutdown.lock().unwrap(); diff --git a/stress/src/logs.rs b/stress/src/logs.rs index 1f57892380..24c242c11d 100644 --- a/stress/src/logs.rs +++ b/stress/src/logs.rs @@ -14,6 +14,8 @@ ~1.1 B/sec (when disabled) */ +use std::time::Duration; + use opentelemetry::InstrumentationScope; use opentelemetry_appender_tracing::layer; use opentelemetry_sdk::error::OTelSdkResult; @@ -54,7 +56,7 @@ impl LogProcessor for MockLogProcessor { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) }