diff --git a/server/src/main/java/io/orkes/conductor/server/service/OrkesWorkflowSweeper.java b/server/src/main/java/io/orkes/conductor/server/service/OrkesWorkflowSweeper.java index 21df5c8..93774fd 100644 --- a/server/src/main/java/io/orkes/conductor/server/service/OrkesWorkflowSweeper.java +++ b/server/src/main/java/io/orkes/conductor/server/service/OrkesWorkflowSweeper.java @@ -18,6 +18,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import com.netflix.conductor.service.ExecutionLockService; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -58,6 +59,8 @@ public class OrkesWorkflowSweeper extends LifecycleAwareComponent { private final MetricsCollector metricsCollector; private final SystemTaskRegistry systemTaskRegistry; + private final ExecutionLockService executionLockService; + public OrkesWorkflowSweeper( @Qualifier(SWEEPER_EXECUTOR_NAME) Executor sweeperExecutor, QueueDAO queueDAO, @@ -66,7 +69,8 @@ public OrkesWorkflowSweeper( MetricsCollector metricsCollector, SystemTaskRegistry systemTaskRegistry, ConductorProperties properties, - OrkesSweeperProperties sweeperProperties) { + OrkesSweeperProperties sweeperProperties, + ExecutionLockService executionLockService) { this.queueDAO = queueDAO; this.executionDAO = executionDAO; this.metricsCollector = metricsCollector; @@ -74,6 +78,7 @@ public OrkesWorkflowSweeper( this.properties = properties; this.sweeperProperties = sweeperProperties; this.workflowExecutor = workflowExecutor; + this.executionLockService = executionLockService; log.info("Initializing sweeper with {} threads", properties.getSweeperThreadCount()); for (int i = 0; i < properties.getSweeperThreadCount(); i++) { sweeperExecutor.execute(this::pollAndSweep); @@ -130,8 +135,12 @@ private boolean shouldTaskExistInQueue(TaskModel task) { public void sweep(String workflowId) { try { log.info("Running sweeper for workflow {}", workflowId); - // 1. Run decide on the workflow - WorkflowModel workflow = decideAndRemove(workflowId); + //Run the below operations with a lock + if (!executionLockService.acquireLock(workflowId)) { + return; + } + WorkflowModel workflow = executionDAO.getWorkflow(workflowId); + workflow = decideAndRemove(workflow); if (workflow == null || workflow.getStatus().isTerminal()) { return; } @@ -161,20 +170,24 @@ public void sweep(String workflowId) { "Workflow {} doesn't have an open pending task, requires force evaluation", workflow.getWorkflowId()); forceSetLastTaskAsNotExecuted(workflow); - workflow = decideAndRemove(workflowId); + workflow = decideAndRemove(workflow); + if (workflow == null || workflow.getStatus().isTerminal()) { + log.warn("Removing from decider after repair is done {}", (workflow == null ? null: workflow.getStatus())); + queueDAO.remove(DECIDER_QUEUE, workflowId); + executionLockService.releaseLock(workflowId); + return; + } log.debug( "Force evaluation result for workflow {} - {}", workflowId, workflow.getStatus()); - if (workflow == null || workflow.getStatus().isTerminal()) { - return; - } } // 3. If parent workflow exists, call repair on that too - meaning ensure the parent is // in the decider queue if (workflow.getParentWorkflowId() != null) { ensureWorkflowExistsInDecider(workflow.getParentWorkflowId()); } + executionLockService.releaseLock(workflowId); } catch (NotFoundException e) { queueDAO.remove(DECIDER_QUEUE, workflowId); log.info("Workflow NOT found for id:{}. Removed it from decider queue", workflowId, e); @@ -197,7 +210,7 @@ private void forceSetLastTaskAsNotExecuted(WorkflowModel workflow) { taskModel.getTaskId(), taskModel.getWorkflowInstanceId()); taskModel.setExecuted(false); - executionDAO.updateTask(taskModel); + executionDAO.updateWorkflow(workflow); } } @@ -210,13 +223,13 @@ private List getAllPendingTasks(WorkflowModel workflow) { return Collections.emptyList(); } - private WorkflowModel decideAndRemove(String workflowId) { - WorkflowModel workflowModel = workflowExecutor.decide(workflowId); + private WorkflowModel decideAndRemove(WorkflowModel workflow) { + WorkflowModel workflowModel = workflowExecutor.decide(workflow); if (workflowModel == null) { return null; } if (workflowModel.getStatus().isTerminal()) { - queueDAO.remove(DECIDER_QUEUE, workflowId); + queueDAO.remove(DECIDER_QUEUE, workflowModel.getWorkflowId()); } return workflowModel; }