diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/IdentityActivity.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/IdentityActivity.java index cc400545e4..41787bb316 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/IdentityActivity.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/IdentityActivity.java @@ -75,7 +75,7 @@ def my_function(): @IntSetting(key = "I1", displayName = "FIRST", defaultValue = 2, shortDescription = "This setting doesn't do anything.") @StringSetting(key = "S1", displayName = "SECOND", shortDescription = "This setting doesn't do anything.") @IntSetting(key = "X1", displayName = "X1", shortDescription = "Depends on I1 being 42 or 420", subPointer = "I1", subValues = { "42", "420" }) -@StringSetting(key = "X2", displayName = "X2", shortDescription = "Depends on X1 being 3", subPointer = "I1", subValues = { "3" }) +@StringSetting(key = "X2", displayName = "X2", shortDescription = "Depends on X1 being 3", subPointer = "X1", subValues = { "3" }) @StringSetting(key = "X3", displayName = "X3", shortDescription = "Depends on I1/doesNotExist being 7", subPointer = "I1/doesNotExist", subValues = { "7" }) @Group(key = "groupA", displayName = "Group A", diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/VariableToRowActivity.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/VariableToRowActivity.java index 1730948b0d..dc49c12c99 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/VariableToRowActivity.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/VariableToRowActivity.java @@ -105,6 +105,7 @@ public void execute( List inputs, Settings settings, Execution row.add( insertAsString ? PolyString.of( node.toString() ) : PolyValue.fromJson( node.toString() ) ); } + builder.uniquify(); // make sure no conflicts with pk column name RelWriter writer = ctx.createRelWriter( 0, builder.build(), true ); if ( !variables.isEmpty() ) { diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/variables/VariableStore.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/variables/VariableStore.java index 5952f656ac..e5a1cd0667 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/variables/VariableStore.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/variables/VariableStore.java @@ -158,7 +158,7 @@ public JsonNode resolveVariables( JsonNode node, boolean useDefaultIfMissing ) { String[] refSplit = refString.split( "/", 2 ); String variableRef = refSplit[0].replace( JsonPointer.ESC_SLASH, "/" ).replace( JsonPointer.ESC_TILDE, "~" ); JsonNode replacement = variables.get( variableRef ); - if ( refSplit.length == 2 && !refSplit[1].isEmpty() ) { + if ( replacement != null && refSplit.length == 2 && !refSplit[1].isEmpty() ) { replacement = replacement.at( "/" + refSplit[1] ); // resolve JsonPointer } diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/context/ExecutionContextImpl.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/context/ExecutionContextImpl.java index 208d76882c..0876148c73 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/context/ExecutionContextImpl.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/context/ExecutionContextImpl.java @@ -174,8 +174,16 @@ private String getStore( int idx ) { @Override public void close() throws Exception { + List exceptions = new ArrayList<>(); for ( CheckpointWriter writer : writers ) { - writer.close(); + try { + writer.close(); + } catch ( Exception e ) { + exceptions.add( e ); + } + } + if (!exceptions.isEmpty()) { + throw exceptions.get( 0 ); // we only throw the first exception } } diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/BatchWriter.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/BatchWriter.java index 9d80877d14..86b8554b7d 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/BatchWriter.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/BatchWriter.java @@ -73,7 +73,8 @@ private void executeBatch() { ExecutedContext executedContext = QueryUtils.executeQuery( parsed, writeStatement ); if ( executedContext.getException().isPresent() ) { - throw new GenericRuntimeException( "An error occurred while writing a batch: ", executedContext.getException().get() ); + Throwable e = executedContext.getException().get(); + throw new GenericRuntimeException( "An error occurred while writing a batch: " + e.getMessage(), e ); } List> results = executedContext.getIterator().getAllRowsAndClose(); long changedCount = results.size() == 1 ? results.get( 0 ).get( 0 ).asLong().longValue() : 0; diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/StorageManagerImpl.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/StorageManagerImpl.java index 90e4b4a3a1..0972ea97de 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/StorageManagerImpl.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/StorageManagerImpl.java @@ -189,20 +189,26 @@ public synchronized RelWriter createRelCheckpoint( UUID activityId, int outputId } } - Transaction transaction = QueryUtils.startTransaction( relNamespace, "RelCreate" ); String tableName = getTableName( activityId, outputIdx ); + Transaction transaction = QueryUtils.startTransaction( relNamespace, "RelCreate" ); - acquireSchemaLock( transaction, relNamespace ); - ddlManager.createTable( - relNamespace, - tableName, - getFieldInfo( type ), - getPkConstraint( pkField.getName() ), - false, - List.of( getStore( storeName ) ), - PlacementType.AUTOMATIC, - transaction.createStatement() ); - transaction.commit(); + try{ + acquireSchemaLock( transaction, relNamespace ); + ddlManager.createTable( + relNamespace, + tableName, + getFieldInfo( type ), + getPkConstraint( pkField.getName() ), + false, + List.of( getStore( storeName ) ), + PlacementType.AUTOMATIC, + transaction.createStatement() ); + transaction.commit(); + } finally { + if (transaction.isActive()) { + transaction.rollback( null ); + } + } LogicalTable table = Catalog.snapshot().rel().getTable( relNamespace, tableName ).orElseThrow(); @@ -220,16 +226,22 @@ public synchronized DocWriter createDocCheckpoint( UUID activityId, int outputId String collectionName = getCollectionName( activityId, outputIdx ); Transaction transaction = QueryUtils.startTransaction( docNamespace, "DocCreate" ); - acquireSchemaLock( transaction, docNamespace ); - ddlManager.createCollection( - docNamespace, - collectionName, - false, - List.of( getStore( storeName ) ), - PlacementType.AUTOMATIC, - transaction.createStatement() - ); - transaction.commit(); + try { + acquireSchemaLock( transaction, docNamespace ); + ddlManager.createCollection( + docNamespace, + collectionName, + false, + List.of( getStore( storeName ) ), + PlacementType.AUTOMATIC, + transaction.createStatement() + ); + transaction.commit(); + } finally { + if (transaction.isActive()) { + transaction.rollback( null ); + } + } LogicalCollection collection = Catalog.snapshot().doc().getCollection( docNamespace, collectionName ).orElseThrow(); register( activityId, outputIdx, collection ); @@ -245,16 +257,23 @@ public synchronized LpgWriter createLpgCheckpoint( UUID activityId, int outputId String graphName = getGraphName( activityId, outputIdx ); Transaction transaction = QueryUtils.startTransaction( Catalog.defaultNamespaceId, "LpgCreate" ); //acquireSchemaLock( transaction, Catalog.defaultNamespaceId ); // TODO: no lock required since we create a new namespace? - long graphId = ddlManager.createGraph( - graphName, - true, - List.of( getStore( storeName ) ), - false, - false, - RuntimeConfig.GRAPH_NAMESPACE_DEFAULT_CASE_SENSITIVE.getBoolean(), - transaction.createStatement() - ); - transaction.commit(); + long graphId; + try { + graphId = ddlManager.createGraph( + graphName, + true, + List.of( getStore( storeName ) ), + false, + false, + RuntimeConfig.GRAPH_NAMESPACE_DEFAULT_CASE_SENSITIVE.getBoolean(), + transaction.createStatement() + ); + transaction.commit(); + } finally { + if (transaction.isActive()) { + transaction.rollback( null ); + } + } LogicalGraph graph = Catalog.snapshot().graph().getGraph( graphId ).orElseThrow(); register( activityId, outputIdx, graph ); @@ -392,7 +411,7 @@ private DataStore getStore( String storeName ) { private void dropEntity( LogicalEntity entity ) { - Transaction transaction = startTransaction( entity ); + Transaction transaction = QueryUtils.startTransaction( entity.getNamespaceId(), "DropCheckpoint" ); Statement statement = transaction.createStatement(); acquireSchemaLock( transaction, entity.getNamespaceId() ); switch ( entity.dataModel ) { @@ -417,15 +436,6 @@ private void dropNamespaces() { // Utils: - private Transaction startTransaction( LogicalEntity targetEntity ) { - return switch ( targetEntity.dataModel ) { - case RELATIONAL -> QueryUtils.startTransaction( relNamespace ); - case DOCUMENT -> QueryUtils.startTransaction( docNamespace ); - case GRAPH -> QueryUtils.startTransaction( targetEntity.getNamespaceId() ); - }; - } - - private List getPkConstraint( String pkCol ) { return List.of( new ConstraintInformation( "PRIMARY KEY", ConstraintType.PRIMARY, List.of( pkCol ) ) ); } diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/writer/DocWriter.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/writer/DocWriter.java index 3f502d08e9..d526e886b1 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/writer/DocWriter.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/writer/DocWriter.java @@ -56,8 +56,11 @@ public void write( List tuple ) { @Override public void close() throws Exception { if ( transaction.isActive() ) { // ensure writer is only closed once - writer.close(); - super.close(); + try { + writer.close(); + } finally { + super.close(); + } } } diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/writer/LpgWriter.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/writer/LpgWriter.java index 55ff409ad2..71d8f753de 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/writer/LpgWriter.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/writer/LpgWriter.java @@ -99,8 +99,11 @@ public void write( List tuple ) { @Override public void close() throws Exception { if ( transaction.isActive() ) { // ensure writer is only closed once - writer.close(); - super.close(); + try { + writer.close(); + } finally { + super.close(); + } } } diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/writer/RelWriter.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/writer/RelWriter.java index 52e6f70bf6..f8e4332473 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/writer/RelWriter.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/writer/RelWriter.java @@ -121,8 +121,11 @@ public void write( List row ) { @Override public void close() throws Exception { if ( transaction.isActive() ) { // ensure writer is only closed once - writer.close(); - super.close(); + try { + writer.close(); + } finally { + super.close(); + } } } diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/session/SessionManager.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/session/SessionManager.java index ed0a215e56..a23740009c 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/session/SessionManager.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/session/SessionManager.java @@ -63,7 +63,7 @@ public UUID createUserSession( UUID wId, int version ) throws WorkflowRepoExcept try { return registerUserSession( WorkflowImpl.fromModel( model ), wId, version ); } catch ( Exception e ) { - throw new WorkflowRepoException( "Unable to instantiate workflow", e ); + throw new WorkflowRepoException( "Unable to instantiate workflow: " + e.getMessage(), e ); } }