Skip to content

Commit

Permalink
Update core/tonic, update metric options, remove core tracing (#380)
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz authored Aug 31, 2023
1 parent 40daaaa commit aa829d3
Show file tree
Hide file tree
Showing 9 changed files with 472 additions and 433 deletions.
664 changes: 329 additions & 335 deletions temporalio/bridge/Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion temporalio/bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ temporal-sdk-core-api = { version = "0.1.0", path = "./sdk-core/core-api" }
temporal-sdk-core-protos = { version = "0.1.0", path = "./sdk-core/sdk-core-protos" }
tokio = "1.26"
tokio-stream = "0.1"
tonic = "0.8"
tonic = "0.9"
tracing = "0.1"
url = "2.2"

Expand Down
16 changes: 6 additions & 10 deletions temporalio/bridge/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,6 @@ def __init__(self, *, telemetry: TelemetryConfig) -> None:
self._ref = temporalio.bridge.temporal_sdk_bridge.init_runtime(telemetry)


@dataclass(frozen=True)
class TracingConfig:
"""Python representation of the Rust struct for tracing config."""

filter: str
opentelemetry: OpenTelemetryConfig


@dataclass(frozen=True)
class LoggingConfig:
"""Python representation of the Rust struct for logging config."""
Expand All @@ -48,6 +40,9 @@ class MetricsConfig:

opentelemetry: Optional[OpenTelemetryConfig]
prometheus: Optional[PrometheusConfig]
attach_service_name: bool
global_tags: Optional[Mapping[str, str]]
metric_prefix: Optional[str]


@dataclass(frozen=True)
Expand All @@ -57,20 +52,21 @@ class OpenTelemetryConfig:
url: str
headers: Mapping[str, str]
metric_periodicity_millis: Optional[int]
metric_temporality_delta: bool


@dataclass(frozen=True)
class PrometheusConfig:
"""Python representation of the Rust struct for Prometheus config."""

bind_address: str
counters_total_suffix: bool
unit_suffix: bool


@dataclass(frozen=True)
class TelemetryConfig:
"""Python representation of the Rust struct for telemetry config."""

tracing: Optional[TracingConfig]
logging: Optional[LoggingConfig]
metrics: Optional[MetricsConfig]
global_tags: Mapping[str, str]
5 changes: 4 additions & 1 deletion temporalio/bridge/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,10 @@ pub fn connect_client<'a>(
runtime_ref.runtime.future_into_py(py, async move {
Ok(ClientRef {
retry_client: opts
.connect_no_namespace(runtime.core.metric_meter().as_deref(), headers)
.connect_no_namespace(
runtime.core.telemetry().get_temporal_metric_meter(),
headers,
)
.await
.map_err(|err| {
PyRuntimeError::new_err(format!("Failed client connect: {}", err))
Expand Down
158 changes: 100 additions & 58 deletions temporalio/bridge/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use temporal_sdk_core::telemetry::{build_otlp_metric_exporter, start_prometheus_metric_exporter};
use temporal_sdk_core::CoreRuntime;
use temporal_sdk_core_api::telemetry::metrics::CoreMeter;
use temporal_sdk_core_api::telemetry::{
Logger, MetricsExporter, OtelCollectorOptions, TelemetryOptions, TelemetryOptionsBuilder,
TraceExportConfig, TraceExporter,
Logger, MetricTemporality, OtelCollectorOptionsBuilder, PrometheusExporterOptionsBuilder,
TelemetryOptions, TelemetryOptionsBuilder,
};
use url::Url;

Expand All @@ -27,16 +29,8 @@ pub(crate) struct Runtime {

#[derive(FromPyObject)]
pub struct TelemetryConfig {
tracing: Option<TracingConfig>,
logging: Option<LoggingConfig>,
metrics: Option<MetricsConfig>,
global_tags: Option<HashMap<String, String>>
}

#[derive(FromPyObject)]
pub struct TracingConfig {
filter: String,
opentelemetry: OpenTelemetryConfig,
}

#[derive(FromPyObject)]
Expand All @@ -49,32 +43,44 @@ pub struct LoggingConfig {
pub struct MetricsConfig {
opentelemetry: Option<OpenTelemetryConfig>,
prometheus: Option<PrometheusConfig>,
attach_service_name: bool,
global_tags: Option<HashMap<String, String>>,
metric_prefix: Option<String>,
}

#[derive(FromPyObject)]
pub struct OpenTelemetryConfig {
url: String,
headers: HashMap<String, String>,
metric_periodicity_millis: Option<u64>,
metric_temporality_delta: bool,
}

#[derive(FromPyObject)]
pub struct PrometheusConfig {
bind_address: String,
counters_total_suffix: bool,
unit_suffix: bool,
}

pub fn init_runtime(telemetry_config: TelemetryConfig) -> PyResult<RuntimeRef> {
let mut core = CoreRuntime::new(
// We don't move telemetry config here because we need it for
// late-binding metrics
(&telemetry_config).try_into()?,
tokio::runtime::Builder::new_multi_thread(),
)
.map_err(|err| PyRuntimeError::new_err(format!("Failed initializing telemetry: {}", err)))?;
// We late-bind the metrics after core runtime is created since it needs
// the Tokio handle
if let Some(metrics_conf) = telemetry_config.metrics {
let _guard = core.tokio_handle().enter();
core.telemetry_mut()
.attach_late_init_metrics(metrics_conf.try_into()?);
}
Ok(RuntimeRef {
runtime: Runtime {
core: Arc::new(
CoreRuntime::new(
telemetry_config.try_into()?,
tokio::runtime::Builder::new_multi_thread(),
)
.map_err(|err| {
PyRuntimeError::new_err(format!("Failed initializing telemetry: {}", err))
})?,
),
core: Arc::new(core),
},
})
}
Expand All @@ -94,61 +100,97 @@ impl Runtime {
}
}

impl TryFrom<TelemetryConfig> for TelemetryOptions {
impl TryFrom<&TelemetryConfig> for TelemetryOptions {
type Error = PyErr;

fn try_from(conf: TelemetryConfig) -> PyResult<Self> {
fn try_from(conf: &TelemetryConfig) -> PyResult<Self> {
let mut build = TelemetryOptionsBuilder::default();
if let Some(v) = conf.tracing {
build.tracing(TraceExportConfig {
filter: v.filter,
exporter: TraceExporter::Otel(v.opentelemetry.try_into()?),
});
}
if let Some(v) = conf.logging {
build.logging(if v.forward {
Logger::Forward { filter: v.filter }
} else {
Logger::Console { filter: v.filter }
});
}
if let Some(v) = conf.metrics {
build.metrics(if let Some(t) = v.opentelemetry {
if v.prometheus.is_some() {
return Err(PyValueError::new_err(
"Cannot have OpenTelemetry and Prometheus metrics",
));
if let Some(logging_conf) = &conf.logging {
build.logging(if logging_conf.forward {
Logger::Forward {
filter: logging_conf.filter.to_string(),
}
MetricsExporter::Otel(t.try_into()?)
} else if let Some(t) = v.prometheus {
MetricsExporter::Prometheus(SocketAddr::from_str(&t.bind_address).map_err(
|err| PyValueError::new_err(format!("Invalid Prometheus address: {}", err)),
)?)
} else {
return Err(PyValueError::new_err(
"Either OpenTelemetry or Prometheus config must be provided",
));
Logger::Console {
filter: logging_conf.filter.to_string(),
}
});
}
if let Some(v) = conf.global_tags {
build.global_tags(v);
if let Some(metrics_conf) = &conf.metrics {
// Note, actual metrics instance is late-bound in init_runtime
build.attach_service_name(metrics_conf.attach_service_name);
if let Some(prefix) = &metrics_conf.metric_prefix {
build.metric_prefix(prefix.to_string());
}
}
build
.build()
.map_err(|err| PyValueError::new_err(format!("Invalid telemetry config: {}", err)))
}
}

impl TryFrom<OpenTelemetryConfig> for OtelCollectorOptions {
impl TryFrom<MetricsConfig> for Arc<dyn CoreMeter> {
type Error = PyErr;

fn try_from(conf: OpenTelemetryConfig) -> PyResult<Self> {
Ok(OtelCollectorOptions {
url: Url::parse(&conf.url)
.map_err(|err| PyValueError::new_err(format!("Invalid OTel URL: {}", err)))?,
headers: conf.headers,
metric_periodicity: conf.metric_periodicity_millis.map(Duration::from_millis),
})
fn try_from(conf: MetricsConfig) -> PyResult<Self> {
if let Some(otel_conf) = conf.opentelemetry {
if !conf.prometheus.is_none() {
return Err(PyValueError::new_err(
"Cannot have OpenTelemetry and Prometheus metrics",
));
}

// Build OTel exporter
let mut build = OtelCollectorOptionsBuilder::default();
build
.url(
Url::parse(&otel_conf.url).map_err(|err| {
PyValueError::new_err(format!("Invalid OTel URL: {}", err))
})?,
)
.headers(otel_conf.headers);
if let Some(period) = otel_conf.metric_periodicity_millis {
build.metric_periodicity(Duration::from_millis(period));
}
if otel_conf.metric_temporality_delta {
build.metric_temporality(MetricTemporality::Delta);
}
if let Some(global_tags) = conf.global_tags {
build.global_tags(global_tags);
}
let otel_options = build
.build()
.map_err(|err| PyValueError::new_err(format!("Invalid OTel config: {}", err)))?;
Ok(Arc::new(build_otlp_metric_exporter(otel_options).map_err(
|err| PyValueError::new_err(format!("Failed building OTel exporter: {}", err)),
)?))
} else if let Some(prom_conf) = conf.prometheus {
// Start prom exporter
let mut build = PrometheusExporterOptionsBuilder::default();
build
.socket_addr(
SocketAddr::from_str(&prom_conf.bind_address).map_err(|err| {
PyValueError::new_err(format!("Invalid Prometheus address: {}", err))
})?,
)
.counters_total_suffix(prom_conf.counters_total_suffix)
.unit_suffix(prom_conf.unit_suffix);
if let Some(global_tags) = conf.global_tags {
build.global_tags(global_tags);
}
let prom_options = build.build().map_err(|err| {
PyValueError::new_err(format!("Invalid Prometheus config: {}", err))
})?;
Ok(start_prometheus_metric_exporter(prom_options)
.map_err(|err| {
PyValueError::new_err(format!("Failed starting Prometheus exporter: {}", err))
})?
.meter)
} else {
Err(PyValueError::new_err(
"Either OpenTelemetry or Prometheus config must be provided",
))
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion temporalio/bridge/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub struct WorkerConfig {
macro_rules! enter_sync {
($runtime:expr) => {
temporal_sdk_core::telemetry::set_trace_subscriber_for_current_thread(
$runtime.core.trace_subscriber(),
$runtime.core.telemetry().trace_subscriber(),
);
let _guard = $runtime.core.tokio_handle().enter();
};
Expand Down
Loading

0 comments on commit aa829d3

Please sign in to comment.