-
Notifications
You must be signed in to change notification settings - Fork 698
/
Copy path__init__.py
146 lines (116 loc) · 4.95 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import threading
import typing
from functools import wraps
from os import environ
from pkg_resources import iter_entry_points
from opentelemetry.context.context import Context, _RuntimeContext
from opentelemetry.environment_variables import OTEL_PYTHON_CONTEXT
logger = logging.getLogger(__name__)
_RUNTIME_CONTEXT = None # type: typing.Optional[_RuntimeContext]
_RUNTIME_CONTEXT_LOCK = threading.Lock()
_F = typing.TypeVar("_F", bound=typing.Callable[..., typing.Any])
def _load_runtime_context(func: _F) -> _F:
"""A decorator used to initialize the global RuntimeContext
Returns:
A wrapper of the decorated method.
"""
@wraps(func) # type: ignore[misc]
def wrapper( # type: ignore[misc]
*args: typing.Tuple[typing.Any, typing.Any],
**kwargs: typing.Dict[typing.Any, typing.Any]
) -> typing.Optional[typing.Any]:
global _RUNTIME_CONTEXT # pylint: disable=global-statement
with _RUNTIME_CONTEXT_LOCK:
if _RUNTIME_CONTEXT is None:
# FIXME use a better implementation of a configuration manager to avoid having
# to get configuration values straight from environment variables
default_context = "contextvars_context"
configured_context = environ.get(
OTEL_PYTHON_CONTEXT, default_context
) # type: str
try:
_RUNTIME_CONTEXT = next(
iter_entry_points(
"opentelemetry_context", configured_context
)
).load()()
except Exception: # pylint: disable=broad-except
logger.error(
"Failed to load context: %s", configured_context
)
return func(*args, **kwargs) # type: ignore[misc]
return typing.cast(_F, wrapper) # type: ignore[misc]
def get_value(key: str, context: typing.Optional[Context] = None) -> "object":
"""To access the local state of a concern, the RuntimeContext API
provides a function which takes a context and a key as input,
and returns a value.
Args:
key: The key of the value to retrieve.
context: The context from which to retrieve the value, if None, the current context is used.
Returns:
The value associated with the key.
"""
return context.get(key) if context is not None else get_current().get(key)
def set_value(
key: str, value: "object", context: typing.Optional[Context] = None
) -> Context:
"""To record the local state of a cross-cutting concern, the
RuntimeContext API provides a function which takes a context, a
key, and a value as input, and returns an updated context
which contains the new value.
Args:
key: The key of the entry to set.
value: The value of the entry to set.
context: The context to copy, if None, the current context is used.
Returns:
A new `Context` containing the value set.
"""
if context is None:
context = get_current()
new_values = context.copy()
new_values[key] = value
return Context(new_values)
@_load_runtime_context # type: ignore
def get_current() -> Context:
"""To access the context associated with program execution,
the Context API provides a function which takes no arguments
and returns a Context.
Returns:
The current `Context` object.
"""
return _RUNTIME_CONTEXT.get_current() # type:ignore
@_load_runtime_context # type: ignore
def attach(context: Context) -> object:
"""Associates a Context with the caller's current execution unit. Returns
a token that can be used to restore the previous Context.
Args:
context: The Context to set as current.
Returns:
A token that can be used with `detach` to reset the context.
"""
return _RUNTIME_CONTEXT.attach(context) # type:ignore
@_load_runtime_context # type: ignore
def detach(token: object) -> None:
"""Resets the Context associated with the caller's current execution unit
to the value it had before attaching a specified Context.
Args:
token: The Token that was returned by a previous call to attach a Context.
"""
try:
_RUNTIME_CONTEXT.detach(token) # type: ignore
except Exception: # pylint: disable=broad-except
logger.error("Failed to detach context")