Skip to content

Commit

Permalink
Merge pull request #18663 from ahmedhamidawan/optimize_useInvocationG…
Browse files Browse the repository at this point in the history
…raph

[24.1] Optimize `useInvocationGraph` for Invocation view
  • Loading branch information
ElectronicBlueberry authored Aug 8, 2024
2 parents cf211c1 + 8856b83 commit 2a670b4
Showing 1 changed file with 154 additions and 101 deletions.
255 changes: 154 additions & 101 deletions client/src/composables/useInvocationGraph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,14 @@ import {
faSpinner,
faTrash,
} from "@fortawesome/free-solid-svg-icons";
import Vue, { computed, type Ref, ref } from "vue";
import { computed, type Ref, ref, set } from "vue";

import { stepJobsSummaryFetcher, type StepJobSummary, type WorkflowInvocationElementView } from "@/api/invocations";
import {
type InvocationStep,
stepJobsSummaryFetcher,
type StepJobSummary,
type WorkflowInvocationElementView,
} from "@/api/invocations";
import { isWorkflowInput } from "@/components/Workflow/constants";
import { fromSimple } from "@/components/Workflow/Editor/modules/model";
import { getWorkflowFull } from "@/components/Workflow/workflows.services";
Expand Down Expand Up @@ -73,8 +78,11 @@ export function useInvocationGraph(
library.add(faCheckCircle, faClock, faExclamationTriangle, faForward, faPause, faSpinner, faTrash);

const steps = ref<{ [index: string]: GraphStep }>({});
const stepsPopulated = ref(false);
const storeId = computed(() => `invocation-${invocation.value.id}`);

const lastStepsJobsSummary = ref<StepJobSummary[]>([]);

/** The full invocation mapped onto the original workflow */
const invocationGraph = ref<InvocationGraph | null>(null);

Expand Down Expand Up @@ -105,118 +113,163 @@ export function useInvocationGraph(
}

// get the job summary for each step in the invocation
const { data: stepJobsSummary } = await stepJobsSummaryFetcher({ invocation_id: invocation.value.id });

/** The original steps of the workflow */
const originalSteps: Record<string, Step> = { ...loadedWorkflow.value.steps };

// for each step in the workflow, store the state and status of jobs
for (let i = 0; i < Object.keys(originalSteps).length; i++) {
/** An invocation graph step */
const graphStepFromWfStep = { ...originalSteps[i] } as GraphStep;

/** The type of the step (subworkflow, input, tool, etc.) */
let type;
if (graphStepFromWfStep.type === "subworkflow") {
type = "subworkflow";
} else if (isWorkflowInput(graphStepFromWfStep.type)) {
type = "input";
const { data: stepsJobsSummary } = await stepJobsSummaryFetcher({ invocation_id: invocation.value.id });

// if the steps have not been populated or the job states have changed, update the steps
// TODO: What if the state of something not in the stepsJobsSummary has changed? (e.g.: subworkflows...)
if (
!stepsPopulated.value ||
JSON.stringify(stepsJobsSummary) !== JSON.stringify(lastStepsJobsSummary.value)
) {
updateSteps(stepsJobsSummary);

// Load the invocation graph into the editor the first time
if (!stepsPopulated.value) {
invocationGraph.value!.steps = { ...steps.value };
await fromSimple(storeId.value, invocationGraph.value as any);
stepsPopulated.value = true;
}
}
} catch (e) {
rethrowSimple(e);
}
}

/** The raw invocation step */
const invocationStep = invocation.value.steps[i];

if (type !== "input") {
// there is an invocation step for this workflow step
if (invocationStep) {
/** The `populated_state` for this graph step. (This may or may not be used to
* derive the `state` for this invocation graph step) */
let populatedState;

if (type === "subworkflow") {
// if the step is a subworkflow, get the populated state from the invocation step
populatedState = invocationStep.state || undefined;

/* TODO:
Note that subworkflows are often in the `scheduled` state regardless of whether
their output is successful or not. One good way to visually show if a subworkflow was
successful is to set `graphStepFromWfStep.state = subworkflow.output?.state`.
*/
}
/** Update the steps of the invocation graph with the step job summaries, or initialize the steps
* if they haven't been populated yet.
* @param stepsJobsSummary - The job summary for each step in the invocation
* */
function updateSteps(stepsJobsSummary: StepJobSummary[]) {
/** Initialize with the original steps of the workflow, else update the existing graph steps */
const fullSteps: Record<string, Step | GraphStep> = !stepsPopulated.value
? { ...loadedWorkflow.value.steps }
: steps.value;

// for each step, store the state and status of jobs
for (let i = 0; i < Object.keys(fullSteps).length; i++) {
/** An invocation graph step (initialized with the original workflow step) */
let graphStepFromWfStep;
if (!steps.value[i]) {
graphStepFromWfStep = { ...fullSteps[i] } as GraphStep;
} else {
graphStepFromWfStep = steps.value[i] as GraphStep;
}

// First, try setting the state of the graph step based on its jobs' states or the populated state
else {
/** The step job summary for the invocation step (based on its job id) */
const invocationStepSummary = stepJobsSummary.find((stepJobSummary: StepJobSummary) => {
if (stepJobSummary.model === "ImplicitCollectionJobs") {
return stepJobSummary.id === invocationStep.implicit_collection_jobs_id;
} else {
return stepJobSummary.id === invocationStep.job_id;
}
});

if (invocationStepSummary) {
// the step is not a subworkflow, get the populated state from the invocation step summary
populatedState = invocationStepSummary.populated_state;

if (invocationStepSummary.states) {
const statesForThisStep = Object.keys(invocationStepSummary.states);
// set the state of the graph step based on the job states for this step
graphStepFromWfStep.state = getStepStateFromJobStates(statesForThisStep);
}
// now store the job states for this step in the graph step
graphStepFromWfStep.jobs = invocationStepSummary.states;
} else {
// TODO: There is no summary for this step's `job_id`; what does this mean?
graphStepFromWfStep.state = "waiting";
}
/** The raw invocation step */
const invocationStep = invocation.value.steps[i];

if (!isWorkflowInput(graphStepFromWfStep.type)) {
let invocationStepSummary: StepJobSummary | undefined;
if (invocationStep) {
invocationStepSummary = stepsJobsSummary.find((stepJobSummary: StepJobSummary) => {
if (stepJobSummary.model === "ImplicitCollectionJobs") {
return stepJobSummary.id === invocationStep.implicit_collection_jobs_id;
} else {
return stepJobSummary.id === invocationStep.job_id;
}
});
}
updateStep(graphStepFromWfStep, invocationStep, invocationStepSummary);
}

// If the state still hasn't been set, set it based on the populated state
if (!graphStepFromWfStep.state) {
if (populatedState === "scheduled" || populatedState === "ready") {
graphStepFromWfStep.state = "queued";
} else if (populatedState === "resubmitted") {
graphStepFromWfStep.state = "new";
} else if (populatedState === "failed") {
graphStepFromWfStep.state = "error";
} else if (populatedState === "deleting") {
graphStepFromWfStep.state = "deleted";
} else if (populatedState && !["stop", "stopped"].includes(populatedState)) {
graphStepFromWfStep.state = populatedState as GraphStep["state"];
}
}
}
// add the graph step to the steps object if it doesn't exist yet
if (!steps.value[i]) {
set(steps.value, i, graphStepFromWfStep);
}
}

// there is no invocation step for this workflow step, it is probably queued
else {
graphStepFromWfStep.state = "queued";
}
lastStepsJobsSummary.value = stepsJobsSummary;
}

/** Setting the header class for the graph step */
graphStepFromWfStep.headerClass = {
"node-header-invocation": true,
[`header-${graphStepFromWfStep.state}`]: !!graphStepFromWfStep.state,
};
// TODO: maybe a different one for inputs? Currently they have no state either.

/** Setting the header icon for the graph step */
if (graphStepFromWfStep.state) {
graphStepFromWfStep.headerIcon = iconClasses[graphStepFromWfStep.state]?.icon;
graphStepFromWfStep.headerIconSpin = iconClasses[graphStepFromWfStep.state]?.spin;
/**
* Store the state, jobs and class for the graph step based on the invocation step and its job summary.
* @param graphStep - Invocation graph step
* @param invocationStep - The invocation step for the workflow step
* @param invocationStepSummary - The step job summary for the invocation step (based on its job id)
*/
function updateStep(
graphStep: GraphStep,
invocationStep: InvocationStep | undefined,
invocationStepSummary: StepJobSummary | undefined
) {
/** The new state for the graph step */
let newState = graphStep.state;

// there is an invocation step for this workflow step
if (invocationStep) {
/** The `populated_state` for this graph step. (This may or may not be used to
* derive the `state` for this invocation graph step) */
let populatedState;

if (graphStep.type === "subworkflow") {
// if the step is a subworkflow, get the populated state from the invocation step
populatedState = invocationStep.state || undefined;

/* TODO:
Note that subworkflows are often in the `scheduled` state regardless of whether
their output is successful or not. One good way to visually show if a subworkflow was
successful is to set `graphStep.state = subworkflow.output?.state`.
*/
}

// First, try setting the state of the graph step based on its jobs' states or the populated state
else {
if (invocationStepSummary) {
// the step is not a subworkflow, get the populated state from the invocation step summary
populatedState = invocationStepSummary.populated_state;

if (invocationStepSummary.states) {
const statesForThisStep = Object.keys(invocationStepSummary.states);
// set the state of the graph step based on the job states for this step
newState = getStepStateFromJobStates(statesForThisStep);
}
// now store the job states for this step in the graph step, if they changed since the last time
if (JSON.stringify(graphStep.jobs) !== JSON.stringify(invocationStepSummary.states)) {
set(graphStep, "jobs", invocationStepSummary.states);
}
} else {
// TODO: There is no summary for this step's `job_id`; what does this mean?
newState = "waiting";
}
}

// update the invocation graph steps object
Vue.set(steps.value, i, graphStepFromWfStep);
// If the state still hasn't been set, set it based on the populated state
if (!newState) {
if (populatedState === "scheduled" || populatedState === "ready") {
newState = "queued";
} else if (populatedState === "resubmitted") {
newState = "new";
} else if (populatedState === "failed") {
newState = "error";
} else if (populatedState === "deleting") {
newState = "deleted";
} else if (populatedState && !["stop", "stopped"].includes(populatedState)) {
newState = populatedState as GraphStep["state"];
}
}
}

invocationGraph.value!.steps = { ...steps.value };
// there is no invocation step for this workflow step, it is probably queued
else {
newState = "queued";
}

// Load the invocation graph into the editor every time
await fromSimple(storeId.value, invocationGraph.value as any);
} catch (e) {
rethrowSimple(e);
// if the state has changed, update the graph step
if (graphStep.state !== newState) {
graphStep.state = newState;

/** Setting the header class for the graph step */
graphStep.headerClass = {
"node-header-invocation": true,
[`header-${graphStep.state}`]: !!graphStep.state,
};
// TODO: maybe a different one for inputs? Currently they have no state either.

/** Setting the header icon for the graph step */
if (graphStep.state) {
graphStep.headerIcon = iconClasses[graphStep.state]?.icon;
graphStep.headerIconSpin = iconClasses[graphStep.state]?.spin;
}
}
}

Expand Down

0 comments on commit 2a670b4

Please sign in to comment.