diff --git a/exporter/opentelemetry-exporter-opencensus/CHANGELOG.md b/exporter/opentelemetry-exporter-opencensus/CHANGELOG.md
index c7f0cf69b5c..13924e489b3 100644
--- a/exporter/opentelemetry-exporter-opencensus/CHANGELOG.md
+++ b/exporter/opentelemetry-exporter-opencensus/CHANGELOG.md
@@ -4,6 +4,8 @@

 - Change package name to opentelemetry-exporter-opencensus
   ([#953](https://github.com/open-telemetry/opentelemetry-python/pull/953))
+- Send start_timestamp and convert labels to strings
+  ([#937](https://github.com/open-telemetry/opentelemetry-python/pull/937))

 ## 0.8b0

diff --git a/exporter/opentelemetry-exporter-opencensus/src/opentelemetry/exporter/opencensus/metrics_exporter/__init__.py b/exporter/opentelemetry-exporter-opencensus/src/opentelemetry/exporter/opencensus/metrics_exporter/__init__.py
index e83e779df67..76986a8a59d 100644
--- a/exporter/opentelemetry-exporter-opencensus/src/opentelemetry/exporter/opencensus/metrics_exporter/__init__.py
+++ b/exporter/opentelemetry-exporter-opencensus/src/opentelemetry/exporter/opencensus/metrics_exporter/__init__.py
@@ -15,14 +15,16 @@
 """OpenCensus Collector Metrics Exporter."""

 import logging
-from typing import Sequence
+from typing import Dict, Sequence

 import grpc
+from google.protobuf.timestamp_pb2 import Timestamp
 from opencensus.proto.agent.metrics.v1 import (
     metrics_service_pb2,
     metrics_service_pb2_grpc,
 )
 from opencensus.proto.metrics.v1 import metrics_pb2
+from opencensus.proto.resource.v1 import resource_pb2

 import opentelemetry.exporter.opencensus.util as utils
 from opentelemetry.sdk.metrics import Counter, Metric
@@ -34,6 +36,14 @@

 DEFAULT_ENDPOINT = "localhost:55678"

+# In priority order. See collector impl https://bit.ly/2DvJW6y
+_OT_LABEL_PRESENCE_TO_RESOURCE_TYPE = (
+    ("container.name", "container"),
+    ("k8s.pod.name", "k8s"),
+    ("host.name", "host"),
+    ("cloud.provider", "cloud"),
+)
+
 logger = logging.getLogger(__name__)


@@ -65,6 +75,8 @@ def __init__(
             self.client = client

         self.node = utils.get_node(service_name, host_name)
+        self.exporter_start_timestamp = Timestamp()
+        self.exporter_start_timestamp.GetCurrentTime()

     def export(
         self, metric_records: Sequence[MetricRecord]
@@ -89,7 +101,9 @@ def shutdown(self) -> None:
     def generate_metrics_requests(
         self, metrics: Sequence[MetricRecord]
     ) -> metrics_service_pb2.ExportMetricsServiceRequest:
-        collector_metrics = translate_to_collector(metrics)
+        collector_metrics = translate_to_collector(
+            metrics, self.exporter_start_timestamp
+        )
         service_request = metrics_service_pb2.ExportMetricsServiceRequest(
             node=self.node, metrics=collector_metrics
         )
@@ -99,6 +113,7 @@ def generate_metrics_requests(
 # pylint: disable=too-many-branches
 def translate_to_collector(
     metric_records: Sequence[MetricRecord],
+    exporter_start_timestamp: Timestamp,
 ) -> Sequence[metrics_pb2.Metric]:
     collector_metrics = []
     for metric_record in metric_records:
@@ -109,7 +124,8 @@
             label_keys.append(metrics_pb2.LabelKey(key=label_tuple[0]))
             label_values.append(
                 metrics_pb2.LabelValue(
-                    has_value=label_tuple[1] is not None, value=label_tuple[1]
+                    has_value=label_tuple[1] is not None,
+                    value=str(label_tuple[1]),
                 )
             )

@@ -121,13 +137,23 @@
             label_keys=label_keys,
         )

+        # If cumulative and stateful, explicitly set the start_timestamp to
+        # exporter start time.
+        if metric_record.instrument.meter.batcher.stateful:
+            start_timestamp = exporter_start_timestamp
+        else:
+            start_timestamp = None
+
         timeseries = metrics_pb2.TimeSeries(
             label_values=label_values,
             points=[get_collector_point(metric_record)],
+            start_timestamp=start_timestamp,
         )
         collector_metrics.append(
             metrics_pb2.Metric(
-                metric_descriptor=metric_descriptor, timeseries=[timeseries]
+                metric_descriptor=metric_descriptor,
+                timeseries=[timeseries],
+                resource=get_resource(metric_record),
             )
         )
     return collector_metrics
@@ -162,3 +188,22 @@ def get_collector_point(metric_record: MetricRecord) -> metrics_pb2.Point:
             )
         )
     return point
+
+
+def get_resource(metric_record: MetricRecord) -> resource_pb2.Resource:
+    resource_labels = metric_record.instrument.meter.resource.labels
+    return resource_pb2.Resource(
+        type=infer_oc_resource_type(resource_labels),
+        labels={k: str(v) for k, v in resource_labels.items()},
+    )
+
+
+def infer_oc_resource_type(resource_labels: Dict[str, str]) -> str:
+    """Convert from OT resource labels to OC resource type"""
+    for (
+        ot_resource_key,
+        oc_resource_type,
+    ) in _OT_LABEL_PRESENCE_TO_RESOURCE_TYPE:
+        if ot_resource_key in resource_labels:
+            return oc_resource_type
+    return ""
diff --git a/exporter/opentelemetry-exporter-opencensus/tests/test_otcollector_metrics_exporter.py b/exporter/opentelemetry-exporter-opencensus/tests/test_otcollector_metrics_exporter.py
index 1403c6c59e9..1ec1a574487 100644
--- a/exporter/opentelemetry-exporter-opencensus/tests/test_otcollector_metrics_exporter.py
+++ b/exporter/opentelemetry-exporter-opencensus/tests/test_otcollector_metrics_exporter.py
@@ -32,6 +32,7 @@
     MetricsExportResult,
     aggregate,
 )
+from opentelemetry.sdk.resources import Resource

 # pylint: disable=no-member

@@ -39,9 +40,16 @@ class TestCollectorMetricsExporter(unittest.TestCase):
     @classmethod
     def setUpClass(cls):
         # pylint: disable=protected-access
-        metrics.set_meter_provider(MeterProvider())
+        cls._resource_labels = {
+            "key_with_str_value": "some string",
+            "key_with_int_val": 321,
+            "key_with_true": True,
+        }
+        metrics.set_meter_provider(
+            MeterProvider(resource=Resource(cls._resource_labels))
+        )
         cls._meter = metrics.get_meter(__name__)
-        cls._labels = {"environment": "staging"}
+        cls._labels = {"environment": "staging", "number": 321}
         cls._key_labels = get_dict_as_key(cls._labels)

     def test_constructor(self):
@@ -119,7 +127,7 @@ def test_export(self):
             client=mock_client, host_name=host_name
         )
         test_metric = self._meter.create_metric(
-            "testname", "testdesc", "unit", int, Counter,
+            "testname", "testdesc", "unit", int, Counter, self._labels.keys(),
         )
         record = MetricRecord(
             test_metric, self._key_labels, aggregate.SumAggregator(),
@@ -142,13 +150,16 @@ def test_translate_to_collector(self):
         test_metric = self._meter.create_metric(
-            "testname", "testdesc", "unit", int, Counter,
+            "testname", "testdesc", "unit", int, Counter, self._labels.keys()
         )
         aggregator = aggregate.SumAggregator()
         aggregator.update(123)
         aggregator.take_checkpoint()
         record = MetricRecord(test_metric, self._key_labels, aggregator,)
-        output_metrics = metrics_exporter.translate_to_collector([record])
+        start_timestamp = Timestamp()
+        output_metrics = metrics_exporter.translate_to_collector(
+            [record], start_timestamp,
+        )
         self.assertEqual(len(output_metrics), 1)
         self.assertIsInstance(output_metrics[0], metrics_pb2.Metric)
         self.assertEqual(output_metrics[0].metric_descriptor.name, "testname")
@@ -161,14 +172,44 @@ def test_translate_to_collector(self):
             metrics_pb2.MetricDescriptor.CUMULATIVE_INT64,
         )
         self.assertEqual(
-            len(output_metrics[0].metric_descriptor.label_keys), 1
+            len(output_metrics[0].metric_descriptor.label_keys), 2
         )
         self.assertEqual(
             output_metrics[0].metric_descriptor.label_keys[0].key,
             "environment",
         )
+        self.assertEqual(
+            output_metrics[0].metric_descriptor.label_keys[1].key, "number",
+        )
+
+        self.assertIsNotNone(output_metrics[0].resource)
+        self.assertEqual(
+            output_metrics[0].resource.type, "",
+        )
+        self.assertEqual(
+            output_metrics[0].resource.labels["key_with_str_value"],
+            self._resource_labels["key_with_str_value"],
+        )
+        self.assertIsInstance(
+            output_metrics[0].resource.labels["key_with_int_val"], str,
+        )
+        self.assertEqual(
+            output_metrics[0].resource.labels["key_with_int_val"],
+            str(self._resource_labels["key_with_int_val"]),
+        )
+        self.assertIsInstance(
+            output_metrics[0].resource.labels["key_with_true"], str,
+        )
+        self.assertEqual(
+            output_metrics[0].resource.labels["key_with_true"],
+            str(self._resource_labels["key_with_true"]),
+        )
+
         self.assertEqual(len(output_metrics[0].timeseries), 1)
-        self.assertEqual(len(output_metrics[0].timeseries[0].label_values), 1)
+        self.assertEqual(len(output_metrics[0].timeseries[0].label_values), 2)
+        self.assertEqual(
+            output_metrics[0].timeseries[0].start_timestamp, start_timestamp
+        )
         self.assertEqual(
             output_metrics[0].timeseries[0].label_values[0].has_value, True
         )
@@ -187,3 +228,59 @@ def test_translate_to_collector(self):
         self.assertEqual(
             output_metrics[0].timeseries[0].points[0].int64_value, 123
         )
+
+    def test_infer_ot_resource_type(self):
+        # empty resource
+        self.assertEqual(metrics_exporter.infer_oc_resource_type({}), "")
+
+        # container
+        self.assertEqual(
+            metrics_exporter.infer_oc_resource_type(
+                {
+                    "k8s.cluster.name": "cluster1",
+                    "k8s.pod.name": "pod1",
+                    "k8s.namespace.name": "namespace1",
+                    "container.name": "container-name1",
+                    "cloud.account.id": "proj1",
+                    "cloud.zone": "zone1",
+                }
+            ),
+            "container",
+        )
+
+        # k8s pod
+        self.assertEqual(
+            metrics_exporter.infer_oc_resource_type(
+                {
+                    "k8s.cluster.name": "cluster1",
+                    "k8s.pod.name": "pod1",
+                    "k8s.namespace.name": "namespace1",
+                    "cloud.zone": "zone1",
+                }
+            ),
+            "k8s",
+        )
+
+        # host
+        self.assertEqual(
+            metrics_exporter.infer_oc_resource_type(
+                {
+                    "k8s.cluster.name": "cluster1",
+                    "cloud.zone": "zone1",
+                    "host.name": "node1",
+                }
+            ),
+            "host",
+        )
+
+        # cloud
+        self.assertEqual(
+            metrics_exporter.infer_oc_resource_type(
+                {
+                    "cloud.provider": "gcp",
+                    "host.id": "inst1",
+                    "cloud.zone": "zone1",
+                }
+            ),
+            "cloud",
+        )
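For reviewers who want to try the new helpers in isolation, here is a minimal sketch exercising `infer_oc_resource_type` and the same `str()` conversion that `get_resource` applies to resource labels. It assumes this branch is installed locally and imported as `opentelemetry.exporter.opencensus.metrics_exporter`; the label set (including the made-up `"replica": 3` entry) is an arbitrary example, not taken from the tests above.

```python
from opentelemetry.exporter.opencensus import metrics_exporter

# Example resource labels, as they might come out of the OpenTelemetry SDK.
labels = {"host.name": "node-1", "cloud.provider": "gcp", "replica": 3}

# "host.name" outranks "cloud.provider" in _OT_LABEL_PRESENCE_TO_RESOURCE_TYPE,
# so the inferred OpenCensus resource type is "host".
print(metrics_exporter.infer_oc_resource_type(labels))  # -> host

# get_resource() stringifies every label value before sending, the same way
# time-series label values are stringified, e.g. the integer 3 becomes "3".
print({k: str(v) for k, v in labels.items()})
```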