Skip to content

Commit

Permalink
Merge pull request #21 from hmrc/APID-66
Browse files Browse the repository at this point in the history
APID-66 Add thread pool for blocking IO operations
  • Loading branch information
jameshall1999 authored Feb 2, 2021
2 parents 67e5553 + 0c04f53 commit 84e2b9c
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package uk.gov.hmrc.apiplatformoutboundsoap.services

import akka.Done
import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.scaladsl.Sink
import org.apache.axiom.om.OMAbstractFactory.{getOMFactory, getSOAP12Factory}
Expand Down Expand Up @@ -49,28 +50,23 @@ class OutboundService @Inject()(outboundConnector: OutboundConnector,
wsSecurityService: WsSecurityService,
outboundMessageRepository: OutboundMessageRepository,
notificationCallbackConnector: NotificationCallbackConnector,
appConfig: AppConfig)
appConfig: AppConfig,
actorSystem: ActorSystem)
(implicit val ec: ExecutionContext, mat: Materializer)
extends HttpErrorFunctions {
val logger: LoggerLike = Logger
val blockingIoContext: ExecutionContext = actorSystem.dispatchers.lookup("blocking-io-context")
val dateTimeFormatter: DateTimeFormatter = ISODateTimeFormat.dateTime()
def now: DateTime = DateTime.now(UTC)
def randomUUID: UUID = UUID.randomUUID

def sendMessage(message: MessageRequest): Future[OutboundSoapMessage] = {
val envelope = buildEnvelope(message)
outboundConnector.postMessage(envelope) flatMap { result =>
val globalId: UUID = randomUUID
val messageId = message.addressing.flatMap(_.messageId)
val outboundSoapMessage = if (is2xx(result)) {
logger.info(s"Message with global ID $globalId and message ID $messageId successfully sent")
SentOutboundSoapMessage(globalId, messageId, envelope, now, message.notificationUrl)
} else {
logger.info(s"Message with global ID $globalId and message ID $messageId failed on first attempt")
RetryingOutboundSoapMessage(globalId, messageId, envelope, now, now.plus(appConfig.retryInterval.toMillis), message.notificationUrl)
}
outboundMessageRepository.persist(outboundSoapMessage).map(_ => outboundSoapMessage)
}
for {
envelope <- buildEnvelope(message)
result <- outboundConnector.postMessage(envelope)
outboundSoapMessage = buildOutboundSoapMessage(message, envelope, result)
_ <- outboundMessageRepository.persist(outboundSoapMessage)
} yield outboundSoapMessage
}

def retryMessages(implicit hc: HeaderCarrier): Future[Done] = {
Expand Down Expand Up @@ -101,22 +97,37 @@ class OutboundService @Inject()(outboundConnector: OutboundConnector,
}
}

private def buildEnvelope(message: MessageRequest): String = {
val wsdlDefinition: Definition = parseWsdl(message.wsdlUrl)
val portType = wsdlDefinition.getAllPortTypes.asScala.values.head.asInstanceOf[PortType]
val operation: Operation = portType.getOperations.asScala.map(_.asInstanceOf[Operation])
.find(_.getName == message.wsdlOperation).getOrElse(throw new NotFoundException(s"Operation ${message.wsdlOperation} not found"))
private def buildOutboundSoapMessage(message: MessageRequest, envelope: String, result: Int): OutboundSoapMessage = {
val globalId: UUID = randomUUID
val messageId = message.addressing.flatMap(_.messageId)
if (is2xx(result)) {
logger.info(s"Message with global ID $globalId and message ID $messageId successfully sent")
SentOutboundSoapMessage(globalId, messageId, envelope, now, message.notificationUrl)
} else {
logger.info(s"Message with global ID $globalId and message ID $messageId failed on first attempt")
RetryingOutboundSoapMessage(globalId, messageId, envelope, now, now.plus(appConfig.retryInterval.toMillis), message.notificationUrl)
}
}

private def buildEnvelope(message: MessageRequest): Future[String] = {
parseWsdl(message.wsdlUrl) map { wsdlDefinition: Definition =>
val portType = wsdlDefinition.getAllPortTypes.asScala.values.head.asInstanceOf[PortType]
val operation: Operation = portType.getOperations.asScala.map(_.asInstanceOf[Operation])
.find(_.getName == message.wsdlOperation).getOrElse(throw new NotFoundException(s"Operation ${message.wsdlOperation} not found"))

val envelope: SOAPEnvelope = getSOAP12Factory.getDefaultEnvelope
addHeaders(message, operation, envelope)
addBody(message, operation, envelope)
wsSecurityService.addUsernameToken(envelope)
val envelope: SOAPEnvelope = getSOAP12Factory.getDefaultEnvelope
addHeaders(message, operation, envelope)
addBody(message, operation, envelope)
wsSecurityService.addUsernameToken(envelope)
}
}

private def parseWsdl(wsdlUrl: String): Definition = {
private def parseWsdl(wsdlUrl: String): Future[Definition] = {
val reader: WSDLReader = WSDLUtil.newWSDLReaderWithPopulatedExtensionRegistry
reader.setFeature("javax.wsdl.importDocuments", true)
reader.readWSDL(wsdlUrl)
Future {
reader.readWSDL(wsdlUrl)
}(blockingIoContext)
}

private def addHeaders(message: MessageRequest, operation: Operation, envelope: SOAPEnvelope): Unit = {
Expand Down
8 changes: 8 additions & 0 deletions conf/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,14 @@ ccn2Url = "http://localhost:6704/destination/notifications"
ccn2Username = "joe.bloggs"
ccn2Password = "foobar"

blocking-io-context {
executor = "thread-pool-executor"
throughput = 1
thread-pool-executor {
fixed-pool-size = 100
}
}

microservice {
metrics {
graphite {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package uk.gov.hmrc.apiplatformoutboundsoap.services

import akka.actor.ActorSystem
import akka.stream.scaladsl.Source.{fromFutureSource, fromIterator}
import org.apache.axiom.soap.SOAPEnvelope
import org.joda.time.DateTime
Expand Down Expand Up @@ -48,6 +49,7 @@ class OutboundServiceSpec extends AnyWordSpec with Matchers with GuiceOneAppPerS

implicit val mat: Materializer = app.injector.instanceOf[Materializer]
implicit val hc: HeaderCarrier = HeaderCarrier()
val actorSystem: ActorSystem = app.injector.instanceOf[ActorSystem]

trait Setup {
val outboundConnectorMock: OutboundConnector = mock[OutboundConnector]
Expand All @@ -60,7 +62,7 @@ class OutboundServiceSpec extends AnyWordSpec with Matchers with GuiceOneAppPerS
val expectedGlobalId: UUID = UUID.randomUUID

val underTest: OutboundService = new OutboundService(outboundConnectorMock, wsSecurityServiceMock,
outboundMessageRepositoryMock, notificationCallbackConnectorMock, appConfigMock) {
outboundMessageRepositoryMock, notificationCallbackConnectorMock, appConfigMock, actorSystem) {
override def now: DateTime = expectedCreateDateTime
override def randomUUID: UUID = expectedGlobalId
}
Expand Down

0 comments on commit 84e2b9c

Please sign in to comment.