From ca84ee2f1801ec8ae32d80d3370e38c3e9462d24 Mon Sep 17 00:00:00 2001 From: Enrique Gonzalez Martinez Date: Mon, 30 Sep 2024 21:10:42 +0200 Subject: [PATCH] fix reconnect in user task listener --- .../process/instance/LightWorkItemManager.java | 8 +++++++- .../process/impl/AbstractProcessInstance.java | 15 +++++++++++---- ...rTaskKogitoWorkItemHandlerProcessListener.java | 12 +++++++++--- .../usertask/impl/InMemoryUserTaskInstances.java | 6 ++++++ 4 files changed, 33 insertions(+), 8 deletions(-) diff --git a/jbpm/jbpm-flow/src/main/java/org/jbpm/process/instance/LightWorkItemManager.java b/jbpm/jbpm-flow/src/main/java/org/jbpm/process/instance/LightWorkItemManager.java index 234c0455509..012fff1f5e5 100755 --- a/jbpm/jbpm-flow/src/main/java/org/jbpm/process/instance/LightWorkItemManager.java +++ b/jbpm/jbpm-flow/src/main/java/org/jbpm/process/instance/LightWorkItemManager.java @@ -40,11 +40,15 @@ import org.kie.kogito.process.workitems.InternalKogitoWorkItem; import org.kie.kogito.process.workitems.InternalKogitoWorkItemManager; import org.kie.kogito.signal.SignalManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static java.util.Collections.emptyMap; public class LightWorkItemManager implements InternalKogitoWorkItemManager { + private static final Logger LOG = LoggerFactory.getLogger(LightWorkItemManager.class); + private Map workItems = new ConcurrentHashMap<>(); private Map workItemHandlers = new HashMap<>(); @@ -70,6 +74,7 @@ public void registerWorkItemHandler(String workItemName, KogitoWorkItemHandler h @Override public void internalAddWorkItem(InternalKogitoWorkItem workItem) { + LOG.info("internal add work item {}", workItem); workItems.put(workItem.getStringId(), workItem); } @@ -84,7 +89,8 @@ public InternalKogitoWorkItem getWorkItem(String workItemId) { @Override public void internalRemoveWorkItem(String id) { - workItems.remove(id); + InternalKogitoWorkItem workItem = workItems.remove(id); + LOG.info("internal remove work item {}", workItem); } @Override diff --git a/jbpm/jbpm-flow/src/main/java/org/kie/kogito/process/impl/AbstractProcessInstance.java b/jbpm/jbpm-flow/src/main/java/org/kie/kogito/process/impl/AbstractProcessInstance.java index 1a6e8f1d42e..0c0042fcb41 100644 --- a/jbpm/jbpm-flow/src/main/java/org/kie/kogito/process/impl/AbstractProcessInstance.java +++ b/jbpm/jbpm-flow/src/main/java/org/kie/kogito/process/impl/AbstractProcessInstance.java @@ -73,9 +73,13 @@ import org.kie.kogito.process.flexible.Milestone; import org.kie.kogito.process.workitems.InternalKogitoWorkItem; import org.kie.kogito.services.uow.ProcessInstanceWorkUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class AbstractProcessInstance implements ProcessInstance { + private static final Logger LOG = LoggerFactory.getLogger(AbstractProcessInstance.class); + private static final String KOGITO_PROCESS_INSTANCE = "KogitoProcessInstance"; protected final T variables; @@ -156,6 +160,7 @@ public AbstractProcessInstance(AbstractProcess process, T variables, ProcessR } protected void reconnect() { + LOG.info("reconnect process {}", processInstance.getId()); //set correlation if (correlationInstance.isEmpty()) { correlationInstance = process().correlations().findByCorrelatedId(id()); @@ -170,8 +175,8 @@ protected void reconnect() { addCompletionEventListener(); for (org.kie.api.runtime.process.NodeInstance nodeInstance : processInstance.getNodeInstances(true)) { - if (nodeInstance instanceof WorkItemNodeInstance) { - ((WorkItemNodeInstance) nodeInstance).internalRegisterWorkItem(); + if (nodeInstance instanceof WorkItemNodeInstance workItemNodeInstance) { + workItemNodeInstance.internalRegisterWorkItem(); } } @@ -193,13 +198,15 @@ private void removeCompletionListener() { } protected void disconnect() { + if (processInstance == null) { return; } + LOG.info("disconnect process {}", processInstance.getId()); for (org.kie.api.runtime.process.NodeInstance nodeInstance : processInstance.getNodeInstances(true)) { - if (nodeInstance instanceof WorkItemNodeInstance) { - ((WorkItemNodeInstance) nodeInstance).internalRemoveWorkItem(); + if (nodeInstance instanceof WorkItemNodeInstance workItemNodeInstance) { + workItemNodeInstance.internalRemoveWorkItem(); } } diff --git a/jbpm/jbpm-usertask/src/main/java/org/kie/kogito/jbpm/usertask/handler/UserTaskKogitoWorkItemHandlerProcessListener.java b/jbpm/jbpm-usertask/src/main/java/org/kie/kogito/jbpm/usertask/handler/UserTaskKogitoWorkItemHandlerProcessListener.java index f1798035d46..b4cc54f91c7 100644 --- a/jbpm/jbpm-usertask/src/main/java/org/kie/kogito/jbpm/usertask/handler/UserTaskKogitoWorkItemHandlerProcessListener.java +++ b/jbpm/jbpm-usertask/src/main/java/org/kie/kogito/jbpm/usertask/handler/UserTaskKogitoWorkItemHandlerProcessListener.java @@ -20,16 +20,19 @@ import org.kie.kogito.Model; import org.kie.kogito.process.ProcessInstance; -import org.kie.kogito.process.ProcessInstanceReadMode; import org.kie.kogito.process.Processes; import org.kie.kogito.process.workitems.InternalKogitoWorkItem; import org.kie.kogito.usertask.UserTaskEventListener; import org.kie.kogito.usertask.events.UserTaskStateEvent; import org.kie.kogito.usertask.impl.lifecycle.DefaultUserTaskLifeCycle; import org.kie.kogito.usertask.lifecycle.UserTaskState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class UserTaskKogitoWorkItemHandlerProcessListener implements UserTaskEventListener { + private static final Logger LOG = LoggerFactory.getLogger(UserTaskKogitoWorkItemHandlerProcessListener.class); + private Processes processes; public UserTaskKogitoWorkItemHandlerProcessListener(Processes processes) { @@ -44,9 +47,11 @@ public void onUserTaskState(UserTaskStateEvent event) { return; } + LOG.info("onUserTaskState {} on termination trigger {}", event, userTaskState); + String processId = (String) event.getUserTaskInstance().getMetadata().get("ProcessId"); String processInstanceId = (String) event.getUserTaskInstance().getMetadata().get("ProcessInstanceId"); - ProcessInstance processInstance = processes.processById(processId).instances().findById(processInstanceId, ProcessInstanceReadMode.MUTABLE).orElse(null); + ProcessInstance processInstance = processes.processById(processId).instances().findById(processInstanceId).orElse(null); processInstance.updateWorkItem(event.getUserTaskInstance().getExternalReferenceId(), workItem -> { ((InternalKogitoWorkItem) workItem).setActualOwner(event.getUserTaskInstance().getActualOwner()); return workItem; @@ -56,7 +61,8 @@ public void onUserTaskState(UserTaskStateEvent event) { if (notify != null && !notify) { return; } - + LOG.info("onUserTaskState {} on complete work item", event); + processInstance = processes.processById(processId).instances().findById(processInstanceId).orElse(null); processInstance.completeWorkItem(event.getUserTaskInstance().getExternalReferenceId(), event.getUserTaskInstance().getOutputs()); } diff --git a/jbpm/jbpm-usertask/src/main/java/org/kie/kogito/usertask/impl/InMemoryUserTaskInstances.java b/jbpm/jbpm-usertask/src/main/java/org/kie/kogito/usertask/impl/InMemoryUserTaskInstances.java index 1fdb77bf350..55da7c4c0e2 100644 --- a/jbpm/jbpm-usertask/src/main/java/org/kie/kogito/usertask/impl/InMemoryUserTaskInstances.java +++ b/jbpm/jbpm-usertask/src/main/java/org/kie/kogito/usertask/impl/InMemoryUserTaskInstances.java @@ -69,6 +69,9 @@ public void setDisconnectUserTaskInstance(Function findById(String userTaskInstanceId) { try { + if (!userTaskInstances.containsKey(userTaskInstanceId)) { + return null; + } UserTaskInstance userTaskInstance = mapper.readValue(userTaskInstances.get(userTaskInstanceId), DefaultUserTaskInstance.class); return Optional.ofNullable(reconnectUserTaskInstance.apply(userTaskInstance)); } catch (Exception e) { @@ -160,6 +163,9 @@ public UserTaskInstance update(UserTaskInstance userTaskInstance) { @Override public UserTaskInstance remove(String userTaskInstanceId) { try { + if (!userTaskInstances.containsKey(userTaskInstanceId)) { + return null; + } return disconnectUserTaskInstance.apply(mapper.readValue(userTaskInstances.remove(userTaskInstanceId), DefaultUserTaskInstance.class)); } catch (Exception e) { LOG.error("during remove {}", userTaskInstanceId, e);