From 158a36d0f89a678796c8670f8f332a0de5837d56 Mon Sep 17 00:00:00 2001 From: saksham2105 Date: Fri, 5 Jul 2024 19:56:55 +0530 Subject: [PATCH] dynamic workflow example --- .../workers/DynamicSubworkflowWorker.java | 207 ++++++++++++++++++ .../io/orkes/samples/workers/HelloWorld.java | 187 +--------------- src/main/resources/application.properties | 7 +- 3 files changed, 212 insertions(+), 189 deletions(-) create mode 100644 src/main/java/io/orkes/samples/workers/DynamicSubworkflowWorker.java diff --git a/src/main/java/io/orkes/samples/workers/DynamicSubworkflowWorker.java b/src/main/java/io/orkes/samples/workers/DynamicSubworkflowWorker.java new file mode 100644 index 0000000..f8e2c70 --- /dev/null +++ b/src/main/java/io/orkes/samples/workers/DynamicSubworkflowWorker.java @@ -0,0 +1,207 @@ +package io.orkes.samples.workers; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.netflix.conductor.client.worker.Worker; +import com.netflix.conductor.common.metadata.tasks.Task; +import com.netflix.conductor.common.metadata.tasks.TaskResult; +import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest; +import com.netflix.conductor.common.metadata.workflow.WorkflowDef; +import com.netflix.conductor.sdk.workflow.def.ConductorWorkflow; +import com.netflix.conductor.sdk.workflow.def.tasks.*; +import com.netflix.conductor.sdk.workflow.executor.WorkflowExecutor; +import io.orkes.conductor.client.WorkflowClient; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Component +@Slf4j +@AllArgsConstructor +public class DynamicSubworkflowWorker implements Worker { + + private final WorkflowClient workflowClient; + private final ObjectMapper objectMapper = new ObjectMapper(); + private final WorkflowExecutor executor; + + @Override + public String getTaskDefName() { + return "dynamic_subworkflow_task"; + } + + /** + * This Worker will start 'dynamic_workflow' workflow and pass the subworkflow definitions using createDynamicSubworkflow() method + * @param task + * @return + */ + @Override + public TaskResult execute(Task task) { + System.out.println("Starting dynamic_subworkflow_task"); + TaskResult result = new TaskResult(task); + try { + result.setOutputData(startQuestWorkflow()); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + result.setStatus(TaskResult.Status.COMPLETED); + return result; + } + + public Map startQuestWorkflow() throws JsonProcessingException { + StartWorkflowRequest request = new StartWorkflowRequest(); + request.setName("dynamic_workflow"); + Map inputData = new HashMap<>(); + //inputData.put("enrichmentSubworkflowsDef", subworkflowDef()); + Object dynamicSubworkflowDef = objectMapper.convertValue(createDynamicSubworkflow(), Object.class); + inputData.put("dynamicSubworkflowDef", dynamicSubworkflowDef); + request.setInput(inputData); + + String workflowId = workflowClient.startWorkflow(request); + log.info("Workflow id: {}", workflowId); + return Map.of("workflowId", workflowId); + } + + private WorkflowDef createDynamicSubworkflow() { + var workflow = new ConductorWorkflow<>(executor); + workflow.setName("dynamic_subworkflows_series"); + workflow.setVersion(1); + workflow.setOwnerEmail("saksham.solanki@orkes.io"); + workflow.setDescription("test"); + workflow.setVariables(Map.of()); + workflow.setDefaultInput(Map.of()); + workflow.setTimeoutPolicy(WorkflowDef.TimeoutPolicy.ALERT_ONLY); + + // ---- Fork task def started + com.netflix.conductor.sdk.workflow.def.tasks.Task[][] forkedTasks = new com.netflix.conductor.sdk.workflow.def.tasks.Task[4][1]; + + //Below code is subworkflows in forked task + //ForkTask is having following structure [{}}, {}}] + //Enrichment level started + ConductorWorkflow conductorWorkflow = new ConductorWorkflow(executor); + conductorWorkflow.setName("imdb_enrichment_workflow"); + Http httptask = new Http("imdb_enrichment_workflow_task"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); + + SubWorkflow forkSubworkflow = new SubWorkflow("imdb_enrichment_subworkflow", conductorWorkflow); + forkSubworkflow.input("name","{workflow.input.name}"); + forkedTasks[0][0] = forkSubworkflow; + + conductorWorkflow = new ConductorWorkflow(executor); + conductorWorkflow.setName("empi_enrichment_workflow"); + httptask = new Http("empi_enrichment_workflow_task"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); + + forkSubworkflow = new SubWorkflow("empi_enrichment_subworkflow", conductorWorkflow); + forkedTasks[1][0] = forkSubworkflow; + + + conductorWorkflow = new ConductorWorkflow(executor); + conductorWorkflow.setName("mlcp_enrichment_workflow"); + httptask = new Http("mlcp_enrichment_workflow_task"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); + + forkSubworkflow = new SubWorkflow("mlcp_enrichment_workflow", conductorWorkflow); + forkedTasks[2][0] = forkSubworkflow; + + + conductorWorkflow = new ConductorWorkflow(executor); + conductorWorkflow.setName("ohc_enrichment_workflow"); + httptask = new Http("ohc_enrichment_workflow_task"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); + + forkSubworkflow = new SubWorkflow("ohc_enrichment_subworkflow", conductorWorkflow); + forkedTasks[3][0] = forkSubworkflow; + + ForkJoin forkJoin = new ForkJoin("fork_enrichment", forkedTasks); + workflow.add(forkJoin); + // Enrichment level ended + + + // Translation Level Starts + com.netflix.conductor.sdk.workflow.def.tasks.Task[][] forkedTasks1 = new com.netflix.conductor.sdk.workflow.def.tasks.Task[2][1]; + conductorWorkflow = new ConductorWorkflow(executor); + conductorWorkflow.setName("Labos_Translation_workflow"); + httptask = new Http("IMDB_EMPI_Translations"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); + + httptask = new Http("LabOS_Translation"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); + + + forkSubworkflow = new SubWorkflow("LabOS_Translation_subworkflow", conductorWorkflow); + forkedTasks1[0][0] = forkSubworkflow; + + conductorWorkflow = new ConductorWorkflow(executor); + conductorWorkflow.setName("BFE_Translation_workflow"); + httptask = new Http("IMDB_Enrichment"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); + + httptask = new Http("BFE_Translation"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); + + forkSubworkflow = new SubWorkflow("BFE_Translation_subworkflow", conductorWorkflow); + forkedTasks1[1][0] = forkSubworkflow; + + forkJoin = new ForkJoin("fork_translation", forkedTasks1); + workflow.add(forkJoin); + //Translation Level Ended + + //Distribution level starts + com.netflix.conductor.sdk.workflow.def.tasks.Task[][] forkedTasks2 = new com.netflix.conductor.sdk.workflow.def.tasks.Task[3][1]; + conductorWorkflow = new ConductorWorkflow(executor); + conductorWorkflow.setName("BFE_Distributions"); + httptask = new Http("bfe_distributions_subworkflow_task"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); + + forkSubworkflow = new SubWorkflow("BFE_Distributions_subworkflow", conductorWorkflow); + forkSubworkflow.input("name","{workflow.input.name}"); + forkedTasks2[0][0] = forkSubworkflow; + + conductorWorkflow = new ConductorWorkflow(executor); + conductorWorkflow.setName("ELabs_Distributions"); + httptask = new Http("ELabs_Distributions_subworkflow_task"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); + + forkSubworkflow = new SubWorkflow("ELabs_Distributions_subworkflow", conductorWorkflow); + forkedTasks2[1][0] = forkSubworkflow; + + + conductorWorkflow = new ConductorWorkflow(executor); + conductorWorkflow.setName("LabOs_Distributions"); + httptask = new Http("LabOs_Distributions_subworkflow_task"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); + + forkSubworkflow = new SubWorkflow("LabOs_Distributions_subworkflow", conductorWorkflow); + forkedTasks2[2][0] = forkSubworkflow; + + forkJoin = new ForkJoin("fork_distribution", forkedTasks2); + workflow.add(forkJoin); + + // Distribution level Ended + + WorkflowDef workflowDef = workflow.toWorkflowDef(); + workflowDef.setOutputParameters(Map.of()); + workflowDef.setTimeoutSeconds(0); + workflowDef.setInputTemplate(Map.of()); + workflowDef.setSchemaVersion(2); + workflowDef.setInputParameters(List.of()); + + return workflowDef; + } +} diff --git a/src/main/java/io/orkes/samples/workers/HelloWorld.java b/src/main/java/io/orkes/samples/workers/HelloWorld.java index a299181..0fb1387 100644 --- a/src/main/java/io/orkes/samples/workers/HelloWorld.java +++ b/src/main/java/io/orkes/samples/workers/HelloWorld.java @@ -1,204 +1,21 @@ package io.orkes.samples.workers; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import com.netflix.conductor.client.worker.Worker; import com.netflix.conductor.common.metadata.tasks.Task; import com.netflix.conductor.common.metadata.tasks.TaskResult; -import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest; -import com.netflix.conductor.common.metadata.workflow.WorkflowDef; -import com.netflix.conductor.sdk.workflow.def.ConductorWorkflow; -import com.netflix.conductor.sdk.workflow.def.tasks.*; -import com.netflix.conductor.sdk.workflow.executor.WorkflowExecutor; -import io.orkes.conductor.client.WorkflowClient; -import lombok.AllArgsConstructor; -import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - @Component -@Slf4j -@AllArgsConstructor public class HelloWorld implements Worker { - - private final WorkflowClient workflowClient; - private final ObjectMapper objectMapper = new ObjectMapper(); - private final WorkflowExecutor executor; - @Override public String getTaskDefName() { - return "quest_start_subworkflow"; + return "hello_world"; } @Override public TaskResult execute(Task task) { - - System.out.println("Starting quest_start_subworkflow"); TaskResult result = new TaskResult(task); - try { - result.setOutputData(startQuestWorkflow()); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } + result.addOutputData("hw_response", "Hello World!"); result.setStatus(TaskResult.Status.COMPLETED); return result; } - - public Map startQuestWorkflow() throws JsonProcessingException { - StartWorkflowRequest request = new StartWorkflowRequest(); - request.setName("dynamic_workflow"); - Map inputData = new HashMap<>(); - //inputData.put("enrichmentSubworkflowsDef", subworkflowDef()); - Object dynamicSubworkflowDef = objectMapper.convertValue(createDynamicSubworkflow(), Object.class); - inputData.put("dynamicSubworkflowDef", dynamicSubworkflowDef); - request.setInput(inputData); - - String workflowId = workflowClient.startWorkflow(request); - log.info("Workflow id: {}", workflowId); - return Map.of("workflowId", workflowId); - } - - - private WorkflowDef createDynamicSubworkflow() { - var workflow = new ConductorWorkflow<>(executor); - workflow.setName("All_quest_subworkflows_series"); - workflow.setVersion(1); - workflow.setOwnerEmail("saksham.solanki@orkes.io"); - workflow.setDescription("test"); - workflow.setVariables(Map.of()); - workflow.setDefaultInput(Map.of()); - workflow.setTimeoutPolicy(WorkflowDef.TimeoutPolicy.ALERT_ONLY); - - // ---- Fork task def started - com.netflix.conductor.sdk.workflow.def.tasks.Task[][] forkedTasks = new com.netflix.conductor.sdk.workflow.def.tasks.Task[4][1]; - - //Below code is subworkflows in forked task - //ForkTask is having following structure [{}}, {}}] - //Enrichment level started - ConductorWorkflow conductorWorkflow = new ConductorWorkflow(executor); - conductorWorkflow.setName("imdb_enrichment_workflow"); - Http httptask = new Http("imdb_enrichment_workflow_task"); - httptask.url("https://orkes-api-tester.orkesconductor.com/api"); - conductorWorkflow.add(httptask); - - SubWorkflow forkSubworkflow = new SubWorkflow("imdb_enrichment_subworkflow", conductorWorkflow); - forkSubworkflow.input("name","{workflow.input.name}"); - forkedTasks[0][0] = forkSubworkflow; - - conductorWorkflow = new ConductorWorkflow(executor); - conductorWorkflow.setName("empi_enrichment_workflow"); - httptask = new Http("empi_enrichment_workflow_task"); - httptask.url("https://orkes-api-tester.orkesconductor.com/api"); - conductorWorkflow.add(httptask); - - forkSubworkflow = new SubWorkflow("empi_enrichment_subworkflow", conductorWorkflow); - forkedTasks[1][0] = forkSubworkflow; - - - conductorWorkflow = new ConductorWorkflow(executor); - conductorWorkflow.setName("mlcp_enrichment_workflow"); - httptask = new Http("mlcp_enrichment_workflow_task"); - httptask.url("https://orkes-api-tester.orkesconductor.com/api"); - conductorWorkflow.add(httptask); - - forkSubworkflow = new SubWorkflow("mlcp_enrichment_workflow", conductorWorkflow); - forkedTasks[2][0] = forkSubworkflow; - - - conductorWorkflow = new ConductorWorkflow(executor); - conductorWorkflow.setName("ohc_enrichment_workflow"); - httptask = new Http("ohc_enrichment_workflow_task"); - httptask.url("https://orkes-api-tester.orkesconductor.com/api"); - conductorWorkflow.add(httptask); - - forkSubworkflow = new SubWorkflow("ohc_enrichment_subworkflow", conductorWorkflow); - forkedTasks[3][0] = forkSubworkflow; - - ForkJoin forkJoin = new ForkJoin("fork_enrichment", forkedTasks); - workflow.add(forkJoin); - // Enrichment level ended - - - // Translation Level Starts - com.netflix.conductor.sdk.workflow.def.tasks.Task[][] forkedTasks1 = new com.netflix.conductor.sdk.workflow.def.tasks.Task[2][1]; - conductorWorkflow = new ConductorWorkflow(executor); - conductorWorkflow.setName("Labos_Translation_workflow"); - httptask = new Http("IMDB_EMPI_Translations"); - httptask.url("https://orkes-api-tester.orkesconductor.com/api"); - conductorWorkflow.add(httptask); - - httptask = new Http("LabOS_Translation"); - httptask.url("https://orkes-api-tester.orkesconductor.com/api"); - conductorWorkflow.add(httptask); - - - forkSubworkflow = new SubWorkflow("LabOS_Translation_subworkflow", conductorWorkflow); - forkedTasks1[0][0] = forkSubworkflow; - - conductorWorkflow = new ConductorWorkflow(executor); - conductorWorkflow.setName("BFE_Translation_workflow"); - httptask = new Http("IMDB_Enrichment"); - httptask.url("https://orkes-api-tester.orkesconductor.com/api"); - conductorWorkflow.add(httptask); - - httptask = new Http("BFE_Translation"); - httptask.url("https://orkes-api-tester.orkesconductor.com/api"); - conductorWorkflow.add(httptask); - - forkSubworkflow = new SubWorkflow("BFE_Translation_subworkflow", conductorWorkflow); - forkedTasks1[1][0] = forkSubworkflow; - - forkJoin = new ForkJoin("fork_translation", forkedTasks1); - workflow.add(forkJoin); - //Translation Level Ended - - //Distribution level starts - com.netflix.conductor.sdk.workflow.def.tasks.Task[][] forkedTasks2 = new com.netflix.conductor.sdk.workflow.def.tasks.Task[3][1]; - conductorWorkflow = new ConductorWorkflow(executor); - conductorWorkflow.setName("BFE_Distributions"); - httptask = new Http("bfe_distributions_subworkflow_task"); - httptask.url("https://orkes-api-tester.orkesconductor.com/api"); - conductorWorkflow.add(httptask); - - forkSubworkflow = new SubWorkflow("BFE_Distributions_subworkflow", conductorWorkflow); - forkSubworkflow.input("name","{workflow.input.name}"); - forkedTasks2[0][0] = forkSubworkflow; - - conductorWorkflow = new ConductorWorkflow(executor); - conductorWorkflow.setName("ELabs_Distributions"); - httptask = new Http("ELabs_Distributions_subworkflow_task"); - httptask.url("https://orkes-api-tester.orkesconductor.com/api"); - conductorWorkflow.add(httptask); - - forkSubworkflow = new SubWorkflow("ELabs_Distributions_subworkflow", conductorWorkflow); - forkedTasks2[1][0] = forkSubworkflow; - - - conductorWorkflow = new ConductorWorkflow(executor); - conductorWorkflow.setName("LabOs_Distributions"); - httptask = new Http("LabOs_Distributions_subworkflow_task"); - httptask.url("https://orkes-api-tester.orkesconductor.com/api"); - conductorWorkflow.add(httptask); - - forkSubworkflow = new SubWorkflow("LabOs_Distributions_subworkflow", conductorWorkflow); - forkedTasks2[2][0] = forkSubworkflow; - - forkJoin = new ForkJoin("fork_distribution", forkedTasks2); - workflow.add(forkJoin); - - // Distribution level Ended - - WorkflowDef workflowDef = workflow.toWorkflowDef(); - workflowDef.setOutputParameters(Map.of()); - workflowDef.setTimeoutSeconds(0); - workflowDef.setInputTemplate(Map.of()); - workflowDef.setSchemaVersion(2); - workflowDef.setInputParameters(List.of()); - - return workflowDef; - } - } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index cece382..c171895 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,5 +1,4 @@ -conductor.server.url=http://localhost:8080/api +conductor.server.url=https://play.orkes.io/api -conductor.security.client.key-id=96196359-6652-4a3b-9a08-8f2e3b779f0b -conductor.security.client.secret=gC98Y9KPYhLjZjN34Qe5ZbIFfanCtCH0IL8BIffWBMmjvaTy -server.port=7000 \ No newline at end of file +conductor.security.client.key-id=_CHANGE_ME_ +conductor.security.client.secret=_CHANGE_ME_ \ No newline at end of file