Skip to content

Commit

Permalink
Apid 127 wip (#43)
Browse files Browse the repository at this point in the history
* Part migration to hmrc_mongo but with failing JSON serialise

* Adds JSON reads and writes for all message types.

* Remove unnecessary implicit

* APID-127: repo integration tests rewritten to work with new Mongo driver

* APID-127: merged master; updated repository to use messageId field where confirmation status update saved

* Tidy imports

* APID-127: Adds tests for typeToStatus and JSON writes

* APID-127: tidying
  • Loading branch information
worthydolt authored Jun 29, 2021
1 parent bb511d5 commit 69b5098
Show file tree
Hide file tree
Showing 9 changed files with 415 additions and 178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ object OutboundSoapMessage {
} else if (fullyQualifiedName == classTag[CodSoapMessage].runtimeClass.getCanonicalName) {
DeliveryStatus.COD
} else {
throw new IllegalArgumentException
throw new IllegalArgumentException(s"${fullyQualifiedName} is not a valid class")
}
}
}
Expand Down Expand Up @@ -127,10 +127,10 @@ case class RetryingOutboundSoapMessage(globalId: UUID,
sealed abstract class StatusType extends EnumEntry

object StatusType extends Enum[StatusType] with PlayJsonEnum[StatusType]{
val values = findValues
val values: immutable.IndexedSeq[StatusType] = findValues
}

sealed abstract class DeliveryStatus extends StatusType
sealed abstract class DeliveryStatus(override val entryName: String) extends StatusType

object DeliveryStatus extends Enum[DeliveryStatus] with PlayJsonEnum[DeliveryStatus] {
def fromAction(action: String): DeliveryStatus = {
Expand All @@ -142,19 +142,19 @@ object DeliveryStatus extends Enum[DeliveryStatus] with PlayJsonEnum[DeliverySta
}
val values: immutable.IndexedSeq[DeliveryStatus] = findValues

case object COE extends DeliveryStatus
case object COE extends DeliveryStatus("COE")

case object COD extends DeliveryStatus
case object COD extends DeliveryStatus("COD")
}

sealed abstract class SendingStatus extends StatusType
sealed abstract class SendingStatus(override val entryName: String) extends StatusType

object SendingStatus extends Enum[SendingStatus] with PlayJsonEnum[SendingStatus] {
val values: immutable.IndexedSeq[SendingStatus] = findValues

case object SENT extends SendingStatus
case object SENT extends SendingStatus("SENT")

case object FAILED extends SendingStatus
case object FAILED extends SendingStatus("FAILED")

case object RETRYING extends SendingStatus
case object RETRYING extends SendingStatus("RETRYING")
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@

package uk.gov.hmrc.apiplatformoutboundsoap.repositories

import org.joda.time.DateTime
import play.api.libs.json.{Format, Json, JsonConfiguration, JsonNaming, OFormat}
import uk.gov.hmrc.apiplatformoutboundsoap.models.{CodSoapMessage, CoeSoapMessage, FailedOutboundSoapMessage, OutboundSoapMessage, RetryingOutboundSoapMessage, SentOutboundSoapMessage}
import uk.gov.hmrc.mongo.json.ReactiveMongoFormats

import play.api.libs.json.{JsObject, JsPath, Json, JsonConfiguration, JsonNaming, OFormat, OWrites, Reads}
import uk.gov.hmrc.apiplatformoutboundsoap.models.{CodSoapMessage, CoeSoapMessage, DeliveryStatus, FailedOutboundSoapMessage, OutboundSoapMessage, RetryingOutboundSoapMessage, SendingStatus, SentOutboundSoapMessage}
import uk.gov.hmrc.mongo.play.json.formats.MongoJodaFormats
private[repositories] object MongoFormatter {

implicit val cfg = JsonConfiguration(
Expand All @@ -30,11 +28,81 @@ private[repositories] object MongoFormatter {
OutboundSoapMessage.typeToStatus(fullName).entryName
})

implicit val dateFormat: Format[DateTime] = ReactiveMongoFormats.dateTimeFormats
implicit val outboundSoapMessageFormatter: OFormat[OutboundSoapMessage] = Json.format[OutboundSoapMessage]
implicit val retryingSoapMessageFormatter: OFormat[RetryingOutboundSoapMessage] = Json.format[RetryingOutboundSoapMessage]
implicit val sentSoapMessageFormatter: OFormat[SentOutboundSoapMessage] = Json.format[SentOutboundSoapMessage]
implicit val failedSoapMessageFormatter: OFormat[FailedOutboundSoapMessage] = Json.format[FailedOutboundSoapMessage]
implicit val coeSoapMessageFormatter: OFormat[CoeSoapMessage] = Json.format[CoeSoapMessage]
implicit val codSoapMessageFormatter: OFormat[CodSoapMessage] = Json.format[CodSoapMessage]
implicit val dateTimeFormat = MongoJodaFormats.dateTimeFormat

implicit val retryingMessageReads: Reads[RetryingOutboundSoapMessage] =
Json.reads[RetryingOutboundSoapMessage]
implicit val retryingMessageWrites: OWrites[RetryingOutboundSoapMessage] =
Json.writes[RetryingOutboundSoapMessage].transform(_ ++ Json.obj("status" -> SendingStatus.RETRYING.entryName))
implicit val retryingSoapMessageFormatter: OFormat[RetryingOutboundSoapMessage] =
OFormat(retryingMessageReads, retryingMessageWrites)

implicit val sentMessageReads: Reads[SentOutboundSoapMessage] =
Json.reads[SentOutboundSoapMessage]
implicit val sentMessageWrites: OWrites[SentOutboundSoapMessage] =
Json.writes[SentOutboundSoapMessage].transform(_ ++ Json.obj("status" -> SendingStatus.SENT.entryName))
implicit val sentSoapMessageFormatter: OFormat[SentOutboundSoapMessage] =
OFormat(sentMessageReads, sentMessageWrites)

implicit val failedMessageReads: Reads[FailedOutboundSoapMessage] =
Json.reads[FailedOutboundSoapMessage]
implicit val failedMessageWrites: OWrites[FailedOutboundSoapMessage] =
Json.writes[FailedOutboundSoapMessage].transform(_ ++ Json.obj("status" -> SendingStatus.FAILED.entryName))
implicit val failedSoapMessageFormatter: OFormat[FailedOutboundSoapMessage] =
OFormat(failedMessageReads, failedMessageWrites)

implicit val codMessageReads: Reads[CodSoapMessage] =
Json.reads[CodSoapMessage]
implicit val codMessageWrites: OWrites[CodSoapMessage] =
Json.writes[CodSoapMessage].transform(_ ++ Json.obj("status" -> DeliveryStatus.COD.entryName))
implicit val codSoapMessageFormatter: OFormat[CodSoapMessage] =
OFormat(codMessageReads, codMessageWrites)

implicit val coeMessageReads: Reads[CoeSoapMessage] =
Json.reads[CoeSoapMessage]
implicit val coeMessageWrites: OWrites[CoeSoapMessage] =
Json.writes[CoeSoapMessage].transform(_ ++ Json.obj("status" -> DeliveryStatus.COE.entryName))
implicit val coeSoapMessageFormatter: OFormat[CoeSoapMessage] =
OFormat(coeMessageReads, coeMessageWrites)


implicit val outboundSoapMessageReads: Reads[OutboundSoapMessage] =
(JsPath \ "status").read[String].flatMap {
case SendingStatus.RETRYING.entryName =>
retryingSoapMessageFormatter.widen[OutboundSoapMessage]
case SendingStatus.SENT.entryName =>
sentSoapMessageFormatter.widen[OutboundSoapMessage]
case SendingStatus.FAILED.entryName =>
failedSoapMessageFormatter.widen[OutboundSoapMessage]
case DeliveryStatus.COD.entryName =>
codSoapMessageFormatter.widen[OutboundSoapMessage]
case DeliveryStatus.COE.entryName =>
coeSoapMessageFormatter.widen[OutboundSoapMessage]
}

implicit val outboundSoapMessageWrites: OWrites[OutboundSoapMessage] = new OWrites[OutboundSoapMessage] {
override def writes(soapMessage: OutboundSoapMessage): JsObject = soapMessage match {
case r @ RetryingOutboundSoapMessage(_, _, _, _, _, _, _, _, _, _) =>
retryingSoapMessageFormatter.writes(r) ++ Json.obj(
"status" -> SendingStatus.RETRYING.entryName
)
case f @ FailedOutboundSoapMessage(_, _, _, _, _, _, _, _, _) =>
failedSoapMessageFormatter.writes(f) ++ Json.obj(
"status" -> SendingStatus.FAILED.entryName
)
case s @ SentOutboundSoapMessage(_, _, _, _, _, _, _, _, _) =>
sentSoapMessageFormatter.writes(s) ++ Json.obj(
"status" -> SendingStatus.SENT.entryName
)
case cod @ CodSoapMessage(_, _, _, _, _, _, _, _, _) =>
codSoapMessageFormatter.writes(cod) ++ Json.obj(
"status" -> DeliveryStatus.COD.entryName
)
case coe @ CoeSoapMessage(_, _, _, _, _, _, _, _, _) =>
coeSoapMessageFormatter.writes(coe) ++ Json.obj(
"status" -> DeliveryStatus.COE.entryName
)
}
}
implicit val outboundSoapMessageFormatter: OFormat[OutboundSoapMessage] = OFormat(outboundSoapMessageReads, outboundSoapMessageWrites)
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,86 +16,110 @@

package uk.gov.hmrc.apiplatformoutboundsoap.repositories

import akka.stream.Materializer
import akka.NotUsed
import akka.stream.alpakka.mongodb.scaladsl.MongoSource
import akka.stream.scaladsl.Source
import org.bson.codecs.configuration.CodecRegistries._
import org.joda.time.DateTime
import org.joda.time.DateTime.now
import org.joda.time.DateTimeZone.UTC
import play.api.libs.json.{JsObject, JsString, Json}
import play.modules.reactivemongo.ReactiveMongoComponent
import reactivemongo.akkastream.cursorProducer
import reactivemongo.api.ReadPreference
import reactivemongo.api.indexes.Index
import reactivemongo.api.indexes.IndexType.Ascending
import reactivemongo.bson.{BSONDocument, BSONLong, BSONObjectID}
import reactivemongo.play.json.ImplicitBSONHandlers.JsObjectDocumentWriter
import org.mongodb.scala.{MongoClient, MongoCollection}
import org.mongodb.scala.ReadPreference.primaryPreferred
import org.mongodb.scala.model.Filters.{and, equal, lte}
import org.mongodb.scala.model.Indexes.ascending
import org.mongodb.scala.model.Updates.{combine, set}
import org.mongodb.scala.model.{FindOneAndUpdateOptions, IndexModel, IndexOptions, ReturnDocument}
import org.mongodb.scala.result.InsertOneResult
import play.api.Logging
import uk.gov.hmrc.apiplatformoutboundsoap.config.AppConfig
import uk.gov.hmrc.apiplatformoutboundsoap.models.{DeliveryStatus, OutboundSoapMessage, RetryingOutboundSoapMessage, SendingStatus}
import uk.gov.hmrc.apiplatformoutboundsoap.repositories.MongoFormatter.{dateFormat, outboundSoapMessageFormatter}
import uk.gov.hmrc.mongo.ReactiveRepository
import uk.gov.hmrc.mongo.json.ReactiveMongoFormats
import uk.gov.hmrc.apiplatformoutboundsoap.models.{DeliveryStatus, OutboundSoapMessage, RetryingOutboundSoapMessage, SendingStatus, StatusType}
import uk.gov.hmrc.apiplatformoutboundsoap.repositories.MongoFormatter.outboundSoapMessageFormatter
import uk.gov.hmrc.mongo.MongoComponent
import uk.gov.hmrc.mongo.play.json.{Codecs, CollectionFactory, PlayMongoRepository}

import java.util.UUID
import java.util.concurrent.TimeUnit
import javax.inject.{Inject, Singleton}
import scala.concurrent.{ExecutionContext, Future}

@Singleton
class OutboundMessageRepository @Inject()(mongoComponent: ReactiveMongoComponent, appConfig: AppConfig)
(implicit ec: ExecutionContext, m: Materializer)
extends ReactiveRepository[OutboundSoapMessage, BSONObjectID](
"messages",
mongoComponent.mongoConnector.db,
outboundSoapMessageFormatter,
ReactiveMongoFormats.objectIdFormats) {
class OutboundMessageRepository @Inject()(mongoComponent: MongoComponent, appConfig: AppConfig)
(implicit ec: ExecutionContext)
extends PlayMongoRepository[OutboundSoapMessage](
collectionName = "messages",
mongoComponent = mongoComponent,
domainFormat = outboundSoapMessageFormatter,
indexes = Seq(IndexModel(ascending("globalId"),
IndexOptions().name("globalIdIndex").background(true).unique(true)),
IndexModel(ascending("createDateTime"),
IndexOptions().name("ttlIndex").background(true)
.expireAfter(appConfig.retryMessagesTtl.toSeconds, TimeUnit.SECONDS))))
with Logging {

override def indexes = Seq(
Index(key = List("globalId" -> Ascending), name = Some("globalIdIndex"), unique = true, background = true),
Index(key = List("createDateTime" -> Ascending),
name = Some("ttlIndex"), background = true, options = BSONDocument("expireAfterSeconds" -> BSONLong(appConfig.retryMessagesTtl.toSeconds)))
)
override lazy val collection: MongoCollection[OutboundSoapMessage] =
CollectionFactory
.collection(mongoComponent.database, collectionName, domainFormat)
.withCodecRegistry(
fromRegistries(
fromCodecs(
Codecs.playFormatCodec(domainFormat),
Codecs.playFormatCodec(MongoFormatter.retryingSoapMessageFormatter),
Codecs.playFormatCodec(MongoFormatter.failedSoapMessageFormatter),
Codecs.playFormatCodec(MongoFormatter.sentSoapMessageFormatter),
Codecs.playFormatCodec(MongoFormatter.codSoapMessageFormatter),
Codecs.playFormatCodec(MongoFormatter.coeSoapMessageFormatter),
Codecs.playFormatCodec(MongoFormatter.dateTimeFormat),
Codecs.playFormatCodec(StatusType.jsonFormat),
Codecs.playFormatCodec(DeliveryStatus.jsonFormat),
Codecs.playFormatCodec(SendingStatus.jsonFormat)
),
MongoClient.DEFAULT_CODEC_REGISTRY
)
)

def persist(entity: OutboundSoapMessage)(implicit ec: ExecutionContext): Future[Unit] = {
insert(entity).map(_ => ())
def persist(entity: OutboundSoapMessage): Future[InsertOneResult] = {
collection.insertOne(entity).toFuture()
}

def retrieveMessagesForRetry: Source[RetryingOutboundSoapMessage, Future[Any]] = {
import uk.gov.hmrc.apiplatformoutboundsoap.repositories.MongoFormatter.retryingSoapMessageFormatter

collection
.find(Json.obj("status" -> SendingStatus.RETRYING.entryName,
"retryDateTime" -> Json.obj("$lte" -> now(UTC))), Option.empty[JsObject])
.sort(Json.obj("retryDateTime" -> 1))
.cursor[RetryingOutboundSoapMessage](ReadPreference.primaryPreferred)
.documentSource()
def retrieveMessagesForRetry: Source[RetryingOutboundSoapMessage, NotUsed] = {
MongoSource(collection.withReadPreference(primaryPreferred)
.find(filter = and(equal("status", SendingStatus.RETRYING.entryName),
and(lte("retryDateTime", now(UTC)))))
.sort(ascending("retryDateTime"))
.map(_.asInstanceOf[RetryingOutboundSoapMessage]))
}

def updateNextRetryTime(globalId: UUID, newRetryDateTime: DateTime): Future[Option[RetryingOutboundSoapMessage]] = {
import uk.gov.hmrc.apiplatformoutboundsoap.repositories.MongoFormatter.retryingSoapMessageFormatter

findAndUpdate(Json.obj("globalId" -> globalId),
Json.obj("$set" -> Json.obj("retryDateTime" -> newRetryDateTime)), fetchNewObject = true)
.map(_.result[RetryingOutboundSoapMessage])
collection.withReadPreference(primaryPreferred)
.findOneAndUpdate(filter = equal("globalId", Codecs.toBson(globalId)),
update = set("retryDateTime", newRetryDateTime),
options = FindOneAndUpdateOptions().upsert(true).returnDocument(ReturnDocument.AFTER)
).map(_.asInstanceOf[RetryingOutboundSoapMessage]).headOption()
}

def updateSendingStatus(globalId: UUID, newStatus: SendingStatus): Future[Option[OutboundSoapMessage]] = {
findAndUpdate(Json.obj("globalId" -> globalId),
Json.obj("$set" -> Json.obj("status" -> newStatus.entryName)), fetchNewObject = true)
.map(_.result[OutboundSoapMessage])
collection.withReadPreference(primaryPreferred)
.findOneAndUpdate(filter = equal("globalId", Codecs.toBson(globalId)),
update = set("status", Codecs.toBson(newStatus.entryName)),
options = FindOneAndUpdateOptions().upsert(true).returnDocument(ReturnDocument.AFTER)
).toFutureOption()
}

def updateConfirmationStatus(messageId: String, newStatus: DeliveryStatus, confirmationMsg: String): Future[Option[OutboundSoapMessage]] = {
logger.info(s"conf message is ${confirmationMsg}")
val field: String = newStatus match {
case DeliveryStatus.COD => "codMessage"
case DeliveryStatus.COE => "coeMessage"
}
findAndUpdate(Json.obj("messageId" -> messageId),
Json.obj("$set" -> Json.obj("status" -> newStatus.entryName, field -> confirmationMsg)), fetchNewObject = true)
.map(_.result[OutboundSoapMessage])

collection.withReadPreference(primaryPreferred)
.findOneAndUpdate(filter = equal("messageId", messageId),
update = combine(set("status", Codecs.toBson(newStatus.entryName)), set(field, confirmationMsg)),
options = FindOneAndUpdateOptions().upsert(true).returnDocument(ReturnDocument.AFTER))
.toFutureOption()
}

def findById(messageId: String): Future[Option[OutboundSoapMessage]] = {
find("messageId" -> JsString(messageId)).map(_.headOption)
collection.find(filter = equal("messageId", messageId)).headOption()
.recover {
case e: Exception =>
logger.warn(s"error finding message - ${e.getMessage}")
Expand Down
2 changes: 1 addition & 1 deletion conf/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ play.http.errorHandler = "uk.gov.hmrc.play.bootstrap.backend.http.JsonErrorHandl
# Play Modules
# ~~~~
# Additional play modules can be added here
play.modules.enabled += "play.modules.reactivemongo.ReactiveMongoHmrcModule"
play.modules.enabled += "uk.gov.hmrc.mongo.play.PlayMongoModule"
play.modules.enabled += "uk.gov.hmrc.apiplatformoutboundsoap.scheduled.SchedulerModule"
play.modules.enabled += "uk.gov.hmrc.apiplatformoutboundsoap.scheduled.SchedulerPlayModule"

Expand Down
Loading

0 comments on commit 69b5098

Please sign in to comment.