Skip to content

Commit e073d4d

Browse files
authored
Add support for OTLP/HTTP log exporter (#2462)
1 parent 6e282d2 commit e073d4d

File tree

4 files changed

+626
-1
lines changed

4 files changed

+626
-1
lines changed

CHANGELOG.md

+3-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v1.12.0rc1-0.31b0...HEAD)
99

10+
- `opentelemetry-exporter-otlp-proto-http` Add support for OTLP/HTTP log exporter
11+
([#2462](https://github.com/open-telemetry/opentelemetry-python/pull/2462))
1012
- Fix yield of `None`-valued points
1113
([#2745](https://github.com/open-telemetry/opentelemetry-python/pull/2745))
1214
- Add missing `to_json` methods
@@ -115,7 +117,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
115117
pages that have moved, see
116118
[#2453](https://github.com/open-telemetry/opentelemetry-python/pull/2453), and
117119
[#2498](https://github.com/open-telemetry/opentelemetry-python/pull/2498).
118-
- `opentelemetry-exporter-otlp-grpc` update SDK dependency to ~1.9.
120+
- `opentelemetry-exporter-otlp-proto-grpc` update SDK dependency to ~1.9.
119121
([#2442](https://github.com/open-telemetry/opentelemetry-python/pull/2442))
120122
- bugfix(auto-instrumentation): attach OTLPHandler to root logger
121123
([#2450](https://github.com/open-telemetry/opentelemetry-python/pull/2450))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import gzip
16+
import logging
17+
import zlib
18+
from io import BytesIO
19+
from os import environ
20+
from typing import Dict, Optional, Sequence
21+
from time import sleep
22+
23+
import requests
24+
from backoff import expo
25+
26+
from opentelemetry.sdk.environment_variables import (
27+
OTEL_EXPORTER_OTLP_CERTIFICATE,
28+
OTEL_EXPORTER_OTLP_COMPRESSION,
29+
OTEL_EXPORTER_OTLP_ENDPOINT,
30+
OTEL_EXPORTER_OTLP_HEADERS,
31+
OTEL_EXPORTER_OTLP_TIMEOUT,
32+
)
33+
from opentelemetry.sdk._logs.export import (
34+
LogExporter,
35+
LogExportResult,
36+
LogData,
37+
)
38+
from opentelemetry.exporter.otlp.proto.http import Compression
39+
from opentelemetry.exporter.otlp.proto.http._log_exporter.encoder import (
40+
_ProtobufEncoder,
41+
)
42+
from opentelemetry.util.re import parse_headers
43+
44+
45+
_logger = logging.getLogger(__name__)
46+
47+
48+
DEFAULT_COMPRESSION = Compression.NoCompression
49+
DEFAULT_ENDPOINT = "http://localhost:4318/"
50+
DEFAULT_LOGS_EXPORT_PATH = "v1/logs"
51+
DEFAULT_TIMEOUT = 10 # in seconds
52+
53+
54+
class OTLPLogExporter(LogExporter):
55+
56+
_MAX_RETRY_TIMEOUT = 64
57+
58+
def __init__(
59+
self,
60+
endpoint: Optional[str] = None,
61+
certificate_file: Optional[str] = None,
62+
headers: Optional[Dict[str, str]] = None,
63+
timeout: Optional[int] = None,
64+
compression: Optional[Compression] = None,
65+
):
66+
self._endpoint = endpoint or _append_logs_path(
67+
environ.get(OTEL_EXPORTER_OTLP_ENDPOINT, DEFAULT_ENDPOINT)
68+
)
69+
self._certificate_file = certificate_file or environ.get(
70+
OTEL_EXPORTER_OTLP_CERTIFICATE, True
71+
)
72+
headers_string = environ.get(OTEL_EXPORTER_OTLP_HEADERS, "")
73+
self._headers = headers or parse_headers(headers_string)
74+
self._timeout = timeout or int(
75+
environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, DEFAULT_TIMEOUT)
76+
)
77+
self._compression = compression or _compression_from_env()
78+
self._session = requests.Session()
79+
self._session.headers.update(self._headers)
80+
self._session.headers.update(
81+
{"Content-Type": _ProtobufEncoder._CONTENT_TYPE}
82+
)
83+
if self._compression is not Compression.NoCompression:
84+
self._session.headers.update(
85+
{"Content-Encoding": self._compression.value}
86+
)
87+
self._shutdown = False
88+
89+
def _export(self, serialized_data: str):
90+
data = serialized_data
91+
if self._compression == Compression.Gzip:
92+
gzip_data = BytesIO()
93+
with gzip.GzipFile(fileobj=gzip_data, mode="w") as gzip_stream:
94+
gzip_stream.write(serialized_data)
95+
data = gzip_data.getvalue()
96+
elif self._compression == Compression.Deflate:
97+
data = zlib.compress(bytes(serialized_data))
98+
99+
return self._session.post(
100+
url=self._endpoint,
101+
data=data,
102+
verify=self._certificate_file,
103+
timeout=self._timeout,
104+
)
105+
106+
@staticmethod
107+
def _retryable(resp: requests.Response) -> bool:
108+
if resp.status_code == 408:
109+
return True
110+
if resp.status_code >= 500 and resp.status_code <= 599:
111+
return True
112+
return False
113+
114+
def export(self, batch: Sequence[LogData]) -> LogExportResult:
115+
# After the call to Shutdown subsequent calls to Export are
116+
# not allowed and should return a Failure result.
117+
if self._shutdown:
118+
_logger.warning("Exporter already shutdown, ignoring batch")
119+
return LogExportResult.FAILURE
120+
121+
serialized_data = _ProtobufEncoder.serialize(batch)
122+
123+
for delay in expo(max_value=self._MAX_RETRY_TIMEOUT):
124+
125+
if delay == self._MAX_RETRY_TIMEOUT:
126+
return LogExportResult.FAILURE
127+
128+
resp = self._export(serialized_data)
129+
# pylint: disable=no-else-return
130+
if resp.status_code in (200, 202):
131+
return LogExportResult.SUCCESS
132+
elif self._retryable(resp):
133+
_logger.warning(
134+
"Transient error %s encountered while exporting logs batch, retrying in %ss.",
135+
resp.reason,
136+
delay,
137+
)
138+
sleep(delay)
139+
continue
140+
else:
141+
_logger.error(
142+
"Failed to export logs batch code: %s, reason: %s",
143+
resp.status_code,
144+
resp.text,
145+
)
146+
return LogExportResult.FAILURE
147+
return LogExportResult.FAILURE
148+
149+
def shutdown(self):
150+
if self._shutdown:
151+
_logger.warning("Exporter already shutdown, ignoring call")
152+
return
153+
self._session.close()
154+
self._shutdown = True
155+
156+
157+
def _compression_from_env() -> Compression:
158+
compression = (
159+
environ.get(OTEL_EXPORTER_OTLP_COMPRESSION, "none").lower().strip()
160+
)
161+
return Compression(compression)
162+
163+
164+
def _append_logs_path(endpoint: str) -> str:
165+
if endpoint.endswith("/"):
166+
return endpoint + DEFAULT_LOGS_EXPORT_PATH
167+
return endpoint + f"/{DEFAULT_LOGS_EXPORT_PATH}"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from typing import Sequence, List
16+
17+
from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import (
18+
ExportLogsServiceRequest,
19+
)
20+
from opentelemetry.proto.logs.v1.logs_pb2 import (
21+
ScopeLogs,
22+
ResourceLogs,
23+
)
24+
from opentelemetry.proto.logs.v1.logs_pb2 import LogRecord as PB2LogRecord
25+
from opentelemetry.exporter.otlp.proto.http.trace_exporter.encoder import (
26+
_encode_instrumentation_scope,
27+
_encode_resource,
28+
_encode_span_id,
29+
_encode_trace_id,
30+
_encode_value,
31+
_encode_attributes,
32+
)
33+
34+
35+
from opentelemetry.sdk._logs.export import LogData
36+
37+
38+
class _ProtobufEncoder:
39+
_CONTENT_TYPE = "application/x-protobuf"
40+
41+
@classmethod
42+
def serialize(cls, batch: Sequence[LogData]) -> str:
43+
return cls.encode(batch).SerializeToString()
44+
45+
@staticmethod
46+
def encode(batch: Sequence[LogData]) -> ExportLogsServiceRequest:
47+
return ExportLogsServiceRequest(
48+
resource_logs=_encode_resource_logs(batch)
49+
)
50+
51+
52+
def _encode_log(log_data: LogData) -> PB2LogRecord:
53+
kwargs = {}
54+
kwargs["time_unix_nano"] = log_data.log_record.timestamp
55+
kwargs["span_id"] = _encode_span_id(log_data.log_record.span_id)
56+
kwargs["trace_id"] = _encode_trace_id(log_data.log_record.trace_id)
57+
kwargs["flags"] = int(log_data.log_record.trace_flags)
58+
kwargs["body"] = _encode_value(log_data.log_record.body)
59+
kwargs["severity_text"] = log_data.log_record.severity_text
60+
kwargs["attributes"] = _encode_attributes(log_data.log_record.attributes)
61+
kwargs["severity_number"] = log_data.log_record.severity_number.value
62+
63+
return PB2LogRecord(**kwargs)
64+
65+
66+
def _encode_resource_logs(batch: Sequence[LogData]) -> List[ResourceLogs]:
67+
68+
sdk_resource_logs = {}
69+
70+
for sdk_log in batch:
71+
sdk_resource = sdk_log.log_record.resource
72+
sdk_instrumentation = sdk_log.instrumentation_scope or None
73+
pb2_log = _encode_log(sdk_log)
74+
75+
if sdk_resource not in sdk_resource_logs.keys():
76+
sdk_resource_logs[sdk_resource] = {sdk_instrumentation: [pb2_log]}
77+
elif sdk_instrumentation not in sdk_resource_logs[sdk_resource].keys():
78+
sdk_resource_logs[sdk_resource][sdk_instrumentation] = [pb2_log]
79+
else:
80+
sdk_resource_logs[sdk_resource][sdk_instrumentation].append(
81+
pb2_log
82+
)
83+
84+
pb2_resource_logs = []
85+
86+
for sdk_resource, sdk_instrumentations in sdk_resource_logs.items():
87+
scope_logs = []
88+
for sdk_instrumentation, pb2_logs in sdk_instrumentations.items():
89+
scope_logs.append(
90+
ScopeLogs(
91+
scope=(_encode_instrumentation_scope(sdk_instrumentation)),
92+
log_records=pb2_logs,
93+
)
94+
)
95+
pb2_resource_logs.append(
96+
ResourceLogs(
97+
resource=_encode_resource(sdk_resource),
98+
scope_logs=scope_logs,
99+
)
100+
)
101+
102+
return pb2_resource_logs

0 commit comments

Comments
 (0)