diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/Workflow.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/Workflow.java
index 214bcc006a..0132961a94 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/Workflow.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/Workflow.java
@@ -54,6 +54,13 @@ public interface Workflow {

     List<Edge> getEdges( ActivityWrapper from, ActivityWrapper to );

+    /**
+     * Returns a list of all edges incident to the target.
+     * The order of the returned edges is arbitrary.
+     *
+     * @param target target activity id
+     * @return a list of all edges incident to the target
+     */
     List<Edge> getInEdges( UUID target );

     List<Edge> getOutEdges( UUID source );
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/edges/ControlEdge.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/edges/ControlEdge.java
index 6f14e189db..d7ae8454ae 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/edges/ControlEdge.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/edges/ControlEdge.java
@@ -17,17 +17,21 @@
 package org.polypheny.db.workflow.dag.edges;

 import lombok.Getter;
+import lombok.Setter;
 import org.polypheny.db.workflow.dag.activities.ActivityWrapper;
 import org.polypheny.db.workflow.models.EdgeModel;

+@Getter
 public class ControlEdge extends Edge {

     public static final int SUCCESS_PORT = 0;
     public static final int FAIL_PORT = 1;

-    @Getter
     private final boolean onSuccess;

+    @Setter
+    private boolean isIgnored = false; // true if this control edge can be ignored (e.g. since target is already allowed to execute)
+

     public ControlEdge( ActivityWrapper from, ActivityWrapper to, boolean onSuccess ) {
         super( from, to );
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/ExecutionEdge.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/ExecutionEdge.java
index 238d57c750..c398b2cedd 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/ExecutionEdge.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/ExecutionEdge.java
@@ -18,7 +18,6 @@

 import java.util.UUID;
 import lombok.Getter;
-import lombok.Setter;
 import org.polypheny.db.util.graph.AttributedDirectedGraph;
 import org.polypheny.db.util.graph.DefaultEdge;
 import org.polypheny.db.workflow.dag.edges.ControlEdge;
@@ -39,8 +38,6 @@ public class ExecutionEdge extends DefaultEdge {

     // isControl == true
     private final boolean onSuccess;
-    @Setter
-    private boolean isIgnored = false; // control edge has no influence anymore as target is already allowed to execute


     public ExecutionEdge( UUID v0, UUID v1, Edge edge ) {
@@ -68,7 +65,6 @@ public ExecutionEdge( ExecutionEdge edge ) {
         this.fromPort = edge.fromPort;
         this.toPort = edge.toPort;
         this.onSuccess = edge.onSuccess;
-        this.isIgnored = edge.isIgnored;
     }

@@ -102,7 +98,6 @@ public int hashCode() {
         result = 31 * result + Boolean.hashCode( isControl );
         if ( isControl ) {
             result = 31 * result + Boolean.hashCode( onSuccess );
-            // isIgnored is not part of hashCode, as it is mutable
         } else {
             result = 31 * result + Integer.hashCode( fromPort );
             result = 31 * result + Integer.hashCode( toPort );
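Reviewer note: moving the mutable `isIgnored` flag from `ExecutionEdge` onto `ControlEdge` removes mutable state from a class that implements `hashCode()`/`equals()` and is stored in graph structures; the old code even had to exclude the flag from `hashCode()` by hand. A minimal, hypothetical sketch (class names are illustrative, not from this patch) of why a mutable field in a hashed object is risky:

```java
// Hypothetical, simplified illustration; not part of the patch.
import java.util.HashSet;
import java.util.Set;

class MutableKey {
    boolean flag;

    @Override
    public int hashCode() {
        return Boolean.hashCode( flag ); // hash depends on mutable state
    }

    @Override
    public boolean equals( Object o ) {
        return o instanceof MutableKey k && k.flag == flag;
    }
}

public class MutableKeyDemo {
    public static void main( String[] args ) {
        Set<MutableKey> set = new HashSet<>();
        MutableKey key = new MutableKey();
        set.add( key );
        key.flag = true; // mutation after insertion changes the hash bucket
        System.out.println( set.contains( key ) ); // prints false
    }
}
```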
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/GlobalScheduler.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/GlobalScheduler.java
index 86e5b0b884..d58c3bcb0a 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/GlobalScheduler.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/GlobalScheduler.java
@@ -74,7 +74,7 @@ public synchronized void startExecution( Workflow workflow, StorageManager sm, @Nullable UUID targetActivity ) {
         }
         interruptedSessions.remove( sessionId );

-        WorkflowScheduler scheduler = new WorkflowScheduler( workflow, sm, targetActivity );
+        WorkflowScheduler scheduler = new WorkflowScheduler( workflow, sm, globalWorkers, targetActivity );
         List<ExecutionSubmission> submissions = scheduler.startExecution();

         if ( submissions.isEmpty() ) {
             throw new GenericRuntimeException( "At least one activity needs to be executable when submitting a workflow for execution" );
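Reviewer note: the scheduler now receives the size of the global worker pool in addition to the per-workflow configuration, so one workflow cannot claim more workers than the shared pool provides. A small sketch of the assumed capping semantics (the values are made up for illustration):

```java
// Sketch of the assumed semantics behind Math.min( maxWorkers, globalWorkers ).
public class WorkerCapDemo {
    public static void main( String[] args ) {
        int configuredMaxWorkers = 8; // from the workflow's config (assumed value)
        int globalWorkers = 4;        // shared pool owned by GlobalScheduler (assumed value)
        int effective = Math.min( configuredMaxWorkers, globalWorkers );
        System.out.println( "effective maxWorkers = " + effective ); // 4
    }
}
```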
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/WorkflowScheduler.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/WorkflowScheduler.java
index c73e17d77d..dd0697c4c4 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/WorkflowScheduler.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/WorkflowScheduler.java
@@ -17,22 +17,32 @@
 package org.polypheny.db.workflow.engine.scheduler;

 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
 import java.util.UUID;
 import javax.annotation.Nullable;
 import org.apache.commons.lang3.NotImplementedException;
+import org.polypheny.db.algebra.type.AlgDataType;
 import org.polypheny.db.catalog.exceptions.GenericRuntimeException;
 import org.polypheny.db.util.graph.AttributedDirectedGraph;
+import org.polypheny.db.util.graph.TopologicalOrderIterator;
 import org.polypheny.db.workflow.dag.Workflow;
+import org.polypheny.db.workflow.dag.activities.Activity;
+import org.polypheny.db.workflow.dag.activities.ActivityException;
 import org.polypheny.db.workflow.dag.activities.ActivityWrapper;
 import org.polypheny.db.workflow.dag.activities.ActivityWrapper.ActivityState;
+import org.polypheny.db.workflow.dag.edges.DataEdge;
 import org.polypheny.db.workflow.dag.edges.Edge;
 import org.polypheny.db.workflow.dag.edges.Edge.EdgeState;
+import org.polypheny.db.workflow.dag.settings.SettingDef.SettingValue;
 import org.polypheny.db.workflow.engine.scheduler.optimizer.WorkflowOptimizer;
+import org.polypheny.db.workflow.engine.scheduler.optimizer.WorkflowOptimizer.SubmissionFactory;
 import org.polypheny.db.workflow.engine.scheduler.optimizer.WorkflowOptimizerImpl;
 import org.polypheny.db.workflow.engine.storage.StorageManager;
@@ -59,15 +69,17 @@ public class WorkflowScheduler {

     private boolean isFinished;
     private int pendingCount = 0; // current number of unfinished submissions
-    private final Queue<ExecutionSubmission> submissionBuffer = new LinkedList<>(); // contains not yet submitted subtrees because of the maxWorkers limit
     private final Set<UUID> remainingActivities = new HashSet<>(); // activities that have not finished execution
     private final Set<UUID> pendingActivities = new HashSet<>(); // contains activities submitted for execution
+    private final Map<UUID, List<Optional<AlgDataType>>> typePreviews = new HashMap<>(); // contains the (possibly not yet known) output types of execDag activities
+    private final Map<UUID, Map<String, Optional<SettingValue>>> settingsPreviews = new HashMap<>(); // contains the (possibly not yet known) settings of execDag activities


-    public WorkflowScheduler( Workflow workflow, StorageManager sm, @Nullable UUID targetActivity ) throws Exception {
+
+    public WorkflowScheduler( Workflow workflow, StorageManager sm, int globalWorkers, @Nullable UUID targetActivity ) throws Exception {
         this.workflow = workflow;
         this.sm = sm;
-        this.maxWorkers = workflow.getConfig().getMaxWorkers();
+        this.maxWorkers = Math.min( workflow.getConfig().getMaxWorkers(), globalWorkers );

         if ( targetActivity != null && workflow.getActivity( targetActivity ).getState() == ActivityState.SAVED ) {
             throw new GenericRuntimeException( "A saved activity first needs to be reset before executing it" );
@@ -78,13 +90,14 @@ public WorkflowScheduler( Workflow workflow, StorageManager sm, @Nullable UUID targetActivity ) throws Exception {
         validateCommonLoad();

         this.execDag = targetActivity == null ? prepareExecutionDag() : prepareExecutionDag( List.of( targetActivity ) );
+        initPreviews();

         this.optimizer = new WorkflowOptimizerImpl( workflow, execDag );
     }


     public List<ExecutionSubmission> startExecution() {
-        return getNextBufferedSubmissions( computeNextSubmissions() );
+        return computeNextSubmissions();
     }

@@ -108,9 +121,7 @@ public List<ExecutionSubmission> handleExecutionResult( ExecutionResult result )

         propagateResult( result.isSuccess(), result.getActivities() );

-        List<ExecutionSubmission> nextSubmissions = computeNextSubmissions();
-
-        return getNextBufferedSubmissions( null );
+        return computeNextSubmissions();
     }

@@ -205,24 +216,65 @@ private void validateCommonLoad() throws Exception {
     }


-    private List<ExecutionSubmission> getNextBufferedSubmissions( List<ExecutionSubmission> newSubmissions ) {
-        submissionBuffer.addAll( newSubmissions );
-        // TODO: update state of involved activities to queued or something similar
+    private void initPreviews() throws ActivityException {
+        for ( UUID n : TopologicalOrderIterator.of( execDag ) ) {
+            ActivityWrapper wrapper = workflow.getActivity( n );
+            Activity activity = wrapper.getActivity();
+            ActivityState state = wrapper.getState();
+
+            if ( state == ActivityState.SAVED ) {
+                // settings are not required for already executed nodes
+                typePreviews.put( n, sm.getTupleTypes( n ).stream().map( Optional::ofNullable ).toList() );
+
+            } else if ( state == ActivityState.IDLE ) {
+                List<Optional<AlgDataType>> inputTypes = new ArrayList<>();
+                boolean allInputsSaved = true;
+                for ( int i = 0; i < wrapper.getDef().getInPorts().length; i++ ) {
+                    DataEdge dataEdge = workflow.getDataEdge( n, i );
+                    ActivityWrapper inWrapper = dataEdge.getFrom();
+                    if ( remainingActivities.contains( inWrapper.getId() ) ) {
+                        allInputsSaved = false;
+                    }
+                    inputTypes.add( typePreviews.get( inWrapper.getId() ).get( dataEdge.getFromPort() ) );
+                }
+                // TODO: ensure control inputs are also saved, then merge variables correctly
+                // Also change executor merge to be correct (correct order, only active)
+
+                Map<String, Optional<SettingValue>> settings = wrapper.resolveAvailableSettings();
+                settingsPreviews.put( n, settings );
+                typePreviews.put( n, activity.previewOutTypes( inputTypes, settings ) );
+
+            } else {
+                throw new IllegalStateException( "Illegal state of activity while initiating scheduler: " + state + " for " + n );
+            }

-        List<ExecutionSubmission> submissions = new ArrayList<>();
-        while ( pendingActivities.size() < maxWorkers && !submissionBuffer.isEmpty() ) {
-            ExecutionSubmission submission = submissionBuffer.remove();
-            submissions.add( submission );
-            pendingActivities.addAll( submission.getActivities() );
-            pendingCount++;
+            switch ( state ) {
+                case IDLE -> {
+                }
+                case QUEUED -> {
+                }
+                case EXECUTING -> {
+                }
+                case SKIPPED -> {
+                }
+                case FAILED -> {
+                }
+                case FINISHED -> {
+                }
+                case SAVED -> {
+                }
+            }
         }
-        return submissions;
     }


     private List<ExecutionSubmission> computeNextSubmissions() {
         // TODO: determine previews
-        return optimizer.computeNextTrees( null, null ).stream().map( f -> f.create( sm, workflow ) ).toList();
+
+        List<SubmissionFactory> factories = optimizer.computeNextTrees( null, null, maxWorkers - pendingCount, null );
+        pendingCount += factories.size();
+        return factories.stream().map( f -> f.create( sm, workflow ) ).toList();
     }
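Reviewer note: `initPreviews()` walks the execution DAG in topological order, so each activity sees the (possibly still unknown) output types of its upstream activities: SAVED nodes get their types from checkpoints, IDLE nodes derive a preview from their input previews and settings. A simplified, hypothetical sketch of this propagation pattern (`String` stands in for `AlgDataType`, the node names are invented):

```java
// Simplified sketch of topological preview propagation; names are illustrative.
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class PreviewDemo {
    // maps each node to the (possibly unknown) types of its outputs
    static Map<String, List<Optional<String>>> typePreviews = new HashMap<>();

    public static void main( String[] args ) {
        // a SAVED node: its output type is read from the checkpoint, hence known
        typePreviews.put( "extract", List.of( Optional.of( "REL(name, age)" ) ) );

        // an IDLE node visited later in topological order: its preview is derived
        // from the input preview; an unknown input stays Optional.empty()
        Optional<String> input = typePreviews.get( "extract" ).get( 0 );
        typePreviews.put( "transform", List.of( input.map( t -> t + "+id" ) ) );

        System.out.println( typePreviews );
    }
}
```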
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/optimizer/WorkflowOptimizer.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/optimizer/WorkflowOptimizer.java
index a598fc4867..f77a2963e7 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/optimizer/WorkflowOptimizer.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/optimizer/WorkflowOptimizer.java
@@ -16,6 +16,7 @@
 package org.polypheny.db.workflow.engine.scheduler.optimizer;

+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -26,6 +27,7 @@
 import org.polypheny.db.catalog.exceptions.GenericRuntimeException;
 import org.polypheny.db.util.graph.AttributedDirectedGraph;
 import org.polypheny.db.workflow.dag.Workflow;
+import org.polypheny.db.workflow.dag.activities.ActivityWrapper;
 import org.polypheny.db.workflow.dag.activities.ActivityWrapper.ActivityState;
 import org.polypheny.db.workflow.dag.activities.Fusable;
 import org.polypheny.db.workflow.dag.activities.Pipeable;
@@ -42,6 +44,7 @@
 import org.polypheny.db.workflow.engine.scheduler.ExecutionEdge;
 import org.polypheny.db.workflow.engine.scheduler.ExecutionEdge.ExecutionEdgeFactory;
 import org.polypheny.db.workflow.engine.scheduler.ExecutionSubmission;
+import org.polypheny.db.workflow.engine.scheduler.GraphUtils;
 import org.polypheny.db.workflow.engine.storage.StorageManager;
 import org.polypheny.db.workflow.models.ActivityConfigModel.CommonTransaction;

@@ -60,14 +63,24 @@ protected WorkflowOptimizer( Workflow workflow, AttributedDirectedGraph<UUID, ExecutionEdge> execDag ) {

-    public final List<SubmissionFactory> computeNextTrees( Map<UUID, List<Optional<AlgDataType>>> typePreviews, Map<UUID, Map<String, Optional<SettingValue>>> settingsPreviews ) {
+    public final List<SubmissionFactory> computeNextTrees( Map<UUID, List<Optional<AlgDataType>>> typePreviews, Map<UUID, Map<String, Optional<SettingValue>>> settingsPreviews, int submissionCount, CommonTransaction commonType ) {
         this.typePreviews = typePreviews;
         this.settingsPreviews = settingsPreviews;
-        return computeNextTrees();
+
+        List<SubmissionFactory> orderedCandidates = computeNextTrees( commonType );
+        return orderedCandidates.subList( 0, Math.min( submissionCount, orderedCandidates.size() ) );
     }

-    abstract List<SubmissionFactory> computeNextTrees();
+    /**
+     * Returns a list of candidate submissions based on the current state of the optimizer.
+     * The list is ordered by priority (most important submission first).
+     * This operation must not modify any fields of the abstract WorkflowOptimizer.
+     * There is no guarantee that the returned submissions will actually be queued for execution.
+     *
+     * @return A list of SubmissionFactories that can be used to create actual submissions.
+     */
+    abstract List<SubmissionFactory> computeNextTrees( CommonTransaction commonType );


     /**
@@ -92,13 +105,6 @@ boolean canPipe( UUID activityId ) {
     }


-    boolean canPipe( ExecutionEdge edge ) {
-        UUID source = edge.getSource(), target = edge.getTarget();
-        return execDag.getOutwardEdges( source ).size() == 1
-                && canPipe( source ) && canPipe( target );
-    }
-
-
     boolean requestsToWrite( UUID activityId ) {
         if ( workflow.getActivity( activityId ).getActivity() instanceof VariableWriter writer ) {
             // true is more restricting for optimizer -> return true if empty Optional
@@ -118,13 +124,30 @@ ActivityState getState( UUID activityId ) {
     }


-    EdgeState getEdgeState( ExecutionEdge edge ) {
+    AttributedDirectedGraph<UUID, ExecutionEdge> getCommonSubExecDag( CommonTransaction commonType ) {
+        Set<UUID> nodes = new HashSet<>();
+        for ( UUID n : execDag.vertexSet() ) {
+            ActivityWrapper wrapper = workflow.getActivity( n );
+            if ( wrapper.getConfig().getTransactionMode() == commonType ) {
+                nodes.add( n );
+            }
+        }
+        return GraphUtils.getInducedSubgraph( execDag, nodes );
+    }
+
+
+    Edge getEdge( ExecutionEdge edge ) {
         for ( Edge candidate : workflow.getEdges( edge.getSource(), edge.getTarget() ) ) {
             if ( edge.representsEdge( candidate ) ) {
-                return candidate.getState();
+                return candidate;
             }
         }
-        throw new IllegalArgumentException( "Cannot return edge state of edge that is not part of the workflow: " + edge );
+        throw new IllegalArgumentException( "Cannot return Edge of ExecutionEdge that is not part of the workflow: " + edge );
+    }
+
+
+    EdgeState getEdgeState( ExecutionEdge edge ) {
+        return getEdge( edge ).getState();
     }
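Reviewer note: the public `computeNextTrees` is now a template method that truncates the priority-ordered candidate list to the number of free worker slots (the scheduler passes `maxWorkers - pendingCount`). A minimal sketch of that truncation contract (values are illustrative):

```java
// Sketch of the capping contract: candidates are priority-ordered and the
// caller only receives as many as it has free worker slots.
import java.util.List;

public class SubListDemo {
    public static void main( String[] args ) {
        List<String> orderedCandidates = List.of( "treeA", "treeB", "treeC" );
        int submissionCount = 2; // e.g. maxWorkers - pendingCount in the scheduler
        List<String> picked = orderedCandidates.subList( 0, Math.min( submissionCount, orderedCandidates.size() ) );
        System.out.println( picked ); // [treeA, treeB]
    }
}
```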
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/optimizer/WorkflowOptimizerImpl.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/optimizer/WorkflowOptimizerImpl.java
index 86e774dab1..a2bc0d259a 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/optimizer/WorkflowOptimizerImpl.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/optimizer/WorkflowOptimizerImpl.java
@@ -16,19 +16,30 @@
 package org.polypheny.db.workflow.engine.scheduler.optimizer;

+import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Queue;
 import java.util.Set;
 import java.util.UUID;
+import org.polypheny.db.util.Pair;
 import org.polypheny.db.util.graph.AttributedDirectedGraph;
 import org.polypheny.db.util.graph.TopologicalOrderIterator;
 import org.polypheny.db.workflow.dag.Workflow;
 import org.polypheny.db.workflow.dag.activities.ActivityWrapper.ActivityState;
+import org.polypheny.db.workflow.dag.edges.ControlEdge;
+import org.polypheny.db.workflow.dag.edges.Edge;
 import org.polypheny.db.workflow.dag.edges.Edge.EdgeState;
+import org.polypheny.db.workflow.engine.execution.Executor.ExecutorType;
 import org.polypheny.db.workflow.engine.scheduler.ExecutionEdge;
 import org.polypheny.db.workflow.engine.scheduler.ExecutionEdge.ExecutionEdgeFactory;
+import org.polypheny.db.workflow.engine.scheduler.GraphUtils;
+import org.polypheny.db.workflow.models.ActivityConfigModel.CommonTransaction;


 public class WorkflowOptimizerImpl extends WorkflowOptimizer {

@@ -39,53 +50,55 @@ public WorkflowOptimizerImpl( Workflow workflow, AttributedDirectedGraph<UUID, ExecutionEdge> execDag ) {

     @Override
-    public List<SubmissionFactory> computeNextTrees() {
-        AttributedDirectedGraph<UUID, ExecutionEdge> subDag = AttributedDirectedGraph.create( new ExecutionEdgeFactory() ); // ignore irrelevant nodes of execDag
+    public List<SubmissionFactory> computeNextTrees( CommonTransaction commonType ) {
+        AttributedDirectedGraph<UUID, ExecutionEdge> subDag = AttributedDirectedGraph.create( new ExecutionEdgeFactory() );
         Map<UUID, NodeColor> nodeColors = new HashMap<>();
         Map<ExecutionEdge, EdgeColor> edgeColors = new HashMap<>();
-        initializeSubDag(subDag, nodeColors, edgeColors );
+        initializeSubDag( getCommonSubExecDag( commonType ), subDag, nodeColors, edgeColors ); // ignore irrelevant nodes of execDag

         // order determines priority if an activity implements multiple interfaces
-        determineVariableWriters(subDag, nodeColors, edgeColors );
-        determineFusions(subDag, nodeColors, edgeColors );
-        determinePipes(subDag, nodeColors, edgeColors );
+        determineVariableWriters( subDag, nodeColors, edgeColors );
+        determineFusions( subDag, nodeColors, edgeColors );
+        determinePipes( subDag, nodeColors, edgeColors );

-        // getContractedDag(subDag, nodeColors, edgeColors) or directly if possible
-        // get blocks with no incoming edges
+        System.out.println( "\nSub-DAG: " + subDag );
+        System.out.println( "Node Colors: " + nodeColors );
+        System.out.println( "Edge Colors: " + edgeColors );

         // TODO: ensure common transaction boundaries are respected
-        return null;
+        return createFactories( subDag, getFirstConnectedComponents( subDag, nodeColors, edgeColors ) );
     }


-    private void initializeSubDag(AttributedDirectedGraph<UUID, ExecutionEdge> subDag, Map<UUID, NodeColor> nodeColors, Map<ExecutionEdge, EdgeColor> edgeColors) {
-
-        for ( UUID n : execDag.vertexSet() ) {
+    private void initializeSubDag( AttributedDirectedGraph<UUID, ExecutionEdge> baseDag, AttributedDirectedGraph<UUID, ExecutionEdge> subDag, Map<UUID, NodeColor> nodeColors, Map<ExecutionEdge, EdgeColor> edgeColors ) {
+        for ( UUID n : baseDag.vertexSet() ) {
             ActivityState state = getState( n );
             // ignore finished or skipped activities
             if ( state == ActivityState.QUEUED || state == ActivityState.EXECUTING ) {
-                nodeColors.put( n, state == ActivityState.EXECUTING ? NodeColor.EXECUTING : NodeColor.DEFAULT );
+                nodeColors.put( n, state == ActivityState.EXECUTING ? NodeColor.EXECUTING : NodeColor.UNDEFINED );
                 subDag.addVertex( n );
             }
         }

         for ( UUID source : subDag.vertexSet() ) {
-            for ( ExecutionEdge edge : execDag.getOutwardEdges( source ) ) {
+            for ( ExecutionEdge edge : baseDag.getOutwardEdges( source ) ) {
                 UUID target = edge.getTarget();
                 if ( !subDag.vertexSet().contains( target ) ) {
                     continue; // edge to an activity that was already aborted
                 }
-                assert getEdgeState( edge ) == EdgeState.IDLE : "Encountered edge of queued or executing activity that is not idle: " + edge;
-                if (edge.isIgnored()) {
+                Edge edgeData = getEdge( edge );
+
+                assert edgeData.getState() == EdgeState.IDLE : "Encountered edge of queued or executing activity that is not idle: " + edge;
+                if ( edgeData instanceof ControlEdge control && control.isIgnored() ) {
                     continue; // control edges that are no longer required are not added to the subDag
                 }

                 EdgeColor color = EdgeColor.UNDEFINED;
-                if (edge.isControl()) {
+                if ( edge.isControl() ) {
                     color = EdgeColor.CONTROL;
-                } else if (requiresCheckpoint( source ) || getState( source ) == ActivityState.EXECUTING) {
+                } else if ( requiresCheckpoint( source ) || getState( source ) == ActivityState.EXECUTING ) {
                     color = EdgeColor.CHECKPOINT;
                 }
                 edgeColors.put( edge, color );
@@ -107,22 +120,22 @@ private void determineVariableWriters( AttributedDirectedGraph<UUID, ExecutionEdge> subDag, Map<UUID, NodeColor> nodeColors, Map<ExecutionEdge, EdgeColor> edgeColors ) {

     private void determineFusions( AttributedDirectedGraph<UUID, ExecutionEdge> subDag, Map<UUID, NodeColor> nodeColors, Map<ExecutionEdge, EdgeColor> edgeColors ) {
         Set<UUID> fused = new HashSet<>();
-        for (UUID source : TopologicalOrderIterator.of( subDag ) ) {
-            if (nodeColors.get( source ) != NodeColor.DEFAULT || !canFuse( source )) {
+        for ( UUID source : TopologicalOrderIterator.of( subDag ) ) {
+            if ( nodeColors.get( source ) != NodeColor.UNDEFINED || !canFuse( source ) ) {
                 continue; // ignore already colored activities
             }
             List<ExecutionEdge> edges = subDag.getOutwardEdges( source );
-            if (edges.size() != 1 ) {
+            if ( edges.size() != 1 ) {
                 continue;
             }
             ExecutionEdge edge = edges.get( 0 );
             UUID target = edge.getTarget();
-            if (nodeColors.get( target ) != NodeColor.DEFAULT || edgeColors.get( edge ) != EdgeColor.UNDEFINED) {
+            if ( nodeColors.get( target ) != NodeColor.UNDEFINED || edgeColors.get( edge ) != EdgeColor.UNDEFINED ) {
                 continue; // checkpoint or control edge, writer node
             }
-            if (canFuse( target ) ) {
+            if ( canFuse( target ) ) {
                 edgeColors.put( edge, EdgeColor.FUSED );
                 fused.add( source ); // source or target might already be in set
                 fused.add( target );
@@ -135,22 +148,22 @@ private void determineFusions( AttributedDirectedGraph<UUID, ExecutionEdge> subDag, Map<UUID, NodeColor> nodeColors, Map<ExecutionEdge, EdgeColor> edgeColors ) {

     private void determinePipes( AttributedDirectedGraph<UUID, ExecutionEdge> subDag, Map<UUID, NodeColor> nodeColors, Map<ExecutionEdge, EdgeColor> edgeColors ) {
         Set<UUID> piped = new HashSet<>();
-        for (UUID source : TopologicalOrderIterator.of( subDag ) ) {
-            if (nodeColors.get( source ) != NodeColor.DEFAULT || !canPipe( source )) {
+        for ( UUID source : TopologicalOrderIterator.of( subDag ) ) {
+            if ( nodeColors.get( source ) != NodeColor.UNDEFINED || !canPipe( source ) ) {
                 continue; // ignore already colored activities
             }
             List<ExecutionEdge> edges = subDag.getOutwardEdges( source );
-            if (edges.size() != 1 ) {
+            if ( edges.size() != 1 ) {
                 continue;
             }
             ExecutionEdge edge = edges.get( 0 );
             UUID target = edge.getTarget();
-            if (nodeColors.get( target ) != NodeColor.DEFAULT || edgeColors.get( edge ) != EdgeColor.UNDEFINED) {
+            if ( nodeColors.get( target ) != NodeColor.UNDEFINED || edgeColors.get( edge ) != EdgeColor.UNDEFINED ) {
                 continue; // checkpoint or control edge, writer or fused node
             }
-            if (canPipe( target ) ) {
+            if ( canPipe( target ) ) {
                 edgeColors.put( edge, EdgeColor.PIPED );
                 piped.add( source ); // source or target might already be in set
                 piped.add( target );
@@ -161,15 +174,126 @@ private void determinePipes( AttributedDirectedGraph<UUID, ExecutionEdge> subDag, Map<UUID, NodeColor> nodeColors, Map<ExecutionEdge, EdgeColor> edgeColors ) {
     }


+    private List<Pair<Set<UUID>, NodeColor>> getFirstConnectedComponents( AttributedDirectedGraph<UUID, ExecutionEdge> subDag, Map<UUID, NodeColor> nodeColors, Map<ExecutionEdge, EdgeColor> edgeColors ) {
+        // TODO: replace with contracted graph to make cleaner?
+        List<Pair<Set<UUID>, NodeColor>> connected = new ArrayList<>();
+        Set<UUID> visited = new HashSet<>();
+
+        for ( UUID start : subDag.vertexSet().stream().filter( n -> subDag.getInwardEdges( n ).isEmpty() ).toList() ) {
+            if ( visited.contains( start ) ) {
+                continue; // already part of a (possibly ignored) component
+            }
+            visited.add( start );
+            NodeColor color = nodeColors.get( start );
+            if ( color.executorType == null ) {
+                continue;
+            }
+            Set<UUID> component = new HashSet<>();
+            component.add( start );
+            if ( color.compatibleEdge == null ) {
+                connected.add( Pair.of( component, color ) );
+                continue;
+            }
+
+            // move forward through the inverted tree up to its root
+            List<ExecutionEdge> outEdges = subDag.getOutwardEdges( start ).stream().filter( e -> edgeColors.get( e ) == color.compatibleEdge ).toList();
+            assert outEdges.size() <= 1 : "Found connected component which is not an inverted tree";
+            if ( outEdges.isEmpty() ) {
+                connected.add( Pair.of( component, color ) );
+                continue;
+            }
+            ExecutionEdge outEdge = outEdges.get( 0 );
+            UUID target = null;
+            while ( outEdge != null ) {
+                target = outEdge.getTarget();
+                assert nodeColors.get( target ) == color;
+
+                outEdges = subDag.getOutwardEdges( target ).stream().filter( e -> edgeColors.get( e ) == color.compatibleEdge ).toList();
+                assert outEdges.size() <= 1 : "Found connected component which is not an inverted tree";
+                outEdge = outEdges.isEmpty() ? null : outEdges.get( 0 );
+            }
+            assert target != null;
+
+            // explore backward (BFS) and check for dependencies
+            boolean hasDependency = false;
+            Queue<UUID> open = new LinkedList<>( List.of( target ) );
+            while ( !open.isEmpty() ) {
+                UUID n = open.remove();
+                if ( component.contains( n ) ) {
+                    continue;
+                }
+                assert nodeColors.get( n ) == color;
+                component.add( n );
+                visited.add( n );
+
+                List<ExecutionEdge> inEdges = subDag.getInwardEdges( n );
+                for ( ExecutionEdge inEdge : inEdges ) {
+                    if ( color.compatibleEdge == edgeColors.get( inEdge ) ) {
+                        open.add( inEdge.getSource() );
+                    } else {
+                        hasDependency = true;
+                    }
+                }
+            }
+
+            if ( !hasDependency ) {
+                connected.add( Pair.of( component, color ) );
+            }
+        }
+        return connected;
+    }
+
+
+    private List<SubmissionFactory> createFactories( AttributedDirectedGraph<UUID, ExecutionEdge> subDag, List<Pair<Set<UUID>, NodeColor>> components ) {
+        PriorityQueue<Pair<Integer, SubmissionFactory>> queue
+                = new PriorityQueue<>( Comparator.comparingInt( obj -> (int) ((Pair) obj).getLeft() ).reversed() );
+
+        for ( Pair<Set<UUID>, NodeColor> component : components ) {
+            SubmissionFactory factory = new SubmissionFactory(
+                    GraphUtils.getInducedSubgraph( subDag, component.left ),
+                    component.left,
+                    component.right.executorType, // TODO: use fusion executor even for single activities if possible
+                    CommonTransaction.NONE );
+            queue.add( Pair.of( factory.getActivities().size(), factory ) ); // larger trees have higher priority
+        }
+
+        List<SubmissionFactory> result = new ArrayList<>();
+        while ( !queue.isEmpty() ) {
+            result.add( queue.remove().right );
+        }
+        return result;
+    }
+
+
     /**
      * We successively assign "colors" (or roles) to nodes and edges of the execDag
      */
     private enum NodeColor {
-        DEFAULT,
+        UNDEFINED( ExecutorType.DEFAULT ),
         EXECUTING,
-        FUSED,
-        PIPED,
-        WRITER
+        FUSED( ExecutorType.FUSION, EdgeColor.FUSED ),
+        PIPED( ExecutorType.PIPE, EdgeColor.PIPED ),
+        WRITER( ExecutorType.VARIABLE_WRITER );
+
+        public final ExecutorType executorType;
+        public final EdgeColor compatibleEdge;
+
+
+        NodeColor() {
+            this( null, null );
+        }
+
+
+        NodeColor( ExecutorType executorType ) {
+            this( executorType, null );
+        }
+
+
+        NodeColor( ExecutorType executorType, EdgeColor compatibleEdge ) {
+            this.executorType = executorType;
+            this.compatibleEdge = compatibleEdge;
+        }
     }
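Reviewer note: `createFactories` orders the connected components by size so that larger execution trees are submitted first, using a `PriorityQueue` with a reversed `comparingInt` comparator. A self-contained sketch of the same ordering idea, with a `record` standing in for `Pair` (the names are invented):

```java
// Sketch of the priority ordering used in createFactories: rank by node count,
// largest first.
import java.util.Comparator;
import java.util.PriorityQueue;

public class PriorityDemo {
    record Candidate( int size, String name ) {}

    public static void main( String[] args ) {
        PriorityQueue<Candidate> queue = new PriorityQueue<>(
                Comparator.comparingInt( Candidate::size ).reversed() );
        queue.add( new Candidate( 1, "single" ) );
        queue.add( new Candidate( 3, "fusedTree" ) );
        queue.add( new Candidate( 2, "pipe" ) );
        while ( !queue.isEmpty() ) {
            System.out.println( queue.remove().name() ); // fusedTree, pipe, single
        }
    }
}
```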
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/StorageManager.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/StorageManager.java
index 2e394f0dff..2ce575b183 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/StorageManager.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/StorageManager.java
@@ -16,6 +16,7 @@
 package org.polypheny.db.workflow.engine.storage;

+import java.util.List;
 import java.util.UUID;
 import javax.annotation.Nullable;
 import lombok.NonNull;
@@ -48,6 +49,10 @@ public interface StorageManager extends AutoCloseable {  // TODO: remove AutoCloseable

     DataModel getDataModel( UUID activityId, int outputIdx );

+    AlgDataType getTupleType( UUID activityId, int outputIdx );
+
+    List<AlgDataType> getTupleTypes( UUID activityId );
+
     /**
      * Creates a relational checkpoint for an activity output and returns a RelWriter for that checkpoint.
      *
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/StorageManagerImpl.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/StorageManagerImpl.java
index ba748b0be7..7b8358ab95 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/StorageManagerImpl.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/StorageManagerImpl.java
@@ -138,6 +138,24 @@ public DataModel getDataModel( UUID activityId, int outputIdx ) {
     }


+    @Override
+    public AlgDataType getTupleType( UUID activityId, int outputIdx ) {
+        return getCheckpoint( activityId, outputIdx ).getTupleType();
+    }
+
+
+    @Override
+    public List<AlgDataType> getTupleTypes( UUID activityId ) {
+        List<AlgDataType> types = new ArrayList<>();
+        Map<Integer, LogicalEntity> outputs = checkpoints.get( activityId );
+        for ( int i = 0; i < outputs.size(); i++ ) {
+            LogicalEntity output = outputs.get( i );
+            types.add( output == null ? null : output.getTupleType() );
+        }
+        return types;
+    }
+
+
     @Override
     public RelWriter createRelCheckpoint( UUID activityId, int outputIdx, AlgDataType type, boolean resetPk, @Nullable String storeName ) {
         if ( storeName == null || storeName.isEmpty() ) {
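Reviewer note: `getTupleTypes` deliberately returns `null` entries for outputs that have no checkpoint yet; the scheduler's `initPreviews()` then lifts these into `Optional.empty()` previews via `Optional::ofNullable`. A tiny sketch of that null-tolerant conversion (`String` stands in for `AlgDataType`):

```java
// Sketch of how the scheduler consumes getTupleTypes: null entries for missing
// checkpoints become empty previews.
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class TupleTypesDemo {
    public static void main( String[] args ) {
        // Arrays.asList (unlike List.of) permits null elements
        List<String> tupleTypes = Arrays.asList( "REL(a, b)", null ); // null = no checkpoint
        List<Optional<String>> previews = tupleTypes.stream().map( Optional::ofNullable ).toList();
        System.out.println( previews ); // [Optional[REL(a, b)], Optional.empty]
    }
}
```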