Skip to content

Commit

Permalink
Workflow sweeper fix
Browse files Browse the repository at this point in the history
  • Loading branch information
manan164 committed Dec 13, 2023
1 parent 8c7026f commit 0a5513c
Showing 1 changed file with 24 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import com.netflix.conductor.service.ExecutionLockService;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
Expand Down Expand Up @@ -58,6 +59,8 @@ public class OrkesWorkflowSweeper extends LifecycleAwareComponent {
private final MetricsCollector metricsCollector;
private final SystemTaskRegistry systemTaskRegistry;

private final ExecutionLockService executionLockService;

public OrkesWorkflowSweeper(
@Qualifier(SWEEPER_EXECUTOR_NAME) Executor sweeperExecutor,
QueueDAO queueDAO,
Expand All @@ -66,14 +69,16 @@ public OrkesWorkflowSweeper(
MetricsCollector metricsCollector,
SystemTaskRegistry systemTaskRegistry,
ConductorProperties properties,
OrkesSweeperProperties sweeperProperties) {
OrkesSweeperProperties sweeperProperties,
ExecutionLockService executionLockService) {
this.queueDAO = queueDAO;
this.executionDAO = executionDAO;
this.metricsCollector = metricsCollector;
this.systemTaskRegistry = systemTaskRegistry;
this.properties = properties;
this.sweeperProperties = sweeperProperties;
this.workflowExecutor = workflowExecutor;
this.executionLockService = executionLockService;
log.info("Initializing sweeper with {} threads", properties.getSweeperThreadCount());
for (int i = 0; i < properties.getSweeperThreadCount(); i++) {
sweeperExecutor.execute(this::pollAndSweep);
Expand Down Expand Up @@ -130,8 +135,12 @@ private boolean shouldTaskExistInQueue(TaskModel task) {
public void sweep(String workflowId) {
try {
log.info("Running sweeper for workflow {}", workflowId);
// 1. Run decide on the workflow
WorkflowModel workflow = decideAndRemove(workflowId);
//Run the below operations with a lock
if (!executionLockService.acquireLock(workflowId)) {
return;
}
WorkflowModel workflow = executionDAO.getWorkflow(workflowId);
workflow = decideAndRemove(workflow);
if (workflow == null || workflow.getStatus().isTerminal()) {
return;
}
Expand Down Expand Up @@ -161,20 +170,24 @@ public void sweep(String workflowId) {
"Workflow {} doesn't have an open pending task, requires force evaluation",
workflow.getWorkflowId());
forceSetLastTaskAsNotExecuted(workflow);
workflow = decideAndRemove(workflowId);
workflow = decideAndRemove(workflow);
if (workflow == null || workflow.getStatus().isTerminal()) {
log.warn("Removing from decider after repair is done {}", (workflow == null ? null: workflow.getStatus()));
queueDAO.remove(DECIDER_QUEUE, workflowId);
executionLockService.releaseLock(workflowId);
return;
}
log.debug(
"Force evaluation result for workflow {} - {}",
workflowId,
workflow.getStatus());
if (workflow == null || workflow.getStatus().isTerminal()) {
return;
}
}
// 3. If parent workflow exists, call repair on that too - meaning ensure the parent is
// in the decider queue
if (workflow.getParentWorkflowId() != null) {
ensureWorkflowExistsInDecider(workflow.getParentWorkflowId());
}
executionLockService.releaseLock(workflowId);
} catch (NotFoundException e) {
queueDAO.remove(DECIDER_QUEUE, workflowId);
log.info("Workflow NOT found for id:{}. Removed it from decider queue", workflowId, e);
Expand All @@ -197,7 +210,7 @@ private void forceSetLastTaskAsNotExecuted(WorkflowModel workflow) {
taskModel.getTaskId(),
taskModel.getWorkflowInstanceId());
taskModel.setExecuted(false);
executionDAO.updateTask(taskModel);
executionDAO.updateWorkflow(workflow);
}
}

Expand All @@ -210,13 +223,13 @@ private List<TaskModel> getAllPendingTasks(WorkflowModel workflow) {
return Collections.emptyList();
}

private WorkflowModel decideAndRemove(String workflowId) {
WorkflowModel workflowModel = workflowExecutor.decide(workflowId);
private WorkflowModel decideAndRemove(WorkflowModel workflow) {
WorkflowModel workflowModel = workflowExecutor.decide(workflow);
if (workflowModel == null) {
return null;
}
if (workflowModel.getStatus().isTerminal()) {
queueDAO.remove(DECIDER_QUEUE, workflowId);
queueDAO.remove(DECIDER_QUEUE, workflowModel.getWorkflowId());
}
return workflowModel;
}
Expand Down

0 comments on commit 0a5513c

Please sign in to comment.