diff --git a/temporalio/bridge/Cargo.lock b/temporalio/bridge/Cargo.lock
index 92e0fd5f..4fa15f8f 100644
--- a/temporalio/bridge/Cargo.lock
+++ b/temporalio/bridge/Cargo.lock
@@ -1160,15 +1160,6 @@ version = "2.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
 
-[[package]]
-name = "memoffset"
-version = "0.7.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4"
-dependencies = [
- "autocfg",
-]
-
 [[package]]
 name = "memoffset"
 version = "0.8.0"
@@ -1248,16 +1239,13 @@ checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
 
 [[package]]
 name = "nix"
-version = "0.26.2"
+version = "0.27.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bfdda3d196821d6af13126e40375cdf7da646a96114af134d5f417a9a1dc8e1a"
+checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053"
 dependencies = [
- "bitflags 1.3.2",
+ "bitflags 2.4.0",
  "cfg-if",
  "libc",
- "memoffset 0.7.1",
- "pin-utils",
- "static_assertions",
 ]
 
 [[package]]
@@ -2221,9 +2209,9 @@ dependencies = [
 
 [[package]]
 name = "siphasher"
-version = "0.3.11"
+version = "1.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d"
+checksum = "54ac45299ccbd390721be55b412d41931911f654fa99e2cb8bfb57184b2061fe"
 
 [[package]]
 name = "slab"
@@ -2275,12 +2263,6 @@ version = "0.5.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
 
-[[package]]
-name = "static_assertions"
-version = "1.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
-
 [[package]]
 name = "strsim"
 version = "0.10.0"
diff --git a/temporalio/bridge/metric.py b/temporalio/bridge/metric.py
index 396a734f..2f67486b 100644
--- a/temporalio/bridge/metric.py
+++ b/temporalio/bridge/metric.py
@@ -27,7 +27,7 @@ def __init__(
     ) -> None:
         """Initialize metric meter."""
         self._ref = ref
-        self._default_attributes = MetricAttributes(ref.default_attributes)
+        self._default_attributes = MetricAttributes(self, ref.default_attributes)
 
     @property
     def default_attributes(self) -> MetricAttributes:
@@ -99,13 +99,19 @@ class MetricAttributes:
     """Metric attributes using SDK Core."""
 
     def __init__(
-        self, ref: temporalio.bridge.temporal_sdk_bridge.MetricAttributesRef
+        self,
+        meter: MetricMeter,
+        ref: temporalio.bridge.temporal_sdk_bridge.MetricAttributesRef,
     ) -> None:
         """Initialize attributes."""
+        self._meter = meter
         self._ref = ref
 
     def with_additional_attributes(
         self, new_attrs: Mapping[str, Union[str, int, float, bool]]
     ) -> MetricAttributes:
         """Create new attributes with new attributes appended."""
-        return MetricAttributes(self._ref.with_additional_attributes(new_attrs))
+        return MetricAttributes(
+            self._meter,
+            self._ref.with_additional_attributes(self._meter._ref, new_attrs),
+        )
diff --git a/temporalio/bridge/runtime.py b/temporalio/bridge/runtime.py
index 43e516a8..f86c4a87 100644
--- a/temporalio/bridge/runtime.py
+++ b/temporalio/bridge/runtime.py
@@ -6,7 +6,7 @@
 from __future__ import annotations
 
 from dataclasses import dataclass
-from typing import Mapping, Optional, Type
+from typing import Any, Mapping, Optional, Sequence, Type
 
 import temporalio.bridge.temporal_sdk_bridge
@@ -25,6 +25,10 @@ def __init__(self, *, telemetry: TelemetryConfig) -> None:
         """Create SDK Core runtime."""
         self._ref = temporalio.bridge.temporal_sdk_bridge.init_runtime(telemetry)
 
+    def retrieve_buffered_metrics(self) -> Sequence[Any]:
+        """Get buffered metrics."""
+        return self._ref.retrieve_buffered_metrics()
+
 
 @dataclass(frozen=True)
 class LoggingConfig:
@@ -40,6 +44,7 @@ class MetricsConfig:
 
     opentelemetry: Optional[OpenTelemetryConfig]
     prometheus: Optional[PrometheusConfig]
+    buffered_with_size: int
     attach_service_name: bool
     global_tags: Optional[Mapping[str, str]]
     metric_prefix: Optional[str]
diff --git a/temporalio/bridge/sdk-core b/temporalio/bridge/sdk-core
index 6f11bbf6..617612aa 160000
--- a/temporalio/bridge/sdk-core
+++ b/temporalio/bridge/sdk-core
@@ -1 +1 @@
-Subproject commit 6f11bbf6dc0621a005e07adead196c8d9a9fd87d
+Subproject commit 617612aa419d687aebabcf0258ac86f5c36df189
diff --git a/temporalio/bridge/src/lib.rs b/temporalio/bridge/src/lib.rs
index 31164c03..660ce94a 100644
--- a/temporalio/bridge/src/lib.rs
+++ b/temporalio/bridge/src/lib.rs
@@ -20,6 +20,8 @@ fn temporal_sdk_bridge(py: Python, m: &PyModule) -> PyResult<()> {
     m.add_class::<metric::MetricCounterRef>()?;
     m.add_class::<metric::MetricHistogramRef>()?;
     m.add_class::<metric::MetricGaugeRef>()?;
+    m.add_class::<metric::BufferedMetricUpdate>()?;
+    m.add_class::<metric::BufferedMetric>()?;
     m.add_function(wrap_pyfunction!(new_metric_meter, m)?)?;
 
     // Runtime stuff
diff --git a/temporalio/bridge/src/metric.rs b/temporalio/bridge/src/metric.rs
index 4e736448..0c16aeef 100644
--- a/temporalio/bridge/src/metric.rs
+++ b/temporalio/bridge/src/metric.rs
@@ -1,8 +1,11 @@
+use std::any::Any;
 use std::{collections::HashMap, sync::Arc};
 
-use pyo3::exceptions::PyTypeError;
 use pyo3::prelude::*;
-use temporal_sdk_core_api::telemetry::metrics;
+use pyo3::{exceptions::PyTypeError, types::PyDict};
+use temporal_sdk_core_api::telemetry::metrics::{
+    self, BufferInstrumentRef, CustomMetricAttributes, MetricEvent, NewAttributes,
+};
 
 use crate::runtime;
 
@@ -139,14 +142,17 @@ impl MetricAttributesRef {
     fn with_additional_attributes<'p>(
         &self,
         py: Python<'p>,
+        meter: &MetricMeterRef,
         new_attrs: HashMap<String, PyObject>,
     ) -> PyResult<Self> {
-        let mut attrs = self.attrs.clone();
-        attrs.add_new_attrs(
-            new_attrs
-                .into_iter()
-                .map(|(k, obj)| metric_key_value_from_py(py, k, obj))
-                .collect::<PyResult<Vec<metrics::MetricKeyValue>>>()?,
+        let attrs = meter.meter.inner.extend_attributes(
+            self.attrs.clone(),
+            NewAttributes {
+                attributes: new_attrs
+                    .into_iter()
+                    .map(|(k, obj)| metric_key_value_from_py(py, k, obj))
+                    .collect::<PyResult<Vec<metrics::MetricKeyValue>>>()?,
+            },
         );
         Ok(MetricAttributesRef { attrs })
     }
@@ -173,3 +179,146 @@ fn metric_key_value_from_py<'p>(
     };
     Ok(metrics::MetricKeyValue::new(k, val))
 }
+
+// WARNING: This must match temporalio.runtime.BufferedMetricUpdate protocol
+#[pyclass]
+pub struct BufferedMetricUpdate {
+    #[pyo3(get)]
+    pub metric: Py<BufferedMetric>,
+    #[pyo3(get)]
+    pub value: u64,
+    #[pyo3(get)]
+    pub attributes: Py<PyDict>,
+}
+
+// WARNING: This must match temporalio.runtime.BufferedMetric protocol
+#[pyclass]
+pub struct BufferedMetric {
+    #[pyo3(get)]
+    pub name: String,
+    #[pyo3(get)]
+    pub description: Option<String>,
+    #[pyo3(get)]
+    pub unit: Option<String>,
+    #[pyo3(get)]
+    pub kind: u8, // 0 - counter, 1 - gauge, 2 - histogram
+}
+
+#[derive(Debug)]
+struct BufferedMetricAttributes(Py<PyDict>);
+
+#[derive(Clone, Debug)]
+pub struct BufferedMetricRef(Py<BufferedMetric>);
+
+impl BufferInstrumentRef for BufferedMetricRef {}
+
+impl CustomMetricAttributes for BufferedMetricAttributes {
+    fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
+        self as Arc<dyn Any + Send + Sync>
+    }
+}
+
+pub fn convert_metric_events<'p>(
+    py: Python<'p>,
+    events: Vec<MetricEvent<BufferedMetricRef>>,
+) -> Vec<BufferedMetricUpdate> {
+    events
+        .into_iter()
+        .filter_map(|e| convert_metric_event(py, e))
+        .collect()
+}
+
+fn convert_metric_event<'p>(
+    py: Python<'p>,
+    event: MetricEvent<BufferedMetricRef>,
+) -> Option<BufferedMetricUpdate> {
+    match event {
+        // Create the metric and put it on the lazy ref
+        MetricEvent::Create {
+            params,
+            populate_into,
+            kind,
+        } => {
+            let buffered_ref = BufferedMetricRef(
+                Py::new(
+                    py,
+                    BufferedMetric {
+                        name: params.name.to_string(),
+                        description: Some(params.description)
+                            .filter(|s| !s.is_empty())
+                            .map(|s| s.to_string()),
+                        unit: Some(params.unit)
+                            .filter(|s| !s.is_empty())
+                            .map(|s| s.to_string()),
+                        kind: match kind {
+                            metrics::MetricKind::Counter => 0,
+                            metrics::MetricKind::Gauge => 1,
+                            metrics::MetricKind::Histogram => 2,
+                        },
+                    },
+                )
+                .expect("Unable to create buffered metric"),
+            );
+            populate_into.set(Arc::new(buffered_ref)).unwrap();
+            None
+        }
+        // Create the attributes and put it on the lazy ref
+        MetricEvent::CreateAttributes {
+            populate_into,
+            append_from,
+            attributes,
+        } => {
+            // Create the dictionary (as copy from existing if needed)
+            let new_attrs_ref: Py<PyDict> = match append_from {
+                Some(existing) => existing
+                    .get()
+                    .clone()
+                    .as_any()
+                    .downcast::<BufferedMetricAttributes>()
+                    .expect("Unable to downcast to expected buffered metric attributes")
+                    .0
+                    .as_ref(py)
+                    .copy()
+                    .expect("Failed to copy metric attribute dictionary")
+                    .into(),
+                None => PyDict::new(py).into(),
+            };
+            // Add attributes
+            let new_attrs = new_attrs_ref.as_ref(py);
+            for kv in attributes.into_iter() {
+                match kv.value {
+                    metrics::MetricValue::String(v) => new_attrs.set_item(kv.key, v),
+                    metrics::MetricValue::Int(v) => new_attrs.set_item(kv.key, v),
+                    metrics::MetricValue::Float(v) => new_attrs.set_item(kv.key, v),
+                    metrics::MetricValue::Bool(v) => new_attrs.set_item(kv.key, v),
+                }
+                .expect("Unable to set metric key/value on dictionary");
+            }
+            // Put on lazy ref
+            populate_into
+                .set(Arc::new(BufferedMetricAttributes(new_attrs_ref)))
+                .expect("Unable to set buffered metric attributes on reference");
+            None
+        }
+        // Convert to Python metric event
+        MetricEvent::Update {
+            instrument,
+            attributes,
+            update,
+        } => Some(BufferedMetricUpdate {
+            metric: instrument.get().clone().0.clone(),
+            value: match update {
+                metrics::MetricUpdateVal::Delta(v) => v,
+                metrics::MetricUpdateVal::Value(v) => v,
+            },
+            attributes: attributes
+                .get()
+                .clone()
+                .as_any()
+                .downcast::<BufferedMetricAttributes>()
+                .expect("Unable to downcast to expected buffered metric attributes")
+                .0
+                .clone(),
+        }),
+    }
+}
diff --git a/temporalio/bridge/src/runtime.rs b/temporalio/bridge/src/runtime.rs
index d3df1ee3..dff05921 100644
--- a/temporalio/bridge/src/runtime.rs
+++ b/temporalio/bridge/src/runtime.rs
@@ -8,15 +8,19 @@ use std::pin::Pin;
 use std::str::FromStr;
 use std::sync::Arc;
 use std::time::Duration;
-use temporal_sdk_core::telemetry::{build_otlp_metric_exporter, start_prometheus_metric_exporter};
+use temporal_sdk_core::telemetry::{
+    build_otlp_metric_exporter, start_prometheus_metric_exporter, MetricsCallBuffer,
+};
 use temporal_sdk_core::CoreRuntime;
-use temporal_sdk_core_api::telemetry::metrics::CoreMeter;
+use temporal_sdk_core_api::telemetry::metrics::{CoreMeter, MetricCallBufferer};
 use temporal_sdk_core_api::telemetry::{
     Logger, MetricTemporality, OtelCollectorOptionsBuilder, PrometheusExporterOptionsBuilder,
     TelemetryOptions, TelemetryOptionsBuilder,
 };
 use url::Url;
 
+use crate::metric::{convert_metric_events, BufferedMetricRef, BufferedMetricUpdate};
+
 #[pyclass]
 pub struct RuntimeRef {
     pub(crate) runtime: Runtime,
@@ -25,6 +29,7 @@
 #[derive(Clone)]
 pub(crate) struct Runtime {
     pub(crate) core: Arc<CoreRuntime>,
+    metrics_call_buffer: Option<Arc<MetricsCallBuffer<BufferedMetricRef>>>,
 }
 
 #[derive(FromPyObject)]
@@ -41,8 +46,11 @@ pub struct LoggingConfig {
 
 #[derive(FromPyObject)]
 pub struct MetricsConfig {
+    // These fields are mutually exclusive
     opentelemetry: Option<OpenTelemetryConfig>,
     prometheus: Option<PrometheusConfig>,
+    buffered_with_size: usize,
+
     attach_service_name: bool,
     global_tags: Option<HashMap<String, String>>,
     metric_prefix: Option<String>,
@@ -71,16 +79,34 @@ pub fn init_runtime(telemetry_config: TelemetryConfig) -> PyResult<RuntimeRef> {
         tokio::runtime::Builder::new_multi_thread(),
     )
     .map_err(|err| PyRuntimeError::new_err(format!("Failed initializing telemetry: {}", err)))?;
+
     // We late-bind the metrics after core runtime is created since it needs
     // the Tokio handle
+    let mut maybe_metrics_call_buffer: Option<Arc<MetricsCallBuffer<BufferedMetricRef>>> = None;
     if let Some(metrics_conf) = telemetry_config.metrics {
         let _guard = core.tokio_handle().enter();
-        core.telemetry_mut()
-            .attach_late_init_metrics(metrics_conf.try_into()?);
+        // If they want buffered, cannot have Prom/OTel and we make buffered
+        if metrics_conf.buffered_with_size > 0 {
+            if metrics_conf.opentelemetry.is_some() || metrics_conf.prometheus.is_some() {
+                return Err(PyValueError::new_err(
+                    "Cannot have buffer size with OpenTelemetry or Prometheus metric config",
+                ));
+            }
+            let metrics_call_buffer =
+                Arc::new(MetricsCallBuffer::new(metrics_conf.buffered_with_size));
+            core.telemetry_mut()
+                .attach_late_init_metrics(metrics_call_buffer.clone());
+            maybe_metrics_call_buffer = Some(metrics_call_buffer);
+        } else {
+            core.telemetry_mut()
+                .attach_late_init_metrics(metrics_conf.try_into()?);
+        }
     }
+
     Ok(RuntimeRef {
         runtime: Runtime {
             core: Arc::new(core),
+            metrics_call_buffer: maybe_metrics_call_buffer,
         },
     })
 }
@@ -100,6 +126,20 @@ impl Runtime {
     }
 }
 
+#[pymethods]
+impl RuntimeRef {
+    fn retrieve_buffered_metrics<'p>(&self, py: Python<'p>) -> Vec<BufferedMetricUpdate> {
+        convert_metric_events(
+            py,
+            self.runtime
+                .metrics_call_buffer
+                .as_ref()
+                .expect("Attempting to retrieve buffered metrics without buffer")
+                .retrieve(),
+        )
+    }
+}
+
 impl TryFrom<&TelemetryConfig> for TelemetryOptions {
     type Error = PyErr;
 
diff --git a/temporalio/bridge/src/worker.rs b/temporalio/bridge/src/worker.rs
index 491a4745..5d8be94a 100644
--- a/temporalio/bridge/src/worker.rs
+++ b/temporalio/bridge/src/worker.rs
@@ -49,9 +49,9 @@ pub struct WorkerConfig {
 
 macro_rules! enter_sync {
     ($runtime:expr) => {
-        temporal_sdk_core::telemetry::set_trace_subscriber_for_current_thread(
-            $runtime.core.telemetry().trace_subscriber(),
-        );
+        if let Some(subscriber) = $runtime.core.telemetry().trace_subscriber() {
+            temporal_sdk_core::telemetry::set_trace_subscriber_for_current_thread(subscriber);
+        }
         let _guard = $runtime.core.tokio_handle().enter();
     };
 }
diff --git a/temporalio/runtime.py b/temporalio/runtime.py
index 25f50bde..eb494bcf 100644
--- a/temporalio/runtime.py
+++ b/temporalio/runtime.py
@@ -8,7 +8,9 @@
 from dataclasses import dataclass, field
 from datetime import timedelta
 from enum import Enum
-from typing import ClassVar, Mapping, Optional, Union
+from typing import ClassVar, Mapping, NewType, Optional, Sequence, Union
+
+from typing_extensions import Literal, Protocol
 
 import temporalio.bridge.metric
 import temporalio.bridge.runtime
@@ -66,6 +68,8 @@ def __init__(self, *, telemetry: TelemetryConfig) -> None:
         self._core_runtime = temporalio.bridge.runtime.Runtime(
             telemetry=telemetry._to_bridge_config()
         )
+        if isinstance(telemetry.metrics, MetricBuffer):
+            telemetry.metrics._runtime = self
         core_meter = temporalio.bridge.metric.MetricMeter.create(self._core_runtime)
         if not core_meter:
             self._metric_meter = temporalio.common.MetricMeter.noop
@@ -174,6 +178,46 @@ def _to_bridge_config(self) -> temporalio.bridge.runtime.PrometheusConfig:
         )
 
 
+class MetricBuffer:
+    """A buffer that can be set on :py:class:`TelemetryConfig` to record
+    metrics instead of ignoring/exporting them.
+
+    .. warning::
+        It is important that the buffer size is set to a high number and that
+        :py:meth:`retrieve_updates` is called regularly to drain the buffer. If
+        the buffer is full, metric updates will be dropped and an error will be
+        logged.
+    """
+
+    def __init__(self, buffer_size: int) -> None:
+        """Create a buffer with the given size.
+
+        .. warning::
+            It is important that the buffer size is set to a high number and is
+            drained regularly. See :py:class:`MetricBuffer` warning.
+
+        Args:
+            buffer_size: Size of the buffer. Set this to a large value. A value
+                in the tens of thousands or higher is plenty reasonable.
+        """
+        self._buffer_size = buffer_size
+        self._runtime: Optional[Runtime] = None
+
+    def retrieve_updates(self) -> Sequence[BufferedMetricUpdate]:
+        """Drain the buffer and return all metric updates.
+
+        .. warning::
+            It is important that this is called regularly. See
+            :py:class:`MetricBuffer` warning.
+
+        Returns:
+            A sequence of metric updates.
+ """ + if not self._runtime: + raise RuntimeError("Attempting to retrieve updates before runtime created") + return self._runtime._core_runtime.retrieve_buffered_metrics() + + @dataclass(frozen=True) class TelemetryConfig: """Configuration for Core telemetry.""" @@ -181,8 +225,8 @@ class TelemetryConfig: logging: Optional[LoggingConfig] = LoggingConfig.default """Logging configuration.""" - metrics: Optional[Union[OpenTelemetryConfig, PrometheusConfig]] = None - """Metrics configuration.""" + metrics: Optional[Union[OpenTelemetryConfig, PrometheusConfig, MetricBuffer]] = None + """Metrics configuration or buffer.""" global_tags: Mapping[str, str] = field(default_factory=dict) """OTel resource tags to be applied to all metrics.""" @@ -206,6 +250,9 @@ def _to_bridge_config(self) -> temporalio.bridge.runtime.TelemetryConfig: prometheus=None if not isinstance(self.metrics, PrometheusConfig) else self.metrics._to_bridge_config(), + buffered_with_size=0 + if not isinstance(self.metrics, MetricBuffer) + else self.metrics._buffer_size, attach_service_name=self.attach_service_name, global_tags=self.global_tags or None, metric_prefix=self.metric_prefix, @@ -213,6 +260,92 @@ def _to_bridge_config(self) -> temporalio.bridge.runtime.TelemetryConfig: ) +BufferedMetricKind = NewType("BufferedMetricKind", int) +"""Representation of a buffered metric kind.""" + +BUFFERED_METRIC_KIND_COUNTER = BufferedMetricKind(0) +"""Buffered metric is a counter which means values are deltas.""" + +BUFFERED_METRIC_KIND_GAUGE = BufferedMetricKind(1) +"""Buffered metric is a gauge.""" + +BUFFERED_METRIC_KIND_HISTOGRAM = BufferedMetricKind(2) +"""Buffered metric is a histogram.""" + + +# WARNING: This must match Rust metric::BufferedMetric +class BufferedMetric(Protocol): + """A metric for a buffered update. + + The same metric for the same name and runtime is guaranteed to be the exact + same object for performance reasons. This means py:func:`id` will be the + same for the same metric across updates. + """ + + @property + def name(self) -> str: + """Get the name of the metric.""" + ... + + @property + def description(self) -> Optional[str]: + """Get the description of the metric if any.""" + ... + + @property + def unit(self) -> Optional[str]: + """Get the unit of the metric if any.""" + ... + + @property + def kind(self) -> BufferedMetricKind: + """Get the metric kind. + + This is one of :py:const:`BUFFERED_METRIC_KIND_COUNTER`, + :py:const:`BUFFERED_METRIC_KIND_GAUGE`, or + :py:const:`BUFFERED_METRIC_KIND_HISTOGRAM`. + """ + ... + + +# WARNING: This must match Rust metric::BufferedMetricUpdate +class BufferedMetricUpdate(Protocol): + """A single metric value update.""" + + @property + def metric(self) -> BufferedMetric: + """Metric being updated. + + For performance reasons, this is the same object across updates for the + same metric. This means py:func:`id` will be the same for the same + metric across updates. + """ + ... + + @property + def value(self) -> Union[int, float]: + """Value for the update. + + For counters this is a delta, for gauges and histograms this is just the + value. + """ + ... + + @property + def attributes(self) -> temporalio.common.MetricAttributes: + """Attributes for the update. + + For performance reasons, this is the same object across updates for the + same attribute set. This means py:func:`id` will be the same for the + same attribute set across updates. Note this is for same "attribute set" + as created by the metric creator, but different attribute sets may have + the same values. 
+
+        Do not mutate this.
+        """
+        ...
+
+
 class _MetricMeter(temporalio.common.MetricMeter):
     def __init__(
         self,
diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py
index dccb048e..eab77e30 100644
--- a/tests/worker/test_workflow.py
+++ b/tests/worker/test_workflow.py
@@ -66,7 +66,14 @@
     TimeoutError,
     WorkflowAlreadyStartedError,
 )
-from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig
+from temporalio.runtime import (
+    BUFFERED_METRIC_KIND_COUNTER,
+    BUFFERED_METRIC_KIND_HISTOGRAM,
+    MetricBuffer,
+    PrometheusConfig,
+    Runtime,
+    TelemetryConfig,
+)
 from temporalio.service import RPCError, RPCStatusCode
 from temporalio.testing import WorkflowEnvironment
 from temporalio.worker import (
@@ -3382,3 +3389,127 @@ def assert_description_exists(name: str, description: str) -> None:
     assert_metric_exists(
         "foo_workflow_completed", {"workflow_type": "CustomMetricsWorkflow"}, 1
     )
+
+
+async def test_workflow_buffered_metrics(client: Client):
+    # Create runtime with metric buffer
+    buffer = MetricBuffer(10000)
+    runtime = Runtime(
+        telemetry=TelemetryConfig(metrics=buffer, metric_prefix="some_prefix_")
+    )
+
+    # Confirm no updates yet
+    assert not buffer.retrieve_updates()
+
+    # Create a counter and make one with more attrs
+    runtime_counter = runtime.metric_meter.create_counter(
+        "runtime-counter", "runtime-counter-desc", "runtime-counter-unit"
+    )
+    runtime_counter_with_attrs = runtime_counter.with_additional_attributes(
+        {"foo": "bar", "baz": 123}
+    )
+
+    # Send adds to both
+    runtime_counter.add(100)
+    runtime_counter_with_attrs.add(200)
+
+    # Get updates and check their values
+    runtime_updates1 = buffer.retrieve_updates()
+    assert len(runtime_updates1) == 2
+    # Check that the metric fields are right
+    assert runtime_updates1[0].metric.name == "runtime-counter"
+    assert runtime_updates1[0].metric.description == "runtime-counter-desc"
+    assert runtime_updates1[0].metric.unit == "runtime-counter-unit"
+    assert runtime_updates1[0].metric.kind == BUFFERED_METRIC_KIND_COUNTER
+    # Check that the metric is the exact same object all the way from Rust
+    assert id(runtime_updates1[0].metric) == id(runtime_updates1[1].metric)
+    # Check the values and attributes
+    assert runtime_updates1[0].value == 100
+    assert runtime_updates1[0].attributes == {"service_name": "temporal-core-sdk"}
+    assert runtime_updates1[1].value == 200
+    assert runtime_updates1[1].attributes == {
+        "service_name": "temporal-core-sdk",
+        "foo": "bar",
+        "baz": 123,
+    }
+
+    # Confirm no more updates
+    assert not buffer.retrieve_updates()
+
+    # Send some more adds and check
+    runtime_counter.add(300)
+    runtime_counter_with_attrs.add(400)
+    runtime_updates2 = buffer.retrieve_updates()
+    assert len(runtime_updates2)
+    # Check that metrics are the same exact object as before
+    assert id(runtime_updates1[0].metric) == id(runtime_updates2[0].metric)
+    assert id(runtime_updates1[1].metric) == id(runtime_updates2[1].metric)
+    # Check that even the attribute dictionaries are exact same objects as before
+    assert id(runtime_updates1[0].attributes) == id(runtime_updates2[0].attributes)
+    assert id(runtime_updates1[1].attributes) == id(runtime_updates2[1].attributes)
+    # Check values
+    assert runtime_updates2[0].value == 300
+    assert runtime_updates2[1].value == 400
+
+    # Create a new client on the runtime and execute the custom metric workflow
+    client = await Client.connect(
+        client.service_client.config.target_host,
+        namespace=client.namespace,
+        runtime=runtime,
+    )
+    async with new_worker(
+        client, CustomMetricsWorkflow, activities=[custom_metrics_activity]
+    ) as worker:
+        await client.execute_workflow(
+            CustomMetricsWorkflow.run,
+            id=f"wf-{uuid.uuid4()}",
+            task_queue=worker.task_queue,
+        )
+
+    # Drain updates and confirm updates exist as expected
+    updates = buffer.retrieve_updates()
+    # Workflow update histogram, with some extra sanity checks
+    assert any(
+        update.metric.name == "my-workflow-histogram"
+        and update.metric.description == "my-workflow-description"
+        and update.metric.unit == "my-workflow-unit"
+        and update.metric.kind == BUFFERED_METRIC_KIND_HISTOGRAM
+        and update.attributes["namespace"] == client.namespace
+        and update.attributes["task_queue"] == worker.task_queue
+        and update.attributes["workflow_type"] == "CustomMetricsWorkflow"
+        and "my-workflow-extra-attr" not in update.attributes
+        and update.value == 56
+        for update in updates
+    )
+    assert any(
+        update.metric.name == "my-workflow-histogram"
+        and update.attributes.get("my-workflow-extra-attr") == 1234
+        and update.value == 78
+        for update in updates
+    )
+    # Check activity counter too
+    assert any(
+        update.metric.name == "my-activity-counter"
+        and update.metric.description == "my-activity-description"
+        and update.metric.unit == "my-activity-unit"
+        and update.metric.kind == BUFFERED_METRIC_KIND_COUNTER
+        and update.attributes["namespace"] == client.namespace
+        and update.attributes["task_queue"] == worker.task_queue
+        and update.attributes["activity_type"] == "custom_metrics_activity"
+        and "my-activity-extra-attr" not in update.attributes
+        and update.value == 12
+        for update in updates
+    )
+    assert any(
+        update.metric.name == "my-activity-counter"
+        and update.attributes.get("my-activity-extra-attr") == 12.34
+        and update.value == 34
+        for update in updates
+    )
+    # Check for a Temporal metric too
+    assert any(
+        update.metric.name == "some_prefix_workflow_completed"
+        and update.attributes["workflow_type"] == "CustomMetricsWorkflow"
+        and update.value == 1
+        for update in updates
+    )
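
Usage note: the patch above adds the `MetricBuffer` path through `TelemetryConfig`. Below is a minimal sketch of how an application would drain the buffer, modeled on the test in this diff; the counter name, the 20000 buffer size, and the print-based "exporter" are illustrative choices, not part of the change.

```python
from temporalio.runtime import (
    BUFFERED_METRIC_KIND_COUNTER,
    MetricBuffer,
    Runtime,
    TelemetryConfig,
)

# Size the buffer generously; updates are dropped (and an error logged) if it
# fills up between drains.
buffer = MetricBuffer(20000)
runtime = Runtime(telemetry=TelemetryConfig(metrics=buffer))

# Metrics recorded through this runtime are buffered instead of exported.
# Clients and workers created with runtime=runtime feed the same buffer; here
# the runtime's own metric meter is used directly.
counter = runtime.metric_meter.create_counter(
    "my-counter", "my-counter-desc", "my-counter-unit"
)
counter.add(1)
counter.with_additional_attributes({"foo": "bar"}).add(2)

# Drain regularly; a real application would do this on a timer and forward the
# updates to its own metrics backend.
for update in buffer.retrieve_updates():
    kind = "delta" if update.metric.kind == BUFFERED_METRIC_KIND_COUNTER else "value"
    print(f"{update.metric.name} {kind}={update.value} attrs={dict(update.attributes)}")
```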