Skip to content

Commit

Permalink
Worker client replacement (#517)
Browse files Browse the repository at this point in the history
Fixes #513
  • Loading branch information
cretz authored May 3, 2024
1 parent 0bb94f8 commit 0687151
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 34 deletions.
2 changes: 1 addition & 1 deletion temporalio/bridge/sdk-core
Submodule sdk-core updated 83 files
+3 −0 Cargo.toml
+0 −2 README.md
+3 −0 client/Cargo.toml
+3 −1 client/src/lib.rs
+1 −1 client/src/metrics.rs
+4 −4 client/src/raw.rs
+5 −3 client/src/worker_registry/mod.rs
+3 −1 client/src/workflow_handle/mod.rs
+3 −0 core-api/Cargo.toml
+4 −0 core/Cargo.toml
+11 −11 core/src/abstractions.rs
+3 −3 core/src/abstractions/take_cell.rs
+2 −1 core/src/core_tests/determinism.rs
+1 −1 core/src/core_tests/local_activities.rs
+1 −1 core/src/core_tests/workflow_tasks.rs
+4 −120 core/src/ephemeral_server/mod.rs
+8 −8 core/src/internal_flags.rs
+12 −8 core/src/lib.rs
+2 −2 core/src/pollers/mod.rs
+6 −4 core/src/pollers/poll_buffer.rs
+31 −31 core/src/protosext/mod.rs
+14 −12 core/src/protosext/protocol_messages.rs
+32 −1 core/src/telemetry/metrics.rs
+3 −2 core/src/telemetry/otel.rs
+4 −4 core/src/telemetry/prometheus_server.rs
+57 −44 core/src/test_help/mod.rs
+14 −14 core/src/worker/activities.rs
+6 −6 core/src/worker/activities/activity_heartbeat_manager.rs
+17 −16 core/src/worker/activities/local_activities.rs
+40 −38 core/src/worker/client.rs
+5 −3 core/src/worker/client/mocks.rs
+21 −9 core/src/worker/mod.rs
+3 −3 core/src/worker/slot_provider.rs
+6 −6 core/src/worker/workflow/driven_workflow.rs
+21 −18 core/src/worker/workflow/history_update.rs
+2 −15 core/src/worker/workflow/machines/activity_state_machine.rs
+1 −10 core/src/worker/workflow/machines/cancel_external_state_machine.rs
+2 −9 core/src/worker/workflow/machines/cancel_workflow_state_machine.rs
+3 −17 core/src/worker/workflow/machines/child_workflow_state_machine.rs
+0 −8 core/src/worker/workflow/machines/complete_workflow_state_machine.rs
+1 −5 core/src/worker/workflow/machines/continue_as_new_workflow_state_machine.rs
+0 −5 core/src/worker/workflow/machines/fail_workflow_state_machine.rs
+0 −5 core/src/worker/workflow/machines/local_activity_state_machine.rs
+0 −14 core/src/worker/workflow/machines/mod.rs
+0 −5 core/src/worker/workflow/machines/modify_workflow_properties_state_machine.rs
+0 −5 core/src/worker/workflow/machines/patch_state_machine.rs
+1 −10 core/src/worker/workflow/machines/signal_external_state_machine.rs
+1 −8 core/src/worker/workflow/machines/timer_state_machine.rs
+6 −1 core/src/worker/workflow/machines/transition_coverage.rs
+0 −10 core/src/worker/workflow/machines/update_state_machine.rs
+6 −13 core/src/worker/workflow/machines/upsert_search_attributes_state_machine.rs
+21 −19 core/src/worker/workflow/machines/workflow_machines.rs
+1 −12 core/src/worker/workflow/machines/workflow_task_state_machine.rs
+4 −4 core/src/worker/workflow/managed_run.rs
+38 −38 core/src/worker/workflow/mod.rs
+22 −13 core/src/worker/workflow/run_cache.rs
+2 −2 core/src/worker/workflow/workflow_stream.rs
+3 −0 fsm/Cargo.toml
+3 −0 sdk-core-protos/Cargo.toml
+1 −1 sdk-core-protos/src/history_builder.rs
+16 −8 sdk-core-protos/src/lib.rs
+1 −1 sdk-core-protos/src/task_token.rs
+3 −0 sdk/Cargo.toml
+3 −3 sdk/src/app_data.rs
+0 −1 sdk/src/lib.rs
+0 −11 sdk/src/payload_converter.rs
+10 −9 sdk/src/workflow_context.rs
+1 −1 sdk/src/workflow_context/options.rs
+1 −1 sdk/src/workflow_future.rs
+3 −0 test-utils/Cargo.toml
+2 −2 test-utils/src/lib.rs
+1 −1 tests/integ_tests/activity_functions.rs
+8 −25 tests/integ_tests/ephemeral_server_tests.rs
+4 −3 tests/integ_tests/metrics_tests.rs
+120 −1 tests/integ_tests/polling_tests.rs
+2 −1 tests/integ_tests/workflow_tests.rs
+1 −1 tests/integ_tests/workflow_tests/activities.rs
+1 −1 tests/integ_tests/workflow_tests/appdata_propagation.rs
+2 −1 tests/integ_tests/workflow_tests/determinism.rs
+1 −1 tests/integ_tests/workflow_tests/eager.rs
+4 −4 tests/integ_tests/workflow_tests/local_activities.rs
+5 −3 tests/integ_tests/workflow_tests/patches.rs
+1 −1 tests/integ_tests/workflow_tests/timers.rs
7 changes: 7 additions & 0 deletions temporalio/bridge/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,13 @@ impl WorkerRef {
Ok(())
}

/// Hand a clone of the given client's inner client to the wrapped worker so
/// that the worker uses it in place of its current client.
///
/// # Panics
///
/// Panics with "missing worker" if this reference no longer holds a worker.
fn replace_client(&self, client: &client::ClientRef) {
    let worker = self.worker.as_ref().expect("missing worker");
    let new_client = client.retry_client.clone().into_inner();
    worker.replace_client(new_client);
}

fn initiate_shutdown(&self) -> PyResult<()> {
let worker = self.worker.as_ref().unwrap().clone();
worker.initiate_shutdown();
Expand Down
4 changes: 4 additions & 0 deletions temporalio/bridge/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ def request_workflow_eviction(self, run_id: str) -> None:
"""Request a workflow be evicted."""
self._ref.request_workflow_eviction(run_id)

def replace_client(self, client: temporalio.bridge.client.Client) -> None:
    """Swap the client used by this worker for the given bridge client.

    Args:
        client: Bridge client whose underlying reference is forwarded to the
            worker reference.
    """
    worker_ref = self._ref
    worker_ref.replace_client(client._ref)

def initiate_shutdown(self) -> None:
"""Start shutdown of the worker."""
self._ref.initiate_shutdown()
Expand Down
96 changes: 64 additions & 32 deletions temporalio/worker/_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,23 +221,8 @@ def __init__(
)
interceptors = interceptors_from_client + list(interceptors)

# Extract the bridge service client. We try the service on the client
# first, then we support a worker_service_client on the client's service
# to return underlying service client we can use.
bridge_client: temporalio.service._BridgeServiceClient
if isinstance(client.service_client, temporalio.service._BridgeServiceClient):
bridge_client = client.service_client
elif hasattr(client.service_client, "worker_service_client"):
bridge_client = client.service_client.worker_service_client
if not isinstance(bridge_client, temporalio.service._BridgeServiceClient):
raise TypeError(
"Client's worker_service_client cannot be used for a worker"
)
else:
raise TypeError(
"Client cannot be used for a worker. "
+ "Use the original client's service or set worker_service_client on the wrapped service with the original service client."
)
# Extract the bridge service client
bridge_client = _extract_bridge_client_for_worker(client)

# Store the config for tracking
self._config = WorkerConfig(
Expand Down Expand Up @@ -283,7 +268,9 @@ def __init__(

# Create activity and workflow worker
self._activity_worker: Optional[_ActivityWorker] = None
runtime = bridge_client.config.runtime or temporalio.runtime.Runtime.default()
self._runtime = (
bridge_client.config.runtime or temporalio.runtime.Runtime.default()
)
if activities:
# Issue warning here if executor max_workers is lower than max
# concurrent activities. We do this here instead of in
Expand All @@ -304,7 +291,7 @@ def __init__(
shared_state_manager=shared_state_manager,
data_converter=client_config["data_converter"],
interceptors=interceptors,
metric_meter=runtime.metric_meter,
metric_meter=self._runtime.metric_meter,
)
self._workflow_worker: Optional[_WorkflowWorker] = None
if workflows:
Expand All @@ -321,30 +308,20 @@ def __init__(
workflow_failure_exception_types=workflow_failure_exception_types,
debug_mode=debug_mode,
disable_eager_activity_execution=disable_eager_activity_execution,
metric_meter=runtime.metric_meter,
metric_meter=self._runtime.metric_meter,
on_eviction_hook=None,
disable_safe_eviction=disable_safe_workflow_eviction,
)

# We need an already connected client
# TODO(cretz): How to connect to client inside constructor here? In the
# meantime, we disallow lazy clients from being used for workers. We
# could check whether the connected client is present which means
# lazy-but-already-connected clients would work, but that is confusing
# to users that the client only works if they already made a call on it.
if bridge_client.config.lazy:
raise RuntimeError("Lazy clients cannot be used for workers")
raw_bridge_client = bridge_client._bridge_client
assert raw_bridge_client

# Create bridge worker last. We have empirically observed that if it is
# created before an error is raised from the activity worker
# constructor, a deadlock/hang will occur presumably while trying to
# free it.
# TODO(cretz): Why does this cause a test hang when an exception is
# thrown after it?
assert bridge_client._bridge_client
self._bridge_worker = temporalio.bridge.worker.Worker.create(
raw_bridge_client,
bridge_client._bridge_client,
temporalio.bridge.worker.WorkerConfig(
namespace=client.namespace,
task_queue=task_queue,
Expand Down Expand Up @@ -403,6 +380,29 @@ def task_queue(self) -> str:
"""Task queue this worker is on."""
return self._config["task_queue"]

@property
def client(self) -> temporalio.client.Client:
    """Client currently set on the worker."""
    return self._config["client"]

@client.setter
def client(self, value: temporalio.client.Client) -> None:
    """Update the client associated with the worker.

    Changing the client will make sure the worker starts using it for the
    next calls it makes. However, outstanding client calls will still
    complete with the existing client. The new client cannot be "lazy" and
    must be using the same runtime as the current client.

    Args:
        value: New client to use. A client without an explicitly configured
            runtime is treated as being on the default runtime.

    Raises:
        TypeError: If no usable bridge service client can be extracted from
            the given client.
        RuntimeError: If the given client is lazy.
        ValueError: If the given client is on a different runtime than the
            one this worker was created with.
    """
    bridge_client = _extract_bridge_client_for_worker(value)
    # The constructor resolves an unset client runtime to the default
    # runtime before storing it on self._runtime, so the new client's
    # runtime has to be resolved the same way before the identity check.
    # Comparing against the raw (possibly None) config value would reject
    # every client that relies on the default runtime.
    new_runtime = (
        bridge_client.config.runtime or temporalio.runtime.Runtime.default()
    )
    if self._runtime is not new_runtime:
        raise ValueError(
            "New client is not on the same runtime as the existing client"
        )
    assert bridge_client._bridge_client
    self._bridge_worker.replace_client(bridge_client._bridge_client)
    # Only record the new client once the bridge worker has accepted it
    self._config["client"] = value

@property
def is_running(self) -> bool:
"""Whether the worker is running.
Expand Down Expand Up @@ -714,5 +714,37 @@ def _get_module_code(mod_name: str) -> Optional[bytes]:
return None


def _extract_bridge_client_for_worker(
    client: temporalio.client.Client,
) -> temporalio.service._BridgeServiceClient:
    """Return the bridge service client a worker can use for the given client.

    The client's own service client is used when it already is a bridge
    service client. Otherwise the service client may expose a
    ``worker_service_client`` attribute holding the bridge service client.

    Raises:
        TypeError: If neither source yields a bridge service client.
        RuntimeError: If the resulting bridge service client is lazy.
    """
    service = client.service_client
    if isinstance(service, temporalio.service._BridgeServiceClient):
        # The client is backed directly by a bridge service client
        candidate = service
    elif not hasattr(service, "worker_service_client"):
        raise TypeError(
            "Client cannot be used for a worker. "
            + "Use the original client's service or set worker_service_client on the wrapped service with the original service client."
        )
    else:
        # A wrapping service may hand back the underlying service client
        candidate = service.worker_service_client
        if not isinstance(candidate, temporalio.service._BridgeServiceClient):
            raise TypeError(
                "Client's worker_service_client cannot be used for a worker"
            )

    # Workers require a client that is already connected.
    # TODO(cretz): How to connect to client inside Worker constructor here? In
    # the meantime, lazy clients are rejected outright. Checking whether the
    # connected client is present would let lazy-but-already-connected clients
    # through, but it is confusing to users that the client only works if
    # they already made a call on it.
    if candidate.config.lazy:
        raise RuntimeError("Lazy clients cannot be used for workers")
    return candidate


# Private RuntimeError subclass with no behavior of its own. Presumably raised
# internally to signal that worker shutdown was requested - TODO confirm
# against its raise sites, which are not visible here.
class _ShutdownRequested(RuntimeError):
pass
62 changes: 61 additions & 1 deletion tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -4506,8 +4506,68 @@ async def test_workflow_fail_on_bad_input(client: Client):
await client.execute_workflow(
"FailOnBadInputWorkflow",
123,
id=f"wf-{uuid}",
id=f"wf-{uuid.uuid4()}",
task_queue=worker.task_queue,
)
assert isinstance(err.value.cause, ApplicationError)
assert "Failed decoding arguments" in err.value.cause.message


@workflow.defn
class TickingWorkflow:
    """Workflow that sleeps in 100 short steps and then returns."""

    @workflow.run
    async def run(self) -> None:
        # 100 sleeps of 0.1s each, i.e. roughly 10s in total
        ticks_left = 100
        while ticks_left > 0:
            await asyncio.sleep(0.1)
            ticks_left -= 1


# Checks that assigning a new client to a running worker makes the worker start
# processing tasks from that client's server.
async def test_workflow_replace_worker_client(client: Client, env: WorkflowEnvironment):
# Skipped for time-skipping environments; this test wants two real servers
if env.supports_time_skipping:
pytest.skip("Only testing against two real servers")
# We are going to start a second ephemeral server and then replace the
# client. So we will start a no-cache ticking workflow with the current
# client and confirm it has accomplished at least one task. Then we will
# start another on the other client, and confirm it gets started too. Then
# we will terminate both. We have to use a ticking workflow with only one
# poller to force a quick re-poll to recognize our client change quickly (as
# opposed to just waiting the minute for poll timeout).
async with await WorkflowEnvironment.start_local() as other_env:
# Start both workflows on different servers
# The same task queue name is used on both servers so one worker can serve
# either workflow depending on which client it is using
task_queue = f"tq-{uuid.uuid4()}"
handle1 = await client.start_workflow(
TickingWorkflow.run, id=f"wf-{uuid.uuid4()}", task_queue=task_queue
)
handle2 = await other_env.client.start_workflow(
TickingWorkflow.run, id=f"wf-{uuid.uuid4()}", task_queue=task_queue
)

# Returns True once the workflow's history contains at least one event with
# workflow-task-completed attributes, False if no such event is found
async def any_task_completed(handle: WorkflowHandle) -> bool:
async for e in handle.fetch_history_events():
if e.HasField("workflow_task_completed_event_attributes"):
return True
return False

# Now start the worker on the first env
# No workflow caching and a single workflow task poller, per the rationale
# in the comment at the top of this test
async with Worker(
client,
task_queue=task_queue,
workflows=[TickingWorkflow],
max_cached_workflows=0,
max_concurrent_workflow_task_polls=1,
) as worker:
# Confirm the first ticking workflow has completed a task but not
# the second
await assert_eq_eventually(True, lambda: any_task_completed(handle1))
assert not await any_task_completed(handle2)

# Now replace the client, which should be used fairly quickly
# because we should have timer-done poll completions every 100ms
worker.client = other_env.client

# Now confirm the other workflow has started
await assert_eq_eventually(True, lambda: any_task_completed(handle2))

# Terminate both
await handle1.terminate()
await handle2.terminate()

0 comments on commit 0687151

Please sign in to comment.