Skip to content

Commit

Permalink
Expose resource based auto-tuner options (#559)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource authored Jun 26, 2024
1 parent 2c1ac54 commit 7ac4445
Show file tree
Hide file tree
Showing 10 changed files with 785 additions and 170 deletions.
332 changes: 209 additions & 123 deletions temporalio/bridge/Cargo.lock

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions temporalio/bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ name = "temporal_sdk_bridge"
crate-type = ["cdylib"]

[dependencies]
anyhow = "1.0"
futures = "0.3"
log = "0.4"
once_cell = "1.16"
prost = "0.12"
prost-types = "0.12"
pyo3 = { version = "0.19", features = ["extension-module", "abi3-py38"] }
pyo3-asyncio = { version = "0.19", features = ["tokio-runtime"] }
pythonize = "0.19"
pyo3 = { version = "0.20", features = ["extension-module", "abi3-py38", "anyhow"] }
pyo3-asyncio = { version = "0.20", features = ["tokio-runtime"] }
pythonize = "0.20"
temporal-client = { version = "0.1.0", path = "./sdk-core/client" }
temporal-sdk-core = { version = "0.1.0", path = "./sdk-core/core", features = ["ephemeral-server"] }
temporal-sdk-core-api = { version = "0.1.0", path = "./sdk-core/core-api" }
Expand Down
1 change: 0 additions & 1 deletion temporalio/bridge/src/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use futures::channel::mpsc::Receiver;
use pyo3::exceptions::{PyRuntimeError, PyValueError};
use pyo3::prelude::*;
use pyo3::AsPyPointer;
use pythonize::pythonize;
use std::collections::HashMap;
use std::future::Future;
Expand Down
149 changes: 131 additions & 18 deletions temporalio/bridge/src/worker.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use anyhow::Context;
use prost::Message;
use pyo3::exceptions::{PyException, PyRuntimeError, PyValueError};
use pyo3::prelude::*;
Expand Down Expand Up @@ -34,9 +35,7 @@ pub struct WorkerConfig {
build_id: String,
identity_override: Option<String>,
max_cached_workflows: usize,
max_outstanding_workflow_tasks: usize,
max_outstanding_activities: usize,
max_outstanding_local_activities: usize,
tuner: TunerHolder,
max_concurrent_workflow_task_polls: usize,
nonsticky_to_sticky_poll_ratio: f32,
max_concurrent_activity_task_polls: usize,
Expand All @@ -52,6 +51,39 @@ pub struct WorkerConfig {
nondeterminism_as_workflow_fail_for_types: HashSet<String>,
}

/// Tuner configuration as received from Python; mirrors the
/// `temporalio.bridge.worker.TunerHolder` dataclass. Holds one slot
/// supplier per slot type (workflow task, activity, local activity).
#[derive(FromPyObject)]
pub struct TunerHolder {
    workflow_slot_supplier: SlotSupplier,
    activity_slot_supplier: SlotSupplier,
    local_activity_slot_supplier: SlotSupplier,
}

/// A slot supplier specification from Python. pyo3's derived `FromPyObject`
/// for enums attempts extraction variant-by-variant in declaration order, so
/// the Python object is tried as a fixed-size supplier first, then as a
/// resource-based one.
#[derive(FromPyObject)]
pub enum SlotSupplier {
    FixedSize(FixedSizeSlotSupplier),
    ResourceBased(ResourceBasedSlotSupplier),
}

/// Fixed-size slot supplier options from Python; converted to
/// `SlotSupplierOptions::FixedSize` with `num_slots` as the slot count.
#[derive(FromPyObject)]
pub struct FixedSizeSlotSupplier {
    num_slots: usize,
}

/// Resource-based slot supplier options from Python; converted to
/// `SlotSupplierOptions::ResourceBased`.
#[derive(FromPyObject)]
pub struct ResourceBasedSlotSupplier {
    minimum_slots: usize,
    maximum_slots: usize,
    // Need pyo3 0.21+ for this to be std Duration; until then the Python side
    // sends milliseconds and we convert via Duration::from_millis.
    ramp_throttle_ms: u64,
    // Must be identical across every resource-based supplier within one
    // TunerHolder; the TryFrom<TunerHolder> conversion rejects mismatches.
    tuner_config: ResourceBasedTunerConfig,
}

/// Target usage levels for the resource-based tuner. `Copy` and `PartialEq`
/// are derived so configs can be cheaply compared for equality when
/// validating a `TunerHolder`.
#[derive(FromPyObject, Clone, Copy, PartialEq)]
pub struct ResourceBasedTunerConfig {
    // Forwarded to Core's ResourceBasedSlotsOptions as target_mem_usage.
    target_memory_usage: f64,
    // Forwarded to Core's ResourceBasedSlotsOptions as target_cpu_usage.
    target_cpu_usage: f64,
}

macro_rules! enter_sync {
($runtime:expr) => {
if let Some(subscriber) = $runtime.core.telemetry().trace_subscriber() {
Expand All @@ -73,7 +105,7 @@ pub fn new_worker(
config,
client.retry_client.clone().into_inner(),
)
.map_err(|err| PyValueError::new_err(format!("Failed creating worker: {}", err)))?;
.context("Failed creating worker")?;
Ok(WorkerRef {
worker: Some(Arc::new(worker)),
runtime: runtime_ref.runtime.clone(),
Expand Down Expand Up @@ -107,9 +139,11 @@ impl WorkerRef {
fn validate<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
let worker = self.worker.as_ref().unwrap().clone();
self.runtime.future_into_py(py, async move {
worker.validate().await.map_err(|err| {
PyRuntimeError::new_err(format!("Worker validation failed: {}", err))
})
worker
.validate()
.await
.context("Worker validation failed")
.map_err(Into::into)
})
}

Expand Down Expand Up @@ -151,10 +185,8 @@ impl WorkerRef {
worker
.complete_workflow_activation(completion)
.await
.map_err(|err| {
// TODO(cretz): More error types
PyRuntimeError::new_err(format!("Completion failure: {}", err))
})
.context("Completion failure")
.map_err(Into::into)
})
}

Expand All @@ -166,10 +198,8 @@ impl WorkerRef {
worker
.complete_activity_task(completion)
.await
.map_err(|err| {
// TODO(cretz): More error types
PyRuntimeError::new_err(format!("Completion failure: {}", err))
})
.context("Completion failure")
.map_err(Into::into)
})
}

Expand Down Expand Up @@ -226,16 +256,15 @@ impl TryFrom<WorkerConfig> for temporal_sdk_core::WorkerConfig {
type Error = PyErr;

fn try_from(conf: WorkerConfig) -> PyResult<Self> {
let converted_tuner: temporal_sdk_core::TunerHolder = conf.tuner.try_into()?;
temporal_sdk_core::WorkerConfigBuilder::default()
.namespace(conf.namespace)
.task_queue(conf.task_queue)
.worker_build_id(conf.build_id)
.client_identity_override(conf.identity_override)
.max_cached_workflows(conf.max_cached_workflows)
.max_outstanding_workflow_tasks(conf.max_outstanding_workflow_tasks)
.max_outstanding_activities(conf.max_outstanding_activities)
.max_outstanding_local_activities(conf.max_outstanding_local_activities)
.max_concurrent_wft_polls(conf.max_concurrent_workflow_task_polls)
.tuner(Arc::new(converted_tuner))
.nonsticky_to_sticky_poll_ratio(conf.nonsticky_to_sticky_poll_ratio)
.max_concurrent_at_polls(conf.max_concurrent_activity_task_polls)
.no_remote_activities(conf.no_remote_activities)
Expand Down Expand Up @@ -276,6 +305,90 @@ impl TryFrom<WorkerConfig> for temporal_sdk_core::WorkerConfig {
}
}

impl TryFrom<TunerHolder> for temporal_sdk_core::TunerHolder {
type Error = PyErr;

fn try_from(holder: TunerHolder) -> PyResult<Self> {
// Verify all resource-based options are the same if any are set
let maybe_wf_resource_opts =
if let SlotSupplier::ResourceBased(ref ss) = holder.workflow_slot_supplier {
Some(&ss.tuner_config)
} else {
None
};
let maybe_act_resource_opts =
if let SlotSupplier::ResourceBased(ref ss) = holder.activity_slot_supplier {
Some(&ss.tuner_config)
} else {
None
};
let maybe_local_act_resource_opts =
if let SlotSupplier::ResourceBased(ref ss) = holder.local_activity_slot_supplier {
Some(&ss.tuner_config)
} else {
None
};
let all_resource_opts = [
maybe_wf_resource_opts,
maybe_act_resource_opts,
maybe_local_act_resource_opts,
];
let mut set_resource_opts = all_resource_opts.iter().flatten();
let first = set_resource_opts.next();
let all_are_same = if let Some(first) = first {
set_resource_opts.all(|elem| elem == first)
} else {
true
};
if !all_are_same {
return Err(PyValueError::new_err(
"All resource-based slot suppliers must have the same ResourceBasedTunerOptions",
));
}

let mut options = temporal_sdk_core::TunerHolderOptionsBuilder::default();
if let Some(first) = first {
options.resource_based_options(
temporal_sdk_core::ResourceBasedSlotsOptionsBuilder::default()
.target_mem_usage(first.target_memory_usage)
.target_cpu_usage(first.target_cpu_usage)
.build()
.expect("Building ResourceBasedSlotsOptions is infallible"),
);
};
options
.workflow_slot_options(holder.workflow_slot_supplier.try_into()?)
.activity_slot_options(holder.activity_slot_supplier.try_into()?)
.local_activity_slot_options(holder.local_activity_slot_supplier.try_into()?);
Ok(options
.build()
.map_err(|e| PyValueError::new_err(format!("Invalid tuner holder options: {}", e)))?
.build_tuner_holder()
.context("Failed building tuner holder")?)
}
}

impl TryFrom<SlotSupplier> for temporal_sdk_core::SlotSupplierOptions {
type Error = PyErr;

fn try_from(supplier: SlotSupplier) -> PyResult<temporal_sdk_core::SlotSupplierOptions> {
Ok(match supplier {
SlotSupplier::FixedSize(fs) => temporal_sdk_core::SlotSupplierOptions::FixedSize {
slots: fs.num_slots,
},
SlotSupplier::ResourceBased(ss) => {
temporal_sdk_core::SlotSupplierOptions::ResourceBased(
temporal_sdk_core::ResourceSlotOptions::new(
ss.minimum_slots,
ss.maximum_slots,
Duration::from_millis(ss.ramp_throttle_ms),
),
)
}
})
}
}

/// For feeding histories into core during replay
#[pyclass]
pub struct HistoryPusher {
Expand Down
43 changes: 40 additions & 3 deletions temporalio/bridge/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from __future__ import annotations

from dataclasses import dataclass
from datetime import timedelta
from typing import (
TYPE_CHECKING,
Awaitable,
Expand All @@ -15,6 +16,7 @@
Sequence,
Set,
Tuple,
Union,
)

import google.protobuf.internal.containers
Expand Down Expand Up @@ -43,9 +45,7 @@ class WorkerConfig:
build_id: str
identity_override: Optional[str]
max_cached_workflows: int
max_outstanding_workflow_tasks: int
max_outstanding_activities: int
max_outstanding_local_activities: int
tuner: TunerHolder
max_concurrent_workflow_task_polls: int
nonsticky_to_sticky_poll_ratio: float
max_concurrent_activity_task_polls: int
Expand All @@ -61,6 +61,43 @@ class WorkerConfig:
nondeterminism_as_workflow_fail_for_types: Set[str]


@dataclass
class ResourceBasedTunerConfig:
    """Python representation of the Rust struct for configuring a resource-based tuner."""

    # Forwarded to Core's resource-based tuner as the target memory usage.
    target_memory_usage: float
    # Forwarded to Core's resource-based tuner as the target CPU usage.
    target_cpu_usage: float


@dataclass
class ResourceBasedSlotSupplier:
    """Python representation of the Rust struct for a resource-based slot supplier."""

    minimum_slots: int
    maximum_slots: int
    # Whole milliseconds; the Rust bridge converts this to a Duration.
    ramp_throttle_ms: int
    # Must be identical across all resource-based suppliers in one TunerHolder;
    # the Rust-side conversion raises otherwise.
    tuner_config: ResourceBasedTunerConfig


@dataclass(frozen=True)
class FixedSizeSlotSupplier:
    """Python representation of the Rust struct for a fixed-size slot supplier."""

    # Fixed number of slots that may be outstanding at once.
    num_slots: int


# Mirrors the Rust `SlotSupplier` enum in temporalio/bridge/src/worker.rs.
SlotSupplier: TypeAlias = Union[FixedSizeSlotSupplier, ResourceBasedSlotSupplier]


@dataclass
class TunerHolder:
    """Python representation of the Rust struct for a tuner holder."""

    # Supplier governing workflow task slots.
    workflow_slot_supplier: SlotSupplier
    # Supplier governing activity task slots.
    activity_slot_supplier: SlotSupplier
    # Supplier governing local activity slots.
    local_activity_slot_supplier: SlotSupplier


class Worker:
"""SDK Core worker."""

Expand Down
13 changes: 13 additions & 0 deletions temporalio/worker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@
WorkflowReplayResult,
WorkflowReplayResults,
)
from ._tuning import (
FixedSizeSlotSupplier,
ResourceBasedSlotConfig,
ResourceBasedSlotSupplier,
ResourceBasedTunerConfig,
WorkerTuner,
)
from ._worker import Worker, WorkerConfig
from ._workflow_instance import (
UnsandboxedWorkflowRunner,
Expand Down Expand Up @@ -69,4 +76,10 @@
"WorkflowInstance",
"WorkflowInstanceDetails",
"UnsandboxedWorkflowRunner",
# Tuning types
"WorkerTuner",
"FixedSizeSlotSupplier",
"ResourceBasedSlotSupplier",
"ResourceBasedTunerConfig",
"ResourceBasedSlotConfig",
]
14 changes: 11 additions & 3 deletions temporalio/worker/_replayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,17 @@ def on_eviction_hook(
nondeterminism_as_workflow_fail_for_types=workflow_worker.nondeterminism_as_workflow_fail_for_types(),
# All values below are ignored but required by Core
max_cached_workflows=2,
max_outstanding_workflow_tasks=2,
max_outstanding_activities=1,
max_outstanding_local_activities=1,
tuner=temporalio.bridge.worker.TunerHolder(
workflow_slot_supplier=temporalio.bridge.worker.FixedSizeSlotSupplier(
2
),
activity_slot_supplier=temporalio.bridge.worker.FixedSizeSlotSupplier(
1
),
local_activity_slot_supplier=temporalio.bridge.worker.FixedSizeSlotSupplier(
1
),
),
max_concurrent_workflow_task_polls=1,
nonsticky_to_sticky_poll_ratio=1,
max_concurrent_activity_task_polls=1,
Expand Down
Loading

0 comments on commit 7ac4445

Please sign in to comment.