diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md index 9d23a64062f..c7aebca4e9e 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md @@ -109,21 +109,53 @@ catch (TaskFailedException) // Task failures are surfaced as TaskFailedException ```java -public static void main(String[] args) throws InterruptedException { - DaprWorkflowClient client = new DaprWorkflowClient(); +public class ChainWorkflow extends Workflow { + @Override + public WorkflowStub create() { + return ctx -> { + StringBuilder sb = new StringBuilder(); + String wfInput = ctx.getInput(String.class); + String result1 = ctx.callActivity("Step1", wfInput, String.class).await(); + String result2 = ctx.callActivity("Step2", result1, String.class).await(); + String result3 = ctx.callActivity("Step3", result2, String.class).await(); + String result = sb.append(result1).append(',').append(result2).append(',').append(result3).toString(); + ctx.complete(result); + }; + } +} - try (client) { - client.raiseEvent(instanceId, "TestEvent", "TestEventPayload"); + class Step1 implements WorkflowActivity { - System.out.println(separatorStr); - System.out.println("** Registering parallel Events to be captured by allOf(t1,t2,t3) **"); - client.raiseEvent(instanceId, "event1", "TestEvent 1 Payload"); - client.raiseEvent(instanceId, "event2", "TestEvent 2 Payload"); - client.raiseEvent(instanceId, "event3", "TestEvent 3 Payload"); - System.out.printf("Events raised for workflow with instanceId: %s\n", instanceId); + @Override + public Object run(WorkflowActivityContext ctx) { + Logger logger = LoggerFactory.getLogger(Step1.class); + logger.info("Starting Activity: " + ctx.getName()); + // Do some work + return null; + } + } - } -} + class Step2 implements WorkflowActivity { + + @Override + public Object run(WorkflowActivityContext ctx) { + Logger logger = LoggerFactory.getLogger(Step2.class); + logger.info("Starting Activity: " + ctx.getName()); + // Do some work + return null; + } + } + + class Step3 implements WorkflowActivity { + + @Override + public Object run(WorkflowActivityContext ctx) { + Logger logger = LoggerFactory.getLogger(Step3.class); + logger.info("Starting Activity: " + ctx.getName()); + // Do some work + return null; + } + } ``` {{% /codetab %}} @@ -225,46 +257,23 @@ await context.CallActivityAsync("PostResults", sum); ```java -public static void main(String[] args) throws InterruptedException { - DaprWorkflowClient client = new DaprWorkflowClient(); - - try (client) { - - System.out.println(separatorStr); - System.out.println("**SendExternalMessage**"); - client.raiseEvent(instanceId, "TestEvent", "TestEventPayload"); - - // Get events to process in parallel - System.out.println(separatorStr); - System.out.println("** Registering parallel Events to be captured by allOf(t1,t2,t3) **"); - client.raiseEvent(instanceId, "event1", "TestEvent 1 Payload"); - client.raiseEvent(instanceId, "event2", "TestEvent 2 Payload"); - client.raiseEvent(instanceId, "event3", "TestEvent 3 Payload"); - System.out.printf("Events raised for workflow with instanceId: %s\n", instanceId); - - // Register the raised events to be captured - System.out.println(separatorStr); - System.out.println("** Registering Event to be captured by anyOf(t1,t2,t3) **"); - client.raiseEvent(instanceId, "e2", "event 2 Payload"); - System.out.printf("Event raised for workflow with instanceId: %s\n", instanceId); - - // Wait for all tasks to complete and aggregate results - System.out.println(separatorStr); - System.out.println("**WaitForInstanceCompletion**"); - try { - WorkflowInstanceStatus waitForInstanceCompletionResult = - client.waitForInstanceCompletion(instanceId, Duration.ofSeconds(60), true); - System.out.printf("Result: %s%n", waitForInstanceCompletionResult); - } catch (TimeoutException ex) { - System.out.printf("waitForInstanceCompletion has an exception:%s%n", ex); +public class FaninoutWorkflow extends Workflow { + @Override + public WorkflowStub create() { + return ctx -> { + // Get a list of N work items to process in parallel. + Object[] workBatch = ctx.callActivity("GetWorkBatch", Object[].class).await(); + // Schedule the parallel tasks, but don't wait for them to complete yet. + List> tasks = Arrays.stream(workBatch) + .map(workItem -> ctx.callActivity("ProcessWorkItem", workItem, int.class)) + .collect(Collectors.toList()); + // Everything is scheduled. Wait here until all parallel tasks have completed. + List results = ctx.allOf(tasks).await(); + // Aggregate all N outputs and publish the result. + int sum = results.stream().mapToInt(Integer::intValue).sum(); + ctx.complete(sum); + }; } - - System.out.println(separatorStr); - System.out.println("**purgeInstance**"); - boolean purgeResult = client.purgeInstance(instanceId); - System.out.printf("purgeResult: %s%n", purgeResult); - - } } ``` @@ -640,42 +649,34 @@ public override async Task RunAsync(WorkflowContext context, OrderP ```java -public static void main(String[] args) throws InterruptedException { - DaprWorkflowClient client = new DaprWorkflowClient(); - - try (client) { - String eventInstanceId = client.scheduleNewWorkflow(DemoWorkflow.class); - System.out.printf("Started new workflow instance with random ID: %s%n", eventInstanceId); - client.raiseEvent(eventInstanceId, "TestException", null); - System.out.printf("Event raised for workflow with instanceId: %s\n", eventInstanceId); - - System.out.println(separatorStr); - String instanceToTerminateId = "terminateMe"; - client.scheduleNewWorkflow(DemoWorkflow.class, null, instanceToTerminateId); - System.out.printf("Started new workflow instance with specified ID: %s%n", instanceToTerminateId); - - TimeUnit.SECONDS.sleep(5); - System.out.println("Terminate this workflow instance manually before the timeout is reached"); - client.terminateWorkflow(instanceToTerminateId, null); - System.out.println(separatorStr); - - String restartingInstanceId = "restarting"; - client.scheduleNewWorkflow(DemoWorkflow.class, null, restartingInstanceId); - System.out.printf("Started new workflow instance with ID: %s%n", restartingInstanceId); - System.out.println("Sleeping 30 seconds to restart the workflow"); - TimeUnit.SECONDS.sleep(30); - - System.out.println("**SendExternalMessage: RestartEvent**"); - client.raiseEvent(restartingInstanceId, "RestartEvent", "RestartEventPayload"); - - System.out.println("Sleeping 30 seconds to terminate the eternal workflow"); - TimeUnit.SECONDS.sleep(30); - client.terminateWorkflow(restartingInstanceId, null); - } - - System.out.println("Exiting DemoWorkflowClient."); - System.exit(0); +public class ExternalSystemInteractionWorkflow extends Workflow { + @Override + public WorkflowStub create() { + return ctx -> { + // ...other steps... + Integer orderCost = ctx.getInput(int.class); + // Require orders over a certain threshold to be approved + if (orderCost > ORDER_APPROVAL_THRESHOLD) { + try { + // Request human approval for this order + ctx.callActivity("RequestApprovalActivity", orderCost, Void.class).await(); + // Pause and wait for a human to approve the order + boolean approved = ctx.waitForExternalEvent("ManagerApproval", Duration.ofDays(3), boolean.class).await(); + if (!approved) { + // The order was rejected, end the workflow here + ctx.complete("Process reject"); + } + } catch (TaskCanceledException e) { + // An approval timeout results in automatic order cancellation + ctx.complete("Process cancel"); + } + } + // ...other steps... + // End the workflow with a success result + ctx.complete("Process approved"); + }; + } } ```