Skip to content

Commit

Permalink
fix reconnect in user task listener
Browse files Browse the repository at this point in the history
  • Loading branch information
elguardian committed Sep 30, 2024
1 parent 7801939 commit ca84ee2
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,15 @@
import org.kie.kogito.process.workitems.InternalKogitoWorkItem;
import org.kie.kogito.process.workitems.InternalKogitoWorkItemManager;
import org.kie.kogito.signal.SignalManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.util.Collections.emptyMap;

public class LightWorkItemManager implements InternalKogitoWorkItemManager {

private static final Logger LOG = LoggerFactory.getLogger(LightWorkItemManager.class);

private Map<String, InternalKogitoWorkItem> workItems = new ConcurrentHashMap<>();
private Map<String, KogitoWorkItemHandler> workItemHandlers = new HashMap<>();

Expand All @@ -70,6 +74,7 @@ public void registerWorkItemHandler(String workItemName, KogitoWorkItemHandler h

@Override
public void internalAddWorkItem(InternalKogitoWorkItem workItem) {
LOG.info("internal add work item {}", workItem);
workItems.put(workItem.getStringId(), workItem);
}

Expand All @@ -84,7 +89,8 @@ public InternalKogitoWorkItem getWorkItem(String workItemId) {

@Override
public void internalRemoveWorkItem(String id) {
workItems.remove(id);
InternalKogitoWorkItem workItem = workItems.remove(id);
LOG.info("internal remove work item {}", workItem);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,13 @@
import org.kie.kogito.process.flexible.Milestone;
import org.kie.kogito.process.workitems.InternalKogitoWorkItem;
import org.kie.kogito.services.uow.ProcessInstanceWorkUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractProcessInstance<T extends Model> implements ProcessInstance<T> {

private static final Logger LOG = LoggerFactory.getLogger(AbstractProcessInstance.class);

private static final String KOGITO_PROCESS_INSTANCE = "KogitoProcessInstance";

protected final T variables;
Expand Down Expand Up @@ -156,6 +160,7 @@ public AbstractProcessInstance(AbstractProcess<T> process, T variables, ProcessR
}

protected void reconnect() {
LOG.info("reconnect process {}", processInstance.getId());
//set correlation
if (correlationInstance.isEmpty()) {
correlationInstance = process().correlations().findByCorrelatedId(id());
Expand All @@ -170,8 +175,8 @@ protected void reconnect() {
addCompletionEventListener();

for (org.kie.api.runtime.process.NodeInstance nodeInstance : processInstance.getNodeInstances(true)) {
if (nodeInstance instanceof WorkItemNodeInstance) {
((WorkItemNodeInstance) nodeInstance).internalRegisterWorkItem();
if (nodeInstance instanceof WorkItemNodeInstance workItemNodeInstance) {
workItemNodeInstance.internalRegisterWorkItem();
}
}

Expand All @@ -193,13 +198,15 @@ private void removeCompletionListener() {
}

protected void disconnect() {

if (processInstance == null) {
return;
}

LOG.info("disconnect process {}", processInstance.getId());
for (org.kie.api.runtime.process.NodeInstance nodeInstance : processInstance.getNodeInstances(true)) {
if (nodeInstance instanceof WorkItemNodeInstance) {
((WorkItemNodeInstance) nodeInstance).internalRemoveWorkItem();
if (nodeInstance instanceof WorkItemNodeInstance workItemNodeInstance) {
workItemNodeInstance.internalRemoveWorkItem();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,19 @@

import org.kie.kogito.Model;
import org.kie.kogito.process.ProcessInstance;
import org.kie.kogito.process.ProcessInstanceReadMode;
import org.kie.kogito.process.Processes;
import org.kie.kogito.process.workitems.InternalKogitoWorkItem;
import org.kie.kogito.usertask.UserTaskEventListener;
import org.kie.kogito.usertask.events.UserTaskStateEvent;
import org.kie.kogito.usertask.impl.lifecycle.DefaultUserTaskLifeCycle;
import org.kie.kogito.usertask.lifecycle.UserTaskState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UserTaskKogitoWorkItemHandlerProcessListener implements UserTaskEventListener {

private static final Logger LOG = LoggerFactory.getLogger(UserTaskKogitoWorkItemHandlerProcessListener.class);

private Processes processes;

public UserTaskKogitoWorkItemHandlerProcessListener(Processes processes) {
Expand All @@ -44,9 +47,11 @@ public void onUserTaskState(UserTaskStateEvent event) {
return;
}

LOG.info("onUserTaskState {} on termination trigger {}", event, userTaskState);

String processId = (String) event.getUserTaskInstance().getMetadata().get("ProcessId");
String processInstanceId = (String) event.getUserTaskInstance().getMetadata().get("ProcessInstanceId");
ProcessInstance<? extends Model> processInstance = processes.processById(processId).instances().findById(processInstanceId, ProcessInstanceReadMode.MUTABLE).orElse(null);
ProcessInstance<? extends Model> processInstance = processes.processById(processId).instances().findById(processInstanceId).orElse(null);
processInstance.updateWorkItem(event.getUserTaskInstance().getExternalReferenceId(), workItem -> {
((InternalKogitoWorkItem) workItem).setActualOwner(event.getUserTaskInstance().getActualOwner());
return workItem;
Expand All @@ -56,7 +61,8 @@ public void onUserTaskState(UserTaskStateEvent event) {
if (notify != null && !notify) {
return;
}

LOG.info("onUserTaskState {} on complete work item", event);
processInstance = processes.processById(processId).instances().findById(processInstanceId).orElse(null);
processInstance.completeWorkItem(event.getUserTaskInstance().getExternalReferenceId(), event.getUserTaskInstance().getOutputs());

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ public void setDisconnectUserTaskInstance(Function<UserTaskInstance, UserTaskIns
@Override
public Optional<UserTaskInstance> findById(String userTaskInstanceId) {
try {
if (!userTaskInstances.containsKey(userTaskInstanceId)) {
return null;
}
UserTaskInstance userTaskInstance = mapper.readValue(userTaskInstances.get(userTaskInstanceId), DefaultUserTaskInstance.class);
return Optional.ofNullable(reconnectUserTaskInstance.apply(userTaskInstance));
} catch (Exception e) {
Expand Down Expand Up @@ -160,6 +163,9 @@ public UserTaskInstance update(UserTaskInstance userTaskInstance) {
@Override
public UserTaskInstance remove(String userTaskInstanceId) {
try {
if (!userTaskInstances.containsKey(userTaskInstanceId)) {
return null;
}
return disconnectUserTaskInstance.apply(mapper.readValue(userTaskInstances.remove(userTaskInstanceId), DefaultUserTaskInstance.class));
} catch (Exception e) {
LOG.error("during remove {}", userTaskInstanceId, e);
Expand Down

0 comments on commit ca84ee2

Please sign in to comment.