Skip to content

Commit 30f0bf5

Browse files
Add OTLP protocol class & protos (#821)
* Add protos under packages for otlp * Add common otlp proto payload methods * Add new oltp protocol class * Remove ML event from log message * Remove params, add api-key header & expose path The params are not relevant to OTLP so remove these. The api-key header is how we provide the license key to OTLP so add this. The path to upload dimensional metrics and events are different in OTLP so expose the path so it can be overriden inside the coresponding data_collector methods. * Add otlp_port and otlp_host settings * Default to JSON if protobuf not available & warn * Move otlp_utils to core * Call encode in protocol class * Patch issues with data collector * Move resource to utils & add log proto imports --------- Co-authored-by: Tim Pansino <[email protected]>
1 parent e970884 commit 30f0bf5

File tree

17 files changed

+926
-9
lines changed

17 files changed

+926
-9
lines changed

MANIFEST.in

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,4 @@ include newrelic/common/cacert.pem
88
include newrelic/packages/wrapt/LICENSE
99
include newrelic/packages/wrapt/README
1010
include newrelic/packages/urllib3/LICENSE.txt
11+
include newrelic/packages/opentelemetry_proto/LICENSE.txt

newrelic/common/agent_http.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ def __init__(
9292
compression_method="gzip",
9393
max_payload_size_in_bytes=1000000,
9494
audit_log_fp=None,
95+
default_content_encoding_header="Identity",
9596
):
9697
self._audit_log_fp = audit_log_fp
9798

@@ -240,6 +241,7 @@ def __init__(
240241
compression_method="gzip",
241242
max_payload_size_in_bytes=1000000,
242243
audit_log_fp=None,
244+
default_content_encoding_header="Identity",
243245
):
244246
self._host = host
245247
port = self._port = port
@@ -248,6 +250,7 @@ def __init__(
248250
self._compression_method = compression_method
249251
self._max_payload_size_in_bytes = max_payload_size_in_bytes
250252
self._audit_log_fp = audit_log_fp
253+
self._default_content_encoding_header = default_content_encoding_header
251254

252255
self._prefix = ""
253256

@@ -419,11 +422,9 @@ def send_request(
419422
method=self._compression_method,
420423
level=self._compression_level,
421424
)
422-
content_encoding = self._compression_method
423-
else:
424-
content_encoding = "Identity"
425-
426-
merged_headers["Content-Encoding"] = content_encoding
425+
merged_headers["Content-Encoding"] = self._compression_method
426+
elif self._default_content_encoding_header:
427+
merged_headers["Content-Encoding"] = self._default_content_encoding_header
427428

428429
request_id = self.log_request(
429430
self._audit_log_fp,
@@ -489,6 +490,7 @@ def __init__(
489490
compression_method="gzip",
490491
max_payload_size_in_bytes=1000000,
491492
audit_log_fp=None,
493+
default_content_encoding_header="Identity",
492494
):
493495
proxy = self._parse_proxy(proxy_scheme, proxy_host, None, None, None)
494496
if proxy and proxy.scheme == "https":
@@ -515,6 +517,7 @@ def __init__(
515517
compression_method,
516518
max_payload_size_in_bytes,
517519
audit_log_fp,
520+
default_content_encoding_header,
518521
)
519522

520523

newrelic/config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,8 @@ def _process_configuration(section):
320320
_process_setting(section, "api_key", "get", None)
321321
_process_setting(section, "host", "get", None)
322322
_process_setting(section, "port", "getint", None)
323+
_process_setting(section, "otlp_host", "get", None)
324+
_process_setting(section, "otlp_port", "getint", None)
323325
_process_setting(section, "ssl", "getboolean", None)
324326
_process_setting(section, "proxy_scheme", "get", None)
325327
_process_setting(section, "proxy_host", "get", None)

newrelic/core/agent_protocol.py

Lines changed: 90 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
global_settings_dump,
3939
)
4040
from newrelic.core.internal_metrics import internal_count_metric
41+
from newrelic.core.otlp_utils import OTLP_CONTENT_TYPE, otlp_encode
4142
from newrelic.network.exceptions import (
4243
DiscardDataForRequest,
4344
ForceAgentDisconnect,
@@ -217,11 +218,16 @@ def __exit__(self, exc, value, tb):
217218
def close_connection(self):
218219
self.client.close_connection()
219220

220-
def send(self, method, payload=()):
221+
def send(
222+
self,
223+
method,
224+
payload=(),
225+
path="/agent_listener/invoke_raw_method",
226+
):
221227
params, headers, payload = self._to_http(method, payload)
222228

223229
try:
224-
response = self.client.send_request(params=params, headers=headers, payload=payload)
230+
response = self.client.send_request(path=path, params=params, headers=headers, payload=payload)
225231
except NetworkInterfaceException:
226232
# All HTTP errors are currently retried
227233
raise RetryDataForRequest
@@ -253,7 +259,10 @@ def send(self, method, payload=()):
253259
exception = self.STATUS_CODE_RESPONSE.get(status, DiscardDataForRequest)
254260
raise exception
255261
if status == 200:
256-
return json_decode(data.decode("utf-8"))["return_value"]
262+
return self.decode_response(data)
263+
264+
def decode_response(self, response):
265+
return json_decode(response.decode("utf-8"))["return_value"]
257266

258267
def _to_http(self, method, payload=()):
259268
params = dict(self._params)
@@ -516,3 +525,81 @@ def connect(
516525
# can be modified later
517526
settings.aws_lambda_metadata = aws_lambda_metadata
518527
return cls(settings, client_cls=client_cls)
528+
529+
530+
class OtlpProtocol(AgentProtocol):
531+
def __init__(self, settings, host=None, client_cls=ApplicationModeClient):
532+
if settings.audit_log_file:
533+
audit_log_fp = open(settings.audit_log_file, "a")
534+
else:
535+
audit_log_fp = None
536+
537+
self.client = client_cls(
538+
host=host or settings.otlp_host,
539+
port=settings.otlp_port or 4318,
540+
proxy_scheme=settings.proxy_scheme,
541+
proxy_host=settings.proxy_host,
542+
proxy_port=settings.proxy_port,
543+
proxy_user=settings.proxy_user,
544+
proxy_pass=settings.proxy_pass,
545+
timeout=settings.agent_limits.data_collector_timeout,
546+
ca_bundle_path=settings.ca_bundle_path,
547+
disable_certificate_validation=settings.debug.disable_certificate_validation,
548+
compression_threshold=settings.agent_limits.data_compression_threshold,
549+
compression_level=settings.agent_limits.data_compression_level,
550+
compression_method=settings.compressed_content_encoding,
551+
max_payload_size_in_bytes=1000000,
552+
audit_log_fp=audit_log_fp,
553+
default_content_encoding_header=None,
554+
)
555+
556+
self._params = {}
557+
self._headers = {
558+
"api-key": settings.license_key,
559+
}
560+
561+
# In Python 2, the JSON is loaded with unicode keys and values;
562+
# however, the header name must be a non-unicode value when given to
563+
# the HTTP library. This code converts the header name from unicode to
564+
# non-unicode.
565+
if settings.request_headers_map:
566+
for k, v in settings.request_headers_map.items():
567+
if not isinstance(k, str):
568+
k = k.encode("utf-8")
569+
self._headers[k] = v
570+
571+
# Content-Type should be protobuf, but falls back to JSON if protobuf is not installed.
572+
self._headers["Content-Type"] = OTLP_CONTENT_TYPE
573+
self._run_token = settings.agent_run_id
574+
575+
# Logging
576+
self._proxy_host = settings.proxy_host
577+
self._proxy_port = settings.proxy_port
578+
self._proxy_user = settings.proxy_user
579+
580+
# Do not access configuration anywhere inside the class
581+
self.configuration = settings
582+
583+
@classmethod
584+
def connect(
585+
cls,
586+
app_name,
587+
linked_applications,
588+
environment,
589+
settings,
590+
client_cls=ApplicationModeClient,
591+
):
592+
with cls(settings, client_cls=client_cls) as protocol:
593+
pass
594+
595+
return protocol
596+
597+
def _to_http(self, method, payload=()):
598+
params = dict(self._params)
599+
params["method"] = method
600+
if self._run_token:
601+
params["run_id"] = self._run_token
602+
return params, self._headers, otlp_encode(payload)
603+
604+
def decode_response(self, response):
605+
return response.decode("utf-8")

newrelic/core/config.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ def create_settings(nested):
104104

105105
class TopLevelSettings(Settings):
106106
_host = None
107+
_otlp_host = None
107108

108109
@property
109110
def host(self):
@@ -115,6 +116,16 @@ def host(self):
115116
def host(self, value):
116117
self._host = value
117118

119+
@property
120+
def otlp_host(self):
121+
if self._otlp_host:
122+
return self._otlp_host
123+
return default_otlp_host(self.host)
124+
125+
@otlp_host.setter
126+
def otlp_host(self, value):
127+
self._otlp_host = value
128+
118129

119130
class AttributesSettings(Settings):
120131
pass
@@ -560,6 +571,24 @@ def default_host(license_key):
560571
return host
561572

562573

574+
def default_otlp_host(host):
575+
HOST_MAP = {
576+
"collector.newrelic.com": "otlp.nr-data.net",
577+
"collector.eu.newrelic.com": "otlp.eu01.nr-data.net",
578+
"gov-collector.newrelic.com": "gov-otlp.nr-data.net",
579+
"staging-collector.newrelic.com": "staging-otlp.nr-data.net",
580+
"staging-collector.eu.newrelic.com": "staging-otlp.eu01.nr-data.net",
581+
"staging-gov-collector.newrelic.com": "staging-gov-otlp.nr-data.net",
582+
"fake-collector.newrelic.com": "fake-otlp.nr-data.net",
583+
}
584+
otlp_host = HOST_MAP.get(host, None)
585+
if not otlp_host:
586+
default = HOST_MAP["collector.newrelic.com"]
587+
_logger.warn("Unable to find corresponding OTLP host using default %s" % default)
588+
otlp_host = default
589+
return otlp_host
590+
591+
563592
_LOG_LEVEL = {
564593
"CRITICAL": logging.CRITICAL,
565594
"ERROR": logging.ERROR,
@@ -585,7 +614,9 @@ def default_host(license_key):
585614
_settings.ssl = _environ_as_bool("NEW_RELIC_SSL", True)
586615

587616
_settings.host = os.environ.get("NEW_RELIC_HOST")
617+
_settings.otlp_host = os.environ.get("NEW_RELIC_OTLP_HOST")
588618
_settings.port = int(os.environ.get("NEW_RELIC_PORT", "0"))
619+
_settings.otlp_port = int(os.environ.get("NEW_RELIC_OTLP_PORT", "0"))
589620

590621
_settings.agent_run_id = None
591622
_settings.entity_guid = None

newrelic/core/data_collector.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,11 @@
2525
DeveloperModeClient,
2626
ServerlessModeClient,
2727
)
28-
from newrelic.core.agent_protocol import AgentProtocol, ServerlessModeProtocol
28+
from newrelic.core.agent_protocol import (
29+
AgentProtocol,
30+
OtlpProtocol,
31+
ServerlessModeProtocol,
32+
)
2933
from newrelic.core.agent_streaming import StreamingRpc
3034
from newrelic.core.config import global_settings
3135

@@ -36,12 +40,16 @@
3640

3741
class Session(object):
3842
PROTOCOL = AgentProtocol
43+
OTLP_PROTOCOL = OtlpProtocol
3944
CLIENT = ApplicationModeClient
4045

4146
def __init__(self, app_name, linked_applications, environment, settings):
4247
self._protocol = self.PROTOCOL.connect(
4348
app_name, linked_applications, environment, settings, client_cls=self.CLIENT
4449
)
50+
self._otlp_protocol = self.OTLP_PROTOCOL.connect(
51+
app_name, linked_applications, environment, settings, client_cls=self.CLIENT
52+
)
4553
self._rpc = None
4654

4755
@property

newrelic/core/otlp_utils.py

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
# Copyright 2010 New Relic, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""This module provides common utilities for interacting with OTLP protocol buffers."""
16+
17+
import logging
18+
19+
_logger = logging.getLogger(__name__)
20+
21+
try:
22+
from newrelic.packages.opentelemetry_proto.common_pb2 import AnyValue, KeyValue
23+
from newrelic.packages.opentelemetry_proto.logs_pb2 import (
24+
LogRecord,
25+
ResourceLogs,
26+
ScopeLogs,
27+
)
28+
from newrelic.packages.opentelemetry_proto.metrics_pb2 import (
29+
AggregationTemporality,
30+
Metric,
31+
MetricsData,
32+
NumberDataPoint,
33+
ResourceMetrics,
34+
ScopeMetrics,
35+
Sum,
36+
Summary,
37+
SummaryDataPoint,
38+
)
39+
from newrelic.packages.opentelemetry_proto.resource_pb2 import Resource
40+
41+
AGGREGATION_TEMPORALITY_DELTA = AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA
42+
ValueAtQuantile = SummaryDataPoint.ValueAtQuantile
43+
44+
otlp_encode = lambda payload: payload.SerializeToString()
45+
OTLP_CONTENT_TYPE = "application/x-protobuf"
46+
47+
except ImportError:
48+
from newrelic.common.encoding_utils import json_encode
49+
50+
def otlp_encode(*args, **kwargs):
51+
_logger.warn(
52+
"Using OTLP integration while protobuf is not installed. This may result in larger payload sizes and data loss."
53+
)
54+
return json_encode(*args, **kwargs)
55+
56+
Resource = dict
57+
ValueAtQuantile = dict
58+
AnyValue = dict
59+
KeyValue = dict
60+
NumberDataPoint = dict
61+
SummaryDataPoint = dict
62+
Sum = dict
63+
Summary = dict
64+
Metric = dict
65+
MetricsData = dict
66+
ScopeMetrics = dict
67+
ResourceMetrics = dict
68+
AGGREGATION_TEMPORALITY_DELTA = 1
69+
ResourceLogs = dict
70+
ScopeLogs = dict
71+
LogRecord = dict
72+
OTLP_CONTENT_TYPE = "application/json"
73+
74+
75+
def create_key_value(key, value):
76+
if isinstance(value, bool):
77+
return KeyValue(key=key, value=AnyValue(bool_value=value))
78+
elif isinstance(value, int):
79+
return KeyValue(key=key, value=AnyValue(int_value=value))
80+
elif isinstance(value, float):
81+
return KeyValue(key=key, value=AnyValue(double_value=value))
82+
elif isinstance(value, str):
83+
return KeyValue(key=key, value=AnyValue(string_value=value))
84+
# Technically AnyValue accepts array, kvlist, and bytes however, since
85+
# those are not valid custom attribute types according to our api spec,
86+
# we will not bother to support them here either.
87+
else:
88+
_logger.warn("Unsupported attribute value type %s: %s." % (key, value))
89+
90+
91+
def create_key_values_from_iterable(iterable):
92+
if isinstance(iterable, dict):
93+
iterable = iterable.items()
94+
95+
# The create_key_value list may return None if the value is an unsupported type
96+
# so filter None values out before returning.
97+
return list(
98+
filter(
99+
lambda i: i is not None,
100+
(create_key_value(key, value) for key, value in iterable),
101+
)
102+
)
103+
104+
105+
def create_resource(attributes=None):
106+
attributes = attributes or {"instrumentation.provider": "nr_performance_monitoring"}
107+
return Resource(attributes=create_key_values_from_iterable(attributes))

0 commit comments

Comments
 (0)