Skip to content

Commit

Permalink
Work on user metric support
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz committed Sep 8, 2023
1 parent aa829d3 commit 8637cdc
Show file tree
Hide file tree
Showing 16 changed files with 1,257 additions and 16 deletions.
43 changes: 43 additions & 0 deletions temporalio/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,10 @@ class _Context:
Type[temporalio.converter.PayloadConverter],
temporalio.converter.PayloadConverter,
]
runtime_metric_meter: Optional[temporalio.common.MetricMeter]
_logger_details: Optional[Mapping[str, Any]] = None
_payload_converter: Optional[temporalio.converter.PayloadConverter] = None
_metric_meter: Optional[temporalio.common.MetricMeter] = None

@staticmethod
def current() -> _Context:
Expand Down Expand Up @@ -185,6 +187,29 @@ def payload_converter(self) -> temporalio.converter.PayloadConverter:
self._payload_converter = self.payload_converter_class_or_instance()
return self._payload_converter

@property
def metric_meter(self) -> temporalio.common.MetricMeter:
# If there isn't a runtime metric meter, then we're in a non-threaded
# sync function and we don't support cross-process metrics
if not self.runtime_metric_meter:
raise RuntimeError(
"Metrics meter not available in non-threaded sync activities like mulitprocess"
)
# Create the meter lazily if not already created. We are ok creating
# multiple in the rare race where a user calls this property on
# different threads inside the same activity. The meter is immutable and
# it's better than a lock.
if not self._metric_meter:
info = self.info()
self._metric_meter = self.runtime_metric_meter.with_additional_attributes(
{
"namespace": info.workflow_namespace,
"task_queue": info.task_queue,
"activity_type": info.activity_type,
}
)
return self._metric_meter


@dataclass
class _CompositeEvent:
Expand Down Expand Up @@ -377,6 +402,24 @@ def payload_converter() -> temporalio.converter.PayloadConverter:
return _Context.current().payload_converter


def metric_meter() -> temporalio.common.MetricMeter:
"""Get the metric meter for the current activity.
.. warning::
This is only available in async or synchronous threaded activities. An
error is raised on non-thread-based sync activities when trying to
access this.
Returns:
Current metric meter for this activity for recording metrics.
Raises:
RuntimeError: When not in an activity or in a non-thread-based
synchronous activity.
"""
return _Context.current().metric_meter


class LoggerAdapter(logging.LoggerAdapter):
"""Adapter that adds details to the log about the running activity.
Expand Down
111 changes: 111 additions & 0 deletions temporalio/bridge/metric.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
"""Metrics using SDK Core. (unstable)
Nothing in this module should be considered stable. The API may change.
"""

from __future__ import annotations

from typing import Mapping, Optional, Union

import temporalio.bridge.runtime
import temporalio.bridge.temporal_sdk_bridge


class MetricMeter:
"""Metric meter using SDK Core."""

@staticmethod
def create(runtime: temporalio.bridge.runtime.Runtime) -> Optional[MetricMeter]:
"""Create optional metric meter."""
ref = temporalio.bridge.temporal_sdk_bridge.new_metric_meter(runtime._ref)
if not ref:
return None
return MetricMeter(ref)

def __init__(
self, ref: temporalio.bridge.temporal_sdk_bridge.MetricMeterRef
) -> None:
"""Initialize metric meter."""
self._ref = ref
self._default_attributes = MetricAttributes(ref.default_attributes)

@property
def default_attributes(self) -> MetricAttributes:
"""Default attributes for the metric meter."""
return self._default_attributes


class MetricCounter:
"""Metric counter using SDK Core."""

def __init__(
self,
meter: MetricMeter,
name: str,
description: Optional[str],
unit: Optional[str],
) -> None:
"""Initialize counter metric."""
self._ref = meter._ref.new_counter(name, description, unit)

def add(self, value: int, attrs: MetricAttributes) -> None:
"""Add value to counter."""
if value < 0:
raise ValueError("Metric value must be non-negative value")
self._ref.add(value, attrs._ref)


class MetricHistogram:
"""Metric histogram using SDK Core."""

def __init__(
self,
meter: MetricMeter,
name: str,
description: Optional[str],
unit: Optional[str],
) -> None:
"""Initialize histogram."""
self._ref = meter._ref.new_histogram(name, description, unit)

def record(self, value: int, attrs: MetricAttributes) -> None:
"""Record value on histogram."""
if value < 0:
raise ValueError("Metric value must be non-negative value")
self._ref.record(value, attrs._ref)


class MetricGauge:
"""Metric gauge using SDK Core."""

def __init__(
self,
meter: MetricMeter,
name: str,
description: Optional[str],
unit: Optional[str],
) -> None:
"""Initialize gauge."""
self._ref = meter._ref.new_gauge(name, description, unit)

def set(self, value: int, attrs: MetricAttributes) -> None:
"""Set value on gauge."""
if value < 0:
raise ValueError("Metric value must be non-negative value")
self._ref.set(value, attrs._ref)


class MetricAttributes:
"""Metric attributes using SDK Core."""

def __init__(
self, ref: temporalio.bridge.temporal_sdk_bridge.MetricAttributesRef
) -> None:
"""Initialize attributes."""
self._ref = ref

def with_additional_attributes(
self, new_attrs: Mapping[str, Union[str, int, float, bool]]
) -> MetricAttributes:
"""Create new attributes with new attributes appended."""
return MetricAttributes(self._ref.with_additional_attributes(new_attrs))
14 changes: 14 additions & 0 deletions temporalio/bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use pyo3::prelude::*;
use pyo3::types::PyTuple;

mod client;
mod metric;
mod runtime;
mod testing;
mod worker;
Expand All @@ -13,6 +14,14 @@ fn temporal_sdk_bridge(py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<client::ClientRef>()?;
m.add_function(wrap_pyfunction!(connect_client, m)?)?;

// Metric stuff
m.add_class::<metric::MetricMeterRef>()?;
m.add_class::<metric::MetricAttributesRef>()?;
m.add_class::<metric::MetricCounterRef>()?;
m.add_class::<metric::MetricHistogramRef>()?;
m.add_class::<metric::MetricGaugeRef>()?;
m.add_function(wrap_pyfunction!(new_metric_meter, m)?)?;

// Runtime stuff
m.add_class::<runtime::RuntimeRef>()?;
m.add_function(wrap_pyfunction!(init_runtime, m)?)?;
Expand Down Expand Up @@ -44,6 +53,11 @@ fn connect_client<'a>(
client::connect_client(py, &runtime_ref, config)
}

#[pyfunction]
fn new_metric_meter(runtime_ref: &runtime::RuntimeRef) -> Option<metric::MetricMeterRef> {
metric::new_metric_meter(&runtime_ref)
}

#[pyfunction]
fn init_runtime(telemetry_config: runtime::TelemetryConfig) -> PyResult<runtime::RuntimeRef> {
runtime::init_runtime(telemetry_config)
Expand Down
175 changes: 175 additions & 0 deletions temporalio/bridge/src/metric.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
use std::{collections::HashMap, sync::Arc};

use pyo3::exceptions::PyTypeError;
use pyo3::prelude::*;
use temporal_sdk_core_api::telemetry::metrics;

use crate::runtime;

#[pyclass]
pub struct MetricMeterRef {
meter: metrics::TemporalMeter,
#[pyo3(get)]
default_attributes: MetricAttributesRef,
}

#[pyclass]
#[derive(Clone)]
pub struct MetricAttributesRef {
attrs: metrics::MetricAttributes,
}

#[pyclass]
pub struct MetricCounterRef {
counter: Arc<dyn metrics::Counter>,
}

#[pyclass]
pub struct MetricHistogramRef {
histogram: Arc<dyn metrics::Histogram>,
}

#[pyclass]
pub struct MetricGaugeRef {
gauge: Arc<dyn metrics::Gauge>,
}

pub fn new_metric_meter(runtime_ref: &runtime::RuntimeRef) -> Option<MetricMeterRef> {
runtime_ref
.runtime
.core
.telemetry()
.get_metric_meter()
.map(|meter| {
let default_attributes = MetricAttributesRef {
attrs: meter.inner.new_attributes(meter.default_attribs.clone()),
};
MetricMeterRef {
meter,
default_attributes,
}
})
}

#[pymethods]
impl MetricMeterRef {
fn new_counter(
&self,
name: String,
description: Option<String>,
unit: Option<String>,
) -> MetricCounterRef {
MetricCounterRef {
counter: self
.meter
.inner
.counter(build_metric_parameters(name, description, unit)),
}
}

fn new_histogram(
&self,
name: String,
description: Option<String>,
unit: Option<String>,
) -> MetricHistogramRef {
MetricHistogramRef {
histogram: self
.meter
.inner
.histogram(build_metric_parameters(name, description, unit)),
}
}

fn new_gauge(
&self,
name: String,
description: Option<String>,
unit: Option<String>,
) -> MetricGaugeRef {
MetricGaugeRef {
gauge: self
.meter
.inner
.gauge(build_metric_parameters(name, description, unit)),
}
}
}

#[pymethods]
impl MetricCounterRef {
fn add(&self, value: u64, attrs_ref: &MetricAttributesRef) {
self.counter.add(value, &attrs_ref.attrs);
}
}

#[pymethods]
impl MetricHistogramRef {
fn record(&self, value: u64, attrs_ref: &MetricAttributesRef) {
self.histogram.record(value, &attrs_ref.attrs);
}
}

#[pymethods]
impl MetricGaugeRef {
fn set(&self, value: u64, attrs_ref: &MetricAttributesRef) {
self.gauge.record(value, &attrs_ref.attrs);
}
}

fn build_metric_parameters(
name: String,
description: Option<String>,
unit: Option<String>,
) -> metrics::MetricParameters {
let mut build = metrics::MetricParametersBuilder::default();
build.name(name);
if let Some(description) = description {
build.description(description);
}
if let Some(unit) = unit {
build.unit(unit);
}
// Should be nothing that would fail validation here
build.build().unwrap()
}

#[pymethods]
impl MetricAttributesRef {
fn with_additional_attributes<'p>(
&self,
py: Python<'p>,
new_attrs: HashMap<String, PyObject>,
) -> PyResult<Self> {
let mut attrs = self.attrs.clone();
attrs.add_new_attrs(
new_attrs
.into_iter()
.map(|(k, obj)| metric_key_value_from_py(py, k, obj))
.collect::<PyResult<Vec<metrics::MetricKeyValue>>>()?,
);
Ok(MetricAttributesRef { attrs })
}
}

fn metric_key_value_from_py<'p>(
py: Python<'p>,
k: String,
obj: PyObject,
) -> PyResult<metrics::MetricKeyValue> {
let val = if let Ok(v) = obj.extract::<String>(py) {
metrics::MetricValue::String(v)
} else if let Ok(v) = obj.extract::<bool>(py) {
metrics::MetricValue::Bool(v)
} else if let Ok(v) = obj.extract::<i64>(py) {
metrics::MetricValue::Int(v)
} else if let Ok(v) = obj.extract::<f64>(py) {
metrics::MetricValue::Float(v)
} else {
return Err(PyTypeError::new_err(format!(
"Invalid value type for key {}, must be str, int, float, or bool",
k
)));
};
Ok(metrics::MetricKeyValue::new(k, val))
}
Loading

0 comments on commit 8637cdc

Please sign in to comment.