-
Notifications
You must be signed in to change notification settings - Fork 477
[SDK] Optimize PeriodicExportingMetricReader Thread Usage #3383
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
0db608e
9c7da8e
eec1dcc
5ee3b9d
c428f0c
3456835
73039e8
f8c3b5f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -98,8 +98,8 @@ void PeriodicExportingMetricReader::DoBackgroundWork() | |
worker_thread_instrumentation_->OnStart(); | ||
} | ||
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */ | ||
|
||
do | ||
std::unique_lock<std::mutex> lk(cv_m_); | ||
while(true) | ||
{ | ||
auto start = std::chrono::steady_clock::now(); | ||
|
||
|
@@ -134,8 +134,6 @@ void PeriodicExportingMetricReader::DoBackgroundWork() | |
worker_thread_instrumentation_->BeforeWait(); | ||
} | ||
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */ | ||
|
||
std::unique_lock<std::mutex> lk(cv_m_); | ||
cv_.wait_for(lk, remaining_wait_interval_ms, [this]() { | ||
if (is_force_wakeup_background_worker_.load(std::memory_order_acquire)) | ||
{ | ||
|
@@ -151,8 +149,11 @@ void PeriodicExportingMetricReader::DoBackgroundWork() | |
worker_thread_instrumentation_->AfterWait(); | ||
} | ||
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */ | ||
|
||
} while (IsShutdown() != true); | ||
if(IsShutdown()) | ||
{ | ||
return; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't we need call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1, probably we can just |
||
} | ||
} | ||
|
||
#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW | ||
if (worker_thread_instrumentation_ != nullptr) | ||
|
@@ -164,61 +165,40 @@ void PeriodicExportingMetricReader::DoBackgroundWork() | |
|
||
bool PeriodicExportingMetricReader::CollectAndExportOnce() | ||
{ | ||
std::atomic<bool> cancel_export_for_timeout{false}; | ||
|
||
std::uint64_t notify_force_flush = force_flush_pending_sequence_.load(std::memory_order_acquire); | ||
std::unique_ptr<std::thread> task_thread; | ||
|
||
#if OPENTELEMETRY_HAVE_EXCEPTIONS | ||
try | ||
{ | ||
#endif | ||
std::promise<void> sender; | ||
auto receiver = sender.get_future(); | ||
|
||
task_thread.reset( | ||
new std::thread([this, &cancel_export_for_timeout, sender = std::move(sender)] { | ||
#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW | ||
if (collect_thread_instrumentation_ != nullptr) | ||
{ | ||
collect_thread_instrumentation_->OnStart(); | ||
collect_thread_instrumentation_->BeforeLoad(); | ||
} | ||
if (collect_thread_instrumentation_ != nullptr) | ||
{ | ||
collect_thread_instrumentation_->OnStart(); | ||
collect_thread_instrumentation_->BeforeLoad(); | ||
} | ||
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */ | ||
auto start = std::chrono::steady_clock::now(); | ||
this->Collect([this, &start](ResourceMetrics &metric_data) { | ||
auto end = std::chrono::steady_clock::now(); | ||
if ((end - start) > this->export_timeout_millis_) | ||
{ | ||
OTEL_INTERNAL_LOG_ERROR( | ||
"[Periodic Exporting Metric Reader] Collect took longer configured time: " | ||
<< this->export_timeout_millis_.count() << " ms, and timed out"); | ||
return false; | ||
} | ||
this->exporter_->Export(metric_data); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note to self - the timeout interval Nothing for this PR, as this is the existing behavior, but something to discuss further. |
||
return true; | ||
}); | ||
|
||
this->Collect([this, &cancel_export_for_timeout](ResourceMetrics &metric_data) { | ||
if (cancel_export_for_timeout.load(std::memory_order_acquire)) | ||
{ | ||
OTEL_INTERNAL_LOG_ERROR( | ||
"[Periodic Exporting Metric Reader] Collect took longer configured time: " | ||
<< this->export_timeout_millis_.count() << " ms, and timed out"); | ||
return false; | ||
} | ||
this->exporter_->Export(metric_data); | ||
return true; | ||
}); | ||
|
||
const_cast<std::promise<void> &>(sender).set_value(); | ||
|
||
#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW | ||
if (collect_thread_instrumentation_ != nullptr) | ||
{ | ||
collect_thread_instrumentation_->AfterLoad(); | ||
collect_thread_instrumentation_->OnEnd(); | ||
} | ||
if (collect_thread_instrumentation_ != nullptr) | ||
{ | ||
collect_thread_instrumentation_->AfterLoad(); | ||
collect_thread_instrumentation_->OnEnd(); | ||
} | ||
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */ | ||
})); | ||
|
||
std::future_status status; | ||
do | ||
{ | ||
status = receiver.wait_for(std::chrono::milliseconds(export_timeout_millis_)); | ||
if (status == std::future_status::timeout) | ||
{ | ||
cancel_export_for_timeout.store(true, std::memory_order_release); | ||
break; | ||
} | ||
} while (status != std::future_status::ready); | ||
#if OPENTELEMETRY_HAVE_EXCEPTIONS | ||
} | ||
catch (std::exception &e) | ||
|
@@ -235,11 +215,6 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce() | |
} | ||
#endif | ||
|
||
if (task_thread && task_thread->joinable()) | ||
{ | ||
task_thread->join(); | ||
} | ||
|
||
std::uint64_t notified_sequence = force_flush_notified_sequence_.load(std::memory_order_acquire); | ||
while (notify_force_flush > notified_sequence) | ||
{ | ||
|
Uh oh!
There was an error while loading. Please reload this page.