Skip to content

Commit

Permalink
Buffered metrics (#391)
Browse files Browse the repository at this point in the history
Fixes #369
  • Loading branch information
cretz authored Oct 5, 2023
1 parent d5edb71 commit d03f356
Show file tree
Hide file tree
Showing 10 changed files with 495 additions and 47 deletions.
28 changes: 5 additions & 23 deletions temporalio/bridge/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 9 additions & 3 deletions temporalio/bridge/metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def __init__(
) -> None:
"""Initialize metric meter."""
self._ref = ref
self._default_attributes = MetricAttributes(ref.default_attributes)
self._default_attributes = MetricAttributes(self, ref.default_attributes)

@property
def default_attributes(self) -> MetricAttributes:
Expand Down Expand Up @@ -99,13 +99,19 @@ class MetricAttributes:
"""Metric attributes using SDK Core."""

def __init__(
self, ref: temporalio.bridge.temporal_sdk_bridge.MetricAttributesRef
self,
meter: MetricMeter,
ref: temporalio.bridge.temporal_sdk_bridge.MetricAttributesRef,
) -> None:
"""Initialize attributes."""
self._meter = meter
self._ref = ref

def with_additional_attributes(
self, new_attrs: Mapping[str, Union[str, int, float, bool]]
) -> MetricAttributes:
"""Create new attributes with new attributes appended."""
return MetricAttributes(self._ref.with_additional_attributes(new_attrs))
return MetricAttributes(
self._meter,
self._ref.with_additional_attributes(self._meter._ref, new_attrs),
)
7 changes: 6 additions & 1 deletion temporalio/bridge/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import Mapping, Optional, Type
from typing import Any, Mapping, Optional, Sequence, Type

import temporalio.bridge.temporal_sdk_bridge

Expand All @@ -25,6 +25,10 @@ def __init__(self, *, telemetry: TelemetryConfig) -> None:
"""Create SDK Core runtime."""
self._ref = temporalio.bridge.temporal_sdk_bridge.init_runtime(telemetry)

def retrieve_buffered_metrics(self) -> Sequence[Any]:
"""Get buffered metrics."""
return self._ref.retrieve_buffered_metrics()


@dataclass(frozen=True)
class LoggingConfig:
Expand All @@ -40,6 +44,7 @@ class MetricsConfig:

opentelemetry: Optional[OpenTelemetryConfig]
prometheus: Optional[PrometheusConfig]
buffered_with_size: int
attach_service_name: bool
global_tags: Optional[Mapping[str, str]]
metric_prefix: Optional[str]
Expand Down
2 changes: 2 additions & 0 deletions temporalio/bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ fn temporal_sdk_bridge(py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<metric::MetricCounterRef>()?;
m.add_class::<metric::MetricHistogramRef>()?;
m.add_class::<metric::MetricGaugeRef>()?;
m.add_class::<metric::BufferedMetricUpdate>()?;
m.add_class::<metric::BufferedMetric>()?;
m.add_function(wrap_pyfunction!(new_metric_meter, m)?)?;

// Runtime stuff
Expand Down
165 changes: 157 additions & 8 deletions temporalio/bridge/src/metric.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use std::any::Any;
use std::{collections::HashMap, sync::Arc};

use pyo3::exceptions::PyTypeError;
use pyo3::prelude::*;
use temporal_sdk_core_api::telemetry::metrics;
use pyo3::{exceptions::PyTypeError, types::PyDict};
use temporal_sdk_core_api::telemetry::metrics::{
self, BufferInstrumentRef, CustomMetricAttributes, MetricEvent, NewAttributes,
};

use crate::runtime;

Expand Down Expand Up @@ -139,14 +142,17 @@ impl MetricAttributesRef {
fn with_additional_attributes<'p>(
&self,
py: Python<'p>,
meter: &MetricMeterRef,
new_attrs: HashMap<String, PyObject>,
) -> PyResult<Self> {
let mut attrs = self.attrs.clone();
attrs.add_new_attrs(
new_attrs
.into_iter()
.map(|(k, obj)| metric_key_value_from_py(py, k, obj))
.collect::<PyResult<Vec<metrics::MetricKeyValue>>>()?,
let attrs = meter.meter.inner.extend_attributes(
self.attrs.clone(),
NewAttributes {
attributes: new_attrs
.into_iter()
.map(|(k, obj)| metric_key_value_from_py(py, k, obj))
.collect::<PyResult<Vec<metrics::MetricKeyValue>>>()?,
},
);
Ok(MetricAttributesRef { attrs })
}
Expand All @@ -173,3 +179,146 @@ fn metric_key_value_from_py<'p>(
};
Ok(metrics::MetricKeyValue::new(k, val))
}

// WARNING: This must match temporalio.runtime.BufferedMetricUpdate protocol
#[pyclass]
pub struct BufferedMetricUpdate {
#[pyo3(get)]
pub metric: Py<BufferedMetric>,
#[pyo3(get)]
pub value: u64,
#[pyo3(get)]
pub attributes: Py<PyDict>,
}

// WARNING: This must match temporalio.runtime.BufferedMetric protocol
#[pyclass]
pub struct BufferedMetric {
#[pyo3(get)]
pub name: String,
#[pyo3(get)]
pub description: Option<String>,
#[pyo3(get)]
pub unit: Option<String>,
#[pyo3(get)]
pub kind: u8, // 0 - counter, 1 - gauge, 2 - histogram
}

#[derive(Debug)]
struct BufferedMetricAttributes(Py<PyDict>);

#[derive(Clone, Debug)]
pub struct BufferedMetricRef(Py<BufferedMetric>);

impl BufferInstrumentRef for BufferedMetricRef {}

impl CustomMetricAttributes for BufferedMetricAttributes {
fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
self as Arc<dyn Any + Send + Sync>
}
}

pub fn convert_metric_events<'p>(
py: Python<'p>,
events: Vec<MetricEvent<BufferedMetricRef>>,
) -> Vec<BufferedMetricUpdate> {
events
.into_iter()
.filter_map(|e| convert_metric_event(py, e))
.collect()
}

fn convert_metric_event<'p>(
py: Python<'p>,
event: MetricEvent<BufferedMetricRef>,
) -> Option<BufferedMetricUpdate> {
match event {
// Create the metric and put it on the lazy ref
MetricEvent::Create {
params,
populate_into,
kind,
} => {
let buffered_ref = BufferedMetricRef(
Py::new(
py,
BufferedMetric {
name: params.name.to_string(),
description: Some(params.description)
.filter(|s| !s.is_empty())
.map(|s| s.to_string()),
unit: Some(params.unit)
.filter(|s| !s.is_empty())
.map(|s| s.to_string()),
kind: match kind {
metrics::MetricKind::Counter => 0,
metrics::MetricKind::Gauge => 1,
metrics::MetricKind::Histogram => 2,
},
},
)
.expect("Unable to create buffered metric"),
);
populate_into.set(Arc::new(buffered_ref)).unwrap();
None
}
// Create the attributes and put it on the lazy ref
MetricEvent::CreateAttributes {
populate_into,
append_from,
attributes,
} => {
// Create the dictionary (as copy from existing if needed)
let new_attrs_ref: Py<PyDict> = match append_from {
Some(existing) => existing
.get()
.clone()
.as_any()
.downcast::<BufferedMetricAttributes>()
.expect("Unable to downcast to expected buffered metric attributes")
.0
.as_ref(py)
.copy()
.expect("Failed to copy metric attribute dictionary")
.into(),
None => PyDict::new(py).into(),
};
// Add attributes
let new_attrs = new_attrs_ref.as_ref(py);
for kv in attributes.into_iter() {
match kv.value {
metrics::MetricValue::String(v) => new_attrs.set_item(kv.key, v),
metrics::MetricValue::Int(v) => new_attrs.set_item(kv.key, v),
metrics::MetricValue::Float(v) => new_attrs.set_item(kv.key, v),
metrics::MetricValue::Bool(v) => new_attrs.set_item(kv.key, v),
}
.expect("Unable to set metric key/value on dictionary");
}
// Put on lazy ref
populate_into
.set(Arc::new(BufferedMetricAttributes(new_attrs_ref)))
.expect("Unable to set buffered metric attributes on reference");
None
}
// Convert to Python metric event
MetricEvent::Update {
instrument,
attributes,
update,
} => Some(BufferedMetricUpdate {
metric: instrument.get().clone().0.clone(),
value: match update {
metrics::MetricUpdateVal::Delta(v) => v,
metrics::MetricUpdateVal::Value(v) => v,
},
attributes: attributes
.get()
.clone()
.as_any()
.downcast::<BufferedMetricAttributes>()
.expect("Unable to downcast to expected buffered metric attributes")
.0
.clone(),
}),
}
}
Loading

0 comments on commit d03f356

Please sign in to comment.