Skip to content

Metrics: add possibility to release handles and observer instruments #435

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 17, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions docs/examples/metrics/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,11 @@
# Therefore, getting a bound metric instrument using the same set of labels
# will yield the same bound metric instrument.
bound_counter = counter.bind(label_set)
bound_counter.add(100)
for i in range(1000):
bound_counter.add(i)

# You can release the bound instrument we you are done
bound_counter.release()

# Direct metric usage
# You can record metrics directly using the metric instrument. You pass in a
Expand All @@ -80,4 +84,5 @@
# (metric, value) pairs. The value would be recorded for each metric using the
# specified labelset for each.
meter.record_batch(label_set, [(counter, 50), (counter2, 70)])
time.sleep(100)

time.sleep(10)
14 changes: 14 additions & 0 deletions opentelemetry-api/src/opentelemetry/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ def record(self, value: ValueT) -> None:
value: The value to record to the bound metric instrument.
"""

def release(self) -> None:
"""No-op implementation of release."""


class BoundCounter:
def add(self, value: ValueT) -> None:
Expand Down Expand Up @@ -347,6 +350,14 @@ def register_observer(
Returns: A new ``Observer`` metric instrument.
"""

@abc.abstractmethod
def unregister_observer(self, observer: "Observer") -> None:
"""Unregisters an ``Observer`` metric instrument.

Args:
observer: The observer to unregister.
"""

@abc.abstractmethod
def get_label_set(self, labels: Dict[str, str]) -> "LabelSet":
"""Gets a `LabelSet` with the given labels.
Expand Down Expand Up @@ -393,6 +404,9 @@ def register_observer(
) -> "Observer":
return DefaultObserver()

def unregister_observer(self, observer: "Observer") -> None:
pass

def get_label_set(self, labels: Dict[str, str]) -> "LabelSet":
# pylint: disable=no-self-use
return DefaultLabelSet()
Expand Down
8 changes: 7 additions & 1 deletion opentelemetry-api/tests/metrics/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ def test_measure_record(self):
measure.record(1, label_set)

def test_default_bound_metric(self):
metrics.DefaultBoundInstrument()
bound_instrument = metrics.DefaultBoundInstrument()
bound_instrument.release()

def test_bound_counter(self):
bound_counter = metrics.BoundCounter()
Expand All @@ -59,3 +60,8 @@ def test_bound_counter(self):
def test_bound_measure(self):
bound_measure = metrics.BoundMeasure()
bound_measure.record(1)

def test_observer(self):
observer = metrics.DefaultObserver()
label_set = metrics.LabelSet()
observer.observe(1, label_set)
5 changes: 5 additions & 0 deletions opentelemetry-api/tests/test_implementation.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ def test_register_observer(self):
observer = meter.register_observer(callback, "", "", "", int, (), True)
self.assertIsInstance(observer, metrics.DefaultObserver)

def test_unregister_observer(self):
meter = metrics.DefaultMeter()
observer = metrics.DefaultObserver()
meter.unregister_observer(observer)

def test_get_label_set(self):
meter = metrics.DefaultMeter()
label_set = meter.get_label_set({})
Expand Down
88 changes: 66 additions & 22 deletions opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import logging
import threading
from typing import Dict, Sequence, Tuple, Type

from opentelemetry import metrics as metrics_api
Expand Down Expand Up @@ -71,6 +72,8 @@ def __init__(
self.enabled = enabled
self.aggregator = aggregator
self.last_update_timestamp = time_ns()
self._ref_count = 0
self._ref_count_lock = threading.Lock()

def _validate_update(self, value: metrics_api.ValueT) -> bool:
if not self.enabled:
Expand All @@ -86,6 +89,21 @@ def update(self, value: metrics_api.ValueT):
self.last_update_timestamp = time_ns()
self.aggregator.update(value)

def release(self):
self.decrease_ref_count()

def decrease_ref_count(self):
with self._ref_count_lock:
self._ref_count -= 1

def increase_ref_count(self):
with self._ref_count_lock:
self._ref_count += 1

def ref_count(self):
with self._ref_count_lock:
return self._ref_count

def __repr__(self):
return '{}(data="{}", last_update_timestamp={})'.format(
type(self).__name__,
Expand Down Expand Up @@ -137,18 +155,21 @@ def __init__(
self.label_keys = label_keys
self.enabled = enabled
self.bound_instruments = {}
self.bound_instruments_lock = threading.Lock()

def bind(self, label_set: LabelSet) -> BaseBoundInstrument:
"""See `opentelemetry.metrics.Metric.bind`."""
bound_instrument = self.bound_instruments.get(label_set)
if not bound_instrument:
bound_instrument = self.BOUND_INSTR_TYPE(
self.value_type,
self.enabled,
# Aggregator will be created based off type of metric
self.meter.batcher.aggregator_for(self.__class__),
)
self.bound_instruments[label_set] = bound_instrument
with self.bound_instruments_lock:
bound_instrument = self.bound_instruments.get(label_set)
if bound_instrument is None:
bound_instrument = self.BOUND_INSTR_TYPE(
self.value_type,
self.enabled,
# Aggregator will be created based off type of metric
self.meter.batcher.aggregator_for(self.__class__),
)
self.bound_instruments[label_set] = bound_instrument
bound_instrument.increase_ref_count()
return bound_instrument

def __repr__(self):
Expand All @@ -167,7 +188,9 @@ class Counter(Metric, metrics_api.Counter):

def add(self, value: metrics_api.ValueT, label_set: LabelSet) -> None:
"""See `opentelemetry.metrics.Counter.add`."""
self.bind(label_set).add(value)
bound_intrument = self.bind(label_set)
bound_intrument.add(value)
bound_intrument.release()

UPDATE_FUNCTION = add

Expand All @@ -179,7 +202,9 @@ class Measure(Metric, metrics_api.Measure):

def record(self, value: metrics_api.ValueT, label_set: LabelSet) -> None:
"""See `opentelemetry.metrics.Measure.record`."""
self.bind(label_set).record(value)
bound_intrument = self.bind(label_set)
bound_intrument.record(value)
bound_intrument.release()

UPDATE_FUNCTION = record

Expand Down Expand Up @@ -279,6 +304,7 @@ def __init__(
self.metrics = set()
self.observers = set()
self.batcher = UngroupedBatcher(stateful)
self.observers_lock = threading.Lock()
self.resource = resource

def collect(self) -> None:
Expand All @@ -294,26 +320,39 @@ def collect(self) -> None:

def _collect_metrics(self) -> None:
for metric in self.metrics:
if metric.enabled:
if not metric.enabled:
continue

to_remove = []

with metric.bound_instruments_lock:
for label_set, bound_instr in metric.bound_instruments.items():
# TODO: Consider storing records in memory?
record = Record(metric, label_set, bound_instr.aggregator)
# Checkpoints the current aggregators
# Applies different batching logic based on type of batcher
self.batcher.process(record)

if bound_instr.ref_count() == 0:
to_remove.append(label_set)

# Remove handles that were released
for label_set in to_remove:
del metric.bound_instruments[label_set]

def _collect_observers(self) -> None:
for observer in self.observers:
if not observer.enabled:
continue
with self.observers_lock:
for observer in self.observers:
if not observer.enabled:
continue

# TODO: capture timestamp?
if not observer.run():
continue
# TODO: capture timestamp?
if not observer.run():
continue

for label_set, aggregator in observer.aggregators.items():
record = Record(observer, label_set, aggregator)
self.batcher.process(record)
for label_set, aggregator in observer.aggregators.items():
record = Record(observer, label_set, aggregator)
self.batcher.process(record)

def record_batch(
self,
Expand Down Expand Up @@ -368,9 +407,14 @@ def register_observer(
label_keys,
enabled,
)
self.observers.add(ob)
with self.observers_lock:
self.observers.add(ob)
return ob

def unregister_observer(self, observer: "Observer") -> None:
with self.observers_lock:
self.observers.remove(observer)

def get_label_set(self, labels: Dict[str, str]):
"""See `opentelemetry.metrics.Meter.create_metric`.

Expand Down
72 changes: 71 additions & 1 deletion opentelemetry-sdk/tests/metrics/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def test_collect(self):
)
kvp = {"key1": "value1"}
label_set = meter.get_label_set(kvp)
counter.add(label_set, 1.0)
counter.add(1.0, label_set)
meter.metrics.add(counter)
meter.collect()
self.assertTrue(batcher_mock.process.called)
Expand Down Expand Up @@ -179,6 +179,18 @@ def test_register_observer(self):
self.assertEqual(observer.label_keys, ())
self.assertTrue(observer.enabled)

def test_unregister_observer(self):
meter = metrics.MeterProvider().get_meter(__name__)

callback = mock.Mock()

observer = meter.register_observer(
callback, "name", "desc", "unit", int, (), True
)

meter.unregister_observer(observer)
self.assertEqual(len(meter.observers), 0)

def test_get_label_set(self):
meter = metrics.MeterProvider().get_meter(__name__)
kvp = {"environment": "staging", "a": "z"}
Expand All @@ -193,6 +205,64 @@ def test_get_label_set_empty(self):
label_set = meter.get_label_set(kvp)
self.assertEqual(label_set, metrics.EMPTY_LABEL_SET)

def test_direct_call_release_bound_instrument(self):
meter = metrics.MeterProvider().get_meter(__name__)
label_keys = ("key1",)
kvp = {"key1": "value1"}
label_set = meter.get_label_set(kvp)

counter = metrics.Counter(
"name", "desc", "unit", float, meter, label_keys
)
meter.metrics.add(counter)
counter.add(4.0, label_set)

measure = metrics.Measure(
"name", "desc", "unit", float, meter, label_keys
)
meter.metrics.add(measure)
measure.record(42.0, label_set)

self.assertEqual(len(counter.bound_instruments), 1)
self.assertEqual(len(measure.bound_instruments), 1)

meter.collect()

self.assertEqual(len(counter.bound_instruments), 0)
self.assertEqual(len(measure.bound_instruments), 0)

def test_release_bound_instrument(self):
meter = metrics.MeterProvider().get_meter(__name__)
label_keys = ("key1",)
kvp = {"key1": "value1"}
label_set = meter.get_label_set(kvp)

counter = metrics.Counter(
"name", "desc", "unit", float, meter, label_keys
)
meter.metrics.add(counter)
bound_counter = counter.bind(label_set)
bound_counter.add(4.0)

measure = metrics.Measure(
"name", "desc", "unit", float, meter, label_keys
)
meter.metrics.add(measure)
bound_measure = measure.bind(label_set)
bound_measure.record(42)

bound_counter.release()
bound_measure.release()

# be sure that bound instruments are only released after collection
self.assertEqual(len(counter.bound_instruments), 1)
self.assertEqual(len(measure.bound_instruments), 1)

meter.collect()

self.assertEqual(len(counter.bound_instruments), 0)
self.assertEqual(len(measure.bound_instruments), 0)


class TestMetric(unittest.TestCase):
def test_bind(self):
Expand Down