Skip to content

Commit 3456835

Browse files
authored
Merge branch 'main' into issue-3230
2 parents c428f0c + a145a56 commit 3456835

File tree

3 files changed

+191
-1
lines changed

3 files changed

+191
-1
lines changed

sdk/test/metrics/BUILD

+17
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,23 @@ cc_test(
4949
],
5050
)
5151

52+
cc_test(
53+
name = "stress_tests",
54+
timeout = "long",
55+
srcs = glob(["*_test_stress.cc"]),
56+
copts = [
57+
"-DUNIT_TESTING",
58+
],
59+
tags = [
60+
"metrics",
61+
"test",
62+
],
63+
deps = [
64+
"metrics_common_test_utils",
65+
"@com_google_googletest//:gtest_main",
66+
],
67+
)
68+
5269
otel_cc_benchmark(
5370
name = "attributes_processor_benchmark",
5471
srcs = [

sdk/test/metrics/CMakeLists.txt

+2-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ foreach(
3333
metric_reader_test
3434
observable_registry_test
3535
periodic_exporting_metric_reader_test
36-
instrument_metadata_validator_test)
36+
instrument_metadata_validator_test
37+
metric_test_stress)
3738
add_executable(${testname} "${testname}.cc")
3839
target_link_libraries(
3940
${testname} ${GTEST_BOTH_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT}
+172
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
#include <gtest/gtest.h>
5+
6+
#include <stdint.h>
7+
#include <atomic>
8+
#include <chrono>
9+
#include <random>
10+
#include <thread>
11+
#include <utility>
12+
#include <vector>
13+
14+
#include "common.h"
15+
#include "opentelemetry/context/context.h"
16+
#include "opentelemetry/metrics/meter.h"
17+
#include "opentelemetry/metrics/sync_instruments.h"
18+
#include "opentelemetry/nostd/function_ref.h"
19+
#include "opentelemetry/nostd/shared_ptr.h"
20+
#include "opentelemetry/nostd/unique_ptr.h"
21+
#include "opentelemetry/nostd/variant.h"
22+
#include "opentelemetry/sdk/common/exporter_utils.h"
23+
#include "opentelemetry/sdk/instrumentationscope/instrumentation_scope.h"
24+
#include "opentelemetry/sdk/metrics/data/metric_data.h"
25+
#include "opentelemetry/sdk/metrics/data/point_data.h"
26+
#include "opentelemetry/sdk/metrics/export/metric_producer.h"
27+
#include "opentelemetry/sdk/metrics/instruments.h"
28+
#include "opentelemetry/sdk/metrics/meter_provider.h"
29+
#include "opentelemetry/sdk/metrics/metric_reader.h"
30+
#include "opentelemetry/sdk/metrics/push_metric_exporter.h"
31+
32+
using namespace opentelemetry;
33+
using namespace opentelemetry::sdk::instrumentationscope;
34+
using namespace opentelemetry::sdk::metrics;
35+
36+
class MockMetricExporterForStress : public opentelemetry::sdk::metrics::PushMetricExporter
37+
{
38+
public:
39+
MockMetricExporterForStress() = default;
40+
41+
opentelemetry::sdk::metrics::AggregationTemporality GetAggregationTemporality(
42+
opentelemetry::sdk::metrics::InstrumentType) const noexcept override
43+
{
44+
return AggregationTemporality::kDelta;
45+
}
46+
47+
opentelemetry::sdk::common::ExportResult Export(
48+
const opentelemetry::sdk::metrics::ResourceMetrics &) noexcept override
49+
{
50+
return opentelemetry::sdk::common::ExportResult::kSuccess;
51+
}
52+
53+
bool ForceFlush(std::chrono::microseconds) noexcept override { return true; }
54+
55+
bool Shutdown(std::chrono::microseconds) noexcept override { return true; }
56+
};
57+
58+
TEST(HistogramStress, UnsignedInt64)
59+
{
60+
MeterProvider mp;
61+
auto m = mp.GetMeter("meter1", "version1", "schema1");
62+
63+
std::unique_ptr<MockMetricExporterForStress> exporter(new MockMetricExporterForStress());
64+
std::shared_ptr<MetricReader> reader{new MockMetricReader(std::move(exporter))};
65+
mp.AddMetricReader(reader);
66+
67+
auto h = m->CreateUInt64Histogram("histogram1", "histogram1_description", "histogram1_unit");
68+
69+
//
70+
// Start a dedicated thread to collect the metrics
71+
//
72+
std::vector<HistogramPointData> actuals;
73+
auto stop_collecting = std::make_shared<std::atomic<bool>>(false);
74+
auto collect_thread = std::thread([&reader, &actuals, stop_collecting]() {
75+
while (!*stop_collecting)
76+
{
77+
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
78+
reader->Collect([&](ResourceMetrics &rm) {
79+
for (const ScopeMetrics &smd : rm.scope_metric_data_)
80+
{
81+
for (const MetricData &md : smd.metric_data_)
82+
{
83+
for (const PointDataAttributes &dp : md.point_data_attr_)
84+
{
85+
actuals.push_back(opentelemetry::nostd::get<HistogramPointData>(dp.point_data));
86+
}
87+
}
88+
}
89+
return true;
90+
});
91+
}
92+
});
93+
94+
//
95+
// Start logging threads
96+
//
97+
int record_thread_count = std::thread::hardware_concurrency() - 1;
98+
if (record_thread_count <= 0)
99+
{
100+
record_thread_count = 1;
101+
}
102+
103+
std::vector<std::thread> threads(record_thread_count);
104+
constexpr int iterations_per_thread = 2000000;
105+
auto expected_sum = std::make_shared<std::atomic<uint64_t>>(0);
106+
107+
for (int i = 0; i < record_thread_count; ++i)
108+
{
109+
threads[i] = std::thread([&] {
110+
std::random_device rd;
111+
std::mt19937 random_engine(rd());
112+
std::uniform_int_distribution<> gen_random(1, 20000);
113+
114+
for (int j = 0; j < iterations_per_thread; ++j)
115+
{
116+
int64_t val = gen_random(random_engine);
117+
expected_sum->fetch_add(val, std::memory_order_relaxed);
118+
h->Record(val, {});
119+
}
120+
});
121+
}
122+
123+
for (int i = 0; i < record_thread_count; ++i)
124+
{
125+
threads[i].join();
126+
}
127+
128+
//
129+
// Stop the dedicated collection thread
130+
//
131+
*stop_collecting = true;
132+
collect_thread.join();
133+
134+
//
135+
// run the the final collection
136+
//
137+
reader->Collect([&](ResourceMetrics &rm) {
138+
for (const ScopeMetrics &smd : rm.scope_metric_data_)
139+
{
140+
for (const MetricData &md : smd.metric_data_)
141+
{
142+
for (const PointDataAttributes &dp : md.point_data_attr_)
143+
{
144+
actuals.push_back(opentelemetry::nostd::get<HistogramPointData>(dp.point_data));
145+
}
146+
}
147+
}
148+
return true;
149+
});
150+
151+
//
152+
// Aggregate the results
153+
//
154+
int64_t expected_count = record_thread_count * iterations_per_thread;
155+
int64_t collected_count = 0;
156+
int64_t collected_sum = 0;
157+
for (const auto &actual : actuals)
158+
{
159+
int64_t collected_bucket_sum = 0;
160+
for (const auto &count : actual.counts_)
161+
{
162+
collected_bucket_sum += count;
163+
}
164+
ASSERT_EQ(collected_bucket_sum, actual.count_);
165+
166+
collected_sum += opentelemetry::nostd::get<int64_t>(actual.sum_);
167+
collected_count += actual.count_;
168+
}
169+
170+
ASSERT_EQ(expected_count, collected_count);
171+
ASSERT_EQ(*expected_sum, collected_sum);
172+
}

0 commit comments

Comments
 (0)