Bind
In the bind package, actors bind application request classes to MessageLogic objects. This speeds up some corner cases, like returning a constant or concurrent data structure, or forwarding a request to another actor. This also means an actor can invoke a method on the target actor directly when either the target actor has no exchange messenger or both the source and target actors share the same exchange messenger--which turns out to be a very common occurrence.
The downside of binding is the cost of a table lookup, but in most cases this is fully offset by the increased efficiency. When neither actor has an exchange messenger, the EchoTimingTest passes a message in 83 nanoseconds and the BurstTimingTest passes a message in 84 nanoseconds.
Exception handling is also supported by the bind package. The default exception handler simply passes the exception to the actor which sourced the current request. A TransparentException wraps an exception which occurs while processing a [synchronous] response, so that the original exception can be passed to the source actor's exception handler rather than to the target actor's exception handler.
Finally, requests are marked as active until a response is returned or an exception is raised. This ensures that each request produces no more than one response or raised exception.
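To make the table lookup concrete, here is a minimal sketch of binding request classes to logic objects. It is not the bind package's implementation: `SketchLogic`, `ConstantLogic`, `ForwardLogic`, `SketchActor`, and `GetName` are hypothetical names invented for illustration, and messengers, mailboxes, and exception handling are left out entirely.

```scala
import scala.collection.mutable

// Hypothetical stand-in for MessageLogic: decides how a request is processed.
trait SketchLogic {
  def func(msg: AnyRef, rf: Any => Unit): Unit
}

// Returns a fixed value immediately, in the spirit of ConcurrentData.
final class ConstantLogic(value: Any) extends SketchLogic {
  def func(msg: AnyRef, rf: Any => Unit): Unit = rf(value)
}

// Hands the request to another actor, in the spirit of Forward.
final class ForwardLogic(next: SketchActor) extends SketchLogic {
  def func(msg: AnyRef, rf: Any => Unit): Unit = next(msg)(rf)
}

// Hypothetical actor holding a table from request class to logic object.
class SketchActor {
  private val bindings = mutable.Map.empty[Class[_], SketchLogic]

  def bind(reqClass: Class[_], logic: SketchLogic): Unit = {
    bindings(reqClass) = logic
  }

  // One hash lookup per request, then a direct method call on the logic object.
  def apply(msg: AnyRef)(rf: Any => Unit): Unit = {
    bindings.get(msg.getClass) match {
      case Some(logic) => logic.func(msg, rf)
      case None =>
        throw new IllegalArgumentException("unbound request: " + msg.getClass.getName)
    }
  }
}

// Hypothetical application request class.
case class GetName()

object BindingSketch {
  def run(): Any = {
    val backend = new SketchActor
    backend.bind(classOf[GetName], new ConstantLogic("backend"))

    val front = new SketchActor
    front.bind(classOf[GetName], new ForwardLogic(backend))

    var result: Any = null
    front(GetName()) { r => result = r } // forwarded to backend, answered with the constant
    result
  }
}
```

In this sketch both the constant reply and the forward complete on the caller's thread without any queueing, which is the kind of shortcut the binding approach is meant to allow.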
##BindRequest
/**
* BindActor and Mailbox support only BindRequest and its subclasses.
*/
class BindRequest(dst: BindActor,
rf: Any => Unit,
data: AnyRef,
bound: QueuedLogic,
src: ExchangeMessengerSource)
extends ExchangeRequest(src, rf) {
/**
* Set to false when a response is returned or an exception is raised,
* active is used to ensure that there is only one response or exception
* for each request.
*/
var active = true
/**
* Default logic when no other exception handler is used.
* (Each application request class can have its own logic for
* handling exceptions.)
*/
var exceptionFunction: (Exception, ExchangeMessenger) => Unit = {
(ex, exchange) => reply(exchange, ex)
}
/**
* The actor which is to process the request.
*/
def target = dst
/**
* The application-specific request.
*/
def req = data
/**
* The message logic object used to process the request.
*/
def binding = bound
/**
* If the request is still active, mark the request as inactive and send
* the response.
*/
override def reply(exchangeMessenger: ExchangeMessenger, content: Any) {
if (!active) {
return
}
active = false
super.reply(exchangeMessenger, content)
}
}
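The effect of the `active` flag can be tried in isolation. The following is a simplified sketch, assuming a request that holds only a response function: `OneShotRequest` and `OneShotSketch` are hypothetical names, and the exchange messenger parameter of the real `reply` is omitted.

```scala
// Hypothetical, simplified stand-in for BindRequest: no messengers, no actors.
class OneShotRequest(rf: Any => Unit) {
  // True until the first response or exception has been delivered.
  private var active = true

  // Deliver content only if nothing has been delivered yet.
  def reply(content: Any): Unit = {
    if (active) {
      active = false
      rf(content)
    }
  }
}

object OneShotSketch {
  def run(): List[Any] = {
    var delivered = List.empty[Any]
    val req = new OneShotRequest({ c => delivered = delivered :+ c })
    req.reply("first")
    req.reply(new IllegalStateException("late failure")) // dropped: request is inactive
    req.reply("second")                                  // dropped as well
    delivered
  }
}
```

Like the plain `var` in `BindRequest`, the flag in this sketch is not synchronized, so the sketch assumes that all replies for a given request are issued from a single thread.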
##ActiveActor
/**
* The ActiveActor class is used in implicits to
* designate "this" actor.
*/
case class ActiveActor(bindActor: BindActor)
##MessageLogic
/**
* Actors bind application request classes to MessageLogic objects.
* When a request is sent to an actor, it uses the MessageLogic object
* to determine how the request is to be processed.
*/
abstract class MessageLogic {
/**
* The func method is used to determine how an application request
* is to be processed.
*/
def func(target: BindActor, msg: AnyRef, rf: Any => Unit)
(implicit srcActor: ActiveActor)
}
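The second parameter list of `func` relies on Scala's implicit parameters to carry the identity of the calling actor. The sketch below shows only that language pattern, under the assumption that an actor declares an implicit value wrapping itself; `NamedActor`, `Sender`, `Greeter`, and `Client` are hypothetical names and not part of the bind package.

```scala
// Hypothetical stand-ins; only the implicit-parameter pattern is the point here.
class NamedActor(val name: String)

// Plays the role of ActiveActor: a wrapper that designates the calling actor.
case class Sender(actor: NamedActor)

object Greeter {
  // The caller's identity arrives through the implicit parameter list.
  def greet(msg: String)(implicit sender: Sender): String =
    msg + " from " + sender.actor.name
}

class Client(n: String) extends NamedActor(n) {
  // Declared once; every call made inside this class picks it up automatically.
  implicit val self: Sender = Sender(this)

  def hello(): String = Greeter.greet("hello")
}
```

Calling `new Client("a1").hello()` resolves the implicit `Sender` from the enclosing class, so the call site never names the sender explicitly.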
##ConcurrentData
/**
* ConcurrentData is a subclass of MessageLogic for handling immutable or concurrent,
* e.g. thread-safe, data.
*/
class ConcurrentData(any: Any)
extends MessageLogic {
/**
* The data is immediately returned as a response.
*/
override def func(target: BindActor, msg: AnyRef, rf: Any => Unit)
(implicit sender: ActiveActor) {
rf(any)
}
}
##Forward
/**
* Forward is a subclass of MessageLogic which handles request forwarding.
*/
class Forward(actor: BindActor)
extends MessageLogic {
/**
* The request is immediately forwarded to another actor.
*/
override def func(target: BindActor, msg: AnyRef, rf: Any => Unit)
(implicit sender: ActiveActor) {
actor(msg)(rf)
}
}
##QueuedLogic
/**
 * The QueuedLogic class supports requests that are to be added to an actor's
 * incoming message queue.
 */
abstract class QueuedLogic(messageFunction: (AnyRef, Any => Unit) => Unit)
  extends MessageLogic {
  /**
   * Returns the function which will eventually be used to process the request.
   */
  def reqFunction = messageFunction
  /**
   * Process the request. Any exceptions raised during request processing are
   * returned as a response.
   */
  def process(exchange: Exchange, bindRequest: BindRequest) {
    try {
      messageFunction(bindRequest.req, exchange.reply)
    } catch {
      case ex: Exception => {
        exchange.reply(ex)
      }
    }
  }
  /**
   * Create a BindRequest wrapping the application request and
   * add it to the actor's incoming message queue.
   */
  def enqueueRequest(srcExchange: Exchange,
                     targetActor: BindActor,
                     content: AnyRef,
                     responseFunction: Any => Unit) {
    val oldReq = srcExchange.curReq.asInstanceOf[BindRequest]
    val sender = oldReq.target
    val req = targetActor.newRequest(
      responseFunction,
      content,
      this,
      sender)
    req.setOldRequest(oldReq)
    targetActor.exchangeMessenger.sendReq(targetActor, req, srcExchange)
  }
}
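The `process` method turns a failure into an ordinary response instead of letting it propagate. Here is a minimal sketch of that shape on its own, assuming a plain reply function in place of an `Exchange`; `ProcessSketch` and its members are hypothetical names used only for illustration.

```scala
// Hypothetical helper mirroring the try/catch shape of QueuedLogic.process.
object ProcessSketch {
  def process(messageFunction: (AnyRef, Any => Unit) => Unit,
              req: AnyRef,
              reply: Any => Unit): Unit = {
    try {
      messageFunction(req, reply)
    } catch {
      // The exception object itself is sent back as the response.
      case ex: Exception => reply(ex)
    }
  }

  def run(): List[Any] = {
    var responses = List.empty[Any]
    val collect: Any => Unit = { r => responses = responses :+ r }

    // Succeeds: the handler replies with the length of the request string.
    process((m, rf) => rf(m.toString.length), "ping", collect)

    // Fails: the handler throws, so the exception becomes the response.
    process((m, rf) => throw new IllegalArgumentException("bad request"), "ping", collect)

    responses
  }
}
```

The caller therefore sees either a normal value or an `Exception` instance through the same response function, and it is up to the receiving side to tell the two apart.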
##BoundFunction
##BindActor
##Bindings
##Mailbox
##MailboxFactory
##TransparentException
##Future
##Interop