diff --git a/server/src/main/java/io/orkes/conductor/server/service/OrkesSweeperProperties.java b/server/src/main/java/io/orkes/conductor/server/service/OrkesSweeperProperties.java index 9cec440..37f4b1d 100644 --- a/server/src/main/java/io/orkes/conductor/server/service/OrkesSweeperProperties.java +++ b/server/src/main/java/io/orkes/conductor/server/service/OrkesSweeperProperties.java @@ -20,11 +20,12 @@ import lombok.ToString; @Configuration -@ConfigurationProperties("conductor.app.sweeper") +@ConfigurationProperties("conductor.orkes.sweeper") @Getter @Setter @ToString public class OrkesSweeperProperties { + private int frequencyMillis = 10; private int sweepBatchSize = 5; private int queuePopTimeout = 100; } diff --git a/server/src/main/java/io/orkes/conductor/server/service/OrkesWorkflowSweepWorker.java b/server/src/main/java/io/orkes/conductor/server/service/OrkesWorkflowSweepWorker.java new file mode 100644 index 0000000..09aeea9 --- /dev/null +++ b/server/src/main/java/io/orkes/conductor/server/service/OrkesWorkflowSweepWorker.java @@ -0,0 +1,252 @@ +/* + * Copyright 2023 Orkes, Inc. + *

+ * Licensed under the Orkes Community License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * https://github.com/orkes-io/licenses/blob/main/community/LICENSE.txt + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.orkes.conductor.server.service; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Component; + +import com.netflix.conductor.core.config.ConductorProperties; +import com.netflix.conductor.core.exception.NotFoundException; +import com.netflix.conductor.core.execution.WorkflowExecutor; +import com.netflix.conductor.core.execution.tasks.SystemTaskRegistry; +import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask; +import com.netflix.conductor.core.utils.QueueUtils; +import com.netflix.conductor.core.utils.Utils; +import com.netflix.conductor.dao.ExecutionDAO; +import com.netflix.conductor.dao.QueueDAO; +import com.netflix.conductor.metrics.Monitors; +import com.netflix.conductor.model.TaskModel; +import com.netflix.conductor.model.WorkflowModel; +import com.netflix.conductor.service.ExecutionLockService; + +import io.orkes.conductor.metrics.MetricsCollector; + +import lombok.extern.slf4j.Slf4j; + +import static com.netflix.conductor.core.config.SchedulerConfiguration.SWEEPER_EXECUTOR_NAME; +import static com.netflix.conductor.core.utils.Utils.DECIDER_QUEUE; + +@Component +@ConditionalOnProperty(name = "conductor.orkes.sweeper.enabled", havingValue = "true") +@Slf4j +public class OrkesWorkflowSweepWorker { + + private final QueueDAO queueDAO; + private final ConductorProperties properties; + private final WorkflowExecutor workflowExecutor; + private final ExecutionDAO executionDAO; + private final MetricsCollector metricsCollector; + private final SystemTaskRegistry systemTaskRegistry; + private final ExecutionLockService executionLockService; + + public OrkesWorkflowSweepWorker( + QueueDAO queueDAO, + WorkflowExecutor workflowExecutor, + ExecutionDAO executionDAO, + MetricsCollector metricsCollector, + SystemTaskRegistry systemTaskRegistry, + ExecutionLockService executionLockService, + ConductorProperties properties) { + this.queueDAO = queueDAO; + this.executionDAO = executionDAO; + this.metricsCollector = metricsCollector; + this.systemTaskRegistry = systemTaskRegistry; + this.executionLockService = executionLockService; + this.properties = properties; + this.workflowExecutor = workflowExecutor; + } + + @Async(SWEEPER_EXECUTOR_NAME) + public CompletableFuture sweepAsync(String workflowId) { + metricsCollector.getTimer("workflowSweeper").record(() -> sweep(workflowId)); + return CompletableFuture.completedFuture(null); + } + + private void sweep(String workflowId) { + boolean workflowLocked = false; + try { + workflowLocked = executionLockService.acquireLock(workflowId); + if (!workflowLocked) { + return; + } + log.info("Running sweeper for workflow {}, acquired lock", workflowId); + // 1. Run decide on the workflow + WorkflowModel workflow = executionDAO.getWorkflow(workflowId, true); + if (workflow == null) { + log.warn( + "Workflow NOT found by id: {}. Removed it from decider queue safely.", + workflowId); + queueDAO.remove(DECIDER_QUEUE, workflowId); + return; + } + workflow = decideAndRemove(workflow); + if (workflow == null || workflow.getStatus().isTerminal()) { + log.debug( + "Repair/decide result for workflow {} - {}", + workflowId, + workflow == null ? null : workflow.getStatus()); + if (workflow == null) { + // The workflow does not exist anymore, possible if it was completed and + // archived + queueDAO.remove(DECIDER_QUEUE, workflowId); + } + return; + } + + // 2. If decide returns false + // - Check if the workflow has at least one scheduled or in progress task? + // - If scheduled or in progress - Check if it exists in its corresponding queue, if + // not add it back + // - If no scheduled or in progress task exists + // 1. Set the last task as isExecuted = false to force a re-evaluation + // 2. Call decide + + if (System.currentTimeMillis() - workflow.getUpdatedTime() < 60_000L) { + // Only do this once every 60 second + return; + } + List pendingTasks = getAllPendingTasks(workflow); + if (pendingTasks.size() > 0) { + pendingTasks.forEach(this::ensurePendingTaskIsInQueue); + } else { + log.warn( + "Workflow {} doesn't have an open pending task, requires force evaluation", + workflow.getWorkflowId()); + forceSetLastTaskAsNotExecuted(workflow); + // Decide again after setting isExecuted to false + workflow = decideAndRemove(workflow); + if (workflow == null || workflow.getStatus().isTerminal()) { + log.warn( + "Removing from decider after repair is done, {}, {}", + workflowId, + (workflow == null ? null : workflow.getStatus())); + queueDAO.remove(DECIDER_QUEUE, workflowId); + return; + } + log.debug( + "Force evaluation result for workflow {} - {}", + workflowId, + workflow.getStatus()); + } + + // 3. If parent workflow exists, call repair on that too - meaning ensure the parent is + // in the decider queue + if (workflow.getParentWorkflowId() != null) { + ensureWorkflowExistsInDecider(workflow.getParentWorkflowId()); + } + + // 4. TODO: Don't do this now - Check the min timeout for all running tasks and set + // Math.min(minTime, 1 hour) for decider queue + queueDAO.setUnackTimeout( + DECIDER_QUEUE, workflowId, properties.getWorkflowOffsetTimeout().toMillis()); + } catch (NotFoundException e) { + log.warn("Workflow NOT found for id: {}. Removed it from decider queue", workflowId, e); + queueDAO.remove(DECIDER_QUEUE, workflowId); + } catch (Exception e) { + log.error("Error running sweep for workflow {}", workflowId, e); + } finally { + if (workflowLocked) { + executionLockService.releaseLock(workflowId); + log.debug("Sweeper released lock for workflow {}", workflowId); + } + } + } + + private void forceSetLastTaskAsNotExecuted(WorkflowModel workflow) { + if (workflow.getTasks() != null && workflow.getTasks().size() > 0) { + TaskModel taskModel = workflow.getTasks().get(workflow.getTasks().size() - 1); + log.warn( + "Force setting isExecuted to false for last task - {} for workflow {}", + taskModel.getTaskId(), + taskModel.getWorkflowInstanceId()); + taskModel.setExecuted(false); + executionDAO.updateWorkflow(workflow); + } + } + + private List getAllPendingTasks(WorkflowModel workflow) { + if (workflow.getTasks() != null && workflow.getTasks().size() > 0) { + return workflow.getTasks().stream() + .filter(taskModel -> !taskModel.isExecuted()) + .collect(Collectors.toList()); + } + return Collections.emptyList(); + } + + /** Decide with lock and remove terminal workflow from DECIDER_QUEUE */ + private WorkflowModel decideAndRemove(WorkflowModel workflow) { + WorkflowModel workflowModel = workflowExecutor.decide(workflow); + if (workflowModel == null) { + return null; + } + if (workflowModel.getStatus().isTerminal()) { + queueDAO.remove(DECIDER_QUEUE, workflowModel.getWorkflowId()); + } + return workflowModel; + } + + private boolean ensurePendingTaskIsInQueue(TaskModel task) { + if (shouldTaskExistInQueue(task)) { + // Ensure QueueDAO contains this taskId + String taskQueueName = QueueUtils.getQueueName(task); + if (!queueDAO.containsMessage(taskQueueName, task.getTaskId())) { + queueDAO.push(taskQueueName, task.getTaskId(), task.getCallbackAfterSeconds()); + log.info( + "Task {} in workflow {} re-queued for repairs", + task.getTaskId(), + task.getWorkflowInstanceId()); + metricsCollector + .getCounter("repairTaskReQueued", task.getTaskDefName()) + .increment(); + return true; + } + } + return false; + } + + private boolean ensureWorkflowExistsInDecider(String workflowId) { + if (StringUtils.isNotEmpty(workflowId)) { + String queueName = Utils.DECIDER_QUEUE; + if (!queueDAO.containsMessage(queueName, workflowId)) { + queueDAO.push( + queueName, workflowId, properties.getWorkflowOffsetTimeout().getSeconds()); + log.info("Workflow {} re-queued for repairs", workflowId); + Monitors.recordQueueMessageRepushFromRepairService(queueName); + return true; + } + } + return false; + } + + private boolean shouldTaskExistInQueue(TaskModel task) { + if (systemTaskRegistry.isSystemTask(task.getTaskType())) { + WorkflowSystemTask workflowSystemTask = systemTaskRegistry.get(task.getTaskType()); + return workflowSystemTask.isAsync() // Is Async + // Not async complete OR is async complete, but in scheduled state + && (!workflowSystemTask.isAsyncComplete(task) + || (workflowSystemTask.isAsyncComplete(task) + && task.getStatus() == TaskModel.Status.SCHEDULED)) + // Status is IN_PROGRESS or SCHEDULED + && (task.getStatus() == TaskModel.Status.IN_PROGRESS + || task.getStatus() == TaskModel.Status.SCHEDULED); + } + return task.getStatus() == TaskModel.Status.SCHEDULED; + } +} diff --git a/server/src/main/java/io/orkes/conductor/server/service/OrkesWorkflowSweeper.java b/server/src/main/java/io/orkes/conductor/server/service/OrkesWorkflowSweeper.java index 21df5c8..eb9fb6a 100644 --- a/server/src/main/java/io/orkes/conductor/server/service/OrkesWorkflowSweeper.java +++ b/server/src/main/java/io/orkes/conductor/server/service/OrkesWorkflowSweeper.java @@ -12,245 +12,85 @@ */ package io.orkes.conductor.server.service; -import java.util.Collections; import java.util.List; -import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; +import java.util.concurrent.CompletableFuture; -import org.apache.commons.lang3.StringUtils; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import com.netflix.conductor.core.LifecycleAwareComponent; import com.netflix.conductor.core.config.ConductorProperties; -import com.netflix.conductor.core.exception.NotFoundException; -import com.netflix.conductor.core.execution.OrkesWorkflowExecutor; -import com.netflix.conductor.core.execution.tasks.SystemTaskRegistry; -import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask; -import com.netflix.conductor.core.utils.QueueUtils; -import com.netflix.conductor.core.utils.Utils; -import com.netflix.conductor.dao.ExecutionDAO; import com.netflix.conductor.dao.QueueDAO; import com.netflix.conductor.metrics.Monitors; -import com.netflix.conductor.model.TaskModel; -import com.netflix.conductor.model.WorkflowModel; -import io.orkes.conductor.metrics.MetricsCollector; - -import com.google.common.util.concurrent.Uninterruptibles; import lombok.extern.slf4j.Slf4j; -import static com.netflix.conductor.core.config.SchedulerConfiguration.SWEEPER_EXECUTOR_NAME; import static com.netflix.conductor.core.utils.Utils.DECIDER_QUEUE; @Component +@EnableScheduling @ConditionalOnProperty(name = "conductor.orkes.sweeper.enabled", havingValue = "true") @Slf4j public class OrkesWorkflowSweeper extends LifecycleAwareComponent { private final QueueDAO queueDAO; - private final ConductorProperties properties; private final OrkesSweeperProperties sweeperProperties; - private final OrkesWorkflowExecutor workflowExecutor; - private final ExecutionDAO executionDAO; - private final MetricsCollector metricsCollector; - private final SystemTaskRegistry systemTaskRegistry; + private final OrkesWorkflowSweepWorker sweepWorker; public OrkesWorkflowSweeper( - @Qualifier(SWEEPER_EXECUTOR_NAME) Executor sweeperExecutor, + OrkesWorkflowSweepWorker sweepWorker, QueueDAO queueDAO, - OrkesWorkflowExecutor workflowExecutor, - ExecutionDAO executionDAO, - MetricsCollector metricsCollector, - SystemTaskRegistry systemTaskRegistry, ConductorProperties properties, OrkesSweeperProperties sweeperProperties) { + this.sweepWorker = sweepWorker; this.queueDAO = queueDAO; - this.executionDAO = executionDAO; - this.metricsCollector = metricsCollector; - this.systemTaskRegistry = systemTaskRegistry; - this.properties = properties; this.sweeperProperties = sweeperProperties; - this.workflowExecutor = workflowExecutor; log.info("Initializing sweeper with {} threads", properties.getSweeperThreadCount()); - for (int i = 0; i < properties.getSweeperThreadCount(); i++) { - sweeperExecutor.execute(this::pollAndSweep); - } } - private void pollAndSweep() { + // Reuse com.netflix.conductor.core.config.SchedulerConfiguration + @Scheduled( + fixedDelayString = "${conductor.orkes.sweeper.frequencyMillis:10}", + initialDelayString = "${conductor.orkes.sweeper.frequencyMillis:10}") + public void pollAndSweep() { try { - while (true) { - try { - if (!isRunning()) { - log.trace("Component stopped, skip workflow sweep"); - } else { - List workflowIds = - queueDAO.pop( - DECIDER_QUEUE, - sweeperProperties.getSweepBatchSize(), - sweeperProperties.getQueuePopTimeout()); - if (workflowIds != null) { - workflowIds.forEach( - workflowId -> - metricsCollector - .getTimer("workflowSweeper") - .record(() -> sweep(workflowId))); - } else { - Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); - } - } - } catch (Exception e) { - log.warn("Error while processing sweeper - ", e); - Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); - } - } - } catch (Exception e) { - log.error("Error polling for sweep entries", e); - } - } - - private boolean shouldTaskExistInQueue(TaskModel task) { - if (systemTaskRegistry.isSystemTask(task.getTaskType())) { - WorkflowSystemTask workflowSystemTask = systemTaskRegistry.get(task.getTaskType()); - return workflowSystemTask.isAsync() // Is Async - // Not async complete OR is async complete, but in scheduled state - && (!workflowSystemTask.isAsyncComplete(task) - || (workflowSystemTask.isAsyncComplete(task) - && task.getStatus() == TaskModel.Status.SCHEDULED)) - // Status is IN_PROGRESS or SCHEDULED - && (task.getStatus() == TaskModel.Status.IN_PROGRESS - || task.getStatus() == TaskModel.Status.SCHEDULED); - } - return task.getStatus() == TaskModel.Status.SCHEDULED; - } - - public void sweep(String workflowId) { - try { - log.info("Running sweeper for workflow {}", workflowId); - // 1. Run decide on the workflow - WorkflowModel workflow = decideAndRemove(workflowId); - if (workflow == null || workflow.getStatus().isTerminal()) { - return; - } - - // 2. If decide returns false - // - Check if the workflow has at least one scheduled or in progress task? - // - If scheduled or in progress - Check if it exissts in its corresponding queue, if - // not add it back - // - If no scheduled or in progress task exists - // 1. Set the last task as isExecuted = false to force a re-evaluation - // 2. Call decide - if (workflow == null) { - // The workflow does not exist anymore, possible if it was completed and archived - queueDAO.remove(DECIDER_QUEUE, workflowId); - return; - } - - if (System.currentTimeMillis() - workflow.getUpdatedTime() < 60) { - // Only do this once every 60 second - return; - } - List pendingTasks = getAllPendingTasks(workflow); - if (pendingTasks.size() > 0) { - pendingTasks.forEach(this::ensurePendingTaskIsInQueue); + if (!isRunning()) { + log.trace("Component stopped, skip workflow sweep"); } else { - log.warn( - "Workflow {} doesn't have an open pending task, requires force evaluation", - workflow.getWorkflowId()); - forceSetLastTaskAsNotExecuted(workflow); - workflow = decideAndRemove(workflowId); - log.debug( - "Force evaluation result for workflow {} - {}", - workflowId, - workflow.getStatus()); - if (workflow == null || workflow.getStatus().isTerminal()) { - return; + List workflowIds = + queueDAO.pop( + DECIDER_QUEUE, + sweeperProperties.getSweepBatchSize(), + sweeperProperties.getQueuePopTimeout()); + if (workflowIds != null && workflowIds.size() > 0) { + // wait for all workflow ids to be "swept" + CompletableFuture.allOf( + workflowIds.stream() + .map(sweepWorker::sweepAsync) + .toArray(CompletableFuture[]::new)) + .get(); + log.debug( + "Sweeper processed {} workflow from the decider queue, workflowIds: {}", + workflowIds.size(), + workflowIds); } + // NOTE: Disabling the sweeper implicitly disables this metric. + recordQueueDepth(); } - // 3. If parent workflow exists, call repair on that too - meaning ensure the parent is - // in the decider queue - if (workflow.getParentWorkflowId() != null) { - ensureWorkflowExistsInDecider(workflow.getParentWorkflowId()); - } - } catch (NotFoundException e) { - queueDAO.remove(DECIDER_QUEUE, workflowId); - log.info("Workflow NOT found for id:{}. Removed it from decider queue", workflowId, e); - return; } catch (Exception e) { - log.error("Error running sweep for " + workflowId, e); - } - - // 4. TODO: Don't do this now - Check the min timeout for all running tasks and set - // Math.min(minTime, 1 hour) for decider queue - queueDAO.setUnackTimeout( - DECIDER_QUEUE, workflowId, properties.getWorkflowOffsetTimeout().toMillis()); - } - - private void forceSetLastTaskAsNotExecuted(WorkflowModel workflow) { - if (workflow.getTasks() != null && workflow.getTasks().size() > 0) { - TaskModel taskModel = workflow.getTasks().get(workflow.getTasks().size() - 1); - log.warn( - "Force setting isExecuted to false for last task - {} for workflow {}", - taskModel.getTaskId(), - taskModel.getWorkflowInstanceId()); - taskModel.setExecuted(false); - executionDAO.updateTask(taskModel); - } - } - - private List getAllPendingTasks(WorkflowModel workflow) { - if (workflow.getTasks() != null && workflow.getTasks().size() > 0) { - return workflow.getTasks().stream() - .filter(taskModel -> !taskModel.isExecuted()) - .collect(Collectors.toList()); - } - return Collections.emptyList(); - } - - private WorkflowModel decideAndRemove(String workflowId) { - WorkflowModel workflowModel = workflowExecutor.decide(workflowId); - if (workflowModel == null) { - return null; - } - if (workflowModel.getStatus().isTerminal()) { - queueDAO.remove(DECIDER_QUEUE, workflowId); - } - return workflowModel; - } - - boolean ensurePendingTaskIsInQueue(TaskModel task) { - if (shouldTaskExistInQueue(task)) { - // Ensure QueueDAO contains this taskId - String taskQueueName = QueueUtils.getQueueName(task); - if (!queueDAO.containsMessage(taskQueueName, task.getTaskId())) { - queueDAO.push(taskQueueName, task.getTaskId(), task.getCallbackAfterSeconds()); - log.info( - "Task {} in workflow {} re-queued for repairs", - task.getTaskId(), - task.getWorkflowInstanceId()); - metricsCollector - .getCounter("repairTaskReQueued", task.getTaskDefName()) - .increment(); - return true; + Monitors.error(OrkesWorkflowSweeper.class.getSimpleName(), "poll"); + log.error("Error when polling for workflows", e); + if (e instanceof InterruptedException) { + // Restore interrupted state... + Thread.currentThread().interrupt(); } } - return false; } - private boolean ensureWorkflowExistsInDecider(String workflowId) { - if (StringUtils.isNotEmpty(workflowId)) { - String queueName = Utils.DECIDER_QUEUE; - if (!queueDAO.containsMessage(queueName, workflowId)) { - queueDAO.push( - queueName, workflowId, properties.getWorkflowOffsetTimeout().getSeconds()); - log.info("Workflow {} re-queued for repairs", workflowId); - Monitors.recordQueueMessageRepushFromRepairService(queueName); - return true; - } - } - return false; + private void recordQueueDepth() { + int currentQueueSize = queueDAO.getSize(DECIDER_QUEUE); + Monitors.recordGauge(DECIDER_QUEUE, currentQueueSize); } } diff --git a/server/src/main/resources/application.properties b/server/src/main/resources/application.properties index ba5e3ec..c792e74 100644 --- a/server/src/main/resources/application.properties +++ b/server/src/main/resources/application.properties @@ -1,3 +1,4 @@ + conductor.app.workflow-execution-lock-enabled=true conductor.workflow-execution-lock.type=redis conductor.redis-lock.serverAddress=redis://localhost:6379 @@ -38,22 +39,25 @@ spring.datasource.hikari.auto-commit=true spring.search-datasource.hikari.maximum-pool-size=8 spring.search-datasource.hikari.auto-commit=true - -#Background sweeper job -conductor.workflow-monitor.enabled=true -#Disable default -conductor.workflow-reconciler.enabled=false -conductor.workflow-repair-service.enabled=false - #System Task Workers conductor.app.systemTaskWorkerPollInterval=1 conductor.app.systemTaskMaxPollCount=10 conductor.app.systemTaskWorkerThreadCount=10 +#Background sweeper job +#Disable default +conductor.workflow-reconciler.enabled=false +conductor.workflow-repair-service.enabled=false #Enable the Orkes version conductor.orkes.sweeper.enabled=true -conductor.app.sweeperThreadCount=10 -conductor.sweep-frequency.millis=1 +conductor.orkes.sweeper.frequencyMillis=100 +conductor.orkes.sweeper.sweepBatchSize=10 +conductor.orkes.sweeper.queuePopTimeout=100 +#shares the same sweeper executor +conductor.app.sweeperThreadCount=16 + +#Monitor +conductor.workflow-monitor.enabled=true #metrics -- only enable what is necessary management.endpoints.web.exposure.include=prometheus,health