forked from open-telemetry/opentelemetry-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path__init__.py
146 lines (120 loc) · 4.58 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from logging import getLogger
from re import split
from typing import Iterable, List, Mapping, Optional, Set
from urllib.parse import quote_plus, unquote_plus
from opentelemetry.baggage import _is_valid_pair, get_all, set_baggage
from opentelemetry.context import get_current
from opentelemetry.context.context import Context
from opentelemetry.propagators import textmap
from opentelemetry.util.re import _DELIMITER_PATTERN
_logger = getLogger(__name__)
class W3CBaggagePropagator(textmap.TextMapPropagator):
"""Extracts and injects Baggage which is used to annotate telemetry."""
_MAX_HEADER_LENGTH = 8192
_MAX_PAIR_LENGTH = 4096
_MAX_PAIRS = 180
_BAGGAGE_HEADER_NAME = "baggage"
def extract(
self,
carrier: textmap.CarrierT,
context: Optional[Context] = None,
getter: textmap.Getter[textmap.CarrierT] = textmap.default_getter,
) -> Context:
"""Extract Baggage from the carrier.
See
`opentelemetry.propagators.textmap.TextMapPropagator.extract`
"""
if context is None:
context = get_current()
header = _extract_first_element(
getter.get(carrier, self._BAGGAGE_HEADER_NAME)
)
if not header:
return context
if len(header) > self._MAX_HEADER_LENGTH:
_logger.warning(
"Baggage header `%s` exceeded the maximum number of bytes per baggage-string",
header,
)
return context
baggage_entries: List[str] = split(_DELIMITER_PATTERN, header)
total_baggage_entries = self._MAX_PAIRS
if len(baggage_entries) > self._MAX_PAIRS:
_logger.warning(
"Baggage header `%s` exceeded the maximum number of list-members",
header,
)
for entry in baggage_entries:
if len(entry) > self._MAX_PAIR_LENGTH:
_logger.warning(
"Baggage entry `%s` exceeded the maximum number of bytes per list-member",
entry,
)
continue
if not entry: # empty string
continue
try:
name, value = entry.split("=", 1)
except Exception: # pylint: disable=broad-except
_logger.warning(
"Baggage list-member `%s` doesn't match the format", entry
)
continue
if not _is_valid_pair(name, value):
_logger.warning("Invalid baggage entry: `%s`", entry)
continue
name = unquote_plus(name).strip().lower()
value = unquote_plus(value).strip()
context = set_baggage(
name,
value,
context=context,
)
total_baggage_entries -= 1
if total_baggage_entries == 0:
break
return context
def inject(
self,
carrier: textmap.CarrierT,
context: Optional[Context] = None,
setter: textmap.Setter[textmap.CarrierT] = textmap.default_setter,
) -> None:
"""Injects Baggage into the carrier.
See
`opentelemetry.propagators.textmap.TextMapPropagator.inject`
"""
baggage_entries = get_all(context=context)
if not baggage_entries:
return
baggage_string = _format_baggage(baggage_entries)
setter.set(carrier, self._BAGGAGE_HEADER_NAME, baggage_string)
@property
def fields(self) -> Set[str]:
"""Returns a set with the fields set in `inject`."""
return {self._BAGGAGE_HEADER_NAME}
def _format_baggage(baggage_entries: Mapping[str, object]) -> str:
return ",".join(
quote_plus(str(key)) + "=" + quote_plus(str(value))
for key, value in baggage_entries.items()
)
def _extract_first_element(
items: Optional[Iterable[textmap.CarrierT]],
) -> Optional[textmap.CarrierT]:
if items is None:
return None
return next(iter(items), None)