# asgitestutil.py
# Forked from open-telemetry/opentelemetry-python.
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
from asgiref.testing import ApplicationCommunicator
from opentelemetry.test.spantestutil import SpanTestBase
def setup_testing_defaults(scope):
    """Populate ``scope`` in place with a baseline HTTP GET request scope.

    Keys listed below overwrite any same-named entries already in
    ``scope``; unrelated keys are left untouched. Returns ``None``.
    """
    # Built fresh on every call so that mutable values (the ``headers``
    # list) are never shared between two scopes.
    request_defaults = {
        "client": ("127.0.0.1", 32767),
        "headers": [],
        "http_version": "1.0",
        "method": "GET",
        "path": "/",
        "query_string": b"",
        "scheme": "http",
        "server": ("127.0.0.1", 80),
        "type": "http",
    }
    scope.update(request_defaults)
class AsgiTestBase(SpanTestBase):
    """Base test case for driving an ASGI application from synchronous tests.

    Each test starts with ``self.scope`` populated by
    ``setup_testing_defaults`` and ``self.communicator`` set to ``None``.
    Call ``seed_app`` to wrap the application under test, then use
    ``send_input`` / ``get_output`` to exchange ASGI messages with it.
    """

    def setUp(self):
        """Reset the request scope and clear any previous communicator."""
        super().setUp()
        self.scope = {}
        setup_testing_defaults(self.scope)
        self.communicator = None

    def tearDown(self):
        """Wait for the seeded application (if any), then run base cleanup."""
        try:
            if self.communicator is not None:
                self._run_until_complete(self.communicator.wait())
        finally:
            # Mirror setUp's super() call so base-class cleanup always runs,
            # even when waiting on the communicator raises.
            super().tearDown()

    @staticmethod
    def _run_until_complete(awaitable):
        """Run ``awaitable`` to completion on the current event loop.

        Returns whatever the awaitable returns and propagates whatever it
        raises.
        """
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            # No current loop in this thread (e.g. it was cleared by another
            # test, or the running Python no longer creates one implicitly).
            # Register a new one so subsequent calls reuse the same loop.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        return loop.run_until_complete(awaitable)

    def seed_app(self, app):
        """Wrap ``app`` in an ``ApplicationCommunicator`` using ``self.scope``."""
        self.communicator = ApplicationCommunicator(app, self.scope)

    def send_input(self, message):
        """Send one ASGI ``message`` to the seeded application.

        ``seed_app`` must have been called first.
        """
        self._run_until_complete(self.communicator.send_input(message))

    def send_default_request(self):
        """Send an ``http.request`` message with an empty body."""
        self.send_input({"type": "http.request", "body": b""})

    def get_output(self):
        """Return the next message emitted by the application.

        ``receive_output`` is called with ``0`` (its timeout argument), so
        this does not wait for output that is not already available;
        ``asyncio.TimeoutError`` propagates to the caller in that case.
        """
        return self._run_until_complete(self.communicator.receive_output(0))

    def get_all_output(self):
        """Collect messages via ``get_output`` until it times out.

        Returns the messages in the order they were received; the list is
        empty when the very first read times out.
        """
        outputs = []
        while True:
            try:
                outputs.append(self.get_output())
            except asyncio.TimeoutError:
                # Timing out is the expected end-of-output signal here.
                break
        return outputs