Skip to content

Commit

Permalink
fix(engine): remove orphan event subscriptions on process definition …
Browse files Browse the repository at this point in the history
…redeployment

Related to #2460
  • Loading branch information
joaquinfelici committed May 27, 2024
1 parent 84eacc6 commit 46c6317
Show file tree
Hide file tree
Showing 3 changed files with 247 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@

import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.camunda.bpm.engine.delegate.Expression;
import org.camunda.bpm.engine.impl.AbstractDefinitionDeployer;
import org.camunda.bpm.engine.impl.ProcessEngineLogger;
Expand Down Expand Up @@ -228,11 +230,11 @@ protected void createJobDefinition(ProcessDefinition processDefinition, JobDecla
* subscriptions and add new ones for the new deployed process definitions.
*/
protected void adjustStartEventSubscriptions(ProcessDefinitionEntity newLatestProcessDefinition, ProcessDefinitionEntity oldLatestProcessDefinition) {
removeObsoleteTimers(newLatestProcessDefinition);
addTimerDeclarations(newLatestProcessDefinition);
removeObsoleteTimers(newLatestProcessDefinition);
removeObsoleteEventSubscriptions(newLatestProcessDefinition, oldLatestProcessDefinition);

removeObsoleteEventSubscriptions(newLatestProcessDefinition, oldLatestProcessDefinition);
addEventSubscriptions(newLatestProcessDefinition);
addTimerDeclarations(newLatestProcessDefinition);
addEventSubscriptions(newLatestProcessDefinition);
}

@SuppressWarnings("unchecked")
Expand All @@ -255,29 +257,62 @@ protected void removeObsoleteTimers(ProcessDefinitionEntity processDefinition) {
}
}

protected void removeObsoleteEventSubscriptions(ProcessDefinitionEntity processDefinition, ProcessDefinitionEntity latestProcessDefinition) {
// remove all subscriptions for the previous version
if (latestProcessDefinition != null) {
EventSubscriptionManager eventSubscriptionManager = getEventSubscriptionManager();
protected void removeObsoleteEventSubscriptions(ProcessDefinitionEntity newLatestProcessDefinition, ProcessDefinitionEntity latestProcessDefinition) {
List<EventSubscriptionEntity> orphanSubscriptions = getOrphanSubscriptionEvents(newLatestProcessDefinition);
if(!orphanSubscriptions.isEmpty()) { // remove orphan subscriptions if any
for (EventSubscriptionEntity eventSubscriptionEntity : orphanSubscriptions) {
getEventSubscriptionManager().deleteEventSubscription(eventSubscriptionEntity);
}
}

List<EventSubscriptionEntity> subscriptionsToDelete = new ArrayList<>();
if (latestProcessDefinition != null) { // remove all subscriptions for the previous version
List<EventSubscriptionEntity> previousSubscriptions = getPreviousSubscriptionEvents(latestProcessDefinition);
for (EventSubscriptionEntity eventSubscriptionEntity : previousSubscriptions) {
eventSubscriptionEntity.delete();
}
}
}

List<EventSubscriptionEntity> messageEventSubscriptions = eventSubscriptionManager
.findEventSubscriptionsByConfiguration(EventType.MESSAGE.name(), latestProcessDefinition.getId());
subscriptionsToDelete.addAll(messageEventSubscriptions);
protected List<EventSubscriptionEntity> getPreviousSubscriptionEvents(ProcessDefinitionEntity latestProcessDefinition) {
EventSubscriptionManager eventSubscriptionManager = getEventSubscriptionManager();

List<EventSubscriptionEntity> signalEventSubscriptions = eventSubscriptionManager
.findEventSubscriptionsByConfiguration(EventType.SIGNAL.name(), latestProcessDefinition.getId());
subscriptionsToDelete.addAll(signalEventSubscriptions);
List<EventSubscriptionEntity> subscriptionsToDelete = new ArrayList<>();

List<EventSubscriptionEntity> conditionalEventSubscriptions = eventSubscriptionManager
.findEventSubscriptionsByConfiguration(EventType.CONDITONAL.name(), latestProcessDefinition.getId());
subscriptionsToDelete.addAll(conditionalEventSubscriptions);
List<EventSubscriptionEntity> messageEventSubscriptions = eventSubscriptionManager
.findEventSubscriptionsByConfiguration(EventType.MESSAGE.name(), latestProcessDefinition.getId());
subscriptionsToDelete.addAll(messageEventSubscriptions);

for (EventSubscriptionEntity eventSubscriptionEntity : subscriptionsToDelete) {
eventSubscriptionEntity.delete();
}
List<EventSubscriptionEntity> signalEventSubscriptions = eventSubscriptionManager
.findEventSubscriptionsByConfiguration(EventType.SIGNAL.name(), latestProcessDefinition.getId());
subscriptionsToDelete.addAll(signalEventSubscriptions);

List<EventSubscriptionEntity> conditionalEventSubscriptions = eventSubscriptionManager
.findEventSubscriptionsByConfiguration(EventType.CONDITONAL.name(), latestProcessDefinition.getId());
subscriptionsToDelete.addAll(conditionalEventSubscriptions);
return subscriptionsToDelete;
}

protected List<EventSubscriptionEntity> getOrphanSubscriptionEvents(ProcessDefinitionEntity processDefinition) {
Map<String, EventSubscriptionDeclaration> eventSubscriptionDeclarations = processDefinition.getProperties().get(BpmnProperties.EVENT_SUBSCRIPTION_DECLARATIONS);

// if finding subscriptions by name and tenant ID impacts performance, we could think about another way of filtering
// (for example doing "configuration" LIKE "processDefinition.key%")
if (!eventSubscriptionDeclarations.isEmpty()) {
return eventSubscriptionDeclarations.values()
.stream()
.filter(EventSubscriptionDeclaration::hasEventName)
.flatMap(declaration -> getEventSubscriptionManager().findEventSubscriptionsByNameAndTenantId(
declaration.getEventType(), declaration.getUnresolvedEventName(), processDefinition.getTenantId())
.stream())
.filter(this::isOrphan)
.collect(Collectors.toList());
}

return Collections.emptyList();
}

protected boolean isOrphan(EventSubscriptionEntity entity) {
return entity.getConfiguration() != null && getProcessDefinitionManager().findLatestProcessDefinitionById(entity.getConfiguration()) == null;
}

public void addEventSubscriptions(ProcessDefinitionEntity processDefinition) {
Expand Down Expand Up @@ -327,7 +362,7 @@ protected boolean isSameMessageStartEventSubscriptionAlreadyPresent(EventSubscri
for (EventSubscriptionEntity cachedSubscription : cachedSubscriptions) {
if (eventSubscription.getUnresolvedEventName().equals(cachedSubscription.getEventName()) &&
hasTenantId(cachedSubscription, tenantId) &&
!subscriptionForSameMessageName.equals(cachedSubscription) &&
!cachedSubscription.equals(subscriptionForSameMessageName) &&
!isSubscriptionOfDifferentTypeAsDeclaration(cachedSubscription, eventSubscription)) {

subscriptionForSameMessageName = cachedSubscription;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH
* under one or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. Camunda licenses this file to you under the Apache License,
* Version 2.0; you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.camunda.bpm.engine.test.concurrency;

import org.camunda.bpm.engine.impl.interceptor.CommandContext;
import org.camunda.bpm.engine.repository.ProcessDefinition;
import org.junit.After;
import org.junit.Test;

import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

public class DeleteProcessDefinitionTest extends ConcurrencyTestCase {

@After
public void tearDown() {
repositoryService.createDeploymentQuery().list().forEach(deployment -> repositoryService.deleteDeployment(deployment.getId(), true));
processEngineConfiguration.getDeploymentCache().purgeCache();
}

@Test
public void testDeploymentOfProcessDefinitionWithOrphanMessageEvent() {
// given
String resource = "org/camunda/bpm/engine/test/api/repository/processWithNewInvoiceMessage.bpmn20.xml";
List<ProcessDefinition> processDefinitions = deployProcessDefinitionTwice(resource);
assertThat(processDefinitions.size()).isEqualTo(2);
deleteProcessDefinitionsSimultaneously(processDefinitions.get(0).getId(), processDefinitions.get(1).getId());

assertThat(repositoryService.createProcessDefinitionQuery().list()).isEmpty();
assertThat(runtimeService.createEventSubscriptionQuery().count()).isEqualTo(1);

// when
repositoryService.createDeployment().addClasspathResource(resource).deploy();

// then
assertThat(repositoryService.createProcessDefinitionQuery().count()).isEqualTo(1);
assertThat(runtimeService.createEventSubscriptionQuery().count()).isEqualTo(1);
}

@Test
public void testDeploymentOfProcessDefinitionWithOrphanJob() {
// given
String resource = "org/camunda/bpm/engine/test/bpmn/event/timer/StartTimerEventTest.testTimeCycle.bpmn20.xml";
List<ProcessDefinition> processDefinitions = deployProcessDefinitionTwice(resource);
assertThat(processDefinitions.size()).isEqualTo(2);
deleteProcessDefinitionsSimultaneously(processDefinitions.get(0).getId(), processDefinitions.get(1).getId());

assertThat(repositoryService.createProcessDefinitionQuery().list()).isEmpty();
assertThat(managementService.createJobQuery().count()).isEqualTo(1);

// when
repositoryService.createDeployment().addClasspathResource(resource).deploy();

// then
assertThat(repositoryService.createProcessDefinitionQuery().count()).isEqualTo(1);
assertThat(managementService.createJobQuery().count()).isEqualTo(1);
}

@Test
public void testDeploymentOfProcessDefinitionWithOrphanSignalEvent() {
// given
String resource = "org/camunda/bpm/engine/test/api/repository/processWithStartSignalEvent.bpmn20.xml";
List<ProcessDefinition> processDefinitions = deployProcessDefinitionTwice(resource);
assertThat(processDefinitions.size()).isEqualTo(2);
deleteProcessDefinitionsSimultaneously(processDefinitions.get(0).getId(), processDefinitions.get(1).getId());

assertThat(repositoryService.createProcessDefinitionQuery().list()).isEmpty();
assertThat(runtimeService.createEventSubscriptionQuery().count()).isEqualTo(1);

// when
repositoryService.createDeployment().addClasspathResource(resource).deploy();

// then
assertThat(repositoryService.createProcessDefinitionQuery().count()).isEqualTo(1);
assertThat(runtimeService.createEventSubscriptionQuery().count()).isEqualTo(1);
}

@Test
public void testDeploymentOfProcessDefinitionWithOrphanEventAndPreviousVersion() {
// given
String resource = "org/camunda/bpm/engine/test/api/repository/processWithNewInvoiceMessage.bpmn20.xml";
repositoryService.createDeployment().addClasspathResource(resource).deploy();
repositoryService.createDeployment().addClasspathResource(resource).deploy();
repositoryService.createDeployment().addClasspathResource(resource).deploy();
List<ProcessDefinition> processDefinitions = repositoryService.createProcessDefinitionQuery().processDefinitionKey("otherMessageProcess").list();
assertThat(processDefinitions.size()).isEqualTo(3);

deleteProcessDefinitionsSimultaneously(processDefinitions.get(1).getId(), processDefinitions.get(2).getId());

assertThat(repositoryService.createProcessDefinitionQuery().count()).isEqualTo(1);
assertThat(runtimeService.createEventSubscriptionQuery().count()).isEqualTo(1);

// when
repositoryService.createDeployment().addClasspathResource(resource).deploy();

// then
assertThat(repositoryService.createProcessDefinitionQuery().count()).isEqualTo(2);
assertThat(runtimeService.createEventSubscriptionQuery().count()).isEqualTo(1);
}

protected static class ControllableDeleteProcessDefinitionCommand extends ControllableCommand<Void> {

protected String processDefinitionId;
protected Exception exception;

public ControllableDeleteProcessDefinitionCommand(String processDefinitionId) {
this.processDefinitionId = processDefinitionId;
}

@Override
public Void execute(CommandContext commandContext) {
monitor.sync(); // thread will block here until makeContinue() is called from main thread
commandContext.getProcessEngineConfiguration().getRepositoryService().deleteProcessDefinition(processDefinitionId);
monitor.sync(); // thread will block here until waitUntilDone() is called form main thread
return null;
}
}

protected void deleteProcessDefinitionsSimultaneously(String id1, String id2) {
ThreadControl thread1 = executeControllableCommand(new ControllableDeleteProcessDefinitionCommand(id1));
thread1.reportInterrupts();
thread1.waitForSync();

ThreadControl thread2 = executeControllableCommand(new ControllableDeleteProcessDefinitionCommand(id2));
thread2.reportInterrupts();
thread2.waitForSync();

// delete process definition version 1 without committing
thread1.makeContinue();
thread1.waitForSync();

// delete process definition version 2 without committing
thread2.makeContinue();
thread2.waitForSync();

// commit deletion on version 1
thread1.makeContinue();
thread1.waitUntilDone();

// commit deletion on version 2
thread2.makeContinue();
thread2.waitUntilDone();
}

protected List<ProcessDefinition> deployProcessDefinitionTwice(String resource) {
repositoryService.createDeployment().addClasspathResource(resource).deploy();
repositoryService.createDeployment().addClasspathResource(resource).deploy();
return repositoryService.createProcessDefinitionQuery().list();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?xml version="1.0" encoding="UTF-8"?>
<definitions id="definitions"
xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
xmlns:camunda="http://camunda.org/schema/1.0/bpmn"
targetNamespace="Examples"
xmlns:tns="Examples">

<signal id="signal" name="warning" />

<process id="otherMessageProcess" isExecutable="true">

<startEvent id="theStart">
<signalEventDefinition signalRef="signal"/>
</startEvent>

<sequenceFlow id="flow1" sourceRef="theStart" targetRef="task" />

<userTask id="task" name="Task" />
<sequenceFlow id="flow2" sourceRef="task" targetRef="theEnd" />

<endEvent id="theEnd" />

</process>

</definitions>

0 comments on commit 46c6317

Please sign in to comment.