Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update core and add durations-as-seconds metric option #498

Merged
merged 2 commits into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions temporalio/bridge/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class WorkflowActivation(google.protobuf.message.Message):
* Signal and update handlers should be invoked before workflow routines are iterated. That is to
say before the users' main workflow function and anything spawned by it is allowed to continue.
* Queries always go last (and, in fact, always come in their own activation)
* Evictions also always come in their own activation

The downside of this reordering is that a signal or update handler may not observe that some
other event had already happened (ex: an activity completed) when it is first invoked, though it
Expand All @@ -55,11 +56,9 @@ class WorkflowActivation(google.protobuf.message.Message):

## Evictions

Activations that contain only a `remove_from_cache` job should not cause the workflow code
to be invoked and may be responded to with an empty command list. Eviction jobs may also
appear with other jobs, but will always appear last in the job list. In this case it is
expected that the workflow code will be invoked, and the response produced as normal, but
the caller should evict the run after doing so.
Evictions appear as activations that contain only a `remove_from_cache` job. Such activations
should not cause the workflow code to be invoked and may be responded to with an empty command
list.
"""

DESCRIPTOR: google.protobuf.descriptor.Descriptor
Expand Down Expand Up @@ -180,7 +179,9 @@ class WorkflowActivationJob(google.protobuf.message.Message):
"""Workflow was reset. The randomness seed must be updated."""
@property
def query_workflow(self) -> global___QueryWorkflow:
"""A request to query the workflow was received."""
"""A request to query the workflow was received. It is guaranteed that queries (one or more)
always come in their own activation after other mutating jobs.
"""
@property
def cancel_workflow(self) -> global___CancelWorkflow:
"""A request to cancel the workflow was received."""
Expand Down Expand Up @@ -221,11 +222,9 @@ class WorkflowActivationJob(google.protobuf.message.Message):
"""A request to handle a workflow update."""
@property
def remove_from_cache(self) -> global___RemoveFromCache:
"""Remove the workflow identified by the [WorkflowActivation] containing this job from the cache
after performing the activation.

If other job variant are present in the list, this variant will be the last job in the
job list. The string value is a reason for eviction.
"""Remove the workflow identified by the [WorkflowActivation] containing this job from the
cache after performing the activation. It is guaranteed that this will be the only job
in the activation if present.
"""
def __init__(
self,
Expand Down
2 changes: 2 additions & 0 deletions temporalio/bridge/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class OpenTelemetryConfig:
headers: Mapping[str, str]
metric_periodicity_millis: Optional[int]
metric_temporality_delta: bool
durations_as_seconds: bool


@dataclass(frozen=True)
Expand All @@ -77,6 +78,7 @@ class PrometheusConfig:
bind_address: str
counters_total_suffix: bool
unit_suffix: bool
durations_as_seconds: bool


@dataclass(frozen=True)
Expand Down
2 changes: 1 addition & 1 deletion temporalio/bridge/sdk-core
Submodule sdk-core updated 44 files
+2 −4 .cargo/config.toml
+1 −1 .github/workflows/heavy.yml
+6 −4 .github/workflows/per-pr.yml
+4 −4 README.md
+9 −11 client/src/metrics.rs
+0 −2 core-api/Cargo.toml
+8 −0 core-api/src/errors.rs
+7 −1 core-api/src/telemetry.rs
+75 −4 core-api/src/telemetry/metrics.rs
+30 −11 core-api/src/worker.rs
+0 −11 core/Cargo.toml
+4 −13 core/src/abstractions.rs
+2 −3 core/src/core_tests/workers.rs
+0 −2 core/src/lib.rs
+0 −12 core/src/protosext/protocol_messages.rs
+185 −95 core/src/telemetry/metrics.rs
+2 −1 core/src/telemetry/mod.rs
+140 −62 core/src/telemetry/otel.rs
+21 −13 core/src/test_help/mod.rs
+13 −9 core/src/worker/activities/local_activities.rs
+1 −3 core/src/worker/mod.rs
+0 −29 core/src/worker/workflow/history_update.rs
+2 −2 core/src/worker/workflow/machines/timer_state_machine.rs
+0 −12 core/src/worker/workflow/machines/workflow_machines.rs
+81 −51 core/src/worker/workflow/managed_run.rs
+17 −114 core/src/worker/workflow/mod.rs
+5 −1 core/src/worker/workflow/wft_extraction.rs
+1 −50 core/src/worker/workflow/workflow_stream.rs
+0 −117 core/src/worker/workflow/workflow_stream/saved_wf_inputs.rs
+0 −24 core/src/worker/workflow/workflow_stream/tonic_status_serde.rs
+1 −1 fsm/rustfsm_procmacro/tests/trybuild/no_handle_conversions_require_into_fail.stderr
+0 −1 sdk-core-protos/Cargo.toml
+1 −10 sdk-core-protos/build.rs
+10 −12 sdk-core-protos/protos/local/temporal/sdk/core/workflow_activation/workflow_activation.proto
+8 −32 sdk-core-protos/src/lib.rs
+10 −1 sdk-core-protos/src/task_token.rs
+4 −1 sdk/src/lib.rs
+28 −21 test-utils/src/lib.rs
+0 −50 test-utils/src/wf_input_saver.rs
+65 −8 tests/integ_tests/metrics_tests.rs
+100 −5 tests/integ_tests/workflow_tests.rs
+2 −6 tests/integ_tests/workflow_tests/eager.rs
+2 −9 tests/integ_tests/workflow_tests/signals.rs
+0 −32 tests/wf_input_replay.rs
28 changes: 21 additions & 7 deletions temporalio/bridge/src/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,26 @@ pub struct BufferedMetricUpdate {
#[pyo3(get)]
pub metric: Py<BufferedMetric>,
#[pyo3(get)]
pub value: u64,
pub value: BufferedMetricUpdateValue,
#[pyo3(get)]
pub attributes: Py<PyDict>,
}

/// Newtype wrapper around `metrics::MetricUpdateVal`, used as the type of the
/// `value` field on `BufferedMetricUpdate`. Wrapping the core type lets this
/// crate provide its own conversion into a Python object for that field.
#[derive(Clone)]
pub struct BufferedMetricUpdateValue(metrics::MetricUpdateVal);

// Converts the wrapped `metrics::MetricUpdateVal` into a Python object by
// dispatching on the variant and converting its payload.
impl IntoPy<PyObject> for BufferedMetricUpdateValue {
fn into_py(self, py: Python) -> PyObject {
match self.0 {
// The four numeric variants hand their payload to Python unchanged.
// Presumably the plain variants carry integers and the `F64` variants
// carry floats -- confirm against the sdk-core enum definition.
metrics::MetricUpdateVal::Delta(v) => v.into_py(py),
metrics::MetricUpdateVal::DeltaF64(v) => v.into_py(py),
metrics::MetricUpdateVal::Value(v) => v.into_py(py),
metrics::MetricUpdateVal::ValueF64(v) => v.into_py(py),
// Durations are reduced to milliseconds via `as_millis()` before conversion.
// NOTE(review): this arm always emits milliseconds and does not consult any
// `durations_as_seconds` setting -- confirm that is intended for buffered metrics.
metrics::MetricUpdateVal::Duration(v) => v.as_millis().into_py(py),
}
}
}

// WARNING: This must match temporalio.runtime.BufferedMetric protocol
#[pyclass]
pub struct BufferedMetric {
Expand Down Expand Up @@ -252,8 +267,10 @@ fn convert_metric_event<'p>(
.map(|s| s.to_string()),
kind: match kind {
metrics::MetricKind::Counter => 0,
metrics::MetricKind::Gauge => 1,
metrics::MetricKind::Histogram => 2,
metrics::MetricKind::Gauge | metrics::MetricKind::GaugeF64 => 1,
metrics::MetricKind::Histogram
| metrics::MetricKind::HistogramF64
| metrics::MetricKind::HistogramDuration => 2,
},
},
)
Expand Down Expand Up @@ -307,10 +324,7 @@ fn convert_metric_event<'p>(
update,
} => Some(BufferedMetricUpdate {
metric: instrument.get().clone().0.clone(),
value: match update {
metrics::MetricUpdateVal::Delta(v) => v,
metrics::MetricUpdateVal::Value(v) => v,
},
value: BufferedMetricUpdateValue(update),
attributes: attributes
.get()
.clone()
Expand Down
8 changes: 6 additions & 2 deletions temporalio/bridge/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,15 @@ pub struct OpenTelemetryConfig {
headers: HashMap<String, String>,
metric_periodicity_millis: Option<u64>,
metric_temporality_delta: bool,
durations_as_seconds: bool,
}

#[derive(FromPyObject)]
pub struct PrometheusConfig {
bind_address: String,
counters_total_suffix: bool,
unit_suffix: bool,
durations_as_seconds: bool,
}

const FORWARD_LOG_BUFFER_SIZE: usize = 2048;
Expand Down Expand Up @@ -305,7 +307,8 @@ impl TryFrom<MetricsConfig> for Arc<dyn CoreMeter> {
PyValueError::new_err(format!("Invalid OTel URL: {}", err))
})?,
)
.headers(otel_conf.headers);
.headers(otel_conf.headers)
.use_seconds_for_durations(otel_conf.durations_as_seconds);
if let Some(period) = otel_conf.metric_periodicity_millis {
build.metric_periodicity(Duration::from_millis(period));
}
Expand All @@ -331,7 +334,8 @@ impl TryFrom<MetricsConfig> for Arc<dyn CoreMeter> {
})?,
)
.counters_total_suffix(prom_conf.counters_total_suffix)
.unit_suffix(prom_conf.unit_suffix);
.unit_suffix(prom_conf.unit_suffix)
.use_seconds_for_durations(prom_conf.durations_as_seconds);
if let Some(global_tags) = conf.global_tags {
build.global_tags(global_tags);
}
Expand Down
4 changes: 4 additions & 0 deletions temporalio/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ class OpenTelemetryConfig:
metric_temporality: OpenTelemetryMetricTemporality = (
OpenTelemetryMetricTemporality.CUMULATIVE
)
durations_as_seconds: bool = False

def _to_bridge_config(self) -> temporalio.bridge.runtime.OpenTelemetryConfig:
return temporalio.bridge.runtime.OpenTelemetryConfig(
Expand All @@ -259,6 +260,7 @@ def _to_bridge_config(self) -> temporalio.bridge.runtime.OpenTelemetryConfig:
else round(self.metric_periodicity.total_seconds() * 1000),
metric_temporality_delta=self.metric_temporality
== OpenTelemetryMetricTemporality.DELTA,
durations_as_seconds=self.durations_as_seconds,
)


Expand All @@ -269,12 +271,14 @@ class PrometheusConfig:
bind_address: str
counters_total_suffix: bool = False
unit_suffix: bool = False
durations_as_seconds: bool = False

def _to_bridge_config(self) -> temporalio.bridge.runtime.PrometheusConfig:
    """Build the bridge-layer ``PrometheusConfig`` mirroring this config.

    Each field of this config is copied, unchanged, to the bridge field of
    the same name.
    """
    # Collect the pass-through fields first so the mapping is easy to scan.
    bridge_fields = {
        "bind_address": self.bind_address,
        "counters_total_suffix": self.counters_total_suffix,
        "unit_suffix": self.unit_suffix,
        "durations_as_seconds": self.durations_as_seconds,
    }
    return temporalio.bridge.runtime.PrometheusConfig(**bridge_fields)


Expand Down
Loading