Skip to content

Commit

Permalink
Merge pull request #973 from d3sw/ONECOND-2238
Browse files Browse the repository at this point in the history
ONECOND-2238: Add implementation to retrieve all active workflows for a given metadata
  • Loading branch information
pradeeppalat01 authored May 3, 2023
2 parents 8c0e5d3 + 9da214b commit 8ac7f3d
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1317,6 +1317,10 @@ public List<Workflow> getRunningWorkflows(String workflowName) throws Exception
return allwf;
}

public List<String> getRunningWorkflowsByName(String workflowName) {
return edao.getRunningWorkflowByName(workflowName);
}

public List<String> getWorkflows(String name, Integer version, Long startTime, Long endTime) {
List<Workflow> allwf = edao.getWorkflowsByType(name, startTime, endTime);
return allwf.stream().filter(wf -> wf.getVersion() == version).map(Workflow::getWorkflowId).collect(Collectors.toList());
Expand Down
97 changes: 54 additions & 43 deletions core/src/main/java/com/netflix/conductor/dao/ExecutionDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public interface ExecutionDAO {
* @return List of pending tasks (in_progress)
*
*/
public abstract List<Task> getPendingTasksByWorkflow(String taskName, String workflowId);
List<Task> getPendingTasksByWorkflow(String taskName, String workflowId);

/**
*
Expand All @@ -57,7 +57,7 @@ public interface ExecutionDAO {
* @return List of tasks starting from startKey
*
*/
public abstract List<Task> getTasks(String taskType, String startKey, int count);
List<Task> getTasks(String taskType, String startKey, int count);

/**
*
Expand All @@ -70,22 +70,22 @@ public interface ExecutionDAO {
* </p>
*
*/
public abstract List<Task> createTasks(List<Task> tasks);
List<Task> createTasks(List<Task> tasks);

/**
*
* @param task Task to be updated
*
*/
public abstract void updateTask(Task task);
void updateTask(Task task);

/**
* Checks if the number of tasks in progress for the given taskDef will exceed the limit if the task is scheduled to be in progress (given to the worker or for system tasks start() method called)
* @param task The task to be executed. Limit is set in the Task's definition
* @return true if by executing this task, the limit is breached. false otherwise.
* @see TaskDef#concurrencyLimit()
*/
public abstract boolean exceedsInProgressLimit(Task task);
boolean exceedsInProgressLimit(Task task);

/**
* Checks if the Task is rate limited or not based on the {@link TaskDef#getRateLimitPerFrequency()} and {@link TaskDef#getRateLimitFrequencyInSeconds()}
Expand All @@ -102,7 +102,7 @@ default boolean exceedsRateLimitPerFrequency(Task task) {
* @param tasks Multiple tasks to be updated
*
*/
public abstract void updateTasks(List<Task> tasks);
void updateTasks(List<Task> tasks);

/**
* Sets the in progress flag for the task
Expand All @@ -117,46 +117,46 @@ default void updateInProgressStatus(Task task) {
* @param log Task Execution Log to be added
*
*/
public abstract void addTaskExecLog(List<TaskExecLog> log);
void addTaskExecLog(List<TaskExecLog> log);

/**
*
* @param taskId id of the task to be removed.
*
*/
public abstract void removeTask(String taskId);
void removeTask(String taskId);

/**
*
* @param taskId Task instance id
* @return Task
*
*/
public abstract Task getTask(String taskId);
Task getTask(String taskId);

/**
*
* @param taskIds Task instance ids
* @return List of tasks
*
*/
public abstract List<Task> getTasks(List<String> taskIds);
List<Task> getTasks(List<String> taskIds);

/**
*
* @param taskType Type of the task for which to retrieve the list of pending tasks
* @return List of pending tasks
*
*/
public List<Task> getPendingTasksForTaskType(String taskType);
List<Task> getPendingTasksForTaskType(String taskType);

/**
*
* @param taskType System task type name (e.g. EVENt, WAIT, etc) for which to retrieve the list of pending tasks
* @return List of pending tasks
*
*/
public default List<Task> getPendingSystemTasks(String taskType) {
default List<Task> getPendingSystemTasks(String taskType) {
return Collections.emptyList();
}

Expand All @@ -166,45 +166,45 @@ public default List<Task> getPendingSystemTasks(String taskType) {
* @return List of tasks for the given workflow instance id
*
*/
public abstract List<Task> getTasksForWorkflow(String workflowId);
List<Task> getTasksForWorkflow(String workflowId);

/**
*
* @param workflow Workflow to be created
* @return Id of the newly created workflow
*
*/
public abstract String createWorkflow(Workflow workflow);
String createWorkflow(Workflow workflow);

/**
*
* @param workflow Workflow to be updated
* @return Id of the updated workflow
*
*/
public abstract String updateWorkflow(Workflow workflow);
String updateWorkflow(Workflow workflow);

/**
*
* @param workflowId workflow instance id
*
*/
public abstract void removeWorkflow(String workflowId);
void removeWorkflow(String workflowId);

/**
*
* @param workflowType Workflow Type
* @param workflowId workflow instance id
*/
public abstract void removeFromPendingWorkflow(String workflowType, String workflowId);
void removeFromPendingWorkflow(String workflowType, String workflowId);

/**
*
* @param workflowId workflow instance id
* @return Workflow
*
*/
public abstract Workflow getWorkflow(String workflowId);
Workflow getWorkflow(String workflowId);

/**
*
Expand All @@ -213,14 +213,14 @@ public default List<Task> getPendingSystemTasks(String taskType) {
* @return Workflow instance details
*
*/
public abstract Workflow getWorkflow(String workflowId, boolean includeTasks);
Workflow getWorkflow(String workflowId, boolean includeTasks);

/**
*
* @param workflowName Name of the workflow
* @return List of workflow ids which are running
*/
public abstract List<String> getRunningWorkflowIds(String workflowName);
List<String> getRunningWorkflowIds(String workflowName);

/**
*
Expand Down Expand Up @@ -249,27 +249,38 @@ default List<Workflow> getRunningWorkflowIds(String workflowName, Integer versio
return Collections.emptyList();
}

/**
* Returns the list of workflows that are running for the given workflow name
* <p>
* This method is checks for the provided workflow type that in RUNNING, PAUSED or RESET state
* @param workflowName Name of the workflow
* @return List of workflow ids which are running
*/
default List<String> getRunningWorkflowByName(String workflowName) {
return Collections.emptyList();
}

/**
*
* @param workflowName Name of the workflow
* @return List of workflows that are running
*
*/
public abstract List<Workflow> getPendingWorkflowsByType(String workflowName);
List<Workflow> getPendingWorkflowsByType(String workflowName);

/**
*
* @param workflowName Name of the workflow
* @return No. of running workflows
*/
public abstract long getPendingWorkflowCount(String workflowName);
long getPendingWorkflowCount(String workflowName);

/**
*
* @param taskDefName Name of the task
* @return Number of task currently in IN_PROGRESS status
*/
public abstract long getInProgressTaskCount(String taskDefName);
long getInProgressTaskCount(String taskDefName);

/**
*
Expand All @@ -278,15 +289,15 @@ default List<Workflow> getRunningWorkflowIds(String workflowName, Integer versio
* @param endTime epoch time
* @return List of workflows between start and end time
*/
public abstract List<Workflow> getWorkflowsByType(String workflowName, Long startTime, Long endTime);
List<Workflow> getWorkflowsByType(String workflowName, Long startTime, Long endTime);

/**
*
* @param correlationId Correlation Id
* @return List of workflows by correlation id
*
*/
public abstract List<Workflow> getWorkflowsByCorrelationId(String correlationId);
List<Workflow> getWorkflowsByCorrelationId(String correlationId);


//Events
Expand All @@ -296,13 +307,13 @@ default List<Workflow> getRunningWorkflowIds(String workflowName, Integer versio
* @param ee Event Execution to be stored
* @return true if the event was added. false otherwise when the event by id is already already stored.
*/
public abstract boolean addEventExecution(EventExecution ee);
boolean addEventExecution(EventExecution ee);

/**
*
* @param ee Event execution to be updated
*/
public abstract void updateEventExecution(EventExecution ee);
void updateEventExecution(EventExecution ee);

/**
*
Expand All @@ -312,60 +323,60 @@ default List<Workflow> getRunningWorkflowIds(String workflowName, Integer versio
* @param max max number of executions to return
* @return list of matching events
*/
public List<EventExecution> getEventExecutions(String eventHandlerName, String eventName, String messageId, int max);
List<EventExecution> getEventExecutions(String eventHandlerName, String eventName, String messageId, int max);

/**
* Adds an incoming external message into the store/index
* @param queue Name of the registered queue
* @param msg Message
*/
public abstract void addMessage(String queue, Message msg);
void addMessage(String queue, Message msg);

public abstract void updateLastPoll(String taskDefName, String domain, String workerId);
void updateLastPoll(String taskDefName, String domain, String workerId);

public abstract PollData getPollData(String taskDefName, String domain);
PollData getPollData(String taskDefName, String domain);

public abstract List<PollData> getPollData(String taskDefName);
List<PollData> getPollData(String taskDefName);

public abstract void addErrorRegistry(WorkflowErrorRegistry workflowErrorRegistry);
void addErrorRegistry(WorkflowErrorRegistry workflowErrorRegistry);

public abstract List<WorkflowError> searchWorkflowErrorRegistry(WorkflowErrorRegistry workflowErrorRegistry);
List<WorkflowError> searchWorkflowErrorRegistry(WorkflowErrorRegistry workflowErrorRegistry);

public abstract List<WorkflowErrorRegistry> searchWorkflowErrorRegistryList(WorkflowErrorRegistry workflowErrorRegistry);
List<WorkflowErrorRegistry> searchWorkflowErrorRegistryList(WorkflowErrorRegistry workflowErrorRegistry);

/**
* Returns list of the in progress tasks associated with tags
* @param taskType The task type currently in progress for associated workflows
* @param tags The set of tags to search workflows
* @return List of in progress tasks for workflows associated with tags
*/
public default List<Task> getPendingTasksByTags(String taskType, Set<String> tags) {
default List<Task> getPendingTasksByTags(String taskType, Set<String> tags) {
return Collections.emptyList();
}

public default boolean anyRunningWorkflowsByTags(Set<String> tags) {
default boolean anyRunningWorkflowsByTags(Set<String> tags) {
return false;
}

public default void addEventPublished(EventPublished ep) {
default void addEventPublished(EventPublished ep) {
}

public default void resetStartTime(Task task, boolean updateOutput) {
default void resetStartTime(Task task, boolean updateOutput) {
}

public default Task getTask(String workflowId, String taskRefName) {
default Task getTask(String workflowId, String taskRefName) {
throw new IllegalStateException("Not implemented");
}

public default void removeTask(Task task) {
default void removeTask(Task task) {
throw new IllegalStateException("Not implemented");
}

public default void setWorkflowAttribute(String workflowId, String name, Object value) {
default void setWorkflowAttribute(String workflowId, String name, Object value) {
throw new IllegalStateException("Not implemented");
}

List<WorkflowErrorRegistry> findSubWorkflows(List<String> parent_workflow_ids);

public abstract List<TaskDetails> searchTaskDetails(String jobId, String workflowId, String workflowType, String taskName, Boolean includeOutput);
List<TaskDetails> searchTaskDetails(String jobId, String workflowId, String workflowType, String taskName, Boolean includeOutput);
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,15 @@ public WorkflowDef getWorkflowDef(String name, Integer version) {
return metadata.get(name, version);
}

/**
* Retrieve workflows with the defined metadata definition.
* @param name Name of the workflow to retrieve
* @return the list of workflow ids where the metadata definition is actively being used.
*/
public List<String> getWorkflowDefInUsage(String name) {
return executor.getRunningWorkflowsByName(name);
}

/**
* Remove workflow definition
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,17 @@ public WorkflowDef get(@PathParam("name") String name, @QueryParam("version") In
public List<WorkflowDef> getAll() throws Exception {
return service.getWorkflowDefs();
}

@GET
@ApiOperation("Retrieves all workflows associated with the given metadata definition")
@Path("/workflow/usage/{name}")
public List<String> checkUsage(@PathParam("name") String name) throws Exception {
List<String> workflowIds = service.getWorkflowDefInUsage(name);
if (CollectionUtils.isEmpty(workflowIds)) {
throw new ApplicationException(Code.NOT_FOUND, "Workflow is not currently in use");
}
return workflowIds;
}

@POST
@Path("/taskdefs")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,12 @@ public List<String> getRunningWorkflowIds(String workflowName) {
return getWithTransaction(tx -> getRunningWorkflowIds(tx, workflowName));
}

@Override
public List<String> getRunningWorkflowByName(String workflowName) {
Preconditions.checkNotNull(workflowName, "the workflow name must not be null or blank");
return getWithTransaction(tx -> getRunningWorkflowByDefName(tx, workflowName));
}

@Override
public List<String> getRunningWorkflowIds(String workflowName, String startTime, String endDate) {
return getWithTransaction(tx -> getRunningWorkflowIdsWithFilters(tx, workflowName, startTime, endDate));
Expand Down Expand Up @@ -526,6 +532,11 @@ private List<String> getRunningWorkflowIds(Connection tx, String workflowName) {
return query(tx, SQL, q -> q.addParameter(workflowName).executeScalarList(String.class));
}

private List<String> getRunningWorkflowByDefName(Connection tx, String workflowName) {
String SQL = "SELECT workflow_id FROM workflow WHERE workflow_type = ? LIMIT 10";
return query(tx, SQL, q -> q.addParameter(workflowName).executeScalarList(String.class));
}

private List<Workflow> getRunningWorkflowsWithFilters(Connection tx, String workflowName, String startDate, String endDate) {
String SQL = String.join("", "SELECT workflow_id FROM workflow WHERE workflow_type = ? AND workflow_status IN ('RUNNING','PAUSED') ",
buildQuery(startDate, endDate));
Expand Down

0 comments on commit 8ac7f3d

Please sign in to comment.