Skip to content

Commit

Permalink
Register http endpoints for workflows
Browse files Browse the repository at this point in the history
  • Loading branch information
tobias-weber committed Dec 20, 2024
1 parent 33151eb commit d164f75
Show file tree
Hide file tree
Showing 14 changed files with 237 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.lang3.NotImplementedException;
import org.polypheny.db.PolyphenyDb;
import org.polypheny.db.adapter.java.AdapterTemplate;
import org.polypheny.db.catalog.Catalog;
Expand All @@ -36,7 +37,6 @@
import org.polypheny.db.webui.ConfigService.HandlerType;
import org.polypheny.db.webui.HttpServer;
import org.polypheny.db.workflow.dag.activities.ActivityWrapper;
import org.polypheny.db.workflow.dag.activities.ActivityWrapper.ActivityState;
import org.polypheny.db.workflow.engine.execution.context.ExecutionContextImpl;
import org.polypheny.db.workflow.engine.storage.StorageManager;
import org.polypheny.db.workflow.engine.storage.StorageManagerImpl;
Expand Down Expand Up @@ -110,8 +110,7 @@ private void createExecuteDummyWorkflowTest() {
activityId,
Map.of( "table", setting ),
ActivityConfigModel.of(),
RenderModel.of(),
ActivityState.IDLE
RenderModel.of()
) );

long start = System.currentTimeMillis();
Expand All @@ -127,8 +126,7 @@ private void createExecuteDummyWorkflowTest() {
activityId2,
Map.of( "table", setting ),
ActivityConfigModel.of(),
RenderModel.of(),
ActivityState.IDLE
RenderModel.of()
) );

start = System.currentTimeMillis();
Expand Down Expand Up @@ -171,7 +169,10 @@ private void registerEndpoints() {
server.addSerializedRoute( PATH + "/sessions", this::getSessions, HandlerType.GET );
server.addSerializedRoute( PATH + "/sessions/{sessionId}", this::getSession, HandlerType.GET );
server.addSerializedRoute( PATH + "/sessions/{sessionId}/workflow", this::getActiveWorkflow, HandlerType.GET );
server.addSerializedRoute( PATH + "/workflows", this::getWorkflows, HandlerType.GET );
server.addSerializedRoute( PATH + "/sessions/{sessionId}/workflow/config", this::getWorkflowConfig, HandlerType.GET );
server.addSerializedRoute( PATH + "/sessions/{sessionId}/workflow/{activityId}", this::getActivity, HandlerType.GET );
server.addSerializedRoute( PATH + "/sessions/{sessionId}/workflow/{activityId}/{outIndex}", this::getIntermediaryResult, HandlerType.GET );
server.addSerializedRoute( PATH + "/workflows", this::getWorkflowDefs, HandlerType.GET );

server.addSerializedRoute( PATH + "/sessions", this::createSession, HandlerType.POST );
server.addSerializedRoute( PATH + "/sessions/{sessionId}/save", this::saveSession, HandlerType.POST );
Expand All @@ -193,11 +194,33 @@ private void getSession( final Context ctx ) {

private void getActiveWorkflow( final Context ctx ) {
UUID sessionId = UUID.fromString( ctx.pathParam( "sessionId" ) );
process( ctx, () -> sessionManager.getActiveWorkflowModel( sessionId ) );
process( ctx, () -> sessionManager.getSessionOrThrow( sessionId ).getWorkflowModel( true ) );
}


private void getWorkflows( final Context ctx ) {
private void getWorkflowConfig( final Context ctx ) {
UUID sessionId = UUID.fromString( ctx.pathParam( "sessionId" ) );
process( ctx, () -> sessionManager.getSessionOrThrow( sessionId ).getWorkflowConfig() );
}


private void getActivity( final Context ctx ) {
UUID sessionId = UUID.fromString( ctx.pathParam( "sessionId" ) );
UUID activityId = UUID.fromString( ctx.pathParam( "activityId" ) );
process( ctx, () -> sessionManager.getSessionOrThrow( sessionId ).getActivityModel( activityId ) );
}


private void getIntermediaryResult( final Context ctx ) {
UUID sessionId = UUID.fromString( ctx.pathParam( "sessionId" ) );
UUID activityId = UUID.fromString( ctx.pathParam( "activityId" ) );
int outIndex = Integer.parseInt( ctx.pathParam( "outIndex" ) );
throw new NotImplementedException();
//process( ctx, () -> sessionManager.getSessionOrThrow( sessionId ) );
}


private void getWorkflowDefs( final Context ctx ) {
process( ctx, repo::getWorkflowDefs );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,19 @@ public DataModel getDataModel() {
}


public DataModel getDataModel( AlgDataType type ) {
if ( this != ANY || type == null ) {
return getDataModel();
}

return switch ( type.getPolyType() ) {
case DOCUMENT -> DataModel.DOCUMENT;
case GRAPH -> DataModel.GRAPH;
default -> DataModel.RELATIONAL;
};
}


public static PortType fromDataModel( DataModel model ) {
return switch ( model ) {
case RELATIONAL -> REL;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
public class ActivityException extends Exception {

@Setter
private ActivityWrapper activity = null; // should be set by the corresponding executor
private ActivityWrapper activity = null;


public ActivityException( String message ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.polypheny.db.workflow.models.ActivityConfigModel;
import org.polypheny.db.workflow.models.ActivityModel;
import org.polypheny.db.workflow.models.RenderModel;
import org.polypheny.db.workflow.models.TypePreviewModel;

@Getter
public class ActivityWrapper {
Expand All @@ -62,6 +63,7 @@ public class ActivityWrapper {
private List<Optional<AlgDataType>> inTypePreview; // contains the (possibly not yet known) input type
@Setter
private SettingsPreview settingsPreview; // contains the (possibly not yet known) settings
private ActivityException invalidStateReason; // null if the state of this activity is not invalid


protected ActivityWrapper( UUID id, Activity activity, String type, Map<String, JsonNode> settings, ActivityConfigModel config, RenderModel rendering ) {
Expand Down Expand Up @@ -107,9 +109,16 @@ public SettingsPreview resolveAvailableSettings( boolean hasStableVariables ) th
* @throws ActivityException if an error occurs during the preview resolution.
*/
public SettingsPreview updateOutTypePreview( List<Optional<AlgDataType>> inTypePreviews, boolean hasStableVariables ) throws ActivityException {
SettingsPreview settings = resolveAvailableSettings( hasStableVariables );
outTypePreview = activity.previewOutTypes( inTypePreviews, settings );
return settings;
try {
SettingsPreview settings = resolveAvailableSettings( hasStableVariables );
outTypePreview = activity.previewOutTypes( inTypePreviews, settings );
invalidStateReason = null;
return settings;
} catch ( ActivityException e ) {
e.setActivity( this );
invalidStateReason = e;
throw e;
}
}


Expand All @@ -119,8 +128,15 @@ public ActivityDef getDef() {


public ActivityModel toModel( boolean includeState ) {
ActivityState state = includeState ? this.state : null;
return new ActivityModel( type, id, serializableSettings, config, rendering, state );
if ( includeState ) {
List<TypePreviewModel> inTypeModels = inTypePreview.stream().map(
inType -> inType.map( TypePreviewModel::of ).orElse( null ) ).toList();
return new ActivityModel( type, id, serializableSettings, config, rendering, this.state, inTypeModels, invalidStateReason.toString() );

} else {
return new ActivityModel( type, id, serializableSettings, config, rendering );

}
}


Expand Down Expand Up @@ -160,6 +176,19 @@ public void resetExecution() {
}


/**
* Every time the outTypePreview is updated, the activity determines if the inTypePreviews
* and the SettingsPreview would result in an invalid state.
* In this case, this method returns false and the reason can be retrieved with {@code getInvalidStateReason()}.
* If it returns true, this only means that during the last update, no contradictory state was observed.
*
* @return true if the last {@code updateOutTypePreview()} succeeded.
*/
public boolean isValid() {
return invalidStateReason == null;
}


public static ActivityWrapper fromModel( ActivityModel model ) {
String type = model.getType();
// ensuring the default value is used for missing settings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ private OutputPipe getCheckpointWriterPipe( UUID rootId, AlgDataType rootType )
return null;
}
ActivityWrapper wrapper = workflow.getActivity( rootId );
DataModel model = wrapper.getDef().getOutPortTypes()[0].getDataModel(); // TODO: handle ANY DataModel
DataModel model = wrapper.getDef().getOutPortTypes()[0].getDataModel( rootType );
String store = wrapper.getConfig().getPreferredStore( 0 );

System.out.println( "creating CheckpointWriterPipe for model " + model );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public LpgWriter createLpgWriter( int idx ) {
public CheckpointWriter createWriter( int idx, AlgDataType tupleType, boolean resetPk ) {
PortType type = Objects.requireNonNull( remainingOutPorts[idx] );
remainingOutPorts[idx] = null;
CheckpointWriter writer = sm.createCheckpoint( activityWrapper.getId(), idx, tupleType, resetPk, getStore( idx ), type.getDataModel() );
CheckpointWriter writer = sm.createCheckpoint( activityWrapper.getId(), idx, tupleType, resetPk, getStore( idx ), type.getDataModel( tupleType ) );
writers.add( writer );
return writer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class ExecutionInfo {
private final Map<ExecutionState, StopWatch> durations = new HashMap<>();
@Getter
private final Set<UUID> activities;
@Getter
private final ExecutorType executorType;

@Getter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import lombok.Getter;
import org.apache.commons.lang3.time.StopWatch;
import org.polypheny.db.workflow.dag.Workflow;
import org.polypheny.db.workflow.engine.execution.Executor.ExecutorType;
import org.polypheny.db.workflow.models.responses.WsResponse;
import org.polypheny.db.workflow.models.responses.WsResponse.ProgressUpdateResponse;
import org.polypheny.db.workflow.models.responses.WsResponse.StateUpdateResponse;
Expand Down Expand Up @@ -103,6 +104,16 @@ public Map<UUID, Double> getAllProgress() {
}


public Map<ExecutorType, Integer> getActivityCounts() {
Map<ExecutorType, Integer> activityCounts = new HashMap<>();
for ( ExecutionInfo info : infos ) {
ExecutorType type = info.getExecutorType();
activityCounts.put( type, activityCounts.getOrDefault( type, 0 ) + info.getActivities().size() );
}
return Collections.unmodifiableMap( activityCounts );
}


public void stop() {
workflowDuration.stop();
forwardStates();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import lombok.AllArgsConstructor;
Expand All @@ -35,8 +36,13 @@ public class ActivityModel {
ActivityConfigModel config;
RenderModel rendering;

@JsonInclude(JsonInclude.Include.NON_NULL) // do not serialize in static version
// the following fields are not serialized in the static version -> not used for deserializing an ActivityWrapper
@JsonInclude(JsonInclude.Include.NON_NULL)
ActivityState state;
@JsonInclude(JsonInclude.Include.NON_NULL)
List<TypePreviewModel> inTypePreview;
@JsonInclude(JsonInclude.Include.NON_NULL)
String invalidReason; // null if not invalid


public ActivityModel( String type ) {
Expand All @@ -45,12 +51,12 @@ public ActivityModel( String type ) {


public ActivityModel( String type, RenderModel renderModel ) {
this( type, UUID.randomUUID(), ActivityRegistry.getSerializableDefaultSettings( type ), ActivityConfigModel.of(), renderModel, null );
this( type, UUID.randomUUID(), ActivityRegistry.getSerializableDefaultSettings( type ), ActivityConfigModel.of(), renderModel );
}


public ActivityModel( String type, Map<String, JsonNode> settings ) {
this( type, UUID.randomUUID(), settings, ActivityConfigModel.of(), RenderModel.of(), null );
this( type, UUID.randomUUID(), settings, ActivityConfigModel.of(), RenderModel.of() );
}


Expand All @@ -61,6 +67,8 @@ public ActivityModel( String type, UUID id, Map<String, JsonNode> settings, Acti
this.config = config;
this.rendering = rendering;
this.state = null;
this.inTypePreview = null;
this.invalidReason = null;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2019-2024 The Polypheny Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.polypheny.db.workflow.models;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import lombok.Value;
import org.polypheny.db.algebra.type.AlgDataType;
import org.polypheny.db.algebra.type.AlgDataTypeField;
import org.polypheny.db.algebra.type.DocumentType;
import org.polypheny.db.algebra.type.GraphType;
import org.polypheny.db.catalog.logistic.DataModel;
import org.polypheny.db.webui.models.catalog.FieldDefinition;

@Value
public class TypePreviewModel {

DataModel dataModel;
List<FieldDefinition> fields;


public TypePreviewModel( DataModel dataModel, AlgDataType type ) {
this.dataModel = dataModel;
List<FieldDefinition> fields = new ArrayList<>();
switch ( dataModel ) {
// TODO: better previews for graph and document?
case RELATIONAL -> {
for ( AlgDataTypeField field : type.getFields() ) {
fields.add( FieldDefinition.of( field ) );
}
}
case DOCUMENT -> {
fields.add( FieldDefinition.builder()
.name( "Document" )
.dataType( DocumentType.ofId().getFullTypeString() )
.build() );
}
case GRAPH -> {
fields.add( FieldDefinition.builder()
.name( "Graph" )
.dataType( GraphType.of().getFullTypeString() )
.build() );
}
}
this.fields = Collections.unmodifiableList( fields );
}


public static TypePreviewModel of( AlgDataType type ) {
DataModel model = switch ( type.getPolyType() ) {
case DOCUMENT -> DataModel.DOCUMENT;
case GRAPH -> DataModel.GRAPH;
default -> DataModel.RELATIONAL;
};
return new TypePreviewModel( model, type );
}

}
Loading

0 comments on commit d164f75

Please sign in to comment.