From d457182b44c274c72b33e033bba7e36d6b87778c Mon Sep 17 00:00:00 2001 From: Caoilte O'Connor Date: Tue, 15 Sep 2015 14:52:33 +0100 Subject: [PATCH] - Remove the Request Mapping Actor and achieve same functionality by using our own Timeout Handler instead of SprayCans. -- A shame but a map for every request was never going to scale and was bound to have problems -- No longer log when the request eventually completes by itself. Could maybe put this back in later. - Updated tests, demo and docs to match changes. Will be a 0.3.0 release because of Spray Config changes required --- README.md | 29 ++++-- .../spray/routing/LogAccessRouting.scala | 99 ++++++------------- .../spray/routing/LogAccessRoutingDemo.scala | 5 +- .../spray/routing/LogAccessRoutingTests.scala | 68 ++++--------- .../routing/TestLogAccessRoutingActor.scala | 8 +- 5 files changed, 77 insertions(+), 132 deletions(-) diff --git a/README.md b/README.md index a0c4e26..cbc140e 100644 --- a/README.md +++ b/README.md @@ -4,13 +4,11 @@ In order to accurately log all access to a Spray Server you need to, + Time and log the completion of your routes ([Mathias tells me how][spray-time-custom-directive]) -+ Handle Timedout requests ([see Spray Docs][spray-timeout-handling]) and then log them -+ Ensure Routes that timeout don't also log on completion (ie log access twice - -[see this Google Groups Discussion for background][spray-timeout-discussion]) + Ensure Routes that throw exceptions have their access logged consistently with routes that do not ++ Handle timeouts. ([Spray Timeout Handling][spray-timeout-handling]) doesn't allow you to disambiguate the timeout +event and the completion of the timedout route. [see this Google Groups Discussion for background][spray-timeout-discussion]) -The ```LogAccessRouting``` Spray Routing trait uses custom directives to manage a ```Map[HttpRequest,Count]``` -and ensure that HttpRequests are only logged once - either by your routes, or by the timeout route. +The ```LogAccessRouting``` Spray Routing trait takes care of all of these access logging edge cases. Included is a demo that you can use to test this behaviour. It runs from within sbt using ```sbt-revolver``` and uses a custom logger based on a suffix of the [Common Log Format][common-log-format]. (There are also automated integration @@ -33,10 +31,11 @@ spray-routils "GET /hello HTTP/1.1" 200 12 580 ``` spray-routils "GET /hello HTTP/1.1" 500 69 500 -spray-routils THIS WOULD HAVE BEEN RESPONSE IF TIMEOUT HADN'T OCCURRED -spray-routils "GET /hello HTTP/1.1" 200 12 1048" ``` +It will also cause a dead letter warning to be created because the response handling actor will shutdown on timeout and +not receive any message received from the timedout request completing + ### Demo a Server with a route that always throws an exception @@ -77,6 +76,22 @@ class YourHttpService extends LogAccessRoutingActor with Actor { } ``` +You also need to make sure that you set a ```request timeout``` correctly + +``` +spray.can { + server { + routils-request-timeout = 5 s + request-timeout = infinite + } +} +``` + +The rather convoluted configuration is necessary in order to implement our own timeout handling instead of relying on +Spray's. When a request completes it is impossible to tell if Spray has already timed out the request. This could lead to +a request being access logged as completed when in fact its response was discarded. By implementing our own timeout logic +we can link Timeouts to normal requests and stop Access Log messages from being generated by timed out requests. + [spray-time-custom-directive]: https://groups.google.com/d/msg/spray-user/V5q6kaXfcHY/ioUzYbW8XvoJ "A Spray Custom Directive for timing a Route" [spray-timeout-handling]: http://spray.io/documentation/1.2.1/spray-routing/key-concepts/timeout-handling/ "Spray Timeout Handling" [spray-timeout-discussion]: https://groups.google.com/d/msg/spray-user/as_3g7Yl_kI/pJmzB-DXOF0J "Discussion about handling Spray Timeouts" diff --git a/src/main/scala/org/caoilte/spray/routing/LogAccessRouting.scala b/src/main/scala/org/caoilte/spray/routing/LogAccessRouting.scala index 1abc724..8c417ab 100644 --- a/src/main/scala/org/caoilte/spray/routing/LogAccessRouting.scala +++ b/src/main/scala/org/caoilte/spray/routing/LogAccessRouting.scala @@ -1,13 +1,14 @@ package org.caoilte.spray.routing +import com.typesafe.config.Config import spray.routing._ -import spray.can.server.ServerSettings import akka.actor._ import scala.concurrent.duration._ -import spray.util.LoggingContext +import spray.util.{SettingsCompanion, LoggingContext} import spray.http._ -import org.caoilte.spray.routing.SingleAccessLogger.AccessLogRequest import RequestAccessLogger._ +import scala.concurrent.duration._ +import spray.util._ trait AccessLogger { @@ -22,39 +23,40 @@ object RequestAccessLogger { Unit => System.currentTimeMillis() - startTime } - def props(ctx: RequestContext, singleAccessLogger:ActorRef, + def props(ctx: RequestContext, accessLogger: AccessLogger, timeStampCalculator:(Unit => Long), requestTimeout:FiniteDuration):Props = { - Props(new RequestAccessLogger(ctx, singleAccessLogger, timeStampCalculator, requestTimeout)) + Props(new RequestAccessLogger(ctx, accessLogger, timeStampCalculator, requestTimeout)) } case object RequestLoggingTimeout } -class RequestAccessLogger(ctx: RequestContext, singleAccessLogger:ActorRef, +class RequestAccessLogger(ctx: RequestContext, accessLogger: AccessLogger, timeStampCalculator:(Unit => Long), requestTimeout:FiniteDuration) extends Actor { import ctx._ import context.dispatcher - val cancellable:Cancellable = context.system.scheduler.scheduleOnce(requestTimeout * 100, self, RequestLoggingTimeout) + val cancellable:Cancellable = context.system.scheduler.scheduleOnce(requestTimeout, self, RequestLoggingTimeout) def receive = { case RequestLoggingTimeout => { val errorResponse = HttpResponse( StatusCodes.InternalServerError, HttpEntity(s"The RequestAccessLogger timed out waiting for the request to complete after " + - s"'${requestTimeout * 100}' which is 100 times the " + - s"configured request timeout (and therefore when a timeout response was made).")) - singleAccessLogger ! AccessLogRequest(request, errorResponse, timeStampCalculator(Unit)) + s"'${requestTimeout}'.")) + + accessLogger.logAccess(request, errorResponse, requestTimeout.toMillis) + ctx.complete(errorResponse) context.stop(self) } case response:HttpResponse => { cancellable.cancel() - singleAccessLogger ! AccessLogRequest(request, response, timeStampCalculator(Unit)) + accessLogger.logAccess(request, response, timeStampCalculator(Unit)) forwardMsgAndStop(response) } case other => { - forwardMsgAndStop(other) + responder forward other } } @@ -69,50 +71,10 @@ object SingleAccessLogger { case object LogState } -class SingleAccessLogger(accessLogger: AccessLogger) extends Actor with ActorLogging { - import SingleAccessLogger._ - import context.dispatcher - - val cancellable:Cancellable = context.system.scheduler.schedule(10 minutes, 10 minutes, self, LogState) - - def receive = handleAccessLogRequest(Map().withDefaultValue(0)) - - def handleAccessLogRequest(inProgressRequests: Map[HttpRequest, Int]): Receive = { - case request:HttpRequest => { - context.become(handleAccessLogRequest(inProgressRequests.updated(request, inProgressRequests(request)+1))) - } - case AccessLogRequest(request, response, time) => { - inProgressRequests(request) match { - case 0 => accessLogger.accessAlreadyLogged(request, response, time) - case 1 => { - accessLogger.logAccess(request, response, time) - context.become(handleAccessLogRequest(inProgressRequests - request)) - } - case _ => { - accessLogger.logAccess(request, response, time) - context.become(handleAccessLogRequest(inProgressRequests.updated(request, inProgressRequests(request)-1))) - } - } - } - case LogState => log.info(s"There are currently ${inProgressRequests.size} requests being tracked for access logging") - } -} - trait LogAccessRouting extends HttpService { - var singleAccessLoggerRef:ActorRef val requestTimeout:FiniteDuration val accessLogger:AccessLogger - - def accessLogTimeout: Directive0 = { - mapRequestContext { ctx => - ctx.withHttpResponseMapped { response => - singleAccessLoggerRef ! AccessLogRequest(ctx.request, response, requestTimeout.toMillis) - response - } - } - } - override def runRoute(route: Route)(implicit eh: ExceptionHandler, rh: RejectionHandler, ac: ActorContext, rs: RoutingSettings, log: LoggingContext): Actor.Receive = { @@ -127,23 +89,17 @@ trait LogAccessRouting extends HttpService { timeStampCalculator: (Unit => Long) = defaultTimeStampCalculator()):RequestContext = { val loggingInterceptor = ac.actorOf( - RequestAccessLogger.props(ctx, singleAccessLoggerRef, timeStampCalculator, requestTimeout) + RequestAccessLogger.props(ctx, accessLogger, timeStampCalculator, requestTimeout) ) ctx.withResponder(loggingInterceptor) } { - case Timedout(request: HttpRequest) ⇒ { - val ctx = attachLoggingInterceptorToRequest(request, Unit => requestTimeout.toMillis) - super.runRoute(timeoutRoute)(eh, rh, ac, rs, log)(ctx) - } case request: HttpRequest ⇒ { val ctx = attachLoggingInterceptorToRequest(request) - singleAccessLoggerRef ! request super.runRoute(route)(eh, rh, ac, rs, log)(ctx) } case ctx: RequestContext ⇒ { - singleAccessLoggerRef ! ctx.request super.runRoute(route)(eh, rh, ac, rs, log)(attachLoggingInterceptorToCtx(ctx)) } case other => super.runRoute(route)(eh, rh, ac, rs, log)(other) @@ -151,19 +107,24 @@ trait LogAccessRouting extends HttpService { } } +case class LogAccessRoutingSettings(requestTimeout: FiniteDuration) + +object LogAccessRoutingSettings extends SettingsCompanion[LogAccessRoutingSettings]("spray.can.server") { + override def fromSubConfig(c: Config): LogAccessRoutingSettings = { + + val sprayDuration:Duration = c getDuration "request-timeout" + require(!sprayDuration.isFinite(), "LogAccessRouting requires spray.can.server.request-timeout to be set as 'infinite'") + + val duration:Duration = c getDuration "routils-request-timeout" + require(duration.isFinite(), "LogAccessRouting requires spray.can.server.routils-request-timeout to be set with a finite duration") + apply(duration.asInstanceOf[FiniteDuration]) + } +} + trait LogAccessRoutingActor extends HttpServiceActor with LogAccessRouting { val accessLogger:AccessLogger - val requestTimeout:FiniteDuration = { - val configuredRequestTimeout:Duration = ServerSettings(context.system).requestTimeout - require(configuredRequestTimeout.isFinite(), "LogAccessRouting cannot be configured if request timeouts are not finite") - configuredRequestTimeout.asInstanceOf[FiniteDuration] - } + val requestTimeout:FiniteDuration = LogAccessRoutingSettings(context.system).requestTimeout var singleAccessLoggerRef:ActorRef = _ - - override def preStart() { - singleAccessLoggerRef = context.system.actorOf(Props(new SingleAccessLogger(accessLogger)), "single-access-logger") - super.preStart - } } \ No newline at end of file diff --git a/src/test/scala/org/caoilte/spray/routing/LogAccessRoutingDemo.scala b/src/test/scala/org/caoilte/spray/routing/LogAccessRoutingDemo.scala index c68aad1..fa41a95 100644 --- a/src/test/scala/org/caoilte/spray/routing/LogAccessRoutingDemo.scala +++ b/src/test/scala/org/caoilte/spray/routing/LogAccessRoutingDemo.scala @@ -82,7 +82,8 @@ object LogAccessRoutingDemo extends App { s""" |spray.can { | server { - | request-timeout = $requestTimeoutInMillis ms + | routils-request-timeout = $requestTimeoutInMillis ms + | request-timeout = infinite | idle-timeout = 100 s | registration-timeout = 100 s | } @@ -105,7 +106,7 @@ object LogAccessRoutingDemo extends App { "Binding failed. Switch on DEBUG-level logging for `akka.io.TcpListener` to log the cause.")) }(system.dispatcher) - Await.result(serverStartedFuture, 1 second) + Await.result(serverStartedFuture, 2 second) println("LogAccessRoutingDemo started. Try it out on http://localhost:8085/hello") } diff --git a/src/test/scala/org/caoilte/spray/routing/LogAccessRoutingTests.scala b/src/test/scala/org/caoilte/spray/routing/LogAccessRoutingTests.scala index 09da703..f9a8b4b 100644 --- a/src/test/scala/org/caoilte/spray/routing/LogAccessRoutingTests.scala +++ b/src/test/scala/org/caoilte/spray/routing/LogAccessRoutingTests.scala @@ -1,12 +1,12 @@ package org.caoilte.spray.routing -import org.caoilte.spray.routing.TestAccessLogger.{AccessAlreadyLogged, LogAccess, LogEvent} -import org.scalatest.FlatSpec +import org.caoilte.spray.routing.TestAccessLogger.{LogAccess, LogEvent} +import org.scalatest.{GivenWhenThen, FlatSpec} import akka.testkit.TestKit import com.typesafe.config.ConfigFactory import akka.actor.{ActorRef, Props, ActorSystem} import spray.http.HttpHeaders.Accept -import spray.routing.{Directives, Route} +import spray.routing.Directives import spray.http._ import spray.can.Http import scala.concurrent._ @@ -18,6 +18,7 @@ import akka.util.Timeout import org.scalatest.concurrent.ScalaFutures import org.scalatest.time.{Seconds, Span} + object FailureRoutes extends Directives { val exceptionRoute = { @@ -30,7 +31,7 @@ object FailureRoutes extends Directives { } } -class LogAccessRoutingTests extends FlatSpec with ScalaFutures { +class LogAccessRoutingTests extends FlatSpec with ScalaFutures with GivenWhenThen { behavior of "An HTTP Server that handles a request with a 200 response within the request timeout" @@ -50,7 +51,6 @@ class LogAccessRoutingTests extends FlatSpec with ScalaFutures { HttpResponse(StatusCodes.OK, _, _, _), time, LogAccess ) if time >= 500 && time <= 4000 => true } - expectNoMsg(3 seconds) } } @@ -74,7 +74,6 @@ class LogAccessRoutingTests extends FlatSpec with ScalaFutures { HttpResponse(StatusCodes.InternalServerError, _, _, _), time, LogAccess ) if time <= 4000 => true } - expectNoMsg(3 seconds) } } @@ -97,14 +96,13 @@ class LogAccessRoutingTests extends FlatSpec with ScalaFutures { HttpResponse(StatusCodes.InternalServerError, _, _, _), time, LogAccess ) if time <= 4000 => true } - expectNoMsg(3 seconds) } } behavior of "An HTTP Server that handles a request with a 200 response outside of the request timeout" it should "'Log Access' with a 500 response and an Access Time equal to the configured Request Timeout time " + - "and then 'Access already logged' with a 200 response and an Access Time more than the Request Timeout time" in { + "and then not log the response completed outside of the timeout" in { aTestLogAccessRoutingActor( requestTimeoutMillis = 50, httpServiceActorFactory = DelayedResponseServiceActor.factory(DelayedResponse(500), PATH)) { testKit => @@ -114,27 +112,19 @@ class LogAccessRoutingTests extends FlatSpec with ScalaFutures { assert(s.status.intValue == 500) } - expectMsgPF(3 seconds, "Expected normal log access access event with with timeout properties") { + expectMsgPF(3 seconds, "Expected log access event with timeout properties") { case LogEvent( HttpRequest(HttpMethods.GET,URI, _, _, _), HttpResponse(StatusCodes.InternalServerError, _, _, _), 50, LogAccess ) => true } - - expectMsgPF(3 seconds, "Expected access already logged event with response delayed properties") { - case LogEvent( - HttpRequest(HttpMethods.GET,URI, _, _, _), - HttpResponse(StatusCodes.OK, _, _, _), time, AccessAlreadyLogged - ) if time >= 500 => true - } } } behavior of "An HTTP Server that doesn't complete within 100 times the request timeout to a TXT request" it should "'Log Access' with a 500 response and an Access Time equal to the configured Request Timeout time " + - "and then 'Access already logged' with a 500 response and Access Time more than the Request Timeout time and an " + - "appropriate error message" in { + "and then not log the response completed outside of the timeout" in { aTestLogAccessRoutingActor( requestTimeoutMillis = 10, httpServiceActorFactory = DelayedResponseServiceActor.factory(DelayedResponse(2000), PATH)) { testKit => @@ -151,24 +141,12 @@ class LogAccessRoutingTests extends FlatSpec with ScalaFutures { HttpResponse(StatusCodes.InternalServerError, _, _, _), 10, LogAccess ) => true } - - expectMsgPF(3 seconds, "Expected access already logged error event") { - case LogEvent( - HttpRequest(HttpMethods.GET,URI, _, _, _), - HttpResponse(StatusCodes.InternalServerError, entity, _, _), time, AccessAlreadyLogged - ) if - time >= (10 * 100) && - entity.asString.contains("The RequestAccessLogger timed out waiting for the request to complete") - => true - } } } behavior of "An HTTP Server that doesn't complete within 100 times the request timeout to a JSON request" - it should "'Log Access' with a 406 response and an Access Time equal to the configured Request Timeout time " + - "and then 'Access already logged' with a 500 response and Access Time more than the Request Timeout time and an " + - "appropriate error message" in { + it should "'Log Access' with a 500 response and an Access Time equal to the configured Request Timeout time" in { aTestLogAccessRoutingActor( requestTimeoutMillis = 10, @@ -177,25 +155,16 @@ class LogAccessRoutingTests extends FlatSpec with ScalaFutures { whenReady(makeHttpCall(MediaTypes.`application/json`), timeout(Span(2, Seconds))) { s => - assert(s.status.intValue == 406) // because Spray Server 500 timeout response re-maps to 406 + assert(s.status.intValue == 500) + println(s.entity.toOption.get.contentType) } expectMsgPF(3 seconds, "Expected normal log access access event with timeout properties") { case LogEvent( HttpRequest(HttpMethods.GET,URI, _, _, _), - HttpResponse(StatusCodes.NotAcceptable, _, _, _), 10, LogAccess + HttpResponse(StatusCodes.InternalServerError, _, _, _), 10, LogAccess ) => true } - - expectMsgPF(3 seconds, "Expected access already logged error event") { - case LogEvent( - HttpRequest(HttpMethods.GET,URI, _, _, _), - HttpResponse(StatusCodes.InternalServerError, entity, _, _), time, AccessAlreadyLogged - ) if - time >= (10 * 100) && - entity.asString.contains("The RequestAccessLogger timed out waiting for the request to complete") - => true - } } } @@ -221,7 +190,6 @@ class LogAccessRoutingTests extends FlatSpec with ScalaFutures { ) => true } - expectNoMsg(3 seconds) } } @@ -233,16 +201,14 @@ class LogAccessRoutingTests extends FlatSpec with ScalaFutures { val URI:Uri = Uri(s"$HOST/$PATH") - - - // going to have to use 'request-timeout = infinite' - spray-can timeout handling is FUBAR def CONFIG(requestTimeout:String = "1 s") = s""" |spray.can { | server { - | request-timeout = $requestTimeout + | request-timeout = infinite # We completely disable Spray timeout handling so that we can manage it | idle-timeout = 10 s | registration-timeout = 100 s + | routils-request-timeout = $requestTimeout | } |} |akka { @@ -256,7 +222,7 @@ class LogAccessRoutingTests extends FlatSpec with ScalaFutures { def aTestLogAccessRoutingActor( requestTimeoutMillis:Long, - httpServiceActorFactory: TestKit => Props + httpServiceActorFactory: ActorRef => Props ) (callback: TestKit => Unit) { val config = ConfigFactory.parseString(CONFIG(s"$requestTimeoutMillis ms")) @@ -264,7 +230,7 @@ class LogAccessRoutingTests extends FlatSpec with ScalaFutures { val testKit = new TestKit(system) try { - val serviceActor = system.actorOf(httpServiceActorFactory(testKit)) + val serviceActor = system.actorOf(httpServiceActorFactory(testKit.testActor)) val sprayServerStartResult = IO(Http).ask(Http.Bind(serviceActor, "localhost", PORT)).flatMap { case b: Http.Bound ⇒ Future.successful(b) @@ -275,6 +241,8 @@ class LogAccessRoutingTests extends FlatSpec with ScalaFutures { assert(sprayServerStartResult.isReadyWithin(3 second)) callback(testKit) + Then("No more access log messages should be received by the testkit testActor") + testKit.expectNoMsg(3 seconds) } finally { system.shutdown() } diff --git a/src/test/scala/org/caoilte/spray/routing/TestLogAccessRoutingActor.scala b/src/test/scala/org/caoilte/spray/routing/TestLogAccessRoutingActor.scala index f102350..21ca147 100644 --- a/src/test/scala/org/caoilte/spray/routing/TestLogAccessRoutingActor.scala +++ b/src/test/scala/org/caoilte/spray/routing/TestLogAccessRoutingActor.scala @@ -36,8 +36,8 @@ case object DelayedResponse { case class DelayedResponse(thinkingMillis: Long, responseMessage:String = DelayedResponse.DEFAULT_RESPONSE) object DelayedResponseServiceActor { - def factory(delayedResponse: DelayedResponse, path:String): (TestKit => Props) = testKit => { - apply(new TestAccessLogger(testKit.testActor), delayedResponse, path) + def factory(delayedResponse: DelayedResponse, path:String): (ActorRef => Props) = actorRef => { + apply(new TestAccessLogger(actorRef), delayedResponse, path) } def apply(accessLogger: AccessLogger, delayedResponse: DelayedResponse, path:String):Props = { Props(new DelayedResponseServiceActor(accessLogger, delayedResponse, path)) @@ -85,8 +85,8 @@ class DelayedResponseServiceActor(val accessLogger: AccessLogger, delayedRespons object RouteServiceActor { - def factory(route: Route, path:String): (TestKit => Props) = testKit => { - apply(new TestAccessLogger(testKit.testActor), route, path) + def factory(route: Route, path:String): (ActorRef => Props) = actorRef => { + apply(new TestAccessLogger(actorRef), route, path) } def apply(accessLogger: AccessLogger, route: Route, path:String):Props = { Props(new RouteServiceActor(accessLogger, route, path))