Skip to content

Commit

Permalink
fix: discard update action on notify only
Browse files Browse the repository at this point in the history
  • Loading branch information
saranyailla committed Jul 11, 2024
1 parent 9b644d2 commit 7eb23e6
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ private void updateActionForDeployment(Map<String, Object> newConfig, Deployment
// if the update is cancelled, don't perform merge
if (totallyCompleteFuture.isCancelled()) {
logger.atInfo(MERGE_CONFIG_EVENT_KEY).kv("deployment", deploymentId)
.log("Future was cancelled so no need to go through with the update");
.log("Deployment was cancelled, so no need to perform config merge update");
return;
}
Map<String, Object> serviceConfig;
Expand Down
21 changes: 11 additions & 10 deletions src/main/java/com/aws/greengrass/deployment/DeploymentService.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import lombok.Setter;
import org.apache.commons.io.FileUtils;
import software.amazon.awssdk.iot.iotjobs.model.JobStatus;
import software.amazon.awssdk.services.greengrassv2.model.DeploymentComponentUpdatePolicyAction;

import java.io.IOException;
import java.nio.file.Files;
Expand Down Expand Up @@ -504,10 +505,12 @@ private void cancelCurrentDeployment() {
.isCancellable()) {
logger.atInfo().log("Deployment already finished processing or cannot be cancelled");
} else {
boolean canCancelDeployment = context.get(UpdateSystemPolicyService.class).discardPendingUpdateAction(
((DefaultDeploymentTask) currentDeploymentTaskMetadata.getDeploymentTask()).getDeployment()
.getGreengrassDeploymentId());
if (canCancelDeployment) {
DeploymentComponentUpdatePolicyAction currentDeploymentUpdatePolicyAction =
currentDeploymentTaskMetadata.getDeploymentDocument().getComponentUpdatePolicy()
.getComponentUpdatePolicyAction();
if (DeploymentComponentUpdatePolicyAction.NOTIFY_COMPONENTS.equals(currentDeploymentUpdatePolicyAction)
&& context.get(UpdateSystemPolicyService.class)
.discardPendingUpdateAction(currentDeploymentTaskMetadata.getGreengrassDeploymentId())) {
currentDeploymentTaskMetadata.getDeploymentResultFuture().cancel(true);
DeploymentType deploymentType = currentDeploymentTaskMetadata.getDeploymentType();
if (DeploymentType.SHADOW.equals(deploymentType) || DeploymentType.LOCAL.equals(deploymentType)) {
Expand All @@ -522,13 +525,11 @@ private void cancelCurrentDeployment() {
.kv(GG_DEPLOYMENT_ID_LOG_KEY_NAME,
currentDeploymentTaskMetadata.getGreengrassDeploymentId())
.log("Deployment was cancelled");
} else {
logger.atInfo().kv(DEPLOYMENT_ID_LOG_KEY_NAME, currentDeploymentTaskMetadata.getDeploymentId())
.kv(GG_DEPLOYMENT_ID_LOG_KEY_NAME,
currentDeploymentTaskMetadata.getGreengrassDeploymentId())
.log("Deployment is in a stage where it cannot be cancelled,"
+ " need to wait for it to finish");
return;
}
logger.atInfo().kv(DEPLOYMENT_ID_LOG_KEY_NAME, currentDeploymentTaskMetadata.getDeploymentId())
.kv(GG_DEPLOYMENT_ID_LOG_KEY_NAME, currentDeploymentTaskMetadata.getGreengrassDeploymentId())
.log("Deployment is in a stage where it cannot be cancelled, need to wait for it to finish");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import javax.inject.Inject;
import javax.inject.Singleton;
Expand All @@ -52,7 +51,6 @@ public class UpdateSystemPolicyService extends GreengrassService {
// represents the value in seconds the kernel will wait for components to respond to
// a pre-component update event
private final Map<String, UpdateAction> pendingActions = Collections.synchronizedMap(new LinkedHashMap<>());
private final AtomicReference<String> actionInProgress = new AtomicReference<>();

@Inject
private LifecycleIPCEventStreamAgent lifecycleIPCAgent;
Expand Down Expand Up @@ -96,20 +94,21 @@ public Set<String> getPendingActions() {
@SuppressWarnings("PMD.AvoidCatchingThrowable")
protected void runUpdateActions(String deploymentId) {
try (LockScope ls = LockScope.lock(lock)) {
for (Map.Entry<String, UpdateAction> todo : pendingActions.entrySet()) {
try {
actionInProgress.set(todo.getKey());
todo.getValue().getAction().run();
logger.atDebug().setEventType("service-update-action").addKeyValue("action", todo.getKey()).log();
} catch (Throwable t) {
logger.atError().setEventType("service-update-action-error").addKeyValue("action", todo.getKey())
.setCause(t).log();
}
final UpdateAction pendingUpdateAction = pendingActions.remove(deploymentId);
if (pendingUpdateAction == null) {
// Update action is never added or discarded by this time. So, do nothing.
return;
}
try {
pendingUpdateAction.getAction().run();
logger.atDebug().setEventType("service-update-action").addKeyValue("action", deploymentId).log();
} catch (Throwable t) {
logger.atError().setEventType("service-update-action-error").addKeyValue("action", deploymentId)
.setCause(t).log();
}
pendingActions.clear();
lifecycleIPCAgent.sendPostComponentUpdateEvent(
new PostComponentUpdateEvent().withDeploymentId(deploymentId));
actionInProgress.set(null);

}
}

Expand All @@ -121,16 +120,13 @@ protected void runUpdateActions(String deploymentId) {
* false if update actions were already in progress
*/
public boolean discardPendingUpdateAction(String tag) {
if (tag.equals(actionInProgress.get())) {
final UpdateAction pendingUpdateAction = pendingActions.remove(tag);
if (pendingUpdateAction == null) {
// Update action is never added or already in progress.
return false;
}
final UpdateAction pendingUpdateAction = pendingActions.get(tag);
if (pendingUpdateAction != null) {
// Signal components that they can resume their work since the update is not going to happen
lifecycleIPCAgent.sendPostComponentUpdateEvent(
new PostComponentUpdateEvent().withDeploymentId(pendingUpdateAction.getDeploymentId()));
pendingActions.remove(tag);
}
// Signal components that they can resume their work since the update is not going to happen
lifecycleIPCAgent.sendPostComponentUpdateEvent(new PostComponentUpdateEvent().withDeploymentId(tag));
return true;
}

Expand All @@ -149,39 +145,41 @@ protected void startup() throws InterruptedException {
}
logger.atDebug().setEventType("service-update-pending").addKeyValue("numOfUpdates", pendingActions.size())
.log();

boolean ggcRestarting = false;
for (UpdateAction action : pendingActions.values()) {
if (action.isGgcRestart()) {
ggcRestarting = true;
break;
}
}

PreComponentUpdateEvent preComponentUpdateEvent = new PreComponentUpdateEvent();
preComponentUpdateEvent.setIsGgcRestarting(ggcRestarting);
String deploymentId = pendingActions.values().stream().map(UpdateAction::getDeploymentId).findFirst().get();
preComponentUpdateEvent.setDeploymentId(deploymentId);
List<Future<DeferComponentUpdateRequest>> deferRequestFutures =
lifecycleIPCAgent.sendPreComponentUpdateEvent(preComponentUpdateEvent);

long timeToReCheck = getTimeToReCheck(getMaxTimeoutInMillis(), deploymentId, deferRequestFutures);
if (timeToReCheck > 0) {
logger.atDebug().setEventType("service-update-pending").addKeyValue("waitInMS", timeToReCheck).log();
Thread.sleep(timeToReCheck);
} else {
lifecycleIPCAgent.discardDeferComponentUpdateFutures();
logger.atDebug().setEventType("service-update-scheduled").log();
pendingActions.entrySet().stream().findFirst().ifPresent((actionEntry) -> {
try {
context.get(ExecutorService.class).submit(() -> {
logger.atInfo().setEventType("service-update-start").log();
runUpdateActions(deploymentId);
logger.atInfo().setEventType("service-update-finish").log();
}).get();
} catch (ExecutionException e) {
logger.atError().setEventType("service-update-error")
.log("Run update actions errored", e);
sendComponentUpdateEvents(actionEntry.getValue());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
});
}
}

/**
 * Announces a pending update to components and, unless a deferral is in effect, runs the update.
 *
 * <p>A {@link PreComponentUpdateEvent} carrying the action's deployment id and its GGC-restart flag is sent
 * through the lifecycle IPC agent, which returns the futures for component deferral requests. Those futures are
 * handed to {@code getTimeToReCheck}; when it reports a positive wait, this method sleeps for that long and
 * returns without running anything. Otherwise the outstanding deferral futures are discarded and
 * {@code runUpdateActions} is submitted to the context's {@link ExecutorService}, blocking until it completes.
 *
 * @param action the pending update action to notify components about and, if allowed, execute
 * @throws InterruptedException if the thread is interrupted while sleeping or while waiting for the submitted
 *                              update task to finish
 */
private void sendComponentUpdateEvents(UpdateAction action) throws InterruptedException {
    final String deploymentId = action.getDeploymentId();

    PreComponentUpdateEvent preComponentUpdateEvent = new PreComponentUpdateEvent();
    preComponentUpdateEvent.setIsGgcRestarting(action.isGgcRestart());
    preComponentUpdateEvent.setDeploymentId(deploymentId);

    List<Future<DeferComponentUpdateRequest>> deferRequestFutures =
            lifecycleIPCAgent.sendPreComponentUpdateEvent(preComponentUpdateEvent);

    long timeToReCheck = getTimeToReCheck(getMaxTimeoutInMillis(), deploymentId, deferRequestFutures);
    if (timeToReCheck > 0) {
        // A wait was requested: sleep here and return without running the update.
        logger.atDebug().setEventType("service-update-pending").addKeyValue("waitInMS", timeToReCheck).log();
        Thread.sleep(timeToReCheck);
        return;
    }

    lifecycleIPCAgent.discardDeferComponentUpdateFutures();
    logger.atDebug().setEventType("service-update-scheduled").log();
    try {
        // Run on the shared executor but block on the result so this method returns only once the update is done.
        context.get(ExecutorService.class).submit(() -> {
            logger.atInfo().setEventType("service-update-start").log();
            runUpdateActions(deploymentId);
            logger.atInfo().setEventType("service-update-finish").log();
        }).get();
    } catch (ExecutionException e) {
        // Attach the exception as the cause (as runUpdateActions does) rather than passing it as a stray
        // format argument to a message that has no placeholder.
        logger.atError().setEventType("service-update-error").setCause(e).log("Run update actions errored");
    }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,34 @@ void GIVEN_deployment_job_cancelled_WHEN_waiting_for_safe_time_THEN_then_cancel_

String deploymentDocument = getTestDeploymentDocument();

deploymentQueue.offer(new Deployment(deploymentDocument,
Deployment.DeploymentType.IOT_JOBS, TEST_JOB_ID_1));

when(mockExecutorService.submit(any(DefaultDeploymentTask.class))).thenReturn(mockFuture);
startDeploymentServiceInAnotherThread();

// Simulate a cancellation deployment
Thread.sleep(TEST_DEPLOYMENT_POLLING_FREQUENCY.toMillis()); // wait for previous deployment to be polled
deploymentQueue.offer(new Deployment(Deployment.DeploymentType.IOT_JOBS, TEST_JOB_ID_1, true));

// Expecting three invocations, once for each retry attempt
verify(mockExecutorService, WAIT_FOUR_SECONDS).submit(any(DefaultDeploymentTask.class));
verify(deploymentStatusKeeper, WAIT_FOUR_SECONDS).persistAndPublishDeploymentStatus(eq(TEST_JOB_ID_1),
eq(TEST_UUID), eq(TEST_CONFIGURATION_ARN), eq(Deployment.DeploymentType.IOT_JOBS),
eq(JobStatus.IN_PROGRESS.toString()), any(), eq(EXPECTED_ROOT_PACKAGE_LIST));
verify(updateSystemPolicyService, times(0)).discardPendingUpdateAction(TEST_DEPLOYMENT_ID);
verify(mockFuture, WAIT_FOUR_SECONDS).cancel(true);
}

@Test
void GIVEN_deployment_job_with_notify_cancelled_WHEN_waiting_for_safe_time_THEN_then_cancel_deployment()
throws Exception {
Topics groupToLastDeploymentTopics = Topics.of(context, GROUP_TO_LAST_DEPLOYMENT_TOPICS, null);
when(config.lookupTopics(eq(GROUP_TO_LAST_DEPLOYMENT_TOPICS), anyString())).thenReturn(
groupToLastDeploymentTopics);

String deploymentDocument = getTestDeploymentDocumentNotify();

deploymentQueue.offer(new Deployment(deploymentDocument,
Deployment.DeploymentType.IOT_JOBS, TEST_JOB_ID_1));

Expand Down Expand Up @@ -645,6 +673,36 @@ void GIVEN_deployment_job_cancelled_WHEN_already_executing_update_THEN_then_fini

String deploymentDocument = getTestDeploymentDocument();

deploymentQueue.offer(new Deployment(deploymentDocument,
Deployment.DeploymentType.IOT_JOBS, TEST_JOB_ID_1));

when(mockExecutorService.submit(any(DefaultDeploymentTask.class))).thenReturn(mockFuture);
startDeploymentServiceInAnotherThread();

// Simulate a cancellation deployment
Thread.sleep(TEST_DEPLOYMENT_POLLING_FREQUENCY.toMillis()); // wait for previous deployment to be polled
deploymentQueue.offer(new Deployment(Deployment.DeploymentType.IOT_JOBS, TEST_JOB_ID_1, true));

// Expecting three invocations, once for each retry attempt
verify(mockExecutorService, WAIT_FOUR_SECONDS).submit(any(DefaultDeploymentTask.class));
// ComponentUpdatePolicy is set to SKIP_NOTIFY_COMPONENTS
verify(updateSystemPolicyService, times(0)).discardPendingUpdateAction(TEST_DEPLOYMENT_ID);
verify(mockFuture, times(0)).cancel(true);
verify(deploymentStatusKeeper, WAIT_FOUR_SECONDS).persistAndPublishDeploymentStatus(eq(TEST_JOB_ID_1),
eq(TEST_UUID), eq(TEST_CONFIGURATION_ARN), eq(Deployment.DeploymentType.IOT_JOBS),
eq(JobStatus.IN_PROGRESS.toString()), any(), eq(EXPECTED_ROOT_PACKAGE_LIST));
}


@Test
void GIVEN_deployment_job_with_notify_cancelled_WHEN_already_executing_update_THEN_then_finish_deployment()
throws Exception {
Topics groupToLastDeploymentTopics = Topics.of(context, GROUP_TO_LAST_DEPLOYMENT_TOPICS, null);
when(config.lookupTopics(eq(GROUP_TO_LAST_DEPLOYMENT_TOPICS), anyString())).thenReturn(
groupToLastDeploymentTopics);

String deploymentDocument = getTestDeploymentDocumentNotify();

deploymentQueue.offer(new Deployment(deploymentDocument,
Deployment.DeploymentType.IOT_JOBS, TEST_JOB_ID_1));

Expand Down Expand Up @@ -713,6 +771,13 @@ String getTestDeploymentDocument() {
.collect(Collectors.joining("\n")).replace(CONFIG_ARN_PLACEHOLDER, TEST_CONFIGURATION_ARN);
}

/**
 * Reads the {@code TestDeploymentDocNotify.json} test resource as UTF-8 text, joins its lines with
 * {@code "\n"}, and substitutes {@code TEST_CONFIGURATION_ARN} for every {@code CONFIG_ARN_PLACEHOLDER}.
 *
 * @return the deployment document text with the configuration ARN placeholder replaced
 * @throws NullPointerException if the resource cannot be found next to this test class
 * @throws java.io.UncheckedIOException if reading or closing the resource fails
 */
String getTestDeploymentDocumentNotify() {
    final String resourceName = "TestDeploymentDocNotify.json";
    // getResourceAsStream returns null for a missing resource; fail with a message that names the file.
    final java.io.InputStream resource = java.util.Objects.requireNonNull(
            getClass().getResourceAsStream(resourceName), "Test resource not found: " + resourceName);
    // try-with-resources closes the reader (and the underlying stream) once the content has been read.
    try (BufferedReader reader = new BufferedReader(new InputStreamReader(resource, StandardCharsets.UTF_8))) {
        return reader.lines()
                .collect(Collectors.joining("\n"))
                .replace(CONFIG_ARN_PLACEHOLDER, TEST_CONFIGURATION_ARN);
    } catch (java.io.IOException e) {
        throw new java.io.UncheckedIOException("Failed to read test resource " + resourceName, e);
    }
}

private void assertListEquals(List<String> first, List<String> second) {
assertEquals(first.size(), second.size());
for (int i = 0; i < first.size(); i++) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"deploymentId": "testDeploymentId",
"configurationArn": "arn:aws:greengrass:us-east-1:12345678910:configuration:thinggroup/group1:1",
"components": {
"component1": {
"rootComponent": true,
"version": "1.0.0"
}
},
"componentUpdatePolicy": {
"timeout": 60,
"action": "NOTIFY_COMPONENTS"
},
"configurationValidationPolicy": {
"timeout": 20
}
}

0 comments on commit 7eb23e6

Please sign in to comment.