Skip to content

Commit

Permalink
Support float and duration metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz committed Apr 12, 2024
1 parent cf4c7cb commit 8366387
Show file tree
Hide file tree
Showing 10 changed files with 650 additions and 238 deletions.
60 changes: 60 additions & 0 deletions temporalio/bridge/metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,46 @@ def record(self, value: int, attrs: MetricAttributes) -> None:
self._ref.record(value, attrs._ref)


class MetricHistogramFloat:
"""Metric histogram using SDK Core."""

def __init__(
self,
meter: MetricMeter,
name: str,
description: Optional[str],
unit: Optional[str],
) -> None:
"""Initialize histogram."""
self._ref = meter._ref.new_histogram_float(name, description, unit)

def record(self, value: float, attrs: MetricAttributes) -> None:
"""Record value on histogram."""
if value < 0:
raise ValueError("Metric value must be non-negative value")
self._ref.record(value, attrs._ref)


class MetricHistogramDuration:
"""Metric histogram using SDK Core."""

def __init__(
self,
meter: MetricMeter,
name: str,
description: Optional[str],
unit: Optional[str],
) -> None:
"""Initialize histogram."""
self._ref = meter._ref.new_histogram_duration(name, description, unit)

def record(self, value_ms: int, attrs: MetricAttributes) -> None:
"""Record value on histogram."""
if value_ms < 0:
raise ValueError("Metric value must be non-negative value")
self._ref.record(value_ms, attrs._ref)


class MetricGauge:
"""Metric gauge using SDK Core."""

Expand All @@ -95,6 +135,26 @@ def set(self, value: int, attrs: MetricAttributes) -> None:
self._ref.set(value, attrs._ref)


class MetricGaugeFloat:
"""Metric gauge using SDK Core."""

def __init__(
self,
meter: MetricMeter,
name: str,
description: Optional[str],
unit: Optional[str],
) -> None:
"""Initialize gauge."""
self._ref = meter._ref.new_gauge_float(name, description, unit)

def set(self, value: float, attrs: MetricAttributes) -> None:
"""Set value on gauge."""
if value < 0:
raise ValueError("Metric value must be non-negative value")
self._ref.set(value, attrs._ref)


class MetricAttributes:
"""Metric attributes using SDK Core."""

Expand Down
4 changes: 2 additions & 2 deletions temporalio/bridge/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ def __init__(self, *, telemetry: TelemetryConfig) -> None:
"""Create SDK Core runtime."""
self._ref = temporalio.bridge.temporal_sdk_bridge.init_runtime(telemetry)

def retrieve_buffered_metrics(self) -> Sequence[Any]:
def retrieve_buffered_metrics(self, durations_as_seconds: bool) -> Sequence[Any]:
"""Get buffered metrics."""
return self._ref.retrieve_buffered_metrics()
return self._ref.retrieve_buffered_metrics(durations_as_seconds)

def write_test_info_log(self, message: str, extra_data: str) -> None:
"""Write a test core log at INFO level."""
Expand Down
6 changes: 3 additions & 3 deletions temporalio/bridge/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use std::str::FromStr;
use std::time::Duration;
use temporal_client::{
ClientKeepAliveConfig as CoreClientKeepAliveConfig, ClientOptions, ClientOptionsBuilder,
ConfiguredClient, HealthService, OperatorService, RetryClient, RetryConfig,
TemporalServiceClientWithMetrics, TestService, TlsConfig, WorkflowService, HttpConnectProxyOptions,
ConfiguredClient, HealthService, HttpConnectProxyOptions, OperatorService, RetryClient,
RetryConfig, TemporalServiceClientWithMetrics, TestService, TlsConfig, WorkflowService,
};
use tonic::metadata::MetadataKey;
use url::Url;
Expand Down Expand Up @@ -467,4 +467,4 @@ impl From<ClientHttpConnectProxyConfig> for HttpConnectProxyOptions {
basic_auth: conf.basic_auth,
}
}
}
}
3 changes: 3 additions & 0 deletions temporalio/bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ fn temporal_sdk_bridge(py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<metric::MetricAttributesRef>()?;
m.add_class::<metric::MetricCounterRef>()?;
m.add_class::<metric::MetricHistogramRef>()?;
m.add_class::<metric::MetricHistogramFloatRef>()?;
m.add_class::<metric::MetricHistogramDurationRef>()?;
m.add_class::<metric::MetricGaugeRef>()?;
m.add_class::<metric::MetricGaugeFloatRef>()?;
m.add_class::<metric::BufferedMetricUpdate>()?;
m.add_class::<metric::BufferedMetric>()?;
m.add_function(wrap_pyfunction!(new_metric_meter, m)?)?;
Expand Down
131 changes: 119 additions & 12 deletions temporalio/bridge/src/metric.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::any::Any;
use std::time::Duration;
use std::{collections::HashMap, sync::Arc};

use pyo3::prelude::*;
Expand Down Expand Up @@ -32,11 +33,26 @@ pub struct MetricHistogramRef {
histogram: Arc<dyn metrics::Histogram>,
}

#[pyclass]
pub struct MetricHistogramFloatRef {
histogram: Arc<dyn metrics::HistogramF64>,
}

#[pyclass]
pub struct MetricHistogramDurationRef {
histogram: Arc<dyn metrics::HistogramDuration>,
}

#[pyclass]
pub struct MetricGaugeRef {
gauge: Arc<dyn metrics::Gauge>,
}

#[pyclass]
pub struct MetricGaugeFloatRef {
gauge: Arc<dyn metrics::GaugeF64>,
}

pub fn new_metric_meter(runtime_ref: &runtime::RuntimeRef) -> Option<MetricMeterRef> {
runtime_ref
.runtime
Expand Down Expand Up @@ -84,6 +100,36 @@ impl MetricMeterRef {
}
}

fn new_histogram_float(
&self,
name: String,
description: Option<String>,
unit: Option<String>,
) -> MetricHistogramFloatRef {
MetricHistogramFloatRef {
histogram: self.meter.inner.histogram_f64(build_metric_parameters(
name,
description,
unit,
)),
}
}

fn new_histogram_duration(
&self,
name: String,
description: Option<String>,
unit: Option<String>,
) -> MetricHistogramDurationRef {
MetricHistogramDurationRef {
histogram: self.meter.inner.histogram_duration(build_metric_parameters(
name,
description,
unit,
)),
}
}

fn new_gauge(
&self,
name: String,
Expand All @@ -97,6 +143,20 @@ impl MetricMeterRef {
.gauge(build_metric_parameters(name, description, unit)),
}
}

fn new_gauge_float(
&self,
name: String,
description: Option<String>,
unit: Option<String>,
) -> MetricGaugeFloatRef {
MetricGaugeFloatRef {
gauge: self
.meter
.inner
.gauge_f64(build_metric_parameters(name, description, unit)),
}
}
}

#[pymethods]
Expand All @@ -113,13 +173,35 @@ impl MetricHistogramRef {
}
}

#[pymethods]
impl MetricHistogramFloatRef {
fn record(&self, value: f64, attrs_ref: &MetricAttributesRef) {
self.histogram.record(value, &attrs_ref.attrs);
}
}

#[pymethods]
impl MetricHistogramDurationRef {
fn record(&self, value_ms: u64, attrs_ref: &MetricAttributesRef) {
self.histogram
.record(Duration::from_millis(value_ms), &attrs_ref.attrs);
}
}

#[pymethods]
impl MetricGaugeRef {
fn set(&self, value: u64, attrs_ref: &MetricAttributesRef) {
self.gauge.record(value, &attrs_ref.attrs);
}
}

#[pymethods]
impl MetricGaugeFloatRef {
fn set(&self, value: f64, attrs_ref: &MetricAttributesRef) {
self.gauge.record(value, &attrs_ref.attrs);
}
}

fn build_metric_parameters(
name: String,
description: Option<String>,
Expand Down Expand Up @@ -192,16 +274,18 @@ pub struct BufferedMetricUpdate {
}

#[derive(Clone)]
pub struct BufferedMetricUpdateValue(metrics::MetricUpdateVal);
pub enum BufferedMetricUpdateValue {
U64(u64),
U128(u128),
F64(f64),
}

impl IntoPy<PyObject> for BufferedMetricUpdateValue {
fn into_py(self, py: Python) -> PyObject {
match self.0 {
metrics::MetricUpdateVal::Delta(v) => v.into_py(py),
metrics::MetricUpdateVal::DeltaF64(v) => v.into_py(py),
metrics::MetricUpdateVal::Value(v) => v.into_py(py),
metrics::MetricUpdateVal::ValueF64(v) => v.into_py(py),
metrics::MetricUpdateVal::Duration(v) => v.as_millis().into_py(py),
match self {
BufferedMetricUpdateValue::U64(v) => v.into_py(py),
BufferedMetricUpdateValue::U128(v) => v.into_py(py),
BufferedMetricUpdateValue::F64(v) => v.into_py(py),
}
}
}
Expand Down Expand Up @@ -236,16 +320,18 @@ impl CustomMetricAttributes for BufferedMetricAttributes {
pub fn convert_metric_events<'p>(
py: Python<'p>,
events: Vec<MetricEvent<BufferedMetricRef>>,
durations_as_seconds: bool,
) -> Vec<BufferedMetricUpdate> {
events
.into_iter()
.filter_map(|e| convert_metric_event(py, e))
.filter_map(|e| convert_metric_event(py, e, durations_as_seconds))
.collect()
}

fn convert_metric_event<'p>(
py: Python<'p>,
event: MetricEvent<BufferedMetricRef>,
durations_as_seconds: bool,
) -> Option<BufferedMetricUpdate> {
match event {
// Create the metric and put it on the lazy ref
Expand All @@ -262,9 +348,19 @@ fn convert_metric_event<'p>(
description: Some(params.description)
.filter(|s| !s.is_empty())
.map(|s| s.to_string()),
unit: Some(params.unit)
.filter(|s| !s.is_empty())
.map(|s| s.to_string()),
unit: if matches!(kind, metrics::MetricKind::HistogramDuration)
&& params.unit == "duration"
{
if durations_as_seconds {
Some("s".to_owned())
} else {
Some("ms".to_owned())
}
} else if params.unit.is_empty() {
None
} else {
Some(params.unit.to_string())
},
kind: match kind {
metrics::MetricKind::Counter => 0,
metrics::MetricKind::Gauge | metrics::MetricKind::GaugeF64 => 1,
Expand Down Expand Up @@ -324,7 +420,18 @@ fn convert_metric_event<'p>(
update,
} => Some(BufferedMetricUpdate {
metric: instrument.get().clone().0.clone(),
value: BufferedMetricUpdateValue(update),
value: match update {
metrics::MetricUpdateVal::Duration(v) if durations_as_seconds => {
BufferedMetricUpdateValue::F64(v.as_secs_f64())
}
metrics::MetricUpdateVal::Duration(v) => {
BufferedMetricUpdateValue::U128(v.as_millis())
}
metrics::MetricUpdateVal::Delta(v) => BufferedMetricUpdateValue::U64(v),
metrics::MetricUpdateVal::DeltaF64(v) => BufferedMetricUpdateValue::F64(v),
metrics::MetricUpdateVal::Value(v) => BufferedMetricUpdateValue::U64(v),
metrics::MetricUpdateVal::ValueF64(v) => BufferedMetricUpdateValue::F64(v),
},
attributes: attributes
.get()
.clone()
Expand Down
7 changes: 6 additions & 1 deletion temporalio/bridge/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,14 +205,19 @@ impl Drop for Runtime {

#[pymethods]
impl RuntimeRef {
fn retrieve_buffered_metrics<'p>(&self, py: Python<'p>) -> Vec<BufferedMetricUpdate> {
fn retrieve_buffered_metrics<'p>(
&self,
py: Python<'p>,
durations_as_seconds: bool,
) -> Vec<BufferedMetricUpdate> {
convert_metric_events(
py,
self.runtime
.metrics_call_buffer
.as_ref()
.expect("Attempting to retrieve buffered metrics without buffer")
.retrieve(),
durations_as_seconds,
)
}

Expand Down
Loading

0 comments on commit 8366387

Please sign in to comment.