From 3fade953bb131e8c6095ab4f17704244bef0b8f3 Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Mon, 25 Sep 2023 08:58:45 -0500 Subject: [PATCH] Custom metric support (#384) --- temporalio/activity.py | 43 +++ temporalio/bridge/metric.py | 111 ++++++++ temporalio/bridge/src/lib.rs | 14 + temporalio/bridge/src/metric.rs | 175 +++++++++++++ temporalio/common.py | 334 +++++++++++++++++++++++- temporalio/runtime.py | 224 ++++++++++++++++ temporalio/testing/_activity.py | 6 + temporalio/worker/_activity.py | 10 + temporalio/worker/_replayer.py | 1 + temporalio/worker/_worker.py | 4 + temporalio/worker/_workflow.py | 5 + temporalio/worker/_workflow_instance.py | 158 ++++++++++- temporalio/workflow.py | 16 ++ tests/helpers/__init__.py | 9 + tests/test_runtime.py | 10 +- tests/worker/test_workflow.py | 154 ++++++++++- 16 files changed, 1257 insertions(+), 17 deletions(-) create mode 100644 temporalio/bridge/metric.py create mode 100644 temporalio/bridge/src/metric.rs diff --git a/temporalio/activity.py b/temporalio/activity.py index 565906ed..f1914f64 100644 --- a/temporalio/activity.py +++ b/temporalio/activity.py @@ -149,8 +149,10 @@ class _Context: Type[temporalio.converter.PayloadConverter], temporalio.converter.PayloadConverter, ] + runtime_metric_meter: Optional[temporalio.common.MetricMeter] _logger_details: Optional[Mapping[str, Any]] = None _payload_converter: Optional[temporalio.converter.PayloadConverter] = None + _metric_meter: Optional[temporalio.common.MetricMeter] = None @staticmethod def current() -> _Context: @@ -185,6 +187,29 @@ def payload_converter(self) -> temporalio.converter.PayloadConverter: self._payload_converter = self.payload_converter_class_or_instance() return self._payload_converter + @property + def metric_meter(self) -> temporalio.common.MetricMeter: + # If there isn't a runtime metric meter, then we're in a non-threaded + # sync function and we don't support cross-process metrics + if not self.runtime_metric_meter: + raise RuntimeError( + "Metrics meter not available in non-threaded sync activities like mulitprocess" + ) + # Create the meter lazily if not already created. We are ok creating + # multiple in the rare race where a user calls this property on + # different threads inside the same activity. The meter is immutable and + # it's better than a lock. + if not self._metric_meter: + info = self.info() + self._metric_meter = self.runtime_metric_meter.with_additional_attributes( + { + "namespace": info.workflow_namespace, + "task_queue": info.task_queue, + "activity_type": info.activity_type, + } + ) + return self._metric_meter + @dataclass class _CompositeEvent: @@ -377,6 +402,24 @@ def payload_converter() -> temporalio.converter.PayloadConverter: return _Context.current().payload_converter +def metric_meter() -> temporalio.common.MetricMeter: + """Get the metric meter for the current activity. + + .. warning:: + This is only available in async or synchronous threaded activities. An + error is raised on non-thread-based sync activities when trying to + access this. + + Returns: + Current metric meter for this activity for recording metrics. + + Raises: + RuntimeError: When not in an activity or in a non-thread-based + synchronous activity. + """ + return _Context.current().metric_meter + + class LoggerAdapter(logging.LoggerAdapter): """Adapter that adds details to the log about the running activity. diff --git a/temporalio/bridge/metric.py b/temporalio/bridge/metric.py new file mode 100644 index 00000000..396a734f --- /dev/null +++ b/temporalio/bridge/metric.py @@ -0,0 +1,111 @@ +"""Metrics using SDK Core. (unstable) + +Nothing in this module should be considered stable. The API may change. +""" + +from __future__ import annotations + +from typing import Mapping, Optional, Union + +import temporalio.bridge.runtime +import temporalio.bridge.temporal_sdk_bridge + + +class MetricMeter: + """Metric meter using SDK Core.""" + + @staticmethod + def create(runtime: temporalio.bridge.runtime.Runtime) -> Optional[MetricMeter]: + """Create optional metric meter.""" + ref = temporalio.bridge.temporal_sdk_bridge.new_metric_meter(runtime._ref) + if not ref: + return None + return MetricMeter(ref) + + def __init__( + self, ref: temporalio.bridge.temporal_sdk_bridge.MetricMeterRef + ) -> None: + """Initialize metric meter.""" + self._ref = ref + self._default_attributes = MetricAttributes(ref.default_attributes) + + @property + def default_attributes(self) -> MetricAttributes: + """Default attributes for the metric meter.""" + return self._default_attributes + + +class MetricCounter: + """Metric counter using SDK Core.""" + + def __init__( + self, + meter: MetricMeter, + name: str, + description: Optional[str], + unit: Optional[str], + ) -> None: + """Initialize counter metric.""" + self._ref = meter._ref.new_counter(name, description, unit) + + def add(self, value: int, attrs: MetricAttributes) -> None: + """Add value to counter.""" + if value < 0: + raise ValueError("Metric value must be non-negative value") + self._ref.add(value, attrs._ref) + + +class MetricHistogram: + """Metric histogram using SDK Core.""" + + def __init__( + self, + meter: MetricMeter, + name: str, + description: Optional[str], + unit: Optional[str], + ) -> None: + """Initialize histogram.""" + self._ref = meter._ref.new_histogram(name, description, unit) + + def record(self, value: int, attrs: MetricAttributes) -> None: + """Record value on histogram.""" + if value < 0: + raise ValueError("Metric value must be non-negative value") + self._ref.record(value, attrs._ref) + + +class MetricGauge: + """Metric gauge using SDK Core.""" + + def __init__( + self, + meter: MetricMeter, + name: str, + description: Optional[str], + unit: Optional[str], + ) -> None: + """Initialize gauge.""" + self._ref = meter._ref.new_gauge(name, description, unit) + + def set(self, value: int, attrs: MetricAttributes) -> None: + """Set value on gauge.""" + if value < 0: + raise ValueError("Metric value must be non-negative value") + self._ref.set(value, attrs._ref) + + +class MetricAttributes: + """Metric attributes using SDK Core.""" + + def __init__( + self, ref: temporalio.bridge.temporal_sdk_bridge.MetricAttributesRef + ) -> None: + """Initialize attributes.""" + self._ref = ref + + def with_additional_attributes( + self, new_attrs: Mapping[str, Union[str, int, float, bool]] + ) -> MetricAttributes: + """Create new attributes with new attributes appended.""" + return MetricAttributes(self._ref.with_additional_attributes(new_attrs)) diff --git a/temporalio/bridge/src/lib.rs b/temporalio/bridge/src/lib.rs index 583c2992..31164c03 100644 --- a/temporalio/bridge/src/lib.rs +++ b/temporalio/bridge/src/lib.rs @@ -2,6 +2,7 @@ use pyo3::prelude::*; use pyo3::types::PyTuple; mod client; +mod metric; mod runtime; mod testing; mod worker; @@ -13,6 +14,14 @@ fn temporal_sdk_bridge(py: Python, m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_function(wrap_pyfunction!(connect_client, m)?)?; + // Metric stuff + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_function(wrap_pyfunction!(new_metric_meter, m)?)?; + // Runtime stuff m.add_class::()?; m.add_function(wrap_pyfunction!(init_runtime, m)?)?; @@ -44,6 +53,11 @@ fn connect_client<'a>( client::connect_client(py, &runtime_ref, config) } +#[pyfunction] +fn new_metric_meter(runtime_ref: &runtime::RuntimeRef) -> Option { + metric::new_metric_meter(&runtime_ref) +} + #[pyfunction] fn init_runtime(telemetry_config: runtime::TelemetryConfig) -> PyResult { runtime::init_runtime(telemetry_config) diff --git a/temporalio/bridge/src/metric.rs b/temporalio/bridge/src/metric.rs new file mode 100644 index 00000000..4e736448 --- /dev/null +++ b/temporalio/bridge/src/metric.rs @@ -0,0 +1,175 @@ +use std::{collections::HashMap, sync::Arc}; + +use pyo3::exceptions::PyTypeError; +use pyo3::prelude::*; +use temporal_sdk_core_api::telemetry::metrics; + +use crate::runtime; + +#[pyclass] +pub struct MetricMeterRef { + meter: metrics::TemporalMeter, + #[pyo3(get)] + default_attributes: MetricAttributesRef, +} + +#[pyclass] +#[derive(Clone)] +pub struct MetricAttributesRef { + attrs: metrics::MetricAttributes, +} + +#[pyclass] +pub struct MetricCounterRef { + counter: Arc, +} + +#[pyclass] +pub struct MetricHistogramRef { + histogram: Arc, +} + +#[pyclass] +pub struct MetricGaugeRef { + gauge: Arc, +} + +pub fn new_metric_meter(runtime_ref: &runtime::RuntimeRef) -> Option { + runtime_ref + .runtime + .core + .telemetry() + .get_metric_meter() + .map(|meter| { + let default_attributes = MetricAttributesRef { + attrs: meter.inner.new_attributes(meter.default_attribs.clone()), + }; + MetricMeterRef { + meter, + default_attributes, + } + }) +} + +#[pymethods] +impl MetricMeterRef { + fn new_counter( + &self, + name: String, + description: Option, + unit: Option, + ) -> MetricCounterRef { + MetricCounterRef { + counter: self + .meter + .inner + .counter(build_metric_parameters(name, description, unit)), + } + } + + fn new_histogram( + &self, + name: String, + description: Option, + unit: Option, + ) -> MetricHistogramRef { + MetricHistogramRef { + histogram: self + .meter + .inner + .histogram(build_metric_parameters(name, description, unit)), + } + } + + fn new_gauge( + &self, + name: String, + description: Option, + unit: Option, + ) -> MetricGaugeRef { + MetricGaugeRef { + gauge: self + .meter + .inner + .gauge(build_metric_parameters(name, description, unit)), + } + } +} + +#[pymethods] +impl MetricCounterRef { + fn add(&self, value: u64, attrs_ref: &MetricAttributesRef) { + self.counter.add(value, &attrs_ref.attrs); + } +} + +#[pymethods] +impl MetricHistogramRef { + fn record(&self, value: u64, attrs_ref: &MetricAttributesRef) { + self.histogram.record(value, &attrs_ref.attrs); + } +} + +#[pymethods] +impl MetricGaugeRef { + fn set(&self, value: u64, attrs_ref: &MetricAttributesRef) { + self.gauge.record(value, &attrs_ref.attrs); + } +} + +fn build_metric_parameters( + name: String, + description: Option, + unit: Option, +) -> metrics::MetricParameters { + let mut build = metrics::MetricParametersBuilder::default(); + build.name(name); + if let Some(description) = description { + build.description(description); + } + if let Some(unit) = unit { + build.unit(unit); + } + // Should be nothing that would fail validation here + build.build().unwrap() +} + +#[pymethods] +impl MetricAttributesRef { + fn with_additional_attributes<'p>( + &self, + py: Python<'p>, + new_attrs: HashMap, + ) -> PyResult { + let mut attrs = self.attrs.clone(); + attrs.add_new_attrs( + new_attrs + .into_iter() + .map(|(k, obj)| metric_key_value_from_py(py, k, obj)) + .collect::>>()?, + ); + Ok(MetricAttributesRef { attrs }) + } +} + +fn metric_key_value_from_py<'p>( + py: Python<'p>, + k: String, + obj: PyObject, +) -> PyResult { + let val = if let Ok(v) = obj.extract::(py) { + metrics::MetricValue::String(v) + } else if let Ok(v) = obj.extract::(py) { + metrics::MetricValue::Bool(v) + } else if let Ok(v) = obj.extract::(py) { + metrics::MetricValue::Int(v) + } else if let Ok(v) = obj.extract::(py) { + metrics::MetricValue::Float(v) + } else { + return Err(PyTypeError::new_err(format!( + "Invalid value type for key {}, must be str, int, float, or bool", + k + ))); + }; + Ok(metrics::MetricKeyValue::new(k, val)) +} diff --git a/temporalio/common.py b/temporalio/common.py index c31c84bf..e023e4a2 100644 --- a/temporalio/common.py +++ b/temporalio/common.py @@ -4,6 +4,7 @@ import inspect import types +from abc import ABC, abstractmethod from dataclasses import dataclass from datetime import datetime, timedelta from enum import IntEnum @@ -22,7 +23,7 @@ ) import google.protobuf.internal.containers -from typing_extensions import TypeAlias +from typing_extensions import ClassVar, TypeAlias import temporalio.api.common.v1 import temporalio.api.enums.v1 @@ -175,6 +176,337 @@ def __setstate__(self, state: object) -> None: SearchAttributes: TypeAlias = Mapping[str, SearchAttributeValues] +MetricAttributes: TypeAlias = Mapping[str, Union[str, int, float, bool]] + + +class MetricMeter(ABC): + """Metric meter for recording metrics.""" + + noop: ClassVar[MetricMeter] + """Metric meter implementation that does nothing.""" + + @abstractmethod + def create_counter( + self, name: str, description: Optional[str] = None, unit: Optional[str] = None + ) -> MetricCounter: + """Create a counter metric for adding values. + + Args: + name: Name for the metric. + description: Optional description for the metric. + unit: Optional unit for the metric. + + Returns: + Counter metric. + """ + ... + + @abstractmethod + def create_histogram( + self, name: str, description: Optional[str] = None, unit: Optional[str] = None + ) -> MetricHistogram: + """Create a histogram metric for recording values. + + Args: + name: Name for the metric. + description: Optional description for the metric. + unit: Optional unit for the metric. + + Returns: + Histogram metric. + """ + ... + + @abstractmethod + def create_gauge( + self, name: str, description: Optional[str] = None, unit: Optional[str] = None + ) -> MetricGauge: + """Create a gauge metric for setting values. + + Args: + name: Name for the metric. + description: Optional description for the metric. + unit: Optional unit for the metric. + + Returns: + Gauge metric. + """ + ... + + @abstractmethod + def with_additional_attributes( + self, additional_attributes: MetricAttributes + ) -> MetricMeter: + """Create a new metric meter with the given attributes appended to the + current set. + + Args: + additional_attributes: Additional attributes to append to the + current set. + + Returns: + New metric meter. + + Raises: + TypeError: Attribute values are not the expected type. + """ + ... + + +class MetricCounter(ABC): + """Counter metric created by a metric meter.""" + + @property + @abstractmethod + def name(self) -> str: + """Name for the metric.""" + ... + + @property + @abstractmethod + def description(self) -> Optional[str]: + """Description for the metric if any.""" + ... + + @property + @abstractmethod + def unit(self) -> Optional[str]: + """Unit for the metric if any.""" + ... + + @abstractmethod + def add( + self, value: int, additional_attributes: Optional[MetricAttributes] = None + ) -> None: + """Add a value to the counter. + + Args: + value: A non-negative integer to add. + additional_attributes: Additional attributes to append to the + current set. + + Raises: + ValueError: Value is negative. + TypeError: Attribute values are not the expected type. + """ + ... + + @abstractmethod + def with_additional_attributes( + self, additional_attributes: MetricAttributes + ) -> MetricCounter: + """Create a new counter with the given attributes appended to the + current set. + + Args: + additional_attributes: Additional attributes to append to the + current set. + + Returns: + New counter. + + Raises: + TypeError: Attribute values are not the expected type. + """ + ... + + +class MetricHistogram(ABC): + """Histogram metric created by a metric meter.""" + + @property + @abstractmethod + def name(self) -> str: + """Name for the metric.""" + ... + + @property + @abstractmethod + def description(self) -> Optional[str]: + """Description for the metric if any.""" + ... + + @property + @abstractmethod + def unit(self) -> Optional[str]: + """Unit for the metric if any.""" + ... + + @abstractmethod + def record( + self, value: int, additional_attributes: Optional[MetricAttributes] = None + ) -> None: + """Record a value on the histogram. + + Args: + value: A non-negative integer to record. + additional_attributes: Additional attributes to append to the + current set. + + Raises: + ValueError: Value is negative. + TypeError: Attribute values are not the expected type. + """ + ... + + @abstractmethod + def with_additional_attributes( + self, additional_attributes: MetricAttributes + ) -> MetricHistogram: + """Create a new histogram with the given attributes appended to the + current set. + + Args: + additional_attributes: Additional attributes to append to the + current set. + + Returns: + New histogram. + + Raises: + TypeError: Attribute values are not the expected type. + """ + ... + + +class MetricGauge(ABC): + """Gauge metric created by a metric meter.""" + + @property + @abstractmethod + def name(self) -> str: + """Name for the metric.""" + ... + + @property + @abstractmethod + def description(self) -> Optional[str]: + """Description for the metric if any.""" + ... + + @property + @abstractmethod + def unit(self) -> Optional[str]: + """Unit for the metric if any.""" + ... + + @abstractmethod + def set( + self, value: int, additional_attributes: Optional[MetricAttributes] = None + ) -> None: + """Set a value on the gauge. + + Args: + value: A non-negative integer to set. + additional_attributes: Additional attributes to append to the + current set. + + Raises: + ValueError: Value is negative. + TypeError: Attribute values are not the expected type. + """ + ... + + @abstractmethod + def with_additional_attributes( + self, additional_attributes: MetricAttributes + ) -> MetricGauge: + """Create a new gauge with the given attributes appended to the + current set. + + Args: + additional_attributes: Additional attributes to append to the + current set. + + Returns: + New gauge. + + Raises: + TypeError: Attribute values are not the expected type. + """ + ... + + +class _NoopMetricMeter(MetricMeter): + def create_counter( + self, name: str, description: Optional[str] = None, unit: Optional[str] = None + ) -> MetricCounter: + return _NoopMetricCounter(name, description, unit) + + def create_histogram( + self, name: str, description: Optional[str] = None, unit: Optional[str] = None + ) -> MetricHistogram: + return _NoopMetricHistogram(name, description, unit) + + def create_gauge( + self, name: str, description: Optional[str] = None, unit: Optional[str] = None + ) -> MetricGauge: + return _NoopMetricGauge(name, description, unit) + + def with_additional_attributes( + self, additional_attributes: MetricAttributes + ) -> MetricMeter: + return self + + +class _NoopMetric: + def __init__( + self, name: str, description: Optional[str], unit: Optional[str] + ) -> None: + self._name = name + self._description = description + self._unit = unit + + @property + def name(self) -> str: + return self._name + + @property + def description(self) -> Optional[str]: + return self._description + + @property + def unit(self) -> Optional[str]: + return self._unit + + +class _NoopMetricCounter(_NoopMetric, MetricCounter): + def add( + self, value: int, additional_attributes: Optional[MetricAttributes] = None + ) -> None: + pass + + def with_additional_attributes( + self, additional_attributes: MetricAttributes + ) -> MetricCounter: + return self + + +class _NoopMetricHistogram(_NoopMetric, MetricHistogram): + def record( + self, value: int, additional_attributes: Optional[MetricAttributes] = None + ) -> None: + pass + + def with_additional_attributes( + self, additional_attributes: MetricAttributes + ) -> MetricHistogram: + return self + + +class _NoopMetricGauge(_NoopMetric, MetricGauge): + def set( + self, value: int, additional_attributes: Optional[MetricAttributes] = None + ) -> None: + pass + + def with_additional_attributes( + self, additional_attributes: MetricAttributes + ) -> MetricGauge: + return self + + +MetricMeter.noop = _NoopMetricMeter() # Should be set as the "arg" argument for _arg_or_args checks where the argument # is unset. This is different than None which is a legitimate argument. diff --git a/temporalio/runtime.py b/temporalio/runtime.py index 232467f3..25f50bde 100644 --- a/temporalio/runtime.py +++ b/temporalio/runtime.py @@ -10,7 +10,9 @@ from enum import Enum from typing import ClassVar, Mapping, Optional, Union +import temporalio.bridge.metric import temporalio.bridge.runtime +import temporalio.common _default_runtime: Optional[Runtime] = None @@ -64,6 +66,18 @@ def __init__(self, *, telemetry: TelemetryConfig) -> None: self._core_runtime = temporalio.bridge.runtime.Runtime( telemetry=telemetry._to_bridge_config() ) + core_meter = temporalio.bridge.metric.MetricMeter.create(self._core_runtime) + if not core_meter: + self._metric_meter = temporalio.common.MetricMeter.noop + else: + self._metric_meter = _MetricMeter(core_meter, core_meter.default_attributes) + + @property + def metric_meter(self) -> temporalio.common.MetricMeter: + """Metric meter for this runtime. This is a no-op metric meter if no + metrics were configured. + """ + return self._metric_meter @dataclass @@ -197,3 +211,213 @@ def _to_bridge_config(self) -> temporalio.bridge.runtime.TelemetryConfig: metric_prefix=self.metric_prefix, ), ) + + +class _MetricMeter(temporalio.common.MetricMeter): + def __init__( + self, + core_meter: temporalio.bridge.metric.MetricMeter, + core_attrs: temporalio.bridge.metric.MetricAttributes, + ) -> None: + self._core_meter = core_meter + self._core_attrs = core_attrs + + def create_counter( + self, name: str, description: Optional[str] = None, unit: Optional[str] = None + ) -> temporalio.common.MetricCounter: + return _MetricCounter( + name, + description, + unit, + temporalio.bridge.metric.MetricCounter( + self._core_meter, name, description, unit + ), + self._core_attrs, + ) + + def create_histogram( + self, name: str, description: Optional[str] = None, unit: Optional[str] = None + ) -> temporalio.common.MetricHistogram: + return _MetricHistogram( + name, + description, + unit, + temporalio.bridge.metric.MetricHistogram( + self._core_meter, name, description, unit + ), + self._core_attrs, + ) + + def create_gauge( + self, name: str, description: Optional[str] = None, unit: Optional[str] = None + ) -> temporalio.common.MetricGauge: + return _MetricGauge( + name, + description, + unit, + temporalio.bridge.metric.MetricGauge( + self._core_meter, name, description, unit + ), + self._core_attrs, + ) + + def with_additional_attributes( + self, additional_attributes: temporalio.common.MetricAttributes + ) -> temporalio.common.MetricMeter: + return _MetricMeter( + self._core_meter, + self._core_attrs.with_additional_attributes(additional_attributes), + ) + + +class _MetricCounter(temporalio.common.MetricCounter): + def __init__( + self, + name: str, + description: Optional[str], + unit: Optional[str], + core_metric: temporalio.bridge.metric.MetricCounter, + core_attrs: temporalio.bridge.metric.MetricAttributes, + ) -> None: + self._name = name + self._description = description + self._unit = unit + self._core_metric = core_metric + self._core_attrs = core_attrs + + @property + def name(self) -> str: + return self._name + + @property + def description(self) -> Optional[str]: + return self._description + + @property + def unit(self) -> Optional[str]: + return self._unit + + def add( + self, + value: int, + additional_attributes: Optional[temporalio.common.MetricAttributes] = None, + ) -> None: + if value < 0: + raise ValueError("Metric value cannot be negative") + core_attrs = self._core_attrs + if additional_attributes: + core_attrs = core_attrs.with_additional_attributes(additional_attributes) + self._core_metric.add(value, core_attrs) + + def with_additional_attributes( + self, additional_attributes: temporalio.common.MetricAttributes + ) -> temporalio.common.MetricCounter: + return _MetricCounter( + self._name, + self._description, + self._unit, + self._core_metric, + self._core_attrs.with_additional_attributes(additional_attributes), + ) + + +class _MetricHistogram(temporalio.common.MetricHistogram): + def __init__( + self, + name: str, + description: Optional[str], + unit: Optional[str], + core_metric: temporalio.bridge.metric.MetricHistogram, + core_attrs: temporalio.bridge.metric.MetricAttributes, + ) -> None: + self._name = name + self._description = description + self._unit = unit + self._core_metric = core_metric + self._core_attrs = core_attrs + + @property + def name(self) -> str: + return self._name + + @property + def description(self) -> Optional[str]: + return self._description + + @property + def unit(self) -> Optional[str]: + return self._unit + + def record( + self, + value: int, + additional_attributes: Optional[temporalio.common.MetricAttributes] = None, + ) -> None: + if value < 0: + raise ValueError("Metric value cannot be negative") + core_attrs = self._core_attrs + if additional_attributes: + core_attrs = core_attrs.with_additional_attributes(additional_attributes) + self._core_metric.record(value, core_attrs) + + def with_additional_attributes( + self, additional_attributes: temporalio.common.MetricAttributes + ) -> temporalio.common.MetricHistogram: + return _MetricHistogram( + self._name, + self._description, + self._unit, + self._core_metric, + self._core_attrs.with_additional_attributes(additional_attributes), + ) + + +class _MetricGauge(temporalio.common.MetricGauge): + def __init__( + self, + name: str, + description: Optional[str], + unit: Optional[str], + core_metric: temporalio.bridge.metric.MetricGauge, + core_attrs: temporalio.bridge.metric.MetricAttributes, + ) -> None: + self._name = name + self._description = description + self._unit = unit + self._core_metric = core_metric + self._core_attrs = core_attrs + + @property + def name(self) -> str: + return self._name + + @property + def description(self) -> Optional[str]: + return self._description + + @property + def unit(self) -> Optional[str]: + return self._unit + + def set( + self, + value: int, + additional_attributes: Optional[temporalio.common.MetricAttributes] = None, + ) -> None: + if value < 0: + raise ValueError("Metric value cannot be negative") + core_attrs = self._core_attrs + if additional_attributes: + core_attrs = core_attrs.with_additional_attributes(additional_attributes) + self._core_metric.set(value, core_attrs) + + def with_additional_attributes( + self, additional_attributes: temporalio.common.MetricAttributes + ) -> temporalio.common.MetricGauge: + return _MetricGauge( + self._name, + self._description, + self._unit, + self._core_metric, + self._core_attrs.with_additional_attributes(additional_attributes), + ) diff --git a/temporalio/testing/_activity.py b/temporalio/testing/_activity.py index 2684a482..35ebce6e 100644 --- a/temporalio/testing/_activity.py +++ b/temporalio/testing/_activity.py @@ -12,6 +12,7 @@ from typing_extensions import ParamSpec import temporalio.activity +import temporalio.common import temporalio.converter import temporalio.exceptions import temporalio.worker._activity @@ -56,6 +57,9 @@ class ActivityEnvironment: payload_converter: Payload converter set on the activity context. This must be set before :py:meth:`run`. Changes after the activity has started do not take effect. + metric_meter: Metric meter set on the activity context. This must be set + before :py:meth:`run`. Changes after the activity has started do not + take effect. Default is noop. """ def __init__(self) -> None: @@ -65,6 +69,7 @@ def __init__(self) -> None: self.payload_converter = ( temporalio.converter.DataConverter.default.payload_converter ) + self.metric_meter = temporalio.common.MetricMeter.noop self._cancelled = False self._worker_shutdown = False self._activities: Set[_Activity] = set() @@ -147,6 +152,7 @@ def __init__( if not self.cancel_thread_raiser else self.cancel_thread_raiser.shielded, payload_converter_class_or_instance=env.payload_converter, + runtime_metric_meter=env.metric_meter, ) self.task: Optional[asyncio.Task] = None diff --git a/temporalio/worker/_activity.py b/temporalio/worker/_activity.py index 86d106c4..3bc0aac0 100644 --- a/temporalio/worker/_activity.py +++ b/temporalio/worker/_activity.py @@ -68,6 +68,7 @@ def __init__( shared_state_manager: Optional[SharedStateManager], data_converter: temporalio.converter.DataConverter, interceptors: Sequence[Interceptor], + metric_meter: temporalio.common.MetricMeter, ) -> None: self._bridge_worker = bridge_worker self._task_queue = task_queue @@ -76,6 +77,7 @@ def __init__( self._running_activities: Dict[bytes, _RunningActivity] = {} self._data_converter = data_converter self._interceptors = interceptors + self._metric_meter = metric_meter self._fail_worker_exception_queue: asyncio.Queue[Exception] = asyncio.Queue() # Lazily created on first activity self._worker_shutdown_event: Optional[ @@ -295,6 +297,7 @@ async def _run_activity( ) # Setup events + sync_non_threaded = False if not activity_def.is_async: running_activity.sync = True # If we're in a thread-pool executor we can use threading events @@ -310,6 +313,7 @@ async def _run_activity( if not activity_def.no_thread_cancel_exception: running_activity.cancel_thread_raiser = _ThreadExceptionRaiser() else: + sync_non_threaded = True manager = self._shared_state_manager # Pre-checked on worker init assert manager @@ -421,6 +425,9 @@ async def _run_activity( if not running_activity.cancel_thread_raiser else running_activity.cancel_thread_raiser.shielded, payload_converter_class_or_instance=self._data_converter.payload_converter, + runtime_metric_meter=None + if sync_non_threaded + else self._metric_meter, ) ) temporalio.activity.logger.debug("Starting activity") @@ -674,6 +681,7 @@ async def heartbeat_with_context(*details: Any) -> None: cancelled_event.thread_event, worker_shutdown_event.thread_event, payload_converter_class_or_instance, + ctx.runtime_metric_meter, input.fn, *input.args, ] @@ -720,6 +728,7 @@ def _execute_sync_activity( Type[temporalio.converter.PayloadConverter], temporalio.converter.PayloadConverter, ], + runtime_metric_meter: Optional[temporalio.common.MetricMeter], fn: Callable[..., Any], *args: Any, ) -> Any: @@ -750,6 +759,7 @@ def _execute_sync_activity( if not cancel_thread_raiser else cancel_thread_raiser.shielded, payload_converter_class_or_instance=payload_converter_class_or_instance, + runtime_metric_meter=runtime_metric_meter, ) ) return fn(*args) diff --git a/temporalio/worker/_replayer.py b/temporalio/worker/_replayer.py index 44350e66..e856e0f8 100644 --- a/temporalio/worker/_replayer.py +++ b/temporalio/worker/_replayer.py @@ -225,6 +225,7 @@ def on_eviction_hook( data_converter=self._config["data_converter"], interceptors=self._config["interceptors"], debug_mode=self._config["debug_mode"], + metric_meter=runtime.metric_meter, on_eviction_hook=on_eviction_hook, disable_eager_activity_execution=False, ).run() diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index 67187deb..a975936e 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -23,6 +23,7 @@ import temporalio.client import temporalio.converter import temporalio.exceptions +import temporalio.runtime import temporalio.service from ._activity import SharedStateManager, _ActivityWorker @@ -259,6 +260,7 @@ def __init__( # Create activity and workflow worker self._activity_worker: Optional[_ActivityWorker] = None + runtime = bridge_client.config.runtime or temporalio.runtime.Runtime.default() if activities: self._activity_worker = _ActivityWorker( bridge_worker=lambda: self._bridge_worker, @@ -268,6 +270,7 @@ def __init__( shared_state_manager=shared_state_manager, data_converter=client_config["data_converter"], interceptors=interceptors, + metric_meter=runtime.metric_meter, ) self._workflow_worker: Optional[_WorkflowWorker] = None if workflows: @@ -283,6 +286,7 @@ def __init__( interceptors=interceptors, debug_mode=debug_mode, disable_eager_activity_execution=disable_eager_activity_execution, + metric_meter=runtime.metric_meter, on_eviction_hook=None, ) diff --git a/temporalio/worker/_workflow.py b/temporalio/worker/_workflow.py index 9c29f51f..d60ffaca 100644 --- a/temporalio/worker/_workflow.py +++ b/temporalio/worker/_workflow.py @@ -30,6 +30,7 @@ WorkflowInstance, WorkflowInstanceDetails, WorkflowRunner, + _WorkflowExternFunctions, ) logger = logging.getLogger(__name__) @@ -53,6 +54,7 @@ def __init__( interceptors: Sequence[Interceptor], debug_mode: bool, disable_eager_activity_execution: bool, + metric_meter: temporalio.common.MetricMeter, on_eviction_hook: Optional[ Callable[ [str, temporalio.bridge.proto.workflow_activation.RemoveFromCache], None @@ -83,6 +85,9 @@ def __init__( interceptor_class = i.workflow_interceptor_class(interceptor_class_input) if interceptor_class: self._interceptor_classes.append(interceptor_class) + self._extern_functions.update( + **_WorkflowExternFunctions(__temporal_get_metric_meter=lambda: metric_meter) + ) self._running_workflows: Dict[str, WorkflowInstance] = {} self._disable_eager_activity_execution = disable_eager_activity_execution self._on_eviction_hook = on_eviction_hook diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index fa0523a3..6701718e 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -37,7 +37,7 @@ cast, ) -from typing_extensions import TypeAlias +from typing_extensions import TypeAlias, TypedDict import temporalio.activity import temporalio.api.common.v1 @@ -94,7 +94,7 @@ def create_instance(self, det: WorkflowInstanceDetails) -> WorkflowInstance: """Create a workflow instance that can handle activations. Args: - det: Serializable details that can be used to create the instance. + det: Details that can be used to create the instance. Returns: Workflow instance that can handle activations. @@ -104,7 +104,7 @@ def create_instance(self, det: WorkflowInstanceDetails) -> WorkflowInstance: @dataclass(frozen=True) class WorkflowInstanceDetails: - """Immutable, serializable details for creating a workflow instance.""" + """Immutable details for creating a workflow instance.""" payload_converter_class: Type[temporalio.converter.PayloadConverter] failure_converter_class: Type[temporalio.converter.FailureConverter] @@ -256,6 +256,9 @@ def __init__(self, det: WorkflowInstanceDetails) -> None: # can check in some places to swallow these errors on tear down. self._deleting = False + # We only create the metric meter lazily + self._metric_meter: Optional[_ReplaySafeMetricMeter] = None + def __del__(self) -> None: # We have confirmed there are no super() versions of __del__ self._deleting = True @@ -793,6 +796,22 @@ def workflow_memo_value( [payload], [type_hint] if type_hint else None )[0] + def workflow_metric_meter(self) -> temporalio.common.MetricMeter: + # Create if not present, which means using an extern function + if not self._metric_meter: + metric_meter = cast(_WorkflowExternFunctions, self._extern_functions)[ + "__temporal_get_metric_meter" + ]() + metric_meter = metric_meter.with_additional_attributes( + { + "namespace": self._info.namespace, + "task_queue": self._info.task_queue, + "workflow_type": self._info.workflow_type, + } + ) + self._metric_meter = _ReplaySafeMetricMeter(metric_meter) + return self._metric_meter + def workflow_patch(self, id: str, *, deprecated: bool) -> bool: self._assert_not_read_only("patch") # We use a previous memoized result of this if present. If this is being @@ -2059,3 +2078,136 @@ def _encode_search_attributes( """Encode search attributes as bridge payloads.""" for k, vals in attributes.items(): payloads[k].CopyFrom(temporalio.converter.encode_search_attribute_values(vals)) + + +class _WorkflowExternFunctions(TypedDict): + __temporal_get_metric_meter: Callable[[], temporalio.common.MetricMeter] + + +class _ReplaySafeMetricMeter(temporalio.common.MetricMeter): + def __init__(self, underlying: temporalio.common.MetricMeter) -> None: + self._underlying = underlying + + def create_counter( + self, name: str, description: Optional[str] = None, unit: Optional[str] = None + ) -> temporalio.common.MetricCounter: + return _ReplaySafeMetricCounter( + self._underlying.create_counter(name, description, unit) + ) + + def create_histogram( + self, name: str, description: Optional[str] = None, unit: Optional[str] = None + ) -> temporalio.common.MetricHistogram: + return _ReplaySafeMetricHistogram( + self._underlying.create_histogram(name, description, unit) + ) + + def create_gauge( + self, name: str, description: Optional[str] = None, unit: Optional[str] = None + ) -> temporalio.common.MetricGauge: + return _ReplaySafeMetricGauge( + self._underlying.create_gauge(name, description, unit) + ) + + def with_additional_attributes( + self, additional_attributes: temporalio.common.MetricAttributes + ) -> temporalio.common.MetricMeter: + return _ReplaySafeMetricMeter( + self._underlying.with_additional_attributes(additional_attributes) + ) + + +class _ReplaySafeMetricCounter(temporalio.common.MetricCounter): + def __init__(self, underlying: temporalio.common.MetricCounter) -> None: + self._underlying = underlying + + @property + def name(self) -> str: + return self._underlying.name + + @property + def description(self) -> Optional[str]: + return self._underlying.description + + @property + def unit(self) -> Optional[str]: + return self._underlying.unit + + def add( + self, + value: int, + additional_attributes: Optional[temporalio.common.MetricAttributes] = None, + ) -> None: + if not temporalio.workflow.unsafe.is_replaying(): + self._underlying.add(value, additional_attributes) + + def with_additional_attributes( + self, additional_attributes: temporalio.common.MetricAttributes + ) -> temporalio.common.MetricCounter: + return _ReplaySafeMetricCounter( + self._underlying.with_additional_attributes(additional_attributes) + ) + + +class _ReplaySafeMetricHistogram(temporalio.common.MetricHistogram): + def __init__(self, underlying: temporalio.common.MetricHistogram) -> None: + self._underlying = underlying + + @property + def name(self) -> str: + return self._underlying.name + + @property + def description(self) -> Optional[str]: + return self._underlying.description + + @property + def unit(self) -> Optional[str]: + return self._underlying.unit + + def record( + self, + value: int, + additional_attributes: Optional[temporalio.common.MetricAttributes] = None, + ) -> None: + if not temporalio.workflow.unsafe.is_replaying(): + self._underlying.record(value, additional_attributes) + + def with_additional_attributes( + self, additional_attributes: temporalio.common.MetricAttributes + ) -> temporalio.common.MetricHistogram: + return _ReplaySafeMetricHistogram( + self._underlying.with_additional_attributes(additional_attributes) + ) + + +class _ReplaySafeMetricGauge(temporalio.common.MetricGauge): + def __init__(self, underlying: temporalio.common.MetricGauge) -> None: + self._underlying = underlying + + @property + def name(self) -> str: + return self._underlying.name + + @property + def description(self) -> Optional[str]: + return self._underlying.description + + @property + def unit(self) -> Optional[str]: + return self._underlying.unit + + def set( + self, + value: int, + additional_attributes: Optional[temporalio.common.MetricAttributes] = None, + ) -> None: + if not temporalio.workflow.unsafe.is_replaying(): + self._underlying.set(value, additional_attributes) + + def with_additional_attributes( + self, additional_attributes: temporalio.common.MetricAttributes + ) -> temporalio.common.MetricGauge: + return _ReplaySafeMetricGauge( + self._underlying.with_additional_attributes(additional_attributes) + ) diff --git a/temporalio/workflow.py b/temporalio/workflow.py index c8fd5d53..2d5a7646 100644 --- a/temporalio/workflow.py +++ b/temporalio/workflow.py @@ -468,6 +468,10 @@ def workflow_memo_value( ) -> Any: ... + @abstractmethod + def workflow_metric_meter(self) -> temporalio.common.MetricMeter: + ... + @abstractmethod def workflow_patch(self, id: str, *, deprecated: bool) -> bool: ... @@ -653,6 +657,18 @@ def memo_value( return _Runtime.current().workflow_memo_value(key, default, type_hint=type_hint) +def metric_meter() -> temporalio.common.MetricMeter: + """Get the metric meter for the current workflow. + + This meter is replay safe which means that metrics will not be recorded + during replay. + + Returns: + Current metric meter for this workflow for recording metrics. + """ + return _Runtime.current().workflow_metric_meter() + + def now() -> datetime: """Current time from the workflow perspective. diff --git a/tests/helpers/__init__.py b/tests/helpers/__init__.py index d15dcf88..a095dfe9 100644 --- a/tests/helpers/__init__.py +++ b/tests/helpers/__init__.py @@ -1,6 +1,8 @@ import asyncio +import socket import time import uuid +from contextlib import closing from datetime import timedelta from typing import Awaitable, Callable, Optional, Sequence, Type, TypeVar @@ -63,3 +65,10 @@ async def worker_versioning_enabled(client: Client) -> bool: if e.status in [RPCStatusCode.PERMISSION_DENIED, RPCStatusCode.UNIMPLEMENTED]: return False raise + + +def find_free_port() -> int: + with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: + s.bind(("", 0)) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + return s.getsockname()[1] diff --git a/tests/test_runtime.py b/tests/test_runtime.py index f3624002..b90ddace 100644 --- a/tests/test_runtime.py +++ b/tests/test_runtime.py @@ -1,19 +1,11 @@ -import socket import uuid -from contextlib import closing from urllib.request import urlopen from temporalio import workflow from temporalio.client import Client from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig from temporalio.worker import Worker - - -def find_free_port() -> int: - with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: - s.bind(("", 0)) - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - return s.getsockname()[1] +from tests.helpers import find_free_port @workflow.defn diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 9c08cea0..dccb048e 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -24,6 +24,7 @@ Tuple, cast, ) +from urllib.request import urlopen import pytest from google.protobuf.timestamp_pb2 import Timestamp @@ -65,6 +66,7 @@ TimeoutError, WorkflowAlreadyStartedError, ) +from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig from temporalio.service import RPCError, RPCStatusCode from temporalio.testing import WorkflowEnvironment from temporalio.worker import ( @@ -74,7 +76,7 @@ WorkflowInstanceDetails, WorkflowRunner, ) -from tests.helpers import assert_eq_eventually, new_worker +from tests.helpers import assert_eq_eventually, find_free_port, new_worker @workflow.defn @@ -1450,9 +1452,6 @@ def prepare_workflow(self, defn: workflow._Definition) -> None: pass def create_instance(self, det: WorkflowInstanceDetails) -> WorkflowInstance: - # We need to assert details can be pickled for potential sandbox use - det_pickled = pickle.loads(pickle.dumps(det)) - assert det == det_pickled return CustomWorkflowInstance(self, self._unsandboxed.create_instance(det)) @@ -3236,3 +3235,150 @@ async def test_workflow_annotated_with_self(client: Client): id=f"wf-{uuid.uuid4()}", task_queue=worker.task_queue, ) + + +@activity.defn +async def custom_metrics_activity() -> None: + counter = activity.metric_meter().create_counter( + "my-activity-counter", "my-activity-description", "my-activity-unit" + ) + counter.add(12) + counter.add(34, {"my-activity-extra-attr": 12.34}) + + +@workflow.defn +class CustomMetricsWorkflow: + @workflow.run + async def run(self) -> None: + await workflow.execute_activity( + custom_metrics_activity, schedule_to_close_timeout=timedelta(seconds=30) + ) + + histogram = workflow.metric_meter().create_histogram( + "my-workflow-histogram", "my-workflow-description", "my-workflow-unit" + ) + histogram.record(56) + histogram.with_additional_attributes({"my-workflow-extra-attr": 1234}).record( + 78 + ) + + +async def test_workflow_custom_metrics(client: Client): + # Run worker with default runtime which is noop meter just to confirm it + # doesn't fail + async with new_worker( + client, CustomMetricsWorkflow, activities=[custom_metrics_activity] + ) as worker: + await client.execute_workflow( + CustomMetricsWorkflow.run, + id=f"wf-{uuid.uuid4()}", + task_queue=worker.task_queue, + ) + + # Create new runtime with Prom server + prom_addr = f"127.0.0.1:{find_free_port()}" + runtime = Runtime( + telemetry=TelemetryConfig( + metrics=PrometheusConfig(bind_address=prom_addr), metric_prefix="foo_" + ) + ) + + # Confirm meter fails with bad attribute type + with pytest.raises(TypeError) as err: + runtime.metric_meter.with_additional_attributes({"some_attr": None}) # type: ignore + assert str(err.value).startswith("Invalid value type for key") + + # New client with the runtime + client = await Client.connect( + client.service_client.config.target_host, + namespace=client.namespace, + runtime=runtime, + ) + + async with new_worker( + client, CustomMetricsWorkflow, activities=[custom_metrics_activity] + ) as worker: + # Record a gauge at runtime level + gauge = runtime.metric_meter.with_additional_attributes( + {"my-runtime-extra-attr1": "val1", "my-runtime-extra-attr2": True} + ).create_gauge("my-runtime-gauge", "my-runtime-description") + gauge.set(90) + + # Run workflow + await client.execute_workflow( + CustomMetricsWorkflow.run, + id=f"wf-{uuid.uuid4()}", + task_queue=worker.task_queue, + ) + + # Get Prom dump + with urlopen(url=f"http://{prom_addr}/metrics") as f: + prom_str: str = f.read().decode("utf-8") + prom_lines = prom_str.splitlines() + + # Intentionally naive metric checker + def matches_metric_line( + line: str, name: str, at_least_labels: Mapping[str, str], value: int + ) -> bool: + # Must have metric name + if not line.startswith(name + "{"): + return False + # Must have labels (don't escape for this test) + for k, v in at_least_labels.items(): + if not f'{k}="{v}"' in line: + return False + return line.endswith(f" {value}") + + def assert_metric_exists( + name: str, at_least_labels: Mapping[str, str], value: int + ) -> None: + assert any( + matches_metric_line(line, name, at_least_labels, value) + for line in prom_lines + ) + + def assert_description_exists(name: str, description: str) -> None: + assert f"# HELP {name} {description}" in prom_lines + + # Check some metrics are as we expect + assert_description_exists("my_runtime_gauge", "my-runtime-description") + assert_metric_exists( + "my_runtime_gauge", + { + "my_runtime_extra_attr1": "val1", + "my_runtime_extra_attr2": "true", + # Also confirm global service name label + "service_name": "temporal-core-sdk", + }, + 90, + ) + assert_description_exists("my_workflow_histogram", "my-workflow-description") + assert_metric_exists("my_workflow_histogram_sum", {}, 56) + assert_metric_exists( + "my_workflow_histogram_sum", + { + "my_workflow_extra_attr": "1234", + # Also confirm some workflow labels + "namespace": client.namespace, + "task_queue": worker.task_queue, + "workflow_type": "CustomMetricsWorkflow", + }, + 78, + ) + assert_description_exists("my_activity_counter", "my-activity-description") + assert_metric_exists("my_activity_counter", {}, 12) + assert_metric_exists( + "my_activity_counter", + { + "my_activity_extra_attr": "12.34", + # Also confirm some activity labels + "namespace": client.namespace, + "task_queue": worker.task_queue, + "activity_type": "custom_metrics_activity", + }, + 34, + ) + # Also check Temporal metric got its prefix + assert_metric_exists( + "foo_workflow_completed", {"workflow_type": "CustomMetricsWorkflow"}, 1 + )