diff --git a/CHANGELOG.md b/CHANGELOG.md index 53f742dce3..c7e3c2fce1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v1.7.0-0.26b0...HEAD) +- Adds Aggregation and instruments as part of Metrics SDK + ([#2234](https://github.com/open-telemetry/opentelemetry-python/pull/2234)) + ## [1.7.1-0.26b1](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.7.0-0.26b0) - 2021-11-11 diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py new file mode 100644 index 0000000000..456e447162 --- /dev/null +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py @@ -0,0 +1,137 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import ABC, abstractmethod +from collections import OrderedDict +from logging import getLogger +from math import inf + +from opentelemetry._metrics.instrument import _Monotonic +from opentelemetry.util._time import _time_ns + +_logger = getLogger(__name__) + + +class Aggregation(ABC): + @property + def value(self): + return self._value # pylint: disable=no-member + + @abstractmethod + def aggregate(self, value): + pass + + @abstractmethod + def make_point_and_reset(self): + """ + Atomically return a point for the current value of the metric and reset the internal state. + """ + + +class SumAggregation(Aggregation): + """ + This aggregation collects data for the SDK sum metric point. + """ + + def __init__(self, instrument): + self._value = 0 + + def aggregate(self, value): + self._value = self._value + value + + def make_point_and_reset(self): + pass + + +class LastValueAggregation(Aggregation): + + """ + This aggregation collects data for the SDK sum metric point. + """ + + def __init__(self, instrument): + self._value = None + self._timestamp = _time_ns() + + def aggregate(self, value): + self._value = value + self._timestamp = _time_ns() + + def make_point_and_reset(self): + pass + + +class ExplicitBucketHistogramAggregation(Aggregation): + + """ + This aggregation collects data for the SDK sum metric point. + """ + + def __init__( + self, + instrument, + *args, + boundaries=(0, 5, 10, 25, 50, 75, 100, 250, 500, 1000), + record_min_max=True, + ): + super().__init__() + self._value = OrderedDict([(key, 0) for key in (*boundaries, inf)]) + self._min = inf + self._max = -inf + self._sum = 0 + self._instrument = instrument + self._record_min_max = record_min_max + + @property + def min(self): + if not self._record_min_max: + _logger.warning("Min is not being recorded") + + return self._min + + @property + def max(self): + if not self._record_min_max: + _logger.warning("Max is not being recorded") + + return self._max + + @property + def sum(self): + if isinstance(self._instrument, _Monotonic): + return self._sum + + _logger.warning( + "Sum is not filled out when the associated " + "instrument is not monotonic" + ) + return None + + def aggregate(self, value): + if self._record_min_max: + self._min = min(self._min, value) + self._max = max(self._max, value) + + if isinstance(self._instrument, _Monotonic): + self._sum += value + + for key in self._value.keys(): + + if value < key: + self._value[key] = self._value[key] + value + + break + + def make_point_and_reset(self): + pass diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py new file mode 100644 index 0000000000..fc63311ce7 --- /dev/null +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py @@ -0,0 +1,160 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=function-redefined +# pylint: disable=dangerous-default-value +# Classes in this module use dictionaries as default arguments. This is +# considered dangerous by pylint because the default dictionary is shared by +# all instances. Implementations of these classes must not make any change to +# this default dictionary in __init__. + +from opentelemetry._metrics.instrument import ( + Counter, + Histogram, + ObservableCounter, + ObservableGauge, + ObservableUpDownCounter, + UpDownCounter, +) +from opentelemetry.sdk._metrics.aggregation import ( + ExplicitBucketHistogramAggregation, + LastValueAggregation, + SumAggregation, +) + + +class _Instrument: + def __init__( + self, + name, + unit="", + description="", + aggregation=None, + aggregation_config={}, + ): + self._attributes_aggregations = {} + self._aggregation = aggregation + self._aggregation_config = aggregation_config + aggregation(self, **aggregation_config) + + +class Counter(_Instrument, Counter): + def __init__( + self, + name, + unit="", + description="", + aggregation=SumAggregation, + aggregation_config={}, + ): + super().__init__( + name, + unit=unit, + description=description, + aggregation=aggregation, + aggregation_config=aggregation_config, + ) + + +class UpDownCounter(_Instrument, UpDownCounter): + def __init__( + self, + name, + unit="", + description="", + aggregation=SumAggregation, + aggregation_config={}, + ): + super().__init__( + name, + unit=unit, + description=description, + aggregation=aggregation, + aggregation_config=aggregation_config, + ) + + +class ObservableCounter(_Instrument, ObservableCounter): + def __init__( + self, + name, + callback, + unit="", + description="", + aggregation=SumAggregation, + aggregation_config={}, + ): + super().__init__( + name, + unit=unit, + description=description, + aggregation=aggregation, + aggregation_config=aggregation_config, + ) + + +class ObservableUpDownCounter(_Instrument, ObservableUpDownCounter): + def __init__( + self, + name, + callback, + unit="", + description="", + aggregation=SumAggregation, + aggregation_config={}, + ): + super().__init__( + name, + unit=unit, + description=description, + aggregation=aggregation, + aggregation_config=aggregation_config, + ) + + +class Histogram(_Instrument, Histogram): + def __init__( + self, + name, + unit="", + description="", + aggregation=ExplicitBucketHistogramAggregation, + aggregation_config={}, + ): + super().__init__( + name, + unit=unit, + description=description, + aggregation=aggregation, + aggregation_config=aggregation_config, + ) + + +class ObservableGauge(_Instrument, ObservableGauge): + def __init__( + self, + name, + callback, + unit="", + description="", + aggregation=LastValueAggregation, + aggregation_config={}, + ): + super().__init__( + name, + unit=unit, + description=description, + aggregation=aggregation, + aggregation_config=aggregation_config, + ) diff --git a/opentelemetry-sdk/tests/metrics/test_aggregation.py b/opentelemetry-sdk/tests/metrics/test_aggregation.py new file mode 100644 index 0000000000..1c4fa1420e --- /dev/null +++ b/opentelemetry-sdk/tests/metrics/test_aggregation.py @@ -0,0 +1,126 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from logging import WARNING +from math import inf +from unittest import TestCase +from unittest.mock import Mock + +from opentelemetry.sdk._metrics.aggregation import ( + ExplicitBucketHistogramAggregation, + LastValueAggregation, + SumAggregation, +) + + +class TestSumAggregation(TestCase): + def test_aggregate(self): + """ + `SumAggregation` collects data for sum metric points + """ + + sum_aggregation = SumAggregation(Mock()) + + sum_aggregation.aggregate(1) + sum_aggregation.aggregate(2) + sum_aggregation.aggregate(3) + + self.assertEqual(sum_aggregation.value, 6) + + sum_aggregation = SumAggregation(Mock()) + + sum_aggregation.aggregate(1) + sum_aggregation.aggregate(-2) + sum_aggregation.aggregate(3) + + self.assertEqual(sum_aggregation.value, 2) + + +class TestLastValueAggregation(TestCase): + def test_aggregate(self): + """ + `LastValueAggregation` collects data for gauge metric points with delta + temporality + """ + + last_value_aggregation = LastValueAggregation(Mock()) + + last_value_aggregation.aggregate(1) + self.assertEqual(last_value_aggregation.value, 1) + + last_value_aggregation.aggregate(2) + self.assertEqual(last_value_aggregation.value, 2) + + last_value_aggregation.aggregate(3) + self.assertEqual(last_value_aggregation.value, 3) + + +class TestExplicitBucketHistogramAggregation(TestCase): + def test_aggregate(self): + """ + `ExplicitBucketHistogramAggregation` collects data for explicit_bucket_histogram metric points + """ + + explicit_bucket_histogram_aggregation = ( + ExplicitBucketHistogramAggregation(Mock()) + ) + + explicit_bucket_histogram_aggregation.aggregate(-1) + explicit_bucket_histogram_aggregation.aggregate(2) + explicit_bucket_histogram_aggregation.aggregate(7) + explicit_bucket_histogram_aggregation.aggregate(8) + explicit_bucket_histogram_aggregation.aggregate(9999) + + self.assertEqual(explicit_bucket_histogram_aggregation.value[0], -1) + self.assertEqual(explicit_bucket_histogram_aggregation.value[5], 2) + self.assertEqual(explicit_bucket_histogram_aggregation.value[10], 15) + self.assertEqual( + explicit_bucket_histogram_aggregation.value[inf], 9999 + ) + + def test_min_max(self): + """ + `record_min_max` indicates the aggregator to record the minimum and + maximum value in the population + """ + + explicit_bucket_histogram_aggregation = ( + ExplicitBucketHistogramAggregation(Mock()) + ) + + explicit_bucket_histogram_aggregation.aggregate(-1) + explicit_bucket_histogram_aggregation.aggregate(2) + explicit_bucket_histogram_aggregation.aggregate(7) + explicit_bucket_histogram_aggregation.aggregate(8) + explicit_bucket_histogram_aggregation.aggregate(9999) + + self.assertEqual(explicit_bucket_histogram_aggregation.min, -1) + self.assertEqual(explicit_bucket_histogram_aggregation.max, 9999) + + explicit_bucket_histogram_aggregation = ( + ExplicitBucketHistogramAggregation(Mock(), record_min_max=False) + ) + + explicit_bucket_histogram_aggregation.aggregate(-1) + explicit_bucket_histogram_aggregation.aggregate(2) + explicit_bucket_histogram_aggregation.aggregate(7) + explicit_bucket_histogram_aggregation.aggregate(8) + explicit_bucket_histogram_aggregation.aggregate(9999) + + with self.assertLogs(level=WARNING): + self.assertEqual(explicit_bucket_histogram_aggregation.min, inf) + + with self.assertLogs(level=WARNING): + self.assertEqual(explicit_bucket_histogram_aggregation.max, -inf)