From 0db608e296c2dac7a7d03a79df1ff0176852f9cb Mon Sep 17 00:00:00 2001 From: Cole VanOphem Date: Fri, 25 Apr 2025 21:16:16 -0400 Subject: [PATCH 1/5] Optimize PeriodicExportingMetricReader thread usage --- .../periodic_exporting_metric_reader.cc | 85 +++++++------------ 1 file changed, 30 insertions(+), 55 deletions(-) diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc index f9970312e4..3e247f220c 100644 --- a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc +++ b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc @@ -98,8 +98,8 @@ void PeriodicExportingMetricReader::DoBackgroundWork() worker_thread_instrumentation_->OnStart(); } #endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */ - - do + std::unique_lock lk(cv_m_); + while(true) { auto start = std::chrono::steady_clock::now(); @@ -134,8 +134,6 @@ void PeriodicExportingMetricReader::DoBackgroundWork() worker_thread_instrumentation_->BeforeWait(); } #endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */ - - std::unique_lock lk(cv_m_); cv_.wait_for(lk, remaining_wait_interval_ms, [this]() { if (is_force_wakeup_background_worker_.load(std::memory_order_acquire)) { @@ -151,8 +149,11 @@ void PeriodicExportingMetricReader::DoBackgroundWork() worker_thread_instrumentation_->AfterWait(); } #endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */ - - } while (IsShutdown() != true); + if(IsShutdown()) + { + return; + } + } #ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW if (worker_thread_instrumentation_ != nullptr) @@ -164,61 +165,40 @@ void PeriodicExportingMetricReader::DoBackgroundWork() bool PeriodicExportingMetricReader::CollectAndExportOnce() { - std::atomic cancel_export_for_timeout{false}; - std::uint64_t notify_force_flush = force_flush_pending_sequence_.load(std::memory_order_acquire); - std::unique_ptr task_thread; - #if OPENTELEMETRY_HAVE_EXCEPTIONS try { #endif - std::promise sender; - auto receiver = sender.get_future(); - - task_thread.reset( - new std::thread([this, &cancel_export_for_timeout, sender = std::move(sender)] { #ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW - if (collect_thread_instrumentation_ != nullptr) - { - collect_thread_instrumentation_->OnStart(); - collect_thread_instrumentation_->BeforeLoad(); - } + if (collect_thread_instrumentation_ != nullptr) + { + collect_thread_instrumentation_->OnStart(); + collect_thread_instrumentation_->BeforeLoad(); + } #endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */ + auto start = std::chrono::steady_clock::now(); + this->Collect([this, &start](ResourceMetrics &metric_data) { + auto end = std::chrono::steady_clock::now(); + if ((end - start) > this->export_timeout_millis_) + { + OTEL_INTERNAL_LOG_ERROR( + "[Periodic Exporting Metric Reader] Collect took longer configured time: " + << this->export_timeout_millis_.count() << " ms, and timed out"); + return false; + } + this->exporter_->Export(metric_data); + return true; + }); - this->Collect([this, &cancel_export_for_timeout](ResourceMetrics &metric_data) { - if (cancel_export_for_timeout.load(std::memory_order_acquire)) - { - OTEL_INTERNAL_LOG_ERROR( - "[Periodic Exporting Metric Reader] Collect took longer configured time: " - << this->export_timeout_millis_.count() << " ms, and timed out"); - return false; - } - this->exporter_->Export(metric_data); - return true; - }); - - const_cast &>(sender).set_value(); #ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW - if (collect_thread_instrumentation_ != nullptr) - { - collect_thread_instrumentation_->AfterLoad(); - collect_thread_instrumentation_->OnEnd(); - } + if (collect_thread_instrumentation_ != nullptr) + { + collect_thread_instrumentation_->AfterLoad(); + collect_thread_instrumentation_->OnEnd(); + } #endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */ - })); - - std::future_status status; - do - { - status = receiver.wait_for(std::chrono::milliseconds(export_timeout_millis_)); - if (status == std::future_status::timeout) - { - cancel_export_for_timeout.store(true, std::memory_order_release); - break; - } - } while (status != std::future_status::ready); #if OPENTELEMETRY_HAVE_EXCEPTIONS } catch (std::exception &e) @@ -235,11 +215,6 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce() } #endif - if (task_thread && task_thread->joinable()) - { - task_thread->join(); - } - std::uint64_t notified_sequence = force_flush_notified_sequence_.load(std::memory_order_acquire); while (notify_force_flush > notified_sequence) { From 9c7da8e99309b7192c5256eb87e50365cbb3f18e Mon Sep 17 00:00:00 2001 From: Amr <91903599+pr1nceray@users.noreply.github.com> Date: Sun, 27 Apr 2025 15:46:29 -0400 Subject: [PATCH 2/5] Updated periodic_exporting_metric_reader Replaced early returns with breaks in PeriodicExportingMetricReader::DoBackgroundWork to fix worker_thread_instrumentation_ behavior. --- sdk/src/metrics/export/periodic_exporting_metric_reader.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc index 3e247f220c..34489bc661 100644 --- a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc +++ b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc @@ -151,7 +151,7 @@ void PeriodicExportingMetricReader::DoBackgroundWork() #endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */ if(IsShutdown()) { - return; + break; } } From eec1dcc0380c9ff8a5c006d0059245e293107d19 Mon Sep 17 00:00:00 2001 From: Amr <91903599+pr1nceray@users.noreply.github.com> Date: Mon, 28 Apr 2025 05:37:07 -0400 Subject: [PATCH 3/5] Updated periodic_exporting_metric_reader.cc Fixed formatting so that it aligns with the clangformat Removed unnecessary future includes to address errors raised by include-what-you-use --- .../periodic_exporting_metric_reader.cc | 53 ++++++++----------- 1 file changed, 22 insertions(+), 31 deletions(-) diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc index 34489bc661..09561e0975 100644 --- a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc +++ b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc @@ -10,7 +10,6 @@ #include #include #include -#include #include #include "opentelemetry/common/timestamp.h" @@ -24,13 +23,6 @@ #include "opentelemetry/sdk/metrics/push_metric_exporter.h" #include "opentelemetry/version.h" -#if defined(_MSC_VER) -# pragma warning(suppress : 5204) -# include -#else -# include -#endif - #if OPENTELEMETRY_HAVE_EXCEPTIONS # include #endif @@ -99,7 +91,7 @@ void PeriodicExportingMetricReader::DoBackgroundWork() } #endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */ std::unique_lock lk(cv_m_); - while(true) + while (true) { auto start = std::chrono::steady_clock::now(); @@ -149,7 +141,7 @@ void PeriodicExportingMetricReader::DoBackgroundWork() worker_thread_instrumentation_->AfterWait(); } #endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */ - if(IsShutdown()) + if (IsShutdown()) { break; } @@ -171,33 +163,32 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce() { #endif #ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW - if (collect_thread_instrumentation_ != nullptr) - { - collect_thread_instrumentation_->OnStart(); - collect_thread_instrumentation_->BeforeLoad(); - } -#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */ - auto start = std::chrono::steady_clock::now(); - this->Collect([this, &start](ResourceMetrics &metric_data) { - auto end = std::chrono::steady_clock::now(); - if ((end - start) > this->export_timeout_millis_) + if (collect_thread_instrumentation_ != nullptr) { - OTEL_INTERNAL_LOG_ERROR( - "[Periodic Exporting Metric Reader] Collect took longer configured time: " - << this->export_timeout_millis_.count() << " ms, and timed out"); - return false; + collect_thread_instrumentation_->OnStart(); + collect_thread_instrumentation_->BeforeLoad(); } +#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */ + auto start = std::chrono::steady_clock::now(); + this->Collect([this, &start](ResourceMetrics &metric_data) { + auto end = std::chrono::steady_clock::now(); + if ((end - start) > this->export_timeout_millis_) + { + OTEL_INTERNAL_LOG_ERROR( + "[Periodic Exporting Metric Reader] Collect took longer configured time: " + << this->export_timeout_millis_.count() << " ms, and timed out"); + return false; + } this->exporter_->Export(metric_data); return true; - }); - + }); #ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW - if (collect_thread_instrumentation_ != nullptr) - { - collect_thread_instrumentation_->AfterLoad(); - collect_thread_instrumentation_->OnEnd(); - } + if (collect_thread_instrumentation_ != nullptr) + { + collect_thread_instrumentation_->AfterLoad(); + collect_thread_instrumentation_->OnEnd(); + } #endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */ #if OPENTELEMETRY_HAVE_EXCEPTIONS } From c428f0c799ba6e2c2820ab5e4441cb1585292bb0 Mon Sep 17 00:00:00 2001 From: Amr <91903599+pr1nceray@users.noreply.github.com> Date: Sat, 3 May 2025 01:21:31 -0400 Subject: [PATCH 4/5] Updated periodic_exporting_metric_reader.cc Improved scoping of the unique lock that is used in PeriodicExportingMetricReader::DoBackgroundWork Changed while loop logic from while(true)to be similar to original code (do-while) --- .../export/periodic_exporting_metric_reader.cc | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc index 09561e0975..0fbde50bc8 100644 --- a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc +++ b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc @@ -90,11 +90,9 @@ void PeriodicExportingMetricReader::DoBackgroundWork() worker_thread_instrumentation_->OnStart(); } #endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */ - std::unique_lock lk(cv_m_); - while (true) + do { auto start = std::chrono::steady_clock::now(); - #ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW if (worker_thread_instrumentation_ != nullptr) { @@ -126,6 +124,7 @@ void PeriodicExportingMetricReader::DoBackgroundWork() worker_thread_instrumentation_->BeforeWait(); } #endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */ + std::unique_lock lk(cv_m_); cv_.wait_for(lk, remaining_wait_interval_ms, [this]() { if (is_force_wakeup_background_worker_.load(std::memory_order_acquire)) { @@ -141,11 +140,7 @@ void PeriodicExportingMetricReader::DoBackgroundWork() worker_thread_instrumentation_->AfterWait(); } #endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */ - if (IsShutdown()) - { - break; - } - } + } while (IsShutdown() != true); #ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW if (worker_thread_instrumentation_ != nullptr) From f8c3b5f1ceb07788ca9b4f47a9e42301def05620 Mon Sep 17 00:00:00 2001 From: Cole VanOphem Date: Mon, 5 May 2025 23:51:40 -0400 Subject: [PATCH 5/5] Update CHANGELOG.md with PR 3383 --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f01050441d..64b2213c58 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,9 @@ Increment the: * [API] Add Enabled method to Tracer [#3357](https://github.com/open-telemetry/opentelemetry-cpp/pull/3357) +* [SDK] Optimize PeriodicExportingMetricReader thread usage + [#3383](https://github.com/open-telemetry/opentelemetry-cpp/pull/3383) + ## [1.20 2025-04-01] * [BUILD] Update opentelemetry-proto version