Skip to content

Commit

Permalink
[incubator-kie-issues-1110] Add capability to store that node instance (
Browse files Browse the repository at this point in the history
apache#3488)

* [incubator-kie-issues-1110] Add capability to store that node instance
  • Loading branch information
elguardian authored and rgdoliveira committed May 7, 2024
1 parent 0a93241 commit 23b1355
Show file tree
Hide file tree
Showing 45 changed files with 2,187 additions and 815 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.jbpm.workflow.instance;

import java.util.Collection;
import java.util.Map;

import org.jbpm.workflow.core.node.AsyncEventNode;
import org.kie.api.definition.process.Node;
Expand Down Expand Up @@ -99,4 +100,6 @@ default Node resolveAsync(Node node) {
}
return node;
}

Map<String, Integer> getIterationLevels();
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public void setContextInstance(String contextId, ContextInstance contextInstance
this.contextInstances.put(contextId, contextInstance);
}

@Override
public ContextInstance getContextInstance(String contextId) {
ContextInstance contextInstance = this.contextInstances.get(contextId);
if (contextInstance != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ public void setCurrentLevel(int currentLevel) {
this.currentLevel = currentLevel;
}

@Override
public Map<String, Integer> getIterationLevels() {
return iterationLevels;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,36 @@
*/
package org.jbpm.flow.serialization;

import java.util.function.Supplier;

import org.jbpm.ruleflow.instance.RuleFlowProcessInstance;
import org.kie.kogito.process.Process;

public final class MarshallerContextName<T> {

public static final MarshallerContextName<ObjectMarshallerStrategy[]> OBJECT_MARSHALLING_STRATEGIES = new MarshallerContextName<>("OBJECT_MARSHALLING_STRATEGIES");
public static final MarshallerContextName<ObjectMarshallerStrategy[]> OBJECT_MARSHALLING_STRATEGIES =
new MarshallerContextName<>("OBJECT_MARSHALLING_STRATEGIES", () -> new ObjectMarshallerStrategy[0]);
public static final MarshallerContextName<String> MARSHALLER_FORMAT = new MarshallerContextName<>("FORMAT");
public static final MarshallerContextName<Process<?>> MARSHALLER_PROCESS = new MarshallerContextName<>("PROCESS");
public static final MarshallerContextName<RuleFlowProcessInstance> MARSHALLER_PROCESS_INSTANCE = new MarshallerContextName<>("PROCESS_INSTANCE");
public static final MarshallerContextName<Boolean> MARSHALLER_INSTANCE_READ_ONLY = new MarshallerContextName<>("READ_ONLY");
public static final MarshallerContextName<ProcessInstanceMarshallerListener[]> MARSHALLER_INSTANCE_LISTENER = new MarshallerContextName<>("MARSHALLER_INSTANCE_LISTENERS");
public static final MarshallerContextName<ProcessInstanceMarshallerListener[]> MARSHALLER_INSTANCE_LISTENER =
new MarshallerContextName<>("MARSHALLER_INSTANCE_LISTENERS", () -> new ProcessInstanceMarshallerListener[0]);
public static final MarshallerContextName<NodeInstanceReader[]> MARSHALLER_NODE_INSTANCE_READER = new MarshallerContextName<>("MARSHALLER_NODE_INSTANCE_READER", () -> new NodeInstanceReader[0]);
public static final MarshallerContextName<NodeInstanceWriter[]> MARSHALLER_NODE_INSTANCE_WRITER = new MarshallerContextName<>("MARSHALLER_NODE_INSTANCE_WRITER", () -> new NodeInstanceWriter[0]);

public static final String MARSHALLER_FORMAT_JSON = "json";

private String name;
private Supplier<T> defaultValue;

private MarshallerContextName(String name) {
this(name, () -> null);
}

private MarshallerContextName(String name, Supplier<T> defaultValue) {
this.name = name;
this.defaultValue = defaultValue;
}

public String name() {
Expand All @@ -44,4 +58,8 @@ public String name() {
public T cast(Object value) {
return (T) value;
}

public T defaultValue() {
return defaultValue.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@

import java.io.InputStream;

import com.google.protobuf.Any;

public interface MarshallerReaderContext extends MarshallerContext {

InputStream input();

NodeInstanceReader findNodeInstanceReader(Any nodeInstance);

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@

import java.io.OutputStream;

import org.kie.api.runtime.process.NodeInstance;

public interface MarshallerWriterContext extends MarshallerContext {

OutputStream output();

NodeInstanceWriter findNodeInstanceWriter(NodeInstance nodeInstance);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jbpm.flow.serialization;

import org.kie.api.runtime.process.NodeInstance;

import com.google.protobuf.Any;
import com.google.protobuf.GeneratedMessageV3;

public interface NodeInstanceReader extends Comparable<NodeInstanceReader> {
Integer DEFAULT_ORDER = 10;

default Integer order() {
return DEFAULT_ORDER;
}

@Override
default int compareTo(NodeInstanceReader that) {
return this.order().compareTo(that.order());
}

Class<? extends GeneratedMessageV3> type();

default boolean accept(Any value) {
return value.is(type());
}

NodeInstance read(MarshallerReaderContext context, Any value);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jbpm.flow.serialization;

import org.kie.api.runtime.process.NodeInstance;

import com.google.protobuf.GeneratedMessageV3;

public interface NodeInstanceWriter extends Comparable<NodeInstanceWriter> {
Integer DEFAULT_ORDER = 10;

default Integer order() {
return DEFAULT_ORDER;
}

@Override
default int compareTo(NodeInstanceWriter that) {
return this.order().compareTo(that.order());
}

boolean accept(NodeInstance value);

GeneratedMessageV3.Builder<?> write(MarshallerWriterContext writer, NodeInstance value);

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ public class ProcessInstanceMarshallerException extends RuntimeException {

private static final long serialVersionUID = -1676023219884892322L;

public ProcessInstanceMarshallerException(Throwable th) {
super(th);
}

public ProcessInstanceMarshallerException(String msg) {
super(msg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.function.Supplier;

import org.jbpm.flow.serialization.impl.ProtobufProcessInstanceMarshallerFactory;
import org.jbpm.util.JbpmClassLoaderUtil;
import org.kie.kogito.process.Process;
import org.kie.kogito.process.ProcessInstance;
import org.kie.kogito.process.ProcessInstanceReadMode;
Expand All @@ -49,6 +50,9 @@ public class ProcessInstanceMarshallerService {

private ProcessInstanceMarshallerFactory processInstanceMarshallerFactory;

private List<NodeInstanceReader> readers;
private List<NodeInstanceWriter> writers;

public class Builder {

public Builder() {
Expand All @@ -60,9 +64,10 @@ public Builder withProcessInstanceMarshallerFactory(ProcessInstanceMarshallerFac
return this;
}

@SuppressWarnings("unchecked")
public <T> Builder withContextEntries(Map<MarshallerContextName<T>, T> contextEntries) {
for (Map.Entry<MarshallerContextName<T>, T> item : contextEntries.entrySet()) {
ProcessInstanceMarshallerService.this.contextEntries.put((MarshallerContextName<Object>) item.getKey(), (Object) item.getValue());
ProcessInstanceMarshallerService.this.contextEntries.put((MarshallerContextName<Object>) item.getKey(), item.getValue());
}
return this;
}
Expand All @@ -77,11 +82,23 @@ public Builder withDefaultListeners() {
}

public Builder withDefaultObjectMarshallerStrategies() {
ServiceLoader<ObjectMarshallerStrategy> loader = ServiceLoader.load(ObjectMarshallerStrategy.class);
ServiceLoader<ObjectMarshallerStrategy> loader = ServiceLoader.load(ObjectMarshallerStrategy.class, JbpmClassLoaderUtil.findClassLoader());

for (ObjectMarshallerStrategy strategy : loader) {
ProcessInstanceMarshallerService.this.strats.add(strategy);
}

ServiceLoader<NodeInstanceReader> readerLoader = ServiceLoader.load(NodeInstanceReader.class, JbpmClassLoaderUtil.findClassLoader());

for (NodeInstanceReader reader : readerLoader) {
ProcessInstanceMarshallerService.this.readers.add(reader);
}

ServiceLoader<NodeInstanceWriter> writerLoader = ServiceLoader.load(NodeInstanceWriter.class, JbpmClassLoaderUtil.findClassLoader());

for (NodeInstanceWriter writer : writerLoader) {
ProcessInstanceMarshallerService.this.writers.add(writer);
}
return this;
}

Expand All @@ -101,6 +118,8 @@ public Builder withObjectMarshallerStrategies(ObjectMarshallerStrategy... strate

public ProcessInstanceMarshallerService build() {
Collections.sort(ProcessInstanceMarshallerService.this.strats);
Collections.sort(ProcessInstanceMarshallerService.this.readers);
Collections.sort(ProcessInstanceMarshallerService.this.writers);
return ProcessInstanceMarshallerService.this;
}

Expand All @@ -113,6 +132,8 @@ public static Builder newBuilder() {
private ProcessInstanceMarshallerService() {
this.listeners = new ArrayList<>();
this.strats = new ArrayList<>();
this.readers = new ArrayList<>();
this.writers = new ArrayList<>();
this.contextEntries = new HashMap<>();
}

Expand All @@ -129,6 +150,7 @@ public byte[] marshallProcessInstance(ProcessInstance<?> processInstance) {
MarshallerWriterContext context = processInstanceMarshallerFactory.newWriterContext(baos);
context.set(MarshallerContextName.MARSHALLER_PROCESS, processInstance.process());
context.set(MarshallerContextName.MARSHALLER_INSTANCE_LISTENER, listeners.toArray(ProcessInstanceMarshallerListener[]::new));
context.set(MarshallerContextName.MARSHALLER_NODE_INSTANCE_WRITER, this.writers.toArray(NodeInstanceWriter[]::new));
setupEnvironment(context);
org.jbpm.flow.serialization.ProcessInstanceMarshaller marshaller = processInstanceMarshallerFactory.newKogitoProcessInstanceMarshaller();
marshaller.writeProcessInstance(context, processInstance);
Expand All @@ -144,6 +166,7 @@ public ProcessInstance<?> unmarshallProcessInstance(byte[] data, Process<?> proc
context.set(MarshallerContextName.MARSHALLER_PROCESS, process);
context.set(MarshallerContextName.MARSHALLER_INSTANCE_READ_ONLY, readOnly);
context.set(MarshallerContextName.MARSHALLER_INSTANCE_LISTENER, listeners.toArray(ProcessInstanceMarshallerListener[]::new));
context.set(MarshallerContextName.MARSHALLER_NODE_INSTANCE_READER, this.readers.toArray(NodeInstanceReader[]::new));
setupEnvironment(context);
org.jbpm.flow.serialization.ProcessInstanceMarshaller marshaller = processInstanceMarshallerFactory.newKogitoProcessInstanceMarshaller();
return marshaller.readProcessInstance(context);
Expand All @@ -170,6 +193,7 @@ public Consumer<AbstractProcessInstance<?>> createdReloadFunction(Supplier<byte[
MarshallerReaderContext context = processInstanceMarshallerFactory.newReaderContext(bais);
context.set(MarshallerContextName.MARSHALLER_PROCESS, processInstance.process());
context.set(MarshallerContextName.MARSHALLER_INSTANCE_LISTENER, listeners.toArray(ProcessInstanceMarshallerListener[]::new));
context.set(MarshallerContextName.MARSHALLER_NODE_INSTANCE_READER, this.readers.toArray(NodeInstanceReader[]::new));
setupEnvironment(context);
org.jbpm.flow.serialization.ProcessInstanceMarshaller marshaller =
processInstanceMarshallerFactory.newKogitoProcessInstanceMarshaller();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

public abstract class ProtobufAbstractMarshallerContext implements MarshallerContext {

private Map<MarshallerContextName, Object> env;
private Map<MarshallerContextName<?>, Object> env;

public ProtobufAbstractMarshallerContext() {
this.env = new HashMap<>();
Expand All @@ -40,7 +40,8 @@ public ProtobufAbstractMarshallerContext() {
@SuppressWarnings("unchecked")
@Override
public <T> T get(MarshallerContextName<T> key) {
return (T) env.get(key);
T value = (T) env.get(key);
return value != null ? value : key.defaultValue();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@

import java.io.InputStream;

import org.jbpm.flow.serialization.MarshallerContextName;
import org.jbpm.flow.serialization.MarshallerReaderContext;
import org.jbpm.flow.serialization.NodeInstanceReader;

import com.google.protobuf.Any;

public class ProtobufMarshallerReaderContext extends ProtobufAbstractMarshallerContext implements MarshallerReaderContext {

Expand All @@ -35,4 +39,15 @@ public InputStream input() {
return is;
}

@Override
public NodeInstanceReader findNodeInstanceReader(Any nodeInstance) {
NodeInstanceReader[] readers = this.get(MarshallerContextName.MARSHALLER_NODE_INSTANCE_READER);
for (NodeInstanceReader reader : readers) {
if (reader.accept(nodeInstance)) {
return reader;
}
}
return null;
}

}
Loading

0 comments on commit 23b1355

Please sign in to comment.