diff --git a/temporalio/bridge/Cargo.lock b/temporalio/bridge/Cargo.lock index ecef9902..dfc9387c 100644 --- a/temporalio/bridge/Cargo.lock +++ b/temporalio/bridge/Cargo.lock @@ -2527,11 +2527,9 @@ dependencies = [ "derive_more", "opentelemetry", "prost-types", - "serde", "serde_json", "temporal-sdk-core-protos", "thiserror", - "tokio", "tonic", "tracing-core", "url", diff --git a/temporalio/bridge/proto/workflow_activation/workflow_activation_pb2.pyi b/temporalio/bridge/proto/workflow_activation/workflow_activation_pb2.pyi index e7d2f47e..da781994 100644 --- a/temporalio/bridge/proto/workflow_activation/workflow_activation_pb2.pyi +++ b/temporalio/bridge/proto/workflow_activation/workflow_activation_pb2.pyi @@ -44,6 +44,7 @@ class WorkflowActivation(google.protobuf.message.Message): * Signal and update handlers should be invoked before workflow routines are iterated. That is to say before the users' main workflow function and anything spawned by it is allowed to continue. * Queries always go last (and, in fact, always come in their own activation) + * Evictions also always come in their own activation The downside of this reordering is that a signal or update handler may not observe that some other event had already happened (ex: an activity completed) when it is first invoked, though it @@ -55,11 +56,9 @@ class WorkflowActivation(google.protobuf.message.Message): ## Evictions - Activations that contain only a `remove_from_cache` job should not cause the workflow code - to be invoked and may be responded to with an empty command list. Eviction jobs may also - appear with other jobs, but will always appear last in the job list. In this case it is - expected that the workflow code will be invoked, and the response produced as normal, but - the caller should evict the run after doing so. + Evictions appear as an activations that contains only a `remove_from_cache` job. Such activations + should not cause the workflow code to be invoked and may be responded to with an empty command + list. """ DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -180,7 +179,9 @@ class WorkflowActivationJob(google.protobuf.message.Message): """Workflow was reset. The randomness seed must be updated.""" @property def query_workflow(self) -> global___QueryWorkflow: - """A request to query the workflow was received.""" + """A request to query the workflow was received. It is guaranteed that queries (one or more) + always come in their own activation after other mutating jobs. + """ @property def cancel_workflow(self) -> global___CancelWorkflow: """A request to cancel the workflow was received.""" @@ -221,11 +222,9 @@ class WorkflowActivationJob(google.protobuf.message.Message): """A request to handle a workflow update.""" @property def remove_from_cache(self) -> global___RemoveFromCache: - """Remove the workflow identified by the [WorkflowActivation] containing this job from the cache - after performing the activation. - - If other job variant are present in the list, this variant will be the last job in the - job list. The string value is a reason for eviction. + """Remove the workflow identified by the [WorkflowActivation] containing this job from the + cache after performing the activation. It is guaranteed that this will be the only job + in the activation if present. """ def __init__( self, diff --git a/temporalio/bridge/runtime.py b/temporalio/bridge/runtime.py index c61c76e0..43ea4655 100644 --- a/temporalio/bridge/runtime.py +++ b/temporalio/bridge/runtime.py @@ -68,6 +68,7 @@ class OpenTelemetryConfig: headers: Mapping[str, str] metric_periodicity_millis: Optional[int] metric_temporality_delta: bool + durations_as_seconds: bool @dataclass(frozen=True) @@ -77,6 +78,7 @@ class PrometheusConfig: bind_address: str counters_total_suffix: bool unit_suffix: bool + durations_as_seconds: bool @dataclass(frozen=True) diff --git a/temporalio/bridge/sdk-core b/temporalio/bridge/sdk-core index bcda5d9f..00b55079 160000 --- a/temporalio/bridge/sdk-core +++ b/temporalio/bridge/sdk-core @@ -1 +1 @@ -Subproject commit bcda5d9f73b082ddf6816921e15db5e68a8ce47e +Subproject commit 00b550794d413274b829eed269312ce56c52abde diff --git a/temporalio/bridge/src/metric.rs b/temporalio/bridge/src/metric.rs index 0c16aeef..66650b17 100644 --- a/temporalio/bridge/src/metric.rs +++ b/temporalio/bridge/src/metric.rs @@ -186,11 +186,26 @@ pub struct BufferedMetricUpdate { #[pyo3(get)] pub metric: Py, #[pyo3(get)] - pub value: u64, + pub value: BufferedMetricUpdateValue, #[pyo3(get)] pub attributes: Py, } +#[derive(Clone)] +pub struct BufferedMetricUpdateValue(metrics::MetricUpdateVal); + +impl IntoPy for BufferedMetricUpdateValue { + fn into_py(self, py: Python) -> PyObject { + match self.0 { + metrics::MetricUpdateVal::Delta(v) => v.into_py(py), + metrics::MetricUpdateVal::DeltaF64(v) => v.into_py(py), + metrics::MetricUpdateVal::Value(v) => v.into_py(py), + metrics::MetricUpdateVal::ValueF64(v) => v.into_py(py), + metrics::MetricUpdateVal::Duration(v) => v.as_millis().into_py(py), + } + } +} + // WARNING: This must match temporalio.runtime.BufferedMetric protocol #[pyclass] pub struct BufferedMetric { @@ -252,8 +267,10 @@ fn convert_metric_event<'p>( .map(|s| s.to_string()), kind: match kind { metrics::MetricKind::Counter => 0, - metrics::MetricKind::Gauge => 1, - metrics::MetricKind::Histogram => 2, + metrics::MetricKind::Gauge | metrics::MetricKind::GaugeF64 => 1, + metrics::MetricKind::Histogram + | metrics::MetricKind::HistogramF64 + | metrics::MetricKind::HistogramDuration => 2, }, }, ) @@ -307,10 +324,7 @@ fn convert_metric_event<'p>( update, } => Some(BufferedMetricUpdate { metric: instrument.get().clone().0.clone(), - value: match update { - metrics::MetricUpdateVal::Delta(v) => v, - metrics::MetricUpdateVal::Value(v) => v, - }, + value: BufferedMetricUpdateValue(update), attributes: attributes .get() .clone() diff --git a/temporalio/bridge/src/runtime.rs b/temporalio/bridge/src/runtime.rs index 65b218d0..24e3d801 100644 --- a/temporalio/bridge/src/runtime.rs +++ b/temporalio/bridge/src/runtime.rs @@ -74,6 +74,7 @@ pub struct OpenTelemetryConfig { headers: HashMap, metric_periodicity_millis: Option, metric_temporality_delta: bool, + durations_as_seconds: bool, } #[derive(FromPyObject)] @@ -81,6 +82,7 @@ pub struct PrometheusConfig { bind_address: String, counters_total_suffix: bool, unit_suffix: bool, + durations_as_seconds: bool, } const FORWARD_LOG_BUFFER_SIZE: usize = 2048; @@ -305,7 +307,8 @@ impl TryFrom for Arc { PyValueError::new_err(format!("Invalid OTel URL: {}", err)) })?, ) - .headers(otel_conf.headers); + .headers(otel_conf.headers) + .use_seconds_for_durations(otel_conf.durations_as_seconds); if let Some(period) = otel_conf.metric_periodicity_millis { build.metric_periodicity(Duration::from_millis(period)); } @@ -331,7 +334,8 @@ impl TryFrom for Arc { })?, ) .counters_total_suffix(prom_conf.counters_total_suffix) - .unit_suffix(prom_conf.unit_suffix); + .unit_suffix(prom_conf.unit_suffix) + .use_seconds_for_durations(prom_conf.durations_as_seconds); if let Some(global_tags) = conf.global_tags { build.global_tags(global_tags); } diff --git a/temporalio/runtime.py b/temporalio/runtime.py index 675c1488..a363183a 100644 --- a/temporalio/runtime.py +++ b/temporalio/runtime.py @@ -249,6 +249,7 @@ class OpenTelemetryConfig: metric_temporality: OpenTelemetryMetricTemporality = ( OpenTelemetryMetricTemporality.CUMULATIVE ) + durations_as_seconds: bool = False def _to_bridge_config(self) -> temporalio.bridge.runtime.OpenTelemetryConfig: return temporalio.bridge.runtime.OpenTelemetryConfig( @@ -259,6 +260,7 @@ def _to_bridge_config(self) -> temporalio.bridge.runtime.OpenTelemetryConfig: else round(self.metric_periodicity.total_seconds() * 1000), metric_temporality_delta=self.metric_temporality == OpenTelemetryMetricTemporality.DELTA, + durations_as_seconds=self.durations_as_seconds, ) @@ -269,12 +271,14 @@ class PrometheusConfig: bind_address: str counters_total_suffix: bool = False unit_suffix: bool = False + durations_as_seconds: bool = False def _to_bridge_config(self) -> temporalio.bridge.runtime.PrometheusConfig: return temporalio.bridge.runtime.PrometheusConfig( bind_address=self.bind_address, counters_total_suffix=self.counters_total_suffix, unit_suffix=self.unit_suffix, + durations_as_seconds=self.durations_as_seconds, )