diff --git a/src/main/java/io/orkes/samples/workers/DynamicSubworkflowWorker.java b/src/main/java/io/orkes/samples/workers/DynamicSubworkflowWorker.java index f81dacf..a5b49e5 100644 --- a/src/main/java/io/orkes/samples/workers/DynamicSubworkflowWorker.java +++ b/src/main/java/io/orkes/samples/workers/DynamicSubworkflowWorker.java @@ -61,7 +61,6 @@ public Map startExistingWorkflow(MediationRules mediationRules) //inputData.put("enrichmentSubworkflowsDef", subworkflowDef()); Object dynamicSubworkflowDef = objectMapper.convertValue(createDynamicSubworkflow(mediationRules), Object.class); inputData.put("dynamicSubworkflowDef", dynamicSubworkflowDef); - inputData.put("mediation_rules", objectMapper.convertValue(mediationRules, Object.class)); request.setInput(inputData); String workflowId = workflowClient.startWorkflow(request); @@ -87,15 +86,15 @@ private WorkflowDef createDynamicSubworkflow(MediationRules mediationRules) thro ConductorWorkflow conductorWorkflow = new ConductorWorkflow(executor); conductorWorkflow.setName(mediationRules.getEnrichments().get(i).getEnrichmentType() + "_workflow"); - Http httptask = new Http(mediationRules.getEnrichments().get(i).getEnrichmentType() + "_enrichment_workflow_task"); - httptask.url("https://orkes-api-tester.orkesconductor.com/api"); - conductorWorkflow.add(httptask); - SubWorkflow natsSubworkflow = new SubWorkflow("nats_" + mediationRules.getEnrichments().get(i).getEnrichmentType() + "_subworkflow_ref", mediationRules.getEnrichments().get(i).getNatsWorkflowName(), mediationRules.getEnrichments().get(i).getNatsWorkflowVersion()); natsSubworkflow.input(mediationRules.getEnrichments().get(i).getNatsWorkflowInput()); Switch sendToORBEnrichmentSwitch = new Switch("send_to_" + mediationRules.getEnrichments().get(i).getEnrichmentType() + "_switch", "${workflow.input.sendToORB}").switchCase("Y", natsSubworkflow).defaultCase(List.of()); conductorWorkflow.add(sendToORBEnrichmentSwitch); + Http httptask = new Http(mediationRules.getEnrichments().get(i).getEnrichmentType() + "_enrichment_workflow_task"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); + SubWorkflow forkSubworkflow = new SubWorkflow(mediationRules.getEnrichments().get(i).getEnrichmentType() + "_subworkflow_ref", conductorWorkflow); forkSubworkflow.input("sendToORB", mediationRules.getEnrichments().get(i).getSendToORB()); enrichmentForkTasks[i][0] = forkSubworkflow; @@ -112,6 +111,11 @@ private WorkflowDef createDynamicSubworkflow(MediationRules mediationRules) thro SubWorkflow forkSubworkflow = new SubWorkflow(mediationRules.getTranslations().get(i).getName() + "_subworkflow_ref", conductorWorkflow); forkSubworkflow.input("sendToORB", mediationRules.getTranslations().get(i).getSendToORB()); conductorWorkflow.setName(mediationRules.getTranslations().get(i).getName() + "_workflow"); + SubWorkflow natsSubworkflow = new SubWorkflow("nats_" + mediationRules.getTranslations().get(i).getName() + "_subworkflow_ref", mediationRules.getTranslations().get(i).getNatsWorkflowName(), mediationRules.getTranslations().get(i).getNatsWorkflowVersion()); + natsSubworkflow.input(mediationRules.getTranslations().get(i).getNatsWorkflowInput()); + Switch sendToORBTranslationSwitch = new Switch("send_to_" + mediationRules.getTranslations().get(i).getName() + "_switch", "${workflow.input.sendToORB}").switchCase("Y", natsSubworkflow).defaultCase(List.of()); + conductorWorkflow.add(sendToORBTranslationSwitch); + for (int j = 0; j < mediationRules.getTranslations().get(i).getEnrichments().size(); j++) { Http httptask = new Http(mediationRules.getTranslations().get(i).getEnrichments().get(j)+ "_translations_workflow_task"); httptask.url("https://orkes-api-tester.orkesconductor.com/api"); @@ -120,10 +124,6 @@ private WorkflowDef createDynamicSubworkflow(MediationRules mediationRules) thro forkSubworkflow.input(mediationRules.getTranslations().get(i).getEnrichments().get(j), outputExpression); conductorWorkflow.add(httptask); } - SubWorkflow natsSubworkflow = new SubWorkflow("nats_" + mediationRules.getTranslations().get(i).getName() + "_subworkflow_ref", mediationRules.getTranslations().get(i).getNatsWorkflowName(), mediationRules.getTranslations().get(i).getNatsWorkflowVersion()); - natsSubworkflow.input(mediationRules.getTranslations().get(i).getNatsWorkflowInput()); - Switch sendToORBTranslationSwitch = new Switch("send_to_" + mediationRules.getTranslations().get(i).getName() + "_switch", "${workflow.input.sendToORB}").switchCase("Y", natsSubworkflow).defaultCase(List.of()); - conductorWorkflow.add(sendToORBTranslationSwitch); translationForkTasks[i][0] = forkSubworkflow; } @@ -137,15 +137,16 @@ private WorkflowDef createDynamicSubworkflow(MediationRules mediationRules) thro for (int i = 0; i < mediationRules.getDistributions().size(); i++) { ConductorWorkflow conductorWorkflow = new ConductorWorkflow(executor); conductorWorkflow.setName(mediationRules.getDistributions().get(i).getDistributeTo() + "_workflow"); - Http httptask = new Http(mediationRules.getDistributions().get(i).getDistributeTo() + "_distributions_workflow_task"); - httptask.url("https://orkes-api-tester.orkesconductor.com/api"); - conductorWorkflow.add(httptask); SubWorkflow natsSubworkflow = new SubWorkflow("nats_" + mediationRules.getDistributions().get(i).getDistributeTo() + "_subworkflow_ref", mediationRules.getDistributions().get(i).getNatsWorkflowName(), mediationRules.getDistributions().get(i).getNatsWorkflowVersion()); natsSubworkflow.input(mediationRules.getDistributions().get(i).getNatsWorkflowInput()); Switch sendToORBDistributionSwitch = new Switch("send_to_" + mediationRules.getDistributions().get(i).getDistributeTo() + "_switch", "${workflow.input.sendToORB}").switchCase("Y", natsSubworkflow).defaultCase(List.of()); conductorWorkflow.add(sendToORBDistributionSwitch); + Http httptask = new Http(mediationRules.getDistributions().get(i).getDistributeTo() + "_distributions_workflow_task"); + httptask.url("https://orkes-api-tester.orkesconductor.com/api"); + conductorWorkflow.add(httptask); + SubWorkflow forkSubworkflow = new SubWorkflow(mediationRules.getDistributions().get(i).getDistributeTo() + "_subworkflow_ref", conductorWorkflow); forkSubworkflow.input("sendToORB", mediationRules.getDistributions().get(i).getSendToORB()); String taskRef = mediationRules.getDistributions().get(i).getTranslation() + "_subworkflow_ref";