Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prototype change adding a Authorization Header Setter to OTLP exporters #4432

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ dependencies = [
"googleapis-common-protos ~= 1.52",
"grpcio >= 1.63.2, < 2.0.0",
"opentelemetry-api ~= 1.15",
"opentelemetry-proto == 1.31.0.dev",
"opentelemetry-sdk ~= 1.31.0.dev",
"opentelemetry-exporter-otlp-proto-common == 1.31.0.dev",
"opentelemetry-proto ~= 1.29.0",
"opentelemetry-sdk ~= 1.30.0.dev0",
"opentelemetry-exporter-otlp-proto-common ~= 1.29.0",
]

[project.entry-points.opentelemetry_logs_exporter]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from grpc import ChannelCredentials, Compression
from opentelemetry.exporter.otlp.proto.common._log_encoder import encode_logs
from opentelemetry.exporter.otlp.proto.grpc.exporter import (
BaseAuthHeaderSetter,
OTLPExporterMixin,
_get_credentials,
environ_to_compression,
Expand Down Expand Up @@ -60,6 +61,7 @@ def __init__(
] = None,
timeout: Optional[int] = None,
compression: Optional[Compression] = None,
auth_header_setter: BaseAuthHeaderSetter = None,
):
if insecure is None:
insecure = environ.get(OTEL_EXPORTER_OTLP_LOGS_INSECURE)
Expand Down Expand Up @@ -99,6 +101,7 @@ def __init__(
"headers": headers,
"timeout": timeout or environ_timeout,
"compression": compression,
"auth_header_setter": auth_header_setter,
}
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@
}


class BaseAuthHeaderSetter(ABC):

@abstractmethod
def get_auth_header(self) -> Tuple[str, str]:
pass

class InvalidCompressionValueException(Exception):
def __init__(self, environ_key: str, environ_value: str):
super().__init__(
Expand Down Expand Up @@ -196,9 +202,10 @@ def __init__(
] = None,
timeout: Optional[int] = None,
compression: Optional[Compression] = None,
auth_header_setter: BaseAuthHeaderSetter = None,
):
super().__init__()

self._auth_header_setter = auth_header_setter
self._endpoint = endpoint or environ.get(
OTEL_EXPORTER_OTLP_ENDPOINT, "http://localhost:4317"
)
Expand Down Expand Up @@ -293,12 +300,14 @@ def _export(
for delay in _create_exp_backoff_generator(max_value=max_value):
if delay == max_value or self._shutdown:
return self._result.FAILURE

headers = self._headers
if self._auth_header_setter:
headers += self._auth_header_setter.get_auth_header()
with self._export_lock:
try:
self._client.Export(
request=self._translate_data(data),
metadata=self._headers,
metadata=headers,
timeout=self._timeout,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
encode_metrics,
)
from opentelemetry.exporter.otlp.proto.grpc.exporter import ( # noqa: F401
BaseAuthHeaderSetter,
OTLPExporterMixin,
_get_credentials,
environ_to_compression,
Expand Down Expand Up @@ -103,6 +104,7 @@ def __init__(
preferred_temporality: Dict[type, AggregationTemporality] = None,
preferred_aggregation: Dict[type, Aggregation] = None,
max_export_batch_size: Optional[int] = None,
auth_header_setter: BaseAuthHeaderSetter = None,
):
if insecure is None:
insecure = environ.get(OTEL_EXPORTER_OTLP_METRICS_INSECURE)
Expand Down Expand Up @@ -144,6 +146,7 @@ def __init__(
headers=headers or environ.get(OTEL_EXPORTER_OTLP_METRICS_HEADERS),
timeout=timeout or environ_timeout,
compression=compression,
auth_header_setter=auth_header_setter,
)

self._max_export_batch_size: Optional[int] = max_export_batch_size
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
encode_spans,
)
from opentelemetry.exporter.otlp.proto.grpc.exporter import ( # noqa: F401
BaseAuthHeaderSetter,
OTLPExporterMixin,
_get_credentials,
environ_to_compression,
Expand Down Expand Up @@ -93,6 +94,7 @@ def __init__(
] = None,
timeout: Optional[int] = None,
compression: Optional[Compression] = None,
auth_header_setter: BaseAuthHeaderSetter = None,
):
if insecure is None:
insecure = environ.get(OTEL_EXPORTER_OTLP_TRACES_INSECURE)
Expand Down Expand Up @@ -131,6 +133,7 @@ def __init__(
or environ.get(OTEL_EXPORTER_OTLP_TRACES_HEADERS),
"timeout": timeout or environ_timeout,
"compression": compression,
"auth_header_setter": auth_header_setter,
}
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

__version__ = "1.31.0.dev"
__version__ = "1.30.0"
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ dependencies = [
"Deprecated >= 1.2.6",
"googleapis-common-protos ~= 1.52",
"opentelemetry-api ~= 1.15",
"opentelemetry-proto == 1.31.0.dev",
"opentelemetry-sdk ~= 1.31.0.dev",
"opentelemetry-exporter-otlp-proto-common == 1.31.0.dev",
"opentelemetry-proto == 1.29.0",
"opentelemetry-sdk ~= 1.30.0.dev",
"opentelemetry-exporter-otlp-proto-common == 1.29.0",
"requests ~= 2.7",
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,18 @@
from time import sleep
from typing import Dict, Optional, Sequence

import google.auth
import grpc
import requests
from google.auth.transport.grpc import AuthMetadataPlugin

from opentelemetry.exporter.otlp.proto.common._internal import (
_create_exp_backoff_generator,
)
from opentelemetry.exporter.otlp.proto.common._log_encoder import encode_logs
from opentelemetry.exporter.otlp.proto.grpc.exporter import (
BaseAuthHeaderSetter,
)
from opentelemetry.exporter.otlp.proto.http import (
_OTLP_HTTP_HEADERS,
Compression,
Expand Down Expand Up @@ -75,7 +81,9 @@ def __init__(
timeout: Optional[int] = None,
compression: Optional[Compression] = None,
session: Optional[requests.Session] = None,
auth_header_setter: BaseAuthHeaderSetter = None,
):
self._auth_header_setter = auth_header_setter
self._endpoint = endpoint or environ.get(
OTEL_EXPORTER_OTLP_LOGS_ENDPOINT,
_append_logs_path(
Expand Down Expand Up @@ -124,6 +132,8 @@ def __init__(
self._shutdown = False

def _export(self, serialized_data: bytes):
if self._auth_header_setter:
self._session.headers["Authorization"] = self._auth_header_setter.get_auth_header()[1]
data = serialized_data
if self._compression == Compression.Gzip:
gzip_data = BytesIO()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@
from opentelemetry.exporter.otlp.proto.common.metrics_encoder import (
encode_metrics,
)
from opentelemetry.exporter.otlp.proto.grpc.exporter import (
BaseAuthHeaderSetter,
)
from opentelemetry.exporter.otlp.proto.http import Compression
from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import ( # noqa: F401
ExportMetricsServiceRequest,
Expand Down Expand Up @@ -111,7 +114,9 @@ def __init__(
session: Optional[requests.Session] = None,
preferred_temporality: Dict[type, AggregationTemporality] = None,
preferred_aggregation: Dict[type, Aggregation] = None,
auth_header_setter: BaseAuthHeaderSetter = None,
):
self._auth_header_setter = auth_header_setter
self._endpoint = endpoint or environ.get(
OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
_append_metrics_path(
Expand Down Expand Up @@ -164,6 +169,8 @@ def __init__(
)

def _export(self, serialized_data: bytes):
if self._auth_header_setter:
self._session.headers["Authorization"] = self._auth_header_setter.get_auth_header()[1]
data = serialized_data
if self._compression == Compression.Gzip:
gzip_data = BytesIO()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
from opentelemetry.exporter.otlp.proto.common.trace_encoder import (
encode_spans,
)
from opentelemetry.exporter.otlp.proto.grpc.exporter import (
BaseAuthHeaderSetter,
)
from opentelemetry.exporter.otlp.proto.http import (
_OTLP_HTTP_HEADERS,
Compression,
Expand Down Expand Up @@ -73,7 +76,9 @@ def __init__(
timeout: Optional[int] = None,
compression: Optional[Compression] = None,
session: Optional[requests.Session] = None,
auth_header_setter: BaseAuthHeaderSetter = None,
):
self._auth_header_setter = auth_header_setter
self._endpoint = endpoint or environ.get(
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
_append_trace_path(
Expand Down Expand Up @@ -121,6 +126,8 @@ def __init__(
self._shutdown = False

def _export(self, serialized_data: bytes):
if self._auth_header_setter:
self._session.headers["Authorization"] = self._auth_header_setter.get_auth_header()[1]
data = serialized_data
if self._compression == Compression.Gzip:
gzip_data = BytesIO()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

__version__ = "1.31.0.dev"
__version__ = "1.30.0.dev"
4 changes: 2 additions & 2 deletions opentelemetry-sdk/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ classifiers = [
"Typing :: Typed",
]
dependencies = [
"opentelemetry-api == 1.31.0.dev",
"opentelemetry-semantic-conventions == 0.52b0.dev",
"opentelemetry-api == 1.30.0",
"opentelemetry-semantic-conventions == 0.51b0",
"typing-extensions >= 3.7.4",
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import logging
import os
from abc import ABC, abstractmethod
from inspect import signature
from os import environ
from typing import Callable, Dict, List, Optional, Sequence, Tuple, Type, Union

Expand All @@ -33,12 +34,16 @@
OTEL_PYTHON_ID_GENERATOR,
OTEL_TRACES_EXPORTER,
)
from opentelemetry.exporter.otlp.proto.grpc.exporter import (
BaseAuthHeaderSetter,
)
from opentelemetry.metrics import set_meter_provider
from opentelemetry.sdk._events import EventLoggerProvider
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, LogExporter
from opentelemetry.sdk.environment_variables import (
_OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED,
OTEL_AUTH_HEADER_EXTENSION,
OTEL_EXPORTER_OTLP_LOGS_PROTOCOL,
OTEL_EXPORTER_OTLP_METRICS_PROTOCOL,
OTEL_EXPORTER_OTLP_PROTOCOL,
Expand Down Expand Up @@ -90,6 +95,14 @@
_logger = logging.getLogger(__name__)


def _import_config_component(
selected_component: str, entry_point_name: str
) -> object:
return _import_config_components([selected_component], entry_point_name)[
0
][1]


def _import_config_components(
selected_components: List[str], entry_point_name: str
) -> Sequence[Tuple[str, object]]:
Expand Down Expand Up @@ -194,6 +207,7 @@ def _init_tracing(
id_generator: IdGenerator = None,
sampler: Sampler = None,
resource: Resource = None,
auth_header_setter: BaseAuthHeaderSetter = None,
):
provider = TracerProvider(
id_generator=id_generator,
Expand All @@ -204,6 +218,8 @@ def _init_tracing(

for _, exporter_class in exporters.items():
exporter_args = {}
if "auth_header_setter" in signature(exporter_class).parameters:
exporter_args["auth_header_setter"] = auth_header_setter
provider.add_span_processor(
BatchSpanProcessor(exporter_class(**exporter_args))
)
Expand All @@ -214,12 +230,17 @@ def _init_metrics(
str, Union[Type[MetricExporter], Type[MetricReader]]
],
resource: Resource = None,
auth_header_setter: BaseAuthHeaderSetter = None,
):
metric_readers = []

for _, exporter_or_reader_class in exporters_or_readers.items():
exporter_args = {}

if (
"auth_header_setter"
in signature(exporter_or_reader_class).parameters
):
exporter_args["auth_header_setter"] = auth_header_setter
if issubclass(exporter_or_reader_class, MetricReader):
metric_readers.append(exporter_or_reader_class(**exporter_args))
else:
Expand All @@ -237,12 +258,15 @@ def _init_logging(
exporters: Dict[str, Type[LogExporter]],
resource: Resource = None,
setup_logging_handler: bool = True,
auth_header_setter: BaseAuthHeaderSetter = None,
):
provider = LoggerProvider(resource=resource)
set_logger_provider(provider)

for _, exporter_class in exporters.items():
exporter_args = {}
if "auth_header_setter" in signature(exporter_class).parameters:
exporter_args["auth_header_setter"] = auth_header_setter
provider.add_log_record_processor(
BatchLogRecordProcessor(exporter_class(**exporter_args))
)
Expand Down Expand Up @@ -380,6 +404,14 @@ def _initialize_components(
metric_exporter_names + _get_exporter_names("metrics"),
log_exporter_names + _get_exporter_names("logs"),
)
auth_header_setter = None
auth_ext = os.getenv(OTEL_AUTH_HEADER_EXTENSION)
if auth_ext:
auth_header_setter = _import_config_component(
auth_ext, "opentelemetry_auth_header_extension"
)()
if not isinstance(auth_header_setter, BaseAuthHeaderSetter):
raise RuntimeError(f"{auth_ext} is not an BaseAuthHeaderSetter")
if sampler is None:
sampler_name = _get_sampler()
sampler = _import_sampler(sampler_name)
Expand All @@ -402,8 +434,13 @@ def _initialize_components(
id_generator=id_generator,
sampler=sampler,
resource=resource,
auth_header_setter=auth_header_setter,
)
_init_metrics(
metric_exporters,
resource,
auth_header_setter=auth_header_setter,
)
_init_metrics(metric_exporters, resource)
if setup_logging_handler is None:
setup_logging_handler = (
os.getenv(
Expand All @@ -413,7 +450,12 @@ def _initialize_components(
.lower()
== "true"
)
_init_logging(log_exporters, resource, setup_logging_handler)
_init_logging(
log_exporters,
resource,
setup_logging_handler,
auth_header_setter=auth_header_setter,
)


class _BaseConfigurator(ABC):
Expand Down
Loading