Skip to content

Commit

Permalink
- Remove the Request Mapping Actor and achieve same functionality by …
Browse files Browse the repository at this point in the history
…using our own Timeout Handler instead of SprayCans.

-- A shame but a map for every request was never going to scale and was bound to have problems
-- No longer log when the request eventually completes by itself. Could maybe put this back in later.
- Updated tests, demo and docs to match changes.

Will be a 0.3.0 release because of Spray Config changes required
  • Loading branch information
caoilte committed Sep 15, 2015
1 parent ded5992 commit d457182
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 132 deletions.
29 changes: 22 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@

In order to accurately log all access to a Spray Server you need to,
+ Time and log the completion of your routes ([Mathias tells me how][spray-time-custom-directive])
+ Handle Timedout requests ([see Spray Docs][spray-timeout-handling]) and then log them
+ Ensure Routes that timeout don't also log on completion (ie log access twice -
[see this Google Groups Discussion for background][spray-timeout-discussion])
+ Ensure Routes that throw exceptions have their access logged consistently with routes that do not
+ Handle timeouts. ([Spray Timeout Handling][spray-timeout-handling]) doesn't allow you to disambiguate the timeout
event and the completion of the timedout route. [see this Google Groups Discussion for background][spray-timeout-discussion])

The ```LogAccessRouting``` Spray Routing trait uses custom directives to manage a ```Map[HttpRequest,Count]```
and ensure that HttpRequests are only logged once - either by your routes, or by the timeout route.
The ```LogAccessRouting``` Spray Routing trait takes care of all of these access logging edge cases.

Included is a demo that you can use to test this behaviour. It runs from within sbt using ```sbt-revolver``` and uses
a custom logger based on a suffix of the [Common Log Format][common-log-format]. (There are also automated integration
Expand All @@ -33,10 +31,11 @@ spray-routils "GET /hello HTTP/1.1" 200 12 580

```
spray-routils "GET /hello HTTP/1.1" 500 69 500
spray-routils THIS WOULD HAVE BEEN RESPONSE IF TIMEOUT HADN'T OCCURRED
spray-routils "GET /hello HTTP/1.1" 200 12 1048"
```

It will also cause a dead letter warning to be created because the response handling actor will shutdown on timeout and
not receive any message received from the timedout request completing


### Demo a Server with a route that always throws an exception

Expand Down Expand Up @@ -77,6 +76,22 @@ class YourHttpService extends LogAccessRoutingActor with Actor {
}
```

You also need to make sure that you set a ```request timeout``` correctly

```
spray.can {
server {
routils-request-timeout = 5 s
request-timeout = infinite
}
}
```

The rather convoluted configuration is necessary in order to implement our own timeout handling instead of relying on
Spray's. When a request completes it is impossible to tell if Spray has already timed out the request. This could lead to
a request being access logged as completed when in fact its response was discarded. By implementing our own timeout logic
we can link Timeouts to normal requests and stop Access Log messages from being generated by timed out requests.

[spray-time-custom-directive]: https://groups.google.com/d/msg/spray-user/V5q6kaXfcHY/ioUzYbW8XvoJ "A Spray Custom Directive for timing a Route"
[spray-timeout-handling]: http://spray.io/documentation/1.2.1/spray-routing/key-concepts/timeout-handling/ "Spray Timeout Handling"
[spray-timeout-discussion]: https://groups.google.com/d/msg/spray-user/as_3g7Yl_kI/pJmzB-DXOF0J "Discussion about handling Spray Timeouts"
Expand Down
99 changes: 30 additions & 69 deletions src/main/scala/org/caoilte/spray/routing/LogAccessRouting.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package org.caoilte.spray.routing

import com.typesafe.config.Config
import spray.routing._
import spray.can.server.ServerSettings
import akka.actor._
import scala.concurrent.duration._
import spray.util.LoggingContext
import spray.util.{SettingsCompanion, LoggingContext}
import spray.http._
import org.caoilte.spray.routing.SingleAccessLogger.AccessLogRequest
import RequestAccessLogger._
import scala.concurrent.duration._
import spray.util._


trait AccessLogger {
Expand All @@ -22,39 +23,40 @@ object RequestAccessLogger {
Unit => System.currentTimeMillis() - startTime
}

def props(ctx: RequestContext, singleAccessLogger:ActorRef,
def props(ctx: RequestContext, accessLogger: AccessLogger,
timeStampCalculator:(Unit => Long), requestTimeout:FiniteDuration):Props = {
Props(new RequestAccessLogger(ctx, singleAccessLogger, timeStampCalculator, requestTimeout))
Props(new RequestAccessLogger(ctx, accessLogger, timeStampCalculator, requestTimeout))
}

case object RequestLoggingTimeout

}

class RequestAccessLogger(ctx: RequestContext, singleAccessLogger:ActorRef,
class RequestAccessLogger(ctx: RequestContext, accessLogger: AccessLogger,
timeStampCalculator:(Unit => Long), requestTimeout:FiniteDuration) extends Actor {
import ctx._

import context.dispatcher
val cancellable:Cancellable = context.system.scheduler.scheduleOnce(requestTimeout * 100, self, RequestLoggingTimeout)
val cancellable:Cancellable = context.system.scheduler.scheduleOnce(requestTimeout, self, RequestLoggingTimeout)

def receive = {
case RequestLoggingTimeout => {
val errorResponse = HttpResponse(
StatusCodes.InternalServerError,
HttpEntity(s"The RequestAccessLogger timed out waiting for the request to complete after " +
s"'${requestTimeout * 100}' which is 100 times the " +
s"configured request timeout (and therefore when a timeout response was made)."))
singleAccessLogger ! AccessLogRequest(request, errorResponse, timeStampCalculator(Unit))
s"'${requestTimeout}'."))

accessLogger.logAccess(request, errorResponse, requestTimeout.toMillis)
ctx.complete(errorResponse)
context.stop(self)
}
case response:HttpResponse => {
cancellable.cancel()
singleAccessLogger ! AccessLogRequest(request, response, timeStampCalculator(Unit))
accessLogger.logAccess(request, response, timeStampCalculator(Unit))
forwardMsgAndStop(response)
}
case other => {
forwardMsgAndStop(other)
responder forward other
}
}

Expand All @@ -69,50 +71,10 @@ object SingleAccessLogger {
case object LogState
}

class SingleAccessLogger(accessLogger: AccessLogger) extends Actor with ActorLogging {
import SingleAccessLogger._
import context.dispatcher

val cancellable:Cancellable = context.system.scheduler.schedule(10 minutes, 10 minutes, self, LogState)

def receive = handleAccessLogRequest(Map().withDefaultValue(0))

def handleAccessLogRequest(inProgressRequests: Map[HttpRequest, Int]): Receive = {
case request:HttpRequest => {
context.become(handleAccessLogRequest(inProgressRequests.updated(request, inProgressRequests(request)+1)))
}
case AccessLogRequest(request, response, time) => {
inProgressRequests(request) match {
case 0 => accessLogger.accessAlreadyLogged(request, response, time)
case 1 => {
accessLogger.logAccess(request, response, time)
context.become(handleAccessLogRequest(inProgressRequests - request))
}
case _ => {
accessLogger.logAccess(request, response, time)
context.become(handleAccessLogRequest(inProgressRequests.updated(request, inProgressRequests(request)-1)))
}
}
}
case LogState => log.info(s"There are currently ${inProgressRequests.size} requests being tracked for access logging")
}
}

trait LogAccessRouting extends HttpService {
var singleAccessLoggerRef:ActorRef
val requestTimeout:FiniteDuration
val accessLogger:AccessLogger


def accessLogTimeout: Directive0 = {
mapRequestContext { ctx =>
ctx.withHttpResponseMapped { response =>
singleAccessLoggerRef ! AccessLogRequest(ctx.request, response, requestTimeout.toMillis)
response
}
}
}

override def runRoute(route: Route)(implicit eh: ExceptionHandler, rh: RejectionHandler, ac: ActorContext,
rs: RoutingSettings, log: LoggingContext): Actor.Receive = {

Expand All @@ -127,43 +89,42 @@ trait LogAccessRouting extends HttpService {
timeStampCalculator: (Unit => Long) = defaultTimeStampCalculator()):RequestContext = {

val loggingInterceptor = ac.actorOf(
RequestAccessLogger.props(ctx, singleAccessLoggerRef, timeStampCalculator, requestTimeout)
RequestAccessLogger.props(ctx, accessLogger, timeStampCalculator, requestTimeout)
)
ctx.withResponder(loggingInterceptor)
}

{
case Timedout(request: HttpRequest) {
val ctx = attachLoggingInterceptorToRequest(request, Unit => requestTimeout.toMillis)
super.runRoute(timeoutRoute)(eh, rh, ac, rs, log)(ctx)
}
case request: HttpRequest {
val ctx = attachLoggingInterceptorToRequest(request)
singleAccessLoggerRef ! request
super.runRoute(route)(eh, rh, ac, rs, log)(ctx)
}
case ctx: RequestContext {
singleAccessLoggerRef ! ctx.request
super.runRoute(route)(eh, rh, ac, rs, log)(attachLoggingInterceptorToCtx(ctx))
}
case other => super.runRoute(route)(eh, rh, ac, rs, log)(other)
}
}
}

case class LogAccessRoutingSettings(requestTimeout: FiniteDuration)

object LogAccessRoutingSettings extends SettingsCompanion[LogAccessRoutingSettings]("spray.can.server") {
override def fromSubConfig(c: Config): LogAccessRoutingSettings = {

val sprayDuration:Duration = c getDuration "request-timeout"
require(!sprayDuration.isFinite(), "LogAccessRouting requires spray.can.server.request-timeout to be set as 'infinite'")

val duration:Duration = c getDuration "routils-request-timeout"
require(duration.isFinite(), "LogAccessRouting requires spray.can.server.routils-request-timeout to be set with a finite duration")
apply(duration.asInstanceOf[FiniteDuration])
}
}

trait LogAccessRoutingActor extends HttpServiceActor with LogAccessRouting {

val accessLogger:AccessLogger
val requestTimeout:FiniteDuration = {
val configuredRequestTimeout:Duration = ServerSettings(context.system).requestTimeout
require(configuredRequestTimeout.isFinite(), "LogAccessRouting cannot be configured if request timeouts are not finite")
configuredRequestTimeout.asInstanceOf[FiniteDuration]
}
val requestTimeout:FiniteDuration = LogAccessRoutingSettings(context.system).requestTimeout

var singleAccessLoggerRef:ActorRef = _

override def preStart() {
singleAccessLoggerRef = context.system.actorOf(Props(new SingleAccessLogger(accessLogger)), "single-access-logger")
super.preStart
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ object LogAccessRoutingDemo extends App {
s"""
|spray.can {
| server {
| request-timeout = $requestTimeoutInMillis ms
| routils-request-timeout = $requestTimeoutInMillis ms
| request-timeout = infinite
| idle-timeout = 100 s
| registration-timeout = 100 s
| }
Expand All @@ -105,7 +106,7 @@ object LogAccessRoutingDemo extends App {
"Binding failed. Switch on DEBUG-level logging for `akka.io.TcpListener` to log the cause."))
}(system.dispatcher)

Await.result(serverStartedFuture, 1 second)
Await.result(serverStartedFuture, 2 second)
println("LogAccessRoutingDemo started. Try it out on http://localhost:8085/hello")

}
Loading

0 comments on commit d457182

Please sign in to comment.