diff --git a/src/main/java/com/aws/greengrass/deployment/DeploymentConfigMerger.java b/src/main/java/com/aws/greengrass/deployment/DeploymentConfigMerger.java index 26723f719e..63989fc846 100644 --- a/src/main/java/com/aws/greengrass/deployment/DeploymentConfigMerger.java +++ b/src/main/java/com/aws/greengrass/deployment/DeploymentConfigMerger.java @@ -126,7 +126,7 @@ private void updateActionForDeployment(Map newConfig, Deployment // if the update is cancelled, don't perform merge if (totallyCompleteFuture.isCancelled()) { logger.atInfo(MERGE_CONFIG_EVENT_KEY).kv("deployment", deploymentId) - .log("Future was cancelled so no need to go through with the update"); + .log("Deployment was cancelled, so no need to perform config merge update"); return; } Map serviceConfig; diff --git a/src/main/java/com/aws/greengrass/deployment/DeploymentService.java b/src/main/java/com/aws/greengrass/deployment/DeploymentService.java index d668840a5f..91e37e67ad 100644 --- a/src/main/java/com/aws/greengrass/deployment/DeploymentService.java +++ b/src/main/java/com/aws/greengrass/deployment/DeploymentService.java @@ -50,6 +50,7 @@ import lombok.Setter; import org.apache.commons.io.FileUtils; import software.amazon.awssdk.iot.iotjobs.model.JobStatus; +import software.amazon.awssdk.services.greengrassv2.model.DeploymentComponentUpdatePolicyAction; import java.io.IOException; import java.nio.file.Files; @@ -504,10 +505,12 @@ private void cancelCurrentDeployment() { .isCancellable()) { logger.atInfo().log("Deployment already finished processing or cannot be cancelled"); } else { - boolean canCancelDeployment = context.get(UpdateSystemPolicyService.class).discardPendingUpdateAction( - ((DefaultDeploymentTask) currentDeploymentTaskMetadata.getDeploymentTask()).getDeployment() - .getGreengrassDeploymentId()); - if (canCancelDeployment) { + DeploymentComponentUpdatePolicyAction currentDeploymentUpdatePolicyAction = + currentDeploymentTaskMetadata.getDeploymentDocument().getComponentUpdatePolicy() + .getComponentUpdatePolicyAction(); + if (DeploymentComponentUpdatePolicyAction.NOTIFY_COMPONENTS.equals(currentDeploymentUpdatePolicyAction) + && context.get(UpdateSystemPolicyService.class) + .discardPendingUpdateAction(currentDeploymentTaskMetadata.getGreengrassDeploymentId())) { currentDeploymentTaskMetadata.getDeploymentResultFuture().cancel(true); DeploymentType deploymentType = currentDeploymentTaskMetadata.getDeploymentType(); if (DeploymentType.SHADOW.equals(deploymentType) || DeploymentType.LOCAL.equals(deploymentType)) { @@ -522,13 +525,11 @@ private void cancelCurrentDeployment() { .kv(GG_DEPLOYMENT_ID_LOG_KEY_NAME, currentDeploymentTaskMetadata.getGreengrassDeploymentId()) .log("Deployment was cancelled"); - } else { - logger.atInfo().kv(DEPLOYMENT_ID_LOG_KEY_NAME, currentDeploymentTaskMetadata.getDeploymentId()) - .kv(GG_DEPLOYMENT_ID_LOG_KEY_NAME, - currentDeploymentTaskMetadata.getGreengrassDeploymentId()) - .log("Deployment is in a stage where it cannot be cancelled," - + " need to wait for it to finish"); + return; } + logger.atInfo().kv(DEPLOYMENT_ID_LOG_KEY_NAME, currentDeploymentTaskMetadata.getDeploymentId()) + .kv(GG_DEPLOYMENT_ID_LOG_KEY_NAME, currentDeploymentTaskMetadata.getGreengrassDeploymentId()) + .log("Deployment is in a stage where it cannot be cancelled, need to wait for it to finish"); } } } diff --git a/src/main/java/com/aws/greengrass/lifecyclemanager/UpdateSystemPolicyService.java b/src/main/java/com/aws/greengrass/lifecyclemanager/UpdateSystemPolicyService.java index 1c2234fee6..818a353139 100644 --- a/src/main/java/com/aws/greengrass/lifecyclemanager/UpdateSystemPolicyService.java +++ b/src/main/java/com/aws/greengrass/lifecyclemanager/UpdateSystemPolicyService.java @@ -28,7 +28,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import javax.inject.Inject; import javax.inject.Singleton; @@ -52,7 +51,6 @@ public class UpdateSystemPolicyService extends GreengrassService { // represents the value in seconds the kernel will wait for components to respond to // an precomponent update event private final Map pendingActions = Collections.synchronizedMap(new LinkedHashMap<>()); - private final AtomicReference actionInProgress = new AtomicReference<>(); @Inject private LifecycleIPCEventStreamAgent lifecycleIPCAgent; @@ -96,20 +94,21 @@ public Set getPendingActions() { @SuppressWarnings("PMD.AvoidCatchingThrowable") protected void runUpdateActions(String deploymentId) { try (LockScope ls = LockScope.lock(lock)) { - for (Map.Entry todo : pendingActions.entrySet()) { - try { - actionInProgress.set(todo.getKey()); - todo.getValue().getAction().run(); - logger.atDebug().setEventType("service-update-action").addKeyValue("action", todo.getKey()).log(); - } catch (Throwable t) { - logger.atError().setEventType("service-update-action-error").addKeyValue("action", todo.getKey()) - .setCause(t).log(); - } + final UpdateAction pendingUpdateAction = pendingActions.remove(deploymentId); + if (pendingUpdateAction == null) { + // Update action is never added or discarded by this time. So, do nothing. + return; + } + try { + pendingUpdateAction.getAction().run(); + logger.atDebug().setEventType("service-update-action").addKeyValue("action", deploymentId).log(); + } catch (Throwable t) { + logger.atError().setEventType("service-update-action-error").addKeyValue("action", deploymentId) + .setCause(t).log(); } - pendingActions.clear(); lifecycleIPCAgent.sendPostComponentUpdateEvent( new PostComponentUpdateEvent().withDeploymentId(deploymentId)); - actionInProgress.set(null); + } } @@ -121,16 +120,13 @@ protected void runUpdateActions(String deploymentId) { * false if update actions were already in progress */ public boolean discardPendingUpdateAction(String tag) { - if (tag.equals(actionInProgress.get())) { + final UpdateAction pendingUpdateAction = pendingActions.remove(tag); + if (pendingUpdateAction == null) { + // Update action is never added or already in progress. return false; } - final UpdateAction pendingUpdateAction = pendingActions.get(tag); - if (pendingUpdateAction != null) { - // Signal components that they can resume their work since the update is not going to happen - lifecycleIPCAgent.sendPostComponentUpdateEvent( - new PostComponentUpdateEvent().withDeploymentId(pendingUpdateAction.getDeploymentId())); - pendingActions.remove(tag); - } + // Signal components that they can resume their work since the update is not going to happen + lifecycleIPCAgent.sendPostComponentUpdateEvent(new PostComponentUpdateEvent().withDeploymentId(tag)); return true; } @@ -149,39 +145,41 @@ protected void startup() throws InterruptedException { } logger.atDebug().setEventType("service-update-pending").addKeyValue("numOfUpdates", pendingActions.size()) .log(); - - boolean ggcRestarting = false; - for (UpdateAction action : pendingActions.values()) { - if (action.isGgcRestart()) { - ggcRestarting = true; - break; - } - } - - PreComponentUpdateEvent preComponentUpdateEvent = new PreComponentUpdateEvent(); - preComponentUpdateEvent.setIsGgcRestarting(ggcRestarting); - String deploymentId = pendingActions.values().stream().map(UpdateAction::getDeploymentId).findFirst().get(); - preComponentUpdateEvent.setDeploymentId(deploymentId); - List> deferRequestFutures = - lifecycleIPCAgent.sendPreComponentUpdateEvent(preComponentUpdateEvent); - - long timeToReCheck = getTimeToReCheck(getMaxTimeoutInMillis(), deploymentId, deferRequestFutures); - if (timeToReCheck > 0) { - logger.atDebug().setEventType("service-update-pending").addKeyValue("waitInMS", timeToReCheck).log(); - Thread.sleep(timeToReCheck); - } else { - lifecycleIPCAgent.discardDeferComponentUpdateFutures(); - logger.atDebug().setEventType("service-update-scheduled").log(); + pendingActions.entrySet().stream().findFirst().ifPresent((actionEntry) -> { try { - context.get(ExecutorService.class).submit(() -> { - logger.atInfo().setEventType("service-update-start").log(); - runUpdateActions(deploymentId); - logger.atInfo().setEventType("service-update-finish").log(); - }).get(); - } catch (ExecutionException e) { - logger.atError().setEventType("service-update-error") - .log("Run update actions errored", e); + sendComponentUpdateEvents(actionEntry.getValue()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); } + }); + } + } + + private void sendComponentUpdateEvents(UpdateAction action) throws InterruptedException { + PreComponentUpdateEvent preComponentUpdateEvent = new PreComponentUpdateEvent(); + boolean ggcRestarting = action.isGgcRestart(); + preComponentUpdateEvent.setIsGgcRestarting(ggcRestarting); + + preComponentUpdateEvent.setDeploymentId(action.getDeploymentId()); + List> deferRequestFutures = + lifecycleIPCAgent.sendPreComponentUpdateEvent(preComponentUpdateEvent); + + long timeToReCheck = getTimeToReCheck(getMaxTimeoutInMillis(), action.getDeploymentId(), deferRequestFutures); + if (timeToReCheck > 0) { + logger.atDebug().setEventType("service-update-pending").addKeyValue("waitInMS", timeToReCheck).log(); + Thread.sleep(timeToReCheck); + } else { + lifecycleIPCAgent.discardDeferComponentUpdateFutures(); + logger.atDebug().setEventType("service-update-scheduled").log(); + try { + context.get(ExecutorService.class).submit(() -> { + logger.atInfo().setEventType("service-update-start").log(); + runUpdateActions(action.getDeploymentId()); + logger.atInfo().setEventType("service-update-finish").log(); + }).get(); + } catch (ExecutionException e) { + logger.atError().setEventType("service-update-error").log("Run update actions errored", e); } } } diff --git a/src/test/java/com/aws/greengrass/deployment/DeploymentServiceTest.java b/src/test/java/com/aws/greengrass/deployment/DeploymentServiceTest.java index c6186a82a2..a1cd2c4c5e 100644 --- a/src/test/java/com/aws/greengrass/deployment/DeploymentServiceTest.java +++ b/src/test/java/com/aws/greengrass/deployment/DeploymentServiceTest.java @@ -616,6 +616,34 @@ void GIVEN_deployment_job_cancelled_WHEN_waiting_for_safe_time_THEN_then_cancel_ String deploymentDocument = getTestDeploymentDocument(); + deploymentQueue.offer(new Deployment(deploymentDocument, + Deployment.DeploymentType.IOT_JOBS, TEST_JOB_ID_1)); + + when(mockExecutorService.submit(any(DefaultDeploymentTask.class))).thenReturn(mockFuture); + startDeploymentServiceInAnotherThread(); + + // Simulate a cancellation deployment + Thread.sleep(TEST_DEPLOYMENT_POLLING_FREQUENCY.toMillis()); // wait for previous deployment to be polled + deploymentQueue.offer(new Deployment(Deployment.DeploymentType.IOT_JOBS, TEST_JOB_ID_1, true)); + + // Expecting three invocations, once for each retry attempt + verify(mockExecutorService, WAIT_FOUR_SECONDS).submit(any(DefaultDeploymentTask.class)); + verify(deploymentStatusKeeper, WAIT_FOUR_SECONDS).persistAndPublishDeploymentStatus(eq(TEST_JOB_ID_1), + eq(TEST_UUID), eq(TEST_CONFIGURATION_ARN), eq(Deployment.DeploymentType.IOT_JOBS), + eq(JobStatus.IN_PROGRESS.toString()), any(), eq(EXPECTED_ROOT_PACKAGE_LIST)); + verify(updateSystemPolicyService, times(0)).discardPendingUpdateAction(TEST_DEPLOYMENT_ID); + verify(mockFuture, WAIT_FOUR_SECONDS).cancel(true); + } + + @Test + void GIVEN_deployment_job_with_notify_cancelled_WHEN_waiting_for_safe_time_THEN_then_cancel_deployment() + throws Exception { + Topics groupToLastDeploymentTopics = Topics.of(context, GROUP_TO_LAST_DEPLOYMENT_TOPICS, null); + when(config.lookupTopics(eq(GROUP_TO_LAST_DEPLOYMENT_TOPICS), anyString())).thenReturn( + groupToLastDeploymentTopics); + + String deploymentDocument = getTestDeploymentDocumentNotify(); + deploymentQueue.offer(new Deployment(deploymentDocument, Deployment.DeploymentType.IOT_JOBS, TEST_JOB_ID_1)); @@ -645,6 +673,36 @@ void GIVEN_deployment_job_cancelled_WHEN_already_executing_update_THEN_then_fini String deploymentDocument = getTestDeploymentDocument(); + deploymentQueue.offer(new Deployment(deploymentDocument, + Deployment.DeploymentType.IOT_JOBS, TEST_JOB_ID_1)); + + when(mockExecutorService.submit(any(DefaultDeploymentTask.class))).thenReturn(mockFuture); + startDeploymentServiceInAnotherThread(); + + // Simulate a cancellation deployment + Thread.sleep(TEST_DEPLOYMENT_POLLING_FREQUENCY.toMillis()); // wait for previous deployment to be polled + deploymentQueue.offer(new Deployment(Deployment.DeploymentType.IOT_JOBS, TEST_JOB_ID_1, true)); + + // Expecting three invocations, once for each retry attempt + verify(mockExecutorService, WAIT_FOUR_SECONDS).submit(any(DefaultDeploymentTask.class)); + // ComponentUpdatePolicy is set to SKIP_NOTIFY_COMPONENTS + verify(updateSystemPolicyService, times(0)).discardPendingUpdateAction(TEST_DEPLOYMENT_ID); + verify(mockFuture, times(0)).cancel(true); + verify(deploymentStatusKeeper, WAIT_FOUR_SECONDS).persistAndPublishDeploymentStatus(eq(TEST_JOB_ID_1), + eq(TEST_UUID), eq(TEST_CONFIGURATION_ARN), eq(Deployment.DeploymentType.IOT_JOBS), + eq(JobStatus.IN_PROGRESS.toString()), any(), eq(EXPECTED_ROOT_PACKAGE_LIST)); + } + + + @Test + void GIVEN_deployment_job_with_notify_cancelled_WHEN_already_executing_update_THEN_then_finish_deployment() + throws Exception { + Topics groupToLastDeploymentTopics = Topics.of(context, GROUP_TO_LAST_DEPLOYMENT_TOPICS, null); + when(config.lookupTopics(eq(GROUP_TO_LAST_DEPLOYMENT_TOPICS), anyString())).thenReturn( + groupToLastDeploymentTopics); + + String deploymentDocument = getTestDeploymentDocumentNotify(); + deploymentQueue.offer(new Deployment(deploymentDocument, Deployment.DeploymentType.IOT_JOBS, TEST_JOB_ID_1)); @@ -713,6 +771,13 @@ String getTestDeploymentDocument() { .collect(Collectors.joining("\n")).replace(CONFIG_ARN_PLACEHOLDER, TEST_CONFIGURATION_ARN); } + String getTestDeploymentDocumentNotify() { + return new BufferedReader(new InputStreamReader( + getClass().getResourceAsStream("TestDeploymentDocNotify.json"), StandardCharsets.UTF_8)) + .lines() + .collect(Collectors.joining("\n")).replace(CONFIG_ARN_PLACEHOLDER, TEST_CONFIGURATION_ARN); + } + private void assertListEquals(List first, List second) { assertEquals(first.size(), second.size()); for (int i = 0; i < first.size(); i++) { diff --git a/src/test/resources/com/aws/greengrass/deployment/TestDeploymentDocNotify.json b/src/test/resources/com/aws/greengrass/deployment/TestDeploymentDocNotify.json new file mode 100644 index 0000000000..618227b7c8 --- /dev/null +++ b/src/test/resources/com/aws/greengrass/deployment/TestDeploymentDocNotify.json @@ -0,0 +1,17 @@ +{ + "deploymentId": "testDeploymentId", + "configurationArn": "arn:aws:greengrass:us-east-1:12345678910:configuration:thinggroup/group1:1", + "components": { + "component1": { + "rootComponent": true, + "version": "1.0.0" + } + }, + "componentUpdatePolicy": { + "timeout": 60, + "action": "NOTIFY_COMPONENTS" + }, + "configurationValidationPolicy": { + "timeout": 20 + } +} \ No newline at end of file