diff --git a/src/main/scala/org/caoilte/spray/routing/LogAccessRouting.scala b/src/main/scala/org/caoilte/spray/routing/LogAccessRouting.scala index 8c417ab..091dfbf 100644 --- a/src/main/scala/org/caoilte/spray/routing/LogAccessRouting.scala +++ b/src/main/scala/org/caoilte/spray/routing/LogAccessRouting.scala @@ -1,6 +1,10 @@ package org.caoilte.spray.routing +import akka.io.Tcp import com.typesafe.config.Config +import spray.can.Http +import spray.can.Http.{Command, RegisterChunkHandler} +import spray.httpx.marshalling.{ToResponseMarshallingContext, ToResponseMarshaller} import spray.routing._ import akka.actor._ import scala.concurrent.duration._ @@ -33,11 +37,18 @@ object RequestAccessLogger { } class RequestAccessLogger(ctx: RequestContext, accessLogger: AccessLogger, - timeStampCalculator:(Unit => Long), requestTimeout:FiniteDuration) extends Actor { + timeStampCalculator:(Unit => Long), requestTimeout:FiniteDuration) extends Actor with ActorLogging { import ctx._ import context.dispatcher - val cancellable:Cancellable = context.system.scheduler.scheduleOnce(requestTimeout, self, RequestLoggingTimeout) + var cancellable:Cancellable = context.system.scheduler.scheduleOnce(requestTimeout, self, RequestLoggingTimeout) + var chunkedResponse:Option[HttpResponse] = Option.empty + + def handleChunkedResponseStart(wrapper: Any, response: HttpResponse) = { + // In case a chunk transfer is started, store the response for later logging + chunkedResponse = Option(response) + responder forward wrapper + } def receive = { case RequestLoggingTimeout => { @@ -47,7 +58,18 @@ class RequestAccessLogger(ctx: RequestContext, accessLogger: AccessLogger, s"'${requestTimeout}'.")) accessLogger.logAccess(request, errorResponse, requestTimeout.toMillis) - ctx.complete(errorResponse) + + if (chunkedResponse.isDefined) { + // There was already a ChunkedResponseStart thus the transfer is in-flight. There is no reasonable response + // that could be sent because that would be queued by spray itself - and we can not predict what the client will + // do with any content we append at the current state of the chunked message stream. + // Therefore the whole connection is just aborted + log.debug(s"timed out waiting for the request to complete after ${requestTimeout} - Aborting connection") + ctx.responder ! Tcp.Abort + } else { + log.debug(s"timed out waiting for the request to complete after ${requestTimeout} - Sending ${errorResponse}") + ctx.complete(errorResponse) + } context.stop(self) } case response:HttpResponse => { @@ -55,6 +77,28 @@ class RequestAccessLogger(ctx: RequestContext, accessLogger: AccessLogger, accessLogger.logAccess(request, response, timeStampCalculator(Unit)) forwardMsgAndStop(response) } + + case confirmed@Confirmed(ChunkedResponseStart(response), _) => handleChunkedResponseStart(confirmed, response) + case unconfirmed@ChunkedResponseStart(response) => handleChunkedResponseStart(unconfirmed, response) + + case confirmed@(Confirmed(MessageChunk(_,_), _) | MessageChunk) => { + // Each chunk is considered a heart-beat: We are still producing output and the client is still consuming + // - therefore reset the requestTimeout schedule + cancellable.cancel() + cancellable = context.system.scheduler.scheduleOnce(requestTimeout, self, RequestLoggingTimeout) + + responder forward confirmed + } + + case confirmed@(Confirmed(ChunkedMessageEnd, _) | ChunkedMessageEnd) => { + // Handled like HttpResponse: provide the (previously saved) ChunkedResponseStart for logging and quit + cancellable.cancel() + accessLogger.logAccess(request, chunkedResponse.getOrElse({ + log.warning(s"Never received a ChunkedResponseStart before this '$confirmed' - thus returning NULL to AccessLogger '$accessLogger'") + null}), timeStampCalculator(Unit)) + forwardMsgAndStop(confirmed) + } + case other => { responder forward other } diff --git a/src/test/scala/org/caoilte/spray/routing/LogAccessRoutingDemo.scala b/src/test/scala/org/caoilte/spray/routing/LogAccessRoutingDemo.scala index fa41a95..de30f88 100644 --- a/src/test/scala/org/caoilte/spray/routing/LogAccessRoutingDemo.scala +++ b/src/test/scala/org/caoilte/spray/routing/LogAccessRoutingDemo.scala @@ -5,7 +5,7 @@ import akka.io.{Tcp, IO} import spray.can.Http import scala.concurrent.{Await, Future} import akka.pattern.ask -import spray.http.{HttpResponse, HttpRequest} +import spray.http.{ChunkedResponseStart, ChunkedMessageEnd, HttpResponse, HttpRequest} import com.typesafe.config.ConfigFactory import akka.util.Timeout import scala.concurrent.duration._ diff --git a/src/test/scala/org/caoilte/spray/routing/LogAccessRoutingTests.scala b/src/test/scala/org/caoilte/spray/routing/LogAccessRoutingTests.scala index f9a8b4b..cfd7d35 100644 --- a/src/test/scala/org/caoilte/spray/routing/LogAccessRoutingTests.scala +++ b/src/test/scala/org/caoilte/spray/routing/LogAccessRoutingTests.scala @@ -33,6 +33,28 @@ object FailureRoutes extends Directives { class LogAccessRoutingTests extends FlatSpec with ScalaFutures with GivenWhenThen { + behavior of "An HTTP Server that handles a request with a 200 chunked response within the request timeout" + + it should "'Log Access' with a 200 chunked response and an Access Time less than the request timeout" in { + aTestLogAccessRoutingActor( + requestTimeoutMillis = 4000, + httpServiceActorFactory = ChunkedResponseServiceActor.factory(ChunkedResponse(12, 500), PATH)) { testKit => + import testKit._ + + whenReady(makeHttpCall(), timeout(Span(8, Seconds))) { s => + assert(s.entity.asString(HttpCharsets.`UTF-8`) === "response 12\nresponse 11\nresponse 10\nresponse 9\n" + + "response 8\nresponse 7\nresponse 6\nresponse 5\n" + + "response 4\nresponse 3\nresponse 2\nresponse 1\n")} + + expectMsgPF(8 seconds, "Expected normal log access event with chunked response delayed properties") { + case LogEvent( + HttpRequest(HttpMethods.GET,URI, _, _, _), + HttpResponse(StatusCodes.OK, _, _, _), time, LogAccess + ) if time >= 500 && time <= 7000 => true + } + } + } + behavior of "An HTTP Server that handles a request with a 200 response within the request timeout" it should "'Log Access' with a 200 response and an Access Time less than the request timeout" in { @@ -168,6 +190,28 @@ class LogAccessRoutingTests extends FlatSpec with ScalaFutures with GivenWhenThe } } + behavior of "An HTTP Server that doesn't complete within 100 times the request timeout with a chunked response" + + it should "'Log Access' with a connection abort and an Access Time equal to the configured Request Timeout time" in { + + aTestLogAccessRoutingActor( + requestTimeoutMillis = 500, + httpServiceActorFactory = ChunkedResponseServiceActor.factory(ChunkedResponse(12, 2000), PATH)) { testKit => + import testKit._ + + whenReady(makeHttpCall(), timeout(Span(2, Seconds))) { s => + assert(s.status.intValue == 500) + println(s.entity.toOption.get.contentType) + } + + expectMsgPF(2 seconds, "Expected normal log access access event with timeout properties") { + case LogEvent( + HttpRequest(HttpMethods.GET,URI, _, _, _), + HttpResponse(StatusCodes.InternalServerError, _, _, _), 500, LogAccess + ) => true + } + } + } behavior of "An HTTP Server that handles a JSON request with a TXT response within the request timeout" @@ -194,7 +238,7 @@ class LogAccessRoutingTests extends FlatSpec with ScalaFutures with GivenWhenThe } - implicit val TIMEOUT: Timeout = 3.second + implicit val TIMEOUT: Timeout = 10.second val PORT = 8084 val HOST = s"http://localhost:$PORT" val PATH = "test" @@ -210,6 +254,9 @@ class LogAccessRoutingTests extends FlatSpec with ScalaFutures with GivenWhenThe | registration-timeout = 100 s | routils-request-timeout = $requestTimeout | } + | host-connector { + | max-retries = 0 + | } |} |akka { | loglevel="DEBUG" diff --git a/src/test/scala/org/caoilte/spray/routing/TestLogAccessRoutingActor.scala b/src/test/scala/org/caoilte/spray/routing/TestLogAccessRoutingActor.scala index 21ca147..1a779ed 100644 --- a/src/test/scala/org/caoilte/spray/routing/TestLogAccessRoutingActor.scala +++ b/src/test/scala/org/caoilte/spray/routing/TestLogAccessRoutingActor.scala @@ -1,8 +1,7 @@ package org.caoilte.spray.routing -import akka.testkit.TestKit import org.caoilte.spray.routing.TestAccessLogger.TestAccessLogger -import spray.http.{HttpResponse, HttpRequest, HttpEntity, ContentTypes} +import spray.http._ import spray.routing._ import akka.actor.{Props, ActorRef, Actor} import scala.concurrent._ @@ -15,7 +14,7 @@ object TestAccessLogger { case object LogAccess extends LogType case object AccessAlreadyLogged extends LogType - case class LogEvent(request: HttpRequest, response: HttpResponse, time: Long, logAccessType: LogType) + case class LogEvent(request: HttpRequest, response: HttpMessagePart, time: Long, logAccessType: LogType) class TestAccessLogger(listener:ActorRef) extends AccessLogger { override def logAccess(request: HttpRequest, response: HttpResponse, time: Long) = { @@ -28,6 +27,69 @@ object TestAccessLogger { } } +case object ChunkedResponse { + val DEFAULT_RESPONSE = "response" +} + +case class ChunkedResponse(amountOfChunks: Int, thinkingMillis: Long, responseMessage:String = ChunkedResponse.DEFAULT_RESPONSE) + +object ChunkedResponseServiceActor { + def factory(chunkedResponse: ChunkedResponse, path:String): (ActorRef => Props) = actorRef => { + apply(new TestAccessLogger(actorRef), chunkedResponse, path) + } + def apply(accessLogger: AccessLogger, chunkedResponse: ChunkedResponse, path:String):Props = { + Props(new ChunkedResponseServiceActor(accessLogger, chunkedResponse, path)) + } +} + +class ChunkedResponseServiceActor(val accessLogger: AccessLogger, chunkedResponse: ChunkedResponse, path:String) + extends HttpServiceActor with LogAccessRoutingActor { + + case class RequestForChunkedResponse(ctx: RequestContext) + case class Ok(ctx: RequestContext, remainingChunks: Int) + + class ChunkedResponseActor(response:ChunkedResponse) extends Actor { + import response._ + def receive: Receive = { + case RequestForChunkedResponse(ctx) => { + blocking { + Thread.sleep(thinkingMillis) + } + ctx.responder ! ChunkedResponseStart(HttpResponse()).withAck(Ok(ctx, amountOfChunks)) + } + case Ok(ctx, 0) => { + ctx.responder ! ChunkedMessageEnd.withAck(new Object) + } + case Ok(ctx, remainingChunks) => { + blocking { + Thread.sleep(thinkingMillis) + } + + ctx.responder ! MessageChunk(s"$responseMessage $remainingChunks\n").withAck(Ok(ctx, remainingChunks - 1)) + } + } + } + + var testAc:ActorRef = _ + + implicit def executionContext = actorRefFactory.dispatcher + + override def preStart { + testAc = context.actorOf(Props(new ChunkedResponseActor(chunkedResponse)), "chunked-response-test-actor") + super.preStart + } + + val routes:Route = { + path(path) { + get { ctx: RequestContext => + implicit val TIMEOUT: Timeout = Timeout(chunkedResponse.thinkingMillis * 2, TimeUnit.MILLISECONDS) + testAc ! RequestForChunkedResponse(ctx) + } + } + } + + override def receive = runRoute(routes) +} case object DelayedResponse { val DEFAULT_RESPONSE = "response"