Skip to content

Add SumObserver and UpDownSumObserver instruments #789

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 38 commits into from
Jun 9, 2020
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
2001eeb
value observer
lzchen Jun 2, 2020
4293398
lint
lzchen Jun 2, 2020
e65de5c
instrument
lzchen Jun 2, 2020
38504d4
instrument
lzchen Jun 2, 2020
efd685e
batcher
lzchen Jun 2, 2020
e72f06a
InstrumentT
lzchen Jun 2, 2020
d0505d1
mypy
lzchen Jun 2, 2020
f2e4462
reorder metricrecord
lzchen Jun 2, 2020
0d26ec6
fix tests
lzchen Jun 2, 2020
d99ce5f
Merge branch 'master' of https://github.com/open-telemetry/openteleme…
lzchen Jun 2, 2020
68f980e
fix tests
lzchen Jun 2, 2020
bf17d2d
mypy
lzchen Jun 2, 2020
6e34921
typing
lzchen Jun 2, 2020
2ee2585
fix lint
lzchen Jun 2, 2020
677515a
comment
lzchen Jun 2, 2020
5061df9
sumobserver
lzchen Jun 8, 2020
b8f9eb0
Merge branch 'master' of https://github.com/open-telemetry/openteleme…
lzchen Jun 8, 2020
9f10815
changelog
lzchen Jun 8, 2020
a18ee95
lint
lzchen Jun 8, 2020
edb310d
lint
lzchen Jun 8, 2020
5c67c2a
lint
lzchen Jun 8, 2020
3a468af
Merge branch 'master' into sumobs
lzchen Jun 8, 2020
ce33371
black
lzchen Jun 8, 2020
d3bf365
Merge branch 'master' of https://github.com/open-telemetry/openteleme…
lzchen Jun 8, 2020
1378ad9
Merge branch 'sumobs' of https://github.com/lzchen/opentelemetry-pyth…
lzchen Jun 8, 2020
6e3f645
Merge branch 'master' of https://github.com/open-telemetry/openteleme…
lzchen Jun 8, 2020
aa0007b
lint
lzchen Jun 8, 2020
f33e7e4
fix example
lzchen Jun 9, 2020
541a11c
updownobserver
lzchen Jun 9, 2020
cca2422
Merge branch 'master' of https://github.com/open-telemetry/openteleme…
lzchen Jun 9, 2020
e9489c7
lint
lzchen Jun 9, 2020
f1a2767
address comments
lzchen Jun 9, 2020
2384ed5
Merge branch 'master' of https://github.com/open-telemetry/openteleme…
lzchen Jun 9, 2020
4b5c196
Merge branch 'master' of https://github.com/open-telemetry/openteleme…
lzchen Jun 9, 2020
168c874
black
lzchen Jun 9, 2020
5a37c0c
Update opentelemetry-api/CHANGELOG.md
lzchen Jun 9, 2020
5463add
Update opentelemetry-sdk/CHANGELOG.md
lzchen Jun 9, 2020
9de59a9
lint
lzchen Jun 9, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/examples/basic_meter/observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def get_ram_usage_callback(observer):
description="RAM memory usage",
unit="1",
value_type=float,
observer_type=ValueObserver,
label_keys=(),
)

Expand Down
2 changes: 2 additions & 0 deletions opentelemetry-api/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
([#761](https://github.com/open-telemetry/opentelemetry-python/pull/761))
- Rename Observer to ValueObserver
([#764](https://github.com/open-telemetry/opentelemetry-python/pull/764))
- Add SumObserver in metrics
([#789](https://github.com/open-telemetry/opentelemetry-python/pull/789))

## 0.8b0

Expand Down
24 changes: 24 additions & 0 deletions opentelemetry-api/src/opentelemetry/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,30 @@ def observe(self, value: ValueT, labels: Dict[str, str]) -> None:
"""


class SumObserver(Observer):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you need to update the ObserverT TypeVar in this file

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently we have ObserverT = TypeVar("ObserverT", bound=Observer). I believe as long as the class used is a subclass of "Observer" this definition should be accurate?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh sorry you're right!

"""No-op implementation of ``SumObserver``."""

def observe(self, value: ValueT, labels: Dict[str, str]) -> None:
"""Captures ``value`` to the sumobserver.

Args:
value: The value to capture to this sumobserver metric.
labels: Labels associated to ``value``.
"""


class UpDownSumObserver(Observer):
"""No-op implementation of ``UpDownSumObserver``."""

def observe(self, value: ValueT, labels: Dict[str, str]) -> None:
"""Captures ``value`` to the updownsumobserver.

Args:
value: The value to capture to this updownsumobserver metric.
labels: Labels associated to ``value``.
"""


class ValueObserver(Observer):
"""No-op implementation of ``ValueObserver``."""

Expand Down
14 changes: 13 additions & 1 deletion opentelemetry-api/tests/metrics/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,18 @@ def test_bound_valuerecorder(self):
bound_valuerecorder = metrics.BoundValueRecorder()
bound_valuerecorder.record(1)

def test_observer(self):
def test_default_observer(self):
observer = metrics.DefaultObserver()
observer.observe(1, {})

def test_sum_observer(self):
observer = metrics.SumObserver()
observer.observe(1, {})

def test_updown_sum_observer(self):
observer = metrics.UpDownSumObserver()
observer.observe(1, {})

def test_value_observer(self):
observer = metrics.ValueObserver()
observer.observe(1, {})
2 changes: 2 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
([#775](https://github.com/open-telemetry/opentelemetry-python/pull/775))
- Rename Observer to ValueObserver
([#764](https://github.com/open-telemetry/opentelemetry-python/pull/764))
- Add SumObserver and UpDownSumObserver in metrics
([#789](https://github.com/open-telemetry/opentelemetry-python/pull/789))

## 0.8b0

Expand Down
87 changes: 73 additions & 14 deletions opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,16 @@ def record(self, value: metrics_api.ValueT) -> None:


class Metric(metrics_api.Metric):
"""Base class for all metric types.
"""Base class for all synchronous metric types.

Also known as metric instrument. This is the class that is used to
represent a metric that is to be continuously recorded and tracked. Each
metric has a set of bound metrics that are created from the metric. See
`BaseBoundInstrument` for information on bound metric instruments.
This is the class that is used to represent a metric that is to be
synchronously recorded and tracked. Synchronous instruments are called
inside a request, meaning they have an associated distributed context
(i.e. Span context, correlation context). Multiple metric events may occur
for a synchronous instrument within a give collection interval.

Each metric has a set of bound metrics that are created from the metric.
See `BaseBoundInstrument` for information on bound metric instruments.
"""

BOUND_INSTR_TYPE = BaseBoundInstrument
Expand Down Expand Up @@ -190,8 +194,14 @@ def record(
UPDATE_FUNCTION = record


class ValueObserver(metrics_api.ValueObserver):
"""See `opentelemetry.metrics.ValueObserver`."""
class Observer(metrics_api.Observer):
"""Base class for all asynchronous metric types.

Also known as Observers, observer metric instruments are asynchronous in
that they are reported by a callback, once per collection interval, and
lack context. They are permitted to report only one value per distinct
label set per period.
"""

def __init__(
self,
Expand All @@ -218,15 +228,10 @@ def __init__(
def observe(
self, value: metrics_api.ValueT, labels: Dict[str, str]
) -> None:
if not self.enabled:
return
if not isinstance(value, self.value_type):
logger.warning(
"Invalid value passed for %s.", self.value_type.__name__
)
key = get_labels_as_key(labels)
if not self._validate_observe(value, key):
return

key = get_labels_as_key(labels)
if key not in self.aggregators:
# TODO: how to cleanup aggregators?
self.aggregators[key] = self.meter.batcher.aggregator_for(
Expand All @@ -235,6 +240,20 @@ def observe(
aggregator = self.aggregators[key]
aggregator.update(value)

# pylint: disable=W0613
def _validate_observe(
self, value: metrics_api.ValueT, key: Tuple[Tuple[str, str]] = None,
):
if not self.enabled:
return False
if not isinstance(value, self.value_type):
logger.warning(
"Invalid value passed for %s.", self.value_type.__name__
)
return False

return True

def run(self) -> bool:
try:
self.callback(self)
Expand All @@ -252,6 +271,46 @@ def __repr__(self):
)


class SumObserver(Observer, metrics_api.SumObserver):
"""See `opentelemetry.metrics.SumObserver`."""

def _validate_observe(
self, value: metrics_api.ValueT, key: Tuple[Tuple[str, str]] = None,
):
if super()._validate_observe(value, key):
# Must be non-decreasing because monotonic
if (
key is not None
and key in self.aggregators
and self.aggregators[key].current is not None
):
if value < self.aggregators[key].current:
logger.warning("Value passed must be non-decreasing.")
return False
return True
return False


class UpDownSumObserver(Observer, metrics_api.UpDownSumObserver):
"""See `opentelemetry.metrics.UpDownSumObserver`."""

# pylint: disable=W0235
def _validate_observe(
self, value: metrics_api.ValueT, key: Tuple[Tuple[str, str]] = None,
):
return super()._validate_observe(value, key)


class ValueObserver(Observer, metrics_api.ValueObserver):
"""See `opentelemetry.metrics.ValueObserver`."""

# pylint: disable=W0235
def _validate_observe(
self, value: metrics_api.ValueT, key: Tuple[Tuple[str, str]] = None,
):
return super()._validate_observe(value, key)


class Record:
"""Container class used for processing in the `Batcher`"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,34 @@ def merge(self, other):
)


class LastValueAggregator(Aggregator):
"""Aggregator that stores last value results."""

def __init__(self):
super().__init__()
self._lock = threading.Lock()
self.last_update_timestamp = None

def update(self, value):
with self._lock:
self.current = value
self.last_update_timestamp = time_ns()

def take_checkpoint(self):
with self._lock:
self.checkpoint = self.current
self.current = None

def merge(self, other):
last = self.checkpoint.last
self.last_update_timestamp = get_latest_timestamp(
self.last_update_timestamp, other.last_update_timestamp
)
if self.last_update_timestamp == other.last_update_timestamp:
last = other.checkpoint.last
self.checkpoint = last


class ValueObserverAggregator(Aggregator):
"""Same as MinMaxSumCount but also with last value."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
from opentelemetry.metrics import (
Counter,
InstrumentT,
SumObserver,
UpDownSumObserver,
ValueObserver,
ValueRecorder,
)
from opentelemetry.sdk.metrics.export import MetricRecord
from opentelemetry.sdk.metrics.export.aggregate import (
Aggregator,
CounterAggregator,
LastValueAggregator,
MinMaxSumCountAggregator,
ValueObserverAggregator,
)
Expand Down Expand Up @@ -54,6 +57,10 @@ def aggregator_for(self, instrument_type: Type[InstrumentT]) -> Aggregator:
# pylint:disable=R0201
if issubclass(instrument_type, Counter):
return CounterAggregator()
if issubclass(instrument_type, SumObserver) or issubclass(
instrument_type, UpDownSumObserver
):
return LastValueAggregator()
if issubclass(instrument_type, ValueRecorder):
return MinMaxSumCountAggregator()
if issubclass(instrument_type, ValueObserver):
Expand Down
Loading