Skip to content

Commit

Permalink
Creating dynamic workflow using json mediation rules
Browse files Browse the repository at this point in the history
  • Loading branch information
saksham2105 committed Jul 9, 2024
1 parent 84c614a commit 02fdd13
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 119 deletions.
16 changes: 16 additions & 0 deletions src/main/java/io/orkes/samples/models/Distribution.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.orkes.samples.models;

import lombok.Data;

import java.util.Map;

@Data
public class Distribution {
private String translation;
private String distributeTo;
private String sendToORB = "Y";
private String taskType = "HTTP";
private String natsWorkflowName;
private Integer natsWorkflowVersion;
private Map<String, Object> natsWorkflowInput;
}
15 changes: 15 additions & 0 deletions src/main/java/io/orkes/samples/models/Enrichment.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.orkes.samples.models;

import lombok.Data;

import java.util.Map;

@Data
public class Enrichment {
private String enrichmentType;
private String sendToORB = "N";
private String taskType = "HTTP";
private String natsWorkflowName;
private Integer natsWorkflowVersion;
private Map<String, Object> natsWorkflowInput;
}
12 changes: 12 additions & 0 deletions src/main/java/io/orkes/samples/models/MediationRules.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package io.orkes.samples.models;

import lombok.Data;

import java.util.List;

@Data
public class MediationRules {
private List<Enrichment> enrichments;
private List<Translation> translations;
private List<Distribution> distributions;
}
18 changes: 18 additions & 0 deletions src/main/java/io/orkes/samples/models/Translation.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.orkes.samples.models;

import lombok.Data;

import java.util.List;
import java.util.Map;

@Data
public class Translation {
private String name;
private List<String> enrichments;
private String sendToORB = "N";
private String taskType = "HTTP";
private String natsWorkflowName;
private Integer natsWorkflowVersion;
private Map<String, Object> natsWorkflowInput;

}
202 changes: 83 additions & 119 deletions src/main/java/io/orkes/samples/workers/DynamicSubworkflowWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.netflix.conductor.sdk.workflow.def.tasks.*;
import com.netflix.conductor.sdk.workflow.executor.WorkflowExecutor;
import io.orkes.conductor.client.WorkflowClient;
import io.orkes.samples.models.MediationRules;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
Expand All @@ -31,7 +32,7 @@ public class DynamicSubworkflowWorker implements Worker {

@Override
public String getTaskDefName() {
return "dynamic_subworkflow_task";
return "quest_start_subworkflow";
}

/**
Expand All @@ -41,32 +42,34 @@ public String getTaskDefName() {
*/
@Override
public TaskResult execute(Task task) {
System.out.println("Starting dynamic_subworkflow_task");
System.out.println("Starting quest_start_subworkflow task");
TaskResult result = new TaskResult(task);
try {
result.setOutputData(startExistingWorkflow());
MediationRules mediationRules = objectMapper.convertValue(task.getInputData().get("mediation_rules"), MediationRules.class);
result.setOutputData(startExistingWorkflow(mediationRules));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
result.setStatus(TaskResult.Status.COMPLETED);
return result;
}

public Map<String, Object> startExistingWorkflow() throws JsonProcessingException {
public Map<String, Object> startExistingWorkflow(MediationRules mediationRules) throws JsonProcessingException {
StartWorkflowRequest request = new StartWorkflowRequest();
request.setName("dynamic_workflow");
Map<String, Object> inputData = new HashMap<>();
//inputData.put("enrichmentSubworkflowsDef", subworkflowDef());
Object dynamicSubworkflowDef = objectMapper.convertValue(createDynamicSubworkflow(), Object.class);
Object dynamicSubworkflowDef = objectMapper.convertValue(createDynamicSubworkflow(mediationRules), Object.class);
inputData.put("dynamicSubworkflowDef", dynamicSubworkflowDef);
inputData.put("mediation_rules", objectMapper.convertValue(mediationRules, Object.class));
request.setInput(inputData);

String workflowId = workflowClient.startWorkflow(request);
log.info("Workflow id: {}", workflowId);
return Map.of("workflowId", workflowId);
}

private WorkflowDef createDynamicSubworkflow() {
private WorkflowDef createDynamicSubworkflow(MediationRules mediationRules) throws JsonProcessingException {
var workflow = new ConductorWorkflow<>(executor);
workflow.setName("dynamic_subworkflows_series");
workflow.setVersion(1);
Expand All @@ -76,124 +79,85 @@ private WorkflowDef createDynamicSubworkflow() {
workflow.setDefaultInput(Map.of());
workflow.setTimeoutPolicy(WorkflowDef.TimeoutPolicy.ALERT_ONLY);

// ---- Fork task def started
com.netflix.conductor.sdk.workflow.def.tasks.Task[][] forkedTasks = new com.netflix.conductor.sdk.workflow.def.tasks.Task[4][1];

//Below code is subworkflows in forked task
//ForkTask is having following structure [{}}, {}}]
//Enrichment level started
ConductorWorkflow conductorWorkflow = new ConductorWorkflow(executor);
conductorWorkflow.setName("imdb_enrichment_workflow");
Http httptask = new Http("imdb_enrichment_workflow_task");
httptask.url("https://orkes-api-tester.orkesconductor.com/api");
conductorWorkflow.add(httptask);
//ForkTask is having following structure [{}, {}]

SubWorkflow forkSubworkflow = new SubWorkflow("imdb_enrichment_subworkflow", conductorWorkflow);
forkSubworkflow.input("name","{workflow.input.name}");
forkedTasks[0][0] = forkSubworkflow;
// --------------- Enrichment level started ------------------
com.netflix.conductor.sdk.workflow.def.tasks.Task[][] enrichmentForkTasks = new com.netflix.conductor.sdk.workflow.def.tasks.Task[mediationRules.getEnrichments().size()][1];
for (int i = 0; i < mediationRules.getEnrichments().size(); i++) {
ConductorWorkflow conductorWorkflow = new ConductorWorkflow(executor);
conductorWorkflow.setName(mediationRules.getEnrichments().get(i).getEnrichmentType() + "_workflow");

conductorWorkflow = new ConductorWorkflow(executor);
conductorWorkflow.setName("empi_enrichment_workflow");
httptask = new Http("empi_enrichment_workflow_task");
httptask.url("https://orkes-api-tester.orkesconductor.com/api");
conductorWorkflow.add(httptask);
Http httptask = new Http(mediationRules.getEnrichments().get(i).getEnrichmentType() + "_enrichment_workflow_task");
httptask.url("https://orkes-api-tester.orkesconductor.com/api");
conductorWorkflow.add(httptask);

forkSubworkflow = new SubWorkflow("empi_enrichment_subworkflow", conductorWorkflow);
forkedTasks[1][0] = forkSubworkflow;
SubWorkflow natsSubworkflow = new SubWorkflow("nats_" + mediationRules.getEnrichments().get(i).getEnrichmentType() + "_subworkflow_ref", mediationRules.getEnrichments().get(i).getNatsWorkflowName(), mediationRules.getEnrichments().get(i).getNatsWorkflowVersion());
natsSubworkflow.input(mediationRules.getEnrichments().get(i).getNatsWorkflowInput());
Switch sendToORBEnrichmentSwitch = new Switch("send_to_" + mediationRules.getEnrichments().get(i).getEnrichmentType() + "_switch", "${workflow.input.sendToORB}").switchCase("Y", natsSubworkflow).defaultCase(List.of());
conductorWorkflow.add(sendToORBEnrichmentSwitch);

SubWorkflow forkSubworkflow = new SubWorkflow(mediationRules.getEnrichments().get(i).getEnrichmentType() + "_subworkflow_ref", conductorWorkflow);
forkSubworkflow.input("sendToORB", mediationRules.getEnrichments().get(i).getSendToORB());
enrichmentForkTasks[i][0] = forkSubworkflow;
}
ForkJoin forkEnrichment = new ForkJoin("fork_enrichment", enrichmentForkTasks);
workflow.add(forkEnrichment);
// ------------- Enrichment level ended ----------------


// -------------- Translation Level started ----------------
com.netflix.conductor.sdk.workflow.def.tasks.Task[][] translationForkTasks = new com.netflix.conductor.sdk.workflow.def.tasks.Task[mediationRules.getTranslations().size()][1];
for (int i = 0; i < mediationRules.getTranslations().size(); i++) {
ConductorWorkflow conductorWorkflow = new ConductorWorkflow(executor);
SubWorkflow forkSubworkflow = new SubWorkflow(mediationRules.getTranslations().get(i).getName() + "_subworkflow_ref", conductorWorkflow);
forkSubworkflow.input("sendToORB", mediationRules.getTranslations().get(i).getSendToORB());
conductorWorkflow.setName(mediationRules.getTranslations().get(i).getName() + "_workflow");
for (int j = 0; j < mediationRules.getTranslations().get(i).getEnrichments().size(); j++) {
Http httptask = new Http(mediationRules.getTranslations().get(i).getEnrichments().get(j)+ "_translations_workflow_task");
httptask.url("https://orkes-api-tester.orkesconductor.com/api");
String taskRef = mediationRules.getTranslations().get(i).getEnrichments().get(j) + "_subworkflow_ref";
String outputExpression = "${" + taskRef + ".output.response}"; //Can differ with different different tasks. Example with Simple/Inline tasks we might have to use result
forkSubworkflow.input(mediationRules.getTranslations().get(i).getEnrichments().get(j), outputExpression);
conductorWorkflow.add(httptask);
}
SubWorkflow natsSubworkflow = new SubWorkflow("nats_" + mediationRules.getTranslations().get(i).getName() + "_subworkflow_ref", mediationRules.getTranslations().get(i).getNatsWorkflowName(), mediationRules.getTranslations().get(i).getNatsWorkflowVersion());
natsSubworkflow.input(mediationRules.getTranslations().get(i).getNatsWorkflowInput());
Switch sendToORBTranslationSwitch = new Switch("send_to_" + mediationRules.getTranslations().get(i).getName() + "_switch", "${workflow.input.sendToORB}").switchCase("Y", natsSubworkflow).defaultCase(List.of());
conductorWorkflow.add(sendToORBTranslationSwitch);

translationForkTasks[i][0] = forkSubworkflow;
}
ForkJoin forkTranslation = new ForkJoin("fork_translation", translationForkTasks);
workflow.add(forkTranslation);
// ------------ Translation Level Ended -------------------


// -------------- Distribution level started --------------------
com.netflix.conductor.sdk.workflow.def.tasks.Task[][] distributionForkTasks = new com.netflix.conductor.sdk.workflow.def.tasks.Task[mediationRules.getDistributions().size()][1];
for (int i = 0; i < mediationRules.getDistributions().size(); i++) {
ConductorWorkflow conductorWorkflow = new ConductorWorkflow(executor);
conductorWorkflow.setName(mediationRules.getDistributions().get(i).getDistributeTo() + "_workflow");
Http httptask = new Http(mediationRules.getDistributions().get(i).getDistributeTo() + "_distributions_workflow_task");
httptask.url("https://orkes-api-tester.orkesconductor.com/api");
conductorWorkflow.add(httptask);

SubWorkflow natsSubworkflow = new SubWorkflow("nats_" + mediationRules.getDistributions().get(i).getDistributeTo() + "_subworkflow_ref", mediationRules.getDistributions().get(i).getNatsWorkflowName(), mediationRules.getDistributions().get(i).getNatsWorkflowVersion());
natsSubworkflow.input(mediationRules.getDistributions().get(i).getNatsWorkflowInput());
Switch sendToORBDistributionSwitch = new Switch("send_to_" + mediationRules.getDistributions().get(i).getDistributeTo() + "_switch", "${workflow.input.sendToORB}").switchCase("Y", natsSubworkflow).defaultCase(List.of());
conductorWorkflow.add(sendToORBDistributionSwitch);

SubWorkflow forkSubworkflow = new SubWorkflow(mediationRules.getDistributions().get(i).getDistributeTo() + "_subworkflow_ref", conductorWorkflow);
forkSubworkflow.input("sendToORB", mediationRules.getDistributions().get(i).getSendToORB());
String taskRef = mediationRules.getDistributions().get(i).getTranslation() + "_subworkflow_ref";
String outputExpression = "${" + taskRef + ".output.response}"; //Can differ with different different tasks. Example with Simple/Inline tasks we might have to use result
forkSubworkflow.input(mediationRules.getDistributions().get(i).getTranslation(), outputExpression);
forkSubworkflow.input("sink", "nats:nats-integ:subject");
distributionForkTasks[i][0] = forkSubworkflow;
}
ForkJoin forkDistribution = new ForkJoin("fork_distribution", distributionForkTasks);
workflow.add(forkDistribution);
// ----------- Distribution level Ended --------------------

conductorWorkflow = new ConductorWorkflow(executor);
conductorWorkflow.setName("mlcp_enrichment_workflow");
httptask = new Http("mlcp_enrichment_workflow_task");
httptask.url("https://orkes-api-tester.orkesconductor.com/api");
conductorWorkflow.add(httptask);

forkSubworkflow = new SubWorkflow("mlcp_enrichment_workflow", conductorWorkflow);
forkedTasks[2][0] = forkSubworkflow;


conductorWorkflow = new ConductorWorkflow(executor);
conductorWorkflow.setName("ohc_enrichment_workflow");
httptask = new Http("ohc_enrichment_workflow_task");
httptask.url("https://orkes-api-tester.orkesconductor.com/api");
conductorWorkflow.add(httptask);

forkSubworkflow = new SubWorkflow("ohc_enrichment_subworkflow", conductorWorkflow);
forkedTasks[3][0] = forkSubworkflow;

ForkJoin forkJoin = new ForkJoin("fork_enrichment", forkedTasks);
workflow.add(forkJoin);
// Enrichment level ended


// Translation Level Starts
com.netflix.conductor.sdk.workflow.def.tasks.Task[][] forkedTasks1 = new com.netflix.conductor.sdk.workflow.def.tasks.Task[2][1];
conductorWorkflow = new ConductorWorkflow(executor);
conductorWorkflow.setName("Labos_Translation_workflow");
httptask = new Http("IMDB_EMPI_Translations");
httptask.url("https://orkes-api-tester.orkesconductor.com/api");
conductorWorkflow.add(httptask);

httptask = new Http("LabOS_Translation");
httptask.url("https://orkes-api-tester.orkesconductor.com/api");
conductorWorkflow.add(httptask);


forkSubworkflow = new SubWorkflow("LabOS_Translation_subworkflow", conductorWorkflow);
forkedTasks1[0][0] = forkSubworkflow;

conductorWorkflow = new ConductorWorkflow(executor);
conductorWorkflow.setName("BFE_Translation_workflow");
httptask = new Http("IMDB_Enrichment");
httptask.url("https://orkes-api-tester.orkesconductor.com/api");
conductorWorkflow.add(httptask);

httptask = new Http("BFE_Translation");
httptask.url("https://orkes-api-tester.orkesconductor.com/api");
conductorWorkflow.add(httptask);

forkSubworkflow = new SubWorkflow("BFE_Translation_subworkflow", conductorWorkflow);
forkedTasks1[1][0] = forkSubworkflow;

forkJoin = new ForkJoin("fork_translation", forkedTasks1);
workflow.add(forkJoin);
//Translation Level Ended

//Distribution level starts
com.netflix.conductor.sdk.workflow.def.tasks.Task[][] forkedTasks2 = new com.netflix.conductor.sdk.workflow.def.tasks.Task[3][1];
conductorWorkflow = new ConductorWorkflow(executor);
conductorWorkflow.setName("BFE_Distributions");
httptask = new Http("bfe_distributions_subworkflow_task");
httptask.url("https://orkes-api-tester.orkesconductor.com/api");
conductorWorkflow.add(httptask);

forkSubworkflow = new SubWorkflow("BFE_Distributions_subworkflow", conductorWorkflow);
forkSubworkflow.input("name","{workflow.input.name}");
forkedTasks2[0][0] = forkSubworkflow;

conductorWorkflow = new ConductorWorkflow(executor);
conductorWorkflow.setName("ELabs_Distributions");
httptask = new Http("ELabs_Distributions_subworkflow_task");
httptask.url("https://orkes-api-tester.orkesconductor.com/api");
conductorWorkflow.add(httptask);

forkSubworkflow = new SubWorkflow("ELabs_Distributions_subworkflow", conductorWorkflow);
forkedTasks2[1][0] = forkSubworkflow;


conductorWorkflow = new ConductorWorkflow(executor);
conductorWorkflow.setName("LabOs_Distributions");
httptask = new Http("LabOs_Distributions_subworkflow_task");
httptask.url("https://orkes-api-tester.orkesconductor.com/api");
conductorWorkflow.add(httptask);

forkSubworkflow = new SubWorkflow("LabOs_Distributions_subworkflow", conductorWorkflow);
forkedTasks2[2][0] = forkSubworkflow;

forkJoin = new ForkJoin("fork_distribution", forkedTasks2);
workflow.add(forkJoin);

// Distribution level Ended

WorkflowDef workflowDef = workflow.toWorkflowDef();
workflowDef.setOutputParameters(Map.of());
Expand Down

0 comments on commit 02fdd13

Please sign in to comment.