From 109b79573369b6d5e9e5309fd97aaf102992b277 Mon Sep 17 00:00:00 2001 From: "jack.burridge" Date: Tue, 7 Feb 2023 17:36:17 +0000 Subject: [PATCH 1/5] Move encoding of logs to opentelemetry-exporter-otlp-proto-common --- .../LICENSE | 201 ++++++++++++++ .../README.rst | 27 ++ .../pyproject.toml | 44 +++ .../exporter/otlp/proto/common/__init__.py | 18 ++ .../otlp/proto/common/_internal/__init__.py | 97 +++++++ .../_internal/_log_encoder}/__init__.py | 64 ++--- .../otlp/proto/common/_log_encoder.py | 20 ++ .../exporter/otlp/proto/common/py.typed | 0 .../exporter/otlp/proto/common/version.py | 15 ++ .../tests/test_log_encoder.py | 253 ++++++++++++++++++ .../pyproject.toml | 1 + .../otlp/proto/grpc/_log_exporter/__init__.py | 93 +------ .../pyproject.toml | 1 + .../otlp/proto/http/_log_exporter/__init__.py | 6 +- .../tests/test_proto_log_exporter.py | 169 +----------- tox.ini | 11 + 16 files changed, 717 insertions(+), 303 deletions(-) create mode 100644 exporter/opentelemetry-exporter-otlp-proto-common/LICENSE create mode 100644 exporter/opentelemetry-exporter-otlp-proto-common/README.rst create mode 100644 exporter/opentelemetry-exporter-otlp-proto-common/pyproject.toml create mode 100644 exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/__init__.py create mode 100644 exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/__init__.py rename exporter/{opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/encoder => opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/_log_encoder}/__init__.py (58%) create mode 100644 exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_log_encoder.py create mode 100644 exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/py.typed create mode 100644 exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/version.py create mode 100644 exporter/opentelemetry-exporter-otlp-proto-common/tests/test_log_encoder.py diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/LICENSE b/exporter/opentelemetry-exporter-otlp-proto-common/LICENSE new file mode 100644 index 00000000000..1ef7dad2c5c --- /dev/null +++ b/exporter/opentelemetry-exporter-otlp-proto-common/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright The OpenTelemetry Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/README.rst b/exporter/opentelemetry-exporter-otlp-proto-common/README.rst new file mode 100644 index 00000000000..9756a49bc35 --- /dev/null +++ b/exporter/opentelemetry-exporter-otlp-proto-common/README.rst @@ -0,0 +1,27 @@ +OpenTelemetry Protobuf Encoding +=============================== + +|pypi| + +.. |pypi| image:: https://badge.fury.io/py/opentelemetry-exporter-otlp-proto-common.svg + :target: https://pypi.org/project/opentelemetry-exporter-otlp-proto-common/ + +This library is provided as a convenience to encode to Protobuf. Currently used by: + +* opentelemetry-exporter-otlp-proto-grpc +* opentelemetry-exporter-otlp-proto-http + + +Installation +------------ + +:: + + pip install opentelemetry-exporter-otlp-proto-common + + +References +---------- + +* `OpenTelemetry `_ +* `OpenTelemetry Protocol Specification `_ diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/pyproject.toml b/exporter/opentelemetry-exporter-otlp-proto-common/pyproject.toml new file mode 100644 index 00000000000..5f872cd0867 --- /dev/null +++ b/exporter/opentelemetry-exporter-otlp-proto-common/pyproject.toml @@ -0,0 +1,44 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "opentelemetry-exporter-otlp-proto-common" +dynamic = ["version"] +description = "OpenTelemetry Protobuf encoding" +readme = "README.rst" +license = "Apache-2.0" +requires-python = ">=3.7" +authors = [ + { name = "OpenTelemetry Authors", email = "cncf-opentelemetry-contributors@lists.cncf.io" }, +] +classifiers = [ + "Development Status :: 5 - Production/Stable", + "Intended Audience :: Developers", + "License :: OSI Approved :: Apache Software License", + "Programming Language :: Python", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", +] +dependencies = [ + "opentelemetry-proto == 1.18.0.dev", +] + +[project.urls] +Homepage = "https://github.com/open-telemetry/opentelemetry-python/tree/main/exporter/opentelemetry-exporter-otlp-proto-common" + +[tool.hatch.version] +path = "src/opentelemetry/exporter/otlp/proto/common/version.py" + +[tool.hatch.build.targets.sdist] +include = [ + "/src", + "/tests", +] + +[tool.hatch.build.targets.wheel] +packages = ["src/opentelemetry"] diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/__init__.py new file mode 100644 index 00000000000..2d336aee834 --- /dev/null +++ b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/__init__.py @@ -0,0 +1,18 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from opentelemetry.exporter.otlp.proto.common.version import __version__ + +__all__ = ["__version__"] diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/__init__.py new file mode 100644 index 00000000000..e0de8d3d264 --- /dev/null +++ b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/__init__.py @@ -0,0 +1,97 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import logging +from collections.abc import Sequence +from typing import Any, Optional, List + +from opentelemetry.sdk.util.instrumentation import InstrumentationScope +from opentelemetry.proto.common.v1.common_pb2 import ( + InstrumentationScope as PB2InstrumentationScope, +) +from opentelemetry.proto.resource.v1.resource_pb2 import ( + Resource as PB2Resource, +) +from opentelemetry.proto.common.v1.common_pb2 import AnyValue as PB2AnyValue +from opentelemetry.proto.common.v1.common_pb2 import KeyValue as PB2KeyValue +from opentelemetry.proto.common.v1.common_pb2 import ( + ArrayValue as PB2ArrayValue, +) +from opentelemetry.sdk.trace import Resource +from opentelemetry.util.types import Attributes + + +_logger = logging.getLogger(__name__) + + +def _encode_instrumentation_scope( + instrumentation_scope: InstrumentationScope, +) -> PB2InstrumentationScope: + if instrumentation_scope is None: + return PB2InstrumentationScope() + return PB2InstrumentationScope( + name=instrumentation_scope.name, + version=instrumentation_scope.version, + ) + + +def _encode_resource(resource: Resource) -> PB2Resource: + return PB2Resource(attributes=_encode_attributes(resource.attributes)) + + +def _encode_value(value: Any) -> PB2AnyValue: + if isinstance(value, bool): + return PB2AnyValue(bool_value=value) + if isinstance(value, str): + return PB2AnyValue(string_value=value) + if isinstance(value, int): + return PB2AnyValue(int_value=value) + if isinstance(value, float): + return PB2AnyValue(double_value=value) + if isinstance(value, Sequence): + return PB2AnyValue( + array_value=PB2ArrayValue(values=[_encode_value(v) for v in value]) + ) + # tracing specs currently does not support Mapping type attributes. + # elif isinstance(value, abc.Mapping): + # pass + raise Exception(f"Invalid type {type(value)} of value {value}") + + +def _encode_key_value(key: str, value: Any) -> PB2KeyValue: + return PB2KeyValue(key=key, value=_encode_value(value)) + + +def _encode_span_id(span_id: int) -> bytes: + return span_id.to_bytes(length=8, byteorder="big", signed=False) + + +def _encode_trace_id(trace_id: int) -> bytes: + return trace_id.to_bytes(length=16, byteorder="big", signed=False) + + +def _encode_attributes( + attributes: Attributes, +) -> Optional[List[PB2KeyValue]]: + if attributes: + pb2_attributes = [] + for key, value in attributes.items(): + try: + pb2_attributes.append(_encode_key_value(key, value)) + except Exception as error: # pylint: disable=broad-except + _logger.exception(error) + else: + pb2_attributes = None + return pb2_attributes diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/encoder/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/_log_encoder/__init__.py similarity index 58% rename from exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/encoder/__init__.py rename to exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/_log_encoder/__init__.py index 866f8878bba..7c135d90baf 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/encoder/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/_log_encoder/__init__.py @@ -11,9 +11,17 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +from collections import defaultdict from typing import Sequence, List +from opentelemetry.exporter.otlp.proto.common._internal import ( + _encode_instrumentation_scope, + _encode_resource, + _encode_span_id, + _encode_trace_id, + _encode_value, + _encode_attributes, +) from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import ( ExportLogsServiceRequest, ) @@ -22,62 +30,36 @@ ResourceLogs, ) from opentelemetry.proto.logs.v1.logs_pb2 import LogRecord as PB2LogRecord -from opentelemetry.exporter.otlp.proto.http.trace_exporter.encoder import ( - _encode_instrumentation_scope, - _encode_resource, - _encode_span_id, - _encode_trace_id, - _encode_value, - _encode_attributes, -) - from opentelemetry.sdk._logs import LogData -class _ProtobufEncoder: - @classmethod - def serialize(cls, batch: Sequence[LogData]) -> str: - return cls.encode(batch).SerializeToString() - - @staticmethod - def encode(batch: Sequence[LogData]) -> ExportLogsServiceRequest: - return ExportLogsServiceRequest( - resource_logs=_encode_resource_logs(batch) - ) +def encode_logs(batch: Sequence[LogData]) -> ExportLogsServiceRequest: + return ExportLogsServiceRequest(resource_logs=_encode_resource_logs(batch)) def _encode_log(log_data: LogData) -> PB2LogRecord: - kwargs = {} - kwargs["time_unix_nano"] = log_data.log_record.timestamp - kwargs["span_id"] = _encode_span_id(log_data.log_record.span_id) - kwargs["trace_id"] = _encode_trace_id(log_data.log_record.trace_id) - kwargs["flags"] = int(log_data.log_record.trace_flags) - kwargs["body"] = _encode_value(log_data.log_record.body) - kwargs["severity_text"] = log_data.log_record.severity_text - kwargs["attributes"] = _encode_attributes(log_data.log_record.attributes) - kwargs["severity_number"] = log_data.log_record.severity_number.value - - return PB2LogRecord(**kwargs) + return PB2LogRecord( + time_unix_nano=log_data.log_record.timestamp, + span_id=_encode_span_id(log_data.log_record.span_id), + trace_id=_encode_trace_id(log_data.log_record.trace_id), + flags=int(log_data.log_record.trace_flags), + body=_encode_value(log_data.log_record.body), + severity_text=log_data.log_record.severity_text, + attributes=_encode_attributes(log_data.log_record.attributes), + severity_number=log_data.log_record.severity_number.value, + ) def _encode_resource_logs(batch: Sequence[LogData]) -> List[ResourceLogs]: - - sdk_resource_logs = {} + sdk_resource_logs = defaultdict(lambda: defaultdict(list)) for sdk_log in batch: sdk_resource = sdk_log.log_record.resource sdk_instrumentation = sdk_log.instrumentation_scope or None pb2_log = _encode_log(sdk_log) - if sdk_resource not in sdk_resource_logs.keys(): - sdk_resource_logs[sdk_resource] = {sdk_instrumentation: [pb2_log]} - elif sdk_instrumentation not in sdk_resource_logs[sdk_resource].keys(): - sdk_resource_logs[sdk_resource][sdk_instrumentation] = [pb2_log] - else: - sdk_resource_logs[sdk_resource][sdk_instrumentation].append( - pb2_log - ) + sdk_resource_logs[sdk_resource][sdk_instrumentation].append(pb2_log) pb2_resource_logs = [] diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_log_encoder.py b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_log_encoder.py new file mode 100644 index 00000000000..f34ff8223c6 --- /dev/null +++ b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_log_encoder.py @@ -0,0 +1,20 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from opentelemetry.exporter.otlp.proto.common._internal._log_encoder import ( + encode_logs, +) + +__all__ = ["encode_logs"] diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/py.typed b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/py.typed new file mode 100644 index 00000000000..e69de29bb2d diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/version.py b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/version.py new file mode 100644 index 00000000000..80d12781d3d --- /dev/null +++ b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/version.py @@ -0,0 +1,15 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +__version__ = "1.18.0.dev" diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/tests/test_log_encoder.py b/exporter/opentelemetry-exporter-otlp-proto-common/tests/test_log_encoder.py new file mode 100644 index 00000000000..1cd86b2833e --- /dev/null +++ b/exporter/opentelemetry-exporter-otlp-proto-common/tests/test_log_encoder.py @@ -0,0 +1,253 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +from typing import List, Tuple + +from opentelemetry._logs import SeverityNumber +from opentelemetry.exporter.otlp.proto.common._internal import ( + _encode_attributes, + _encode_span_id, + _encode_trace_id, + _encode_value, +) +from opentelemetry.exporter.otlp.proto.common._log_encoder import encode_logs +from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import ( + ExportLogsServiceRequest, +) +from opentelemetry.proto.common.v1.common_pb2 import AnyValue as PB2AnyValue +from opentelemetry.proto.common.v1.common_pb2 import ( + InstrumentationScope as PB2InstrumentationScope, +) +from opentelemetry.proto.common.v1.common_pb2 import KeyValue as PB2KeyValue +from opentelemetry.proto.logs.v1.logs_pb2 import LogRecord as PB2LogRecord +from opentelemetry.proto.logs.v1.logs_pb2 import ( + ResourceLogs as PB2ResourceLogs, +) +from opentelemetry.proto.logs.v1.logs_pb2 import ScopeLogs as PB2ScopeLogs +from opentelemetry.proto.resource.v1.resource_pb2 import ( + Resource as PB2Resource, +) +from opentelemetry.sdk._logs import LogData +from opentelemetry.sdk._logs import LogRecord as SDKLogRecord +from opentelemetry.sdk.resources import Resource as SDKResource +from opentelemetry.sdk.util.instrumentation import InstrumentationScope +from opentelemetry.trace import TraceFlags + + +class TestOTLPLogEncoder(unittest.TestCase): + def test_encode(self): + sdk_logs, expected_encoding = self.get_test_logs() + self.assertEqual(encode_logs(sdk_logs), expected_encoding) + + @staticmethod + def _get_sdk_log_data() -> List[LogData]: + log1 = LogData( + log_record=SDKLogRecord( + timestamp=1644650195189786880, + trace_id=89564621134313219400156819398935297684, + span_id=1312458408527513268, + trace_flags=TraceFlags(0x01), + severity_text="WARN", + severity_number=SeverityNumber.WARN, + body="Do not go gentle into that good night. Rage, rage against the dying of the light", + resource=SDKResource({"first_resource": "value"}), + attributes={"a": 1, "b": "c"}, + ), + instrumentation_scope=InstrumentationScope( + "first_name", "first_version" + ), + ) + + log2 = LogData( + log_record=SDKLogRecord( + timestamp=1644650249738562048, + trace_id=0, + span_id=0, + trace_flags=TraceFlags.DEFAULT, + severity_text="WARN", + severity_number=SeverityNumber.WARN, + body="Cooper, this is no time for caution!", + resource=SDKResource({"second_resource": "CASE"}), + attributes={}, + ), + instrumentation_scope=InstrumentationScope( + "second_name", "second_version" + ), + ) + + log3 = LogData( + log_record=SDKLogRecord( + timestamp=1644650427658989056, + trace_id=271615924622795969659406376515024083555, + span_id=4242561578944770265, + trace_flags=TraceFlags(0x01), + severity_text="DEBUG", + severity_number=SeverityNumber.DEBUG, + body="To our galaxy", + resource=SDKResource({"second_resource": "CASE"}), + attributes={"a": 1, "b": "c"}, + ), + instrumentation_scope=None, + ) + + log4 = LogData( + log_record=SDKLogRecord( + timestamp=1644650584292683008, + trace_id=212592107417388365804938480559624925555, + span_id=6077757853989569223, + trace_flags=TraceFlags(0x01), + severity_text="INFO", + severity_number=SeverityNumber.INFO, + body="Love is the one thing that transcends time and space", + resource=SDKResource({"first_resource": "value"}), + attributes={"filename": "model.py", "func_name": "run_method"}, + ), + instrumentation_scope=InstrumentationScope( + "another_name", "another_version" + ), + ) + + return [log1, log2, log3, log4] + + def get_test_logs( + self, + ) -> Tuple[List[SDKLogRecord], ExportLogsServiceRequest]: + sdk_logs = self._get_sdk_log_data() + + pb2_service_request = ExportLogsServiceRequest( + resource_logs=[ + PB2ResourceLogs( + resource=PB2Resource( + attributes=[ + PB2KeyValue( + key="first_resource", + value=PB2AnyValue(string_value="value"), + ) + ] + ), + scope_logs=[ + PB2ScopeLogs( + scope=PB2InstrumentationScope( + name="first_name", version="first_version" + ), + log_records=[ + PB2LogRecord( + time_unix_nano=1644650195189786880, + trace_id=_encode_trace_id( + 89564621134313219400156819398935297684 + ), + span_id=_encode_span_id( + 1312458408527513268 + ), + flags=int(TraceFlags(0x01)), + severity_text="WARN", + severity_number=SeverityNumber.WARN.value, + body=_encode_value( + "Do not go gentle into that good night. Rage, rage against the dying of the light" + ), + attributes=_encode_attributes( + {"a": 1, "b": "c"} + ), + ) + ], + ), + PB2ScopeLogs( + scope=PB2InstrumentationScope( + name="another_name", + version="another_version", + ), + log_records=[ + PB2LogRecord( + time_unix_nano=1644650584292683008, + trace_id=_encode_trace_id( + 212592107417388365804938480559624925555 + ), + span_id=_encode_span_id( + 6077757853989569223 + ), + flags=int(TraceFlags(0x01)), + severity_text="INFO", + severity_number=SeverityNumber.INFO.value, + body=_encode_value( + "Love is the one thing that transcends time and space" + ), + attributes=_encode_attributes( + { + "filename": "model.py", + "func_name": "run_method", + } + ), + ) + ], + ), + ], + ), + PB2ResourceLogs( + resource=PB2Resource( + attributes=[ + PB2KeyValue( + key="second_resource", + value=PB2AnyValue(string_value="CASE"), + ) + ] + ), + scope_logs=[ + PB2ScopeLogs( + scope=PB2InstrumentationScope( + name="second_name", + version="second_version", + ), + log_records=[ + PB2LogRecord( + time_unix_nano=1644650249738562048, + trace_id=_encode_trace_id(0), + span_id=_encode_span_id(0), + flags=int(TraceFlags.DEFAULT), + severity_text="WARN", + severity_number=SeverityNumber.WARN.value, + body=_encode_value( + "Cooper, this is no time for caution!" + ), + attributes={}, + ), + ], + ), + PB2ScopeLogs( + scope=PB2InstrumentationScope(), + log_records=[ + PB2LogRecord( + time_unix_nano=1644650427658989056, + trace_id=_encode_trace_id( + 271615924622795969659406376515024083555 + ), + span_id=_encode_span_id( + 4242561578944770265 + ), + flags=int(TraceFlags(0x01)), + severity_text="DEBUG", + severity_number=SeverityNumber.DEBUG.value, + body=_encode_value("To our galaxy"), + attributes=_encode_attributes( + {"a": 1, "b": "c"} + ), + ), + ], + ), + ], + ), + ] + ) + + return sdk_logs, pb2_service_request diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/pyproject.toml b/exporter/opentelemetry-exporter-otlp-proto-grpc/pyproject.toml index 0850727b025..31914a53e03 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/pyproject.toml +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/pyproject.toml @@ -32,6 +32,7 @@ dependencies = [ "opentelemetry-api ~= 1.15", "opentelemetry-proto == 1.18.0.dev", "opentelemetry-sdk ~= 1.18.0.dev", + "opentelemetry-exporter-otlp-proto-common == 1.18.0.dev", ] [project.optional-dependencies] diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py index 887f9d8b2c2..ef1b77de27a 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py @@ -15,11 +15,11 @@ from typing import Dict, Optional, Tuple, Union, Sequence from typing import Sequence as TypingSequence from grpc import ChannelCredentials, Compression + +from opentelemetry.exporter.otlp.proto.common._log_encoder import encode_logs from opentelemetry.exporter.otlp.proto.grpc.exporter import ( OTLPExporterMixin, - get_resource_data, _get_credentials, - _translate_value, environ_to_compression, ) from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import ( @@ -28,12 +28,6 @@ from opentelemetry.proto.collector.logs.v1.logs_service_pb2_grpc import ( LogsServiceStub, ) -from opentelemetry.proto.common.v1.common_pb2 import InstrumentationScope -from opentelemetry.proto.logs.v1.logs_pb2 import ( - ScopeLogs, - ResourceLogs, -) -from opentelemetry.proto.logs.v1.logs_pb2 import LogRecord as PB2LogRecord from opentelemetry.sdk._logs import LogRecord as SDKLogRecord from opentelemetry.sdk._logs import LogData from opentelemetry.sdk._logs.export import LogExporter, LogExportResult @@ -105,91 +99,10 @@ def __init__( } ) - def _translate_time(self, log_data: LogData) -> None: - self._collector_kwargs[ - "time_unix_nano" - ] = log_data.log_record.timestamp - - def _translate_span_id(self, log_data: LogData) -> None: - self._collector_kwargs[ - "span_id" - ] = log_data.log_record.span_id.to_bytes(8, "big") - - def _translate_trace_id(self, log_data: LogData) -> None: - self._collector_kwargs[ - "trace_id" - ] = log_data.log_record.trace_id.to_bytes(16, "big") - - def _translate_trace_flags(self, log_data: LogData) -> None: - self._collector_kwargs["flags"] = int(log_data.log_record.trace_flags) - - def _translate_body(self, log_data: LogData): - self._collector_kwargs["body"] = _translate_value( - log_data.log_record.body - ) - - def _translate_severity_text(self, log_data: LogData): - self._collector_kwargs[ - "severity_text" - ] = log_data.log_record.severity_text - def _translate_data( self, data: Sequence[LogData] ) -> ExportLogsServiceRequest: - # pylint: disable=attribute-defined-outside-init - - sdk_resource_scope_logs = {} - - for log_data in data: - resource = log_data.log_record.resource - - scope_logs_map = sdk_resource_scope_logs.get(resource, {}) - if not scope_logs_map: - sdk_resource_scope_logs[resource] = scope_logs_map - - scope_logs = scope_logs_map.get(log_data.instrumentation_scope) - if not scope_logs: - if log_data.instrumentation_scope is not None: - scope_logs_map[log_data.instrumentation_scope] = ScopeLogs( - scope=InstrumentationScope( - name=log_data.instrumentation_scope.name, - version=log_data.instrumentation_scope.version, - ) - ) - else: - scope_logs_map[ - log_data.instrumentation_scope - ] = ScopeLogs() - - scope_logs = scope_logs_map.get(log_data.instrumentation_scope) - - self._collector_kwargs = {} - - self._translate_time(log_data) - self._translate_span_id(log_data) - self._translate_trace_id(log_data) - self._translate_trace_flags(log_data) - self._translate_body(log_data) - self._translate_severity_text(log_data) - self._collector_kwargs["attributes"] = self._translate_attributes( - log_data.log_record.attributes - ) - - self._collector_kwargs[ - "severity_number" - ] = log_data.log_record.severity_number.value - - scope_logs.log_records.append( - PB2LogRecord(**self._collector_kwargs) - ) - - return ExportLogsServiceRequest( - resource_logs=get_resource_data( - sdk_resource_scope_logs, - ResourceLogs, - "logs", - ) - ) + return encode_logs(data) def export(self, batch: Sequence[LogData]) -> LogExportResult: return self._export(batch) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/pyproject.toml b/exporter/opentelemetry-exporter-otlp-proto-http/pyproject.toml index 55dcd3f5514..47aafb6ee5a 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/pyproject.toml +++ b/exporter/opentelemetry-exporter-otlp-proto-http/pyproject.toml @@ -31,6 +31,7 @@ dependencies = [ "opentelemetry-api ~= 1.15", "opentelemetry-proto == 1.18.0.dev", "opentelemetry-sdk ~= 1.18.0.dev", + "opentelemetry-exporter-otlp-proto-common == 1.18.0.dev", "requests ~= 2.7", ] diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py index dfd70180e06..cbd6471246b 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py @@ -23,6 +23,7 @@ import backoff import requests +from opentelemetry.exporter.otlp.proto.common._log_encoder import encode_logs from opentelemetry.sdk.environment_variables import ( OTEL_EXPORTER_OTLP_CERTIFICATE, OTEL_EXPORTER_OTLP_COMPRESSION, @@ -44,9 +45,6 @@ _OTLP_HTTP_HEADERS, Compression, ) -from opentelemetry.exporter.otlp.proto.http._log_exporter.encoder import ( - _ProtobufEncoder, -) from opentelemetry.util.re import parse_env_headers @@ -147,7 +145,7 @@ def export(self, batch: Sequence[LogData]) -> LogExportResult: _logger.warning("Exporter already shutdown, ignoring batch") return LogExportResult.FAILURE - serialized_data = _ProtobufEncoder.serialize(batch) + serialized_data = encode_logs(batch).SerializeToString() for delay in _expo(max_value=self._MAX_RETRY_TIMEOUT): diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py index 05bdfb9af30..5cf20b881b1 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py @@ -15,7 +15,7 @@ # pylint: disable=protected-access import unittest -from typing import List, Tuple +from typing import List from unittest.mock import MagicMock, patch import requests @@ -31,30 +31,7 @@ OTLPLogExporter, _is_backoff_v2, ) -from opentelemetry.exporter.otlp.proto.http._log_exporter.encoder import ( - _encode_attributes, - _encode_span_id, - _encode_trace_id, - _encode_value, - _ProtobufEncoder, -) from opentelemetry.exporter.otlp.proto.http.version import __version__ -from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import ( - ExportLogsServiceRequest, -) -from opentelemetry.proto.common.v1.common_pb2 import AnyValue as PB2AnyValue -from opentelemetry.proto.common.v1.common_pb2 import ( - InstrumentationScope as PB2InstrumentationScope, -) -from opentelemetry.proto.common.v1.common_pb2 import KeyValue as PB2KeyValue -from opentelemetry.proto.logs.v1.logs_pb2 import LogRecord as PB2LogRecord -from opentelemetry.proto.logs.v1.logs_pb2 import ( - ResourceLogs as PB2ResourceLogs, -) -from opentelemetry.proto.logs.v1.logs_pb2 import ScopeLogs as PB2ScopeLogs -from opentelemetry.proto.resource.v1.resource_pb2 import ( - Resource as PB2Resource, -) from opentelemetry.sdk._logs import LogData from opentelemetry.sdk._logs import LogRecord as SDKLogRecord from opentelemetry.sdk.environment_variables import ( @@ -190,19 +167,6 @@ def test_exporter_env(self): ) self.assertIsInstance(exporter._session, requests.Session) - def test_encode(self): - sdk_logs, expected_encoding = self.get_test_logs() - self.assertEqual( - _ProtobufEncoder().encode(sdk_logs), expected_encoding - ) - - def test_serialize(self): - sdk_logs, expected_encoding = self.get_test_logs() - self.assertEqual( - _ProtobufEncoder().serialize(sdk_logs), - expected_encoding.SerializeToString(), - ) - @responses.activate @patch("opentelemetry.exporter.otlp.proto.http._log_exporter.backoff") @patch("opentelemetry.exporter.otlp.proto.http._log_exporter.sleep") @@ -298,134 +262,3 @@ def _get_sdk_log_data() -> List[LogData]: ) return [log1, log2, log3, log4] - - def get_test_logs( - self, - ) -> Tuple[List[SDKLogRecord], ExportLogsServiceRequest]: - sdk_logs = self._get_sdk_log_data() - - pb2_service_request = ExportLogsServiceRequest( - resource_logs=[ - PB2ResourceLogs( - resource=PB2Resource( - attributes=[ - PB2KeyValue( - key="first_resource", - value=PB2AnyValue(string_value="value"), - ) - ] - ), - scope_logs=[ - PB2ScopeLogs( - scope=PB2InstrumentationScope( - name="first_name", version="first_version" - ), - log_records=[ - PB2LogRecord( - time_unix_nano=1644650195189786880, - trace_id=_encode_trace_id( - 89564621134313219400156819398935297684 - ), - span_id=_encode_span_id( - 1312458408527513268 - ), - flags=int(TraceFlags(0x01)), - severity_text="WARN", - severity_number=SeverityNumber.WARN.value, - body=_encode_value( - "Do not go gentle into that good night. Rage, rage against the dying of the light" - ), - attributes=_encode_attributes( - {"a": 1, "b": "c"} - ), - ) - ], - ), - PB2ScopeLogs( - scope=PB2InstrumentationScope( - name="another_name", - version="another_version", - ), - log_records=[ - PB2LogRecord( - time_unix_nano=1644650584292683008, - trace_id=_encode_trace_id( - 212592107417388365804938480559624925555 - ), - span_id=_encode_span_id( - 6077757853989569223 - ), - flags=int(TraceFlags(0x01)), - severity_text="INFO", - severity_number=SeverityNumber.INFO.value, - body=_encode_value( - "Love is the one thing that transcends time and space" - ), - attributes=_encode_attributes( - { - "filename": "model.py", - "func_name": "run_method", - } - ), - ) - ], - ), - ], - ), - PB2ResourceLogs( - resource=PB2Resource( - attributes=[ - PB2KeyValue( - key="second_resource", - value=PB2AnyValue(string_value="CASE"), - ) - ] - ), - scope_logs=[ - PB2ScopeLogs( - scope=PB2InstrumentationScope( - name="second_name", - version="second_version", - ), - log_records=[ - PB2LogRecord( - time_unix_nano=1644650249738562048, - trace_id=_encode_trace_id(0), - span_id=_encode_span_id(0), - flags=int(TraceFlags.DEFAULT), - severity_text="WARN", - severity_number=SeverityNumber.WARN.value, - body=_encode_value( - "Cooper, this is no time for caution!" - ), - attributes={}, - ), - ], - ), - PB2ScopeLogs( - scope=PB2InstrumentationScope(), - log_records=[ - PB2LogRecord( - time_unix_nano=1644650427658989056, - trace_id=_encode_trace_id( - 271615924622795969659406376515024083555 - ), - span_id=_encode_span_id( - 4242561578944770265 - ), - flags=int(TraceFlags(0x01)), - severity_text="DEBUG", - severity_number=SeverityNumber.DEBUG.value, - body=_encode_value("To our galaxy"), - attributes=_encode_attributes( - {"a": 1, "b": "c"} - ), - ), - ], - ), - ], - ), - ] - ) - - return sdk_logs, pb2_service_request diff --git a/tox.ini b/tox.ini index 843a0680ed9..2f451ba796f 100644 --- a/tox.ini +++ b/tox.ini @@ -37,6 +37,8 @@ envlist = py3{7,8,9,10,11}-opentelemetry-exporter-opencensus ; exporter-opencensus intentionally excluded from pypy3 + py3{7,8,9,10,11}-proto{3,4}-opentelemetry-exporter-otlp-proto-common + ; opentelemetry-exporter-otlp py3{7,8,9,10,11}-opentelemetry-exporter-otlp-combined ; intentionally excluded from pypy3 @@ -109,6 +111,7 @@ changedir = exporter-jaeger-proto-grpc: exporter/opentelemetry-exporter-jaeger-proto-grpc/tests exporter-jaeger-thrift: exporter/opentelemetry-exporter-jaeger-thrift/tests exporter-opencensus: exporter/opentelemetry-exporter-opencensus/tests + exporter-otlp-proto-common: exporter/opentelemetry-exporter-otlp-proto-common/tests exporter-otlp-combined: exporter/opentelemetry-exporter-otlp/tests exporter-otlp-proto-grpc: exporter/opentelemetry-exporter-otlp-proto-grpc/tests exporter-otlp-proto-http: exporter/opentelemetry-exporter-otlp-proto-http/tests @@ -138,15 +141,21 @@ commands_pre = opencensus: pip install {toxinidir}/exporter/opentelemetry-exporter-opencensus + exporter-otlp-proto-common: pip install {toxinidir}/opentelemetry-proto + exporter-otlp-proto-common: pip install {toxinidir}/exporter/opentelemetry-exporter-otlp-proto-common + exporter-otlp-combined: pip install {toxinidir}/opentelemetry-proto + exporter-otlp-combined: pip install {toxinidir}/exporter/opentelemetry-exporter-otlp-proto-common exporter-otlp-combined: pip install {toxinidir}/exporter/opentelemetry-exporter-otlp-proto-grpc exporter-otlp-combined: pip install {toxinidir}/exporter/opentelemetry-exporter-otlp-proto-http exporter-otlp-combined: pip install {toxinidir}/exporter/opentelemetry-exporter-otlp exporter-otlp-proto-grpc: pip install {toxinidir}/opentelemetry-proto + exporter-otlp-proto-grpc: pip install {toxinidir}/exporter/opentelemetry-exporter-otlp-proto-common exporter-otlp-proto-grpc: pip install {toxinidir}/exporter/opentelemetry-exporter-otlp-proto-grpc exporter-otlp-proto-http: pip install {toxinidir}/opentelemetry-proto + exporter-otlp-proto-http: pip install {toxinidir}/exporter/opentelemetry-exporter-otlp-proto-common exporter-otlp-proto-http: pip install {toxinidir}/exporter/opentelemetry-exporter-otlp-proto-http[test] exporter-jaeger-combined: pip install {toxinidir}/exporter/opentelemetry-exporter-jaeger-proto-grpc {toxinidir}/exporter/opentelemetry-exporter-jaeger-thrift {toxinidir}/exporter/opentelemetry-exporter-jaeger @@ -232,6 +241,7 @@ commands_pre = python -m pip install -e {toxinidir}/exporter/opentelemetry-exporter-jaeger-thrift[test] python -m pip install -e {toxinidir}/exporter/opentelemetry-exporter-jaeger[test] python -m pip install -e {toxinidir}/exporter/opentelemetry-exporter-opencensus[test] + python -m pip install -e {toxinidir}/exporter/opentelemetry-exporter-otlp-proto-common[test] python -m pip install -e {toxinidir}/exporter/opentelemetry-exporter-otlp-proto-grpc[test] python -m pip install -e {toxinidir}/exporter/opentelemetry-exporter-otlp-proto-http[test] python -m pip install -e {toxinidir}/exporter/opentelemetry-exporter-otlp[test] @@ -307,6 +317,7 @@ commands_pre = ; opencensus exporter does not work with protobuf 4 proto3: -e {toxinidir}/exporter/opentelemetry-exporter-opencensus \ -e {toxinidir}/opentelemetry-proto \ + -e {toxinidir}/exporter/opentelemetry-exporter-otlp-proto-common \ -e {toxinidir}/exporter/opentelemetry-exporter-otlp-proto-grpc \ -e {toxinidir}/exporter/opentelemetry-exporter-otlp-proto-http \ -e {toxinidir}/exporter/opentelemetry-exporter-otlp From 1a990602869e43adc363dae7a4e442c607b24f38 Mon Sep 17 00:00:00 2001 From: "jack.burridge" Date: Mon, 13 Mar 2023 23:46:20 +0000 Subject: [PATCH 2/5] Move encoding of traces to opentelemetry-exporter-otlp-proto-common --- .../_internal/trace_encoder/__init__.py | 181 +++++++++++ .../otlp/proto/common/trace_encoder.py | 20 ++ .../tests/test_trace_encoder.py} | 25 +- .../proto/grpc/trace_exporter/__init__.py | 178 +---------- .../proto/http/trace_exporter/__init__.py | 8 +- .../http/trace_exporter/encoder/__init__.py | 296 +++--------------- 6 files changed, 261 insertions(+), 447 deletions(-) create mode 100644 exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/trace_encoder/__init__.py create mode 100644 exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/trace_encoder.py rename exporter/{opentelemetry-exporter-otlp-proto-http/tests/test_protobuf_encoder.py => opentelemetry-exporter-otlp-proto-common/tests/test_trace_encoder.py} (96%) diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/trace_encoder/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/trace_encoder/__init__.py new file mode 100644 index 00000000000..46cf628dd13 --- /dev/null +++ b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/trace_encoder/__init__.py @@ -0,0 +1,181 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from collections import defaultdict +from typing import List, Optional, Sequence + +from opentelemetry.exporter.otlp.proto.common._internal import ( + _encode_trace_id, + _encode_span_id, + _encode_instrumentation_scope, + _encode_attributes, + _encode_resource, +) +from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import ( + ExportTraceServiceRequest as PB2ExportTraceServiceRequest, +) +from opentelemetry.proto.trace.v1.trace_pb2 import ( + ScopeSpans as PB2ScopeSpans, +) +from opentelemetry.proto.trace.v1.trace_pb2 import ( + ResourceSpans as PB2ResourceSpans, +) +from opentelemetry.proto.trace.v1.trace_pb2 import Span as PB2SPan +from opentelemetry.proto.trace.v1.trace_pb2 import Status as PB2Status +from opentelemetry.sdk.trace import Event, ReadableSpan +from opentelemetry.trace import Link +from opentelemetry.trace import SpanKind +from opentelemetry.trace.span import SpanContext, TraceState, Status + +# pylint: disable=E1101 +_SPAN_KIND_MAP = { + SpanKind.INTERNAL: PB2SPan.SpanKind.SPAN_KIND_INTERNAL, + SpanKind.SERVER: PB2SPan.SpanKind.SPAN_KIND_SERVER, + SpanKind.CLIENT: PB2SPan.SpanKind.SPAN_KIND_CLIENT, + SpanKind.PRODUCER: PB2SPan.SpanKind.SPAN_KIND_PRODUCER, + SpanKind.CONSUMER: PB2SPan.SpanKind.SPAN_KIND_CONSUMER, +} + +_logger = logging.getLogger(__name__) + + +def encode_spans( + sdk_spans: Sequence[ReadableSpan], +) -> PB2ExportTraceServiceRequest: + return PB2ExportTraceServiceRequest( + resource_spans=_encode_resource_spans(sdk_spans) + ) + + +def _encode_resource_spans( + sdk_spans: Sequence[ReadableSpan], +) -> List[PB2ResourceSpans]: + # We need to inspect the spans and group + structure them as: + # + # Resource + # Instrumentation Library + # Spans + # + # First loop organizes the SDK spans in this structure. Protobuf messages + # are not hashable so we stick with SDK data in this phase. + # + # Second loop encodes the data into Protobuf format. + # + sdk_resource_spans = defaultdict(lambda: defaultdict(list)) + + for sdk_span in sdk_spans: + sdk_resource = sdk_span.resource + sdk_instrumentation = sdk_span.instrumentation_scope or None + pb2_span = _encode_span(sdk_span) + + sdk_resource_spans[sdk_resource][sdk_instrumentation].append(pb2_span) + + pb2_resource_spans = [] + + for sdk_resource, sdk_instrumentations in sdk_resource_spans.items(): + scope_spans = [] + for sdk_instrumentation, pb2_spans in sdk_instrumentations.items(): + scope_spans.append( + PB2ScopeSpans( + scope=(_encode_instrumentation_scope(sdk_instrumentation)), + spans=pb2_spans, + ) + ) + pb2_resource_spans.append( + PB2ResourceSpans( + resource=_encode_resource(sdk_resource), + scope_spans=scope_spans, + ) + ) + + return pb2_resource_spans + + +def _encode_span(sdk_span: ReadableSpan) -> PB2SPan: + span_context = sdk_span.get_span_context() + return PB2SPan( + trace_id=_encode_trace_id(span_context.trace_id), + span_id=_encode_span_id(span_context.span_id), + trace_state=_encode_trace_state(span_context.trace_state), + parent_span_id=_encode_parent_id(sdk_span.parent), + name=sdk_span.name, + kind=_SPAN_KIND_MAP[sdk_span.kind], + start_time_unix_nano=sdk_span.start_time, + end_time_unix_nano=sdk_span.end_time, + attributes=_encode_attributes(sdk_span.attributes), + events=_encode_events(sdk_span.events), + links=_encode_links(sdk_span.links), + status=_encode_status(sdk_span.status), + dropped_attributes_count=sdk_span.dropped_attributes, + dropped_events_count=sdk_span.dropped_events, + dropped_links_count=sdk_span.dropped_links, + ) + + +def _encode_events( + events: Sequence[Event], +) -> Optional[List[PB2SPan.Event]]: + pb2_events = None + if events: + pb2_events = [] + for event in events: + encoded_event = PB2SPan.Event( + name=event.name, + time_unix_nano=event.timestamp, + attributes=_encode_attributes(event.attributes), + dropped_attributes_count=event.attributes.dropped, + ) + pb2_events.append(encoded_event) + return pb2_events + + +def _encode_links(links: Sequence[Link]) -> Sequence[PB2SPan.Link]: + pb2_links = None + if links: + pb2_links = [] + for link in links: + encoded_link = PB2SPan.Link( + trace_id=_encode_trace_id(link.context.trace_id), + span_id=_encode_span_id(link.context.span_id), + attributes=_encode_attributes(link.attributes), + dropped_attributes_count=link.attributes.dropped, + ) + pb2_links.append(encoded_link) + return pb2_links + + +def _encode_status(status: Status) -> Optional[PB2Status]: + pb2_status = None + if status is not None: + pb2_status = PB2Status( + code=status.status_code.value, + message=status.description, + ) + return pb2_status + + +def _encode_trace_state(trace_state: TraceState) -> Optional[str]: + pb2_trace_state = None + if trace_state is not None: + pb2_trace_state = ",".join( + [f"{key}={value}" for key, value in (trace_state.items())] + ) + return pb2_trace_state + + +def _encode_parent_id(context: Optional[SpanContext]) -> Optional[bytes]: + if context: + return _encode_span_id(context.span_id) + return None diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/trace_encoder.py b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/trace_encoder.py new file mode 100644 index 00000000000..2af57652000 --- /dev/null +++ b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/trace_encoder.py @@ -0,0 +1,20 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from opentelemetry.exporter.otlp.proto.common._internal.trace_encoder import ( + encode_spans, +) + +__all__ = ["encode_spans"] diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_protobuf_encoder.py b/exporter/opentelemetry-exporter-otlp-proto-common/tests/test_trace_encoder.py similarity index 96% rename from exporter/opentelemetry-exporter-otlp-proto-http/tests/test_protobuf_encoder.py rename to exporter/opentelemetry-exporter-otlp-proto-common/tests/test_trace_encoder.py index 7145ddbfa97..c0a05483f10 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_protobuf_encoder.py +++ b/exporter/opentelemetry-exporter-otlp-proto-common/tests/test_trace_encoder.py @@ -17,13 +17,15 @@ import unittest from typing import List, Tuple -from opentelemetry.exporter.otlp.proto.http.trace_exporter.encoder import ( - _SPAN_KIND_MAP, +from opentelemetry.exporter.otlp.proto.common._internal import ( _encode_span_id, - _encode_status, _encode_trace_id, - _ProtobufEncoder, ) +from opentelemetry.exporter.otlp.proto.common._internal.trace_encoder import ( + _SPAN_KIND_MAP, + _encode_status, +) +from opentelemetry.exporter.otlp.proto.common.trace_encoder import encode_spans from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import ( ExportTraceServiceRequest as PB2ExportTraceServiceRequest, ) @@ -55,19 +57,10 @@ from opentelemetry.trace.status import StatusCode as SDKStatusCode -class TestProtobufEncoder(unittest.TestCase): - def test_encode(self): +class TestOTLPTraceEncoder(unittest.TestCase): + def test_encode_spans(self): otel_spans, expected_encoding = self.get_exhaustive_test_spans() - self.assertEqual( - _ProtobufEncoder().encode(otel_spans), expected_encoding - ) - - def test_serialize(self): - otel_spans, expected_encoding = self.get_exhaustive_test_spans() - self.assertEqual( - _ProtobufEncoder().serialize(otel_spans), - expected_encoding.SerializeToString(), - ) + self.assertEqual(encode_spans(otel_spans), expected_encoding) @staticmethod def get_exhaustive_otel_span_list() -> List[SDKSpan]: diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py index cfabe3ffce2..72bd0368850 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py @@ -21,11 +21,15 @@ from grpc import ChannelCredentials, Compression +from opentelemetry.exporter.otlp.proto.common.trace_encoder import ( + encode_spans, +) from opentelemetry.exporter.otlp.proto.grpc.exporter import ( OTLPExporterMixin, _get_credentials, - _translate_key_values, environ_to_compression, +) +from opentelemetry.exporter.otlp.proto.grpc.exporter import ( # noqa: F401 get_resource_data, ) from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import ( @@ -34,13 +38,15 @@ from opentelemetry.proto.collector.trace.v1.trace_service_pb2_grpc import ( TraceServiceStub, ) -from opentelemetry.proto.common.v1.common_pb2 import InstrumentationScope -from opentelemetry.proto.trace.v1.trace_pb2 import ( +from opentelemetry.proto.common.v1.common_pb2 import ( # noqa: F401 + InstrumentationScope, +) +from opentelemetry.proto.trace.v1.trace_pb2 import ( # noqa: F401 ScopeSpans, ResourceSpans, + Span as CollectorSpan, ) -from opentelemetry.proto.trace.v1.trace_pb2 import Span as CollectorSpan -from opentelemetry.proto.trace.v1.trace_pb2 import Status +from opentelemetry.proto.trace.v1.trace_pb2 import Status # noqa: F401 from opentelemetry.sdk.environment_variables import ( OTEL_EXPORTER_OTLP_TRACES_CERTIFICATE, OTEL_EXPORTER_OTLP_TRACES_COMPRESSION, @@ -126,170 +132,10 @@ def __init__( } ) - def _translate_name(self, sdk_span: ReadableSpan) -> None: - self._collector_kwargs["name"] = sdk_span.name - - def _translate_start_time(self, sdk_span: ReadableSpan) -> None: - self._collector_kwargs["start_time_unix_nano"] = sdk_span.start_time - - def _translate_end_time(self, sdk_span: ReadableSpan) -> None: - self._collector_kwargs["end_time_unix_nano"] = sdk_span.end_time - - def _translate_span_id(self, sdk_span: ReadableSpan) -> None: - self._collector_kwargs["span_id"] = sdk_span.context.span_id.to_bytes( - 8, "big" - ) - - def _translate_trace_id(self, sdk_span: ReadableSpan) -> None: - self._collector_kwargs[ - "trace_id" - ] = sdk_span.context.trace_id.to_bytes(16, "big") - - def _translate_parent(self, sdk_span: ReadableSpan) -> None: - if sdk_span.parent is not None: - self._collector_kwargs[ - "parent_span_id" - ] = sdk_span.parent.span_id.to_bytes(8, "big") - - def _translate_context_trace_state(self, sdk_span: ReadableSpan) -> None: - if sdk_span.context.trace_state is not None: - self._collector_kwargs["trace_state"] = ",".join( - [ - f"{key}={value}" - for key, value in (sdk_span.context.trace_state.items()) - ] - ) - - def _translate_events(self, sdk_span: ReadableSpan) -> None: - if sdk_span.events: - self._collector_kwargs["events"] = [] - - for sdk_span_event in sdk_span.events: - - collector_span_event = CollectorSpan.Event( - name=sdk_span_event.name, - time_unix_nano=sdk_span_event.timestamp, - dropped_attributes_count=sdk_span_event.attributes.dropped, - ) - - for key, value in sdk_span_event.attributes.items(): - try: - collector_span_event.attributes.append( - _translate_key_values(key, value) - ) - # pylint: disable=broad-except - except Exception as error: - logger.exception(error) - - self._collector_kwargs["events"].append(collector_span_event) - - def _translate_links(self, sdk_span: ReadableSpan) -> None: - if sdk_span.links: - self._collector_kwargs["links"] = [] - - for sdk_span_link in sdk_span.links: - - collector_span_link = CollectorSpan.Link( - trace_id=( - sdk_span_link.context.trace_id.to_bytes(16, "big") - ), - span_id=(sdk_span_link.context.span_id.to_bytes(8, "big")), - dropped_attributes_count=sdk_span_link.attributes.dropped, - ) - - for key, value in sdk_span_link.attributes.items(): - try: - collector_span_link.attributes.append( - _translate_key_values(key, value) - ) - # pylint: disable=broad-except - except Exception as error: - logger.exception(error) - - self._collector_kwargs["links"].append(collector_span_link) - - def _translate_status(self, sdk_span: ReadableSpan) -> None: - # pylint: disable=no-member - if sdk_span.status is not None: - self._collector_kwargs["status"] = Status( - code=sdk_span.status.status_code.value, - message=sdk_span.status.description, - ) - def _translate_data( self, data: Sequence[ReadableSpan] ) -> ExportTraceServiceRequest: - # pylint: disable=attribute-defined-outside-init - - sdk_resource_scope_spans = {} - - for sdk_span in data: - scope_spans_map = sdk_resource_scope_spans.get( - sdk_span.resource, {} - ) - # If we haven't seen the Resource yet, add it to the map - if not scope_spans_map: - sdk_resource_scope_spans[sdk_span.resource] = scope_spans_map - scope_spans = scope_spans_map.get(sdk_span.instrumentation_scope) - # If we haven't seen the InstrumentationScope for this Resource yet, add it to the map - if not scope_spans: - if sdk_span.instrumentation_scope is not None: - scope_spans_map[ - sdk_span.instrumentation_scope - ] = ScopeSpans( - scope=InstrumentationScope( - name=sdk_span.instrumentation_scope.name, - version=sdk_span.instrumentation_scope.version, - ) - ) - else: - # If no InstrumentationScope, store in None key - scope_spans_map[ - sdk_span.instrumentation_scope - ] = ScopeSpans() - scope_spans = scope_spans_map.get(sdk_span.instrumentation_scope) - self._collector_kwargs = {} - - self._translate_name(sdk_span) - self._translate_start_time(sdk_span) - self._translate_end_time(sdk_span) - self._translate_span_id(sdk_span) - self._translate_trace_id(sdk_span) - self._translate_parent(sdk_span) - self._translate_context_trace_state(sdk_span) - self._collector_kwargs["attributes"] = self._translate_attributes( - sdk_span.attributes - ) - self._translate_events(sdk_span) - self._translate_links(sdk_span) - self._translate_status(sdk_span) - if sdk_span.dropped_attributes: - self._collector_kwargs[ - "dropped_attributes_count" - ] = sdk_span.dropped_attributes - if sdk_span.dropped_events: - self._collector_kwargs[ - "dropped_events_count" - ] = sdk_span.dropped_events - if sdk_span.dropped_links: - self._collector_kwargs[ - "dropped_links_count" - ] = sdk_span.dropped_links - - self._collector_kwargs["kind"] = getattr( - CollectorSpan.SpanKind, - f"SPAN_KIND_{sdk_span.kind.name}", - ) - - scope_spans.spans.append(CollectorSpan(**self._collector_kwargs)) - - return ExportTraceServiceRequest( - resource_spans=get_resource_data( - sdk_resource_scope_spans, - ResourceSpans, - "spans", - ) - ) + return encode_spans(data) def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: return self._export(spans) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py index 623d7dbf0ae..b8e21f56af9 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py @@ -23,6 +23,9 @@ import backoff import requests +from opentelemetry.exporter.otlp.proto.common.trace_encoder import ( + encode_spans, +) from opentelemetry.sdk.environment_variables import ( OTEL_EXPORTER_OTLP_TRACES_CERTIFICATE, OTEL_EXPORTER_OTLP_TRACES_COMPRESSION, @@ -40,9 +43,6 @@ _OTLP_HTTP_HEADERS, Compression, ) -from opentelemetry.exporter.otlp.proto.http.trace_exporter.encoder import ( - _ProtobufEncoder, -) from opentelemetry.util.re import parse_env_headers @@ -143,7 +143,7 @@ def export(self, spans) -> SpanExportResult: _logger.warning("Exporter already shutdown, ignoring batch") return SpanExportResult.FAILURE - serialized_data = _ProtobufEncoder.serialize(spans) + serialized_data = encode_spans(spans).SerializeToString() for delay in _expo(max_value=self._MAX_RETRY_TIMEOUT): diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/encoder/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/encoder/__init__.py index c1c9fe88643..a0036ecd24a 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/encoder/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/encoder/__init__.py @@ -12,277 +12,51 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging -from collections import abc -from typing import Any, List, Optional, Sequence +import logging # noqa: F401 +from collections import abc # noqa: F401 +from typing import Any, List, Optional, Sequence # noqa: F401 -from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import ( +from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import ( # noqa: F401 ExportTraceServiceRequest as PB2ExportTraceServiceRequest, ) -from opentelemetry.proto.common.v1.common_pb2 import AnyValue as PB2AnyValue -from opentelemetry.proto.common.v1.common_pb2 import ( +from opentelemetry.proto.common.v1.common_pb2 import ( # noqa: F401 + AnyValue as PB2AnyValue, +) +from opentelemetry.proto.common.v1.common_pb2 import ( # noqa: F401 ArrayValue as PB2ArrayValue, ) -from opentelemetry.proto.common.v1.common_pb2 import ( +from opentelemetry.proto.common.v1.common_pb2 import ( # noqa: F401 InstrumentationScope as PB2InstrumentationScope, ) -from opentelemetry.proto.common.v1.common_pb2 import KeyValue as PB2KeyValue -from opentelemetry.proto.resource.v1.resource_pb2 import ( +from opentelemetry.proto.common.v1.common_pb2 import ( # noqa: F401 + KeyValue as PB2KeyValue, +) +from opentelemetry.proto.resource.v1.resource_pb2 import ( # noqa: F401 Resource as PB2Resource, ) -from opentelemetry.proto.trace.v1.trace_pb2 import ( +from opentelemetry.proto.trace.v1.trace_pb2 import ( # noqa: F401 ScopeSpans as PB2ScopeSpans, ) -from opentelemetry.proto.trace.v1.trace_pb2 import ( +from opentelemetry.proto.trace.v1.trace_pb2 import ( # noqa: F401 ResourceSpans as PB2ResourceSpans, ) -from opentelemetry.proto.trace.v1.trace_pb2 import Span as PB2SPan -from opentelemetry.proto.trace.v1.trace_pb2 import Status as PB2Status -from opentelemetry.sdk.trace import Event -from opentelemetry.sdk.util.instrumentation import InstrumentationScope -from opentelemetry.sdk.trace import Resource -from opentelemetry.sdk.trace import Span as SDKSpan -from opentelemetry.trace import Link -from opentelemetry.trace import SpanKind -from opentelemetry.trace.span import SpanContext, TraceState, Status -from opentelemetry.util.types import Attributes - -# pylint: disable=E1101 -_SPAN_KIND_MAP = { - SpanKind.INTERNAL: PB2SPan.SpanKind.SPAN_KIND_INTERNAL, - SpanKind.SERVER: PB2SPan.SpanKind.SPAN_KIND_SERVER, - SpanKind.CLIENT: PB2SPan.SpanKind.SPAN_KIND_CLIENT, - SpanKind.PRODUCER: PB2SPan.SpanKind.SPAN_KIND_PRODUCER, - SpanKind.CONSUMER: PB2SPan.SpanKind.SPAN_KIND_CONSUMER, -} - -_logger = logging.getLogger(__name__) - - -class _ProtobufEncoder: - @classmethod - def serialize(cls, sdk_spans: Sequence[SDKSpan]) -> str: - return cls.encode(sdk_spans).SerializeToString() - - @staticmethod - def encode(sdk_spans: Sequence[SDKSpan]) -> PB2ExportTraceServiceRequest: - return PB2ExportTraceServiceRequest( - resource_spans=_encode_resource_spans(sdk_spans) - ) - - -def _encode_resource_spans( - sdk_spans: Sequence[SDKSpan], -) -> List[PB2ResourceSpans]: - # We need to inspect the spans and group + structure them as: - # - # Resource - # Instrumentation Library - # Spans - # - # First loop organizes the SDK spans in this structure. Protobuf messages - # are not hashable so we stick with SDK data in this phase. - # - # Second loop encodes the data into Protobuf format. - # - sdk_resource_spans = {} - - for sdk_span in sdk_spans: - sdk_resource = sdk_span.resource - sdk_instrumentation = sdk_span.instrumentation_scope or None - pb2_span = _encode_span(sdk_span) - - if sdk_resource not in sdk_resource_spans.keys(): - sdk_resource_spans[sdk_resource] = { - sdk_instrumentation: [pb2_span] - } - elif ( - sdk_instrumentation not in sdk_resource_spans[sdk_resource].keys() - ): - sdk_resource_spans[sdk_resource][sdk_instrumentation] = [pb2_span] - else: - sdk_resource_spans[sdk_resource][sdk_instrumentation].append( - pb2_span - ) - - pb2_resource_spans = [] - - for sdk_resource, sdk_instrumentations in sdk_resource_spans.items(): - scope_spans = [] - for sdk_instrumentation, pb2_spans in sdk_instrumentations.items(): - scope_spans.append( - PB2ScopeSpans( - scope=(_encode_instrumentation_scope(sdk_instrumentation)), - spans=pb2_spans, - ) - ) - pb2_resource_spans.append( - PB2ResourceSpans( - resource=_encode_resource(sdk_resource), - scope_spans=scope_spans, - ) - ) - - return pb2_resource_spans - - -def _encode_span(sdk_span: SDKSpan) -> PB2SPan: - span_context = sdk_span.get_span_context() - return PB2SPan( - trace_id=_encode_trace_id(span_context.trace_id), - span_id=_encode_span_id(span_context.span_id), - trace_state=_encode_trace_state(span_context.trace_state), - parent_span_id=_encode_parent_id(sdk_span.parent), - name=sdk_span.name, - kind=_SPAN_KIND_MAP[sdk_span.kind], - start_time_unix_nano=sdk_span.start_time, - end_time_unix_nano=sdk_span.end_time, - attributes=_encode_attributes(sdk_span.attributes), - events=_encode_events(sdk_span.events), - links=_encode_links(sdk_span.links), - status=_encode_status(sdk_span.status), - ) - - -def _encode_events( - events: Sequence[Event], -) -> Optional[List[PB2SPan.Event]]: - pb2_events = None - if events: - pb2_events = [] - for event in events: - encoded_event = PB2SPan.Event( - name=event.name, - time_unix_nano=event.timestamp, - ) - for key, value in event.attributes.items(): - try: - encoded_event.attributes.append( - _encode_key_value(key, value) - ) - # pylint: disable=broad-except - except Exception as error: - _logger.exception(error) - pb2_events.append(encoded_event) - return pb2_events - - -def _encode_links(links: List[Link]) -> List[PB2SPan.Link]: - pb2_links = None - if links: - pb2_links = [] - for link in links: - encoded_link = PB2SPan.Link( - trace_id=_encode_trace_id(link.context.trace_id), - span_id=_encode_span_id(link.context.span_id), - ) - for key, value in link.attributes.items(): - try: - encoded_link.attributes.append( - _encode_key_value(key, value) - ) - # pylint: disable=broad-except - except Exception as error: - _logger.exception(error) - pb2_links.append(encoded_link) - return pb2_links - - -def _encode_status(status: Status) -> Optional[PB2Status]: - pb2_status = None - if status is not None: - pb2_status = PB2Status( - code=status.status_code.value, - message=status.description, - ) - return pb2_status - - -def _encode_trace_state(trace_state: TraceState) -> Optional[str]: - pb2_trace_state = None - if trace_state is not None: - pb2_trace_state = ",".join( - [f"{key}={value}" for key, value in (trace_state.items())] - ) - return pb2_trace_state - - -def _encode_parent_id(context: Optional[SpanContext]) -> Optional[bytes]: - if isinstance(context, SpanContext): - encoded_parent_id = _encode_span_id(context.span_id) - else: - encoded_parent_id = None - return encoded_parent_id - - -def _encode_attributes( - attributes: Attributes, -) -> Optional[List[PB2KeyValue]]: - if attributes: - pb2_attributes = [] - for key, value in attributes.items(): - try: - pb2_attributes.append(_encode_key_value(key, value)) - except Exception as error: # pylint: disable=broad-except - _logger.exception(error) - else: - pb2_attributes = None - return pb2_attributes - - -def _encode_resource(resource: Resource) -> PB2Resource: - pb2_resource = PB2Resource() - for key, value in resource.attributes.items(): - try: - # pylint: disable=no-member - pb2_resource.attributes.append(_encode_key_value(key, value)) - except Exception as error: # pylint: disable=broad-except - _logger.exception(error) - return pb2_resource - - -def _encode_instrumentation_scope( - instrumentation_scope: InstrumentationScope, -) -> PB2InstrumentationScope: - if instrumentation_scope is None: - pb2_instrumentation_scope = PB2InstrumentationScope() - else: - pb2_instrumentation_scope = PB2InstrumentationScope( - name=instrumentation_scope.name, - version=instrumentation_scope.version, - ) - return pb2_instrumentation_scope - - -def _encode_value(value: Any) -> PB2AnyValue: - if isinstance(value, bool): - any_value = PB2AnyValue(bool_value=value) - elif isinstance(value, str): - any_value = PB2AnyValue(string_value=value) - elif isinstance(value, int): - any_value = PB2AnyValue(int_value=value) - elif isinstance(value, float): - any_value = PB2AnyValue(double_value=value) - elif isinstance(value, abc.Sequence): - any_value = PB2AnyValue( - array_value=PB2ArrayValue(values=[_encode_value(v) for v in value]) - ) - # tracing specs currently does not support Mapping type attributes. - # elif isinstance(value, abc.Mapping): - # pass - else: - raise Exception(f"Invalid type {type(value)} of value {value}") - return any_value - - -def _encode_key_value(key: str, value: Any) -> PB2KeyValue: - any_value = _encode_value(value) - return PB2KeyValue(key=key, value=any_value) - - -def _encode_span_id(span_id: int) -> bytes: - return span_id.to_bytes(length=8, byteorder="big", signed=False) - - -def _encode_trace_id(trace_id: int) -> bytes: - return trace_id.to_bytes(length=16, byteorder="big", signed=False) +from opentelemetry.proto.trace.v1.trace_pb2 import ( # noqa: F401 + Span as PB2SPan, +) +from opentelemetry.proto.trace.v1.trace_pb2 import ( # noqa: F401 + Status as PB2Status, +) +from opentelemetry.sdk.trace import Event # noqa: F401 +from opentelemetry.sdk.util.instrumentation import ( # noqa: F401 + InstrumentationScope, +) +from opentelemetry.sdk.trace import Resource # noqa: F401 +from opentelemetry.sdk.trace import Span as SDKSpan # noqa: F401 +from opentelemetry.trace import Link # noqa: F401 +from opentelemetry.trace import SpanKind # noqa: F401 +from opentelemetry.trace.span import ( # noqa: F401 + SpanContext, + TraceState, + Status, +) +from opentelemetry.util.types import Attributes # noqa: F401 From 68002cfd195d148094203570cc0290cb0dd043eb Mon Sep 17 00:00:00 2001 From: "jack.burridge" Date: Mon, 13 Mar 2023 23:31:29 +0000 Subject: [PATCH 3/5] Move encoding of metrics to opentelemetry-exporter-otlp-proto-common --- .../otlp/proto/common/_internal/__init__.py | 43 +- .../_internal/metrics_encoder/__init__.py | 199 +++++ .../otlp/proto/common/metrics_encoder.py | 20 + .../tests/test_metrics_encoder.py | 813 ++++++++++++++++++ .../pyproject.toml | 1 + .../exporter/otlp/proto/grpc/exporter.py | 41 +- .../proto/grpc/metric_exporter/__init__.py | 179 +--- .../tests/test_otlp_metrics_exporter.py | 775 +---------------- .../pyproject.toml | 1 + .../proto/http/metric_exporter/__init__.py | 238 +---- .../metrics/test_otlp_metrics_exporter.py | 5 +- 11 files changed, 1140 insertions(+), 1175 deletions(-) create mode 100644 exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/metrics_encoder/__init__.py create mode 100644 exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/metrics_encoder.py create mode 100644 exporter/opentelemetry-exporter-otlp-proto-common/tests/test_metrics_encoder.py diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/__init__.py index e0de8d3d264..2f5d7413245 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/__init__.py @@ -15,7 +15,7 @@ import logging from collections.abc import Sequence -from typing import Any, Optional, List +from typing import Any, Mapping, Optional, List, Callable, TypeVar, Dict from opentelemetry.sdk.util.instrumentation import InstrumentationScope from opentelemetry.proto.common.v1.common_pb2 import ( @@ -26,6 +26,9 @@ ) from opentelemetry.proto.common.v1.common_pb2 import AnyValue as PB2AnyValue from opentelemetry.proto.common.v1.common_pb2 import KeyValue as PB2KeyValue +from opentelemetry.proto.common.v1.common_pb2 import ( + KeyValueList as PB2KeyValueList, +) from opentelemetry.proto.common.v1.common_pb2 import ( ArrayValue as PB2ArrayValue, ) @@ -35,6 +38,9 @@ _logger = logging.getLogger(__name__) +_TypingResourceT = TypeVar("_TypingResourceT") +_ResourceDataT = TypeVar("_ResourceDataT") + def _encode_instrumentation_scope( instrumentation_scope: InstrumentationScope, @@ -64,9 +70,12 @@ def _encode_value(value: Any) -> PB2AnyValue: return PB2AnyValue( array_value=PB2ArrayValue(values=[_encode_value(v) for v in value]) ) - # tracing specs currently does not support Mapping type attributes. - # elif isinstance(value, abc.Mapping): - # pass + elif isinstance(value, Mapping): + return PB2AnyValue( + kvlist_value=PB2KeyValueList( + values=[_encode_key_value(str(k), v) for k, v in value.items()] + ) + ) raise Exception(f"Invalid type {type(value)} of value {value}") @@ -95,3 +104,29 @@ def _encode_attributes( else: pb2_attributes = None return pb2_attributes + + +def _get_resource_data( + sdk_resource_scope_data: Dict[Resource, _ResourceDataT], + resource_class: Callable[..., _TypingResourceT], + name: str, +) -> List[_TypingResourceT]: + + resource_data = [] + + for ( + sdk_resource, + scope_data, + ) in sdk_resource_scope_data.items(): + collector_resource = PB2Resource( + attributes=_encode_attributes(sdk_resource.attributes) + ) + resource_data.append( + resource_class( + **{ + "resource": collector_resource, + "scope_{}".format(name): scope_data.values(), + } + ) + ) + return resource_data diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/metrics_encoder/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/metrics_encoder/__init__.py new file mode 100644 index 00000000000..d2693759309 --- /dev/null +++ b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/metrics_encoder/__init__.py @@ -0,0 +1,199 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging + +from opentelemetry.exporter.otlp.proto.common._internal import ( + _encode_attributes, +) +from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import ( + ExportMetricsServiceRequest, +) +from opentelemetry.proto.common.v1.common_pb2 import InstrumentationScope +from opentelemetry.proto.metrics.v1 import metrics_pb2 as pb2 +from opentelemetry.sdk.metrics.export import ( + MetricsData, + Gauge, + Histogram, + Sum, + ExponentialHistogram as ExponentialHistogramType, +) +from opentelemetry.proto.resource.v1.resource_pb2 import ( + Resource as PB2Resource, +) + +_logger = logging.getLogger(__name__) + + +def encode_metrics(data: MetricsData) -> ExportMetricsServiceRequest: + resource_metrics_dict = {} + + for resource_metrics in data.resource_metrics: + + resource = resource_metrics.resource + + # It is safe to assume that each entry in data.resource_metrics is + # associated with an unique resource. + scope_metrics_dict = {} + + resource_metrics_dict[resource] = scope_metrics_dict + + for scope_metrics in resource_metrics.scope_metrics: + + instrumentation_scope = scope_metrics.scope + + # The SDK groups metrics in instrumentation scopes already so + # there is no need to check for existing instrumentation scopes + # here. + pb2_scope_metrics = pb2.ScopeMetrics( + scope=InstrumentationScope( + name=instrumentation_scope.name, + version=instrumentation_scope.version, + ) + ) + + scope_metrics_dict[instrumentation_scope] = pb2_scope_metrics + + for metric in scope_metrics.metrics: + pb2_metric = pb2.Metric( + name=metric.name, + description=metric.description, + unit=metric.unit, + ) + + if isinstance(metric.data, Gauge): + for data_point in metric.data.data_points: + pt = pb2.NumberDataPoint( + attributes=_encode_attributes( + data_point.attributes + ), + time_unix_nano=data_point.time_unix_nano, + ) + if isinstance(data_point.value, int): + pt.as_int = data_point.value + else: + pt.as_double = data_point.value + pb2_metric.gauge.data_points.append(pt) + + elif isinstance(metric.data, Histogram): + for data_point in metric.data.data_points: + pt = pb2.HistogramDataPoint( + attributes=_encode_attributes( + data_point.attributes + ), + time_unix_nano=data_point.time_unix_nano, + start_time_unix_nano=( + data_point.start_time_unix_nano + ), + count=data_point.count, + sum=data_point.sum, + bucket_counts=data_point.bucket_counts, + explicit_bounds=data_point.explicit_bounds, + max=data_point.max, + min=data_point.min, + ) + pb2_metric.histogram.aggregation_temporality = ( + metric.data.aggregation_temporality + ) + pb2_metric.histogram.data_points.append(pt) + + elif isinstance(metric.data, Sum): + for data_point in metric.data.data_points: + pt = pb2.NumberDataPoint( + attributes=_encode_attributes( + data_point.attributes + ), + start_time_unix_nano=( + data_point.start_time_unix_nano + ), + time_unix_nano=data_point.time_unix_nano, + ) + if isinstance(data_point.value, int): + pt.as_int = data_point.value + else: + pt.as_double = data_point.value + # note that because sum is a message type, the + # fields must be set individually rather than + # instantiating a pb2.Sum and setting it once + pb2_metric.sum.aggregation_temporality = ( + metric.data.aggregation_temporality + ) + pb2_metric.sum.is_monotonic = metric.data.is_monotonic + pb2_metric.sum.data_points.append(pt) + + elif isinstance(metric.data, ExponentialHistogramType): + for data_point in metric.data.data_points: + + if data_point.positive.bucket_counts: + positive = pb2.ExponentialHistogramDataPoint.Buckets( + offset=data_point.positive.offset, + bucket_counts=data_point.positive.bucket_counts, + ) + else: + positive = None + + if data_point.negative.bucket_counts: + negative = pb2.ExponentialHistogramDataPoint.Buckets( + offset=data_point.negative.offset, + bucket_counts=data_point.negative.bucket_counts, + ) + else: + negative = None + + pt = pb2.ExponentialHistogramDataPoint( + attributes=_encode_attributes( + data_point.attributes + ), + time_unix_nano=data_point.time_unix_nano, + start_time_unix_nano=( + data_point.start_time_unix_nano + ), + count=data_point.count, + sum=data_point.sum, + scale=data_point.scale, + zero_count=data_point.zero_count, + positive=positive, + negative=negative, + flags=data_point.flags, + max=data_point.max, + min=data_point.min, + ) + pb2_metric.exponential_histogram.aggregation_temporality = ( + metric.data.aggregation_temporality + ) + pb2_metric.exponential_histogram.data_points.append(pt) + + else: + _logger.warning( + "unsupported data type %s", + metric.data.__class__.__name__, + ) + continue + + pb2_scope_metrics.metrics.append(pb2_metric) + + resource_data = [] + for ( + sdk_resource, + scope_data, + ) in resource_metrics_dict.items(): + resource_data.append( + pb2.ResourceMetrics( + resource=PB2Resource( + attributes=_encode_attributes(sdk_resource.attributes) + ), + scope_metrics=scope_data.values(), + ) + ) + resource_metrics = resource_data + return ExportMetricsServiceRequest(resource_metrics=resource_metrics) diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/metrics_encoder.py b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/metrics_encoder.py new file mode 100644 index 00000000000..14f8fc3f0d1 --- /dev/null +++ b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/metrics_encoder.py @@ -0,0 +1,20 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from opentelemetry.exporter.otlp.proto.common._internal.metrics_encoder import ( + encode_metrics, +) + +__all__ = ["encode_metrics"] diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/tests/test_metrics_encoder.py b/exporter/opentelemetry-exporter-otlp-proto-common/tests/test_metrics_encoder.py new file mode 100644 index 00000000000..f7c8ceb820c --- /dev/null +++ b/exporter/opentelemetry-exporter-otlp-proto-common/tests/test_metrics_encoder.py @@ -0,0 +1,813 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=protected-access +import unittest + +from opentelemetry.exporter.otlp.proto.common.metrics_encoder import ( + encode_metrics, +) +from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import ( + ExportMetricsServiceRequest, +) +from opentelemetry.proto.common.v1.common_pb2 import ( + AnyValue, + InstrumentationScope, + KeyValue, +) +from opentelemetry.proto.metrics.v1 import metrics_pb2 as pb2 +from opentelemetry.proto.resource.v1.resource_pb2 import ( + Resource as OTLPResource, +) +from opentelemetry.sdk.metrics.export import AggregationTemporality, Buckets +from opentelemetry.sdk.metrics.export import ( + ExponentialHistogram as ExponentialHistogramType, +) +from opentelemetry.sdk.metrics.export import ExponentialHistogramDataPoint +from opentelemetry.sdk.metrics.export import Histogram as HistogramType +from opentelemetry.sdk.metrics.export import ( + HistogramDataPoint, + Metric, + MetricsData, + ResourceMetrics, + ScopeMetrics, +) +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.util.instrumentation import ( + InstrumentationScope as SDKInstrumentationScope, +) +from opentelemetry.test.metrictestutil import _generate_gauge, _generate_sum + + +class TestOTLPMetricsEncoder(unittest.TestCase): + histogram = Metric( + name="histogram", + description="foo", + unit="s", + data=HistogramType( + data_points=[ + HistogramDataPoint( + attributes={"a": 1, "b": True}, + start_time_unix_nano=1641946016139533244, + time_unix_nano=1641946016139533244, + count=5, + sum=67, + bucket_counts=[1, 4], + explicit_bounds=[10.0, 20.0], + min=8, + max=18, + ) + ], + aggregation_temporality=AggregationTemporality.DELTA, + ), + ) + + def test_encode_sum_int(self): + metrics_data = MetricsData( + resource_metrics=[ + ResourceMetrics( + resource=Resource( + attributes={"a": 1, "b": False}, + schema_url="resource_schema_url", + ), + scope_metrics=[ + ScopeMetrics( + scope=SDKInstrumentationScope( + name="first_name", + version="first_version", + schema_url="insrumentation_scope_schema_url", + ), + metrics=[_generate_sum("sum_int", 33)], + schema_url="instrumentation_scope_schema_url", + ) + ], + schema_url="resource_schema_url", + ) + ] + ) + expected = ExportMetricsServiceRequest( + resource_metrics=[ + pb2.ResourceMetrics( + resource=OTLPResource( + attributes=[ + KeyValue(key="a", value=AnyValue(int_value=1)), + KeyValue( + key="b", value=AnyValue(bool_value=False) + ), + ] + ), + scope_metrics=[ + pb2.ScopeMetrics( + scope=InstrumentationScope( + name="first_name", version="first_version" + ), + metrics=[ + pb2.Metric( + name="sum_int", + unit="s", + description="foo", + sum=pb2.Sum( + data_points=[ + pb2.NumberDataPoint( + attributes=[ + KeyValue( + key="a", + value=AnyValue( + int_value=1 + ), + ), + KeyValue( + key="b", + value=AnyValue( + bool_value=True + ), + ), + ], + start_time_unix_nano=1641946015139533244, + time_unix_nano=1641946016139533244, + as_int=33, + ) + ], + aggregation_temporality=AggregationTemporality.CUMULATIVE, + is_monotonic=True, + ), + ) + ], + ) + ], + ) + ] + ) + actual = encode_metrics(metrics_data) + self.assertEqual(expected, actual) + + def test_encode_sum_double(self): + metrics_data = MetricsData( + resource_metrics=[ + ResourceMetrics( + resource=Resource( + attributes={"a": 1, "b": False}, + schema_url="resource_schema_url", + ), + scope_metrics=[ + ScopeMetrics( + scope=SDKInstrumentationScope( + name="first_name", + version="first_version", + schema_url="insrumentation_scope_schema_url", + ), + metrics=[_generate_sum("sum_double", 2.98)], + schema_url="instrumentation_scope_schema_url", + ) + ], + schema_url="resource_schema_url", + ) + ] + ) + expected = ExportMetricsServiceRequest( + resource_metrics=[ + pb2.ResourceMetrics( + resource=OTLPResource( + attributes=[ + KeyValue(key="a", value=AnyValue(int_value=1)), + KeyValue( + key="b", value=AnyValue(bool_value=False) + ), + ] + ), + scope_metrics=[ + pb2.ScopeMetrics( + scope=InstrumentationScope( + name="first_name", version="first_version" + ), + metrics=[ + pb2.Metric( + name="sum_double", + unit="s", + description="foo", + sum=pb2.Sum( + data_points=[ + pb2.NumberDataPoint( + attributes=[ + KeyValue( + key="a", + value=AnyValue( + int_value=1 + ), + ), + KeyValue( + key="b", + value=AnyValue( + bool_value=True + ), + ), + ], + start_time_unix_nano=1641946015139533244, + time_unix_nano=1641946016139533244, + as_double=2.98, + ) + ], + aggregation_temporality=AggregationTemporality.CUMULATIVE, + is_monotonic=True, + ), + ) + ], + ) + ], + ) + ] + ) + actual = encode_metrics(metrics_data) + self.assertEqual(expected, actual) + + def test_encode_gauge_int(self): + metrics_data = MetricsData( + resource_metrics=[ + ResourceMetrics( + resource=Resource( + attributes={"a": 1, "b": False}, + schema_url="resource_schema_url", + ), + scope_metrics=[ + ScopeMetrics( + scope=SDKInstrumentationScope( + name="first_name", + version="first_version", + schema_url="insrumentation_scope_schema_url", + ), + metrics=[_generate_gauge("gauge_int", 9000)], + schema_url="instrumentation_scope_schema_url", + ) + ], + schema_url="resource_schema_url", + ) + ] + ) + expected = ExportMetricsServiceRequest( + resource_metrics=[ + pb2.ResourceMetrics( + resource=OTLPResource( + attributes=[ + KeyValue(key="a", value=AnyValue(int_value=1)), + KeyValue( + key="b", value=AnyValue(bool_value=False) + ), + ] + ), + scope_metrics=[ + pb2.ScopeMetrics( + scope=InstrumentationScope( + name="first_name", version="first_version" + ), + metrics=[ + pb2.Metric( + name="gauge_int", + unit="s", + description="foo", + gauge=pb2.Gauge( + data_points=[ + pb2.NumberDataPoint( + attributes=[ + KeyValue( + key="a", + value=AnyValue( + int_value=1 + ), + ), + KeyValue( + key="b", + value=AnyValue( + bool_value=True + ), + ), + ], + time_unix_nano=1641946016139533244, + as_int=9000, + ) + ], + ), + ) + ], + ) + ], + ) + ] + ) + actual = encode_metrics(metrics_data) + self.assertEqual(expected, actual) + + def test_encode_gauge_double(self): + metrics_data = MetricsData( + resource_metrics=[ + ResourceMetrics( + resource=Resource( + attributes={"a": 1, "b": False}, + schema_url="resource_schema_url", + ), + scope_metrics=[ + ScopeMetrics( + scope=SDKInstrumentationScope( + name="first_name", + version="first_version", + schema_url="insrumentation_scope_schema_url", + ), + metrics=[_generate_gauge("gauge_double", 52.028)], + schema_url="instrumentation_scope_schema_url", + ) + ], + schema_url="resource_schema_url", + ) + ] + ) + expected = ExportMetricsServiceRequest( + resource_metrics=[ + pb2.ResourceMetrics( + resource=OTLPResource( + attributes=[ + KeyValue(key="a", value=AnyValue(int_value=1)), + KeyValue( + key="b", value=AnyValue(bool_value=False) + ), + ] + ), + scope_metrics=[ + pb2.ScopeMetrics( + scope=InstrumentationScope( + name="first_name", version="first_version" + ), + metrics=[ + pb2.Metric( + name="gauge_double", + unit="s", + description="foo", + gauge=pb2.Gauge( + data_points=[ + pb2.NumberDataPoint( + attributes=[ + KeyValue( + key="a", + value=AnyValue( + int_value=1 + ), + ), + KeyValue( + key="b", + value=AnyValue( + bool_value=True + ), + ), + ], + time_unix_nano=1641946016139533244, + as_double=52.028, + ) + ], + ), + ) + ], + ) + ], + ) + ] + ) + actual = encode_metrics(metrics_data) + self.assertEqual(expected, actual) + + def test_encode_histogram(self): + metrics_data = MetricsData( + resource_metrics=[ + ResourceMetrics( + resource=Resource( + attributes={"a": 1, "b": False}, + schema_url="resource_schema_url", + ), + scope_metrics=[ + ScopeMetrics( + scope=SDKInstrumentationScope( + name="first_name", + version="first_version", + schema_url="insrumentation_scope_schema_url", + ), + metrics=[self.histogram], + schema_url="instrumentation_scope_schema_url", + ) + ], + schema_url="resource_schema_url", + ) + ] + ) + expected = ExportMetricsServiceRequest( + resource_metrics=[ + pb2.ResourceMetrics( + resource=OTLPResource( + attributes=[ + KeyValue(key="a", value=AnyValue(int_value=1)), + KeyValue( + key="b", value=AnyValue(bool_value=False) + ), + ] + ), + scope_metrics=[ + pb2.ScopeMetrics( + scope=InstrumentationScope( + name="first_name", version="first_version" + ), + metrics=[ + pb2.Metric( + name="histogram", + unit="s", + description="foo", + histogram=pb2.Histogram( + data_points=[ + pb2.HistogramDataPoint( + attributes=[ + KeyValue( + key="a", + value=AnyValue( + int_value=1 + ), + ), + KeyValue( + key="b", + value=AnyValue( + bool_value=True + ), + ), + ], + start_time_unix_nano=1641946016139533244, + time_unix_nano=1641946016139533244, + count=5, + sum=67, + bucket_counts=[1, 4], + explicit_bounds=[10.0, 20.0], + exemplars=[], + flags=pb2.DataPointFlags.FLAG_NONE, + max=18.0, + min=8.0, + ) + ], + aggregation_temporality=AggregationTemporality.DELTA, + ), + ) + ], + ) + ], + ) + ] + ) + actual = encode_metrics(metrics_data) + self.assertEqual(expected, actual) + + def test_encode_multiple_scope_histogram(self): + metrics_data = MetricsData( + resource_metrics=[ + ResourceMetrics( + resource=Resource( + attributes={"a": 1, "b": False}, + schema_url="resource_schema_url", + ), + scope_metrics=[ + ScopeMetrics( + scope=SDKInstrumentationScope( + name="first_name", + version="first_version", + schema_url="insrumentation_scope_schema_url", + ), + metrics=[self.histogram, self.histogram], + schema_url="instrumentation_scope_schema_url", + ), + ScopeMetrics( + scope=SDKInstrumentationScope( + name="second_name", + version="second_version", + schema_url="insrumentation_scope_schema_url", + ), + metrics=[self.histogram], + schema_url="instrumentation_scope_schema_url", + ), + ScopeMetrics( + scope=SDKInstrumentationScope( + name="third_name", + version="third_version", + schema_url="insrumentation_scope_schema_url", + ), + metrics=[self.histogram], + schema_url="instrumentation_scope_schema_url", + ), + ], + schema_url="resource_schema_url", + ) + ] + ) + expected = ExportMetricsServiceRequest( + resource_metrics=[ + pb2.ResourceMetrics( + resource=OTLPResource( + attributes=[ + KeyValue(key="a", value=AnyValue(int_value=1)), + KeyValue( + key="b", value=AnyValue(bool_value=False) + ), + ] + ), + scope_metrics=[ + pb2.ScopeMetrics( + scope=InstrumentationScope( + name="first_name", version="first_version" + ), + metrics=[ + pb2.Metric( + name="histogram", + unit="s", + description="foo", + histogram=pb2.Histogram( + data_points=[ + pb2.HistogramDataPoint( + attributes=[ + KeyValue( + key="a", + value=AnyValue( + int_value=1 + ), + ), + KeyValue( + key="b", + value=AnyValue( + bool_value=True + ), + ), + ], + start_time_unix_nano=1641946016139533244, + time_unix_nano=1641946016139533244, + count=5, + sum=67, + bucket_counts=[1, 4], + explicit_bounds=[10.0, 20.0], + exemplars=[], + flags=pb2.DataPointFlags.FLAG_NONE, + max=18.0, + min=8.0, + ) + ], + aggregation_temporality=AggregationTemporality.DELTA, + ), + ), + pb2.Metric( + name="histogram", + unit="s", + description="foo", + histogram=pb2.Histogram( + data_points=[ + pb2.HistogramDataPoint( + attributes=[ + KeyValue( + key="a", + value=AnyValue( + int_value=1 + ), + ), + KeyValue( + key="b", + value=AnyValue( + bool_value=True + ), + ), + ], + start_time_unix_nano=1641946016139533244, + time_unix_nano=1641946016139533244, + count=5, + sum=67, + bucket_counts=[1, 4], + explicit_bounds=[10.0, 20.0], + exemplars=[], + flags=pb2.DataPointFlags.FLAG_NONE, + max=18.0, + min=8.0, + ) + ], + aggregation_temporality=AggregationTemporality.DELTA, + ), + ), + ], + ), + pb2.ScopeMetrics( + scope=InstrumentationScope( + name="second_name", version="second_version" + ), + metrics=[ + pb2.Metric( + name="histogram", + unit="s", + description="foo", + histogram=pb2.Histogram( + data_points=[ + pb2.HistogramDataPoint( + attributes=[ + KeyValue( + key="a", + value=AnyValue( + int_value=1 + ), + ), + KeyValue( + key="b", + value=AnyValue( + bool_value=True + ), + ), + ], + start_time_unix_nano=1641946016139533244, + time_unix_nano=1641946016139533244, + count=5, + sum=67, + bucket_counts=[1, 4], + explicit_bounds=[10.0, 20.0], + exemplars=[], + flags=pb2.DataPointFlags.FLAG_NONE, + max=18.0, + min=8.0, + ) + ], + aggregation_temporality=AggregationTemporality.DELTA, + ), + ) + ], + ), + pb2.ScopeMetrics( + scope=InstrumentationScope( + name="third_name", version="third_version" + ), + metrics=[ + pb2.Metric( + name="histogram", + unit="s", + description="foo", + histogram=pb2.Histogram( + data_points=[ + pb2.HistogramDataPoint( + attributes=[ + KeyValue( + key="a", + value=AnyValue( + int_value=1 + ), + ), + KeyValue( + key="b", + value=AnyValue( + bool_value=True + ), + ), + ], + start_time_unix_nano=1641946016139533244, + time_unix_nano=1641946016139533244, + count=5, + sum=67, + bucket_counts=[1, 4], + explicit_bounds=[10.0, 20.0], + exemplars=[], + flags=pb2.DataPointFlags.FLAG_NONE, + max=18.0, + min=8.0, + ) + ], + aggregation_temporality=AggregationTemporality.DELTA, + ), + ) + ], + ), + ], + ) + ] + ) + actual = encode_metrics(metrics_data) + self.assertEqual(expected, actual) + + def test_encode_exponential_histogram(self): + exponential_histogram = Metric( + name="exponential_histogram", + description="description", + unit="unit", + data=ExponentialHistogramType( + data_points=[ + ExponentialHistogramDataPoint( + attributes={"a": 1, "b": True}, + start_time_unix_nano=0, + time_unix_nano=1, + count=2, + sum=3, + scale=4, + zero_count=5, + positive=Buckets(offset=6, bucket_counts=[7, 8]), + negative=Buckets(offset=9, bucket_counts=[10, 11]), + flags=12, + min=13.0, + max=14.0, + ) + ], + aggregation_temporality=AggregationTemporality.DELTA, + ), + ) + + metrics_data = MetricsData( + resource_metrics=[ + ResourceMetrics( + resource=Resource( + attributes={"a": 1, "b": False}, + schema_url="resource_schema_url", + ), + scope_metrics=[ + ScopeMetrics( + scope=SDKInstrumentationScope( + name="first_name", + version="first_version", + schema_url="insrumentation_scope_schema_url", + ), + metrics=[exponential_histogram], + schema_url="instrumentation_scope_schema_url", + ) + ], + schema_url="resource_schema_url", + ) + ] + ) + expected = ExportMetricsServiceRequest( + resource_metrics=[ + pb2.ResourceMetrics( + resource=OTLPResource( + attributes=[ + KeyValue(key="a", value=AnyValue(int_value=1)), + KeyValue( + key="b", value=AnyValue(bool_value=False) + ), + ] + ), + scope_metrics=[ + pb2.ScopeMetrics( + scope=InstrumentationScope( + name="first_name", version="first_version" + ), + metrics=[ + pb2.Metric( + name="exponential_histogram", + unit="unit", + description="description", + exponential_histogram=pb2.ExponentialHistogram( + data_points=[ + pb2.ExponentialHistogramDataPoint( + attributes=[ + KeyValue( + key="a", + value=AnyValue( + int_value=1 + ), + ), + KeyValue( + key="b", + value=AnyValue( + bool_value=True + ), + ), + ], + start_time_unix_nano=0, + time_unix_nano=1, + count=2, + sum=3, + scale=4, + zero_count=5, + positive=pb2.ExponentialHistogramDataPoint.Buckets( + offset=6, + bucket_counts=[7, 8], + ), + negative=pb2.ExponentialHistogramDataPoint.Buckets( + offset=9, + bucket_counts=[10, 11], + ), + flags=12, + exemplars=[], + min=13.0, + max=14.0, + ) + ], + aggregation_temporality=AggregationTemporality.DELTA, + ), + ) + ], + ) + ], + ) + ] + ) + # pylint: disable=protected-access + actual = encode_metrics(metrics_data) + self.assertEqual(expected, actual) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/pyproject.toml b/exporter/opentelemetry-exporter-otlp-proto-grpc/pyproject.toml index 31914a53e03..82ec2b0a19d 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/pyproject.toml +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/pyproject.toml @@ -25,6 +25,7 @@ classifiers = [ "Programming Language :: Python :: 3.11", ] dependencies = [ + "Deprecated >= 1.2.6", "backoff >= 1.10.0, < 2.0.0; python_version<'3.7'", "backoff >= 1.10.0, < 3.0.0; python_version>='3.7'", "googleapis-common-protos ~= 1.52", diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py index 496fe365f86..471d5fe3011 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py @@ -25,6 +25,11 @@ from typing import TypeVar from urllib.parse import urlparse +from deprecated import deprecated + +from opentelemetry.exporter.otlp.proto.common._internal import ( + _get_resource_data, +) import backoff from google.rpc.error_details_pb2 import RetryInfo from grpc import ( @@ -45,7 +50,7 @@ ArrayValue, KeyValue, ) -from opentelemetry.proto.resource.v1.resource_pb2 import Resource +from opentelemetry.proto.resource.v1.resource_pb2 import Resource # noqa: F401 from opentelemetry.sdk.environment_variables import ( OTEL_EXPORTER_OTLP_CERTIFICATE, OTEL_EXPORTER_OTLP_COMPRESSION, @@ -130,40 +135,16 @@ def _translate_key_values(key: str, value: Any) -> KeyValue: return KeyValue(key=key, value=_translate_value(value)) +@deprecated( + version="1.18.0", + reason="Use one of the encoders from opentelemetry-exporter-otlp-proto-common instead", +) def get_resource_data( sdk_resource_scope_data: Dict[SDKResource, ResourceDataT], resource_class: Callable[..., TypingResourceT], name: str, ) -> List[TypingResourceT]: - resource_data = [] - - for ( - sdk_resource, - scope_data, - ) in sdk_resource_scope_data.items(): - - collector_resource = Resource() - - for key, value in sdk_resource.attributes.items(): - - try: - # pylint: disable=no-member - collector_resource.attributes.append( - _translate_key_values(key, value) - ) - except Exception as error: # pylint: disable=broad-except - logger.exception(error) - - resource_data.append( - resource_class( - **{ - "resource": collector_resource, - "scope_{}".format(name): scope_data.values(), - } - ) - ) - - return resource_data + return _get_resource_data(sdk_resource_scope_data, resource_class, name) def _load_credential_from_file(filepath) -> ChannelCredentials: diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py index 99325b64f91..c388a726b6d 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py @@ -17,21 +17,29 @@ from typing import Dict, Iterable, List, Optional, Tuple, Union from typing import Sequence as TypingSequence from grpc import ChannelCredentials, Compression + +from opentelemetry.exporter.otlp.proto.common.metrics_encoder import ( + encode_metrics, +) from opentelemetry.sdk.metrics._internal.aggregation import Aggregation from opentelemetry.exporter.otlp.proto.grpc.exporter import ( OTLPExporterMixin, - get_resource_data, _get_credentials, environ_to_compression, ) +from opentelemetry.exporter.otlp.proto.grpc.exporter import ( # noqa: F401 + get_resource_data, +) from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import ( ExportMetricsServiceRequest, ) from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2_grpc import ( MetricsServiceStub, ) -from opentelemetry.proto.common.v1.common_pb2 import InstrumentationScope -from opentelemetry.proto.metrics.v1 import metrics_pb2 as pb2 +from opentelemetry.proto.common.v1.common_pb2 import ( # noqa: F401 + InstrumentationScope, +) +from opentelemetry.proto.metrics.v1 import metrics_pb2 as pb2 # noqa: F401 from opentelemetry.sdk.environment_variables import ( OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE, OTEL_EXPORTER_OTLP_METRICS_COMPRESSION, @@ -52,14 +60,16 @@ from opentelemetry.sdk.metrics.export import ( AggregationTemporality, DataPointT, - Gauge, - Histogram as HistogramType, Metric, MetricExporter, MetricExportResult, MetricsData, ResourceMetrics, ScopeMetrics, +) +from opentelemetry.sdk.metrics.export import ( # noqa: F401 + Gauge, + Histogram as HistogramType, Sum, ExponentialHistogram as ExponentialHistogramType, ) @@ -196,164 +206,7 @@ def __init__( def _translate_data( self, data: MetricsData ) -> ExportMetricsServiceRequest: - - resource_metrics_dict = {} - - for resource_metrics in data.resource_metrics: - - resource = resource_metrics.resource - - # It is safe to assume that each entry in data.resource_metrics is - # associated with an unique resource. - scope_metrics_dict = {} - - resource_metrics_dict[resource] = scope_metrics_dict - - for scope_metrics in resource_metrics.scope_metrics: - - instrumentation_scope = scope_metrics.scope - - # The SDK groups metrics in instrumentation scopes already so - # there is no need to check for existing instrumentation scopes - # here. - pb2_scope_metrics = pb2.ScopeMetrics( - scope=InstrumentationScope( - name=instrumentation_scope.name, - version=instrumentation_scope.version, - ) - ) - - scope_metrics_dict[instrumentation_scope] = pb2_scope_metrics - - for metric in scope_metrics.metrics: - pb2_metric = pb2.Metric( - name=metric.name, - description=metric.description, - unit=metric.unit, - ) - - if isinstance(metric.data, Gauge): - for data_point in metric.data.data_points: - pt = pb2.NumberDataPoint( - attributes=self._translate_attributes( - data_point.attributes - ), - time_unix_nano=data_point.time_unix_nano, - ) - if isinstance(data_point.value, int): - pt.as_int = data_point.value - else: - pt.as_double = data_point.value - pb2_metric.gauge.data_points.append(pt) - - elif isinstance(metric.data, HistogramType): - for data_point in metric.data.data_points: - pt = pb2.HistogramDataPoint( - attributes=self._translate_attributes( - data_point.attributes - ), - time_unix_nano=data_point.time_unix_nano, - start_time_unix_nano=( - data_point.start_time_unix_nano - ), - count=data_point.count, - sum=data_point.sum, - bucket_counts=data_point.bucket_counts, - explicit_bounds=data_point.explicit_bounds, - max=data_point.max, - min=data_point.min, - ) - pb2_metric.histogram.aggregation_temporality = ( - metric.data.aggregation_temporality - ) - pb2_metric.histogram.data_points.append(pt) - - elif isinstance(metric.data, Sum): - for data_point in metric.data.data_points: - pt = pb2.NumberDataPoint( - attributes=self._translate_attributes( - data_point.attributes - ), - start_time_unix_nano=( - data_point.start_time_unix_nano - ), - time_unix_nano=data_point.time_unix_nano, - ) - if isinstance(data_point.value, int): - pt.as_int = data_point.value - else: - pt.as_double = data_point.value - # note that because sum is a message type, the - # fields must be set individually rather than - # instantiating a pb2.Sum and setting it once - pb2_metric.sum.aggregation_temporality = ( - metric.data.aggregation_temporality - ) - pb2_metric.sum.is_monotonic = ( - metric.data.is_monotonic - ) - pb2_metric.sum.data_points.append(pt) - - elif isinstance(metric.data, ExponentialHistogramType): - for data_point in metric.data.data_points: - - if data_point.positive.bucket_counts: - positive = pb2.ExponentialHistogramDataPoint.Buckets( - offset=data_point.positive.offset, - bucket_counts=data_point.positive.bucket_counts, - ) - else: - positive = None - - if data_point.negative.bucket_counts: - negative = pb2.ExponentialHistogramDataPoint.Buckets( - offset=data_point.negative.offset, - bucket_counts=data_point.negative.bucket_counts, - ) - else: - negative = None - - pt = pb2.ExponentialHistogramDataPoint( - attributes=self._translate_attributes( - data_point.attributes - ), - time_unix_nano=data_point.time_unix_nano, - start_time_unix_nano=( - data_point.start_time_unix_nano - ), - count=data_point.count, - sum=data_point.sum, - scale=data_point.scale, - zero_count=data_point.zero_count, - positive=positive, - negative=negative, - flags=data_point.flags, - max=data_point.max, - min=data_point.min, - ) - pb2_metric.exponential_histogram.aggregation_temporality = ( - metric.data.aggregation_temporality - ) - pb2_metric.exponential_histogram.data_points.append( - pt - ) - - else: - _logger.warning( - "unsupported data type %s", - metric.data.__class__.__name__, - ) - continue - - pb2_scope_metrics.metrics.append(pb2_metric) - - return ExportMetricsServiceRequest( - resource_metrics=get_resource_data( - resource_metrics_dict, - pb2.ResourceMetrics, - "metrics", - ) - ) + return encode_metrics(data) def export( self, diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py index 9cd805e9690..62500802290 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py @@ -33,22 +33,13 @@ ) from opentelemetry.exporter.otlp.proto.grpc.version import __version__ from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import ( - ExportMetricsServiceRequest, ExportMetricsServiceResponse, ) from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2_grpc import ( MetricsServiceServicer, add_MetricsServiceServicer_to_server, ) -from opentelemetry.proto.common.v1.common_pb2 import ( - AnyValue, - InstrumentationScope, - KeyValue, -) -from opentelemetry.proto.metrics.v1 import metrics_pb2 as pb2 -from opentelemetry.proto.resource.v1.resource_pb2 import ( - Resource as OTLPResource, -) +from opentelemetry.proto.common.v1.common_pb2 import InstrumentationScope from opentelemetry.sdk.environment_variables import ( OTEL_EXPORTER_OTLP_COMPRESSION, OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE, @@ -67,17 +58,9 @@ ObservableUpDownCounter, UpDownCounter, ) -from opentelemetry.sdk.metrics.export import AggregationTemporality, Buckets -from opentelemetry.sdk.metrics.export import ( - ExponentialHistogram as ExponentialHistogramType, -) from opentelemetry.sdk.metrics.export import ( - ExponentialHistogramDataPoint, + AggregationTemporality, Gauge, -) -from opentelemetry.sdk.metrics.export import Histogram as HistogramType -from opentelemetry.sdk.metrics.export import ( - HistogramDataPoint, Metric, MetricExportResult, MetricsData, @@ -89,7 +72,7 @@ from opentelemetry.sdk.util.instrumentation import ( InstrumentationScope as SDKInstrumentationScope, ) -from opentelemetry.test.metrictestutil import _generate_gauge, _generate_sum +from opentelemetry.test.metrictestutil import _generate_sum THIS_DIR = dirname(__file__) @@ -153,53 +136,6 @@ def setUp(self): self.server.start() - histogram = Metric( - name="histogram", - description="foo", - unit="s", - data=HistogramType( - data_points=[ - HistogramDataPoint( - attributes={"a": 1, "b": True}, - start_time_unix_nano=1641946016139533244, - time_unix_nano=1641946016139533244, - count=5, - sum=67, - bucket_counts=[1, 4], - explicit_bounds=[10.0, 20.0], - min=8, - max=18, - ) - ], - aggregation_temporality=AggregationTemporality.DELTA, - ), - ) - - exponential_histogram = Metric( - name="exponential_histogram", - description="description", - unit="unit", - data=ExponentialHistogramType( - data_points=[ - ExponentialHistogramDataPoint( - attributes={"a": 1, "b": True}, - start_time_unix_nano=0, - time_unix_nano=1, - count=2, - sum=3, - scale=4, - zero_count=5, - positive=Buckets(offset=6, bucket_counts=[7, 8]), - negative=Buckets(offset=9, bucket_counts=[10, 11]), - flags=12, - min=13.0, - max=14.0, - ) - ], - aggregation_temporality=AggregationTemporality.DELTA, - ), - ) - self.metrics = { "sum_int": MetricsData( resource_metrics=[ @@ -222,162 +158,9 @@ def setUp(self): schema_url="resource_schema_url", ) ] - ), - "sum_double": MetricsData( - resource_metrics=[ - ResourceMetrics( - resource=Resource( - attributes={"a": 1, "b": False}, - schema_url="resource_schema_url", - ), - scope_metrics=[ - ScopeMetrics( - scope=SDKInstrumentationScope( - name="first_name", - version="first_version", - schema_url="insrumentation_scope_schema_url", - ), - metrics=[_generate_sum("sum_double", 2.98)], - schema_url="instrumentation_scope_schema_url", - ) - ], - schema_url="resource_schema_url", - ) - ] - ), - "gauge_int": MetricsData( - resource_metrics=[ - ResourceMetrics( - resource=Resource( - attributes={"a": 1, "b": False}, - schema_url="resource_schema_url", - ), - scope_metrics=[ - ScopeMetrics( - scope=SDKInstrumentationScope( - name="first_name", - version="first_version", - schema_url="insrumentation_scope_schema_url", - ), - metrics=[_generate_gauge("gauge_int", 9000)], - schema_url="instrumentation_scope_schema_url", - ) - ], - schema_url="resource_schema_url", - ) - ] - ), - "gauge_double": MetricsData( - resource_metrics=[ - ResourceMetrics( - resource=Resource( - attributes={"a": 1, "b": False}, - schema_url="resource_schema_url", - ), - scope_metrics=[ - ScopeMetrics( - scope=SDKInstrumentationScope( - name="first_name", - version="first_version", - schema_url="insrumentation_scope_schema_url", - ), - metrics=[ - _generate_gauge("gauge_double", 52.028) - ], - schema_url="instrumentation_scope_schema_url", - ) - ], - schema_url="resource_schema_url", - ) - ] - ), - "histogram": MetricsData( - resource_metrics=[ - ResourceMetrics( - resource=Resource( - attributes={"a": 1, "b": False}, - schema_url="resource_schema_url", - ), - scope_metrics=[ - ScopeMetrics( - scope=SDKInstrumentationScope( - name="first_name", - version="first_version", - schema_url="insrumentation_scope_schema_url", - ), - metrics=[histogram], - schema_url="instrumentation_scope_schema_url", - ) - ], - schema_url="resource_schema_url", - ) - ] - ), - "exponential_histogram": MetricsData( - resource_metrics=[ - ResourceMetrics( - resource=Resource( - attributes={"a": 1, "b": False}, - schema_url="resource_schema_url", - ), - scope_metrics=[ - ScopeMetrics( - scope=SDKInstrumentationScope( - name="first_name", - version="first_version", - schema_url="insrumentation_scope_schema_url", - ), - metrics=[exponential_histogram], - schema_url="instrumentation_scope_schema_url", - ) - ], - schema_url="resource_schema_url", - ) - ] - ), + ) } - self.multiple_scope_histogram = MetricsData( - resource_metrics=[ - ResourceMetrics( - resource=Resource( - attributes={"a": 1, "b": False}, - schema_url="resource_schema_url", - ), - scope_metrics=[ - ScopeMetrics( - scope=SDKInstrumentationScope( - name="first_name", - version="first_version", - schema_url="insrumentation_scope_schema_url", - ), - metrics=[histogram, histogram], - schema_url="instrumentation_scope_schema_url", - ), - ScopeMetrics( - scope=SDKInstrumentationScope( - name="second_name", - version="second_version", - schema_url="insrumentation_scope_schema_url", - ), - metrics=[histogram], - schema_url="instrumentation_scope_schema_url", - ), - ScopeMetrics( - scope=SDKInstrumentationScope( - name="third_name", - version="third_version", - schema_url="insrumentation_scope_schema_url", - ), - metrics=[histogram], - schema_url="instrumentation_scope_schema_url", - ), - ], - schema_url="resource_schema_url", - ) - ] - ) - def tearDown(self): self.server.stop(None) @@ -665,556 +448,6 @@ def test_failure(self): MetricExportResult.FAILURE, ) - def test_translate_sum_int(self): - expected = ExportMetricsServiceRequest( - resource_metrics=[ - pb2.ResourceMetrics( - resource=OTLPResource( - attributes=[ - KeyValue(key="a", value=AnyValue(int_value=1)), - KeyValue( - key="b", value=AnyValue(bool_value=False) - ), - ] - ), - scope_metrics=[ - pb2.ScopeMetrics( - scope=InstrumentationScope( - name="first_name", version="first_version" - ), - metrics=[ - pb2.Metric( - name="sum_int", - unit="s", - description="foo", - sum=pb2.Sum( - data_points=[ - pb2.NumberDataPoint( - attributes=[ - KeyValue( - key="a", - value=AnyValue( - int_value=1 - ), - ), - KeyValue( - key="b", - value=AnyValue( - bool_value=True - ), - ), - ], - start_time_unix_nano=1641946015139533244, - time_unix_nano=1641946016139533244, - as_int=33, - ) - ], - aggregation_temporality=AggregationTemporality.CUMULATIVE, - is_monotonic=True, - ), - ) - ], - ) - ], - ) - ] - ) - # pylint: disable=protected-access - actual = self.exporter._translate_data(self.metrics["sum_int"]) - self.assertEqual(expected, actual) - - def test_translate_sum_double(self): - expected = ExportMetricsServiceRequest( - resource_metrics=[ - pb2.ResourceMetrics( - resource=OTLPResource( - attributes=[ - KeyValue(key="a", value=AnyValue(int_value=1)), - KeyValue( - key="b", value=AnyValue(bool_value=False) - ), - ] - ), - scope_metrics=[ - pb2.ScopeMetrics( - scope=InstrumentationScope( - name="first_name", version="first_version" - ), - metrics=[ - pb2.Metric( - name="sum_double", - unit="s", - description="foo", - sum=pb2.Sum( - data_points=[ - pb2.NumberDataPoint( - attributes=[ - KeyValue( - key="a", - value=AnyValue( - int_value=1 - ), - ), - KeyValue( - key="b", - value=AnyValue( - bool_value=True - ), - ), - ], - start_time_unix_nano=1641946015139533244, - time_unix_nano=1641946016139533244, - as_double=2.98, - ) - ], - aggregation_temporality=AggregationTemporality.CUMULATIVE, - is_monotonic=True, - ), - ) - ], - ) - ], - ) - ] - ) - # pylint: disable=protected-access - actual = self.exporter._translate_data(self.metrics["sum_double"]) - self.assertEqual(expected, actual) - - def test_translate_gauge_int(self): - expected = ExportMetricsServiceRequest( - resource_metrics=[ - pb2.ResourceMetrics( - resource=OTLPResource( - attributes=[ - KeyValue(key="a", value=AnyValue(int_value=1)), - KeyValue( - key="b", value=AnyValue(bool_value=False) - ), - ] - ), - scope_metrics=[ - pb2.ScopeMetrics( - scope=InstrumentationScope( - name="first_name", version="first_version" - ), - metrics=[ - pb2.Metric( - name="gauge_int", - unit="s", - description="foo", - gauge=pb2.Gauge( - data_points=[ - pb2.NumberDataPoint( - attributes=[ - KeyValue( - key="a", - value=AnyValue( - int_value=1 - ), - ), - KeyValue( - key="b", - value=AnyValue( - bool_value=True - ), - ), - ], - time_unix_nano=1641946016139533244, - as_int=9000, - ) - ], - ), - ) - ], - ) - ], - ) - ] - ) - # pylint: disable=protected-access - actual = self.exporter._translate_data(self.metrics["gauge_int"]) - self.assertEqual(expected, actual) - - def test_translate_gauge_double(self): - expected = ExportMetricsServiceRequest( - resource_metrics=[ - pb2.ResourceMetrics( - resource=OTLPResource( - attributes=[ - KeyValue(key="a", value=AnyValue(int_value=1)), - KeyValue( - key="b", value=AnyValue(bool_value=False) - ), - ] - ), - scope_metrics=[ - pb2.ScopeMetrics( - scope=InstrumentationScope( - name="first_name", version="first_version" - ), - metrics=[ - pb2.Metric( - name="gauge_double", - unit="s", - description="foo", - gauge=pb2.Gauge( - data_points=[ - pb2.NumberDataPoint( - attributes=[ - KeyValue( - key="a", - value=AnyValue( - int_value=1 - ), - ), - KeyValue( - key="b", - value=AnyValue( - bool_value=True - ), - ), - ], - time_unix_nano=1641946016139533244, - as_double=52.028, - ) - ], - ), - ) - ], - ) - ], - ) - ] - ) - # pylint: disable=protected-access - actual = self.exporter._translate_data(self.metrics["gauge_double"]) - self.assertEqual(expected, actual) - - def test_translate_histogram(self): - expected = ExportMetricsServiceRequest( - resource_metrics=[ - pb2.ResourceMetrics( - resource=OTLPResource( - attributes=[ - KeyValue(key="a", value=AnyValue(int_value=1)), - KeyValue( - key="b", value=AnyValue(bool_value=False) - ), - ] - ), - scope_metrics=[ - pb2.ScopeMetrics( - scope=InstrumentationScope( - name="first_name", version="first_version" - ), - metrics=[ - pb2.Metric( - name="histogram", - unit="s", - description="foo", - histogram=pb2.Histogram( - data_points=[ - pb2.HistogramDataPoint( - attributes=[ - KeyValue( - key="a", - value=AnyValue( - int_value=1 - ), - ), - KeyValue( - key="b", - value=AnyValue( - bool_value=True - ), - ), - ], - start_time_unix_nano=1641946016139533244, - time_unix_nano=1641946016139533244, - count=5, - sum=67, - bucket_counts=[1, 4], - explicit_bounds=[10.0, 20.0], - exemplars=[], - flags=pb2.DataPointFlags.FLAG_NONE, - max=18.0, - min=8.0, - ) - ], - aggregation_temporality=AggregationTemporality.DELTA, - ), - ) - ], - ) - ], - ) - ] - ) - # pylint: disable=protected-access - actual = self.exporter._translate_data(self.metrics["histogram"]) - self.assertEqual(expected, actual) - - def test_translate_exponential_histogram(self): - expected = ExportMetricsServiceRequest( - resource_metrics=[ - pb2.ResourceMetrics( - resource=OTLPResource( - attributes=[ - KeyValue(key="a", value=AnyValue(int_value=1)), - KeyValue( - key="b", value=AnyValue(bool_value=False) - ), - ] - ), - scope_metrics=[ - pb2.ScopeMetrics( - scope=InstrumentationScope( - name="first_name", version="first_version" - ), - metrics=[ - pb2.Metric( - name="exponential_histogram", - unit="unit", - description="description", - exponential_histogram=pb2.ExponentialHistogram( - data_points=[ - pb2.ExponentialHistogramDataPoint( - attributes=[ - KeyValue( - key="a", - value=AnyValue( - int_value=1 - ), - ), - KeyValue( - key="b", - value=AnyValue( - bool_value=True - ), - ), - ], - start_time_unix_nano=0, - time_unix_nano=1, - count=2, - sum=3, - scale=4, - zero_count=5, - positive=pb2.ExponentialHistogramDataPoint.Buckets( - offset=6, - bucket_counts=[7, 8], - ), - negative=pb2.ExponentialHistogramDataPoint.Buckets( - offset=9, - bucket_counts=[10, 11], - ), - flags=12, - exemplars=[], - min=13.0, - max=14.0, - ) - ], - aggregation_temporality=AggregationTemporality.DELTA, - ), - ) - ], - ) - ], - ) - ] - ) - # pylint: disable=protected-access - actual = self.exporter._translate_data( - self.metrics["exponential_histogram"] - ) - self.assertEqual(expected, actual) - - def test_translate_multiple_scope_histogram(self): - expected = ExportMetricsServiceRequest( - resource_metrics=[ - pb2.ResourceMetrics( - resource=OTLPResource( - attributes=[ - KeyValue(key="a", value=AnyValue(int_value=1)), - KeyValue( - key="b", value=AnyValue(bool_value=False) - ), - ] - ), - scope_metrics=[ - pb2.ScopeMetrics( - scope=InstrumentationScope( - name="first_name", version="first_version" - ), - metrics=[ - pb2.Metric( - name="histogram", - unit="s", - description="foo", - histogram=pb2.Histogram( - data_points=[ - pb2.HistogramDataPoint( - attributes=[ - KeyValue( - key="a", - value=AnyValue( - int_value=1 - ), - ), - KeyValue( - key="b", - value=AnyValue( - bool_value=True - ), - ), - ], - start_time_unix_nano=1641946016139533244, - time_unix_nano=1641946016139533244, - count=5, - sum=67, - bucket_counts=[1, 4], - explicit_bounds=[10.0, 20.0], - exemplars=[], - flags=pb2.DataPointFlags.FLAG_NONE, - max=18.0, - min=8.0, - ) - ], - aggregation_temporality=AggregationTemporality.DELTA, - ), - ), - pb2.Metric( - name="histogram", - unit="s", - description="foo", - histogram=pb2.Histogram( - data_points=[ - pb2.HistogramDataPoint( - attributes=[ - KeyValue( - key="a", - value=AnyValue( - int_value=1 - ), - ), - KeyValue( - key="b", - value=AnyValue( - bool_value=True - ), - ), - ], - start_time_unix_nano=1641946016139533244, - time_unix_nano=1641946016139533244, - count=5, - sum=67, - bucket_counts=[1, 4], - explicit_bounds=[10.0, 20.0], - exemplars=[], - flags=pb2.DataPointFlags.FLAG_NONE, - max=18.0, - min=8.0, - ) - ], - aggregation_temporality=AggregationTemporality.DELTA, - ), - ), - ], - ), - pb2.ScopeMetrics( - scope=InstrumentationScope( - name="second_name", version="second_version" - ), - metrics=[ - pb2.Metric( - name="histogram", - unit="s", - description="foo", - histogram=pb2.Histogram( - data_points=[ - pb2.HistogramDataPoint( - attributes=[ - KeyValue( - key="a", - value=AnyValue( - int_value=1 - ), - ), - KeyValue( - key="b", - value=AnyValue( - bool_value=True - ), - ), - ], - start_time_unix_nano=1641946016139533244, - time_unix_nano=1641946016139533244, - count=5, - sum=67, - bucket_counts=[1, 4], - explicit_bounds=[10.0, 20.0], - exemplars=[], - flags=pb2.DataPointFlags.FLAG_NONE, - max=18.0, - min=8.0, - ) - ], - aggregation_temporality=AggregationTemporality.DELTA, - ), - ) - ], - ), - pb2.ScopeMetrics( - scope=InstrumentationScope( - name="third_name", version="third_version" - ), - metrics=[ - pb2.Metric( - name="histogram", - unit="s", - description="foo", - histogram=pb2.Histogram( - data_points=[ - pb2.HistogramDataPoint( - attributes=[ - KeyValue( - key="a", - value=AnyValue( - int_value=1 - ), - ), - KeyValue( - key="b", - value=AnyValue( - bool_value=True - ), - ), - ], - start_time_unix_nano=1641946016139533244, - time_unix_nano=1641946016139533244, - count=5, - sum=67, - bucket_counts=[1, 4], - explicit_bounds=[10.0, 20.0], - exemplars=[], - flags=pb2.DataPointFlags.FLAG_NONE, - max=18.0, - min=8.0, - ) - ], - aggregation_temporality=AggregationTemporality.DELTA, - ), - ) - ], - ), - ], - ) - ] - ) - # pylint: disable=protected-access - actual = self.exporter._translate_data(self.multiple_scope_histogram) - self.assertEqual(expected, actual) - def test_split_metrics_data_many_data_points(self): # GIVEN metrics_data = MetricsData( diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/pyproject.toml b/exporter/opentelemetry-exporter-otlp-proto-http/pyproject.toml index 47aafb6ee5a..54f2b67249a 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/pyproject.toml +++ b/exporter/opentelemetry-exporter-otlp-proto-http/pyproject.toml @@ -25,6 +25,7 @@ classifiers = [ "Programming Language :: Python :: 3.11", ] dependencies = [ + "Deprecated >= 1.2.6", "backoff >= 1.10.0, < 2.0.0; python_version<'3.7'", "backoff >= 1.10.0, < 3.0.0; python_version>='3.7'", "googleapis-common-protos ~= 1.52", diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py index ffd5102a2d9..c2950b999cb 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py @@ -15,24 +15,35 @@ import logging import zlib from os import environ -from typing import Dict, Optional, Sequence, Any, Callable, List, Mapping +from typing import Dict, Optional, Any, Callable, List +from typing import Sequence, Mapping # noqa: F401 + from io import BytesIO from time import sleep +from deprecated import deprecated +from opentelemetry.exporter.otlp.proto.common._internal import ( + _get_resource_data, +) +from opentelemetry.exporter.otlp.proto.common.metrics_encoder import ( + encode_metrics, +) from opentelemetry.exporter.otlp.proto.http import Compression from opentelemetry.sdk.metrics._internal.aggregation import Aggregation -from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import ( +from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import ( # noqa: F401 ExportMetricsServiceRequest, ) -from opentelemetry.proto.common.v1.common_pb2 import ( +from opentelemetry.proto.common.v1.common_pb2 import ( # noqa: F401 AnyValue, ArrayValue, KeyValue, KeyValueList, ) -from opentelemetry.proto.common.v1.common_pb2 import InstrumentationScope -from opentelemetry.proto.resource.v1.resource_pb2 import Resource -from opentelemetry.proto.metrics.v1 import metrics_pb2 as pb2 +from opentelemetry.proto.common.v1.common_pb2 import ( # noqa: F401 + InstrumentationScope, +) +from opentelemetry.proto.resource.v1.resource_pb2 import Resource # noqa: F401 +from opentelemetry.proto.metrics.v1 import metrics_pb2 as pb2 # noqa: F401 from opentelemetry.sdk.environment_variables import ( OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE, OTEL_EXPORTER_OTLP_ENDPOINT, @@ -56,11 +67,13 @@ ) from opentelemetry.sdk.metrics.export import ( AggregationTemporality, - Gauge, - Histogram as HistogramType, MetricExporter, MetricExportResult, MetricsData, +) +from opentelemetry.sdk.metrics.export import ( # noqa: F401 + Gauge, + Histogram as HistogramType, Sum, ) from opentelemetry.sdk.resources import Resource as SDKResource @@ -68,6 +81,9 @@ import backoff import requests +from opentelemetry.proto.resource.v1.resource_pb2 import ( + Resource as PB2Resource, +) _logger = logging.getLogger(__name__) @@ -220,140 +236,13 @@ def _retryable(resp: requests.Response) -> bool: return True return False - def _translate_data( - self, data: MetricsData - ) -> ExportMetricsServiceRequest: - - resource_metrics_dict = {} - - for resource_metrics in data.resource_metrics: - - resource = resource_metrics.resource - - # It is safe to assume that each entry in data.resource_metrics is - # associated with an unique resource. - scope_metrics_dict = {} - - resource_metrics_dict[resource] = scope_metrics_dict - - for scope_metrics in resource_metrics.scope_metrics: - - instrumentation_scope = scope_metrics.scope - - # The SDK groups metrics in instrumentation scopes already so - # there is no need to check for existing instrumentation scopes - # here. - pb2_scope_metrics = pb2.ScopeMetrics( - scope=InstrumentationScope( - name=instrumentation_scope.name, - version=instrumentation_scope.version, - ) - ) - - scope_metrics_dict[instrumentation_scope] = pb2_scope_metrics - - for metric in scope_metrics.metrics: - pb2_metric = pb2.Metric( - name=metric.name, - description=metric.description, - unit=metric.unit, - ) - - if isinstance(metric.data, Gauge): - for data_point in metric.data.data_points: - pt = pb2.NumberDataPoint( - attributes=self._translate_attributes( - data_point.attributes - ), - time_unix_nano=data_point.time_unix_nano, - ) - if isinstance(data_point.value, int): - pt.as_int = data_point.value - else: - pt.as_double = data_point.value - pb2_metric.gauge.data_points.append(pt) - - elif isinstance(metric.data, HistogramType): - for data_point in metric.data.data_points: - pt = pb2.HistogramDataPoint( - attributes=self._translate_attributes( - data_point.attributes - ), - time_unix_nano=data_point.time_unix_nano, - start_time_unix_nano=( - data_point.start_time_unix_nano - ), - count=data_point.count, - sum=data_point.sum, - bucket_counts=data_point.bucket_counts, - explicit_bounds=data_point.explicit_bounds, - max=data_point.max, - min=data_point.min, - ) - pb2_metric.histogram.aggregation_temporality = ( - metric.data.aggregation_temporality - ) - pb2_metric.histogram.data_points.append(pt) - - elif isinstance(metric.data, Sum): - for data_point in metric.data.data_points: - pt = pb2.NumberDataPoint( - attributes=self._translate_attributes( - data_point.attributes - ), - start_time_unix_nano=( - data_point.start_time_unix_nano - ), - time_unix_nano=data_point.time_unix_nano, - ) - if isinstance(data_point.value, int): - pt.as_int = data_point.value - else: - pt.as_double = data_point.value - # note that because sum is a message type, the - # fields must be set individually rather than - # instantiating a pb2.Sum and setting it once - pb2_metric.sum.aggregation_temporality = ( - metric.data.aggregation_temporality - ) - pb2_metric.sum.is_monotonic = ( - metric.data.is_monotonic - ) - pb2_metric.sum.data_points.append(pt) - else: - _logger.warn( - "unsupported datapoint type %s", metric.point - ) - continue - - pb2_scope_metrics.metrics.append(pb2_metric) - - return ExportMetricsServiceRequest( - resource_metrics=get_resource_data( - resource_metrics_dict, - pb2.ResourceMetrics, - "metrics", - ) - ) - - def _translate_attributes(self, attributes) -> Sequence[KeyValue]: - output = [] - if attributes: - - for key, value in attributes.items(): - try: - output.append(_translate_key_values(key, value)) - except Exception as error: # pylint: disable=broad-except - _logger.exception(error) - return output - def export( self, metrics_data: MetricsData, timeout_millis: float = 10_000, **kwargs, ) -> MetricExportResult: - serialized_data = self._translate_data(metrics_data) + serialized_data = encode_metrics(metrics_data) for delay in _expo(max_value=self._MAX_RETRY_TIMEOUT): if delay == self._MAX_RETRY_TIMEOUT: @@ -391,79 +280,16 @@ def force_flush(self, timeout_millis: float = 10_000) -> bool: return True -def _translate_value(value: Any) -> KeyValue: - - if isinstance(value, bool): - any_value = AnyValue(bool_value=value) - - elif isinstance(value, str): - any_value = AnyValue(string_value=value) - - elif isinstance(value, int): - any_value = AnyValue(int_value=value) - - elif isinstance(value, float): - any_value = AnyValue(double_value=value) - - elif isinstance(value, Sequence): - any_value = AnyValue( - array_value=ArrayValue(values=[_translate_value(v) for v in value]) - ) - - elif isinstance(value, Mapping): - any_value = AnyValue( - kvlist_value=KeyValueList( - values=[ - _translate_key_values(str(k), v) for k, v in value.items() - ] - ) - ) - - else: - raise Exception(f"Invalid type {type(value)} of value {value}") - - return any_value - - -def _translate_key_values(key: str, value: Any) -> KeyValue: - return KeyValue(key=key, value=_translate_value(value)) - - +@deprecated( + version="1.18.0", + reason="Use one of the encoders from opentelemetry-exporter-otlp-proto-common instead", +) def get_resource_data( sdk_resource_scope_data: Dict[SDKResource, Any], # ResourceDataT? - resource_class: Callable[..., Resource], + resource_class: Callable[..., PB2Resource], name: str, -) -> List[Resource]: - - resource_data = [] - - for ( - sdk_resource, - scope_data, - ) in sdk_resource_scope_data.items(): - - collector_resource = Resource() - - for key, value in sdk_resource.attributes.items(): - - try: - # pylint: disable=no-member - collector_resource.attributes.append( - _translate_key_values(key, value) - ) - except Exception as error: # pylint: disable=broad-except - _logger.exception(error) - - resource_data.append( - resource_class( - **{ - "resource": collector_resource, - "scope_{}".format(name): scope_data.values(), - } - ) - ) - - return resource_data +) -> List[PB2Resource]: + return _get_resource_data(sdk_resource_scope_data, resource_class, name) def _compression_from_env() -> Compression: diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py index 9f57a23ae2a..81e6c1442e4 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py @@ -21,6 +21,9 @@ from requests.models import Response from responses import POST, activate, add +from opentelemetry.exporter.otlp.proto.common.metrics_encoder import ( + encode_metrics, +) from opentelemetry.exporter.otlp.proto.http import Compression from opentelemetry.exporter.otlp.proto.http.metric_exporter import ( DEFAULT_COMPRESSION, @@ -281,7 +284,7 @@ def test_serialization(self, mock_post): MetricExportResult.SUCCESS, ) - serialized_data = exporter._translate_data(self.metrics["sum_int"]) + serialized_data = encode_metrics(self.metrics["sum_int"]) mock_post.assert_called_once_with( url=exporter._endpoint, data=serialized_data.SerializeToString(), From 7087c010df4df06f84534499b1473e130682a2af Mon Sep 17 00:00:00 2001 From: "jack.burridge" Date: Tue, 14 Feb 2023 16:24:01 +0000 Subject: [PATCH 4/5] Update CHANGELOG.md --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 77e55796889..3923d7eda55 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- Move Protobuf encoding to its own package + ([#3169](https://github.com/open-telemetry/opentelemetry-python/pull/3169)) - Add experimental feature to detect resource detectors in auto instrumentation ([#3181](https://github.com/open-telemetry/opentelemetry-python/pull/3181)) - Fix exporting of ExponentialBucketHistogramAggregation from opentelemetry.sdk.metrics.view From 82a107568cd00d06a8efdee7e2bbfbee5bccef75 Mon Sep 17 00:00:00 2001 From: "jack.burridge" Date: Wed, 1 Mar 2023 13:20:44 +0000 Subject: [PATCH 5/5] Fix typing in metrictestutil.py --- .../src/opentelemetry/test/metrictestutil.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/opentelemetry-test-utils/src/opentelemetry/test/metrictestutil.py b/tests/opentelemetry-test-utils/src/opentelemetry/test/metrictestutil.py index 59b3a45d20a..895904af03f 100644 --- a/tests/opentelemetry-test-utils/src/opentelemetry/test/metrictestutil.py +++ b/tests/opentelemetry-test-utils/src/opentelemetry/test/metrictestutil.py @@ -40,7 +40,7 @@ def _generate_metric( def _generate_sum( name, value, attributes=None, description=None, unit=None -) -> Sum: +) -> Metric: if attributes is None: attributes = BoundedAttributes(attributes={"a": 1, "b": True}) return _generate_metric( @@ -64,7 +64,7 @@ def _generate_sum( def _generate_gauge( name, value, attributes=None, description=None, unit=None -) -> Gauge: +) -> Metric: if attributes is None: attributes = BoundedAttributes(attributes={"a": 1, "b": True}) return _generate_metric( @@ -86,7 +86,7 @@ def _generate_gauge( def _generate_unsupported_metric( name, attributes=None, description=None, unit=None -) -> Sum: +) -> Metric: return _generate_metric( name, None,