Skip to content

Commit

Permalink
Implement WorkflowOptimizer
Browse files Browse the repository at this point in the history
  • Loading branch information
tobias-weber committed Dec 4, 2024
1 parent e313327 commit 2975992
Show file tree
Hide file tree
Showing 9 changed files with 298 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ public interface Workflow {

List<Edge> getEdges( ActivityWrapper from, ActivityWrapper to );

/**
* Returns a list of all edges incident to the target.
* The order of the returned edges is arbitrary.
*
* @param target target activity id
* @return a list of all edges incident to the target
*/
List<Edge> getInEdges( UUID target );

List<Edge> getOutEdges( UUID source );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,21 @@
package org.polypheny.db.workflow.dag.edges;

import lombok.Getter;
import lombok.Setter;
import org.polypheny.db.workflow.dag.activities.ActivityWrapper;
import org.polypheny.db.workflow.models.EdgeModel;

@Getter
public class ControlEdge extends Edge {

public static final int SUCCESS_PORT = 0;
public static final int FAIL_PORT = 1;

@Getter
private final boolean onSuccess;

@Setter
private boolean isIgnored = false; // true if this control edge can be ignored (e.g. since target is already allowed to execute)


public ControlEdge( ActivityWrapper from, ActivityWrapper to, boolean onSuccess ) {
super( from, to );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import java.util.UUID;
import lombok.Getter;
import lombok.Setter;
import org.polypheny.db.util.graph.AttributedDirectedGraph;
import org.polypheny.db.util.graph.DefaultEdge;
import org.polypheny.db.workflow.dag.edges.ControlEdge;
Expand All @@ -39,8 +38,6 @@ public class ExecutionEdge extends DefaultEdge {

// isControl == true
private final boolean onSuccess;
@Setter
private boolean isIgnored = false; // control edge has no influence anymore as target is already allowed to execute


public ExecutionEdge( UUID v0, UUID v1, Edge edge ) {
Expand Down Expand Up @@ -68,7 +65,6 @@ public ExecutionEdge( ExecutionEdge edge ) {
this.fromPort = edge.fromPort;
this.toPort = edge.toPort;
this.onSuccess = edge.onSuccess;
this.isIgnored = edge.isIgnored;
}


Expand Down Expand Up @@ -102,7 +98,6 @@ public int hashCode() {
result = 31 * result + Boolean.hashCode( isControl );
if ( isControl ) {
result = 31 * result + Boolean.hashCode( onSuccess );
// isIgnored is not part of hashCode, as it is mutable
} else {
result = 31 * result + Integer.hashCode( fromPort );
result = 31 * result + Integer.hashCode( toPort );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public synchronized void startExecution( Workflow workflow, StorageManager sm, @
}
interruptedSessions.remove( sessionId );

WorkflowScheduler scheduler = new WorkflowScheduler( workflow, sm, targetActivity );
WorkflowScheduler scheduler = new WorkflowScheduler( workflow, sm, globalWorkers, targetActivity );
List<ExecutionSubmission> submissions = scheduler.startExecution();
if ( submissions.isEmpty() ) {
throw new GenericRuntimeException( "At least one activity needs to be executable when submitting a workflow for execution" );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,32 @@
package org.polypheny.db.workflow.engine.scheduler;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.commons.lang3.NotImplementedException;
import org.polypheny.db.algebra.type.AlgDataType;
import org.polypheny.db.catalog.exceptions.GenericRuntimeException;
import org.polypheny.db.util.graph.AttributedDirectedGraph;
import org.polypheny.db.util.graph.TopologicalOrderIterator;
import org.polypheny.db.workflow.dag.Workflow;
import org.polypheny.db.workflow.dag.activities.Activity;
import org.polypheny.db.workflow.dag.activities.ActivityException;
import org.polypheny.db.workflow.dag.activities.ActivityWrapper;
import org.polypheny.db.workflow.dag.activities.ActivityWrapper.ActivityState;
import org.polypheny.db.workflow.dag.edges.DataEdge;
import org.polypheny.db.workflow.dag.edges.Edge;
import org.polypheny.db.workflow.dag.edges.Edge.EdgeState;
import org.polypheny.db.workflow.dag.settings.SettingDef.SettingValue;
import org.polypheny.db.workflow.engine.scheduler.optimizer.WorkflowOptimizer;
import org.polypheny.db.workflow.engine.scheduler.optimizer.WorkflowOptimizer.SubmissionFactory;
import org.polypheny.db.workflow.engine.scheduler.optimizer.WorkflowOptimizerImpl;
import org.polypheny.db.workflow.engine.storage.StorageManager;

Expand All @@ -59,15 +69,17 @@ public class WorkflowScheduler {
private boolean isFinished;

private int pendingCount = 0; // current number of unfinished submissions
private final Queue<ExecutionSubmission> submissionBuffer = new LinkedList<>(); // contains not yet submitted subtrees because of the maxWorkers limit
private final Set<UUID> remainingActivities = new HashSet<>(); // activities that have not finished execution
private final Set<UUID> pendingActivities = new HashSet<>(); // contains activities submitted for execution

private final Map<UUID, List<Optional<AlgDataType>>> typePreviews = new HashMap<>(); // contains the (possibly not yet known) output types of execDag activities
private final Map<UUID, Map<String, Optional<SettingValue>>> settingsPreviews = new HashMap<>(); // contains the (possibly not yet known) settings of execDag activities

public WorkflowScheduler( Workflow workflow, StorageManager sm, @Nullable UUID targetActivity ) throws Exception {

public WorkflowScheduler( Workflow workflow, StorageManager sm, int globalWorkers, @Nullable UUID targetActivity ) throws Exception {
this.workflow = workflow;
this.sm = sm;
this.maxWorkers = workflow.getConfig().getMaxWorkers();
this.maxWorkers = Math.min( workflow.getConfig().getMaxWorkers(), globalWorkers );

if ( targetActivity != null && workflow.getActivity( targetActivity ).getState() == ActivityState.SAVED ) {
throw new GenericRuntimeException( "A saved activity first needs to be reset before executing it" );
Expand All @@ -78,13 +90,14 @@ public WorkflowScheduler( Workflow workflow, StorageManager sm, @Nullable UUID t
validateCommonLoad();

this.execDag = targetActivity == null ? prepareExecutionDag() : prepareExecutionDag( List.of( targetActivity ) );
initPreviews();
this.optimizer = new WorkflowOptimizerImpl( workflow, execDag );

}


/**
 * Starts the execution of the workflow by computing the first batch of submissions.
 *
 * @return the initial list of submissions to be handed to the executors
 */
public List<ExecutionSubmission> startExecution() {
    // (review) removed the unreachable leftover return of the previous buffered-submission
    // implementation; computeNextSubmissions already limits the number of submissions
    // to the available workers via its submissionCount argument.
    return computeNextSubmissions();
}


Expand All @@ -108,9 +121,7 @@ public List<ExecutionSubmission> handleExecutionResult( ExecutionResult result )

propagateResult( result.isSuccess(), result.getActivities() );

List<ExecutionSubmission> nextSubmissions = computeNextSubmissions();

return getNextBufferedSubmissions( null );
return computeNextSubmissions();
}


Expand Down Expand Up @@ -205,24 +216,65 @@ private void validateCommonLoad() throws Exception {
}


private List<ExecutionSubmission> getNextBufferedSubmissions( List<ExecutionSubmission> newSubmissions ) {
submissionBuffer.addAll( newSubmissions );
// TODO: update state of involved activities to queued or something similar
private void initPreviews() throws ActivityException {
for ( UUID n : TopologicalOrderIterator.of( execDag ) ) {
ActivityWrapper wrapper = workflow.getActivity( n );
Activity activity = wrapper.getActivity();
ActivityState state = wrapper.getState();

if ( state == ActivityState.SAVED ) {
// settings are not required for already executed nodes
typePreviews.put( n, sm.getTupleTypes( n ).stream().map( Optional::ofNullable ).toList() );

} else if ( state == ActivityState.IDLE ) {
List<Optional<AlgDataType>> inputTypes = new ArrayList<>();
boolean allInputsSaved = true;
for ( int i = 0; i < wrapper.getDef().getInPorts().length; i++ ) {
DataEdge dataEdge = workflow.getDataEdge( n, i );
ActivityWrapper inWrapper = dataEdge.getFrom();
if ( remainingActivities.contains( inWrapper.getId() ) ) {
allInputsSaved = false;
}
inputTypes.add( typePreviews.get( inWrapper.getId() ).get( dataEdge.getFromPort() ) );
}
// TODO: ensure control inputs are also saved, then merge variables correctly
// Also change executor merge to be correct (correct order, only active)

Map<String, Optional<SettingValue>> settings = wrapper.resolveAvailableSettings();
settingsPreviews.put( n, settings );
typePreviews.put( n, activity.previewOutTypes( inputTypes, settings ) );

} else {
throw new IllegalStateException( "Illegal state of activity while initiating scheduler: " + state + " for " + n );
}

List<ExecutionSubmission> submissions = new ArrayList<>();
while ( pendingActivities.size() < maxWorkers && !submissionBuffer.isEmpty() ) {
ExecutionSubmission submission = submissionBuffer.remove();
submissions.add( submission );
pendingActivities.addAll( submission.getActivities() );
pendingCount++;
switch ( state ) {

case IDLE -> {
}
case QUEUED -> {
}
case EXECUTING -> {
}
case SKIPPED -> {
}
case FAILED -> {
}
case FINISHED -> {
}
case SAVED -> {
}
}
}
return submissions;
}


/**
 * Asks the optimizer for the next submission factories and instantiates them.
 * The number of requested submissions is limited by the number of currently free workers
 * ({@code maxWorkers - pendingCount}); the pending count is increased accordingly.
 *
 * @return the next submissions to be executed
 */
private List<ExecutionSubmission> computeNextSubmissions() {
    // (review) removed the unreachable old return statement left over from the diff and
    // pass the previews gathered in initPreviews() instead of null, so the optimizer can
    // take the (possibly not yet known) types and settings into account.
    // TODO: supply the correct CommonTransaction instead of null
    List<SubmissionFactory> factories = optimizer.computeNextTrees( typePreviews, settingsPreviews, maxWorkers - pendingCount, null );
    pendingCount += factories.size();
    return factories.stream().map( f -> f.create( sm, workflow ) ).toList();
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.polypheny.db.workflow.engine.scheduler.optimizer;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -26,6 +27,7 @@
import org.polypheny.db.catalog.exceptions.GenericRuntimeException;
import org.polypheny.db.util.graph.AttributedDirectedGraph;
import org.polypheny.db.workflow.dag.Workflow;
import org.polypheny.db.workflow.dag.activities.ActivityWrapper;
import org.polypheny.db.workflow.dag.activities.ActivityWrapper.ActivityState;
import org.polypheny.db.workflow.dag.activities.Fusable;
import org.polypheny.db.workflow.dag.activities.Pipeable;
Expand All @@ -42,6 +44,7 @@
import org.polypheny.db.workflow.engine.scheduler.ExecutionEdge;
import org.polypheny.db.workflow.engine.scheduler.ExecutionEdge.ExecutionEdgeFactory;
import org.polypheny.db.workflow.engine.scheduler.ExecutionSubmission;
import org.polypheny.db.workflow.engine.scheduler.GraphUtils;
import org.polypheny.db.workflow.engine.storage.StorageManager;
import org.polypheny.db.workflow.models.ActivityConfigModel.CommonTransaction;

Expand All @@ -60,14 +63,24 @@ protected WorkflowOptimizer( Workflow workflow, AttributedDirectedGraph<UUID, Ex
}


public final List<SubmissionFactory> computeNextTrees( Map<UUID, List<Optional<AlgDataType>>> typePreviews, Map<UUID, Map<String, Optional<SettingValue>>> settingsPreviews ) {
/**
 * Computes the next candidate submissions, limited to at most {@code submissionCount} entries.
 * The given previews are stored in fields so that the implementation can access them
 * while computing the trees.
 *
 * @param typePreviews the (possibly not yet known) output types of the execution DAG activities
 * @param settingsPreviews the (possibly not yet known) settings of the execution DAG activities
 * @param submissionCount the maximum number of factories to return; values below 0 are treated as 0
 * @param commonType the transaction type forwarded to the implementation
 * @return the highest-priority submission factories, at most {@code submissionCount} many
 */
public final List<SubmissionFactory> computeNextTrees( Map<UUID, List<Optional<AlgDataType>>> typePreviews, Map<UUID, Map<String, Optional<SettingValue>>> settingsPreviews, int submissionCount, CommonTransaction commonType ) {
    this.typePreviews = typePreviews;
    this.settingsPreviews = settingsPreviews;

    List<SubmissionFactory> orderedCandidates = computeNextTrees( commonType );
    // clamp to [0, size]: a negative submissionCount would make subList throw
    int limit = Math.max( 0, Math.min( submissionCount, orderedCandidates.size() ) );
    return orderedCandidates.subList( 0, limit );
}


abstract List<SubmissionFactory> computeNextTrees();
/**
* Returns a list of candidate submissions based on the current state of the optimizer.
* The list is ordered by priority (most important submission first).
* This operation must not perform any changes to any of the fields of the abstract WorkflowOptimizer.
* There is no guarantee whether the returned submissions will actually be queued for execution.
*
* @return A list of SubmissionFactories that can be used to create actual submissions.
*/
abstract List<SubmissionFactory> computeNextTrees( CommonTransaction commonType );


/**
Expand All @@ -92,13 +105,6 @@ boolean canPipe( UUID activityId ) {
}


/**
 * Checks whether the given execution edge can be part of a pipe:
 * the source activity must have exactly one outgoing edge in the execution DAG,
 * and both endpoint activities must themselves support piping (see {@code canPipe(UUID)}).
 * NOTE(review): assumes getOutwardEdges only counts edges within execDag — confirm.
 */
boolean canPipe( ExecutionEdge edge ) {
    UUID source = edge.getSource(), target = edge.getTarget();
    return execDag.getOutwardEdges( source ).size() == 1
            && canPipe( source ) && canPipe( target );
}


boolean requestsToWrite( UUID activityId ) {
if ( workflow.getActivity( activityId ).getActivity() instanceof VariableWriter writer ) {
// true is more restricting for optimizer -> return true if empty Optional
Expand All @@ -118,13 +124,30 @@ ActivityState getState( UUID activityId ) {
}


EdgeState getEdgeState( ExecutionEdge edge ) {
/**
 * Computes the subgraph of the execution DAG induced by all activities
 * whose configured transaction mode equals the given common type.
 *
 * @param commonType the transaction mode to filter the activities by
 * @return the induced subgraph containing only the matching activities
 */
AttributedDirectedGraph<UUID, ExecutionEdge> getCommonSubExecDag( CommonTransaction commonType ) {
    Set<UUID> matching = new HashSet<>();
    execDag.vertexSet().forEach( id -> {
        if ( workflow.getActivity( id ).getConfig().getTransactionMode() == commonType ) {
            matching.add( id );
        }
    } );
    return GraphUtils.getInducedSubgraph( execDag, matching );
}


/**
 * Resolves the workflow {@link Edge} that the given {@link ExecutionEdge} represents.
 *
 * @param edge the execution edge to resolve
 * @return the corresponding workflow edge
 * @throws IllegalArgumentException if the execution edge does not correspond to any workflow edge
 */
Edge getEdge( ExecutionEdge edge ) {
    return workflow.getEdges( edge.getSource(), edge.getTarget() )
            .stream()
            .filter( edge::representsEdge )
            .findFirst()
            .orElseThrow( () -> new IllegalArgumentException(
                    "Cannot return Edge of ExecutionEdge that is not part of the workflow: " + edge ) );
}


/**
 * Returns the current {@link EdgeState} of the workflow edge represented
 * by the given execution edge.
 *
 * @throws IllegalArgumentException if the execution edge is not part of the workflow (via getEdge)
 */
EdgeState getEdgeState( ExecutionEdge edge ) {
    return getEdge( edge ).getState();
}


Expand Down
Loading

0 comments on commit 2975992

Please sign in to comment.