# base_test.py
# Forked from open-telemetry/opentelemetry-python-contrib.
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from concurrent.futures import ThreadPoolExecutor, as_completed
from random import randint
import flask
from werkzeug.test import Client
from werkzeug.wrappers import Response
from opentelemetry import context, trace
class InstrumentationTest:
    """Shared Flask view functions and route wiring for instrumentation tests.

    The class defines no ``__init__`` and never assigns ``self.app``; it only
    reads it in :meth:`_common_initialization`, so whoever uses this class
    must set ``self.app`` first.
    NOTE(review): ``self.app`` is presumably a ``flask.Flask`` instance
    created by the concrete test case -- confirm against the subclasses.
    """

    @staticmethod
    def _hello_endpoint(helloid):
        """Return ``"Hello: <helloid>"``.

        Raises:
            ValueError: if ``helloid`` equals 500, to simulate a view that
                fails with an unhandled exception.
        """
        if helloid == 500:
            raise ValueError(":-(")
        return f"Hello: {helloid}"

    @staticmethod
    def _sqlcommenter_endpoint():
        """Return the sqlcommenter values stored in the current context.

        Looks up the ``"SQLCOMMENTER_ORM_TAGS_AND_VALUES"`` key of the
        current OpenTelemetry context and falls back to an empty dict when
        the key is absent.
        """
        return context.get_current().get(
            "SQLCOMMENTER_ORM_TAGS_AND_VALUES", {}
        )

    @staticmethod
    def _copy_context_endpoint():
        """Return the current span name and the ``x-req`` request header.

        The header is read through a helper wrapped with
        ``flask.copy_current_request_context``.
        """

        @flask.copy_current_request_context
        def _extract_header():
            return flask.request.headers["x-req"]

        # Despite `_extract_header` copying the request context,
        # calling it shouldn't detach the parent Flask span's contextvar
        request_header = _extract_header()

        return {
            "span_name": trace.get_current_span().name,
            "request_header": request_header,
        }

    @staticmethod
    def _multithreaded_endpoint(count):
        """Run ``count`` request-context-bound tasks on a thread pool.

        Each task returns a random integer in ``[0, 100]``. The results are
        joined with single spaces in the order the tasks complete.

        Args:
            count: number of tasks to submit; also used as the pool's
                ``max_workers``.
        """

        def do_random_stuff():
            @flask.copy_current_request_context
            def inner():
                return randint(0, 100)

            return inner

        # The context manager shuts the pool down on exit; previously the
        # executor was never shut down, leaking its worker threads on every
        # request.
        with ThreadPoolExecutor(count) as executor:
            # `do_random_stuff()` is called here, in the request thread, so
            # the request context is captured before the work is handed off.
            futures = [
                executor.submit(do_random_stuff()) for _ in range(count)
            ]
            numbers = [future.result() for future in as_completed(futures)]

        return " ".join(str(number) for number in numbers)

    @staticmethod
    def _custom_response_headers():
        """Return a plain-text response carrying a fixed set of headers.

        The comma-separated values are set as single header strings.
        """
        resp = flask.Response("test response")
        resp.headers["content-type"] = "text/plain; charset=utf-8"
        # 13 == len("test response")
        resp.headers["content-length"] = "13"
        resp.headers["my-custom-header"] = (
            "my-custom-value-1,my-custom-header-2"
        )
        resp.headers["my-custom-regex-header-1"] = (
            "my-custom-regex-value-1,my-custom-regex-value-2"
        )
        resp.headers["My-Custom-Regex-Header-2"] = (
            "my-custom-regex-value-3,my-custom-regex-value-4"
        )
        resp.headers["my-secret-header"] = "my-secret-value"
        return resp

    @staticmethod
    def _repeat_custom_response_headers():
        """Return a response whose ``my-custom-header`` value is a list.

        Unlike :meth:`_custom_response_headers`, the two values are passed
        as separate list items rather than one comma-joined string.
        """
        headers = {
            "content-type": "text/plain; charset=utf-8",
            "my-custom-header": ["my-custom-value-1", "my-custom-header-2"],
        }
        return flask.Response("test response", headers=headers)

    def _common_initialization(self):
        """Register the test routes on ``self.app`` and create ``self.client``.

        ``self.app`` must already be set by the caller; ``self.client`` is
        assigned here as a werkzeug test ``Client`` bound to that app.
        """

        def excluded_endpoint():
            return "excluded"

        def excluded2_endpoint():
            return "excluded2"

        # pylint: disable=no-member
        self.app.route("/hello/<int:helloid>")(self._hello_endpoint)
        self.app.route("/sqlcommenter")(self._sqlcommenter_endpoint)
        # NOTE(review): this rule has no `count` URL variable although
        # `_multithreaded_endpoint` requires one -- confirm whether callers
        # register their own parameterized rule before relying on this one.
        self.app.route("/multithreaded")(self._multithreaded_endpoint)
        self.app.route("/copy_context")(self._copy_context_endpoint)
        self.app.route("/excluded/<int:helloid>")(self._hello_endpoint)
        self.app.route("/excluded")(excluded_endpoint)
        self.app.route("/excluded2")(excluded2_endpoint)
        self.app.route("/test_custom_response_headers")(
            self._custom_response_headers
        )
        self.app.route("/test_repeat_custom_response_headers")(
            self._repeat_custom_response_headers
        )

        # pylint: disable=attribute-defined-outside-init
        self.client = Client(self.app, Response)