Skip to content

Commit 0db608e

Browse files
committed
Optimize PeriodicExportingMetricReader thread usage
1 parent 7801cd9 commit 0db608e

File tree

1 file changed

+30
-55
lines changed

1 file changed

+30
-55
lines changed

sdk/src/metrics/export/periodic_exporting_metric_reader.cc

+30-55
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,8 @@ void PeriodicExportingMetricReader::DoBackgroundWork()
9898
worker_thread_instrumentation_->OnStart();
9999
}
100100
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */
101-
102-
do
101+
std::unique_lock<std::mutex> lk(cv_m_);
102+
while(true)
103103
{
104104
auto start = std::chrono::steady_clock::now();
105105

@@ -134,8 +134,6 @@ void PeriodicExportingMetricReader::DoBackgroundWork()
134134
worker_thread_instrumentation_->BeforeWait();
135135
}
136136
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */
137-
138-
std::unique_lock<std::mutex> lk(cv_m_);
139137
cv_.wait_for(lk, remaining_wait_interval_ms, [this]() {
140138
if (is_force_wakeup_background_worker_.load(std::memory_order_acquire))
141139
{
@@ -151,8 +149,11 @@ void PeriodicExportingMetricReader::DoBackgroundWork()
151149
worker_thread_instrumentation_->AfterWait();
152150
}
153151
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */
154-
155-
} while (IsShutdown() != true);
152+
if(IsShutdown())
153+
{
154+
return;
155+
}
156+
}
156157

157158
#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
158159
if (worker_thread_instrumentation_ != nullptr)
@@ -164,61 +165,40 @@ void PeriodicExportingMetricReader::DoBackgroundWork()
164165

165166
bool PeriodicExportingMetricReader::CollectAndExportOnce()
166167
{
167-
std::atomic<bool> cancel_export_for_timeout{false};
168-
169168
std::uint64_t notify_force_flush = force_flush_pending_sequence_.load(std::memory_order_acquire);
170-
std::unique_ptr<std::thread> task_thread;
171-
172169
#if OPENTELEMETRY_HAVE_EXCEPTIONS
173170
try
174171
{
175172
#endif
176-
std::promise<void> sender;
177-
auto receiver = sender.get_future();
178-
179-
task_thread.reset(
180-
new std::thread([this, &cancel_export_for_timeout, sender = std::move(sender)] {
181173
#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
182-
if (collect_thread_instrumentation_ != nullptr)
183-
{
184-
collect_thread_instrumentation_->OnStart();
185-
collect_thread_instrumentation_->BeforeLoad();
186-
}
174+
if (collect_thread_instrumentation_ != nullptr)
175+
{
176+
collect_thread_instrumentation_->OnStart();
177+
collect_thread_instrumentation_->BeforeLoad();
178+
}
187179
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */
180+
auto start = std::chrono::steady_clock::now();
181+
this->Collect([this, &start](ResourceMetrics &metric_data) {
182+
auto end = std::chrono::steady_clock::now();
183+
if ((end - start) > this->export_timeout_millis_)
184+
{
185+
OTEL_INTERNAL_LOG_ERROR(
186+
"[Periodic Exporting Metric Reader] Collect took longer configured time: "
187+
<< this->export_timeout_millis_.count() << " ms, and timed out");
188+
return false;
189+
}
190+
this->exporter_->Export(metric_data);
191+
return true;
192+
});
188193

189-
this->Collect([this, &cancel_export_for_timeout](ResourceMetrics &metric_data) {
190-
if (cancel_export_for_timeout.load(std::memory_order_acquire))
191-
{
192-
OTEL_INTERNAL_LOG_ERROR(
193-
"[Periodic Exporting Metric Reader] Collect took longer configured time: "
194-
<< this->export_timeout_millis_.count() << " ms, and timed out");
195-
return false;
196-
}
197-
this->exporter_->Export(metric_data);
198-
return true;
199-
});
200-
201-
const_cast<std::promise<void> &>(sender).set_value();
202194

203195
#ifdef ENABLE_THREAD_INSTRUMENTATION_PREVIEW
204-
if (collect_thread_instrumentation_ != nullptr)
205-
{
206-
collect_thread_instrumentation_->AfterLoad();
207-
collect_thread_instrumentation_->OnEnd();
208-
}
196+
if (collect_thread_instrumentation_ != nullptr)
197+
{
198+
collect_thread_instrumentation_->AfterLoad();
199+
collect_thread_instrumentation_->OnEnd();
200+
}
209201
#endif /* ENABLE_THREAD_INSTRUMENTATION_PREVIEW */
210-
}));
211-
212-
std::future_status status;
213-
do
214-
{
215-
status = receiver.wait_for(std::chrono::milliseconds(export_timeout_millis_));
216-
if (status == std::future_status::timeout)
217-
{
218-
cancel_export_for_timeout.store(true, std::memory_order_release);
219-
break;
220-
}
221-
} while (status != std::future_status::ready);
222202
#if OPENTELEMETRY_HAVE_EXCEPTIONS
223203
}
224204
catch (std::exception &e)
@@ -235,11 +215,6 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce()
235215
}
236216
#endif
237217

238-
if (task_thread && task_thread->joinable())
239-
{
240-
task_thread->join();
241-
}
242-
243218
std::uint64_t notified_sequence = force_flush_notified_sequence_.load(std::memory_order_acquire);
244219
while (notify_force_flush > notified_sequence)
245220
{

0 commit comments

Comments
 (0)