Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support for chunked transfers #2

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion src/main/scala/org/caoilte/spray/routing/LogAccessRouting.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,14 @@ class RequestAccessLogger(ctx: RequestContext, accessLogger: AccessLogger,
import ctx._

import context.dispatcher
val cancellable:Cancellable = context.system.scheduler.scheduleOnce(requestTimeout, self, RequestLoggingTimeout)
var cancellable:Cancellable = context.system.scheduler.scheduleOnce(requestTimeout, self, RequestLoggingTimeout)
var chunkedResponse:HttpResponse = null

def handleChunkedResponseStart(wrapper: Any, response: HttpResponse) = {
// In case a chunk transfer is started, store the response for later logging
chunkedResponse = response
responder forward wrapper
}

def receive = {
case RequestLoggingTimeout => {
Expand All @@ -55,6 +62,26 @@ class RequestAccessLogger(ctx: RequestContext, accessLogger: AccessLogger,
accessLogger.logAccess(request, response, timeStampCalculator(Unit))
forwardMsgAndStop(response)
}

case confirmed@Confirmed(ChunkedResponseStart(response), _) => handleChunkedResponseStart(confirmed, response)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this pattern match work?

case confirmed@Confirmed(ChunkedResponseStart(response,_) | ChunkedResponseStart(response)) => { chunkedResponse = response responder forward confirmed }

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sadly no

LogAccessRouting.scala:69: illegal variable in pattern alternative
[error]     case confirmed@Confirmed(ChunkedResponseStart(response),_) | ChunkedResponseStart(response) => {
[error]                                                                                       ^

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doh

case unconfirmed@ChunkedResponseStart(response) => handleChunkedResponseStart(unconfirmed, response)

case confirmed@(Confirmed(MessageChunk(_,_), _) | MessageChunk) => {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice pattern match

// Each chunk is considered a heart-beat: We are still producing output and the client is still consuming
// - therefore reset the requestTimeout schedule
cancellable.cancel()
cancellable = context.system.scheduler.scheduleOnce(requestTimeout, self, RequestLoggingTimeout)

responder forward confirmed
}

case confirmed@(Confirmed(ChunkedMessageEnd, _) | ChunkedMessageEnd) => {
// Handled like HttpResponse: provide the (previously saved) ChunkedResponseStart for logging and quit
cancellable.cancel()
accessLogger.logAccess(request, chunkedResponse, timeStampCalculator(Unit))
forwardMsgAndStop(confirmed)
}

case other => {
responder forward other
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import akka.io.{Tcp, IO}
import spray.can.Http
import scala.concurrent.{Await, Future}
import akka.pattern.ask
import spray.http.{HttpResponse, HttpRequest}
import spray.http.{ChunkedResponseStart, ChunkedMessageEnd, HttpResponse, HttpRequest}
import com.typesafe.config.ConfigFactory
import akka.util.Timeout
import scala.concurrent.duration._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,28 @@ object FailureRoutes extends Directives {

class LogAccessRoutingTests extends FlatSpec with ScalaFutures with GivenWhenThen {

behavior of "An HTTP Server that handles a request with a 200 chunked response within the request timeout"

it should "'Log Access' with a 200 chunked response and an Access Time less than the request timeout" in {
aTestLogAccessRoutingActor(
requestTimeoutMillis = 4000,
httpServiceActorFactory = ChunkedResponseServiceActor.factory(ChunkedResponse(12, 500), PATH)) { testKit =>
import testKit._

whenReady(makeHttpCall(), timeout(Span(8, Seconds))) { s =>
assert(s.entity.asString(HttpCharsets.`UTF-8`) === "response 12\nresponse 11\nresponse 10\nresponse 9\n"
+ "response 8\nresponse 7\nresponse 6\nresponse 5\n"
+ "response 4\nresponse 3\nresponse 2\nresponse 1\n")}

expectMsgPF(8 seconds, "Expected normal log access event with chunked response delayed properties") {
case LogEvent(
HttpRequest(HttpMethods.GET,URI, _, _, _),
HttpResponse(StatusCodes.OK, _, _, _), time, LogAccess
) if time >= 500 && time <= 7000 => true
}
}
}

behavior of "An HTTP Server that handles a request with a 200 response within the request timeout"

it should "'Log Access' with a 200 response and an Access Time less than the request timeout" in {
Expand Down Expand Up @@ -194,7 +216,7 @@ class LogAccessRoutingTests extends FlatSpec with ScalaFutures with GivenWhenThe
}


implicit val TIMEOUT: Timeout = 3.second
implicit val TIMEOUT: Timeout = 10.second
val PORT = 8084
val HOST = s"http://localhost:$PORT"
val PATH = "test"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package org.caoilte.spray.routing

import akka.testkit.TestKit
import org.caoilte.spray.routing.TestAccessLogger.TestAccessLogger
import spray.http.{HttpResponse, HttpRequest, HttpEntity, ContentTypes}
import spray.http._
import spray.routing._
import akka.actor.{Props, ActorRef, Actor}
import scala.concurrent._
Expand All @@ -15,7 +14,7 @@ object TestAccessLogger {
case object LogAccess extends LogType
case object AccessAlreadyLogged extends LogType

case class LogEvent(request: HttpRequest, response: HttpResponse, time: Long, logAccessType: LogType)
case class LogEvent(request: HttpRequest, response: HttpMessagePart, time: Long, logAccessType: LogType)

class TestAccessLogger(listener:ActorRef) extends AccessLogger {
override def logAccess(request: HttpRequest, response: HttpResponse, time: Long) = {
Expand All @@ -28,6 +27,69 @@ object TestAccessLogger {
}
}

case object ChunkedResponse {
val DEFAULT_RESPONSE = "response"
}

case class ChunkedResponse(amountOfChunks: Int, thinkingMillis: Long, responseMessage:String = ChunkedResponse.DEFAULT_RESPONSE)

object ChunkedResponseServiceActor {
def factory(chunkedResponse: ChunkedResponse, path:String): (ActorRef => Props) = actorRef => {
apply(new TestAccessLogger(actorRef), chunkedResponse, path)
}
def apply(accessLogger: AccessLogger, chunkedResponse: ChunkedResponse, path:String):Props = {
Props(new ChunkedResponseServiceActor(accessLogger, chunkedResponse, path))
}
}

class ChunkedResponseServiceActor(val accessLogger: AccessLogger, chunkedResponse: ChunkedResponse, path:String)
extends HttpServiceActor with LogAccessRoutingActor {

case class RequestForChunkedResponse(ctx: RequestContext)
case class Ok(ctx: RequestContext, remainingChunks: Int)

class ChunkedResponseActor(response:ChunkedResponse) extends Actor {
import response._
def receive: Receive = {
case RequestForChunkedResponse(ctx) => {
blocking {
Thread.sleep(thinkingMillis)
}
ctx.responder ! ChunkedResponseStart(HttpResponse()).withAck(Ok(ctx, amountOfChunks))
}
case Ok(ctx, 0) => {
ctx.responder ! ChunkedMessageEnd.withAck(new Object)
}
case Ok(ctx, remainingChunks) => {
blocking {
Thread.sleep(thinkingMillis)
}

ctx.responder ! MessageChunk(s"$responseMessage $remainingChunks\n").withAck(Ok(ctx, remainingChunks - 1))
}
}
}

var testAc:ActorRef = _

implicit def executionContext = actorRefFactory.dispatcher

override def preStart {
testAc = context.actorOf(Props(new ChunkedResponseActor(chunkedResponse)), "chunked-response-test-actor")
super.preStart
}

val routes:Route = {
path(path) {
get { ctx: RequestContext =>
implicit val TIMEOUT: Timeout = Timeout(chunkedResponse.thinkingMillis * 2, TimeUnit.MILLISECONDS)
testAc ! RequestForChunkedResponse(ctx)
}
}
}

override def receive = runRoute(routes)
}

case object DelayedResponse {
val DEFAULT_RESPONSE = "response"
Expand Down