From 2455a5941d99e476181e76f6795aba4ce9a63f0a Mon Sep 17 00:00:00 2001 From: Neil Frow <675806+worthydolt@users.noreply.github.com> Date: Wed, 21 Jul 2021 09:33:49 +0100 Subject: [PATCH] APID 190 - permit multiple messages sharing messageId (#51) * APID-190: update confirmation status should update all message with the same messageId * APID-190: enable VSCode integration and add metals generated file to Git ignores * Create tests for updating all records with the same messageID * Revert "Create tests for updating all records with the same messageID" This reverts commit 1dc6b1b6c5d62bd5c7088bafc9ef8de6439c9c48. * Create tests for updating all records with the same messageID * APID-190 Create tests for updating all records with the same messageID * APID-190: ensure that repository find method always returns newest message and update README to state this * APID-190: change unused variable assign to underscore Co-authored-by: aerodigi <54714520+aerodigi@users.noreply.github.com> --- .gitignore | 1 + README.md | 7 +- .../OutboundMessageRepository.scala | 32 +++---- build.sbt | 1 + .../OutboundMessageRepositoryISpec.scala | 83 ++++++++++++++++++- 5 files changed, 103 insertions(+), 21 deletions(-) diff --git a/.gitignore b/.gitignore index 5cf9b53..354ba8f 100644 --- a/.gitignore +++ b/.gitignore @@ -33,3 +33,4 @@ test-result tmp .idea_modules bin +metals.sbt diff --git a/README.md b/README.md index 1ea2758..124d00f 100644 --- a/README.md +++ b/README.md @@ -73,9 +73,8 @@ HTTP Status: 200 (OK) | operation not found in the WSDL | `404` | ## `POST /acknowledgement` -Allows CCN2 system to asynchronously send an acknowledgment in reply to a message sent to them. Upon receipt of such a message, this service will update the message referred to in -the RelatesTo field with its new status - either COD or COE - and will append the acknowledgment message, in its entirety, to the -original request. +Allows CCN2 system to asynchronously send an acknowledgment in reply to a message sent to them. Upon receipt of such a message, this service will update the message referred to in the RelatesTo field with its new status - either COD or COE - and will append the acknowledgment message, in its entirety, to the +original request. In the event that multiple messages with the same `messageId` are found then they will all be updated in the same fashion. ### Request headers | Name | Description | @@ -182,7 +181,7 @@ HTTP Status: 202 (ACCEPTED) with an empty body | message ID supplied in `RelatesTo` element in request body does not match that of any message stored in the database | `404` | ## `GET /retrieve/:id` -Allows retrieval of the message which has either a `messageId` or a `globalId` matching that in the `id` path parameter +Allows retrieval of the message which has either a `messageId` or a `globalId` matching that in the `id` path parameter. In the event that more than one message with the same `messageId` exists then the message most recently sent will be returned. ###Response HTTP Status: 200 (OK) with a body similar to the following: diff --git a/app/uk/gov/hmrc/apiplatformoutboundsoap/repositories/OutboundMessageRepository.scala b/app/uk/gov/hmrc/apiplatformoutboundsoap/repositories/OutboundMessageRepository.scala index 759e442..a94377a 100644 --- a/app/uk/gov/hmrc/apiplatformoutboundsoap/repositories/OutboundMessageRepository.scala +++ b/app/uk/gov/hmrc/apiplatformoutboundsoap/repositories/OutboundMessageRepository.scala @@ -23,16 +23,18 @@ import org.bson.codecs.configuration.CodecRegistries._ import org.joda.time.DateTime import org.joda.time.DateTime.now import org.joda.time.DateTimeZone.UTC -import org.mongodb.scala.{MongoClient, MongoCollection} import org.mongodb.scala.ReadPreference.primaryPreferred +import org.mongodb.scala.bson.collection.immutable.Document import org.mongodb.scala.model.Filters.{and, equal, lte, or} import org.mongodb.scala.model.Indexes.ascending +import org.mongodb.scala.model.Sorts.descending import org.mongodb.scala.model.Updates.{combine, set} -import org.mongodb.scala.model.{FindOneAndUpdateOptions, IndexModel, IndexOptions, ReturnDocument} +import org.mongodb.scala.model._ import org.mongodb.scala.result.InsertOneResult +import org.mongodb.scala.{MongoClient, MongoCollection} import play.api.Logging import uk.gov.hmrc.apiplatformoutboundsoap.config.AppConfig -import uk.gov.hmrc.apiplatformoutboundsoap.models.{DeliveryStatus, OutboundSoapMessage, RetryingOutboundSoapMessage, SendingStatus, StatusType} +import uk.gov.hmrc.apiplatformoutboundsoap.models._ import uk.gov.hmrc.apiplatformoutboundsoap.repositories.MongoFormatter.outboundSoapMessageFormatter import uk.gov.hmrc.mongo.MongoComponent import uk.gov.hmrc.mongo.play.json.{Codecs, CollectionFactory, PlayMongoRepository} @@ -50,11 +52,11 @@ class OutboundMessageRepository @Inject()(mongoComponent: MongoComponent, appCon mongoComponent = mongoComponent, domainFormat = outboundSoapMessageFormatter, indexes = Seq(IndexModel(ascending("globalId"), - IndexOptions().name("globalIdIndex").background(true).unique(true)), - IndexModel(ascending("createDateTime"), - IndexOptions().name("ttlIndex").background(true) - .expireAfter(appConfig.retryMessagesTtl.toSeconds, TimeUnit.SECONDS)))) - with Logging { + IndexOptions().name("globalIdIndex").background(true).unique(true)), + IndexModel(ascending("createDateTime"), + IndexOptions().name("ttlIndex").background(true) + .expireAfter(appConfig.retryMessagesTtl.toSeconds, TimeUnit.SECONDS)))) + with Logging { override lazy val collection: MongoCollection[OutboundSoapMessage] = CollectionFactory @@ -111,15 +113,17 @@ class OutboundMessageRepository @Inject()(mongoComponent: MongoComponent, appCon case DeliveryStatus.COE => "coeMessage" } - collection.withReadPreference(primaryPreferred) - .findOneAndUpdate(filter = equal("messageId", messageId), - update = combine(set("status", Codecs.toBson(newStatus.entryName)), set(field, confirmationMsg)), - options = FindOneAndUpdateOptions().upsert(true).returnDocument(ReturnDocument.AFTER)) - .toFutureOption() + for { + _ <- collection.bulkWrite( + List(UpdateManyModel(Document("messageId" -> messageId), combine(set("status", Codecs.toBson(newStatus.entryName)), set(field, confirmationMsg)))), + BulkWriteOptions().ordered(false)).toFuture() + findUpdated <- findById(messageId) + } yield findUpdated } def findById(searchForId: String): Future[Option[OutboundSoapMessage]] = { - collection.find(filter = or(equal("messageId", searchForId), equal("globalId", searchForId))).headOption() + val findQuery = or(Document("messageId" -> searchForId), Document("globalId" -> searchForId)) + collection.find(findQuery).sort(descending("createDateTime")).headOption() .recover { case e: Exception => logger.warn(s"error finding message - ${e.getMessage}") diff --git a/build.sbt b/build.sbt index 451414c..5a5dd07 100644 --- a/build.sbt +++ b/build.sbt @@ -1,5 +1,6 @@ import uk.gov.hmrc.DefaultBuildSettings.integrationTestSettings import uk.gov.hmrc.sbtdistributables.SbtDistributablesPlugin.publishingSettings +import bloop.integrations.sbt.BloopDefaults val appName = "api-platform-outbound-soap" diff --git a/it/uk/gov/hmrc/apiplatformoutboundsoap/repositories/OutboundMessageRepositoryISpec.scala b/it/uk/gov/hmrc/apiplatformoutboundsoap/repositories/OutboundMessageRepositoryISpec.scala index 7b1ee4f..e69713d 100644 --- a/it/uk/gov/hmrc/apiplatformoutboundsoap/repositories/OutboundMessageRepositoryISpec.scala +++ b/it/uk/gov/hmrc/apiplatformoutboundsoap/repositories/OutboundMessageRepositoryISpec.scala @@ -178,7 +178,7 @@ class OutboundMessageRepositoryISpec extends AnyWordSpec with PlayMongoRepositor } "updateNextRetryTime" should { - "update the retryDateTime on a record given its globalID" in { + "update the retryDateTime on a record given its globalId" in { await(serviceRepo.persist(retryingMessage)) val newRetryDateTime = retryingMessage.retryDateTime.minusHours(2) await(serviceRepo.updateNextRetryTime(retryingMessage.globalId, newRetryDateTime)) @@ -246,16 +246,85 @@ class OutboundMessageRepositoryISpec extends AnyWordSpec with PlayMongoRepositor fetchedRecords.head.status shouldBe DeliveryStatus.COD fetchedRecords.head.asInstanceOf[CodSoapMessage].codMessage shouldBe Some(expectedConfirmationMessageBody) } + + "update all records with the same messageId when a CoE is received" in { + val secondSentMessage = sentMessage.copy(globalId = randomUUID()) + await(serviceRepo.persist(sentMessage)) + await(serviceRepo.persist(secondSentMessage)) + + await(serviceRepo.updateConfirmationStatus(sentMessage.messageId, DeliveryStatus.COE, expectedConfirmationMessageBody)) + + val fetchedRecords = await(serviceRepo.collection.withReadPreference(primaryPreferred).find.toFuture()) + + fetchedRecords.size shouldBe 2 + fetchedRecords.head.globalId shouldBe sentMessage.globalId + fetchedRecords(1).globalId shouldBe secondSentMessage.globalId + fetchedRecords.head.messageId shouldBe sentMessage.messageId + fetchedRecords(1).messageId shouldBe secondSentMessage.messageId + fetchedRecords.head.soapMessage shouldBe sentMessage.soapMessage + fetchedRecords(1).soapMessage shouldBe secondSentMessage.soapMessage + fetchedRecords.head.destinationUrl shouldBe sentMessage.destinationUrl + fetchedRecords(1).destinationUrl shouldBe secondSentMessage.destinationUrl + fetchedRecords.head.status shouldBe DeliveryStatus.COE + fetchedRecords(1).status shouldBe DeliveryStatus.COE + fetchedRecords.head.createDateTime shouldBe sentMessage.createDateTime + fetchedRecords(1).createDateTime shouldBe secondSentMessage.createDateTime + fetchedRecords.head.notificationUrl shouldBe sentMessage.notificationUrl + fetchedRecords(1).notificationUrl shouldBe secondSentMessage.notificationUrl + fetchedRecords.head.ccnHttpStatus shouldBe sentMessage.ccnHttpStatus + fetchedRecords(1).ccnHttpStatus shouldBe secondSentMessage.ccnHttpStatus + fetchedRecords.head.coeMessage shouldBe Some(expectedConfirmationMessageBody) + fetchedRecords(1).coeMessage shouldBe Some(expectedConfirmationMessageBody) + fetchedRecords.head.codMessage shouldBe sentMessage.codMessage + fetchedRecords(1).codMessage shouldBe secondSentMessage.codMessage + } + + "update all records with the same messageId when a CoD is received" in { + val secondSentMessage = sentMessage.copy(globalId = randomUUID()) + await(serviceRepo.persist(sentMessage)) + await(serviceRepo.persist(secondSentMessage)) + + await(serviceRepo.updateConfirmationStatus(sentMessage.messageId, DeliveryStatus.COD, expectedConfirmationMessageBody)) + + val fetchedRecords = await(serviceRepo.collection.withReadPreference(primaryPreferred).find.toFuture()) + + fetchedRecords.size shouldBe 2 + fetchedRecords.head.globalId shouldBe sentMessage.globalId + fetchedRecords(1).globalId shouldBe secondSentMessage.globalId + fetchedRecords.head.messageId shouldBe sentMessage.messageId + fetchedRecords(1).messageId shouldBe secondSentMessage.messageId + fetchedRecords.head.soapMessage shouldBe sentMessage.soapMessage + fetchedRecords(1).soapMessage shouldBe secondSentMessage.soapMessage + fetchedRecords.head.destinationUrl shouldBe sentMessage.destinationUrl + fetchedRecords(1).destinationUrl shouldBe secondSentMessage.destinationUrl + fetchedRecords.head.status shouldBe DeliveryStatus.COD + fetchedRecords(1).status shouldBe DeliveryStatus.COD + fetchedRecords.head.createDateTime shouldBe sentMessage.createDateTime + fetchedRecords(1).createDateTime shouldBe secondSentMessage.createDateTime + fetchedRecords.head.notificationUrl shouldBe sentMessage.notificationUrl + fetchedRecords(1).notificationUrl shouldBe secondSentMessage.notificationUrl + fetchedRecords.head.ccnHttpStatus shouldBe sentMessage.ccnHttpStatus + fetchedRecords(1).ccnHttpStatus shouldBe secondSentMessage.ccnHttpStatus + fetchedRecords.head.coeMessage shouldBe sentMessage.coeMessage + fetchedRecords(1).coeMessage shouldBe secondSentMessage.coeMessage + fetchedRecords.head.codMessage shouldBe Some(expectedConfirmationMessageBody) + fetchedRecords(1).codMessage shouldBe Some(expectedConfirmationMessageBody) + } + + "ensure that unknown messageId returns empty option" in { + val emptyMessage = await(serviceRepo.updateConfirmationStatus(sentMessage.messageId, DeliveryStatus.COD, expectedConfirmationMessageBody)) + emptyMessage shouldBe None + } } "findById" should { - "return message when messageID matches" in { + "return message when messageId matches" in { await(serviceRepo.persist(sentMessage)) val Some(found): Option[OutboundSoapMessage] = await(serviceRepo.findById(sentMessage.messageId)) found shouldBe sentMessage } - "return message when globalID matches" in { + "return message when globalId matches" in { await(serviceRepo.persist(sentMessage)) val Some(found): Option[OutboundSoapMessage] = await(serviceRepo.findById(sentMessage.globalId.toString)) found shouldBe sentMessage @@ -265,6 +334,14 @@ class OutboundMessageRepositoryISpec extends AnyWordSpec with PlayMongoRepositor val found: Option[OutboundSoapMessage] = await(serviceRepo.findById(sentMessage.messageId)) found shouldBe None } + + "return newest message for a given messageId" in { + await(serviceRepo.persist(sentMessage)) + await(serviceRepo.persist(sentMessage.copy(createDateTime = DateTime.now().minusHours(1), globalId = randomUUID()))) + + val Some(found): Option[OutboundSoapMessage] = await(serviceRepo.findById(sentMessage.messageId)) + found shouldBe sentMessage + } } }