OC Exporter - send start_timestamp, resource labels, and convert labels to strings #937

Merged (9 commits) on Aug 13, 2020. Changes shown from 2 commits.
ext/opentelemetry-ext-opencensusexporter/CHANGELOG.md (3 additions, 0 deletions)
@@ -2,6 +2,9 @@

## Unreleased

- Send start_timestamp, resource labels, and convert metric labels to strings
  ([#937](https://github.com/open-telemetry/opentelemetry-python/pull/937))

## 0.8b0

Released 2020-05-27
@@ -18,11 +18,13 @@
from typing import Sequence

import grpc
from google.protobuf.timestamp_pb2 import Timestamp
from opencensus.proto.agent.metrics.v1 import (
metrics_service_pb2,
metrics_service_pb2_grpc,
)
from opencensus.proto.metrics.v1 import metrics_pb2
from opencensus.proto.resource.v1 import resource_pb2

import opentelemetry.ext.opencensusexporter.util as utils
from opentelemetry.sdk.metrics import Counter, Metric
@@ -65,6 +67,8 @@ def __init__(
self.client = client

self.node = utils.get_node(service_name, host_name)
self.exporter_start_timestamp = Timestamp()
self.exporter_start_timestamp.GetCurrentTime()

def export(
self, metric_records: Sequence[MetricRecord]
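A quick note on the two new `__init__` lines: `Timestamp` is protobuf's well-known timestamp type, and `GetCurrentTime()` fills it from the system clock exactly once, when the exporter is constructed. A minimal sketch of that behaviour (the `start` name is illustrative only):

```python
from google.protobuf.timestamp_pb2 import Timestamp

# Capture "now" once; the helper populates the seconds/nanos fields.
start = Timestamp()
start.GetCurrentTime()

# The message is plain data afterwards and can be reused on every export.
print(start.seconds, start.nanos)
print(start.ToNanoseconds())  # the same instant as a single integer
```

Capturing the value in `__init__` rather than per export is what lets cumulative time series keep a stable start time across export intervals.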
@@ -89,7 +93,9 @@ def shutdown(self) -> None:
def generate_metrics_requests(
self, metrics: Sequence[MetricRecord]
) -> metrics_service_pb2.ExportMetricsServiceRequest:
collector_metrics = translate_to_collector(metrics)
collector_metrics = translate_to_collector(
metrics, self.exporter_start_timestamp
)
service_request = metrics_service_pb2.ExportMetricsServiceRequest(
node=self.node, metrics=collector_metrics
)
@@ -99,6 +105,7 @@ def generate_metrics_requests(
# pylint: disable=too-many-branches
def translate_to_collector(
metric_records: Sequence[MetricRecord],
exporter_start_timestamp: Timestamp,
) -> Sequence[metrics_pb2.Metric]:
collector_metrics = []
for metric_record in metric_records:
@@ -109,7 +116,8 @@
label_keys.append(metrics_pb2.LabelKey(key=label_tuple[0]))
label_values.append(
metrics_pb2.LabelValue(
has_value=label_tuple[1] is not None, value=label_tuple[1]
has_value=label_tuple[1] is not None,
value=str(label_tuple[1]),
)
)
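The `str(...)` call above is what the PR title means by converting labels to strings: `LabelValue.value` is declared as a proto `string` field, and the Python protobuf runtime rejects non-string values outright. A small hedged illustration, using only the generated `metrics_pb2` module the exporter already imports:

```python
from opencensus.proto.metrics.v1 import metrics_pb2

raw_value = 321  # OpenTelemetry labels may carry ints, bools, floats, ...

# Assigning a non-string to a proto string field raises TypeError,
# which is why the exporter stringifies values before building the message.
try:
    metrics_pb2.LabelValue(has_value=True, value=raw_value)
except TypeError:
    pass

ok = metrics_pb2.LabelValue(has_value=True, value=str(raw_value))
print(ok.value)  # "321"
```

The same reasoning applies to the resource labels added further down, since the OC resource proto uses a `map<string, string>`.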

@@ -121,13 +129,23 @@
label_keys=label_keys,
)

# A stateful batcher reports cumulative values, so anchor the time
# series to the exporter start time; otherwise leave it unset.
if metric_record.instrument.meter.batcher.stateful:
start_timestamp = exporter_start_timestamp
else:
start_timestamp = None

timeseries = metrics_pb2.TimeSeries(
label_values=label_values,
points=[get_collector_point(metric_record)],
start_timestamp=start_timestamp,
)
collector_metrics.append(
metrics_pb2.Metric(
metric_descriptor=metric_descriptor, timeseries=[timeseries]
metric_descriptor=metric_descriptor,
timeseries=[timeseries],
resource=get_resource(metric_record),
)
)
return collector_metrics
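One way to read the stateful branch: a stateful batcher keeps cumulative aggregations, so every export interval should report the same `start_timestamp` (the exporter's start) while only the point timestamps move forward; a stateless, delta-style batcher leaves it unset. A rough sketch of the resulting protobuf shapes, with made-up values:

```python
from google.protobuf.timestamp_pb2 import Timestamp
from opencensus.proto.metrics.v1 import metrics_pb2

exporter_start = Timestamp()
exporter_start.GetCurrentTime()

def point_at(value: int) -> metrics_pb2.Point:
    # Illustrative helper: an int64 point stamped with the current time.
    now = Timestamp()
    now.GetCurrentTime()
    return metrics_pb2.Point(timestamp=now, int64_value=value)

# Two consecutive cumulative exports share one start_timestamp, so a backend
# can compute a rate as (25 - 10) divided by the gap between the two points.
first = metrics_pb2.TimeSeries(start_timestamp=exporter_start, points=[point_at(10)])
second = metrics_pb2.TimeSeries(start_timestamp=exporter_start, points=[point_at(25)])

# A delta-style series simply omits start_timestamp.
delta = metrics_pb2.TimeSeries(points=[point_at(15)])
```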
@@ -162,3 +180,11 @@ def get_collector_point(metric_record: MetricRecord) -> metrics_pb2.Point:
)
)
return point


def get_resource(metric_record: MetricRecord) -> resource_pb2.Resource:
resource_labels = metric_record.instrument.meter.resource.labels
return resource_pb2.Resource(
type=resource_labels.get("service.name"),
labels={k: str(v) for k, v in resource_labels.items()},
)
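As a usage sketch of the new `get_resource` mapping: the resource labels travel on the meter, `service.name` doubles as the OC resource `type`, and every value is stringified for the same proto-string reason as above. This assumes the SDK `Resource` constructor accepts a plain label dict, as the tests below do:

```python
from opencensus.proto.resource.v1 import resource_pb2
from opentelemetry.sdk.resources import Resource

labels = {"service.name": "some_application", "retries": 3, "canary": True}
resource = Resource(labels)

# Mirrors get_resource(): type comes from "service.name", values become strings.
oc_resource = resource_pb2.Resource(
    type=resource.labels.get("service.name"),
    labels={k: str(v) for k, v in resource.labels.items()},
)
print(oc_resource.type)              # "some_application"
print(oc_resource.labels["canary"])  # "True"
```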
@@ -32,16 +32,24 @@
MetricsExportResult,
aggregate,
)
from opentelemetry.sdk.resources import Resource


# pylint: disable=no-member
class TestCollectorMetricsExporter(unittest.TestCase):
@classmethod
def setUpClass(cls):
# pylint: disable=protected-access
metrics.set_meter_provider(MeterProvider())
cls._resource_labels = {
"service.name": "some_application",
"key_with_int_val": 321,
"key_with_true": True,
}
metrics.set_meter_provider(
MeterProvider(resource=Resource(cls._resource_labels))
)
cls._meter = metrics.get_meter(__name__)
cls._labels = {"environment": "staging"}
cls._labels = {"environment": "staging", "number": 321}
cls._key_labels = get_labels_as_key(cls._labels)

def test_constructor(self):
@@ -119,7 +127,7 @@ def test_export(self):
client=mock_client, host_name=host_name
)
test_metric = self._meter.create_metric(
"testname", "testdesc", "unit", int, Counter, ["environment"]
"testname", "testdesc", "unit", int, Counter, self._labels.keys(),
)
record = MetricRecord(
test_metric, self._key_labels, aggregate.SumAggregator(),
@@ -142,13 +150,16 @@

def test_translate_to_collector(self):
test_metric = self._meter.create_metric(
"testname", "testdesc", "unit", int, Counter, ["environment"]
"testname", "testdesc", "unit", int, Counter, self._labels.keys()
)
aggregator = aggregate.SumAggregator()
aggregator.update(123)
aggregator.take_checkpoint()
record = MetricRecord(test_metric, self._key_labels, aggregator,)
output_metrics = metrics_exporter.translate_to_collector([record])
start_timestamp = Timestamp()
output_metrics = metrics_exporter.translate_to_collector(
[record], start_timestamp,
)
self.assertEqual(len(output_metrics), 1)
self.assertIsInstance(output_metrics[0], metrics_pb2.Metric)
self.assertEqual(output_metrics[0].metric_descriptor.name, "testname")
@@ -161,14 +172,45 @@
metrics_pb2.MetricDescriptor.CUMULATIVE_INT64,
)
self.assertEqual(
len(output_metrics[0].metric_descriptor.label_keys), 1
len(output_metrics[0].metric_descriptor.label_keys), 2
)
self.assertEqual(
output_metrics[0].metric_descriptor.label_keys[0].key,
"environment",
)
self.assertEqual(
output_metrics[0].metric_descriptor.label_keys[1].key, "number",
)

self.assertIsNotNone(output_metrics[0].resource)
self.assertEqual(
output_metrics[0].resource.type,
self._resource_labels["service.name"],
)
self.assertEqual(
output_metrics[0].resource.labels["service.name"],
self._resource_labels["service.name"],
)
self.assertIsInstance(
output_metrics[0].resource.labels["key_with_int_val"], str,
)
self.assertEqual(
output_metrics[0].resource.labels["key_with_int_val"],
str(self._resource_labels["key_with_int_val"]),
)
self.assertIsInstance(
output_metrics[0].resource.labels["key_with_true"], str,
)
self.assertEqual(
output_metrics[0].resource.labels["key_with_true"],
str(self._resource_labels["key_with_true"]),
)

self.assertEqual(len(output_metrics[0].timeseries), 1)
self.assertEqual(len(output_metrics[0].timeseries[0].label_values), 1)
self.assertEqual(len(output_metrics[0].timeseries[0].label_values), 2)
self.assertEqual(
output_metrics[0].timeseries[0].start_timestamp, start_timestamp
)
self.assertEqual(
output_metrics[0].timeseries[0].label_values[0].has_value, True
)
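For completeness, a rough end-to-end sketch of how these pieces are meant to fit together from user code. The exporter class name, its constructor parameters, and the `PushController` import path are assumptions here, not something this diff shows:

```python
from opentelemetry import metrics
from opentelemetry.ext.opencensusexporter.metrics_exporter import (
    OpenCensusMetricsExporter,  # assumed class name
)
from opentelemetry.sdk.metrics import Counter, MeterProvider
from opentelemetry.sdk.metrics.export.controller import PushController  # assumed path
from opentelemetry.sdk.resources import Resource

# Resource labels set here end up on the OC protobuf Resource, stringified.
metrics.set_meter_provider(
    MeterProvider(resource=Resource({"service.name": "some_application"}))
)
meter = metrics.get_meter(__name__)

exporter = OpenCensusMetricsExporter(
    service_name="some_application",  # assumed parameter names
    endpoint="localhost:55678",
)
controller = PushController(meter, exporter, interval=10)  # assumed signature

requests_counter = meter.create_metric(
    "requests", "number of requests", "1", int, Counter, ("environment",)
)
requests_counter.add(1, {"environment": "staging"})
```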