forked from open-telemetry/opentelemetry-python-contrib
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path_load.py
162 lines (143 loc) · 6.12 KB
/
_load.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from functools import cached_property
from logging import getLogger
from os import environ
from opentelemetry.instrumentation.dependencies import (
get_dist_dependency_conflicts,
)
from opentelemetry.instrumentation.distro import BaseDistro, DefaultDistro
from opentelemetry.instrumentation.environment_variables import (
OTEL_PYTHON_CONFIGURATOR,
OTEL_PYTHON_DISABLED_INSTRUMENTATIONS,
OTEL_PYTHON_DISTRO,
)
from opentelemetry.instrumentation.version import __version__
from opentelemetry.util._importlib_metadata import (
EntryPoint,
distributions,
entry_points,
)
_logger = getLogger(__name__)
class _EntryPointDistFinder:
@cached_property
def _mapping(self):
return {
self._key_for(ep): dist
for dist in distributions()
for ep in dist.entry_points
}
def dist_for(self, entry_point: EntryPoint):
dist = getattr(entry_point, "dist", None)
if dist:
return dist
return self._mapping.get(self._key_for(entry_point))
@staticmethod
def _key_for(entry_point: EntryPoint):
return f"{entry_point.group}:{entry_point.name}:{entry_point.value}"
def _load_distro() -> BaseDistro:
distro_name = environ.get(OTEL_PYTHON_DISTRO, None)
for entry_point in entry_points(group="opentelemetry_distro"):
try:
# If no distro is specified, use first to come up.
if distro_name is None or distro_name == entry_point.name:
distro = entry_point.load()()
if not isinstance(distro, BaseDistro):
_logger.debug(
"%s is not an OpenTelemetry Distro. Skipping",
entry_point.name,
)
continue
_logger.debug(
"Distribution %s will be configured", entry_point.name
)
return distro
except Exception as exc: # pylint: disable=broad-except
_logger.exception(
"Distribution %s configuration failed", entry_point.name
)
raise exc
return DefaultDistro()
def _load_instrumentors(distro):
package_to_exclude = environ.get(OTEL_PYTHON_DISABLED_INSTRUMENTATIONS, [])
entry_point_finder = _EntryPointDistFinder()
if isinstance(package_to_exclude, str):
package_to_exclude = package_to_exclude.split(",")
# to handle users entering "requests , flask" or "requests, flask" with spaces
package_to_exclude = [x.strip() for x in package_to_exclude]
for entry_point in entry_points(group="opentelemetry_pre_instrument"):
entry_point.load()()
for entry_point in entry_points(group="opentelemetry_instrumentor"):
if entry_point.name in package_to_exclude:
_logger.debug(
"Instrumentation skipped for library %s", entry_point.name
)
continue
try:
entry_point_dist = entry_point_finder.dist_for(entry_point)
conflict = get_dist_dependency_conflicts(entry_point_dist)
if conflict:
_logger.debug(
"Skipping instrumentation %s: %s",
entry_point.name,
conflict,
)
continue
# tell instrumentation to not run dep checks again as we already did it above
distro.load_instrumentor(entry_point, skip_dep_check=True)
_logger.debug("Instrumented %s", entry_point.name)
except ImportError:
# in scenarios using the kubernetes operator to do autoinstrumentation some
# instrumentors may fail to load (usually requiring binary extensions)
# because the injected autoinstrumentation code does not match the application
# environment regarding python version, libc, etc... In this case it's better
# to skip the single instrumentation rather than failing to load everything
# so treat differently ImportError than the rest of exceptions
_logger.exception(
"Importing of %s failed, skipping it", entry_point.name
)
continue
except Exception as exc: # pylint: disable=broad-except
_logger.exception("Instrumenting of %s failed", entry_point.name)
raise exc
for entry_point in entry_points(group="opentelemetry_post_instrument"):
entry_point.load()()
def _load_configurators():
configurator_name = environ.get(OTEL_PYTHON_CONFIGURATOR, None)
configured = None
for entry_point in entry_points(group="opentelemetry_configurator"):
if configured is not None:
_logger.warning(
"Configuration of %s not loaded, %s already loaded",
entry_point.name,
configured,
)
continue
try:
if (
configurator_name is None
or configurator_name == entry_point.name
):
entry_point.load()().configure(auto_instrumentation_version=__version__) # type: ignore
configured = entry_point.name
else:
_logger.warning(
"Configuration of %s not loaded because %s is set by %s",
entry_point.name,
configurator_name,
OTEL_PYTHON_CONFIGURATOR,
)
except Exception as exc: # pylint: disable=broad-except
_logger.exception("Configuration of %s failed", entry_point.name)
raise exc