Skip to content

Commit

Permalink
Bugfixes
Browse files Browse the repository at this point in the history
  • Loading branch information
tobias-weber committed Jan 16, 2025
1 parent a2ac7e9 commit 3905367
Show file tree
Hide file tree
Showing 10 changed files with 82 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def my_function():
@IntSetting(key = "I1", displayName = "FIRST", defaultValue = 2, shortDescription = "This setting doesn't do anything.")
@StringSetting(key = "S1", displayName = "SECOND", shortDescription = "This setting doesn't do anything.")
@IntSetting(key = "X1", displayName = "X1", shortDescription = "Depends on I1 being 42 or 420", subPointer = "I1", subValues = { "42", "420" })
@StringSetting(key = "X2", displayName = "X2", shortDescription = "Depends on X1 being 3", subPointer = "I1", subValues = { "3" })
@StringSetting(key = "X2", displayName = "X2", shortDescription = "Depends on X1 being 3", subPointer = "X1", subValues = { "3" })
@StringSetting(key = "X3", displayName = "X3", shortDescription = "Depends on I1/doesNotExist being 7", subPointer = "I1/doesNotExist", subValues = { "7" })

@Group(key = "groupA", displayName = "Group A",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public void execute( List<CheckpointReader> inputs, Settings settings, Execution

row.add( insertAsString ? PolyString.of( node.toString() ) : PolyValue.fromJson( node.toString() ) );
}
builder.uniquify(); // make sure no conflicts with pk column name

RelWriter writer = ctx.createRelWriter( 0, builder.build(), true );
if ( !variables.isEmpty() ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public JsonNode resolveVariables( JsonNode node, boolean useDefaultIfMissing ) {
String[] refSplit = refString.split( "/", 2 );
String variableRef = refSplit[0].replace( JsonPointer.ESC_SLASH, "/" ).replace( JsonPointer.ESC_TILDE, "~" );
JsonNode replacement = variables.get( variableRef );
if ( refSplit.length == 2 && !refSplit[1].isEmpty() ) {
if ( replacement != null && refSplit.length == 2 && !refSplit[1].isEmpty() ) {
replacement = replacement.at( "/" + refSplit[1] ); // resolve JsonPointer
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,16 @@ private String getStore( int idx ) {

@Override
public void close() throws Exception {
List<Exception> exceptions = new ArrayList<>();
for ( CheckpointWriter writer : writers ) {
writer.close();
try {
writer.close();
} catch ( Exception e ) {
exceptions.add( e );
}
}
if (!exceptions.isEmpty()) {
throw exceptions.get( 0 ); // we only throw the first exception
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ private void executeBatch() {
ExecutedContext executedContext = QueryUtils.executeQuery( parsed, writeStatement );

if ( executedContext.getException().isPresent() ) {
throw new GenericRuntimeException( "An error occurred while writing a batch: ", executedContext.getException().get() );
Throwable e = executedContext.getException().get();
throw new GenericRuntimeException( "An error occurred while writing a batch: " + e.getMessage(), e );
}
List<List<PolyValue>> results = executedContext.getIterator().getAllRowsAndClose();
long changedCount = results.size() == 1 ? results.get( 0 ).get( 0 ).asLong().longValue() : 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,20 +189,26 @@ public synchronized RelWriter createRelCheckpoint( UUID activityId, int outputId
}
}

Transaction transaction = QueryUtils.startTransaction( relNamespace, "RelCreate" );
String tableName = getTableName( activityId, outputIdx );
Transaction transaction = QueryUtils.startTransaction( relNamespace, "RelCreate" );

acquireSchemaLock( transaction, relNamespace );
ddlManager.createTable(
relNamespace,
tableName,
getFieldInfo( type ),
getPkConstraint( pkField.getName() ),
false,
List.of( getStore( storeName ) ),
PlacementType.AUTOMATIC,
transaction.createStatement() );
transaction.commit();
try{
acquireSchemaLock( transaction, relNamespace );
ddlManager.createTable(
relNamespace,
tableName,
getFieldInfo( type ),
getPkConstraint( pkField.getName() ),
false,
List.of( getStore( storeName ) ),
PlacementType.AUTOMATIC,
transaction.createStatement() );
transaction.commit();
} finally {
if (transaction.isActive()) {
transaction.rollback( null );
}
}

LogicalTable table = Catalog.snapshot().rel().getTable( relNamespace, tableName ).orElseThrow();

Expand All @@ -220,16 +226,22 @@ public synchronized DocWriter createDocCheckpoint( UUID activityId, int outputId
String collectionName = getCollectionName( activityId, outputIdx );

Transaction transaction = QueryUtils.startTransaction( docNamespace, "DocCreate" );
acquireSchemaLock( transaction, docNamespace );
ddlManager.createCollection(
docNamespace,
collectionName,
false,
List.of( getStore( storeName ) ),
PlacementType.AUTOMATIC,
transaction.createStatement()
);
transaction.commit();
try {
acquireSchemaLock( transaction, docNamespace );
ddlManager.createCollection(
docNamespace,
collectionName,
false,
List.of( getStore( storeName ) ),
PlacementType.AUTOMATIC,
transaction.createStatement()
);
transaction.commit();
} finally {
if (transaction.isActive()) {
transaction.rollback( null );
}
}

LogicalCollection collection = Catalog.snapshot().doc().getCollection( docNamespace, collectionName ).orElseThrow();
register( activityId, outputIdx, collection );
Expand All @@ -245,16 +257,23 @@ public synchronized LpgWriter createLpgCheckpoint( UUID activityId, int outputId
String graphName = getGraphName( activityId, outputIdx );
Transaction transaction = QueryUtils.startTransaction( Catalog.defaultNamespaceId, "LpgCreate" );
//acquireSchemaLock( transaction, Catalog.defaultNamespaceId ); // TODO: no lock required since we create a new namespace?
long graphId = ddlManager.createGraph(
graphName,
true,
List.of( getStore( storeName ) ),
false,
false,
RuntimeConfig.GRAPH_NAMESPACE_DEFAULT_CASE_SENSITIVE.getBoolean(),
transaction.createStatement()
);
transaction.commit();
long graphId;
try {
graphId = ddlManager.createGraph(
graphName,
true,
List.of( getStore( storeName ) ),
false,
false,
RuntimeConfig.GRAPH_NAMESPACE_DEFAULT_CASE_SENSITIVE.getBoolean(),
transaction.createStatement()
);
transaction.commit();
} finally {
if (transaction.isActive()) {
transaction.rollback( null );
}
}

LogicalGraph graph = Catalog.snapshot().graph().getGraph( graphId ).orElseThrow();
register( activityId, outputIdx, graph );
Expand Down Expand Up @@ -392,7 +411,7 @@ private DataStore<?> getStore( String storeName ) {


private void dropEntity( LogicalEntity entity ) {
Transaction transaction = startTransaction( entity );
Transaction transaction = QueryUtils.startTransaction( entity.getNamespaceId(), "DropCheckpoint" );
Statement statement = transaction.createStatement();
acquireSchemaLock( transaction, entity.getNamespaceId() );
switch ( entity.dataModel ) {
Expand All @@ -417,15 +436,6 @@ private void dropNamespaces() {
// Utils:


private Transaction startTransaction( LogicalEntity targetEntity ) {
return switch ( targetEntity.dataModel ) {
case RELATIONAL -> QueryUtils.startTransaction( relNamespace );
case DOCUMENT -> QueryUtils.startTransaction( docNamespace );
case GRAPH -> QueryUtils.startTransaction( targetEntity.getNamespaceId() );
};
}


private List<ConstraintInformation> getPkConstraint( String pkCol ) {
return List.of( new ConstraintInformation( "PRIMARY KEY", ConstraintType.PRIMARY, List.of( pkCol ) ) );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,11 @@ public void write( List<PolyValue> tuple ) {
@Override
public void close() throws Exception {
if ( transaction.isActive() ) { // ensure writer is only closed once
writer.close();
super.close();
try {
writer.close();
} finally {
super.close();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,11 @@ public void write( List<PolyValue> tuple ) {
@Override
public void close() throws Exception {
if ( transaction.isActive() ) { // ensure writer is only closed once
writer.close();
super.close();
try {
writer.close();
} finally {
super.close();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,11 @@ public void write( List<PolyValue> row ) {
@Override
public void close() throws Exception {
if ( transaction.isActive() ) { // ensure writer is only closed once
writer.close();
super.close();
try {
writer.close();
} finally {
super.close();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public UUID createUserSession( UUID wId, int version ) throws WorkflowRepoExcept
try {
return registerUserSession( WorkflowImpl.fromModel( model ), wId, version );
} catch ( Exception e ) {
throw new WorkflowRepoException( "Unable to instantiate workflow", e );
throw new WorkflowRepoException( "Unable to instantiate workflow: " + e.getMessage(), e );
}
}

Expand Down

0 comments on commit 3905367

Please sign in to comment.