From 46c6317ad05e29ab2e48b7739f522a9f9d9501ca Mon Sep 17 00:00:00 2001 From: Joaquin Felici Date: Tue, 21 May 2024 18:33:42 +0200 Subject: [PATCH] fix(engine): remove orphan event subscriptions on process definition redeployment Related to https://github.com/camunda/camunda-bpm-platform/issues/2460 --- .../impl/bpmn/deployer/BpmnDeployer.java | 79 ++++++--- .../DeleteProcessDefinitionTest.java | 165 ++++++++++++++++++ .../processWithStartSignalEvent.bpmn20.xml | 25 +++ 3 files changed, 247 insertions(+), 22 deletions(-) create mode 100644 engine/src/test/java/org/camunda/bpm/engine/test/concurrency/DeleteProcessDefinitionTest.java create mode 100644 engine/src/test/resources/org/camunda/bpm/engine/test/api/repository/processWithStartSignalEvent.bpmn20.xml diff --git a/engine/src/main/java/org/camunda/bpm/engine/impl/bpmn/deployer/BpmnDeployer.java b/engine/src/main/java/org/camunda/bpm/engine/impl/bpmn/deployer/BpmnDeployer.java index 136bd5f6338..d23b82e561d 100644 --- a/engine/src/main/java/org/camunda/bpm/engine/impl/bpmn/deployer/BpmnDeployer.java +++ b/engine/src/main/java/org/camunda/bpm/engine/impl/bpmn/deployer/BpmnDeployer.java @@ -18,10 +18,12 @@ import java.io.ByteArrayInputStream; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import org.camunda.bpm.engine.delegate.Expression; import org.camunda.bpm.engine.impl.AbstractDefinitionDeployer; import org.camunda.bpm.engine.impl.ProcessEngineLogger; @@ -228,11 +230,11 @@ protected void createJobDefinition(ProcessDefinition processDefinition, JobDecla * subscriptions and add new ones for the new deployed process definitions. */ protected void adjustStartEventSubscriptions(ProcessDefinitionEntity newLatestProcessDefinition, ProcessDefinitionEntity oldLatestProcessDefinition) { - removeObsoleteTimers(newLatestProcessDefinition); - addTimerDeclarations(newLatestProcessDefinition); + removeObsoleteTimers(newLatestProcessDefinition); + removeObsoleteEventSubscriptions(newLatestProcessDefinition, oldLatestProcessDefinition); - removeObsoleteEventSubscriptions(newLatestProcessDefinition, oldLatestProcessDefinition); - addEventSubscriptions(newLatestProcessDefinition); + addTimerDeclarations(newLatestProcessDefinition); + addEventSubscriptions(newLatestProcessDefinition); } @SuppressWarnings("unchecked") @@ -255,29 +257,62 @@ protected void removeObsoleteTimers(ProcessDefinitionEntity processDefinition) { } } - protected void removeObsoleteEventSubscriptions(ProcessDefinitionEntity processDefinition, ProcessDefinitionEntity latestProcessDefinition) { - // remove all subscriptions for the previous version - if (latestProcessDefinition != null) { - EventSubscriptionManager eventSubscriptionManager = getEventSubscriptionManager(); + protected void removeObsoleteEventSubscriptions(ProcessDefinitionEntity newLatestProcessDefinition, ProcessDefinitionEntity latestProcessDefinition) { + List orphanSubscriptions = getOrphanSubscriptionEvents(newLatestProcessDefinition); + if(!orphanSubscriptions.isEmpty()) { // remove orphan subscriptions if any + for (EventSubscriptionEntity eventSubscriptionEntity : orphanSubscriptions) { + getEventSubscriptionManager().deleteEventSubscription(eventSubscriptionEntity); + } + } - List subscriptionsToDelete = new ArrayList<>(); + if (latestProcessDefinition != null) { // remove all subscriptions for the previous version + List previousSubscriptions = getPreviousSubscriptionEvents(latestProcessDefinition); + for (EventSubscriptionEntity eventSubscriptionEntity : previousSubscriptions) { + eventSubscriptionEntity.delete(); + } + } + } - List messageEventSubscriptions = eventSubscriptionManager - .findEventSubscriptionsByConfiguration(EventType.MESSAGE.name(), latestProcessDefinition.getId()); - subscriptionsToDelete.addAll(messageEventSubscriptions); + protected List getPreviousSubscriptionEvents(ProcessDefinitionEntity latestProcessDefinition) { + EventSubscriptionManager eventSubscriptionManager = getEventSubscriptionManager(); - List signalEventSubscriptions = eventSubscriptionManager - .findEventSubscriptionsByConfiguration(EventType.SIGNAL.name(), latestProcessDefinition.getId()); - subscriptionsToDelete.addAll(signalEventSubscriptions); + List subscriptionsToDelete = new ArrayList<>(); - List conditionalEventSubscriptions = eventSubscriptionManager - .findEventSubscriptionsByConfiguration(EventType.CONDITONAL.name(), latestProcessDefinition.getId()); - subscriptionsToDelete.addAll(conditionalEventSubscriptions); + List messageEventSubscriptions = eventSubscriptionManager + .findEventSubscriptionsByConfiguration(EventType.MESSAGE.name(), latestProcessDefinition.getId()); + subscriptionsToDelete.addAll(messageEventSubscriptions); - for (EventSubscriptionEntity eventSubscriptionEntity : subscriptionsToDelete) { - eventSubscriptionEntity.delete(); - } + List signalEventSubscriptions = eventSubscriptionManager + .findEventSubscriptionsByConfiguration(EventType.SIGNAL.name(), latestProcessDefinition.getId()); + subscriptionsToDelete.addAll(signalEventSubscriptions); + + List conditionalEventSubscriptions = eventSubscriptionManager + .findEventSubscriptionsByConfiguration(EventType.CONDITONAL.name(), latestProcessDefinition.getId()); + subscriptionsToDelete.addAll(conditionalEventSubscriptions); + return subscriptionsToDelete; + } + + protected List getOrphanSubscriptionEvents(ProcessDefinitionEntity processDefinition) { + Map eventSubscriptionDeclarations = processDefinition.getProperties().get(BpmnProperties.EVENT_SUBSCRIPTION_DECLARATIONS); + + // if finding subscriptions by name and tenant ID impacts performance, we could think about another way of filtering + // (for example doing "configuration" LIKE "processDefinition.key%") + if (!eventSubscriptionDeclarations.isEmpty()) { + return eventSubscriptionDeclarations.values() + .stream() + .filter(EventSubscriptionDeclaration::hasEventName) + .flatMap(declaration -> getEventSubscriptionManager().findEventSubscriptionsByNameAndTenantId( + declaration.getEventType(), declaration.getUnresolvedEventName(), processDefinition.getTenantId()) + .stream()) + .filter(this::isOrphan) + .collect(Collectors.toList()); } + + return Collections.emptyList(); + } + + protected boolean isOrphan(EventSubscriptionEntity entity) { + return entity.getConfiguration() != null && getProcessDefinitionManager().findLatestProcessDefinitionById(entity.getConfiguration()) == null; } public void addEventSubscriptions(ProcessDefinitionEntity processDefinition) { @@ -327,7 +362,7 @@ protected boolean isSameMessageStartEventSubscriptionAlreadyPresent(EventSubscri for (EventSubscriptionEntity cachedSubscription : cachedSubscriptions) { if (eventSubscription.getUnresolvedEventName().equals(cachedSubscription.getEventName()) && hasTenantId(cachedSubscription, tenantId) && - !subscriptionForSameMessageName.equals(cachedSubscription) && + !cachedSubscription.equals(subscriptionForSameMessageName) && !isSubscriptionOfDifferentTypeAsDeclaration(cachedSubscription, eventSubscription)) { subscriptionForSameMessageName = cachedSubscription; diff --git a/engine/src/test/java/org/camunda/bpm/engine/test/concurrency/DeleteProcessDefinitionTest.java b/engine/src/test/java/org/camunda/bpm/engine/test/concurrency/DeleteProcessDefinitionTest.java new file mode 100644 index 00000000000..ba7cc0edfa0 --- /dev/null +++ b/engine/src/test/java/org/camunda/bpm/engine/test/concurrency/DeleteProcessDefinitionTest.java @@ -0,0 +1,165 @@ +/* + * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH + * under one or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. Camunda licenses this file to you under the Apache License, + * Version 2.0; you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.camunda.bpm.engine.test.concurrency; + +import org.camunda.bpm.engine.impl.interceptor.CommandContext; +import org.camunda.bpm.engine.repository.ProcessDefinition; +import org.junit.After; +import org.junit.Test; + +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +public class DeleteProcessDefinitionTest extends ConcurrencyTestCase { + + @After + public void tearDown() { + repositoryService.createDeploymentQuery().list().forEach(deployment -> repositoryService.deleteDeployment(deployment.getId(), true)); + processEngineConfiguration.getDeploymentCache().purgeCache(); + } + + @Test + public void testDeploymentOfProcessDefinitionWithOrphanMessageEvent() { + // given + String resource = "org/camunda/bpm/engine/test/api/repository/processWithNewInvoiceMessage.bpmn20.xml"; + List processDefinitions = deployProcessDefinitionTwice(resource); + assertThat(processDefinitions.size()).isEqualTo(2); + deleteProcessDefinitionsSimultaneously(processDefinitions.get(0).getId(), processDefinitions.get(1).getId()); + + assertThat(repositoryService.createProcessDefinitionQuery().list()).isEmpty(); + assertThat(runtimeService.createEventSubscriptionQuery().count()).isEqualTo(1); + + // when + repositoryService.createDeployment().addClasspathResource(resource).deploy(); + + // then + assertThat(repositoryService.createProcessDefinitionQuery().count()).isEqualTo(1); + assertThat(runtimeService.createEventSubscriptionQuery().count()).isEqualTo(1); + } + + @Test + public void testDeploymentOfProcessDefinitionWithOrphanJob() { + // given + String resource = "org/camunda/bpm/engine/test/bpmn/event/timer/StartTimerEventTest.testTimeCycle.bpmn20.xml"; + List processDefinitions = deployProcessDefinitionTwice(resource); + assertThat(processDefinitions.size()).isEqualTo(2); + deleteProcessDefinitionsSimultaneously(processDefinitions.get(0).getId(), processDefinitions.get(1).getId()); + + assertThat(repositoryService.createProcessDefinitionQuery().list()).isEmpty(); + assertThat(managementService.createJobQuery().count()).isEqualTo(1); + + // when + repositoryService.createDeployment().addClasspathResource(resource).deploy(); + + // then + assertThat(repositoryService.createProcessDefinitionQuery().count()).isEqualTo(1); + assertThat(managementService.createJobQuery().count()).isEqualTo(1); + } + + @Test + public void testDeploymentOfProcessDefinitionWithOrphanSignalEvent() { + // given + String resource = "org/camunda/bpm/engine/test/api/repository/processWithStartSignalEvent.bpmn20.xml"; + List processDefinitions = deployProcessDefinitionTwice(resource); + assertThat(processDefinitions.size()).isEqualTo(2); + deleteProcessDefinitionsSimultaneously(processDefinitions.get(0).getId(), processDefinitions.get(1).getId()); + + assertThat(repositoryService.createProcessDefinitionQuery().list()).isEmpty(); + assertThat(runtimeService.createEventSubscriptionQuery().count()).isEqualTo(1); + + // when + repositoryService.createDeployment().addClasspathResource(resource).deploy(); + + // then + assertThat(repositoryService.createProcessDefinitionQuery().count()).isEqualTo(1); + assertThat(runtimeService.createEventSubscriptionQuery().count()).isEqualTo(1); + } + + @Test + public void testDeploymentOfProcessDefinitionWithOrphanEventAndPreviousVersion() { + // given + String resource = "org/camunda/bpm/engine/test/api/repository/processWithNewInvoiceMessage.bpmn20.xml"; + repositoryService.createDeployment().addClasspathResource(resource).deploy(); + repositoryService.createDeployment().addClasspathResource(resource).deploy(); + repositoryService.createDeployment().addClasspathResource(resource).deploy(); + List processDefinitions = repositoryService.createProcessDefinitionQuery().processDefinitionKey("otherMessageProcess").list(); + assertThat(processDefinitions.size()).isEqualTo(3); + + deleteProcessDefinitionsSimultaneously(processDefinitions.get(1).getId(), processDefinitions.get(2).getId()); + + assertThat(repositoryService.createProcessDefinitionQuery().count()).isEqualTo(1); + assertThat(runtimeService.createEventSubscriptionQuery().count()).isEqualTo(1); + + // when + repositoryService.createDeployment().addClasspathResource(resource).deploy(); + + // then + assertThat(repositoryService.createProcessDefinitionQuery().count()).isEqualTo(2); + assertThat(runtimeService.createEventSubscriptionQuery().count()).isEqualTo(1); + } + + protected static class ControllableDeleteProcessDefinitionCommand extends ControllableCommand { + + protected String processDefinitionId; + protected Exception exception; + + public ControllableDeleteProcessDefinitionCommand(String processDefinitionId) { + this.processDefinitionId = processDefinitionId; + } + + @Override + public Void execute(CommandContext commandContext) { + monitor.sync(); // thread will block here until makeContinue() is called from main thread + commandContext.getProcessEngineConfiguration().getRepositoryService().deleteProcessDefinition(processDefinitionId); + monitor.sync(); // thread will block here until waitUntilDone() is called form main thread + return null; + } + } + + protected void deleteProcessDefinitionsSimultaneously(String id1, String id2) { + ThreadControl thread1 = executeControllableCommand(new ControllableDeleteProcessDefinitionCommand(id1)); + thread1.reportInterrupts(); + thread1.waitForSync(); + + ThreadControl thread2 = executeControllableCommand(new ControllableDeleteProcessDefinitionCommand(id2)); + thread2.reportInterrupts(); + thread2.waitForSync(); + + // delete process definition version 1 without committing + thread1.makeContinue(); + thread1.waitForSync(); + + // delete process definition version 2 without committing + thread2.makeContinue(); + thread2.waitForSync(); + + // commit deletion on version 1 + thread1.makeContinue(); + thread1.waitUntilDone(); + + // commit deletion on version 2 + thread2.makeContinue(); + thread2.waitUntilDone(); + } + + protected List deployProcessDefinitionTwice(String resource) { + repositoryService.createDeployment().addClasspathResource(resource).deploy(); + repositoryService.createDeployment().addClasspathResource(resource).deploy(); + return repositoryService.createProcessDefinitionQuery().list(); + } +} diff --git a/engine/src/test/resources/org/camunda/bpm/engine/test/api/repository/processWithStartSignalEvent.bpmn20.xml b/engine/src/test/resources/org/camunda/bpm/engine/test/api/repository/processWithStartSignalEvent.bpmn20.xml new file mode 100644 index 00000000000..f07740271a0 --- /dev/null +++ b/engine/src/test/resources/org/camunda/bpm/engine/test/api/repository/processWithStartSignalEvent.bpmn20.xml @@ -0,0 +1,25 @@ + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file