Skip to content

Commit

Permalink
[Fix apache/incubator-kie-issues#987] New onError event (apache#3428)
Browse files Browse the repository at this point in the history
* [Fix apache/incubator-kie-issues#987] New onError event

* [Fix apache/incubator-kie-issues#987] Adding unit test

* [Fix apache/incubator-kie-issues#987] Refactor ProcessNodeEvent children

Although not strictly needed by the PR, for future additions, a new
class in the hierarchy that holds the NodeInstance reference has been
included.
fields has been made final to remark that event objects are inmutable
  • Loading branch information
fjtirado committed Apr 3, 2024
1 parent 132fba7 commit 66289ac
Show file tree
Hide file tree
Showing 12 changed files with 230 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,11 @@ void fireOnUserTaskCommentAdded(
KieRuntime kruntime,
Comment addedComment);

//

void reset();

void addEventListener(KogitoProcessEventListener listener);

void removeEventListener(KogitoProcessEventListener listener);

}
void fireOnError(KogitoProcessInstance instance, KogitoNodeInstance nodeInstance, KieRuntime kruntime, Exception exception);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.kie.kogito.uow.events;

import org.kie.api.event.process.ErrorEvent;
import org.kie.api.event.process.MessageEvent;
import org.kie.api.event.process.ProcessCompletedEvent;
import org.kie.api.event.process.ProcessEvent;
Expand Down Expand Up @@ -59,12 +60,11 @@ private void intercept(UserTaskEvent event) {

@Override
public void beforeProcessStarted(ProcessStartedEvent event) {

intercept(event);
}

@Override
public void afterProcessStarted(ProcessStartedEvent event) {
intercept(event);
}

@Override
Expand Down Expand Up @@ -193,4 +193,9 @@ public void onUserTaskOutputVariable(UserTaskVariableEvent event) {
intercept(event);
}

@Override
public void onError(ErrorEvent event) {
intercept(event);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
package org.kie.kogito.event.impl;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.TreeSet;

import org.kie.api.event.process.ErrorEvent;
import org.kie.api.event.process.ProcessCompletedEvent;
import org.kie.api.event.process.ProcessEvent;
import org.kie.api.event.process.ProcessNodeEvent;
Expand All @@ -38,7 +40,6 @@
import org.kie.api.event.usertask.UserTaskAttachmentEvent;
import org.kie.api.event.usertask.UserTaskCommentEvent;
import org.kie.api.event.usertask.UserTaskDeadlineEvent;
import org.kie.api.event.usertask.UserTaskEvent;
import org.kie.api.event.usertask.UserTaskStateEvent;
import org.kie.api.event.usertask.UserTaskVariableEvent;
import org.kie.kogito.Addons;
Expand Down Expand Up @@ -70,7 +71,6 @@
import org.kie.kogito.event.usertask.UserTaskInstanceVariableEventBody;
import org.kie.kogito.internal.process.event.KogitoProcessVariableChangedEvent;
import org.kie.kogito.internal.process.runtime.KogitoNodeInstance;
import org.kie.kogito.internal.process.runtime.KogitoProcessInstance;
import org.kie.kogito.internal.process.runtime.KogitoWorkItem;
import org.kie.kogito.internal.process.runtime.KogitoWorkItemNodeInstance;
import org.kie.kogito.internal.process.runtime.KogitoWorkflowProcessInstance;
Expand All @@ -86,25 +86,19 @@ public class ProcessInstanceEventBatch implements EventBatch {
public ProcessInstanceEventBatch(String service, Addons addons) {
this.service = service;
this.addons = addons != null ? addons : Addons.EMTPY;
this.processedEvents = new ArrayList<>();
}

@Override
public void append(Object rawEvent) {
if (rawEvent instanceof ProcessEvent) {
addDataEvent((ProcessEvent) rawEvent);
} else if (rawEvent instanceof UserTaskEvent) {
addDataEvent((UserTaskEvent) rawEvent);
}
this.processedEvents = new TreeSet<>(new Comparator<DataEvent<?>>() {
@Override
public int compare(DataEvent<?> event1, DataEvent<?> event2) {
return event2 instanceof ProcessInstanceStateDataEvent &&
((ProcessInstanceStateDataEvent) event2).getData().getEventType() == ProcessInstanceStateEventBody.EVENT_TYPE_ENDED
|| event1 instanceof ProcessInstanceStateDataEvent &&
((ProcessInstanceStateDataEvent) event1).getData().getEventType() == ProcessInstanceStateEventBody.EVENT_TYPE_STARTED ? -1 : 1;
}
});
}

@Override
public Collection<DataEvent<?>> events() {
return processedEvents;
}

private void addDataEvent(ProcessEvent event) {
// process events
public void append(Object event) {
if (event instanceof ProcessStartedEvent) {
handleProcessStateEvent((ProcessStartedEvent) event);
} else if (event instanceof ProcessCompletedEvent) {
Expand All @@ -117,9 +111,28 @@ private void addDataEvent(ProcessEvent event) {
handleProcesssNodeEvent((SLAViolatedEvent) event);
} else if (event instanceof ProcessVariableChangedEvent) {
handleProcessVariableEvent((ProcessVariableChangedEvent) event);
} else if (event instanceof ErrorEvent) {
handleErrorEvent((ErrorEvent) event);
} else if (event instanceof UserTaskStateEvent) {
handleUserTaskStateEvent((UserTaskStateEvent) event);
} else if (event instanceof UserTaskDeadlineEvent) {
handleUserTaskDeadlineEvent((UserTaskDeadlineEvent) event);
} else if (event instanceof UserTaskAssignmentEvent) {
handleUserTaskAssignmentEvent((UserTaskAssignmentEvent) event);
} else if (event instanceof UserTaskVariableEvent) {
handleUserTaskVariableEvent((UserTaskVariableEvent) event);
} else if (event instanceof UserTaskAttachmentEvent) {
handleUserTaskAttachmentEvent((UserTaskAttachmentEvent) event);
} else if (event instanceof UserTaskCommentEvent) {
handleUserTaskCommentEvent((UserTaskCommentEvent) event);
}
}

@Override
public Collection<DataEvent<?>> events() {
return processedEvents;
}

private void handleProcessVariableEvent(ProcessVariableChangedEvent event) {
if (event.getTags().contains(KogitoTags.INTERNAL_TAG)) {
return;
Expand Down Expand Up @@ -245,52 +258,31 @@ private ProcessInstanceNodeDataEvent toProcessInstanceNodeEvent(ProcessNodeEvent
return piEvent;
}

private void handleProcessStateEvent(ProcessCompletedEvent event) {
private void handleErrorEvent(ErrorEvent event) {
KogitoWorkflowProcessInstance pi = (KogitoWorkflowProcessInstance) event.getProcessInstance();
ProcessInstanceErrorEventBody errorBody = ProcessInstanceErrorEventBody.create()
.eventDate(new Date())
.eventUser(event.getEventIdentity())
.processInstanceId(pi.getId())
.processId(pi.getProcessId())
.processVersion(pi.getProcessVersion())
.nodeDefinitionId(pi.getNodeIdInError())
.nodeInstanceId(pi.getNodeInstanceIdInError())
.errorMessage(pi.getErrorMessage())
.build();
Map<String, Object> metadata = buildProcessMetadata((KogitoWorkflowProcessInstance) event.getProcessInstance());
ProcessInstanceErrorDataEvent piEvent =
new ProcessInstanceErrorDataEvent(buildSource(event.getProcessInstance().getProcessId()), addons.toString(), event.getEventIdentity(), metadata, errorBody);
piEvent.setKogitoBusinessKey(pi.getBusinessKey());
processedEvents.add(piEvent);
}

private void handleProcessStateEvent(ProcessCompletedEvent event) {
processedEvents.add(toProcessInstanceStateEvent(event, ProcessInstanceStateEventBody.EVENT_TYPE_ENDED));

KogitoWorkflowProcessInstance pi = (KogitoWorkflowProcessInstance) event.getProcessInstance();
if (pi.getState() == KogitoProcessInstance.STATE_ERROR) {
ProcessInstanceErrorEventBody errorBody = ProcessInstanceErrorEventBody.create()
.eventDate(new Date())
.eventUser(event.getEventIdentity())
.processInstanceId(pi.getId())
.processId(pi.getProcessId())
.processVersion(pi.getProcessVersion())
.nodeDefinitionId(pi.getNodeIdInError())
.nodeInstanceId(pi.getNodeInstanceIdInError())
.errorMessage(pi.getErrorMessage())
.build();
Map<String, Object> metadata = buildProcessMetadata((KogitoWorkflowProcessInstance) event.getProcessInstance());
ProcessInstanceErrorDataEvent piEvent =
new ProcessInstanceErrorDataEvent(buildSource(event.getProcessInstance().getProcessId()), addons.toString(), event.getEventIdentity(), metadata, errorBody);
piEvent.setKogitoBusinessKey(pi.getBusinessKey());
processedEvents.add(piEvent);
}
}

private void handleProcessStateEvent(ProcessStartedEvent event) {
processedEvents.add(toProcessInstanceStateEvent(event, ProcessInstanceStateEventBody.EVENT_TYPE_STARTED));

KogitoWorkflowProcessInstance pi = (KogitoWorkflowProcessInstance) event.getProcessInstance();
if (pi.getState() == KogitoProcessInstance.STATE_ERROR) {
ProcessInstanceErrorEventBody errorBody = ProcessInstanceErrorEventBody.create()
.eventDate(new Date())
.eventUser(event.getEventIdentity())
.processInstanceId(pi.getId())
.processId(pi.getProcessId())
.processVersion(pi.getProcessVersion())
.nodeDefinitionId(pi.getNodeIdInError())
.nodeInstanceId(pi.getNodeInstanceIdInError())
.errorMessage(pi.getErrorMessage())
.build();
Map<String, Object> metadata = buildProcessMetadata((KogitoWorkflowProcessInstance) event.getProcessInstance());
ProcessInstanceErrorDataEvent piEvent =
new ProcessInstanceErrorDataEvent(buildSource(event.getProcessInstance().getProcessId()), addons.toString(), event.getEventIdentity(), metadata, errorBody);
piEvent.setKogitoBusinessKey(pi.getBusinessKey());
processedEvents.add(piEvent);
}

}

private ProcessInstanceStateDataEvent toProcessInstanceStateEvent(ProcessEvent event, int eventType) {
Expand Down Expand Up @@ -339,23 +331,6 @@ private Map<String, Object> buildProcessMetadata(KogitoWorkflowProcessInstance p
return metadata;
}

private void addDataEvent(UserTaskEvent event) {
// this should go in another event types
if (event instanceof UserTaskStateEvent) {
handleUserTaskStateEvent((UserTaskStateEvent) event);
} else if (event instanceof UserTaskDeadlineEvent) {
handleUserTaskDeadlineEvent((UserTaskDeadlineEvent) event);
} else if (event instanceof UserTaskAssignmentEvent) {
handleUserTaskAssignmentEvent((UserTaskAssignmentEvent) event);
} else if (event instanceof UserTaskVariableEvent) {
handleUserTaskVariableEvent((UserTaskVariableEvent) event);
} else if (event instanceof UserTaskAttachmentEvent) {
handleUserTaskAttachmentEvent((UserTaskAttachmentEvent) event);
} else if (event instanceof UserTaskCommentEvent) {
handleUserTaskCommentEvent((UserTaskCommentEvent) event);
}
}

private void handleUserTaskCommentEvent(UserTaskCommentEvent event) {
Map<String, Object> metadata = buildUserTaskMetadata((HumanTaskWorkItem) event.getWorkItem());
metadata.putAll(buildProcessMetadata((KogitoWorkflowProcessInstance) event.getProcessInstance()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,25 @@
package org.kie.kogito.event.impl;

import org.junit.jupiter.api.Test;
import org.kie.api.event.process.ErrorEvent;
import org.kie.api.event.process.ProcessCompletedEvent;
import org.kie.api.event.process.ProcessEvent;
import org.kie.api.event.process.ProcessNodeEvent;
import org.kie.api.event.process.ProcessNodeLeftEvent;
import org.kie.api.event.process.ProcessNodeTriggeredEvent;
import org.kie.api.event.process.ProcessStartedEvent;
import org.kie.api.event.process.ProcessVariableChangedEvent;
import org.kie.api.runtime.process.NodeInstance;
import org.kie.api.runtime.process.ProcessInstance;
import org.kie.kogito.event.process.ProcessInstanceErrorDataEvent;
import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent;
import org.kie.kogito.event.process.ProcessInstanceStateDataEvent;
import org.kie.kogito.event.process.ProcessInstanceVariableDataEvent;
import org.kie.kogito.internal.process.runtime.KogitoNode;
import org.kie.kogito.internal.process.runtime.KogitoNodeInstance;
import org.kie.kogito.internal.process.runtime.KogitoWorkflowProcess;
import org.kie.kogito.internal.process.runtime.KogitoWorkflowProcessInstance;
import org.mockito.Mockito;

import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
Expand Down Expand Up @@ -49,4 +68,41 @@ public void testServiceDefined() {
assertThat(batch.extractRuntimeSource(singletonMap(PROCESS_ID_META_DATA, "demo.orders"))).isEqualTo("http://localhost:8080/orders");
}

@Test
public void testEventSorting() {
ProcessInstanceEventBatch batch = new ProcessInstanceEventBatch("", null);
KogitoWorkflowProcess process = Mockito.mock(KogitoWorkflowProcess.class);
KogitoWorkflowProcessInstance processInstance = Mockito.mock(KogitoWorkflowProcessInstance.class);
KogitoNodeInstance nodeInstance = Mockito.mock(KogitoNodeInstance.class);
KogitoNode node = Mockito.mock(KogitoNode.class);
Mockito.when(processInstance.getProcess()).thenReturn(process);
Mockito.when(nodeInstance.getNode()).thenReturn(node);
batch.append(mockEvent(ProcessVariableChangedEvent.class, processInstance));
batch.append(mockEvent(ProcessNodeTriggeredEvent.class, processInstance, nodeInstance));
batch.append(mockEvent(ProcessNodeLeftEvent.class, processInstance, nodeInstance));
batch.append(mockEvent(ProcessStartedEvent.class, processInstance));
batch.append(mockEvent(ProcessNodeTriggeredEvent.class, processInstance, nodeInstance));
batch.append(mockEvent(ProcessNodeLeftEvent.class, processInstance, nodeInstance));
batch.append(mockEvent(ErrorEvent.class, processInstance, nodeInstance));
batch.append(mockEvent(ProcessCompletedEvent.class, processInstance));
batch.append(mockEvent(ProcessNodeTriggeredEvent.class, processInstance, nodeInstance));
batch.append(mockEvent(ProcessNodeLeftEvent.class, processInstance, nodeInstance));
assertThat(batch.processedEvents).hasExactlyElementsOfTypes(ProcessInstanceStateDataEvent.class, ProcessInstanceVariableDataEvent.class, ProcessInstanceNodeDataEvent.class,
ProcessInstanceNodeDataEvent.class,
ProcessInstanceNodeDataEvent.class, ProcessInstanceNodeDataEvent.class, ProcessInstanceErrorDataEvent.class, ProcessInstanceNodeDataEvent.class, ProcessInstanceNodeDataEvent.class,
ProcessInstanceStateDataEvent.class);
}

private <T extends ProcessEvent> T mockEvent(Class<T> clazz, ProcessInstance processInstance) {
T event = Mockito.mock(clazz);
Mockito.when(event.getProcessInstance()).thenReturn(processInstance);
return event;
}

private <T extends ProcessNodeEvent> T mockEvent(Class<T> clazz, ProcessInstance processInstance, NodeInstance nodeInstance) {
T event = Mockito.mock(clazz);
Mockito.when(event.getProcessInstance()).thenReturn(processInstance);
Mockito.when(event.getNodeInstance()).thenReturn(nodeInstance);
return event;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jbpm.process.instance.event;

import org.kie.api.event.process.ProcessNodeEvent;
import org.kie.api.runtime.KieRuntime;
import org.kie.api.runtime.process.NodeInstance;
import org.kie.api.runtime.process.ProcessInstance;

public abstract class AbstractProcessNodeEvent extends ProcessEvent implements ProcessNodeEvent {

private final NodeInstance nodeInstance;

protected AbstractProcessNodeEvent(NodeInstance nodeInstance, ProcessInstance instance, KieRuntime kruntime) {
super(instance, kruntime);
this.nodeInstance = nodeInstance;
}

protected AbstractProcessNodeEvent(NodeInstance nodeInstance, ProcessInstance instance, KieRuntime kruntime, String identity) {
super(instance, kruntime, identity);
this.nodeInstance = nodeInstance;
}

@Override
public NodeInstance getNodeInstance() {
return nodeInstance;
}

}
Loading

0 comments on commit 66289ac

Please sign in to comment.