Skip to content

Commit

Permalink
Refactoring: shared utilities and cleanup (#313)
Browse files Browse the repository at this point in the history
* Refactor: add shared utility functions

* Refactor: single-file java features
  • Loading branch information
dandavison authored Jul 17, 2023
1 parent 0ec38b2 commit 8e1e6cb
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 106 deletions.
18 changes: 0 additions & 18 deletions features/child_workflow/result/ChildWorkflow.java

This file was deleted.

92 changes: 53 additions & 39 deletions features/child_workflow/result/feature.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,52 +5,66 @@
import io.temporal.sdkfeatures.Feature;
import io.temporal.sdkfeatures.Run;
import io.temporal.sdkfeatures.Runner;
import io.temporal.worker.Worker;
import io.temporal.workflow.Async;
import io.temporal.workflow.Promise;
import io.temporal.workflow.Workflow;
import org.junit.jupiter.api.Assertions;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import io.temporal.workflow.Async;
import io.temporal.workflow.Promise;

import java.time.Duration;
import org.junit.jupiter.api.Assertions;

@WorkflowInterface
public interface feature extends Feature {

@WorkflowInterface
interface ChildWorkflow {
@WorkflowMethod
public String workflow();

class Impl implements feature, ChildWorkflow {
private static final String CHILDWORKFLOW_INPUT = "test";

@Override
public String executeChild(String input) {
return input;
}

@Override
public String workflow() {
ChildWorkflow child = Workflow.newChildWorkflowStub(ChildWorkflow.class);
Promise<String> result = Async.function(child::executeChild, CHILDWORKFLOW_INPUT);
return result.get();
}

@Override
public Run execute(Runner runner) throws Exception {
var options = WorkflowOptions.newBuilder()
.setTaskQueue(runner.config.taskQueue)
.setWorkflowExecutionTimeout(Duration.ofMinutes(1))
.build();

var methods = runner.featureInfo.metadata.getWorkflowMethods();

var stub = runner.client.newWorkflowStub(feature.class, options);
return new Run(methods.get(0), WorkflowClient.start(stub::workflow));
}

@Override
public void checkResult(Runner runner, Run run) {
var resultStr = runner.waitForRunResult(run, String.class);
Assertions.assertEquals(CHILDWORKFLOW_INPUT, resultStr);
}
String execute(String input);

class Impl implements ChildWorkflow {
public String execute(String input) {
return input;
}
}
}

@WorkflowMethod
public String workflow();

class Impl implements feature {
private static final String CHILDWORKFLOW_INPUT = "test";

@Override
public void prepareWorker(Worker worker) {
worker.registerWorkflowImplementationTypes(ChildWorkflow.Impl.class);
}

@Override
public String workflow() {
ChildWorkflow child = Workflow.newChildWorkflowStub(ChildWorkflow.class);
Promise<String> result = Async.function(child::execute, CHILDWORKFLOW_INPUT);
return result.get();
}

@Override
public Run execute(Runner runner) throws Exception {
var options =
WorkflowOptions.newBuilder()
.setTaskQueue(runner.config.taskQueue)
.setWorkflowExecutionTimeout(Duration.ofMinutes(1))
.build();

var methods = runner.featureInfo.metadata.getWorkflowMethods();

var stub = runner.client.newWorkflowStub(feature.class, options);
return new Run(methods.get(0), WorkflowClient.start(stub::workflow));
}

@Override
public void checkResult(Runner runner, Run run) {
var resultStr = runner.waitForRunResult(run, String.class);
Assertions.assertEquals(CHILDWORKFLOW_INPUT, resultStr);
}
}
}
7 changes: 1 addition & 6 deletions features/child_workflow/result/feature.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,7 @@ async def run(self, input: str) -> str:


async def start(runner: Runner) -> WorkflowHandle:
return await runner.client.start_workflow(
Workflow,
id=f"{runner.feature.rel_dir}-{uuid4()}",
task_queue=runner.task_queue,
execution_timeout=timedelta(minutes=1),
)
return await runner.start_parameterless_workflow(Workflow)


register_feature(
Expand Down
45 changes: 17 additions & 28 deletions features/child_workflow/signal/feature.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package child_workflow.signal;

import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.sdkfeatures.Assertions;
import io.temporal.sdkfeatures.Feature;
import io.temporal.sdkfeatures.Run;
Expand All @@ -13,37 +11,36 @@
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.time.Duration;

@WorkflowInterface
public interface feature extends Feature {

@WorkflowInterface
public interface ChildWorkflow {
interface ChildWorkflow {

@WorkflowMethod
String workflow();

@SignalMethod
void unblock(String message);
}

class ChildWorkflowImpl implements ChildWorkflow {
/*
* A workflow that waits for a signal and returns the data received.
*/
class Impl implements ChildWorkflow {
/*
* A workflow that waits for a signal and returns the data received.
*/

private String childWorkflowUnblockMessage;
private String childWorkflowUnblockMessage;

@Override
public String workflow() {
Workflow.await(() -> childWorkflowUnblockMessage != null);
return childWorkflowUnblockMessage;
}
@Override
public String workflow() {
Workflow.await(() -> childWorkflowUnblockMessage != null);
return childWorkflowUnblockMessage;
}

@Override
public void unblock(String message) {
childWorkflowUnblockMessage = message;
@Override
public void unblock(String message) {
childWorkflowUnblockMessage = message;
}
}
}

Expand All @@ -54,7 +51,7 @@ class Impl implements feature {

@Override
public void prepareWorker(Worker worker) {
worker.registerWorkflowImplementationTypes(ChildWorkflowImpl.class);
worker.registerWorkflowImplementationTypes(ChildWorkflow.Impl.class);
}

private static final String UNBLOCK_MESSAGE = "unblock";
Expand All @@ -78,15 +75,7 @@ public String workflow() {

@Override
public Run execute(Runner runner) throws Exception {
var options =
WorkflowOptions.newBuilder()
.setTaskQueue(runner.config.taskQueue)
.setWorkflowExecutionTimeout(Duration.ofMinutes(1))
.build();
var stub = runner.client.newWorkflowStub(feature.class, options);
var execution = WorkflowClient.start(stub::workflow);
var method = runner.featureInfo.metadata.getWorkflowMethods().get(0);
return new Run(method, execution);
return runner.executeSingleParameterlessWorkflow();
}

@Override
Expand Down
7 changes: 1 addition & 6 deletions features/child_workflow/signal/feature.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,7 @@ def unblock(self, message: Optional[str]) -> None:


async def start(runner: Runner) -> WorkflowHandle:
return await runner.client.start_workflow(
Workflow,
id=f"{runner.feature.rel_dir}-{uuid4()}",
task_queue=runner.task_queue,
execution_timeout=timedelta(minutes=1),
)
return await runner.start_parameterless_workflow(Workflow)


register_feature(
Expand Down
8 changes: 1 addition & 7 deletions features/child_workflow/signal/feature.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { randomUUID } from 'crypto';
import * as assert from 'assert';
import { Feature } from '@temporalio/harness';
import * as wf from '@temporalio/workflow';
Expand Down Expand Up @@ -28,12 +27,7 @@ export async function childWorkflow(): Promise<string> {
export const feature = new Feature({
workflow,
async execute(runner) {
return await runner.client.start(workflow, {
taskQueue: runner.options.taskQueue,
workflowId: `${runner.source.relDir}-${randomUUID()}`,
workflowExecutionTimeout: 60000,
...(runner.feature.options.workflowStartOptions ?? {}),
});
return await runner.executeParameterlessWorkflow(workflow);
},
async checkResult(runner, handle) {
const result = await handle.result();
Expand Down
5 changes: 4 additions & 1 deletion harness/python/feature.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,10 @@ async def run(self) -> None:
async def start_single_parameterless_workflow(self) -> WorkflowHandle:
if len(self.feature.workflows) != 1:
raise ValueError("Must have a single workflow")
defn = workflow._Definition.must_from_class(self.feature.workflows[0])
return await self.start_parameterless_workflow(self.feature.workflows[0])

async def start_parameterless_workflow(self, workflow_cls: Type) -> WorkflowHandle:
defn = workflow._Definition.must_from_class(workflow_cls)
start_options = {
"id": f"{self.feature.rel_dir}-{uuid.uuid4()}",
"task_queue": self.task_queue,
Expand Down
5 changes: 4 additions & 1 deletion harness/ts/harness.ts
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,10 @@ export class Runner<W extends Workflow, A extends UntypedActivities> {
}

async executeSingleParameterlessWorkflow(): Promise<WorkflowHandleWithFirstExecutionRunId> {
const workflow = this.feature.options.workflow ?? 'workflow';
return this.executeParameterlessWorkflow(this.feature.options.workflow ?? 'workflow');
}

async executeParameterlessWorkflow(workflow: W | 'workflow'): Promise<WorkflowHandleWithFirstExecutionRunId> {
const startOptions: WorkflowStartOptions = {
taskQueue: this.options.taskQueue,
workflowId: `${this.source.relDir}-${randomUUID()}`,
Expand Down

0 comments on commit 8e1e6cb

Please sign in to comment.