From 836638748a005f43da07a446805616986b1086b9 Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Fri, 12 Apr 2024 17:17:21 -0500 Subject: [PATCH] Support float and duration metrics Fixes #493 --- temporalio/bridge/metric.py | 60 ++++++ temporalio/bridge/runtime.py | 4 +- temporalio/bridge/src/client.rs | 6 +- temporalio/bridge/src/lib.rs | 3 + temporalio/bridge/src/metric.rs | 131 +++++++++++-- temporalio/bridge/src/runtime.rs | 7 +- temporalio/common.py | 240 +++++++++++++++--------- temporalio/runtime.py | 233 ++++++++++++++--------- temporalio/worker/_workflow_instance.py | 133 ++++++++----- tests/worker/test_workflow.py | 71 +++++++ 10 files changed, 650 insertions(+), 238 deletions(-) diff --git a/temporalio/bridge/metric.py b/temporalio/bridge/metric.py index 2f67486b..399fe5cc 100644 --- a/temporalio/bridge/metric.py +++ b/temporalio/bridge/metric.py @@ -75,6 +75,46 @@ def record(self, value: int, attrs: MetricAttributes) -> None: self._ref.record(value, attrs._ref) +class MetricHistogramFloat: + """Metric histogram using SDK Core.""" + + def __init__( + self, + meter: MetricMeter, + name: str, + description: Optional[str], + unit: Optional[str], + ) -> None: + """Initialize histogram.""" + self._ref = meter._ref.new_histogram_float(name, description, unit) + + def record(self, value: float, attrs: MetricAttributes) -> None: + """Record value on histogram.""" + if value < 0: + raise ValueError("Metric value must be non-negative value") + self._ref.record(value, attrs._ref) + + +class MetricHistogramDuration: + """Metric histogram using SDK Core.""" + + def __init__( + self, + meter: MetricMeter, + name: str, + description: Optional[str], + unit: Optional[str], + ) -> None: + """Initialize histogram.""" + self._ref = meter._ref.new_histogram_duration(name, description, unit) + + def record(self, value_ms: int, attrs: MetricAttributes) -> None: + """Record value on histogram.""" + if value_ms < 0: + raise ValueError("Metric value must be non-negative value") + self._ref.record(value_ms, attrs._ref) + + class MetricGauge: """Metric gauge using SDK Core.""" @@ -95,6 +135,26 @@ def set(self, value: int, attrs: MetricAttributes) -> None: self._ref.set(value, attrs._ref) +class MetricGaugeFloat: + """Metric gauge using SDK Core.""" + + def __init__( + self, + meter: MetricMeter, + name: str, + description: Optional[str], + unit: Optional[str], + ) -> None: + """Initialize gauge.""" + self._ref = meter._ref.new_gauge_float(name, description, unit) + + def set(self, value: float, attrs: MetricAttributes) -> None: + """Set value on gauge.""" + if value < 0: + raise ValueError("Metric value must be non-negative value") + self._ref.set(value, attrs._ref) + + class MetricAttributes: """Metric attributes using SDK Core.""" diff --git a/temporalio/bridge/runtime.py b/temporalio/bridge/runtime.py index 43ea4655..5d4b7528 100644 --- a/temporalio/bridge/runtime.py +++ b/temporalio/bridge/runtime.py @@ -27,9 +27,9 @@ def __init__(self, *, telemetry: TelemetryConfig) -> None: """Create SDK Core runtime.""" self._ref = temporalio.bridge.temporal_sdk_bridge.init_runtime(telemetry) - def retrieve_buffered_metrics(self) -> Sequence[Any]: + def retrieve_buffered_metrics(self, durations_as_seconds: bool) -> Sequence[Any]: """Get buffered metrics.""" - return self._ref.retrieve_buffered_metrics() + return self._ref.retrieve_buffered_metrics(durations_as_seconds) def write_test_info_log(self, message: str, extra_data: str) -> None: """Write a test core log at INFO level.""" diff --git a/temporalio/bridge/src/client.rs b/temporalio/bridge/src/client.rs index f6f9bc3f..ff163bf7 100644 --- a/temporalio/bridge/src/client.rs +++ b/temporalio/bridge/src/client.rs @@ -5,8 +5,8 @@ use std::str::FromStr; use std::time::Duration; use temporal_client::{ ClientKeepAliveConfig as CoreClientKeepAliveConfig, ClientOptions, ClientOptionsBuilder, - ConfiguredClient, HealthService, OperatorService, RetryClient, RetryConfig, - TemporalServiceClientWithMetrics, TestService, TlsConfig, WorkflowService, HttpConnectProxyOptions, + ConfiguredClient, HealthService, HttpConnectProxyOptions, OperatorService, RetryClient, + RetryConfig, TemporalServiceClientWithMetrics, TestService, TlsConfig, WorkflowService, }; use tonic::metadata::MetadataKey; use url::Url; @@ -467,4 +467,4 @@ impl From for HttpConnectProxyOptions { basic_auth: conf.basic_auth, } } -} \ No newline at end of file +} diff --git a/temporalio/bridge/src/lib.rs b/temporalio/bridge/src/lib.rs index 6d10dea4..589117f2 100644 --- a/temporalio/bridge/src/lib.rs +++ b/temporalio/bridge/src/lib.rs @@ -19,7 +19,10 @@ fn temporal_sdk_bridge(py: Python, m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; + m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_function(wrap_pyfunction!(new_metric_meter, m)?)?; diff --git a/temporalio/bridge/src/metric.rs b/temporalio/bridge/src/metric.rs index 66650b17..6e3a3329 100644 --- a/temporalio/bridge/src/metric.rs +++ b/temporalio/bridge/src/metric.rs @@ -1,4 +1,5 @@ use std::any::Any; +use std::time::Duration; use std::{collections::HashMap, sync::Arc}; use pyo3::prelude::*; @@ -32,11 +33,26 @@ pub struct MetricHistogramRef { histogram: Arc, } +#[pyclass] +pub struct MetricHistogramFloatRef { + histogram: Arc, +} + +#[pyclass] +pub struct MetricHistogramDurationRef { + histogram: Arc, +} + #[pyclass] pub struct MetricGaugeRef { gauge: Arc, } +#[pyclass] +pub struct MetricGaugeFloatRef { + gauge: Arc, +} + pub fn new_metric_meter(runtime_ref: &runtime::RuntimeRef) -> Option { runtime_ref .runtime @@ -84,6 +100,36 @@ impl MetricMeterRef { } } + fn new_histogram_float( + &self, + name: String, + description: Option, + unit: Option, + ) -> MetricHistogramFloatRef { + MetricHistogramFloatRef { + histogram: self.meter.inner.histogram_f64(build_metric_parameters( + name, + description, + unit, + )), + } + } + + fn new_histogram_duration( + &self, + name: String, + description: Option, + unit: Option, + ) -> MetricHistogramDurationRef { + MetricHistogramDurationRef { + histogram: self.meter.inner.histogram_duration(build_metric_parameters( + name, + description, + unit, + )), + } + } + fn new_gauge( &self, name: String, @@ -97,6 +143,20 @@ impl MetricMeterRef { .gauge(build_metric_parameters(name, description, unit)), } } + + fn new_gauge_float( + &self, + name: String, + description: Option, + unit: Option, + ) -> MetricGaugeFloatRef { + MetricGaugeFloatRef { + gauge: self + .meter + .inner + .gauge_f64(build_metric_parameters(name, description, unit)), + } + } } #[pymethods] @@ -113,6 +173,21 @@ impl MetricHistogramRef { } } +#[pymethods] +impl MetricHistogramFloatRef { + fn record(&self, value: f64, attrs_ref: &MetricAttributesRef) { + self.histogram.record(value, &attrs_ref.attrs); + } +} + +#[pymethods] +impl MetricHistogramDurationRef { + fn record(&self, value_ms: u64, attrs_ref: &MetricAttributesRef) { + self.histogram + .record(Duration::from_millis(value_ms), &attrs_ref.attrs); + } +} + #[pymethods] impl MetricGaugeRef { fn set(&self, value: u64, attrs_ref: &MetricAttributesRef) { @@ -120,6 +195,13 @@ impl MetricGaugeRef { } } +#[pymethods] +impl MetricGaugeFloatRef { + fn set(&self, value: f64, attrs_ref: &MetricAttributesRef) { + self.gauge.record(value, &attrs_ref.attrs); + } +} + fn build_metric_parameters( name: String, description: Option, @@ -192,16 +274,18 @@ pub struct BufferedMetricUpdate { } #[derive(Clone)] -pub struct BufferedMetricUpdateValue(metrics::MetricUpdateVal); +pub enum BufferedMetricUpdateValue { + U64(u64), + U128(u128), + F64(f64), +} impl IntoPy for BufferedMetricUpdateValue { fn into_py(self, py: Python) -> PyObject { - match self.0 { - metrics::MetricUpdateVal::Delta(v) => v.into_py(py), - metrics::MetricUpdateVal::DeltaF64(v) => v.into_py(py), - metrics::MetricUpdateVal::Value(v) => v.into_py(py), - metrics::MetricUpdateVal::ValueF64(v) => v.into_py(py), - metrics::MetricUpdateVal::Duration(v) => v.as_millis().into_py(py), + match self { + BufferedMetricUpdateValue::U64(v) => v.into_py(py), + BufferedMetricUpdateValue::U128(v) => v.into_py(py), + BufferedMetricUpdateValue::F64(v) => v.into_py(py), } } } @@ -236,16 +320,18 @@ impl CustomMetricAttributes for BufferedMetricAttributes { pub fn convert_metric_events<'p>( py: Python<'p>, events: Vec>, + durations_as_seconds: bool, ) -> Vec { events .into_iter() - .filter_map(|e| convert_metric_event(py, e)) + .filter_map(|e| convert_metric_event(py, e, durations_as_seconds)) .collect() } fn convert_metric_event<'p>( py: Python<'p>, event: MetricEvent, + durations_as_seconds: bool, ) -> Option { match event { // Create the metric and put it on the lazy ref @@ -262,9 +348,19 @@ fn convert_metric_event<'p>( description: Some(params.description) .filter(|s| !s.is_empty()) .map(|s| s.to_string()), - unit: Some(params.unit) - .filter(|s| !s.is_empty()) - .map(|s| s.to_string()), + unit: if matches!(kind, metrics::MetricKind::HistogramDuration) + && params.unit == "duration" + { + if durations_as_seconds { + Some("s".to_owned()) + } else { + Some("ms".to_owned()) + } + } else if params.unit.is_empty() { + None + } else { + Some(params.unit.to_string()) + }, kind: match kind { metrics::MetricKind::Counter => 0, metrics::MetricKind::Gauge | metrics::MetricKind::GaugeF64 => 1, @@ -324,7 +420,18 @@ fn convert_metric_event<'p>( update, } => Some(BufferedMetricUpdate { metric: instrument.get().clone().0.clone(), - value: BufferedMetricUpdateValue(update), + value: match update { + metrics::MetricUpdateVal::Duration(v) if durations_as_seconds => { + BufferedMetricUpdateValue::F64(v.as_secs_f64()) + } + metrics::MetricUpdateVal::Duration(v) => { + BufferedMetricUpdateValue::U128(v.as_millis()) + } + metrics::MetricUpdateVal::Delta(v) => BufferedMetricUpdateValue::U64(v), + metrics::MetricUpdateVal::DeltaF64(v) => BufferedMetricUpdateValue::F64(v), + metrics::MetricUpdateVal::Value(v) => BufferedMetricUpdateValue::U64(v), + metrics::MetricUpdateVal::ValueF64(v) => BufferedMetricUpdateValue::F64(v), + }, attributes: attributes .get() .clone() diff --git a/temporalio/bridge/src/runtime.rs b/temporalio/bridge/src/runtime.rs index 24e3d801..949afe7a 100644 --- a/temporalio/bridge/src/runtime.rs +++ b/temporalio/bridge/src/runtime.rs @@ -205,7 +205,11 @@ impl Drop for Runtime { #[pymethods] impl RuntimeRef { - fn retrieve_buffered_metrics<'p>(&self, py: Python<'p>) -> Vec { + fn retrieve_buffered_metrics<'p>( + &self, + py: Python<'p>, + durations_as_seconds: bool, + ) -> Vec { convert_metric_events( py, self.runtime @@ -213,6 +217,7 @@ impl RuntimeRef { .as_ref() .expect("Attempting to retrieve buffered metrics without buffer") .retrieve(), + durations_as_seconds, ) } diff --git a/temporalio/common.py b/temporalio/common.py index 5b15994d..27b2e973 100644 --- a/temporalio/common.py +++ b/temporalio/common.py @@ -30,7 +30,7 @@ ) import google.protobuf.internal.containers -from typing_extensions import ClassVar, NamedTuple, TypeAlias, get_origin +from typing_extensions import ClassVar, NamedTuple, Self, TypeAlias, get_origin import temporalio.api.common.v1 import temporalio.api.enums.v1 @@ -577,6 +577,41 @@ def create_histogram( """ ... + @abstractmethod + def create_histogram_float( + self, name: str, description: Optional[str] = None, unit: Optional[str] = None + ) -> MetricHistogramFloat: + """Create a histogram metric for recording values. + + Args: + name: Name for the metric. + description: Optional description for the metric. + unit: Optional unit for the metric. + + Returns: + Histogram metric. + """ + ... + + @abstractmethod + def create_histogram_timedelta( + self, name: str, description: Optional[str] = None, unit: Optional[str] = None + ) -> MetricHistogramTimedelta: + """Create a histogram metric for recording values. + + Note, duration precision is millisecond. Also note, if "unit" is set as + "duration", it will be converted to "ms" or "s" on the way out. + + Args: + name: Name for the metric. + description: Optional description for the metric. + unit: Optional unit for the metric. + + Returns: + Histogram metric. + """ + ... + @abstractmethod def create_gauge( self, name: str, description: Optional[str] = None, unit: Optional[str] = None @@ -593,6 +628,22 @@ def create_gauge( """ ... + @abstractmethod + def create_gauge_float( + self, name: str, description: Optional[str] = None, unit: Optional[str] = None + ) -> MetricGaugeFloat: + """Create a gauge metric for setting values. + + Args: + name: Name for the metric. + description: Optional description for the metric. + unit: Optional unit for the metric. + + Returns: + Gauge metric. + """ + ... + @abstractmethod def with_additional_attributes( self, additional_attributes: MetricAttributes @@ -613,8 +664,8 @@ def with_additional_attributes( ... -class MetricCounter(ABC): - """Counter metric created by a metric meter.""" +class MetricCommon(ABC): + """Base for all metrics.""" @property @abstractmethod @@ -635,63 +686,49 @@ def unit(self) -> Optional[str]: ... @abstractmethod - def add( - self, value: int, additional_attributes: Optional[MetricAttributes] = None - ) -> None: - """Add a value to the counter. + def with_additional_attributes( + self, additional_attributes: MetricAttributes + ) -> Self: + """Create a new metric with the given attributes appended to the + current set. Args: - value: A non-negative integer to add. additional_attributes: Additional attributes to append to the current set. + Returns: + New metric. + Raises: - ValueError: Value is negative. TypeError: Attribute values are not the expected type. """ ... + +class MetricCounter(MetricCommon): + """Counter metric created by a metric meter.""" + @abstractmethod - def with_additional_attributes( - self, additional_attributes: MetricAttributes - ) -> MetricCounter: - """Create a new counter with the given attributes appended to the - current set. + def add( + self, value: int, additional_attributes: Optional[MetricAttributes] = None + ) -> None: + """Add a value to the counter. Args: + value: A non-negative integer to add. additional_attributes: Additional attributes to append to the current set. - Returns: - New counter. - Raises: + ValueError: Value is negative. TypeError: Attribute values are not the expected type. """ ... -class MetricHistogram(ABC): +class MetricHistogram(MetricCommon): """Histogram metric created by a metric meter.""" - @property - @abstractmethod - def name(self) -> str: - """Name for the metric.""" - ... - - @property - @abstractmethod - def description(self) -> Optional[str]: - """Description for the metric if any.""" - ... - - @property - @abstractmethod - def unit(self) -> Optional[str]: - """Unit for the metric if any.""" - ... - @abstractmethod def record( self, value: int, additional_attributes: Optional[MetricAttributes] = None @@ -709,47 +746,54 @@ def record( """ ... + +class MetricHistogramFloat(MetricCommon): + """Histogram metric created by a metric meter.""" + @abstractmethod - def with_additional_attributes( - self, additional_attributes: MetricAttributes - ) -> MetricHistogram: - """Create a new histogram with the given attributes appended to the - current set. + def record( + self, value: float, additional_attributes: Optional[MetricAttributes] = None + ) -> None: + """Record a value on the histogram. Args: + value: A non-negative float to record. additional_attributes: Additional attributes to append to the current set. - Returns: - New histogram. - Raises: + ValueError: Value is negative. TypeError: Attribute values are not the expected type. """ ... -class MetricGauge(ABC): - """Gauge metric created by a metric meter.""" +class MetricHistogramTimedelta(MetricCommon): + """Histogram metric created by a metric meter.""" - @property @abstractmethod - def name(self) -> str: - """Name for the metric.""" - ... + def record( + self, value: timedelta, additional_attributes: Optional[MetricAttributes] = None + ) -> None: + """Record a value on the histogram. - @property - @abstractmethod - def description(self) -> Optional[str]: - """Description for the metric if any.""" - ... + Note, duration precision is millisecond. - @property - @abstractmethod - def unit(self) -> Optional[str]: - """Unit for the metric if any.""" + Args: + value: A non-negative timedelta to record. + additional_attributes: Additional attributes to append to the + current set. + + Raises: + ValueError: Value is negative. + TypeError: Attribute values are not the expected type. + """ ... + +class MetricGauge(MetricCommon): + """Gauge metric created by a metric meter.""" + @abstractmethod def set( self, value: int, additional_attributes: Optional[MetricAttributes] = None @@ -767,21 +811,23 @@ def set( """ ... + +class MetricGaugeFloat(MetricCommon): + """Gauge metric created by a metric meter.""" + @abstractmethod - def with_additional_attributes( - self, additional_attributes: MetricAttributes - ) -> MetricGauge: - """Create a new gauge with the given attributes appended to the - current set. + def set( + self, value: float, additional_attributes: Optional[MetricAttributes] = None + ) -> None: + """Set a value on the gauge. Args: + value: A non-negative float to set. additional_attributes: Additional attributes to append to the current set. - Returns: - New gauge. - Raises: + ValueError: Value is negative. TypeError: Attribute values are not the expected type. """ ... @@ -798,18 +844,33 @@ def create_histogram( ) -> MetricHistogram: return _NoopMetricHistogram(name, description, unit) + def create_histogram_float( + self, name: str, description: Optional[str] = None, unit: Optional[str] = None + ) -> MetricHistogramFloat: + return _NoopMetricHistogramFloat(name, description, unit) + + def create_histogram_timedelta( + self, name: str, description: Optional[str] = None, unit: Optional[str] = None + ) -> MetricHistogramTimedelta: + return _NoopMetricHistogramTimedelta(name, description, unit) + def create_gauge( self, name: str, description: Optional[str] = None, unit: Optional[str] = None ) -> MetricGauge: return _NoopMetricGauge(name, description, unit) + def create_gauge_float( + self, name: str, description: Optional[str] = None, unit: Optional[str] = None + ) -> MetricGaugeFloat: + return _NoopMetricGaugeFloat(name, description, unit) + def with_additional_attributes( self, additional_attributes: MetricAttributes ) -> MetricMeter: return self -class _NoopMetric: +class _NoopMetric(MetricCommon): def __init__( self, name: str, description: Optional[str], unit: Optional[str] ) -> None: @@ -829,41 +890,52 @@ def description(self) -> Optional[str]: def unit(self) -> Optional[str]: return self._unit + def with_additional_attributes( + self, additional_attributes: MetricAttributes + ) -> Self: + return self -class _NoopMetricCounter(_NoopMetric, MetricCounter): + +class _NoopMetricCounter(MetricCounter, _NoopMetric): def add( self, value: int, additional_attributes: Optional[MetricAttributes] = None ) -> None: pass - def with_additional_attributes( - self, additional_attributes: MetricAttributes - ) -> MetricCounter: - return self - -class _NoopMetricHistogram(_NoopMetric, MetricHistogram): +class _NoopMetricHistogram(MetricHistogram, _NoopMetric): def record( self, value: int, additional_attributes: Optional[MetricAttributes] = None ) -> None: pass - def with_additional_attributes( - self, additional_attributes: MetricAttributes - ) -> MetricHistogram: - return self + +class _NoopMetricHistogramFloat(MetricHistogramFloat, _NoopMetric): + def record( + self, value: float, additional_attributes: Optional[MetricAttributes] = None + ) -> None: + pass + + +class _NoopMetricHistogramTimedelta(MetricHistogramTimedelta, _NoopMetric): + def record( + self, value: timedelta, additional_attributes: Optional[MetricAttributes] = None + ) -> None: + pass -class _NoopMetricGauge(_NoopMetric, MetricGauge): +class _NoopMetricGauge(MetricGauge, _NoopMetric): def set( self, value: int, additional_attributes: Optional[MetricAttributes] = None ) -> None: pass - def with_additional_attributes( - self, additional_attributes: MetricAttributes - ) -> MetricGauge: - return self + +class _NoopMetricGaugeFloat(MetricGaugeFloat, _NoopMetric): + def set( + self, value: float, additional_attributes: Optional[MetricAttributes] = None + ) -> None: + pass MetricMeter.noop = _NoopMetricMeter() diff --git a/temporalio/runtime.py b/temporalio/runtime.py index a363183a..8834a680 100644 --- a/temporalio/runtime.py +++ b/temporalio/runtime.py @@ -10,9 +10,18 @@ from dataclasses import dataclass, field from datetime import timedelta from enum import Enum -from typing import ClassVar, Mapping, NewType, Optional, Sequence, Union +from typing import ( + ClassVar, + Generic, + Mapping, + NewType, + Optional, + Sequence, + TypeVar, + Union, +) -from typing_extensions import Protocol +from typing_extensions import Protocol, Self import temporalio.bridge.metric import temporalio.bridge.runtime @@ -282,6 +291,16 @@ def _to_bridge_config(self) -> temporalio.bridge.runtime.PrometheusConfig: ) +class MetricBufferDurationFormat(Enum): + """How durations are represented for metrics buffers.""" + + MILLISECONDS = 1 + """Durations are millisecond integers.""" + + SECONDS = 2 + """Durations are second floats.""" + + class MetricBuffer: """A buffer that can be set on :py:class:`TelemetryConfig` to record metrics instead of ignoring/exporting them. @@ -293,7 +312,11 @@ class MetricBuffer: logged. """ - def __init__(self, buffer_size: int) -> None: + def __init__( + self, + buffer_size: int, + duration_format: MetricBufferDurationFormat = MetricBufferDurationFormat.MILLISECONDS, + ) -> None: """Create a buffer with the given size. .. warning:: @@ -303,9 +326,13 @@ def __init__(self, buffer_size: int) -> None: Args: buffer_size: Size of the buffer. Set this to a large value. A value in the tens of thousands or higher is plenty reasonable. + duration_format: Which duration format to use. """ self._buffer_size = buffer_size self._runtime: Optional[Runtime] = None + self._durations_as_seconds = ( + duration_format == MetricBufferDurationFormat.SECONDS + ) def retrieve_updates(self) -> Sequence[BufferedMetricUpdate]: """Drain the buffer and return all metric updates. @@ -319,7 +346,9 @@ def retrieve_updates(self) -> Sequence[BufferedMetricUpdate]: """ if not self._runtime: raise RuntimeError("Attempting to retrieve updates before runtime created") - return self._runtime._core_runtime.retrieve_buffered_metrics() + return self._runtime._core_runtime.retrieve_buffered_metrics( + self._durations_as_seconds + ) @dataclass(frozen=True) @@ -485,6 +514,32 @@ def create_histogram( self._core_attrs, ) + def create_histogram_float( + self, name: str, description: Optional[str] = None, unit: Optional[str] = None + ) -> temporalio.common.MetricHistogramFloat: + return _MetricHistogramFloat( + name, + description, + unit, + temporalio.bridge.metric.MetricHistogramFloat( + self._core_meter, name, description, unit + ), + self._core_attrs, + ) + + def create_histogram_timedelta( + self, name: str, description: Optional[str] = None, unit: Optional[str] = None + ) -> temporalio.common.MetricHistogramTimedelta: + return _MetricHistogramTimedelta( + name, + description, + unit, + temporalio.bridge.metric.MetricHistogramDuration( + self._core_meter, name, description, unit + ), + self._core_attrs, + ) + def create_gauge( self, name: str, description: Optional[str] = None, unit: Optional[str] = None ) -> temporalio.common.MetricGauge: @@ -498,6 +553,19 @@ def create_gauge( self._core_attrs, ) + def create_gauge_float( + self, name: str, description: Optional[str] = None, unit: Optional[str] = None + ) -> temporalio.common.MetricGaugeFloat: + return _MetricGaugeFloat( + name, + description, + unit, + temporalio.bridge.metric.MetricGaugeFloat( + self._core_meter, name, description, unit + ), + self._core_attrs, + ) + def with_additional_attributes( self, additional_attributes: temporalio.common.MetricAttributes ) -> temporalio.common.MetricMeter: @@ -507,13 +575,16 @@ def with_additional_attributes( ) -class _MetricCounter(temporalio.common.MetricCounter): +_CoreMetricType = TypeVar("_CoreMetricType") + + +class _MetricCommon(temporalio.common.MetricCommon, Generic[_CoreMetricType]): def __init__( self, name: str, description: Optional[str], unit: Optional[str], - core_metric: temporalio.bridge.metric.MetricCounter, + core_metric: _CoreMetricType, core_attrs: temporalio.bridge.metric.MetricAttributes, ) -> None: self._name = name @@ -534,22 +605,10 @@ def description(self) -> Optional[str]: def unit(self) -> Optional[str]: return self._unit - def add( - self, - value: int, - additional_attributes: Optional[temporalio.common.MetricAttributes] = None, - ) -> None: - if value < 0: - raise ValueError("Metric value cannot be negative") - core_attrs = self._core_attrs - if additional_attributes: - core_attrs = core_attrs.with_additional_attributes(additional_attributes) - self._core_metric.add(value, core_attrs) - def with_additional_attributes( self, additional_attributes: temporalio.common.MetricAttributes - ) -> temporalio.common.MetricCounter: - return _MetricCounter( + ) -> Self: + return self.__class__( self._name, self._description, self._unit, @@ -558,33 +617,27 @@ def with_additional_attributes( ) -class _MetricHistogram(temporalio.common.MetricHistogram): - def __init__( +class _MetricCounter( + temporalio.common.MetricCounter, + _MetricCommon[temporalio.bridge.metric.MetricCounter], +): + def add( self, - name: str, - description: Optional[str], - unit: Optional[str], - core_metric: temporalio.bridge.metric.MetricHistogram, - core_attrs: temporalio.bridge.metric.MetricAttributes, + value: int, + additional_attributes: Optional[temporalio.common.MetricAttributes] = None, ) -> None: - self._name = name - self._description = description - self._unit = unit - self._core_metric = core_metric - self._core_attrs = core_attrs - - @property - def name(self) -> str: - return self._name - - @property - def description(self) -> Optional[str]: - return self._description + if value < 0: + raise ValueError("Metric value cannot be negative") + core_attrs = self._core_attrs + if additional_attributes: + core_attrs = core_attrs.with_additional_attributes(additional_attributes) + self._core_metric.add(value, core_attrs) - @property - def unit(self) -> Optional[str]: - return self._unit +class _MetricHistogram( + temporalio.common.MetricHistogram, + _MetricCommon[temporalio.bridge.metric.MetricHistogram], +): def record( self, value: int, @@ -597,45 +650,49 @@ def record( core_attrs = core_attrs.with_additional_attributes(additional_attributes) self._core_metric.record(value, core_attrs) - def with_additional_attributes( - self, additional_attributes: temporalio.common.MetricAttributes - ) -> temporalio.common.MetricHistogram: - return _MetricHistogram( - self._name, - self._description, - self._unit, - self._core_metric, - self._core_attrs.with_additional_attributes(additional_attributes), - ) - -class _MetricGauge(temporalio.common.MetricGauge): - def __init__( +class _MetricHistogramFloat( + temporalio.common.MetricHistogramFloat, + _MetricCommon[temporalio.bridge.metric.MetricHistogramFloat], +): + def record( self, - name: str, - description: Optional[str], - unit: Optional[str], - core_metric: temporalio.bridge.metric.MetricGauge, - core_attrs: temporalio.bridge.metric.MetricAttributes, + value: float, + additional_attributes: Optional[temporalio.common.MetricAttributes] = None, ) -> None: - self._name = name - self._description = description - self._unit = unit - self._core_metric = core_metric - self._core_attrs = core_attrs + if value < 0: + raise ValueError("Metric value cannot be negative") + core_attrs = self._core_attrs + if additional_attributes: + core_attrs = core_attrs.with_additional_attributes(additional_attributes) + self._core_metric.record(value, core_attrs) - @property - def name(self) -> str: - return self._name - @property - def description(self) -> Optional[str]: - return self._description +class _MetricHistogramTimedelta( + temporalio.common.MetricHistogramTimedelta, + _MetricCommon[temporalio.bridge.metric.MetricHistogramDuration], +): + def record( + self, + value: timedelta, + additional_attributes: Optional[temporalio.common.MetricAttributes] = None, + ) -> None: + if value.days < 0: + raise ValueError("Metric value cannot be negative") + core_attrs = self._core_attrs + if additional_attributes: + core_attrs = core_attrs.with_additional_attributes(additional_attributes) + self._core_metric.record( + (value.days * 86400 * 1000) + + (value.seconds * 1000) + + (value.microseconds // 1000), + core_attrs, + ) - @property - def unit(self) -> Optional[str]: - return self._unit +class _MetricGauge( + temporalio.common.MetricGauge, _MetricCommon[temporalio.bridge.metric.MetricGauge] +): def set( self, value: int, @@ -648,13 +705,19 @@ def set( core_attrs = core_attrs.with_additional_attributes(additional_attributes) self._core_metric.set(value, core_attrs) - def with_additional_attributes( - self, additional_attributes: temporalio.common.MetricAttributes - ) -> temporalio.common.MetricGauge: - return _MetricGauge( - self._name, - self._description, - self._unit, - self._core_metric, - self._core_attrs.with_additional_attributes(additional_attributes), - ) + +class _MetricGaugeFloat( + temporalio.common.MetricGaugeFloat, + _MetricCommon[temporalio.bridge.metric.MetricGaugeFloat], +): + def set( + self, + value: float, + additional_attributes: Optional[temporalio.common.MetricAttributes] = None, + ) -> None: + if value < 0: + raise ValueError("Metric value cannot be negative") + core_attrs = self._core_attrs + if additional_attributes: + core_attrs = core_attrs.with_additional_attributes(additional_attributes) + self._core_metric.set(value, core_attrs) diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index afa742a2..39536121 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -23,6 +23,7 @@ Deque, Dict, Generator, + Generic, Iterator, List, Mapping, @@ -38,7 +39,7 @@ cast, ) -from typing_extensions import TypeAlias, TypedDict +from typing_extensions import Self, TypeAlias, TypedDict import temporalio.activity import temporalio.api.common.v1 @@ -2448,6 +2449,20 @@ def create_histogram( self._underlying.create_histogram(name, description, unit) ) + def create_histogram_float( + self, name: str, description: Optional[str] = None, unit: Optional[str] = None + ) -> temporalio.common.MetricHistogramFloat: + return _ReplaySafeMetricHistogramFloat( + self._underlying.create_histogram_float(name, description, unit) + ) + + def create_histogram_timedelta( + self, name: str, description: Optional[str] = None, unit: Optional[str] = None + ) -> temporalio.common.MetricHistogramTimedelta: + return _ReplaySafeMetricHistogramTimedelta( + self._underlying.create_histogram_timedelta(name, description, unit) + ) + def create_gauge( self, name: str, description: Optional[str] = None, unit: Optional[str] = None ) -> temporalio.common.MetricGauge: @@ -2455,6 +2470,13 @@ def create_gauge( self._underlying.create_gauge(name, description, unit) ) + def create_gauge_float( + self, name: str, description: Optional[str] = None, unit: Optional[str] = None + ) -> temporalio.common.MetricGaugeFloat: + return _ReplaySafeMetricGaugeFloat( + self._underlying.create_gauge_float(name, description, unit) + ) + def with_additional_attributes( self, additional_attributes: temporalio.common.MetricAttributes ) -> temporalio.common.MetricMeter: @@ -2463,8 +2485,11 @@ def with_additional_attributes( ) -class _ReplaySafeMetricCounter(temporalio.common.MetricCounter): - def __init__(self, underlying: temporalio.common.MetricCounter) -> None: +_MetricType = TypeVar("_MetricType", bound=temporalio.common.MetricCommon) + + +class _ReplaySafeMetricCommon(temporalio.common.MetricCommon, Generic[_MetricType]): + def __init__(self, underlying: _MetricType) -> None: self._underlying = underlying @property @@ -2479,6 +2504,18 @@ def description(self) -> Optional[str]: def unit(self) -> Optional[str]: return self._underlying.unit + def with_additional_attributes( + self, additional_attributes: temporalio.common.MetricAttributes + ) -> Self: + return self.__class__( + self._underlying.with_additional_attributes(additional_attributes) + ) + + +class _ReplaySafeMetricCounter( + temporalio.common.MetricCounter, + _ReplaySafeMetricCommon[temporalio.common.MetricCounter], +): def add( self, value: int, @@ -2487,30 +2524,11 @@ def add( if not temporalio.workflow.unsafe.is_replaying(): self._underlying.add(value, additional_attributes) - def with_additional_attributes( - self, additional_attributes: temporalio.common.MetricAttributes - ) -> temporalio.common.MetricCounter: - return _ReplaySafeMetricCounter( - self._underlying.with_additional_attributes(additional_attributes) - ) - - -class _ReplaySafeMetricHistogram(temporalio.common.MetricHistogram): - def __init__(self, underlying: temporalio.common.MetricHistogram) -> None: - self._underlying = underlying - - @property - def name(self) -> str: - return self._underlying.name - - @property - def description(self) -> Optional[str]: - return self._underlying.description - - @property - def unit(self) -> Optional[str]: - return self._underlying.unit +class _ReplaySafeMetricHistogram( + temporalio.common.MetricHistogram, + _ReplaySafeMetricCommon[temporalio.common.MetricHistogram], +): def record( self, value: int, @@ -2519,30 +2537,37 @@ def record( if not temporalio.workflow.unsafe.is_replaying(): self._underlying.record(value, additional_attributes) - def with_additional_attributes( - self, additional_attributes: temporalio.common.MetricAttributes - ) -> temporalio.common.MetricHistogram: - return _ReplaySafeMetricHistogram( - self._underlying.with_additional_attributes(additional_attributes) - ) - -class _ReplaySafeMetricGauge(temporalio.common.MetricGauge): - def __init__(self, underlying: temporalio.common.MetricGauge) -> None: - self._underlying = underlying +class _ReplaySafeMetricHistogramFloat( + temporalio.common.MetricHistogramFloat, + _ReplaySafeMetricCommon[temporalio.common.MetricHistogramFloat], +): + def record( + self, + value: float, + additional_attributes: Optional[temporalio.common.MetricAttributes] = None, + ) -> None: + if not temporalio.workflow.unsafe.is_replaying(): + self._underlying.record(value, additional_attributes) - @property - def name(self) -> str: - return self._underlying.name - @property - def description(self) -> Optional[str]: - return self._underlying.description +class _ReplaySafeMetricHistogramTimedelta( + temporalio.common.MetricHistogramTimedelta, + _ReplaySafeMetricCommon[temporalio.common.MetricHistogramTimedelta], +): + def record( + self, + value: timedelta, + additional_attributes: Optional[temporalio.common.MetricAttributes] = None, + ) -> None: + if not temporalio.workflow.unsafe.is_replaying(): + self._underlying.record(value, additional_attributes) - @property - def unit(self) -> Optional[str]: - return self._underlying.unit +class _ReplaySafeMetricGauge( + temporalio.common.MetricGauge, + _ReplaySafeMetricCommon[temporalio.common.MetricGauge], +): def set( self, value: int, @@ -2551,9 +2576,15 @@ def set( if not temporalio.workflow.unsafe.is_replaying(): self._underlying.set(value, additional_attributes) - def with_additional_attributes( - self, additional_attributes: temporalio.common.MetricAttributes - ) -> temporalio.common.MetricGauge: - return _ReplaySafeMetricGauge( - self._underlying.with_additional_attributes(additional_attributes) - ) + +class _ReplaySafeMetricGaugeFloat( + temporalio.common.MetricGaugeFloat, + _ReplaySafeMetricCommon[temporalio.common.MetricGaugeFloat], +): + def set( + self, + value: float, + additional_attributes: Optional[temporalio.common.MetricAttributes] = None, + ) -> None: + if not temporalio.workflow.unsafe.is_replaying(): + self._underlying.set(value, additional_attributes) diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 3ab2c4bd..3eac77e7 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -80,6 +80,7 @@ BUFFERED_METRIC_KIND_COUNTER, BUFFERED_METRIC_KIND_HISTOGRAM, MetricBuffer, + MetricBufferDurationFormat, PrometheusConfig, Runtime, TelemetryConfig, @@ -3794,6 +3795,76 @@ async def test_workflow_buffered_metrics(client: Client): ) +async def test_workflow_metrics_other_types(client: Client): + async def do_stuff(buffer: MetricBuffer) -> None: + runtime = Runtime(telemetry=TelemetryConfig(metrics=buffer)) + new_client = await Client.connect( + client.service_client.config.target_host, + namespace=client.namespace, + runtime=runtime, + ) + async with new_worker(new_client, HelloWorkflow) as worker: + await new_client.execute_workflow( + HelloWorkflow.run, + "Temporal", + id=f"wf-{uuid.uuid4()}", + task_queue=worker.task_queue, + ) + # Also, add some manual types beyond the defaults tested in other tests + runtime.metric_meter.create_histogram_float("my-histogram-float").record(1.23) + runtime.metric_meter.create_histogram_timedelta( + "my-histogram-timedelta" + ).record(timedelta(days=2, seconds=3, milliseconds=4)) + runtime.metric_meter.create_gauge_float("my-gauge-float").set(4.56) + + # Create a buffer, do stuff, check the metrics + buffer = MetricBuffer(10000) + await do_stuff(buffer) + updates = buffer.retrieve_updates() + assert any( + u.metric.name == "temporal_workflow_task_execution_latency" + # Took more than 3ms + and u.value > 3 and isinstance(u.value, int) and u.metric.unit == "ms" + for u in updates + ) + assert any( + u.metric.name == "my-histogram-float" + and u.value == 1.23 + and isinstance(u.value, float) + for u in updates + ) + assert any( + u.metric.name == "my-histogram-timedelta" + and u.value + == int(timedelta(days=2, seconds=3, milliseconds=4).total_seconds() * 1000) + and isinstance(u.value, int) + for u in updates + ) + assert any( + u.metric.name == "my-gauge-float" + and u.value == 4.56 + and isinstance(u.value, float) + for u in updates + ) + + # Do it again with seconds + buffer = MetricBuffer(10000, duration_format=MetricBufferDurationFormat.SECONDS) + await do_stuff(buffer) + updates = buffer.retrieve_updates() + assert any( + u.metric.name == "temporal_workflow_task_execution_latency" + # Took less than 3s + and u.value < 3 and isinstance(u.value, float) and u.metric.unit == "s" + for u in updates + ) + assert any( + u.metric.name == "my-histogram-timedelta" + and u.value == timedelta(days=2, seconds=3, milliseconds=4).total_seconds() + and isinstance(u.value, float) + for u in updates + ) + + bad_validator_fail_ct = 0 task_fail_ct = 0