diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py index 5c55ba038ac..f082cce8919 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py @@ -13,6 +13,7 @@ # limitations under the License. import abc +import threading from collections import namedtuple @@ -47,62 +48,66 @@ def __init__(self): super().__init__() self.current = 0 self.checkpoint = 0 + self._lock = threading.Lock() def update(self, value): - self.current += value + with self._lock: + self.current += value def take_checkpoint(self): - self.checkpoint = self.current - self.current = 0 + with self._lock: + self.checkpoint = self.current + self.current = 0 def merge(self, other): - self.checkpoint += other.checkpoint + with self._lock: + self.checkpoint += other.checkpoint class MinMaxSumCountAggregator(Aggregator): """Agregator for Measure metrics that keeps min, max, sum and count.""" _TYPE = namedtuple("minmaxsumcount", "min max sum count") + _EMPTY = _TYPE(None, None, None, 0) @classmethod - def _min(cls, val1, val2): - if val1 is None and val2 is None: - return None - return min(val1 or val2, val2 or val1) - - @classmethod - def _max(cls, val1, val2): - if val1 is None and val2 is None: - return None - return max(val1 or val2, val2 or val1) - - @classmethod - def _sum(cls, val1, val2): - if val1 is None and val2 is None: - return None - return (val1 or 0) + (val2 or 0) + def _merge_checkpoint(cls, val1, val2): + if val1 is cls._EMPTY: + return val2 + if val2 is cls._EMPTY: + return val1 + return cls._TYPE( + min(val1.min, val2.min), + max(val1.max, val2.max), + val1.sum + val2.sum, + val1.count + val2.count, + ) def __init__(self): super().__init__() - self.current = self._TYPE(None, None, None, 0) - self.checkpoint = self._TYPE(None, None, None, 0) + self.current = self._EMPTY + self.checkpoint = self._EMPTY + self._lock = threading.Lock() def update(self, value): - self.current = self._TYPE( - self._min(self.current.min, value), - self._max(self.current.max, value), - self._sum(self.current.sum, value), - self.current.count + 1, - ) + with self._lock: + if self.current is self._EMPTY: + self.current = self._TYPE(value, value, value, 1) + else: + self.current = self._TYPE( + min(self.current.min, value), + max(self.current.max, value), + self.current.sum + value, + self.current.count + 1, + ) def take_checkpoint(self): - self.checkpoint = self.current - self.current = self._TYPE(None, None, None, 0) + with self._lock: + self.checkpoint = self.current + self.current = self._EMPTY def merge(self, other): - self.checkpoint = self._TYPE( - self._min(self.checkpoint.min, other.checkpoint.min), - self._max(self.checkpoint.max, other.checkpoint.max), - self._sum(self.checkpoint.sum, other.checkpoint.sum), - self.checkpoint.count + other.checkpoint.count, - ) + with self._lock: + self.checkpoint = self._merge_checkpoint( + self.checkpoint, other.checkpoint + ) diff --git a/opentelemetry-sdk/tests/metrics/export/test_export.py b/opentelemetry-sdk/tests/metrics/export/test_export.py index 5df6c6d08a0..51d7aaaf4fd 100644 --- a/opentelemetry-sdk/tests/metrics/export/test_export.py +++ b/opentelemetry-sdk/tests/metrics/export/test_export.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import concurrent.futures +import random import unittest from unittest import mock @@ -222,6 +224,15 @@ def test_ungrouped_batcher_process_not_stateful(self): class TestCounterAggregator(unittest.TestCase): + @staticmethod + def call_update(counter): + update_total = 0 + for _ in range(0, 100000): + val = random.getrandbits(32) + counter.update(val) + update_total += val + return update_total + def test_update(self): counter = CounterAggregator() counter.update(1.0) @@ -243,13 +254,58 @@ def test_merge(self): counter.merge(counter2) self.assertEqual(counter.checkpoint, 4.0) + def test_concurrent_update(self): + counter = CounterAggregator() + + with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + fut1 = executor.submit(self.call_update, counter) + fut2 = executor.submit(self.call_update, counter) + + updapte_total = fut1.result() + fut2.result() + + counter.take_checkpoint() + self.assertEqual(updapte_total, counter.checkpoint) + + def test_concurrent_update_and_checkpoint(self): + counter = CounterAggregator() + checkpoint_total = 0 + + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: + fut = executor.submit(self.call_update, counter) + + while not fut.done(): + counter.take_checkpoint() + checkpoint_total += counter.checkpoint + + counter.take_checkpoint() + checkpoint_total += counter.checkpoint + + self.assertEqual(fut.result(), checkpoint_total) + class TestMinMaxSumCountAggregator(unittest.TestCase): + @staticmethod + def call_update(mmsc): + min_ = float("inf") + max_ = float("-inf") + sum_ = 0 + count_ = 0 + for _ in range(0, 100000): + val = random.getrandbits(32) + mmsc.update(val) + if val < min_: + min_ = val + if val > max_: + max_ = val + sum_ += val + count_ += 1 + return MinMaxSumCountAggregator._TYPE(min_, max_, sum_, count_) + def test_update(self): mmsc = MinMaxSumCountAggregator() # test current values without any update self.assertEqual( - mmsc.current, (None, None, None, 0), + mmsc.current, MinMaxSumCountAggregator._EMPTY, ) # call update with some values @@ -267,7 +323,7 @@ def test_checkpoint(self): # take checkpoint wihtout any update mmsc.take_checkpoint() self.assertEqual( - mmsc.checkpoint, (None, None, None, 0), + mmsc.checkpoint, MinMaxSumCountAggregator._EMPTY, ) # call update with some values @@ -282,7 +338,7 @@ def test_checkpoint(self): ) self.assertEqual( - mmsc.current, (None, None, None, 0), + mmsc.current, MinMaxSumCountAggregator._EMPTY, ) def test_merge(self): @@ -299,14 +355,34 @@ def test_merge(self): self.assertEqual( mmsc1.checkpoint, - ( - min(checkpoint1.min, checkpoint2.min), - max(checkpoint1.max, checkpoint2.max), - checkpoint1.sum + checkpoint2.sum, - checkpoint1.count + checkpoint2.count, + MinMaxSumCountAggregator._merge_checkpoint( + checkpoint1, checkpoint2 ), ) + def test_merge_checkpoint(self): + func = MinMaxSumCountAggregator._merge_checkpoint + _type = MinMaxSumCountAggregator._TYPE + empty = MinMaxSumCountAggregator._EMPTY + + ret = func(empty, empty) + self.assertEqual(ret, empty) + + ret = func(empty, _type(0, 0, 0, 0)) + self.assertEqual(ret, _type(0, 0, 0, 0)) + + ret = func(_type(0, 0, 0, 0), empty) + self.assertEqual(ret, _type(0, 0, 0, 0)) + + ret = func(_type(0, 0, 0, 0), _type(0, 0, 0, 0)) + self.assertEqual(ret, _type(0, 0, 0, 0)) + + ret = func(_type(44, 23, 55, 86), empty) + self.assertEqual(ret, _type(44, 23, 55, 86)) + + ret = func(_type(3, 150, 101, 3), _type(1, 33, 44, 2)) + self.assertEqual(ret, _type(1, 150, 101 + 44, 2 + 3)) + def test_merge_with_empty(self): mmsc1 = MinMaxSumCountAggregator() mmsc2 = MinMaxSumCountAggregator() @@ -318,6 +394,42 @@ def test_merge_with_empty(self): self.assertEqual(mmsc1.checkpoint, checkpoint1) + def test_concurrent_update(self): + mmsc = MinMaxSumCountAggregator() + with concurrent.futures.ThreadPoolExecutor(max_workers=2) as ex: + fut1 = ex.submit(self.call_update, mmsc) + fut2 = ex.submit(self.call_update, mmsc) + + ret1 = fut1.result() + ret2 = fut2.result() + + update_total = MinMaxSumCountAggregator._merge_checkpoint( + ret1, ret2 + ) + mmsc.take_checkpoint() + + self.assertEqual(update_total, mmsc.checkpoint) + + def test_concurrent_update_and_checkpoint(self): + mmsc = MinMaxSumCountAggregator() + checkpoint_total = MinMaxSumCountAggregator._TYPE(2 ** 32, 0, 0, 0) + + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as ex: + fut = ex.submit(self.call_update, mmsc) + + while not fut.done(): + mmsc.take_checkpoint() + checkpoint_total = MinMaxSumCountAggregator._merge_checkpoint( + checkpoint_total, mmsc.checkpoint + ) + + mmsc.take_checkpoint() + checkpoint_total = MinMaxSumCountAggregator._merge_checkpoint( + checkpoint_total, mmsc.checkpoint + ) + + self.assertEqual(checkpoint_total, fut.result()) + class TestController(unittest.TestCase): def test_push_controller(self):