-
Notifications
You must be signed in to change notification settings - Fork 1
Bind
In the bind package, actors bind application request classes to MessageLogic objects. This speeds up some corner cases, like returning a constant or concurrent data structure, or forwarding a request to another actor. This also means an actor can invoke a method on the target actor directly when either the target actor has no exchange messenger or both the source and target actors share the same exchange messenger--which turns out to be a very common occurrence.
The downside to binding is the cost of doing a table lookup, but this is typically mitigated entirely by the increased efficiency in most cases. In the case where neither actor has an exchange messenger, the EchoTimingTest passes a message in 83 nanoseconds and the BurstTimingTest passes a message in 84 nanoseconds.
Exception handling is also supported by the bind package, with the default exception handler simply passing the exception to the actor which sourced the current request. A TransparentException wraps an exception which occurs while processing a [synchronous] response, so that the original exception can be passed to the source actor's exception handler rather than to the target actor's exception handler.
Finally, requests are marked as active until a response is returned or an exception is raised. By this means we can ensure that for each request there is no more than one response or raised exception.
##BindRequest
/**
 * A request addressed to a BindActor. It carries the application-specific
 * payload together with the QueuedLogic object that will process it.
 * BindActor and Mailbox support only BindRequest and its subclasses.
 *
 * @param dst   the actor which is to process the request
 * @param rf    the response function, handed on to ExchangeRequest
 * @param data  the application-specific request
 * @param bound the message logic object used to process the request
 * @param src   the source of the request, handed on to ExchangeRequest
 */
class BindRequest(dst: BindActor,
                  rf: Any => Unit,
                  data: AnyRef,
                  bound: QueuedLogic,
                  src: ExchangeMessengerSource)
  extends ExchangeRequest(src, rf) {

  /**
   * True until the first call to reply. Once cleared, further replies are
   * dropped, so each request yields at most one response or exception.
   */
  var active = true

  /**
   * The exception handler for this request. The initial handler simply
   * sends the exception back as the reply; it can be reassigned to install
   * request-specific exception handling.
   */
  var exceptionFunction: (Exception, ExchangeMessenger) => Unit =
    (ex, exchange) => reply(exchange, ex)

  /**
   * The actor which is to process the request.
   */
  def target: BindActor = dst

  /**
   * The application-specific request.
   */
  def req: AnyRef = data

  /**
   * The message logic object used to process the request.
   */
  def binding: QueuedLogic = bound

  /**
   * Sends the response only while the request is still active, marking the
   * request inactive first. Calls made after the first one do nothing.
   */
  override def reply(exchangeMessenger: ExchangeMessenger, content: Any): Unit = {
    if (active) {
      active = false
      super.reply(exchangeMessenger, content)
    }
  }
}
##ActiveActor
/**
 * Wrapper that identifies the actor on whose behalf a request is being sent
 * ("this" actor). It is passed implicitly to MessageLogic.func.
 *
 * @param bindActor the wrapped actor
 */
case class ActiveActor(bindActor: BindActor)
##MessageLogic
/**
 * Base class of the objects to which actors bind application request
 * classes. When a request is sent to an actor, the bound MessageLogic
 * object decides how that request is processed.
 */
abstract class MessageLogic {

  /**
   * Decides how an application request is to be processed.
   *
   * @param target   the actor the request was sent to
   * @param msg      the application request
   * @param rf       the function which receives the response
   * @param srcActor the actor sending the request
   */
  def func(target: BindActor, msg: AnyRef, rf: Any => Unit)
          (implicit srcActor: ActiveActor): Unit
}
##ConcurrentData
/**
 * A MessageLogic for immutable or concurrent (i.e. thread-safe) data.
 *
 * @param any the value handed back for every request
 */
class ConcurrentData(any: Any)
  extends MessageLogic {

  /**
   * Responds immediately with the wrapped value. The target, the request
   * and the sender are not consulted.
   */
  override def func(target: BindActor, msg: AnyRef, rf: Any => Unit)
                   (implicit sender: ActiveActor): Unit = {
    rf(any)
  }
}
##Forward
/**
 * A MessageLogic which handles request forwarding.
 *
 * @param actor the actor to which requests are forwarded
 */
class Forward(actor: BindActor)
  extends MessageLogic {

  /**
   * Immediately passes the request, together with the caller's response
   * function, to the forwarding actor. The target argument is not used.
   */
  override def func(target: BindActor, msg: AnyRef, rf: Any => Unit)
                   (implicit sender: ActiveActor): Unit = {
    actor(msg)(rf)
  }
}
##QueuedLogic
/**
 * Base class for message logic whose requests may be placed on an actor's
 * incoming message queue.
 *
 * @param messageFunction the function applied to the application request
 *                        and a response function
 */
abstract class QueuedLogic(messageFunction: (AnyRef, Any => Unit) => Unit)
  extends MessageLogic {

  /**
   * The function which will eventually be used to process the request.
   */
  def reqFunction: (AnyRef, Any => Unit) => Unit = messageFunction

  /**
   * Runs the message function on the request carried by bindRequest,
   * replying through the exchange. An Exception thrown while doing so is
   * sent back as the reply instead of propagating.
   */
  def process(exchange: Exchange, bindRequest: BindRequest): Unit =
    try messageFunction(bindRequest.req, exchange.reply)
    catch {
      case failure: Exception => exchange.reply(failure)
    }

  /**
   * Wraps the application request in a new request created by the target
   * actor, links it to the source exchange's current request, and sends it
   * via the target actor's exchange messenger.
   */
  def enqueueRequest(srcExchange: Exchange,
                     targetActor: BindActor,
                     content: AnyRef,
                     responseFunction: Any => Unit): Unit = {
    // The request currently being processed by the source identifies the sender.
    val pending = srcExchange.curReq.asInstanceOf[BindRequest]
    val origin = pending.target
    val outgoing = targetActor.newRequest(responseFunction, content, this, origin)
    outgoing.setOldRequest(pending)
    targetActor.exchangeMessenger.sendReq(targetActor, outgoing, srcExchange)
  }
}
##BoundFunction
/**
 * The message logic object which wraps a function for processing
 * an application request.
 *
 * @param messageFunction the function applied to the application request
 *                        and a response function
 */
class BoundFunction(messageFunction: (AnyRef, Any => Unit) => Unit)
  extends QueuedLogic(messageFunction) {

  /**
   * Determines how an application request is processed.
   *
   * The request is processed immediately, on the calling thread, when the
   * target actor has no exchange messenger or shares its exchange messenger
   * with the source actor. Otherwise the request is handed to
   * enqueueRequest so that it is sent to the target's exchange messenger.
   *
   * Exception responses, and exceptions thrown by rf while it handles a
   * response, are passed to the exceptionFunction of the source exchange
   * messenger's current request. When the source has no exchange messenger
   * there is no such request, so the exception is rethrown to the caller.
   *
   * @param target   the actor which is to process the request
   * @param msg      the application request
   * @param rf       the function which receives a non-exception response
   * @param srcActor the sending actor; may be null
   * @throws UnsupportedOperationException if the target actor has an
   *         exchange messenger but the source does not
   */
  override def func(target: BindActor, msg: AnyRef, rf: Any => Unit)
                   (implicit srcActor: ActiveActor): Unit = {
    val srcExchangeMessenger =
      if (srcActor == null) null
      else srcActor.bindActor.exchangeMessenger

    if (srcExchangeMessenger == null && target.exchangeMessenger != null) {
      // Diagnostics are written to stdout before the request is rejected.
      println("srcActor = " + srcActor)
      println("srcMailbox = " + srcExchangeMessenger)
      println("target = " + target)
      println("targetMailbox = " + target.exchangeMessenger)
      throw new UnsupportedOperationException(
        "An immutable bindActor can only send to another immutable bindActor."
      )
    }

    // Routes an exception to the source request's handler. Without a source
    // exchange messenger there is no current request to consult, so the
    // exception is rethrown rather than being masked by a NullPointerException.
    def raise(ex: Exception): Unit =
      if (srcExchangeMessenger == null) throw ex
      else srcExchangeMessenger.curReq.asInstanceOf[BindRequest].
        exceptionFunction(ex, srcExchangeMessenger)

    val responseFunction: Any => Unit = {
      case ex: Exception => raise(ex)
      case rsp =>
        try rf(rsp)
        catch {
          case ex: Exception => raise(ex)
        }
    }

    val targetExchangeMessenger = target.exchangeMessenger
    if (targetExchangeMessenger == null || targetExchangeMessenger == srcExchangeMessenger)
      messageFunction(msg, responseFunction)
    else
      enqueueRequest(srcExchangeMessenger.asInstanceOf[Exchange],
        target,
        msg,
        responseFunction)
  }
}
##MailboxFactory
/**
 * Factory for asynchronous and synchronous Mailbox objects. It also holds
 * the ThreadManager which those mailboxes use.
 *
 * @param _threadManager the thread manager shared by the created mailboxes;
 *                       a new MessengerThreadManager when not supplied
 */
class MailboxFactory(_threadManager: ThreadManager = new MessengerThreadManager) {

  /**
   * The ThreadManager used by the async and sync mailbox objects.
   */
  def threadManager: ThreadManager = _threadManager

  /**
   * Closes the ThreadManager, which stops its threads as they become idle.
   */
  def close: Unit = {
    threadManager.close
  }

  /**
   * Creates an asynchronous mailbox belonging to this factory.
   */
  def newAsyncMailbox: Mailbox = new Mailbox(this, async = true)

  /**
   * Creates a synchronous mailbox belonging to this factory.
   */
  def newSyncMailbox: Mailbox = new Mailbox(this, async = false)
}
##Mailbox
/**
 * An Exchange that supports BindActor.
 * Mailbox objects are created by the MailboxFactory.
 *
 * @param _mailboxFactory    the factory which created this mailbox; its
 *                           thread manager is passed to Exchange
 * @param async              passed to Exchange; false when not supplied
 * @param _bufferedMessenger passed to Exchange; null when not supplied
 */
class Mailbox(_mailboxFactory: MailboxFactory,
              async: Boolean = false,
              _bufferedMessenger: BufferedMessenger[ExchangeMessengerMessage] = null)
  extends Exchange(_mailboxFactory.threadManager, async, _bufferedMessenger) {

  /**
   * The MailboxFactory which created this Mailbox.
   */
  def mailboxFactory: MailboxFactory = _mailboxFactory

  /**
   * The current request, cast to BindRequest.
   */
  override def curReq: BindRequest = super.curReq.asInstanceOf[BindRequest]

  /**
   * Processes the current request by delegating to the message logic
   * object bound to it.
   */
  override protected def processRequest: Unit = {
    curReq.binding.process(this, curReq)
  }
}
##TransparentException
/**
 * An exception which wraps another exception.
 * Transparent exceptions are created when an exception occurs while
 * processing a synchronous response, so that an exception handler of the
 * actor which sourced the request is invoked rather than an exception
 * handler of the target actor.
 *
 * @param ex the wrapped exception, available as the cause
 */
class TransparentException(ex: Exception) extends Exception(ex) {

  /**
   * Skips stack trace capture for faster execution, unless trace
   * suppression has been disabled through the Scala system property.
   */
  override def fillInStackTrace(): Throwable = {
    val keepTrace: Boolean = sys.SystemProperties.noTraceSupression
    if (!keepTrace) this
    else super.fillInStackTrace()
  }
}
##Bindings
##BindActor
##Future
##Interop