diff --git a/examples/tracing-http-propagator/src/server.rs b/examples/tracing-http-propagator/src/server.rs index a7aa09aac9..e8d27466e1 100644 --- a/examples/tracing-http-propagator/src/server.rs +++ b/examples/tracing-http-propagator/src/server.rs @@ -120,10 +120,6 @@ impl LogProcessor for EnrichWithBaggageLogProcessor { fn force_flush(&self) -> OTelSdkResult { Ok(()) } - - fn shutdown(&self) -> OTelSdkResult { - Ok(()) - } } /// A custom span processor that enriches spans with baggage attributes. Baggage diff --git a/opentelemetry-appender-tracing/benches/log-attributes.rs b/opentelemetry-appender-tracing/benches/log-attributes.rs index 26eb1849c6..aaf819b372 100644 --- a/opentelemetry-appender-tracing/benches/log-attributes.rs +++ b/opentelemetry-appender-tracing/benches/log-attributes.rs @@ -43,10 +43,6 @@ impl LogProcessor for NoopProcessor { fn force_flush(&self) -> OTelSdkResult { Ok(()) } - - fn shutdown(&self) -> OTelSdkResult { - Ok(()) - } } /// Creates a single benchmark for a specific number of attributes diff --git a/opentelemetry-appender-tracing/benches/logs.rs b/opentelemetry-appender-tracing/benches/logs.rs index 759a3582bc..90b15235e7 100644 --- a/opentelemetry-appender-tracing/benches/logs.rs +++ b/opentelemetry-appender-tracing/benches/logs.rs @@ -54,10 +54,6 @@ impl LogProcessor for NoopProcessor { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { - Ok(()) - } - fn event_enabled( &self, _level: opentelemetry::logs::Severity, diff --git a/opentelemetry-appender-tracing/src/layer.rs b/opentelemetry-appender-tracing/src/layer.rs index 3958f9f4de..131b8fc816 100644 --- a/opentelemetry-appender-tracing/src/layer.rs +++ b/opentelemetry-appender-tracing/src/layer.rs @@ -877,10 +877,6 @@ mod tests { fn force_flush(&self) -> OTelSdkResult { Ok(()) } - - fn shutdown(&self) -> OTelSdkResult { - Ok(()) - } } #[cfg(feature = "spec_unstable_logs_enabled")] diff --git a/opentelemetry-proto/src/transform/logs.rs b/opentelemetry-proto/src/transform/logs.rs index f1a992fc9a..008ee4cc4d 100644 --- a/opentelemetry-proto/src/transform/logs.rs +++ b/opentelemetry-proto/src/transform/logs.rs @@ -239,10 +239,6 @@ mod tests { fn force_flush(&self) -> OTelSdkResult { Ok(()) } - - fn shutdown(&self) -> OTelSdkResult { - Ok(()) - } } fn create_test_log_data( diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index a61eab177b..9b1c8ecdf6 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -26,6 +26,7 @@ also modified to suppress telemetry before invoking exporters. instead of the string `"true"`. [#2878](https://github.com/open-telemetry/opentelemetry-rust/issues/2878) - The `shutdown_with_timeout` method is added to LogExporter trait. +- The `shutdown_with_timeout` method is added to LogProvider and LogProcessor trait. - *Breaking* `MetricError`, `MetricResult` no longer public (except when `spec_unstable_metrics_views` feature flag is enabled). `OTelSdkResult` should be used instead, wherever applicable. [#2906](https://github.com/open-telemetry/opentelemetry-rust/pull/2906) diff --git a/opentelemetry-sdk/src/logs/batch_log_processor.rs b/opentelemetry-sdk/src/logs/batch_log_processor.rs index 7f37914f28..758a038d5c 100644 --- a/opentelemetry-sdk/src/logs/batch_log_processor.rs +++ b/opentelemetry-sdk/src/logs/batch_log_processor.rs @@ -132,7 +132,6 @@ pub struct BatchLogProcessor { message_sender: SyncSender, // Control channel to store control messages for the worker thread handle: Mutex>>, forceflush_timeout: Duration, - shutdown_timeout: Duration, export_log_message_sent: Arc, current_batch_size: Arc, max_export_batch_size: usize, @@ -256,7 +255,7 @@ impl LogProcessor for BatchLogProcessor { } } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult { let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed); let max_queue_size = self.max_queue_size; if dropped_logs > 0 { @@ -272,7 +271,7 @@ impl LogProcessor for BatchLogProcessor { match self.message_sender.try_send(BatchMessage::Shutdown(sender)) { Ok(_) => { receiver - .recv_timeout(self.shutdown_timeout) + .recv_timeout(timeout) .map(|_| { // join the background thread after receiving back the // shutdown signal @@ -287,7 +286,7 @@ impl LogProcessor for BatchLogProcessor { name: "BatchLogProcessor.Shutdown.Timeout", message = "BatchLogProcessor shutdown timing out." ); - OTelSdkError::Timeout(self.shutdown_timeout) + OTelSdkError::Timeout(timeout) } _ => { otel_error!( @@ -489,7 +488,6 @@ impl BatchLogProcessor { message_sender, handle: Mutex::new(Some(handle)), forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable - shutdown_timeout: Duration::from_secs(5), // TODO: make this configurable dropped_logs_count: AtomicUsize::new(0), max_queue_size, export_log_message_sent: Arc::new(AtomicBool::new(false)), diff --git a/opentelemetry-sdk/src/logs/concurrent_log_processor.rs b/opentelemetry-sdk/src/logs/concurrent_log_processor.rs index d77a0cc6a0..052e3d9796 100644 --- a/opentelemetry-sdk/src/logs/concurrent_log_processor.rs +++ b/opentelemetry-sdk/src/logs/concurrent_log_processor.rs @@ -1,4 +1,5 @@ use opentelemetry::{otel_info, InstrumentationScope}; +use std::time::Duration; use crate::{error::OTelSdkResult, Resource}; @@ -43,8 +44,8 @@ impl LogProcessor for SimpleConcurrentLogProcessor { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { - self.exporter.shutdown() + fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult { + self.exporter.shutdown_with_timeout(timeout) } #[cfg(feature = "spec_unstable_logs_enabled")] diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index c37eefd9c5..3f7fa62975 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -34,6 +34,7 @@ use opentelemetry::logs::Severity; use opentelemetry::InstrumentationScope; use std::fmt::Debug; +use std::time::Duration; /// The interface for plugging into a [`SdkLogger`]. /// @@ -56,7 +57,13 @@ pub trait LogProcessor: Send + Sync + Debug { /// Shuts down the processor. /// After shutdown returns the log processor should stop processing any logs. /// It's up to the implementation on when to drop the LogProcessor. - fn shutdown(&self) -> OTelSdkResult; + fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { + Ok(()) + } + /// Shuts down the processor with default timeout. + fn shutdown(&self) -> OTelSdkResult { + self.shutdown_with_timeout(Duration::from_secs(5)) + } #[cfg(feature = "spec_unstable_logs_enabled")] /// Check if logging is enabled fn event_enabled(&self, _level: Severity, _target: &str, _name: Option<&str>) -> bool { @@ -133,10 +140,6 @@ pub(crate) mod tests { fn force_flush(&self) -> OTelSdkResult { Ok(()) } - - fn shutdown(&self) -> OTelSdkResult { - Ok(()) - } } #[derive(Debug)] @@ -163,10 +166,6 @@ pub(crate) mod tests { fn force_flush(&self) -> OTelSdkResult { Ok(()) } - - fn shutdown(&self) -> OTelSdkResult { - Ok(()) - } } #[test] diff --git a/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs b/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs index 94fd26be5a..230815be8d 100644 --- a/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs @@ -628,10 +628,6 @@ mod tests { fn force_flush(&self) -> OTelSdkResult { Ok(()) } - - fn shutdown(&self) -> OTelSdkResult { - Ok(()) - } } #[derive(Debug)] @@ -658,10 +654,6 @@ mod tests { fn force_flush(&self) -> OTelSdkResult { Ok(()) } - - fn shutdown(&self) -> OTelSdkResult { - Ok(()) - } } #[test] fn test_log_data_modification_by_multiple_processors() { diff --git a/opentelemetry-sdk/src/logs/logger_provider.rs b/opentelemetry-sdk/src/logs/logger_provider.rs index 254f7378cf..667a95fa18 100644 --- a/opentelemetry-sdk/src/logs/logger_provider.rs +++ b/opentelemetry-sdk/src/logs/logger_provider.rs @@ -3,6 +3,7 @@ use crate::error::{OTelSdkError, OTelSdkResult}; use crate::logs::LogExporter; use crate::Resource; use opentelemetry::{otel_debug, otel_info, InstrumentationScope}; +use std::time::Duration; use std::{ borrow::Cow, sync::{ @@ -96,7 +97,7 @@ impl SdkLoggerProvider { } /// Shuts down this `LoggerProvider` - pub fn shutdown(&self) -> OTelSdkResult { + pub fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult { otel_debug!( name: "LoggerProvider.ShutdownInvokedByUser", ); @@ -107,7 +108,7 @@ impl SdkLoggerProvider { .is_ok() { // propagate the shutdown signal to processors - let result = self.inner.shutdown(); + let result = self.inner.shutdown_with_timeout(timeout); if result.iter().all(|res| res.is_ok()) { Ok(()) } else { @@ -123,6 +124,11 @@ impl SdkLoggerProvider { Err(OTelSdkError::AlreadyShutdown) } } + + /// Shuts down this `LoggerProvider` with default timeout + pub fn shutdown(&self) -> OTelSdkResult { + self.shutdown_with_timeout(Duration::from_secs(5)) + } } #[derive(Debug)] @@ -133,10 +139,10 @@ struct LoggerProviderInner { impl LoggerProviderInner { /// Shuts down the `LoggerProviderInner` and returns any errors. - pub(crate) fn shutdown(&self) -> Vec { + pub(crate) fn shutdown_with_timeout(&self, timeout: Duration) -> Vec { let mut results = vec![]; for processor in &self.processors { - let result = processor.shutdown(); + let result = processor.shutdown_with_timeout(timeout); if let Err(err) = &result { // Log at debug level because: // - The error is also returned to the user for handling (if applicable) @@ -149,6 +155,11 @@ impl LoggerProviderInner { } results } + + /// Shuts down the `LoggerProviderInner` with default timeout and returns any errors. + pub(crate) fn shutdown(&self) -> Vec { + self.shutdown_with_timeout(Duration::from_secs(5)) + } } impl Drop for LoggerProviderInner { @@ -330,7 +341,7 @@ mod tests { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { self.is_shutdown .lock() .map(|mut is_shutdown| *is_shutdown = true) @@ -383,10 +394,6 @@ mod tests { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { - Ok(()) - } - fn set_resource(&mut self, resource: &Resource) { let mut res = self.resource.lock().unwrap(); *res = resource.clone(); @@ -903,7 +910,7 @@ mod tests { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { *self.shutdown_called.lock().unwrap() = true; Ok(()) } @@ -934,7 +941,7 @@ mod tests { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { let mut count = self.shutdown_count.lock().unwrap(); *count += 1; Ok(())