forked from open-telemetry/opentelemetry-python-contrib
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_metrics.py
76 lines (63 loc) · 2.35 KB
/
test_metrics.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import threading
import time
from timeit import default_timer
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.test.test_base import TestBase
from .celery_test_tasks import app, task_add
class TestMetrics(TestBase):
def setUp(self):
super().setUp()
self._worker = app.Worker(
app=app, pool="solo", concurrency=1, hostname="celery@akochavi"
)
self._thread = threading.Thread(target=self._worker.start)
self._thread.daemon = True
self._thread.start()
def tearDown(self):
super().tearDown()
self._worker.stop()
self._thread.join()
def get_metrics(self):
result = task_add.delay(1, 2)
timeout = time.time() + 60 * 1 # 1 minutes from now
while not result.ready():
if time.time() > timeout:
break
time.sleep(0.05)
return self.get_sorted_metrics()
def test_basic_metric(self):
CeleryInstrumentor().instrument()
start_time = default_timer()
task_runtime_estimated = (default_timer() - start_time) * 1000
metrics = self.get_metrics()
CeleryInstrumentor().uninstrument()
self.assertEqual(len(metrics), 1)
task_runtime = metrics[0]
print(task_runtime)
self.assertEqual(task_runtime.name, "flower.task.runtime.seconds")
self.assert_metric_expected(
task_runtime,
[
self.create_histogram_data_point(
count=1,
sum_data_point=task_runtime_estimated,
max_data_point=task_runtime_estimated,
min_data_point=task_runtime_estimated,
attributes={
"task": "tests.celery_test_tasks.task_add",
"worker": "celery@akochavi",
},
)
],
est_value_delta=200,
)
def test_metric_uninstrument(self):
CeleryInstrumentor().instrument()
metrics = self.get_metrics()
self.assertEqual(len(metrics), 1)
CeleryInstrumentor().uninstrument()
metrics = self.get_metrics()
self.assertEqual(len(metrics), 1)
for metric in metrics:
for point in list(metric.data.data_points):
self.assertEqual(point.count, 1)