Skip to content

feat: Add timeout duration to shutdown #2789

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

4 changes: 2 additions & 2 deletions opentelemetry-appender-tracing/benches/log-attributes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
| otel_11_attributes | 625 ns | +106 ns | // vec! initial capacity is 5. 11th attribute causes vec! to be reallocated
| otel_12_attributes | 676 ns | +51 ns |
*/

use criterion::{criterion_group, criterion_main, Criterion};
use opentelemetry::InstrumentationScope;
use opentelemetry_appender_tracing::layer as tracing_layer;
Expand All @@ -30,6 +29,7 @@ use opentelemetry_sdk::logs::{LogProcessor, SdkLogRecord, SdkLoggerProvider};
use opentelemetry_sdk::Resource;
#[cfg(not(target_os = "windows"))]
use pprof::criterion::{Output, PProfProfiler};
use std::time::Duration;
use tracing::error;
use tracing_subscriber::prelude::*;
use tracing_subscriber::Registry;
Expand All @@ -44,7 +44,7 @@ impl LogProcessor for NoopProcessor {
Ok(())
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown(&self, _timeout: Duration) -> OTelSdkResult {
Ok(())
}
}
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-appender-tracing/benches/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
| ot_layer_disabled | 12 ns |
| ot_layer_enabled | 186 ns |
*/

use criterion::{criterion_group, criterion_main, Criterion};
use opentelemetry::InstrumentationScope;
use opentelemetry_appender_tracing::layer as tracing_layer;
Expand All @@ -31,6 +30,7 @@ use opentelemetry_sdk::logs::{LogProcessor, SdkLogRecord, SdkLoggerProvider};
use opentelemetry_sdk::Resource;
#[cfg(not(target_os = "windows"))]
use pprof::criterion::{Output, PProfProfiler};
use std::time::Duration;
use tracing::error;
use tracing_subscriber::prelude::*;
use tracing_subscriber::Layer;
Expand All @@ -54,7 +54,7 @@ impl LogProcessor for NoopProcessor {
Ok(())
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown(&self, _timeout: Duration) -> OTelSdkResult {
Ok(())
}

Expand Down
3 changes: 2 additions & 1 deletion opentelemetry-appender-tracing/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ mod tests {
use opentelemetry_sdk::logs::{LogBatch, LogExporter};
use opentelemetry_sdk::logs::{SdkLogRecord, SdkLoggerProvider};
use opentelemetry_sdk::trace::{Sampler, SdkTracerProvider};
use std::time::Duration;
use tracing::{error, warn};
use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
Expand Down Expand Up @@ -821,7 +822,7 @@ mod tests {
Ok(())
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown(&self, _timeout: Duration) -> OTelSdkResult {
Ok(())
}
}
Expand Down
3 changes: 2 additions & 1 deletion opentelemetry-proto/src/transform/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ mod tests {
use opentelemetry_sdk::logs::LogProcessor;
use opentelemetry_sdk::logs::SdkLoggerProvider;
use opentelemetry_sdk::{logs::LogBatch, logs::SdkLogRecord, Resource};
use std::time;

#[derive(Debug)]
struct MockProcessor;
Expand All @@ -240,7 +241,7 @@ mod tests {
Ok(())
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown(&self, _timeout: time::Duration) -> OTelSdkResult {
Ok(())
}
}
Expand Down
3 changes: 2 additions & 1 deletion opentelemetry-sdk/benches/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ RAM: 64.0 GB

use opentelemetry::time::now;
use std::collections::HashMap;
use std::time::Duration;

use criterion::{criterion_group, criterion_main, Criterion};

Expand All @@ -38,7 +39,7 @@ impl LogProcessor for NoopProcessor {
Ok(())
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown(&self, _timeout: Duration) -> OTelSdkResult {
Ok(())
}
}
Expand Down
5 changes: 3 additions & 2 deletions opentelemetry-sdk/benches/log_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
use opentelemetry::time::now;
use opentelemetry_sdk::error::OTelSdkResult;
use std::sync::Mutex;
use std::time::Duration;

use criterion::{criterion_group, criterion_main, Criterion};

Expand Down Expand Up @@ -73,7 +74,7 @@ impl LogProcessor for ExportingProcessorWithFuture {
Ok(())
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown(&self, _timeout: Duration) -> OTelSdkResult {
Ok(())
}
}
Expand Down Expand Up @@ -104,7 +105,7 @@ impl LogProcessor for ExportingProcessorWithoutFuture {
Ok(())
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown(&self, _timeout: Duration) -> OTelSdkResult {
Ok(())
}
}
Expand Down
7 changes: 4 additions & 3 deletions opentelemetry-sdk/benches/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use opentelemetry::time::now;
use std::{
sync::{Arc, Mutex},
thread::sleep,
time::Duration,
};

use criterion::{criterion_group, criterion_main, Criterion};
Expand Down Expand Up @@ -54,7 +55,7 @@ impl LogProcessor for NoopProcessor {
Ok(())
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown(&self, _timeout: Duration) -> OTelSdkResult {
Ok(())
}
}
Expand All @@ -71,7 +72,7 @@ impl LogProcessor for CloningProcessor {
Ok(())
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown(&self, _timeout: Duration) -> OTelSdkResult {
Ok(())
}
}
Expand Down Expand Up @@ -117,7 +118,7 @@ impl LogProcessor for SendToChannelProcessor {
Ok(())
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown(&self, _timeout: Duration) -> OTelSdkResult {
Ok(())
}
}
Expand Down
10 changes: 5 additions & 5 deletions opentelemetry-sdk/benches/metric.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
use rand::Rng;
use std::sync::{Arc, Weak};

use criterion::{criterion_group, criterion_main, Bencher, Criterion};
use opentelemetry::{
metrics::{Counter, Histogram, MeterProvider as _},
Expand All @@ -15,6 +12,9 @@ use opentelemetry_sdk::{
},
Resource,
};
use rand::Rng;
use std::sync::{Arc, Weak};
use std::time::Duration;

#[derive(Clone, Debug)]
struct SharedReader(Arc<dyn MetricReader>);
Expand All @@ -32,8 +32,8 @@ impl MetricReader for SharedReader {
self.0.force_flush()
}

fn shutdown(&self) -> OTelSdkResult {
self.0.shutdown()
fn shutdown(&self, timeout: Duration) -> OTelSdkResult {
self.0.shutdown(timeout)
}

fn temporality(&self, kind: InstrumentKind) -> Temporality {
Expand Down
18 changes: 8 additions & 10 deletions opentelemetry-sdk/src/logs/batch_log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@
message_sender: SyncSender<BatchMessage>, // Control channel to store control messages for the worker thread
handle: Mutex<Option<thread::JoinHandle<()>>>,
forceflush_timeout: Duration,
shutdown_timeout: Duration,
export_log_message_sent: Arc<AtomicBool>,
current_batch_size: Arc<AtomicUsize>,
max_export_batch_size: usize,
Expand Down Expand Up @@ -256,7 +255,7 @@
}
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown(&self, timeout: Duration) -> OTelSdkResult {
let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed);
let max_queue_size = self.max_queue_size;
if dropped_logs > 0 {
Expand All @@ -272,7 +271,7 @@
match self.message_sender.try_send(BatchMessage::Shutdown(sender)) {
Ok(_) => {
receiver
.recv_timeout(self.shutdown_timeout)
.recv_timeout(timeout)
.map(|_| {
// join the background thread after receiving back the
// shutdown signal
Expand All @@ -287,7 +286,7 @@
name: "BatchLogProcessor.Shutdown.Timeout",
message = "BatchLogProcessor shutdown timing out."
);
OTelSdkError::Timeout(self.shutdown_timeout)
OTelSdkError::Timeout(timeout)

Check warning on line 289 in opentelemetry-sdk/src/logs/batch_log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/batch_log_processor.rs#L289

Added line #L289 was not covered by tests
}
_ => {
otel_error!(
Expand Down Expand Up @@ -488,7 +487,6 @@
message_sender,
handle: Mutex::new(Some(handle)),
forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
shutdown_timeout: Duration::from_secs(5), // TODO: make this configurable
dropped_logs_count: AtomicUsize::new(0),
max_queue_size,
export_log_message_sent: Arc::new(AtomicBool::new(false)),
Expand Down Expand Up @@ -961,7 +959,7 @@

processor.emit(&mut record, &instrumentation);
processor.force_flush().unwrap();
processor.shutdown().unwrap();
processor.shutdown(Duration::from_secs(5)).unwrap();
// todo: expect to see errors here. How should we assert this?
processor.emit(&mut record, &instrumentation);
assert_eq!(1, exporter.get_emitted_logs().unwrap().len());
Expand All @@ -973,27 +971,27 @@
let exporter = InMemoryLogExporterBuilder::default().build();
let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());

processor.shutdown().unwrap();
processor.shutdown(Duration::from_secs(5)).unwrap();
}

#[tokio::test(flavor = "current_thread")]
async fn test_batch_log_processor_shutdown_with_async_runtime_current_flavor_current_thread() {
let exporter = InMemoryLogExporterBuilder::default().build();
let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());
processor.shutdown().unwrap();
processor.shutdown(Duration::from_secs(5)).unwrap();
}

#[tokio::test(flavor = "multi_thread")]
async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_multi_thread() {
let exporter = InMemoryLogExporterBuilder::default().build();
let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());
processor.shutdown().unwrap();
processor.shutdown(Duration::from_secs(5)).unwrap();
}

#[tokio::test(flavor = "multi_thread")]
async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_current_thread() {
let exporter = InMemoryLogExporterBuilder::default().build();
let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());
processor.shutdown().unwrap();
processor.shutdown(Duration::from_secs(5)).unwrap();
}
}
8 changes: 5 additions & 3 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use opentelemetry::logs::Severity;
use opentelemetry::InstrumentationScope;

use std::fmt::Debug;
use std::time::Duration;

/// The interface for plugging into a [`SdkLogger`].
///
Expand All @@ -56,7 +57,7 @@ pub trait LogProcessor: Send + Sync + Debug {
/// Shuts down the processor.
/// After shutdown returns the log processor should stop processing any logs.
/// It's up to the implementation on when to drop the LogProcessor.
fn shutdown(&self) -> OTelSdkResult;
fn shutdown(&self, timeout: Duration) -> OTelSdkResult;
#[cfg(feature = "spec_unstable_logs_enabled")]
/// Check if logging is enabled
fn event_enabled(&self, _level: Severity, _target: &str, _name: Option<&str>) -> bool {
Expand All @@ -81,6 +82,7 @@ pub(crate) mod tests {
use opentelemetry::logs::{Logger, LoggerProvider};
use opentelemetry::{InstrumentationScope, Key};
use std::sync::{Arc, Mutex};
use std::time::Duration;

#[derive(Debug, Clone)]
pub(crate) struct MockLogExporter {
Expand Down Expand Up @@ -138,7 +140,7 @@ pub(crate) mod tests {
Ok(())
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown(&self, _timeout: Duration) -> OTelSdkResult {
Ok(())
}
}
Expand Down Expand Up @@ -168,7 +170,7 @@ pub(crate) mod tests {
Ok(())
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown(&self, _timeout: Duration) -> OTelSdkResult {
Ok(())
}
}
Expand Down
Loading