Skip to content

Commit

Permalink
Get activity pipelining working
Browse files Browse the repository at this point in the history
  • Loading branch information
tobias-weber committed Dec 10, 2024
1 parent 0d1ff95 commit d8a0ee5
Show file tree
Hide file tree
Showing 27 changed files with 523 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ enum ActivityCategory {
LOAD,
RELATIONAL,
DOCUMENT,
GRAPH
GRAPH,
VARIABLES
// more granular categories are also thinkable
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public static SettingsPreview buildAvailableSettingValues( String activityType,
* @param activityType the identifier for the activity type.
* @return an unmodifiable map of setting keys to default {@link SettingValue}s for that key.
*/
public static Settings getSettingValues( String activityType ) {
public static Settings getDefaultSettings( String activityType ) {
Map<String, SettingDef> settingDefs = get( activityType ).getSettings();
Map<String, SettingValue> settingValues = new HashMap<>();

Expand All @@ -163,8 +163,8 @@ public static Settings getSettingValues( String activityType ) {
* @param activityType the identifier for the activity type.
* @return an unmodifiable map of setting keys to their default values.
*/
public static Map<String, JsonNode> getSerializableSettingValues( String activityType ) {
return getSettingValues( activityType ).getSerializableSettings();
public static Map<String, JsonNode> getSerializableDefaultSettings( String activityType ) {
return getDefaultSettings( activityType ).getSerializableSettings();
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -151,7 +152,11 @@ public void resetExecution() {


public static ActivityWrapper fromModel( ActivityModel model ) {
return new ActivityWrapper( model.getId(), ActivityRegistry.activityFromType( model.getType() ), model.getType(), model.getSettings(), model.getConfig(), model.getRendering() );
String type = model.getType();
// ensure the default value is used for missing settings
Map<String, JsonNode> settings = new HashMap<>( ActivityRegistry.getSerializableDefaultSettings( type ) );
settings.putAll( model.getSettings() );
return new ActivityWrapper( model.getId(), ActivityRegistry.activityFromType( type ), type, settings, model.getConfig(), model.getRendering() );
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,26 @@

package org.polypheny.db.workflow.dag.activities;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import org.apache.commons.lang3.NotImplementedException;
import org.polypheny.db.algebra.AlgNode;
import org.polypheny.db.algebra.AlgRoot;
import org.polypheny.db.algebra.constant.Kind;
import org.polypheny.db.algebra.type.AlgDataType;
import org.polypheny.db.plan.AlgCluster;
import org.polypheny.db.processing.ImplementationContext.ExecutedContext;
import org.polypheny.db.rex.RexBuilder;
import org.polypheny.db.transaction.Statement;
import org.polypheny.db.type.entity.PolyValue;
import org.polypheny.db.workflow.dag.settings.SettingDef.Settings;
import org.polypheny.db.workflow.dag.settings.SettingDef.SettingsPreview;
import org.polypheny.db.workflow.engine.execution.Executor.ExecutorException;
import org.polypheny.db.workflow.engine.execution.context.ExecutionContext;
import org.polypheny.db.workflow.engine.storage.QueryUtils;
import org.polypheny.db.workflow.engine.storage.reader.CheckpointReader;
import org.polypheny.db.workflow.engine.storage.writer.CheckpointWriter;

// TODO: write test to ensure at most 1 output was specified
public interface Fusable extends Activity {
Expand All @@ -47,8 +57,41 @@ default Optional<Boolean> canFuse( List<Optional<AlgDataType>> inTypes, Settings

@Override
default void execute( List<CheckpointReader> inputs, Settings settings, ExecutionContext ctx ) throws Exception {
// TODO: add default implementation that calls fuse().
throw new NotImplementedException();
assert canFuse(
inputs.stream().map( r -> Optional.of( r.getTupleType() ) ).toList(),
SettingsPreview.of( settings )
).orElseThrow() : "Cannot use the default execute implementation of Fusable if canFuse returns false.";

// Imitates the fusion executor
Statement statement = ctx.getTransaction().createStatement();
AlgCluster cluster = AlgCluster.create(
statement.getQueryProcessor().getPlanner(),
new RexBuilder( statement.getTransaction().getTypeFactory() ),
null,
statement.getDataContext().getSnapshot() );

List<AlgNode> inNodes = inputs.stream().map( reader -> reader.getAlgNode( cluster ) ).toList();
AlgRoot root = AlgRoot.of( fuse( inNodes, settings, cluster ), Kind.SELECT );

if ( !QueryUtils.validateAlg( root, false, null ) ) {
throw new ExecutorException( "The fused AlgNode tree may not perform data manipulation" );
}

ExecutedContext executedContext = QueryUtils.executeAlgRoot( root, statement );
if ( executedContext.getException().isPresent() ) {
throw new ExecutorException( "An error occurred while executing the fused activities." );
}

Iterator<PolyValue[]> iterator = executedContext.getIterator().getIterator();
try ( CheckpointWriter writer = ctx.createWriter( 0, root.validatedRowType, true ) ) {
while ( iterator.hasNext() ) {
writer.write( Arrays.asList( iterator.next() ) );
}
} catch ( Exception e ) {
throw new ExecutorException( e );
} finally {
executedContext.getIterator().close();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.polypheny.db.algebra.type.AlgDataType;
import org.polypheny.db.workflow.dag.settings.SettingDef.Settings;
import org.polypheny.db.workflow.dag.settings.SettingDef.SettingsPreview;
import org.polypheny.db.workflow.engine.execution.Executor.ExecutorException;
import org.polypheny.db.workflow.engine.execution.context.ExecutionContext;
import org.polypheny.db.workflow.engine.execution.context.ExecutionContextImpl;
import org.polypheny.db.workflow.engine.execution.context.PipeExecutionContext;
Expand Down Expand Up @@ -72,6 +73,8 @@ assert canPipe(
try ( CheckpointWriter writer = ctx.createWriter( 0, type, true ) ) {
OutputPipe outPipe = new CheckpointOutputPipe( type, writer );
pipe( inPipes, outPipe, settings, pipeCtx );
} catch ( PipeInterruptedException e ) {
throw new ExecutorException( "Activity execution was interrupted" );
}
}

Expand Down Expand Up @@ -103,6 +106,11 @@ assert canPipe(

class PipeInterruptedException extends RuntimeException {

/**
 * Creates a {@code PipeInterruptedException} without an underlying cause.
 * Delegates to the cause-accepting constructor with a {@code null} cause,
 * so the fixed interruption message is used in both cases.
 */
public PipeInterruptedException() {
    this( null );
}


// Constructor that accepts a cause
public PipeInterruptedException( Throwable cause ) {
super( "The pipe operation was interrupted.", cause );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,18 @@

import java.util.List;
import java.util.Optional;
import org.polypheny.db.algebra.AlgNode;
import org.polypheny.db.algebra.logical.relational.LogicalRelProject;
import org.polypheny.db.algebra.type.AlgDataType;
import org.polypheny.db.catalog.exceptions.GenericRuntimeException;
import org.polypheny.db.plan.AlgCluster;
import org.polypheny.db.type.entity.PolyValue;
import org.polypheny.db.workflow.dag.activities.Activity;
import org.polypheny.db.workflow.dag.activities.Activity.ActivityCategory;
import org.polypheny.db.workflow.dag.activities.Activity.PortType;
import org.polypheny.db.workflow.dag.activities.ActivityException;
import org.polypheny.db.workflow.dag.activities.Fusable;
import org.polypheny.db.workflow.dag.activities.Pipeable;
import org.polypheny.db.workflow.dag.annotations.ActivityDefinition;
import org.polypheny.db.workflow.dag.annotations.ActivityDefinition.InPort;
import org.polypheny.db.workflow.dag.annotations.ActivityDefinition.OutPort;
Expand All @@ -34,23 +40,25 @@
import org.polypheny.db.workflow.dag.settings.SettingDef.Settings;
import org.polypheny.db.workflow.dag.settings.SettingDef.SettingsPreview;
import org.polypheny.db.workflow.engine.execution.context.ExecutionContext;
import org.polypheny.db.workflow.engine.execution.context.PipeExecutionContext;
import org.polypheny.db.workflow.engine.execution.pipe.InputPipe;
import org.polypheny.db.workflow.engine.execution.pipe.OutputPipe;
import org.polypheny.db.workflow.engine.storage.reader.CheckpointReader;
import org.polypheny.db.workflow.engine.storage.reader.RelReader;
import org.polypheny.db.workflow.engine.storage.writer.RelWriter;

@ActivityDefinition(type = "debug", displayName = "Debugging", categories = { ActivityCategory.TRANSFORM, ActivityCategory.RELATIONAL },
@ActivityDefinition(type = "debug", displayName = "Relational Identity Activity with Debugging",
categories = { ActivityCategory.TRANSFORM, ActivityCategory.RELATIONAL },
inPorts = { @InPort(type = PortType.REL) },
outPorts = { @OutPort(type = PortType.REL) }
)

@IntSetting(key = "delay", displayName = "Delay (ms)", defaultValue = 500)
@IntSetting(key = "delay", displayName = "Delay (ms)", defaultValue = 500, min = 0)
@IntSetting(key = "pipeDelay", displayName = "Tuple-wise Delay for Pipelining (ms)", defaultValue = 10, min = 0)
@BoolSetting(key = "canPipe", displayName = "Enable Pipelining", defaultValue = false)
@BoolSetting(key = "canFuse", displayName = "Enable Fusion", defaultValue = false)
@BoolSetting(key = "isSuccessful", displayName = "Successful Execution", defaultValue = true)
public class DebugActivity implements Activity {


public DebugActivity() {
}

public class DebugActivity implements Activity, Pipeable, Fusable {

@Override
public List<Optional<AlgDataType>> previewOutTypes( List<Optional<AlgDataType>> inTypes, SettingsPreview settings ) throws ActivityException {
Expand All @@ -71,6 +79,44 @@ public void execute( List<CheckpointReader> inputs, Settings settings, Execution
}


@Override
public Optional<Boolean> canFuse( List<Optional<AlgDataType>> inTypes, SettingsPreview settings ) {
    // Whether this debug activity participates in fusion is driven entirely by its "canFuse" setting.
    Optional<BoolValue> canFuseSetting = settings.get( "canFuse", BoolValue.class );
    return canFuseSetting.map( v -> v.getValue() );
}


@Override
public AlgNode fuse( List<AlgNode> inputs, Settings settings, AlgCluster cluster ) throws Exception {
    // Identity semantics: wrap the single input in a projection that leaves the tuple type unchanged.
    AlgNode input = inputs.get( 0 );
    return LogicalRelProject.identity( input );
}


@Override
public Optional<Boolean> canPipe( List<Optional<AlgDataType>> inTypes, SettingsPreview settings ) {
    // Whether this debug activity participates in pipelining is driven entirely by its "canPipe" setting.
    return settings.get( "canPipe", BoolValue.class ).map( v -> v.getValue() );
}


@Override
public AlgDataType lockOutputType( List<AlgDataType> inTypes, Settings settings ) throws Exception {
    // Identity semantics: the output tuple type equals the type of the (single) input.
    AlgDataType inputType = inTypes.get( 0 );
    return inputType;
}


@Override
public void pipe( List<InputPipe> inputs, OutputPipe output, Settings settings, PipeExecutionContext ctx ) throws Exception {
    // Forwards every tuple of the single input pipe unchanged to the output pipe,
    // sleeping "pipeDelay" milliseconds per tuple to simulate a slow activity.
    int millis = settings.get( "pipeDelay", IntValue.class ).getValue();
    for ( List<PolyValue> value : inputs.get( 0 ) ) {
        if ( millis > 0 ) { // avoid the scheduling overhead of Thread.sleep( 0 )
            try {
                Thread.sleep( millis );
            } catch ( InterruptedException e ) {
                // Restore the interrupt flag so code further up the stack can still observe
                // the interruption after we translate it into the pipe-specific exception.
                Thread.currentThread().interrupt();
                throw new PipeInterruptedException( e );
            }
        }
        output.put( value );
    }
}


@Override
public void reset() {
    // This activity keeps no execution state between runs, so there is nothing to reset.
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import org.polypheny.db.algebra.logical.relational.LogicalRelProject;
import org.polypheny.db.algebra.type.AlgDataType;
import org.polypheny.db.plan.AlgCluster;
import org.polypheny.db.type.entity.PolyValue;
import org.polypheny.db.workflow.dag.activities.Activity;
import org.polypheny.db.workflow.dag.activities.Activity.ActivityCategory;
import org.polypheny.db.workflow.dag.activities.Activity.PortType;
import org.polypheny.db.workflow.dag.activities.ActivityException;
import org.polypheny.db.workflow.dag.activities.Fusable;
import org.polypheny.db.workflow.dag.activities.Pipeable;
import org.polypheny.db.workflow.dag.annotations.ActivityDefinition;
import org.polypheny.db.workflow.dag.annotations.ActivityDefinition.InPort;
import org.polypheny.db.workflow.dag.annotations.ActivityDefinition.OutPort;
Expand All @@ -37,6 +39,9 @@
import org.polypheny.db.workflow.dag.settings.SettingDef.Settings;
import org.polypheny.db.workflow.dag.settings.SettingDef.SettingsPreview;
import org.polypheny.db.workflow.engine.execution.context.ExecutionContext;
import org.polypheny.db.workflow.engine.execution.context.PipeExecutionContext;
import org.polypheny.db.workflow.engine.execution.pipe.InputPipe;
import org.polypheny.db.workflow.engine.execution.pipe.OutputPipe;
import org.polypheny.db.workflow.engine.storage.reader.CheckpointReader;
import org.polypheny.db.workflow.engine.storage.reader.RelReader;
import org.polypheny.db.workflow.engine.storage.writer.RelWriter;
Expand All @@ -54,7 +59,7 @@
)
@IntSetting(key = "I2", displayName = "THIRD", defaultValue = 0, isList = true, group = "groupA")
@StringSetting(key = "S2", displayName = "FOURTH", defaultValue = "test", isList = true, group = "groupA", subGroup = "a")
public class IdentityActivity implements Activity, Fusable {
public class IdentityActivity implements Activity, Fusable, Pipeable {


public IdentityActivity() {
Expand All @@ -76,6 +81,20 @@ public void execute( List<CheckpointReader> inputs, Settings settings, Execution
}


@Override
public AlgDataType lockOutputType( List<AlgDataType> inTypes, Settings settings ) throws Exception {
    // Identity activity: output rows carry exactly the type of the single input.
    AlgDataType result = inTypes.get( 0 );
    return result;
}


@Override
public void pipe( List<InputPipe> inputs, OutputPipe output, Settings settings, PipeExecutionContext ctx ) throws Exception {
    // Identity activity: forward every tuple of the single input pipe unchanged to the output.
    InputPipe inPipe = inputs.get( 0 );
    for ( List<PolyValue> tuple : inPipe ) {
        output.put( tuple );
    }
}


@Override
public AlgNode fuse( List<AlgNode> inputs, Settings settings, AlgCluster cluster ) throws Exception {
// to make it more interesting, we add a project activity that doesn't change the tupleType
Expand Down
Loading

0 comments on commit d8a0ee5

Please sign in to comment.