From 32bfc760795faa860b074f664a7ebf5635bcab21 Mon Sep 17 00:00:00 2001 From: Bethany Seeger Date: Wed, 10 Apr 2024 17:22:33 -0400 Subject: [PATCH] Add retry count mechanism and virus scan check to google drive transmission. --- build.gradle | 9 +- .../app/cli/TransmissionCommands.java | 156 +++++++++++++----- .../org/mdbenefits/app/data/Transmission.java | 3 + .../app/data/TransmissionRepository.java | 2 +- src/main/resources/application-test.yaml | 5 +- ...0.32__add_retry_count_to_transmissions.sql | 2 + .../app/cli/TransmissionCommandsTest.java | 17 +- .../TransmissionRepositoryTests.java | 87 ++++++++++ .../actions/HandleApplicationSignedTest.java | 26 +++ 9 files changed, 259 insertions(+), 48 deletions(-) create mode 100644 src/main/resources/db/migration/V2024.04.02.12.10.32__add_retry_count_to_transmissions.sql create mode 100644 src/test/java/org/mdbenefits/app/repository/TransmissionRepositoryTests.java diff --git a/build.gradle b/build.gradle index c8afd641..6f089482 100644 --- a/build.gradle +++ b/build.gradle @@ -44,8 +44,13 @@ dependencies { implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.17.0' implementation 'com.opencsv:opencsv:5.9' - implementation "org.codeforamerica.platform:form-flow:${formFlowLibraryVersion}" - println "📚Using form flow library ${formFlowLibraryVersion}" + if (profile == 'dev' || useLocalLibrary == 'true') { + implementation fileTree(dir: "$rootDir/../form-flow/build/libs", include: '*.jar') + println "📦 Using local library" + } else { + implementation "org.codeforamerica.platform:form-flow:${formFlowLibraryVersion}" + println "📚Using form flow library ${formFlowLibraryVersion}" + } implementation 'com.amazonaws:aws-encryption-sdk-java:3.0.0' implementation 'org.bouncycastle:bcpg-jdk15on:1.70' diff --git a/src/main/java/org/mdbenefits/app/cli/TransmissionCommands.java b/src/main/java/org/mdbenefits/app/cli/TransmissionCommands.java index f93bf6c4..0ca20c3c 100644 --- a/src/main/java/org/mdbenefits/app/cli/TransmissionCommands.java +++ b/src/main/java/org/mdbenefits/app/cli/TransmissionCommands.java @@ -1,6 +1,7 @@ package org.mdbenefits.app.cli; import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.Tag; import com.mailgun.model.message.MessageResponse; import formflow.library.data.Submission; import formflow.library.data.UserFile; @@ -17,6 +18,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; +import org.checkerframework.checker.units.qual.A; import org.mdbenefits.app.data.Transmission; import org.mdbenefits.app.data.TransmissionRepository; import org.mdbenefits.app.data.enums.Counties; @@ -48,6 +50,8 @@ public class TransmissionCommands { @Value("${transmission.email-recipients.queen-annes-county}") private String QUEEN_ANNES_COUNTY_EMAIL_RECIPIENTS; + private final int MAX_RETRY_COUNT = 5; + private final TransmissionRepository transmissionRepository; private final CloudFileRepository cloudFileRepository; private final PdfService pdfService; @@ -90,7 +94,8 @@ public TransmissionCommands( public void transmit() { log.info("[Transmission] Checking for submissions to transmit..."); - List queuedTransmissions = transmissionRepository.findTransmissionsByStatus("QUEUED"); + List queuedTransmissions = + transmissionRepository.findByStatusAndRetryCountLessThanOrderByCreatedAtAsc("QUEUED", MAX_RETRY_COUNT); if (queuedTransmissions.isEmpty()) { log.info("[Transmission] Nothing to transmit. Exiting."); return; @@ -125,8 +130,10 @@ private void transmitBatch(List transmissions) { transmissions.forEach(transmission -> { Map errorMap = new HashMap<>(); Submission submission = transmission.getSubmission(); + log.info("[Transmission {}] Sending transmission for submission with ID: {}.", transmission.getId(), submission.getId()); + updateTransmissionStatus(transmission, TransmissionStatus.TRANSMITTING, errorMap, false); byte[] pdfFileBytes; try { @@ -139,13 +146,14 @@ private void transmitBatch(List transmissions) { handleError(transmission, "pdfGeneration", error, errorMap); return; } + String county = (String) submission.getInputData().get("county"); String folderId = getCountyFolderId(county); String emailRecipients = getCountyEmailRecipients(county); String confirmationNumber = (String) submission.getInputData().get("confirmationNumber"); - String pdfFileName = getPdfFilename(confirmationNumber); + // delete any existing directories with the same name List existingDirectories = googleDriveClient.findDirectory(confirmationNumber, folderId); if (!existingDirectories.isEmpty()) { log.info( @@ -154,17 +162,11 @@ private void transmitBatch(List transmissions) { existingDirectories.size(), confirmationNumber, submission.getId()); - // remove any already existing folders - for (File dir : existingDirectories) { - if (!googleDriveClient.trashDirectory(dir.getName(), dir.getId(), errorMap)) { - String error = String.format("Failed to delete existing Google Drive directory '%s'", dir.getId()); - handleError(transmission, null, error, errorMap); - // don't return - keep going. A new folder will be created and the link to that new - // folder will be sent to caseworker's office - } - } + + removeExistingGoogleDriveFolders(existingDirectories, transmission, errorMap); } + // create google drive folder GoogleDriveFolder newFolder = googleDriveClient.createFolder(folderId, confirmationNumber, errorMap); if (newFolder == null || newFolder.getId() == null) { // something is really wrong here; note the error and skip the entry @@ -186,30 +188,7 @@ private void transmitBatch(List transmissions) { return; } - List userFilesForSubmission = userFileRepositoryService.findAllBySubmission(submission); - - for (int count = 0; count < userFilesForSubmission.size(); count++) { - UserFile file = userFilesForSubmission.get(count); - try { - // get the file from S3 - CloudFile cloudFile = cloudFileRepository.get(file.getRepositoryPath()); - - String fileName = getUserFileName(confirmationNumber, file, count + 1, userFilesForSubmission.size()); - log.info("[Transmission {}] Uploading file {} of {} for submission with ID: {}.", - transmission.getId(), - count + 1, - userFilesForSubmission.size(), - submission.getId()); - // send to google - googleDriveClient.uploadFile(newFolder.getId(), fileName, file.getMimeType(), cloudFile.getFileBytes(), - file.getFileId().toString(), errorMap); - } catch (AmazonS3Exception e) { - String error = String.format( - "Unable to upload the UserFile (ID: %s) for submission with ID: %s. Exception: %s", - file.getFileId(), submission.getId(), e.getMessage()); - handleError(transmission, "fetchingS3File", error, errorMap); - } - } + sendFilesToGoogleDrive(transmission, confirmationNumber, newFolder, errorMap); sendEmailToCaseworkers(transmission, confirmationNumber, emailRecipients, newFolder.getUrl(), errorMap); @@ -217,6 +196,56 @@ private void transmitBatch(List transmissions) { }); } + private void removeExistingGoogleDriveFolders(List directories, Transmission transmission, + Map errorMap) { + // remove any already existing folders + for (File dir : directories) { + if (!googleDriveClient.trashDirectory(dir.getName(), dir.getId(), errorMap)) { + String error = String.format("Failed to delete existing Google Drive directory '%s'", dir.getId()); + handleError(transmission, null, error, errorMap); + // don't return - keep going. A new folder will be created and the link to that new + // folder will be sent to caseworker's office. The fact that this one couldn't be trashed + // doesn't mean that a new one cannot be created. + // Removing old ones just keeps it less confusing if someone in the office does a search for a particular + // folder. Then only 1 will show up. + } + } + } + + private void sendFilesToGoogleDrive(Transmission transmission, String confirmationNumber, GoogleDriveFolder destFolder, + Map errorMap) { + Submission submission = transmission.getSubmission(); + + List userFilesForSubmission = userFileRepositoryService.findAllBySubmission(submission); + + for (int count = 0; count < userFilesForSubmission.size(); count++) { + UserFile file = userFilesForSubmission.get(count); + try { + // get the file from S3 + CloudFile cloudFile = cloudFileRepository.get(file.getRepositoryPath()); + if (!hasBeenVirusScanned(cloudFile)) { + String message = String.format("Has not been scanned for virus yet. Re-queuing submission"); + handleRequeue(transmission, "fileVirusStatus", message, errorMap); + } + + String fileName = getUserFileName(confirmationNumber, file, count + 1, userFilesForSubmission.size()); + log.info("[Transmission {}] Uploading file {} of {} for submission with ID: {}.", + transmission.getId(), + count + 1, + userFilesForSubmission.size(), + submission.getId()); + // send to google + googleDriveClient.uploadFile(destFolder.getId(), fileName, file.getMimeType(), cloudFile.getFileBytes(), + file.getFileId().toString(), errorMap); + } catch (AmazonS3Exception e) { + String error = String.format( + "Unable to upload the UserFile (ID: %s) for submission with ID: %s. Exception: %s", + file.getFileId(), submission.getId(), e.getMessage()); + handleError(transmission, "fetchingS3File", error, errorMap); + } + } + } + /** * Send email about the transmission to specified email addresses. * @@ -262,8 +291,8 @@ private void sendEmailToCaseworkers(Transmission transmission, String confirmati * * @param transmission the transmission to update * @param errorKey the error key to use when recording the error in the error map - * @param errorMsg the message to put in the log and the errorMap - * @param errorMap the map of errors to get stored with the transmission in the db. + * @param errorMsg the message to put in the log and the error map + * @param errorMap the map of errors to get stored with the transmission in the db */ private void handleError(Transmission transmission, String errorKey, String errorMsg, Map errorMap) { log.error("[Transmission {}]: {}", transmission.getId(), errorMsg); @@ -273,6 +302,23 @@ private void handleError(Transmission transmission, String errorKey, String erro updateTransmissionStatus(transmission, TransmissionStatus.FAILED, errorMap, false); } + /** + * This will handle the re-queuing of a transmission. It will 1) log info about it 2) mark the transmission as QUEUED and 3) + * update the transmission status in the database. + * + * @param transmission the transmission to update + * @param messageKey the message key to use when recording the message in the error map + * @param message the message to put in the log or error map + * @param errorMap the map of errors (messages) to get stored with the transmission in the db + */ + private void handleRequeue(Transmission transmission, String messageKey, String message, Map errorMap) { + log.warn("[Transmission {}]: {}", transmission.getId(), message); + if (messageKey != null) { + errorMap.put(messageKey, message); + } + updateTransmissionStatus(transmission, TransmissionStatus.QUEUED, errorMap, false); + } + /** * Updates the transmission's status information, including the overall status, error messages and mark it sent (if * requested). @@ -280,15 +326,24 @@ private void handleError(Transmission transmission, String errorKey, String erro * @param transmission the transmission to update * @param status the TransmissionStatus status * @param errorMap a Map - * @param markSent + * @param markSent whether this should mark the record as sent to google drive or not */ private void updateTransmissionStatus(Transmission transmission, TransmissionStatus status, Map errorMap, boolean markSent) { - transmission.setStatus(status.name()); transmission.setErrors(errorMap); if (markSent) { transmission.setSentAt(OffsetDateTime.now()); } + + // don't increment retry when just marking as in process + if (!status.equals(TransmissionStatus.TRANSMITTING)) { + int retryCount = transmission.getRetryCount(); + retryCount++; + transmission.setRetryCount(retryCount); + } + + transmission.setStatus(status.name()); + transmissionRepository.save(transmission); } @@ -336,4 +391,27 @@ private String getCountyFolderId(String county) { private String getCountyEmailRecipients(String county) { return county.equals(Counties.BALTIMORE.name()) ? BALITMORE_COUNTY_EMAIL_RECIPIENTS : QUEEN_ANNES_COUNTY_EMAIL_RECIPIENTS; } + + /** + * Checks S3 tags included in the CloudFile metadata to ensure that the file has been virus scanned. + * + * @param cloudFile + * @return + */ + private boolean hasBeenVirusScanned(CloudFile cloudFile) { + Map metadata = cloudFile.getMetadata(); + boolean scanned = false; + if (metadata != null) { + List tags = (List) metadata.getOrDefault("tags", List.of()); + List filteredList = tags.stream() + .filter(tag -> tag.getKey().equals("scan-result")) + .toList(); + if (!filteredList.isEmpty()) { + if (filteredList.get(0).getValue().equalsIgnoreCase("clean")) { + scanned = true; + } + } + } + return scanned; + } } diff --git a/src/main/java/org/mdbenefits/app/data/Transmission.java b/src/main/java/org/mdbenefits/app/data/Transmission.java index 3db16aff..3a0f8b83 100644 --- a/src/main/java/org/mdbenefits/app/data/Transmission.java +++ b/src/main/java/org/mdbenefits/app/data/Transmission.java @@ -20,6 +20,7 @@ @Data @Table(name = "transmissions") public class Transmission { + @Id @GeneratedValue private UUID id; @@ -35,6 +36,8 @@ public class Transmission { private OffsetDateTime sentAt; + private int retryCount; + String status = "QUEUED"; @Type(JsonType.class) diff --git a/src/main/java/org/mdbenefits/app/data/TransmissionRepository.java b/src/main/java/org/mdbenefits/app/data/TransmissionRepository.java index 9d254ae2..ad28869f 100644 --- a/src/main/java/org/mdbenefits/app/data/TransmissionRepository.java +++ b/src/main/java/org/mdbenefits/app/data/TransmissionRepository.java @@ -7,7 +7,7 @@ public interface TransmissionRepository extends CrudRepository { - List findTransmissionsByStatus(String status); + List findByStatusAndRetryCountLessThanOrderByCreatedAtAsc(String status, int retryCount); Transmission findTransmissionBySubmission(Submission submission); } diff --git a/src/main/resources/application-test.yaml b/src/main/resources/application-test.yaml index 735bf36b..16ca50d0 100644 --- a/src/main/resources/application-test.yaml +++ b/src/main/resources/application-test.yaml @@ -36,8 +36,9 @@ spring: jdbc: initialize-schema: always transmission: - transmission-rate-seconds: 5 - transmission-initial-delay-seconds: 2 + #keep these large so they do not run + transmission-rate-seconds: 10000 + transmission-initial-delay-seconds: 10000 email-recipients: baltimore-county: test@maininator.com queen-annes-county: test@mailinator.com diff --git a/src/main/resources/db/migration/V2024.04.02.12.10.32__add_retry_count_to_transmissions.sql b/src/main/resources/db/migration/V2024.04.02.12.10.32__add_retry_count_to_transmissions.sql new file mode 100644 index 00000000..83b59412 --- /dev/null +++ b/src/main/resources/db/migration/V2024.04.02.12.10.32__add_retry_count_to_transmissions.sql @@ -0,0 +1,2 @@ +ALTER TABLE transmissions + ADD retry_count int default 0 NOT NULL; diff --git a/src/test/java/org/mdbenefits/app/cli/TransmissionCommandsTest.java b/src/test/java/org/mdbenefits/app/cli/TransmissionCommandsTest.java index a50d8f6e..04745827 100644 --- a/src/test/java/org/mdbenefits/app/cli/TransmissionCommandsTest.java +++ b/src/test/java/org/mdbenefits/app/cli/TransmissionCommandsTest.java @@ -45,9 +45,14 @@ @Slf4j @ActiveProfiles("test") -@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT) @TestInstance(Lifecycle.PER_CLASS) @TestMethodOrder(OrderAnnotation.class) +@SpringBootTest( + properties = { + "transmission.transmission-rate-seconds=5", + "transmission.transmission-initial-delay-seconds=2" + }, + webEnvironment = WebEnvironment.RANDOM_PORT) public class TransmissionCommandsTest { @MockBean @@ -126,7 +131,8 @@ void setup() { @Test @Order(1) public void ensureSubmittedSubmissionsAreEnqueued() { - List transmissions = transmissionRepository.findTransmissionsByStatus(TransmissionStatus.QUEUED.name()); + List transmissions = transmissionRepository.findByStatusAndRetryCountLessThanOrderByCreatedAtAsc( + TransmissionStatus.QUEUED.name(), 5); assertThat(transmissions.size()).isEqualTo(submissionList.size()); } @@ -139,7 +145,9 @@ public void transmitterRunsAndProcessesWork() { () -> verify(transmissionCommands, times(2)).transmit()); // ensure that all transmissions were processed - assertThat(transmissionRepository.findTransmissionsByStatus(TransmissionStatus.QUEUED.name()).isEmpty()).isTrue(); + assertThat( + transmissionRepository.findByStatusAndRetryCountLessThanOrderByCreatedAtAsc(TransmissionStatus.QUEUED.name(), 5) + .isEmpty()).isTrue(); submissionList.forEach(s -> { Transmission transmission = transmissionRepository.findTransmissionBySubmission(s); @@ -151,7 +159,8 @@ public void transmitterRunsAndProcessesWork() { @Order(3) public void transmitterRunsWhenNoWorkIsQueued() { - assertThat(transmissionRepository.findTransmissionsByStatus(TransmissionStatus.QUEUED.name())).isEmpty(); + assertThat(transmissionRepository.findByStatusAndRetryCountLessThanOrderByCreatedAtAsc(TransmissionStatus.QUEUED.name(), + 5)).isEmpty(); await().atMost(12, TimeUnit.SECONDS).untilAsserted( () -> verify(transmissionCommands, times(2)).transmit()); diff --git a/src/test/java/org/mdbenefits/app/repository/TransmissionRepositoryTests.java b/src/test/java/org/mdbenefits/app/repository/TransmissionRepositoryTests.java new file mode 100644 index 00000000..009388d1 --- /dev/null +++ b/src/test/java/org/mdbenefits/app/repository/TransmissionRepositoryTests.java @@ -0,0 +1,87 @@ +package org.mdbenefits.app.repository; + +import static org.assertj.core.api.Assertions.assertThat; + +import formflow.library.data.Submission; +import formflow.library.data.SubmissionRepositoryService; +import java.time.OffsetDateTime; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.TestInstance.Lifecycle; +import org.mdbenefits.app.data.Transmission; +import org.mdbenefits.app.data.TransmissionRepository; +import org.mdbenefits.app.data.enums.TransmissionStatus; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; +import org.springframework.test.context.ActiveProfiles; + +@ActiveProfiles("test") +@TestInstance(Lifecycle.PER_CLASS) +@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT) +public class TransmissionRepositoryTests { + + @Autowired + private TransmissionRepository transmissionRepository; + + @Autowired + private SubmissionRepositoryService submissionRepositoryService; + + private List submissionList = new ArrayList<>(); + + @BeforeAll + void setup() { + + for (int i = 0; i < 6; i++) { + Map inputData = Map.of( + "confirmationNumber", "M0000" + i, + "testKey", "this is a test value", + "otherTestKey", List.of("A", "B", "C"), + "household", List.of(Map.of("firstName", "John", "lastName", "Perez"))); + Submission submission = Submission.builder() + .inputData(inputData) + .urlParams(new HashMap<>()) + .flow("testFlow") + .submittedAt(OffsetDateTime.now()) + .build(); + + submission = saveAndReload(submission); + + Transmission transmission = Transmission.fromSubmission(submission); + transmission.setRetryCount(i); + transmissionRepository.save(transmission); + submissionList.add(submission); + } + } + + @Test + void testGettingTransmissionBySubmission() { + submissionList.forEach(submission -> { + Transmission t = transmissionRepository.findTransmissionBySubmission(submission); + assertThat(t).isNotNull(); + }); + } + + @Test + void testGettingTransmissionBySubmissionByStatusAndRequeueCount() { + List transmissionsList = transmissionRepository.findByStatusAndRetryCountLessThanOrderByCreatedAtAsc( + TransmissionStatus.QUEUED.name(), 5); + assertThat(transmissionsList.size()).isEqualTo(5); + + transmissionsList = transmissionRepository.findByStatusAndRetryCountLessThanOrderByCreatedAtAsc( + TransmissionStatus.QUEUED.name(), 2); + assertThat(transmissionsList.size()).isEqualTo(2); + } + + + private Submission saveAndReload(Submission submission) { + Submission savedSubmission = submissionRepositoryService.save(submission); + return submissionRepositoryService.findById(savedSubmission.getId()).orElseThrow(); + } + +} diff --git a/src/test/java/org/mdbenefits/app/submission/actions/HandleApplicationSignedTest.java b/src/test/java/org/mdbenefits/app/submission/actions/HandleApplicationSignedTest.java index 92a2608e..8445b3f6 100644 --- a/src/test/java/org/mdbenefits/app/submission/actions/HandleApplicationSignedTest.java +++ b/src/test/java/org/mdbenefits/app/submission/actions/HandleApplicationSignedTest.java @@ -2,11 +2,15 @@ import com.mailgun.model.message.MessageResponse; import formflow.library.data.Submission; +import formflow.library.data.SubmissionRepository; import formflow.library.data.SubmissionRepositoryService; import formflow.library.email.MailgunEmailClient; import formflow.library.pdf.PdfService; +import java.util.ArrayList; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.TestInstance.Lifecycle; import org.mdbenefits.app.data.SubmissionTestBuilder; import org.mdbenefits.app.data.Transmission; import org.mdbenefits.app.data.TransmissionRepository; @@ -30,6 +34,7 @@ @ActiveProfiles("test") @SpringBootTest +@TestInstance(Lifecycle.PER_CLASS) class HandleApplicationSignedTest { @MockBean @@ -52,6 +57,9 @@ class HandleApplicationSignedTest { private HandleApplicationSigned handleApplicationSigned; + @Autowired + private SubmissionRepository submissionRepository; + @BeforeEach void setup() { handleApplicationSigned = new HandleApplicationSigned(messageSource, mailgunEmailClient, submissionRepositoryService, @@ -67,6 +75,8 @@ public void shouldSkipSendingIfNoEmailAddress() { handleApplicationSigned.run(submission); Mockito.verify(mailgunEmailClient, Mockito.never()).sendEmail(Mockito.any(), Mockito.any(), Mockito.any()); + + cleanupSubmissionTransmission(submission); } @Test @@ -82,6 +92,8 @@ public void shouldRecordSuccessfulSend() { assertThat((String) submission.getInputData().get("confirmationNumber")).matches("M\\d{5,}"); Transmission transmission = transmissionRepository.findTransmissionBySubmission(submission); assertThat(transmission.getStatus()).isEqualTo("QUEUED"); + + cleanupSubmissionTransmission(submission); } @Test @@ -94,6 +106,8 @@ public void includesConfirmationNumberInEmailBody() { handleApplicationSigned.run(submission); assertThat(emailBodyCaptor.getValue()).contains((String) submission.getInputData().get("confirmationNumber")); + + cleanupSubmissionTransmission(submission); } @@ -107,6 +121,8 @@ public void includesAttachedPdf() { handleApplicationSigned.run(submission); assertThat(attachmentsCaptor.getValue()).hasSize(1); + + cleanupSubmissionTransmission(submission); } @Test @@ -118,8 +134,10 @@ public void shouldRecordFailedSend() { handleApplicationSigned.run(submission); assertThat(submission.getInputData().get("sentEmailToApplicant")).isEqualTo(false); + cleanupSubmissionTransmission(submission); } + private Submission buildValidSubmission() { Submission submission = new SubmissionTestBuilder() .with("emailAddress", "foo@example.com") @@ -128,4 +146,12 @@ private Submission buildValidSubmission() { submission.setFlow("mdBenefitsFlow"); return submission; } + + private void cleanupSubmissionTransmission(Submission submission) { + Transmission transmission = transmissionRepository.findTransmissionBySubmission(submission); + if (transmission != null) { + transmissionRepository.delete(transmission); + } + submissionRepository.delete(submission); + } }