diff --git a/app/uk/gov/hmrc/apiplatformoutboundsoap/models/OutboundSoapMessage.scala b/app/uk/gov/hmrc/apiplatformoutboundsoap/models/OutboundSoapMessage.scala index d68bee5..b6f8342 100644 --- a/app/uk/gov/hmrc/apiplatformoutboundsoap/models/OutboundSoapMessage.scala +++ b/app/uk/gov/hmrc/apiplatformoutboundsoap/models/OutboundSoapMessage.scala @@ -51,7 +51,7 @@ object OutboundSoapMessage { } else if (fullyQualifiedName == classTag[CodSoapMessage].runtimeClass.getCanonicalName) { DeliveryStatus.COD } else { - throw new IllegalArgumentException + throw new IllegalArgumentException(s"${fullyQualifiedName} is not a valid class") } } } @@ -127,10 +127,10 @@ case class RetryingOutboundSoapMessage(globalId: UUID, sealed abstract class StatusType extends EnumEntry object StatusType extends Enum[StatusType] with PlayJsonEnum[StatusType]{ - val values = findValues + val values: immutable.IndexedSeq[StatusType] = findValues } -sealed abstract class DeliveryStatus extends StatusType +sealed abstract class DeliveryStatus(override val entryName: String) extends StatusType object DeliveryStatus extends Enum[DeliveryStatus] with PlayJsonEnum[DeliveryStatus] { def fromAction(action: String): DeliveryStatus = { @@ -142,19 +142,19 @@ object DeliveryStatus extends Enum[DeliveryStatus] with PlayJsonEnum[DeliverySta } val values: immutable.IndexedSeq[DeliveryStatus] = findValues - case object COE extends DeliveryStatus + case object COE extends DeliveryStatus("COE") - case object COD extends DeliveryStatus + case object COD extends DeliveryStatus("COD") } -sealed abstract class SendingStatus extends StatusType +sealed abstract class SendingStatus(override val entryName: String) extends StatusType object SendingStatus extends Enum[SendingStatus] with PlayJsonEnum[SendingStatus] { val values: immutable.IndexedSeq[SendingStatus] = findValues - case object SENT extends SendingStatus + case object SENT extends SendingStatus("SENT") - case object FAILED extends SendingStatus + case object FAILED extends SendingStatus("FAILED") - case object RETRYING extends SendingStatus + case object RETRYING extends SendingStatus("RETRYING") } diff --git a/app/uk/gov/hmrc/apiplatformoutboundsoap/repositories/MongoFormatter.scala b/app/uk/gov/hmrc/apiplatformoutboundsoap/repositories/MongoFormatter.scala index 1fa65e6..f30c1ee 100644 --- a/app/uk/gov/hmrc/apiplatformoutboundsoap/repositories/MongoFormatter.scala +++ b/app/uk/gov/hmrc/apiplatformoutboundsoap/repositories/MongoFormatter.scala @@ -16,11 +16,9 @@ package uk.gov.hmrc.apiplatformoutboundsoap.repositories -import org.joda.time.DateTime -import play.api.libs.json.{Format, Json, JsonConfiguration, JsonNaming, OFormat} -import uk.gov.hmrc.apiplatformoutboundsoap.models.{CodSoapMessage, CoeSoapMessage, FailedOutboundSoapMessage, OutboundSoapMessage, RetryingOutboundSoapMessage, SentOutboundSoapMessage} -import uk.gov.hmrc.mongo.json.ReactiveMongoFormats - +import play.api.libs.json.{JsObject, JsPath, Json, JsonConfiguration, JsonNaming, OFormat, OWrites, Reads} +import uk.gov.hmrc.apiplatformoutboundsoap.models.{CodSoapMessage, CoeSoapMessage, DeliveryStatus, FailedOutboundSoapMessage, OutboundSoapMessage, RetryingOutboundSoapMessage, SendingStatus, SentOutboundSoapMessage} +import uk.gov.hmrc.mongo.play.json.formats.MongoJodaFormats private[repositories] object MongoFormatter { implicit val cfg = JsonConfiguration( @@ -30,11 +28,81 @@ private[repositories] object MongoFormatter { OutboundSoapMessage.typeToStatus(fullName).entryName }) - implicit val dateFormat: Format[DateTime] = ReactiveMongoFormats.dateTimeFormats - implicit val outboundSoapMessageFormatter: OFormat[OutboundSoapMessage] = Json.format[OutboundSoapMessage] - implicit val retryingSoapMessageFormatter: OFormat[RetryingOutboundSoapMessage] = Json.format[RetryingOutboundSoapMessage] - implicit val sentSoapMessageFormatter: OFormat[SentOutboundSoapMessage] = Json.format[SentOutboundSoapMessage] - implicit val failedSoapMessageFormatter: OFormat[FailedOutboundSoapMessage] = Json.format[FailedOutboundSoapMessage] - implicit val coeSoapMessageFormatter: OFormat[CoeSoapMessage] = Json.format[CoeSoapMessage] - implicit val codSoapMessageFormatter: OFormat[CodSoapMessage] = Json.format[CodSoapMessage] + implicit val dateTimeFormat = MongoJodaFormats.dateTimeFormat + + implicit val retryingMessageReads: Reads[RetryingOutboundSoapMessage] = + Json.reads[RetryingOutboundSoapMessage] + implicit val retryingMessageWrites: OWrites[RetryingOutboundSoapMessage] = + Json.writes[RetryingOutboundSoapMessage].transform(_ ++ Json.obj("status" -> SendingStatus.RETRYING.entryName)) + implicit val retryingSoapMessageFormatter: OFormat[RetryingOutboundSoapMessage] = + OFormat(retryingMessageReads, retryingMessageWrites) + + implicit val sentMessageReads: Reads[SentOutboundSoapMessage] = + Json.reads[SentOutboundSoapMessage] + implicit val sentMessageWrites: OWrites[SentOutboundSoapMessage] = + Json.writes[SentOutboundSoapMessage].transform(_ ++ Json.obj("status" -> SendingStatus.SENT.entryName)) + implicit val sentSoapMessageFormatter: OFormat[SentOutboundSoapMessage] = + OFormat(sentMessageReads, sentMessageWrites) + + implicit val failedMessageReads: Reads[FailedOutboundSoapMessage] = + Json.reads[FailedOutboundSoapMessage] + implicit val failedMessageWrites: OWrites[FailedOutboundSoapMessage] = + Json.writes[FailedOutboundSoapMessage].transform(_ ++ Json.obj("status" -> SendingStatus.FAILED.entryName)) + implicit val failedSoapMessageFormatter: OFormat[FailedOutboundSoapMessage] = + OFormat(failedMessageReads, failedMessageWrites) + + implicit val codMessageReads: Reads[CodSoapMessage] = + Json.reads[CodSoapMessage] + implicit val codMessageWrites: OWrites[CodSoapMessage] = + Json.writes[CodSoapMessage].transform(_ ++ Json.obj("status" -> DeliveryStatus.COD.entryName)) + implicit val codSoapMessageFormatter: OFormat[CodSoapMessage] = + OFormat(codMessageReads, codMessageWrites) + + implicit val coeMessageReads: Reads[CoeSoapMessage] = + Json.reads[CoeSoapMessage] + implicit val coeMessageWrites: OWrites[CoeSoapMessage] = + Json.writes[CoeSoapMessage].transform(_ ++ Json.obj("status" -> DeliveryStatus.COE.entryName)) + implicit val coeSoapMessageFormatter: OFormat[CoeSoapMessage] = + OFormat(coeMessageReads, coeMessageWrites) + + + implicit val outboundSoapMessageReads: Reads[OutboundSoapMessage] = + (JsPath \ "status").read[String].flatMap { + case SendingStatus.RETRYING.entryName => + retryingSoapMessageFormatter.widen[OutboundSoapMessage] + case SendingStatus.SENT.entryName => + sentSoapMessageFormatter.widen[OutboundSoapMessage] + case SendingStatus.FAILED.entryName => + failedSoapMessageFormatter.widen[OutboundSoapMessage] + case DeliveryStatus.COD.entryName => + codSoapMessageFormatter.widen[OutboundSoapMessage] + case DeliveryStatus.COE.entryName => + coeSoapMessageFormatter.widen[OutboundSoapMessage] + } + + implicit val outboundSoapMessageWrites: OWrites[OutboundSoapMessage] = new OWrites[OutboundSoapMessage] { + override def writes(soapMessage: OutboundSoapMessage): JsObject = soapMessage match { + case r @ RetryingOutboundSoapMessage(_, _, _, _, _, _, _, _, _, _) => + retryingSoapMessageFormatter.writes(r) ++ Json.obj( + "status" -> SendingStatus.RETRYING.entryName + ) + case f @ FailedOutboundSoapMessage(_, _, _, _, _, _, _, _, _) => + failedSoapMessageFormatter.writes(f) ++ Json.obj( + "status" -> SendingStatus.FAILED.entryName + ) + case s @ SentOutboundSoapMessage(_, _, _, _, _, _, _, _, _) => + sentSoapMessageFormatter.writes(s) ++ Json.obj( + "status" -> SendingStatus.SENT.entryName + ) + case cod @ CodSoapMessage(_, _, _, _, _, _, _, _, _) => + codSoapMessageFormatter.writes(cod) ++ Json.obj( + "status" -> DeliveryStatus.COD.entryName + ) + case coe @ CoeSoapMessage(_, _, _, _, _, _, _, _, _) => + coeSoapMessageFormatter.writes(coe) ++ Json.obj( + "status" -> DeliveryStatus.COE.entryName + ) + } + } + implicit val outboundSoapMessageFormatter: OFormat[OutboundSoapMessage] = OFormat(outboundSoapMessageReads, outboundSoapMessageWrites) } diff --git a/app/uk/gov/hmrc/apiplatformoutboundsoap/repositories/OutboundMessageRepository.scala b/app/uk/gov/hmrc/apiplatformoutboundsoap/repositories/OutboundMessageRepository.scala index e261961..c42b632 100644 --- a/app/uk/gov/hmrc/apiplatformoutboundsoap/repositories/OutboundMessageRepository.scala +++ b/app/uk/gov/hmrc/apiplatformoutboundsoap/repositories/OutboundMessageRepository.scala @@ -16,86 +16,110 @@ package uk.gov.hmrc.apiplatformoutboundsoap.repositories -import akka.stream.Materializer +import akka.NotUsed +import akka.stream.alpakka.mongodb.scaladsl.MongoSource import akka.stream.scaladsl.Source +import org.bson.codecs.configuration.CodecRegistries._ import org.joda.time.DateTime import org.joda.time.DateTime.now import org.joda.time.DateTimeZone.UTC -import play.api.libs.json.{JsObject, JsString, Json} -import play.modules.reactivemongo.ReactiveMongoComponent -import reactivemongo.akkastream.cursorProducer -import reactivemongo.api.ReadPreference -import reactivemongo.api.indexes.Index -import reactivemongo.api.indexes.IndexType.Ascending -import reactivemongo.bson.{BSONDocument, BSONLong, BSONObjectID} -import reactivemongo.play.json.ImplicitBSONHandlers.JsObjectDocumentWriter +import org.mongodb.scala.{MongoClient, MongoCollection} +import org.mongodb.scala.ReadPreference.primaryPreferred +import org.mongodb.scala.model.Filters.{and, equal, lte} +import org.mongodb.scala.model.Indexes.ascending +import org.mongodb.scala.model.Updates.{combine, set} +import org.mongodb.scala.model.{FindOneAndUpdateOptions, IndexModel, IndexOptions, ReturnDocument} +import org.mongodb.scala.result.InsertOneResult +import play.api.Logging import uk.gov.hmrc.apiplatformoutboundsoap.config.AppConfig -import uk.gov.hmrc.apiplatformoutboundsoap.models.{DeliveryStatus, OutboundSoapMessage, RetryingOutboundSoapMessage, SendingStatus} -import uk.gov.hmrc.apiplatformoutboundsoap.repositories.MongoFormatter.{dateFormat, outboundSoapMessageFormatter} -import uk.gov.hmrc.mongo.ReactiveRepository -import uk.gov.hmrc.mongo.json.ReactiveMongoFormats +import uk.gov.hmrc.apiplatformoutboundsoap.models.{DeliveryStatus, OutboundSoapMessage, RetryingOutboundSoapMessage, SendingStatus, StatusType} +import uk.gov.hmrc.apiplatformoutboundsoap.repositories.MongoFormatter.outboundSoapMessageFormatter +import uk.gov.hmrc.mongo.MongoComponent +import uk.gov.hmrc.mongo.play.json.{Codecs, CollectionFactory, PlayMongoRepository} import java.util.UUID +import java.util.concurrent.TimeUnit import javax.inject.{Inject, Singleton} import scala.concurrent.{ExecutionContext, Future} @Singleton -class OutboundMessageRepository @Inject()(mongoComponent: ReactiveMongoComponent, appConfig: AppConfig) - (implicit ec: ExecutionContext, m: Materializer) - extends ReactiveRepository[OutboundSoapMessage, BSONObjectID]( - "messages", - mongoComponent.mongoConnector.db, - outboundSoapMessageFormatter, - ReactiveMongoFormats.objectIdFormats) { +class OutboundMessageRepository @Inject()(mongoComponent: MongoComponent, appConfig: AppConfig) + (implicit ec: ExecutionContext) + extends PlayMongoRepository[OutboundSoapMessage]( + collectionName = "messages", + mongoComponent = mongoComponent, + domainFormat = outboundSoapMessageFormatter, + indexes = Seq(IndexModel(ascending("globalId"), + IndexOptions().name("globalIdIndex").background(true).unique(true)), + IndexModel(ascending("createDateTime"), + IndexOptions().name("ttlIndex").background(true) + .expireAfter(appConfig.retryMessagesTtl.toSeconds, TimeUnit.SECONDS)))) + with Logging { - override def indexes = Seq( - Index(key = List("globalId" -> Ascending), name = Some("globalIdIndex"), unique = true, background = true), - Index(key = List("createDateTime" -> Ascending), - name = Some("ttlIndex"), background = true, options = BSONDocument("expireAfterSeconds" -> BSONLong(appConfig.retryMessagesTtl.toSeconds))) - ) + override lazy val collection: MongoCollection[OutboundSoapMessage] = + CollectionFactory + .collection(mongoComponent.database, collectionName, domainFormat) + .withCodecRegistry( + fromRegistries( + fromCodecs( + Codecs.playFormatCodec(domainFormat), + Codecs.playFormatCodec(MongoFormatter.retryingSoapMessageFormatter), + Codecs.playFormatCodec(MongoFormatter.failedSoapMessageFormatter), + Codecs.playFormatCodec(MongoFormatter.sentSoapMessageFormatter), + Codecs.playFormatCodec(MongoFormatter.codSoapMessageFormatter), + Codecs.playFormatCodec(MongoFormatter.coeSoapMessageFormatter), + Codecs.playFormatCodec(MongoFormatter.dateTimeFormat), + Codecs.playFormatCodec(StatusType.jsonFormat), + Codecs.playFormatCodec(DeliveryStatus.jsonFormat), + Codecs.playFormatCodec(SendingStatus.jsonFormat) + ), + MongoClient.DEFAULT_CODEC_REGISTRY + ) + ) - def persist(entity: OutboundSoapMessage)(implicit ec: ExecutionContext): Future[Unit] = { - insert(entity).map(_ => ()) + def persist(entity: OutboundSoapMessage): Future[InsertOneResult] = { + collection.insertOne(entity).toFuture() } - def retrieveMessagesForRetry: Source[RetryingOutboundSoapMessage, Future[Any]] = { - import uk.gov.hmrc.apiplatformoutboundsoap.repositories.MongoFormatter.retryingSoapMessageFormatter - - collection - .find(Json.obj("status" -> SendingStatus.RETRYING.entryName, - "retryDateTime" -> Json.obj("$lte" -> now(UTC))), Option.empty[JsObject]) - .sort(Json.obj("retryDateTime" -> 1)) - .cursor[RetryingOutboundSoapMessage](ReadPreference.primaryPreferred) - .documentSource() + def retrieveMessagesForRetry: Source[RetryingOutboundSoapMessage, NotUsed] = { + MongoSource(collection.withReadPreference(primaryPreferred) + .find(filter = and(equal("status", SendingStatus.RETRYING.entryName), + and(lte("retryDateTime", now(UTC))))) + .sort(ascending("retryDateTime")) + .map(_.asInstanceOf[RetryingOutboundSoapMessage])) } def updateNextRetryTime(globalId: UUID, newRetryDateTime: DateTime): Future[Option[RetryingOutboundSoapMessage]] = { - import uk.gov.hmrc.apiplatformoutboundsoap.repositories.MongoFormatter.retryingSoapMessageFormatter - - findAndUpdate(Json.obj("globalId" -> globalId), - Json.obj("$set" -> Json.obj("retryDateTime" -> newRetryDateTime)), fetchNewObject = true) - .map(_.result[RetryingOutboundSoapMessage]) + collection.withReadPreference(primaryPreferred) + .findOneAndUpdate(filter = equal("globalId", Codecs.toBson(globalId)), + update = set("retryDateTime", newRetryDateTime), + options = FindOneAndUpdateOptions().upsert(true).returnDocument(ReturnDocument.AFTER) + ).map(_.asInstanceOf[RetryingOutboundSoapMessage]).headOption() } def updateSendingStatus(globalId: UUID, newStatus: SendingStatus): Future[Option[OutboundSoapMessage]] = { - findAndUpdate(Json.obj("globalId" -> globalId), - Json.obj("$set" -> Json.obj("status" -> newStatus.entryName)), fetchNewObject = true) - .map(_.result[OutboundSoapMessage]) + collection.withReadPreference(primaryPreferred) + .findOneAndUpdate(filter = equal("globalId", Codecs.toBson(globalId)), + update = set("status", Codecs.toBson(newStatus.entryName)), + options = FindOneAndUpdateOptions().upsert(true).returnDocument(ReturnDocument.AFTER) + ).toFutureOption() } def updateConfirmationStatus(messageId: String, newStatus: DeliveryStatus, confirmationMsg: String): Future[Option[OutboundSoapMessage]] = { - logger.info(s"conf message is ${confirmationMsg}") val field: String = newStatus match { case DeliveryStatus.COD => "codMessage" case DeliveryStatus.COE => "coeMessage" } - findAndUpdate(Json.obj("messageId" -> messageId), - Json.obj("$set" -> Json.obj("status" -> newStatus.entryName, field -> confirmationMsg)), fetchNewObject = true) - .map(_.result[OutboundSoapMessage]) + + collection.withReadPreference(primaryPreferred) + .findOneAndUpdate(filter = equal("messageId", messageId), + update = combine(set("status", Codecs.toBson(newStatus.entryName)), set(field, confirmationMsg)), + options = FindOneAndUpdateOptions().upsert(true).returnDocument(ReturnDocument.AFTER)) + .toFutureOption() } def findById(messageId: String): Future[Option[OutboundSoapMessage]] = { - find("messageId" -> JsString(messageId)).map(_.headOption) + collection.find(filter = equal("messageId", messageId)).headOption() .recover { case e: Exception => logger.warn(s"error finding message - ${e.getMessage}") diff --git a/conf/application.conf b/conf/application.conf index 6acd1ac..af722c3 100644 --- a/conf/application.conf +++ b/conf/application.conf @@ -46,7 +46,7 @@ play.http.errorHandler = "uk.gov.hmrc.play.bootstrap.backend.http.JsonErrorHandl # Play Modules # ~~~~ # Additional play modules can be added here -play.modules.enabled += "play.modules.reactivemongo.ReactiveMongoHmrcModule" +play.modules.enabled += "uk.gov.hmrc.mongo.play.PlayMongoModule" play.modules.enabled += "uk.gov.hmrc.apiplatformoutboundsoap.scheduled.SchedulerModule" play.modules.enabled += "uk.gov.hmrc.apiplatformoutboundsoap.scheduled.SchedulerPlayModule" diff --git a/it/uk/gov/hmrc/apiplatformoutboundsoap/repositories/OutboundMessageRepositoryISpec.scala b/it/uk/gov/hmrc/apiplatformoutboundsoap/repositories/OutboundMessageRepositoryISpec.scala index b9114c6..ab028a5 100644 --- a/it/uk/gov/hmrc/apiplatformoutboundsoap/repositories/OutboundMessageRepositoryISpec.scala +++ b/it/uk/gov/hmrc/apiplatformoutboundsoap/repositories/OutboundMessageRepositoryISpec.scala @@ -20,101 +20,120 @@ import akka.stream.Materializer import akka.stream.scaladsl.Sink import org.joda.time.DateTime import org.joda.time.DateTimeZone.UTC +import org.mongodb.scala.MongoWriteException +import org.mongodb.scala.ReadPreference.primaryPreferred +import org.mongodb.scala.bson.{BsonBoolean, BsonInt64} import org.scalatest.BeforeAndAfterEach import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec import org.scalatestplus.play.guice.GuiceOneAppPerSuite import play.api.Application import play.api.inject.guice.GuiceApplicationBuilder -import play.api.libs.json.{JsObject, Json} -import reactivemongo.api.ReadPreference -import reactivemongo.bson.BSONLong -import reactivemongo.core.errors.DatabaseException -import reactivemongo.play.json.ImplicitBSONHandlers.JsObjectDocumentWriter +import play.api.test.Helpers.{await, defaultAwaitTimeout} import uk.gov.hmrc.apiplatformoutboundsoap.models._ -import uk.gov.hmrc.mongo.RepositoryPreparation +import uk.gov.hmrc.mongo.play.json.PlayMongoRepository +import uk.gov.hmrc.mongo.test.PlayMongoRepositorySupport import java.util.UUID.randomUUID -class OutboundMessageRepositoryISpec extends AnyWordSpec with Matchers with RepositoryPreparation with BeforeAndAfterEach with GuiceOneAppPerSuite { - - protected def appBuilder: GuiceApplicationBuilder = - new GuiceApplicationBuilder() - .configure( - "mongodb.uri" -> s"mongodb://127.0.0.1:27017/test-${this.getClass.getSimpleName}" - ) +class OutboundMessageRepositoryISpec extends AnyWordSpec with PlayMongoRepositorySupport[OutboundSoapMessage] with + Matchers with BeforeAndAfterEach with GuiceOneAppPerSuite { + val serviceRepo = repository.asInstanceOf[OutboundMessageRepository] override implicit lazy val app: Application = appBuilder.build() - val repo: OutboundMessageRepository = app.injector.instanceOf[OutboundMessageRepository] val ccnHttpStatus: Int = 200 + val retryingMessage = RetryingOutboundSoapMessage(randomUUID, "MessageId-A1", "payload", "some url", + DateTime.now(UTC), DateTime.now(UTC), ccnHttpStatus) + val sentMessage = SentOutboundSoapMessage(randomUUID, "MessageId-A2", "payload", "some url", DateTime.now(UTC), ccnHttpStatus) implicit val materialiser: Materializer = app.injector.instanceOf[Materializer] + val failedMessage = FailedOutboundSoapMessage(randomUUID, "MessageId-A3", "payload", "some url", DateTime.now(UTC), ccnHttpStatus) + val coeMessage = CoeSoapMessage(randomUUID, "MessageId-A4", "payload", "some url", DateTime.now(UTC), ccnHttpStatus) + val codMessage = CodSoapMessage(randomUUID, "MessageId-A5", "payload", "some url", DateTime.now(UTC), ccnHttpStatus) override def beforeEach(): Unit = { - prepare(repo) + prepareDatabase() } - val retryingMessage = RetryingOutboundSoapMessage(randomUUID, "MessageId-A1", "payload", "some url", - DateTime.now(UTC), DateTime.now(UTC), ccnHttpStatus) - val sentMessage = SentOutboundSoapMessage(randomUUID, "MessageId-A2", "payload", "some url", DateTime.now(UTC), ccnHttpStatus) - val failedMessage = FailedOutboundSoapMessage(randomUUID, "MessageId-A3", "payload", "some url", DateTime.now(UTC), ccnHttpStatus) + protected def appBuilder: GuiceApplicationBuilder = + new GuiceApplicationBuilder() + .configure( + "mongodb.uri" -> s"mongodb://127.0.0.1:27017/test-${this.getClass.getSimpleName}" + ) + + override protected def repository: PlayMongoRepository[OutboundSoapMessage] = app.injector.instanceOf[OutboundMessageRepository] + "persist" should { "insert a retrying message when it does not exist" in { - await(repo.persist(retryingMessage)) - - val fetchedRecords = await(repo.findAll(ReadPreference.primaryPreferred)) - val Some(jsonRecord) = await(repo.collection.find(Json.obj(), Option.empty[JsObject]).one[JsObject]) - (jsonRecord \ "status").as[String] shouldBe "RETRYING" + await(serviceRepo.persist(retryingMessage)) + val fetchedRecords = await(serviceRepo.collection.withReadPreference(primaryPreferred).find().toFuture()) fetchedRecords.size shouldBe 1 fetchedRecords.head shouldBe retryingMessage + fetchedRecords.head.status shouldBe SendingStatus.RETRYING } "insert a sent message when it does not exist" in { - await(repo.persist(sentMessage)) + await(serviceRepo.persist(sentMessage)) - val fetchedRecords = await(repo.findAll(ReadPreference.primaryPreferred)) - val Some(jsonRecord) = await(repo.collection.find(Json.obj(), Option.empty[JsObject]).one[JsObject]) - (jsonRecord \ "status").as[String] shouldBe "SENT" + val fetchedRecords = await(serviceRepo.collection.withReadPreference(primaryPreferred).find().toFuture()) fetchedRecords.size shouldBe 1 fetchedRecords.head shouldBe sentMessage + fetchedRecords.head.status shouldBe SendingStatus.SENT } "insert a failed message when it does not exist" in { - await(repo.persist(failedMessage)) + await(serviceRepo.persist(failedMessage)) - val fetchedRecords = await(repo.findAll(ReadPreference.primaryPreferred)) - val Some(jsonRecord) = await(repo.collection.find(Json.obj(), Option.empty[JsObject]).one[JsObject]) - (jsonRecord \ "status").as[String] shouldBe "FAILED" + val fetchedRecords = await(serviceRepo.collection.withReadPreference(primaryPreferred).find().toFuture()) fetchedRecords.size shouldBe 1 fetchedRecords.head shouldBe failedMessage + fetchedRecords.head.status shouldBe SendingStatus.FAILED + } + "insert a confirmation of exception message when it does not exist" in { + await(serviceRepo.persist(coeMessage)) + + val fetchedRecords = await(serviceRepo.collection.withReadPreference(primaryPreferred).find().toFuture()) + + fetchedRecords.size shouldBe 1 + fetchedRecords.head shouldBe coeMessage + fetchedRecords.head.status shouldBe DeliveryStatus.COE + } + "insert a confirmation of delivery message when it does not exist" in { + await(serviceRepo.persist(codMessage)) + + val fetchedRecords = await(serviceRepo.collection.withReadPreference(primaryPreferred).find().toFuture()) + + fetchedRecords.size shouldBe 1 + fetchedRecords.head shouldBe codMessage + fetchedRecords.head.status shouldBe DeliveryStatus.COD } "message is persisted with TTL" in { - await(repo.persist(sentMessage)) + await(serviceRepo.persist(sentMessage)) - val Some(ttlIndex) = await(repo.collection.indexesManager.list()).find(i => i.name.contains("ttlIndex")) - ttlIndex.unique shouldBe false - ttlIndex.background shouldBe true - ttlIndex.options.get("expireAfterSeconds") shouldBe Some(BSONLong(60 * 60 * 24 * 30)) + val Some(ttlIndex) = await(serviceRepo.collection.listIndexes().toFuture()).find(i => i.get("name").get.asString().getValue == "ttlIndex") + ttlIndex.get("unique") shouldBe None + ttlIndex.get("background").get shouldBe BsonBoolean(true) + ttlIndex.get("expireAfterSeconds") shouldBe Some(BsonInt64(60 * 60 * 24 * 30)) } "message is persisted with unique ID" in { - await(repo.persist(sentMessage)) + await(serviceRepo.persist(sentMessage)) - val Some(globalIdIndex) = await(repo.collection.indexesManager.list()).find(i => i.name.contains("globalIdIndex")) - globalIdIndex.background shouldBe true - globalIdIndex.unique shouldBe true + val Some(globalIdIndex) = await(serviceRepo.collection.listIndexes().toFuture()).find(i => i.get("name").get.asString().getValue == "globalIdIndex") + globalIdIndex.get("unique") shouldBe Some(BsonBoolean(true)) + globalIdIndex.get("background").get shouldBe BsonBoolean(true) } "fail when a message with the same ID already exists" in { - await(repo.persist(retryingMessage)) + await(serviceRepo.persist(retryingMessage)) - val exception: DatabaseException = intercept[DatabaseException] { - await(repo.persist(retryingMessage)) + val exception: MongoWriteException = intercept[MongoWriteException] { + await(serviceRepo.persist(retryingMessage)) } exception.getMessage should include("E11000 duplicate key error collection") @@ -123,10 +142,10 @@ class OutboundMessageRepositoryISpec extends AnyWordSpec with Matchers with Repo "retrieveMessagesForRetry" should { "retrieve retrying messages and ignore sent messages" in { - await(repo.persist(retryingMessage)) - await(repo.persist(sentMessage)) + await(serviceRepo.persist(retryingMessage)) + await(serviceRepo.persist(sentMessage)) - val fetchedRecords = await(repo.retrieveMessagesForRetry.runWith(Sink.seq[RetryingOutboundSoapMessage])) + val fetchedRecords = await(serviceRepo.retrieveMessagesForRetry.runWith(Sink.seq[RetryingOutboundSoapMessage])) fetchedRecords.size shouldBe 1 fetchedRecords.head shouldBe retryingMessage } @@ -136,21 +155,21 @@ class OutboundMessageRepositoryISpec extends AnyWordSpec with Matchers with Repo randomUUID, "MessageId-A1", "payload", "some url", DateTime.now(UTC), DateTime.now(UTC).plusHours(1), ccnHttpStatus) - await(repo.persist(retryingMessageNotReadyForRetrying)) - await(repo.persist(sentMessage)) + await(serviceRepo.persist(retryingMessageNotReadyForRetrying)) + await(serviceRepo.persist(sentMessage)) - val fetchedRecords = await(repo.retrieveMessagesForRetry.runWith(Sink.seq[RetryingOutboundSoapMessage])) + val fetchedRecords = await(serviceRepo.retrieveMessagesForRetry.runWith(Sink.seq[RetryingOutboundSoapMessage])) fetchedRecords.size shouldBe 0 } "retrieve retrying messages with retryDate in ascending order" in { val retryingMessageOldRetryDatetime = retryingMessage.copy(globalId = randomUUID, retryDateTime = retryingMessage.retryDateTime.minusHours(1)) val retryingMessageEvenOlderRetryDatetime = retryingMessage.copy(globalId = randomUUID, retryDateTime = retryingMessage.retryDateTime.minusHours(2)) - await(repo.persist(retryingMessageEvenOlderRetryDatetime)) - await(repo.persist(retryingMessage)) - await(repo.persist(retryingMessageOldRetryDatetime)) + await(serviceRepo.persist(retryingMessageEvenOlderRetryDatetime)) + await(serviceRepo.persist(retryingMessage)) + await(serviceRepo.persist(retryingMessageOldRetryDatetime)) - val fetchedRecords = await(repo.retrieveMessagesForRetry.runWith(Sink.seq[RetryingOutboundSoapMessage])) + val fetchedRecords = await(serviceRepo.retrieveMessagesForRetry.runWith(Sink.seq[RetryingOutboundSoapMessage])) fetchedRecords.size shouldBe 3 fetchedRecords.head shouldBe retryingMessageEvenOlderRetryDatetime fetchedRecords(1) shouldBe retryingMessageOldRetryDatetime @@ -160,20 +179,20 @@ class OutboundMessageRepositoryISpec extends AnyWordSpec with Matchers with Repo "updateNextRetryTime" should { "update the retryDateTime on a record given its globalID" in { - await(repo.persist(retryingMessage)) + await(serviceRepo.persist(retryingMessage)) val newRetryDateTime = retryingMessage.retryDateTime.minusHours(2) - await(repo.updateNextRetryTime(retryingMessage.globalId, newRetryDateTime)) + await(serviceRepo.updateNextRetryTime(retryingMessage.globalId, newRetryDateTime)) - val fetchedRecords = await(repo.retrieveMessagesForRetry.runWith(Sink.seq[RetryingOutboundSoapMessage])) + val fetchedRecords = await(serviceRepo.retrieveMessagesForRetry.runWith(Sink.seq[RetryingOutboundSoapMessage])) fetchedRecords.size shouldBe 1 fetchedRecords.head.retryDateTime shouldBe newRetryDateTime } "updated message is returned from the database after updating retryDateTime" in { - await(repo.persist(retryingMessage)) + await(serviceRepo.persist(retryingMessage)) val newRetryDateTime = retryingMessage.retryDateTime.minusHours(2) - val Some(updatedMessage) = await(repo.updateNextRetryTime(retryingMessage.globalId, newRetryDateTime)) + val Some(updatedMessage) = await(serviceRepo.updateNextRetryTime(retryingMessage.globalId, newRetryDateTime)) updatedMessage.retryDateTime shouldBe newRetryDateTime } @@ -181,10 +200,10 @@ class OutboundMessageRepositoryISpec extends AnyWordSpec with Matchers with Repo "updateStatus" should { "update the message to have a status of FAILED" in { - await(repo.persist(retryingMessage)) - val Some(returnedSoapMessage) = await(repo.updateSendingStatus(retryingMessage.globalId, SendingStatus.FAILED)) + await(serviceRepo.persist(retryingMessage)) + val Some(returnedSoapMessage) = await(serviceRepo.updateSendingStatus(retryingMessage.globalId, SendingStatus.FAILED)) - val fetchedRecords = await(repo.findAll(ReadPreference.primaryPreferred)) + val fetchedRecords = await(serviceRepo.collection.withReadPreference(primaryPreferred).find.toFuture()) fetchedRecords.size shouldBe 1 fetchedRecords.head.status shouldBe SendingStatus.FAILED fetchedRecords.head.isInstanceOf[FailedOutboundSoapMessage] shouldBe true @@ -193,10 +212,10 @@ class OutboundMessageRepositoryISpec extends AnyWordSpec with Matchers with Repo } "update the message to have a status of SENT" in { - await(repo.persist(retryingMessage)) - val Some(returnedSoapMessage) = await(repo.updateSendingStatus(retryingMessage.globalId, SendingStatus.SENT)) + await(serviceRepo.persist(retryingMessage)) + val Some(returnedSoapMessage) = await(serviceRepo.updateSendingStatus(retryingMessage.globalId, SendingStatus.SENT)) - val fetchedRecords = await(repo.findAll(ReadPreference.primaryPreferred)) + val fetchedRecords = await(serviceRepo.collection.withReadPreference(primaryPreferred).find.toFuture()) fetchedRecords.size shouldBe 1 fetchedRecords.head.status shouldBe SendingStatus.SENT fetchedRecords.head.isInstanceOf[SentOutboundSoapMessage] shouldBe true @@ -208,44 +227,38 @@ class OutboundMessageRepositoryISpec extends AnyWordSpec with Matchers with Repo "updateConfirmationStatus" should { val expectedConfirmationMessageBody = "foobar" "update a message when a CoE is received" in { - await(repo.persist(sentMessage)) - await(repo.updateConfirmationStatus(sentMessage.messageId, DeliveryStatus.COE, expectedConfirmationMessageBody)) + await(serviceRepo.persist(sentMessage)) + await(serviceRepo.updateConfirmationStatus(sentMessage.messageId, DeliveryStatus.COE, expectedConfirmationMessageBody)) - val fetchedRecords = await(repo.findAll(ReadPreference.primaryPreferred)) + val fetchedRecords = await(serviceRepo.collection.withReadPreference(primaryPreferred).find.toFuture()) fetchedRecords.size shouldBe 1 fetchedRecords.head.status shouldBe DeliveryStatus.COE fetchedRecords.head.asInstanceOf[CoeSoapMessage].coeMessage shouldBe Some(expectedConfirmationMessageBody) } "update a message when a CoD is received" in { - await(repo.persist(sentMessage)) - await(repo.updateConfirmationStatus(sentMessage.messageId, DeliveryStatus.COD, expectedConfirmationMessageBody)) + await(serviceRepo.persist(sentMessage)) + await(serviceRepo.updateConfirmationStatus(sentMessage.messageId, DeliveryStatus.COD, expectedConfirmationMessageBody)) + + val fetchedRecords = await(serviceRepo.collection.withReadPreference(primaryPreferred).find.toFuture()) - val fetchedRecords = await(repo.findAll(ReadPreference.primaryPreferred)) fetchedRecords.size shouldBe 1 fetchedRecords.head.status shouldBe DeliveryStatus.COD fetchedRecords.head.asInstanceOf[CodSoapMessage].codMessage shouldBe Some(expectedConfirmationMessageBody) } - - "return empty Option when asked to update a message that doesn't exist" in { - val fetchedRecords = await(repo.findAll(ReadPreference.primaryPreferred)) - fetchedRecords.size shouldBe 0 - val updated: Option[OutboundSoapMessage] = await(repo.updateConfirmationStatus( - "dummy id for not found", DeliveryStatus.COD, expectedConfirmationMessageBody)) - updated.isEmpty shouldBe true - } } "findById" should { "return message when ID exists" in { - await(repo.persist(sentMessage)) - val found: Option[OutboundSoapMessage] = await(repo.findById(sentMessage.messageId)) + await(serviceRepo.persist(sentMessage)) + val found: Option[OutboundSoapMessage] = await(serviceRepo.findById(sentMessage.messageId)) found shouldBe Some(sentMessage) } - "return nothing when ID does not exist" in { - val found: Option[OutboundSoapMessage] = await(repo.findById(sentMessage.messageId)) + "return nothing when ID does not exist" in { + val found: Option[OutboundSoapMessage] = await(serviceRepo.findById(sentMessage.messageId)) found shouldBe None } } } + diff --git a/project/AppDependencies.scala b/project/AppDependencies.scala index 659ab66..c68743b 100644 --- a/project/AppDependencies.scala +++ b/project/AppDependencies.scala @@ -3,11 +3,17 @@ import play.sbt.PlayImport.caffeine import sbt._ object AppDependencies { + val akkaVersion = "2.6.14" val compile = Seq( "uk.gov.hmrc" %% "bootstrap-backend-play-27" % "3.0.0", - "uk.gov.hmrc" %% "simple-reactivemongo" % "7.30.0-play-27", + "uk.gov.hmrc.mongo" %% "hmrc-mongo-play-27" % "0.50.0", "uk.gov.hmrc" %% "play-scheduling-play-27" % "7.10.0", + "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "3.0.1", + "com.typesafe.akka" %% "akka-slf4j" % akkaVersion, + "com.typesafe.akka" %% "akka-actor" % akkaVersion, + "com.typesafe.akka" %% "akka-stream" % akkaVersion, + "com.typesafe.akka" %% "akka-protobuf-v3" % akkaVersion, "org.reactivemongo" %% "reactivemongo-akkastream" % "0.20.13", "org.apache.axis2" % "axis2-kernel" % "1.7.9", "org.apache.wss4j" % "wss4j-ws-security-dom" % "2.3.0", @@ -25,7 +31,7 @@ object AppDependencies { "org.scalatestplus.play" %% "scalatestplus-play" % "4.0.3" % "test, it", "com.github.tomakehurst" % "wiremock" % "2.25.1" % "it", "org.xmlunit" % "xmlunit-core" % "2.8.1" % "test, it", - "uk.gov.hmrc" %% "reactivemongo-test" % "4.21.0-play-26" % "it" + "uk.gov.hmrc.mongo" %% "hmrc-mongo-test-play-27" % "0.50.0" % "it" ) val jettyVersion = "9.2.24.v20180105" diff --git a/test/uk/gov/hmrc/apiplatformoutboundsoap/models/OutboundSoapMessageSpec.scala b/test/uk/gov/hmrc/apiplatformoutboundsoap/models/OutboundSoapMessageSpec.scala new file mode 100644 index 0000000..df2fb32 --- /dev/null +++ b/test/uk/gov/hmrc/apiplatformoutboundsoap/models/OutboundSoapMessageSpec.scala @@ -0,0 +1,66 @@ +/* + * Copyright 2021 HM Revenue & Customs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package uk.gov.hmrc.apiplatformoutboundsoap.models + +import org.joda.time.DateTime +import org.mockito.{ArgumentMatchersSugar, MockitoSugar} +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec + +import java.util.UUID + +class OutboundSoapMessageSpec extends AnyWordSpec with Matchers with MockitoSugar with ArgumentMatchersSugar { + + private val ccnHttpStatus: Int = 200 + private val now = DateTime.now + val retryingMessage = RetryingOutboundSoapMessage(UUID.randomUUID(), "11111", "some retrying message", "some destination url", now, now, ccnHttpStatus) + val failedMessage = FailedOutboundSoapMessage(UUID.randomUUID(), "22222", "failed message", "some destination url", now, ccnHttpStatus) + val sentMessage = SentOutboundSoapMessage(UUID.randomUUID(), "33333", "sent message", "some destination url", now, ccnHttpStatus) + val coeMessage = CoeSoapMessage(UUID.randomUUID(), "44444", "coe message", "some destination url", now, ccnHttpStatus) + val codMessage = CodSoapMessage(UUID.randomUUID(), "55555", "cod message", "some destination url", now, ccnHttpStatus) + + "typeNaming" should { + "return correct type for RetryingOutboundSoapMessage" in { + val typeName = OutboundSoapMessage.typeToStatus(retryingMessage.getClass.getCanonicalName) + typeName shouldBe SendingStatus.RETRYING + } + + "return correct type for FailedOutboundSoapMessage" in { + val typeName = OutboundSoapMessage.typeToStatus(failedMessage.getClass.getCanonicalName) + typeName shouldBe SendingStatus.FAILED + } + + "return correct type for SentOutboundSoapMessage" in { + val typeName = OutboundSoapMessage.typeToStatus(sentMessage.getClass.getCanonicalName) + typeName shouldBe SendingStatus.SENT + } + "return correct type for CoeSoapMessage" in { + val typeName = OutboundSoapMessage.typeToStatus(coeMessage.getClass.getCanonicalName) + typeName shouldBe DeliveryStatus.COE + } + "return correct type for CodSoapMessage" in { + val typeName = OutboundSoapMessage.typeToStatus(codMessage.getClass.getCanonicalName) + typeName shouldBe DeliveryStatus.COD + } + "throw exception for invalid type" in { + val e: IllegalArgumentException = intercept[IllegalArgumentException] { + OutboundSoapMessage.typeToStatus("a string".getClass.getCanonicalName) + } + e.getMessage should include ("java.lang.String is not a valid class") + } + } +} diff --git a/test/uk/gov/hmrc/apiplatformoutboundsoap/repositories/MongoFormatterSpec.scala b/test/uk/gov/hmrc/apiplatformoutboundsoap/repositories/MongoFormatterSpec.scala new file mode 100644 index 0000000..fb1c20f --- /dev/null +++ b/test/uk/gov/hmrc/apiplatformoutboundsoap/repositories/MongoFormatterSpec.scala @@ -0,0 +1,58 @@ +/* + * Copyright 2021 HM Revenue & Customs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package uk.gov.hmrc.apiplatformoutboundsoap.repositories + +import org.joda.time.DateTime +import org.mockito.{ArgumentMatchersSugar, MockitoSugar} +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec +import play.api.libs.json.{JsObject, JsString} +import uk.gov.hmrc.apiplatformoutboundsoap.models.{CodSoapMessage, CoeSoapMessage, FailedOutboundSoapMessage, RetryingOutboundSoapMessage, SentOutboundSoapMessage} + +import java.util.UUID + +class MongoFormatterSpec extends AnyWordSpec with Matchers with MockitoSugar with ArgumentMatchersSugar { + + "format" should { + val formatter = MongoFormatter.outboundSoapMessageWrites + "correctly write a COD message" in { + val msgJson: JsObject = formatter.writes(CodSoapMessage(UUID.randomUUID(), "12334", "some cod message", "some destination url", DateTime.now, 200, Some("notify url"), Some("msg"))) + msgJson.values.size shouldBe 9 + msgJson.value.get("status") shouldBe Some(JsString("COD")) + } + "correctly write a COE message" in { + val msgJson: JsObject = formatter.writes(CoeSoapMessage(UUID.randomUUID(), "12334", "some coe message", "some destination url", DateTime.now, 200, Some("notify url"), Some("msg"))) + msgJson.values.size shouldBe 9 + msgJson.value.get("status") shouldBe Some(JsString("COE")) + } + "correctly write a SENT message" in { + val msgJson: JsObject = formatter.writes(SentOutboundSoapMessage(UUID.randomUUID(), "12334", "sent message", "some destination url", DateTime.now, 200, Some("notify url"), Some("msg"))) + msgJson.values.size shouldBe 9 + msgJson.value.get("status") shouldBe Some(JsString("SENT")) + } + "correctly write a FAILED message" in { + val msgJson: JsObject = formatter.writes(FailedOutboundSoapMessage(UUID.randomUUID(), "12334", "failed message", "some destination url", DateTime.now, 200, Some("notify url"), Some("msg"))) + msgJson.values.size shouldBe 9 + msgJson.value.get("status") shouldBe Some(JsString("FAILED")) + } + "correctly write a RETRYING message" in { + val msgJson: JsObject = formatter.writes(RetryingOutboundSoapMessage(UUID.randomUUID(), "12334", "retrying message", "some destination url", DateTime.now,DateTime.now, 200, Some("notify url"), Some("msg"))) + msgJson.values.size shouldBe 10 + msgJson.value.get("status") shouldBe Some(JsString("RETRYING")) + } + } +} diff --git a/test/uk/gov/hmrc/apiplatformoutboundsoap/services/OutboundServiceSpec.scala b/test/uk/gov/hmrc/apiplatformoutboundsoap/services/OutboundServiceSpec.scala index 4c536e0..8187b75 100644 --- a/test/uk/gov/hmrc/apiplatformoutboundsoap/services/OutboundServiceSpec.scala +++ b/test/uk/gov/hmrc/apiplatformoutboundsoap/services/OutboundServiceSpec.scala @@ -17,11 +17,13 @@ package uk.gov.hmrc.apiplatformoutboundsoap.services import akka.stream.Materializer -import akka.stream.scaladsl.Source.{fromFutureSource, fromIterator} +import akka.stream.scaladsl.Source.{fromIterator, single} +import com.mongodb.client.result.InsertOneResult import org.apache.axiom.soap.SOAPEnvelope import org.joda.time.DateTime import org.joda.time.DateTimeZone.UTC import org.mockito.{ArgumentCaptor, ArgumentMatchersSugar, MockitoSugar} +import org.mongodb.scala.bson.BsonNumber import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec import org.scalatestplus.play.guice.GuiceOneAppPerSuite @@ -42,6 +44,7 @@ import java.util.UUID import java.util.UUID.randomUUID import javax.wsdl.WSDLException import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.Future import scala.concurrent.Future.successful import scala.concurrent.duration.Duration @@ -166,7 +169,7 @@ class OutboundServiceSpec extends AnyWordSpec with Matchers with GuiceOneAppPerS "return the outbound soap message for success" in new Setup { when(wsSecurityServiceMock.addUsernameToken(*)).thenReturn(expectedSoapEnvelope()) when(outboundConnectorMock.postMessage(*)).thenReturn(successful(OK)) - when(outboundMessageRepositoryMock.persist(*)(*)).thenReturn(successful(())) + when(outboundMessageRepositoryMock.persist(*)).thenReturn(Future(InsertOneResult.acknowledged(BsonNumber(1)))) val result: OutboundSoapMessage = await(underTest.sendMessage(messageRequestFullAddressing)) @@ -180,7 +183,7 @@ class OutboundServiceSpec extends AnyWordSpec with Matchers with GuiceOneAppPerS "get the WSDL definition from cache" in new Setup { when(wsSecurityServiceMock.addUsernameToken(*)).thenReturn(expectedSoapEnvelope()) when(outboundConnectorMock.postMessage(*)).thenReturn(successful(OK)) - when(outboundMessageRepositoryMock.persist(*)(*)).thenReturn(successful(())) + when(outboundMessageRepositoryMock.persist(*)).thenReturn(Future(InsertOneResult.acknowledged(BsonNumber(1)))) await(underTest.sendMessage(messageRequestFullAddressing)) @@ -190,7 +193,7 @@ class OutboundServiceSpec extends AnyWordSpec with Matchers with GuiceOneAppPerS "return the outbound soap message for failure" in new Setup { when(wsSecurityServiceMock.addUsernameToken(*)).thenReturn(expectedSoapEnvelope()) when(outboundConnectorMock.postMessage(*)).thenReturn(successful(BAD_REQUEST)) - when(outboundMessageRepositoryMock.persist(*)(*)).thenReturn(successful(())) + when(outboundMessageRepositoryMock.persist(*)).thenReturn(Future(InsertOneResult.acknowledged(BsonNumber(1)))) val expectedInterval = Duration("10s") when(appConfigMock.retryInterval).thenReturn(expectedInterval) @@ -209,7 +212,7 @@ class OutboundServiceSpec extends AnyWordSpec with Matchers with GuiceOneAppPerS when(wsSecurityServiceMock.addUsernameToken(*)).thenReturn(expectedSoapEnvelope()) when(outboundConnectorMock.postMessage(*)).thenReturn(successful(httpCode)) val messageCaptor: ArgumentCaptor[OutboundSoapMessage] = ArgumentCaptor.forClass(classOf[OutboundSoapMessage]) - when(outboundMessageRepositoryMock.persist(messageCaptor.capture())(*)).thenReturn(successful(())) + when(outboundMessageRepositoryMock.persist(messageCaptor.capture())).thenReturn(Future(InsertOneResult.acknowledged(BsonNumber(1)))) await(underTest.sendMessage(messageRequestFullAddressing)) messageCaptor.getValue.status shouldBe SendingStatus.SENT @@ -227,7 +230,7 @@ class OutboundServiceSpec extends AnyWordSpec with Matchers with GuiceOneAppPerS when(wsSecurityServiceMock.addUsernameToken(*)).thenReturn(expectedSoapEnvelope()) when(outboundConnectorMock.postMessage(*)).thenReturn(successful(httpCode)) val messageCaptor: ArgumentCaptor[OutboundSoapMessage] = ArgumentCaptor.forClass(classOf[OutboundSoapMessage]) - when(outboundMessageRepositoryMock.persist(messageCaptor.capture())(*)).thenReturn(successful(())) + when(outboundMessageRepositoryMock.persist(messageCaptor.capture())).thenReturn(Future(InsertOneResult.acknowledged(BsonNumber(1)))) val expectedInterval = Duration("10s") when(appConfigMock.retryInterval).thenReturn(expectedInterval) @@ -247,7 +250,7 @@ class OutboundServiceSpec extends AnyWordSpec with Matchers with GuiceOneAppPerS "send the SOAP envelope returned from the security service to the connector" in new Setup { when(wsSecurityServiceMock.addUsernameToken(*)).thenReturn(expectedSoapEnvelope()) - when(outboundMessageRepositoryMock.persist(*)(*)).thenReturn(successful(())) + when(outboundMessageRepositoryMock.persist(*)).thenReturn(Future(InsertOneResult.acknowledged(BsonNumber(1)))) val messageCaptor: ArgumentCaptor[SoapRequest] = ArgumentCaptor.forClass(classOf[SoapRequest]) when(outboundConnectorMock.postMessage(messageCaptor.capture())).thenReturn(successful(expectedStatus)) @@ -259,7 +262,7 @@ class OutboundServiceSpec extends AnyWordSpec with Matchers with GuiceOneAppPerS "resolve destination url not having port when sending the SOAP envelope returned from the security service to the connector" in new Setup { when(wsSecurityServiceMock.addUsernameToken(*)).thenReturn(expectedSoapEnvelope()) - when(outboundMessageRepositoryMock.persist(*)(*)).thenReturn(successful(())) + when(outboundMessageRepositoryMock.persist(*)).thenReturn(Future(InsertOneResult.acknowledged(BsonNumber(1)))) val messageCaptor: ArgumentCaptor[SoapRequest] = ArgumentCaptor.forClass(classOf[SoapRequest]) when(outboundConnectorMock.postMessage(messageCaptor.capture())).thenReturn(successful(expectedStatus)) @@ -271,7 +274,7 @@ class OutboundServiceSpec extends AnyWordSpec with Matchers with GuiceOneAppPerS when(wsSecurityServiceMock.addUsernameToken(*)).thenReturn(expectedSoapEnvelopeWithEmptyBodyRequest()) when(outboundConnectorMock.postMessage(*)).thenReturn(successful(200)) val messageCaptor: ArgumentCaptor[OutboundSoapMessage] = ArgumentCaptor.forClass(classOf[OutboundSoapMessage]) - when(outboundMessageRepositoryMock.persist(messageCaptor.capture())(*)).thenReturn(successful(())) + when(outboundMessageRepositoryMock.persist(messageCaptor.capture())).thenReturn(Future(InsertOneResult.acknowledged(BsonNumber(1)))) await(underTest.sendMessage(messageRequestEmptyBody)) messageCaptor.getValue.status shouldBe SendingStatus.SENT @@ -285,7 +288,7 @@ class OutboundServiceSpec extends AnyWordSpec with Matchers with GuiceOneAppPerS val messageCaptor: ArgumentCaptor[SOAPEnvelope] = ArgumentCaptor.forClass(classOf[SOAPEnvelope]) when(wsSecurityServiceMock.addUsernameToken(messageCaptor.capture())).thenReturn(expectedSoapEnvelope(mandatoryAddressingHeaders)) when(outboundConnectorMock.postMessage(*)).thenReturn(successful(expectedStatus)) - when(outboundMessageRepositoryMock.persist(*)(*)).thenReturn(successful(())) + when(outboundMessageRepositoryMock.persist(*)).thenReturn(Future(InsertOneResult.acknowledged(BsonNumber(1)))) await(underTest.sendMessage(messageRequestMinimalAddressing)) getXmlDiff(messageCaptor.getValue.toString, expectedSoapEnvelope(mandatoryAddressingHeaders)).build().getDifferences.forEach(d => println(d)) @@ -298,7 +301,7 @@ class OutboundServiceSpec extends AnyWordSpec with Matchers with GuiceOneAppPerS when(appConfigMock.enableMessageSigning).thenReturn(true) when(wsSecurityServiceMock.addSignature(messageCaptor.capture())).thenReturn(expectedSoapEnvelope(mandatoryAddressingHeaders)) when(outboundConnectorMock.postMessage(*)).thenReturn(successful(expectedStatus)) - when(outboundMessageRepositoryMock.persist(*)(*)).thenReturn(successful(())) + when(outboundMessageRepositoryMock.persist(*)).thenReturn(Future(InsertOneResult.acknowledged(BsonNumber(1)))) await(underTest.sendMessage(messageRequestMinimalAddressing)) getXmlDiff(messageCaptor.getValue.toString, expectedSoapEnvelope(mandatoryAddressingHeaders)).build().getDifferences.forEach(d => println(d)) @@ -311,7 +314,7 @@ class OutboundServiceSpec extends AnyWordSpec with Matchers with GuiceOneAppPerS val persistCaptor: ArgumentCaptor[OutboundSoapMessage] = ArgumentCaptor.forClass(classOf[OutboundSoapMessage]) when(wsSecurityServiceMock.addUsernameToken(messageCaptor.capture())).thenReturn(expectedSoapEnvelope(mandatoryAddressingHeaders)) when(outboundConnectorMock.postMessage(*)).thenReturn(successful(expectedStatus)) - when(outboundMessageRepositoryMock.persist(persistCaptor.capture())(*)).thenReturn(successful(())) + when(outboundMessageRepositoryMock.persist(persistCaptor.capture())).thenReturn(Future(InsertOneResult.acknowledged(BsonNumber(1)))) await(underTest.sendMessage(messageRequestMinimalAddressing)) persistCaptor.getValue.soapMessage shouldBe expectedSoapEnvelope(mandatoryAddressingHeaders) @@ -322,7 +325,7 @@ class OutboundServiceSpec extends AnyWordSpec with Matchers with GuiceOneAppPerS when(wsSecurityServiceMock.addUsernameToken(*)).thenReturn(expectedSoapEnvelope(allAddressingHeaders)) when(outboundConnectorMock.postMessage(*)).thenReturn(successful(expectedStatus)) val messageCaptor: ArgumentCaptor[OutboundSoapMessage] = ArgumentCaptor.forClass(classOf[OutboundSoapMessage]) - when(outboundMessageRepositoryMock.persist(messageCaptor.capture())(*)).thenReturn(successful(())) + when(outboundMessageRepositoryMock.persist(messageCaptor.capture())).thenReturn(Future(InsertOneResult.acknowledged(BsonNumber(1)))) await(underTest.sendMessage(messageRequestMinimalAddressing)) @@ -334,7 +337,7 @@ class OutboundServiceSpec extends AnyWordSpec with Matchers with GuiceOneAppPerS when(outboundConnectorMock.postMessage(*)).thenReturn(successful(INTERNAL_SERVER_ERROR)) when(appConfigMock.retryInterval).thenReturn(Duration("1s")) val messageCaptor: ArgumentCaptor[OutboundSoapMessage] = ArgumentCaptor.forClass(classOf[OutboundSoapMessage]) - when(outboundMessageRepositoryMock.persist(messageCaptor.capture())(*)).thenReturn(successful(())) + when(outboundMessageRepositoryMock.persist(messageCaptor.capture())).thenReturn(Future(InsertOneResult.acknowledged(BsonNumber(1)))) await(underTest.sendMessage(messageRequestMinimalAddressing)) @@ -405,8 +408,7 @@ class OutboundServiceSpec extends AnyWordSpec with Matchers with GuiceOneAppPerS when(appConfigMock.retryInterval).thenReturn(Duration("1s")) when(outboundConnectorMock.postMessage(*)).thenReturn(successful(OK)) when(outboundMessageRepositoryMock.updateSendingStatus(*, *)).thenReturn(successful(None)) - when(outboundMessageRepositoryMock.retrieveMessagesForRetry). - thenReturn(fromFutureSource(successful(fromIterator(() => Seq(retryingMessage).toIterator)))) + when(outboundMessageRepositoryMock.retrieveMessagesForRetry).thenReturn(single(retryingMessage)) await(underTest.retryMessages) @@ -424,7 +426,7 @@ class OutboundServiceSpec extends AnyWordSpec with Matchers with GuiceOneAppPerS .thenReturn(successful(OK)) when(outboundMessageRepositoryMock.updateSendingStatus(*, *)).thenReturn(successful(None)) when(outboundMessageRepositoryMock.retrieveMessagesForRetry). - thenReturn(fromFutureSource(successful(fromIterator(() => Seq(retryingMessage, anotherRetryingMessage).toIterator)))) + thenReturn(fromIterator(() => Seq(retryingMessage, anotherRetryingMessage).iterator)) intercept[Exception](await(underTest.retryMessages)) @@ -441,7 +443,7 @@ class OutboundServiceSpec extends AnyWordSpec with Matchers with GuiceOneAppPerS when(outboundMessageRepositoryMock.updateSendingStatus(*, *)).thenReturn(successful(None)) when(outboundMessageRepositoryMock.updateNextRetryTime(*, *)).thenReturn(successful(None)) when(outboundMessageRepositoryMock.retrieveMessagesForRetry). - thenReturn(fromFutureSource(successful(fromIterator(() => Seq(retryingMessage).toIterator)))) + thenReturn(fromIterator(() => Seq(retryingMessage).toIterator)) await(underTest.retryMessages) @@ -458,7 +460,7 @@ class OutboundServiceSpec extends AnyWordSpec with Matchers with GuiceOneAppPerS when(outboundConnectorMock.postMessage(*)).thenReturn(successful(INTERNAL_SERVER_ERROR)) when(outboundMessageRepositoryMock.updateSendingStatus(*, *)).thenReturn(successful(None)) when(outboundMessageRepositoryMock.retrieveMessagesForRetry). - thenReturn(fromFutureSource(successful(fromIterator(() => Seq(retryingMessage).toIterator)))) + thenReturn(fromIterator(() => Seq(retryingMessage).toIterator)) await(underTest.retryMessages) @@ -476,7 +478,7 @@ class OutboundServiceSpec extends AnyWordSpec with Matchers with GuiceOneAppPerS when(outboundConnectorMock.postMessage(*)).thenReturn(successful(OK)) when(outboundMessageRepositoryMock.updateSendingStatus(*, *)).thenReturn(successful(Some(sentMessageForNotification))) when(outboundMessageRepositoryMock.retrieveMessagesForRetry). - thenReturn(fromFutureSource(successful(fromIterator(() => Seq(retryingMessage).toIterator)))) + thenReturn(fromIterator(() => Seq(retryingMessage).toIterator)) await(underTest.retryMessages) @@ -494,7 +496,7 @@ class OutboundServiceSpec extends AnyWordSpec with Matchers with GuiceOneAppPerS when(outboundConnectorMock.postMessage(*)).thenReturn(successful(INTERNAL_SERVER_ERROR)) when(outboundMessageRepositoryMock.updateSendingStatus(*, *)).thenReturn(successful(Some(failedMessageForNotification))) when(outboundMessageRepositoryMock.retrieveMessagesForRetry). - thenReturn(fromFutureSource(successful(fromIterator(() => Seq(retryingMessage).toIterator)))) + thenReturn(fromIterator(() => Seq(retryingMessage).toIterator)) await(underTest.retryMessages) @@ -512,7 +514,7 @@ class OutboundServiceSpec extends AnyWordSpec with Matchers with GuiceOneAppPerS when(outboundConnectorMock.postMessage(*)).thenReturn(successful(OK)) when(outboundMessageRepositoryMock.updateSendingStatus(*, *)).thenReturn(successful(Some(failedMessageForNotification))) when(outboundMessageRepositoryMock.retrieveMessagesForRetry). - thenReturn(fromFutureSource(successful(fromIterator(() => Seq(retryingMessage).toIterator)))) + thenReturn(fromIterator(() => Seq(retryingMessage).toIterator)) when(notificationCallbackConnectorMock.sendNotification(*)(*)).thenReturn(successful(Some(INTERNAL_SERVER_ERROR))) await(underTest.retryMessages)