Skip to content
laforge49 edited this page Nov 25, 2011 · 30 revisions

In the bind package, actors bind application request classes to MessageLogic objects. This speeds up some corner cases, like returning a constant or concurrent data structure, or forwarding a request to another actor. This also means an actor can invoke a method on the target actor directly when either the target actor has no exchange messenger or both the source and target actors share the same exchange messenger--something that is easy to do in most cases.

The downside to binding is the cost of a table lookup, but in most cases this cost is fully offset by the increased efficiency. In the case where neither actor has an exchange messenger, the EchoTimingTest passes a message in 83 nanoseconds and the BurstTimingTest passes a message in 84 nanoseconds.

By reworking the EchoTimingTest to use both hardware threads, the DualEchoTimingTest achieves a 63% improvement in speed with a message being passed every 51 nanoseconds.

Exception handling is also supported by the bind package, with the default exception handler simply passing the exception to the actor which sourced the current request. A TransparentException wraps an exception which occurs while processing a [synchronous] response, so that the original exception can be passed to the source actor's exception handler rather than to the target actor's exception handler.

Finally, requests are marked as active until a response is returned or an exception is raised. This ensures that each request produces no more than one response or raised exception.

## BindRequest

/**
 * The request type understood by BindActor and Mailbox; only BindRequest
 * and its subclasses are supported by them.
 *
 * A BindRequest carries the application request (data), the actor that is
 * to process it (dst) and the message logic bound to it (bound). The source
 * (src) and the response function (rf) are handed to ExchangeRequest.
 */
class BindRequest(dst: BindActor,
                  rf: Any => Unit,
                  data: AnyRef,
                  bound: QueuedLogic,
                  src: ExchangeMessengerSource)
  extends ExchangeRequest(src, rf) {

  /**
   * True until reply has sent a response (or an exception) for this request.
   * Used by reply to guarantee at most one response or exception per request.
   */
  var active: Boolean = true

  /**
   * The exception handler currently in force for this request.
   * By default the exception is simply sent back as the reply; it can be
   * replaced so that an application request gets its own handling.
   */
  var exceptionFunction: (Exception, ExchangeMessenger) => Unit =
    (cause, messenger) => reply(messenger, cause)

  /**
   * The actor which is to process the request.
   */
  def target: BindActor = dst

  /**
   * The application-specific request being wrapped.
   */
  def req: AnyRef = data

  /**
   * The message logic object used to process the request.
   */
  def binding: QueuedLogic = bound

  /**
   * Sends the response, but only for the first call: the request is marked
   * inactive before the response is passed on, and calls made on an
   * inactive request are ignored.
   */
  override def reply(exchangeMessenger: ExchangeMessenger, content: Any): Unit =
    if (active) {
      active = false
      super.reply(exchangeMessenger, content)
    }
}

BindRequest

## ActiveActor

/**
 * A thin wrapper around a BindActor, used as the type of implicit
 * parameters and implicit members that designate "this" actor,
 * i.e. the actor on whose behalf a request is being sent.
 *
 * @param bindActor the actor being designated
 */
case class ActiveActor(bindActor: BindActor)

ActiveActor

## MessageLogic

/**
 * The strategy an actor applies to one class of application request.
 * Actors bind application request classes to MessageLogic objects and,
 * when a request arrives, hand it to the bound object to decide how it
 * is processed.
 */
abstract class MessageLogic {
  /**
   * Decides how an application request is processed.
   *
   * @param target   the actor the request was sent to
   * @param msg      the application request
   * @param rf       the function which receives the response
   * @param srcActor the actor sending the request
   */
  def func(target: BindActor, msg: AnyRef, rf: Any => Unit)
          (implicit srcActor: ActiveActor): Unit
}

MessageLogic

## ConcurrentData

/**
 * A MessageLogic for immutable or concurrent (i.e. thread-safe) data:
 * the bound value itself is the response.
 *
 * @param any the value returned for every request
 */
class ConcurrentData(any: Any)
  extends MessageLogic {

  /**
   * Responds immediately with the bound value; the target, the request
   * and the sender are not consulted.
   */
  override def func(target: BindActor, msg: AnyRef, rf: Any => Unit)
                   (implicit sender: ActiveActor): Unit = rf(any)
}

ConcurrentData

## Forward

/**
 * A MessageLogic which hands every request on to another actor.
 *
 * @param actor the actor that requests are forwarded to
 */
class Forward(actor: BindActor)
  extends MessageLogic {

  /**
   * Forwards the request, unchanged, to the configured actor. The original
   * sender and response function are passed along, so the response goes
   * straight back to the sender.
   */
  override def func(target: BindActor, msg: AnyRef, rf: Any => Unit)
                   (implicit sender: ActiveActor): Unit =
    actor.apply(msg)(rf)(sender)
}

Forward

## QueuedLogic

/**
 * Base class for message logic whose requests may be placed on an actor's
 * incoming message queue.
 *
 * @param messageFunction the function which processes the application
 *                        request; it is given the request and the function
 *                        that receives the response
 */
abstract class QueuedLogic(messageFunction: (AnyRef, Any => Unit) => Unit)
  extends MessageLogic {

  /**
   * The function which will eventually be used to process the request.
   */
  def reqFunction: (AnyRef, Any => Unit) => Unit = messageFunction

  /**
   * Runs the message function on the wrapped application request, replying
   * through the given exchange. An Exception thrown while doing so is sent
   * back as the response instead of being propagated.
   */
  def process(exchange: Exchange, bindRequest: BindRequest): Unit =
    try messageFunction(bindRequest.req, exchange.reply)
    catch {
      case failure: Exception => exchange.reply(failure)
    }

  /**
   * Wraps the application request in a new BindRequest and sends it to the
   * target actor's exchange messenger. The new request's source is the
   * target of the request currently being processed by srcExchange, and
   * that current request is recorded on the new one as its old request.
   */
  def enqueueRequest(srcExchange: Exchange,
                     targetActor: BindActor,
                     content: AnyRef,
                     responseFunction: Any => Unit): Unit = {
    val currentRequest = srcExchange.curReq.asInstanceOf[BindRequest]
    val wrapped = targetActor.newRequest(
      responseFunction,
      content,
      this,
      currentRequest.target)
    wrapped.setOldRequest(currentRequest)
    targetActor.exchangeMessenger.sendReq(targetActor, wrapped, srcExchange)
  }
}

QueuedLogic

## BoundFunction

/**
 * The message logic object which wraps a function for processing
 * an application request.
 *
 * @param messageFunction the function which processes the application
 *                        request; it is given the request and the function
 *                        that receives the response
 */
class BoundFunction(messageFunction: (AnyRef, Any => Unit) => Unit)
  extends QueuedLogic(messageFunction) {

  /**
   * Determines how an application request is processed.
   *
   * The request is processed immediately, on the calling thread, when the
   * target actor has no mailbox or when the target and the source actors use
   * the same mailbox. Otherwise the request is wrapped and queued through
   * enqueueRequest.
   *
   * Responses are routed as follows: an Exception received as the response,
   * or thrown by rf while it handles the response, is passed to the
   * exceptionFunction of the request currently being processed by the source
   * actor's mailbox. When the source has no mailbox there is no such request,
   * so the exception is rethrown to the caller as-is.
   *
   * @param target   the actor the request was sent to
   * @param msg      the application request
   * @param rf       the function which receives a (non-exception) response
   * @param srcActor the sending actor; may be null
   * @throws UnsupportedOperationException if the target actor has a mailbox
   *                                       but the source actor does not
   */
  override def func(target: BindActor, msg: AnyRef, rf: Any => Unit)
                   (implicit srcActor: ActiveActor): Unit = {
    val srcExchangeMessenger =
      if (srcActor == null) null
      else srcActor.bindActor.exchangeMessenger
    val targetExchangeMessenger = target.exchangeMessenger

    if (srcExchangeMessenger == null && targetExchangeMessenger != null) {
      // Diagnostics go to stderr, like the ones printed by BindActor.apply.
      System.err.println("srcActor = " + srcActor)
      System.err.println("srcMailbox = " + srcExchangeMessenger)
      System.err.println("target = " + target)
      System.err.println("targetMailbox = " + targetExchangeMessenger)
      throw new UnsupportedOperationException(
        "An immutable bindActor can only send to another immutable bindActor."
      )
    }

    // Hands an exception to the handler of the source mailbox's current
    // request. Without a source mailbox there is no request to consult, so
    // the exception itself is thrown rather than a NullPointerException.
    def raise(ex: Exception): Unit = {
      if (srcExchangeMessenger == null) throw ex
      srcExchangeMessenger.curReq.exceptionFunction(ex, srcExchangeMessenger)
    }

    val responseFunction: Any => Unit = {
      case ex: Exception => raise(ex)
      case rsp =>
        try rf(rsp)
        catch {
          case ex: Exception => raise(ex)
        }
    }

    if (targetExchangeMessenger == null || targetExchangeMessenger == srcExchangeMessenger)
      messageFunction(msg, responseFunction)
    else
      enqueueRequest(srcExchangeMessenger, target, msg, responseFunction)
  }
}

BoundFunction

## MailboxFactory

/**
 * Creates asynchronous and synchronous Mailbox objects and holds the
 * ThreadManager those mailboxes are built with.
 *
 * @param _threadManager the thread manager to use; a new
 *                       MessengerThreadManager by default
 */
class MailboxFactory(_threadManager: ThreadManager = new MessengerThreadManager) {

  /**
   * The ThreadManager used by the mailboxes created by this factory.
   */
  def threadManager: ThreadManager = _threadManager

  /**
   * Closes the ThreadManager, stopping its threads as they become idle.
   */
  def close: Unit = threadManager.close

  /**
   * Creates an asynchronous mailbox belonging to this factory.
   */
  def newAsyncMailbox: Mailbox = new Mailbox(this, async = true)

  /**
   * Creates a synchronous mailbox belonging to this factory.
   */
  def newSyncMailbox: Mailbox = new Mailbox(this, async = false)
}

MailboxFactory

## Mailbox

/**
 * The Exchange subclass used with BindActor: its current request is always
 * a BindRequest, and requests are processed through their bound logic.
 * Mailbox objects are created by the MailboxFactory.
 *
 * @param _mailboxFactory    the factory which created this mailbox; it also
 *                           supplies the thread manager
 * @param async              passed through to Exchange; false by default
 * @param _bufferedMessenger passed through to Exchange; null by default
 */
class Mailbox(_mailboxFactory: MailboxFactory,
              async: Boolean = false,
              _bufferedMessenger: BufferedMessenger[ExchangeMessengerMessage] = null)
  extends Exchange(_mailboxFactory.threadManager, async, _bufferedMessenger) {

  /**
   * The MailboxFactory which created this Mailbox.
   */
  def mailboxFactory: MailboxFactory = _mailboxFactory

  /**
   * The current request, viewed as a BindRequest.
   */
  override def curReq: BindRequest = super.curReq.asInstanceOf[BindRequest]

  /**
   * Processes the current request by delegating to the message logic
   * that the request is bound to.
   */
  override protected def processRequest: Unit = {
    val request = curReq
    request.binding.process(this, request)
  }
}

Mailbox

## TransparentException

/**
 * An exception whose only job is to carry another exception as its cause.
 * Transparent exceptions are created when an exception occurs while
 * processing a synchronous response, so that the exception handler of the
 * actor which sourced the request is invoked rather than an exception
 * handler of the target actor.
 *
 * @param ex the exception being wrapped; available through getCause
 */
class TransparentException(ex: Exception) extends Exception(ex) {

  /**
   * Skips recording the stack trace, for faster execution, unless the
   * noTraceSupression system property asks for traces to be kept.
   */
  override def fillInStackTrace(): Throwable = {
    val keepTrace: Boolean = sys.SystemProperties.noTraceSupression
    if (keepTrace) super.fillInStackTrace() else this
  }
}

TransparentException

## Bindings

/**
 * Behaviour shared by actors and by components of actors: a table binding
 * application request classes to MessageLogic objects, plus support for
 * per-request exception handlers.
 */
trait Bindings {

  /**
   * The table in which each bound application request class is mapped to
   * its MessageLogic object.
   */
  val messageLogics =
    new java.util.HashMap[Class[_ <: AnyRef], MessageLogic]

  /**
   * "This" actor, available implicitly.
   */
  implicit def activeActor: ActiveActor

  /**
   * The exchange messenger object used by the actor.
   */
  def exchangeMessenger: Mailbox

  /**
   * Runs a message processing function with the given exception handler
   * installed on the mailbox's current request.
   *
   * The previously installed handler is put back just before the response
   * is handed to responseFunction, or when messageFunction throws. An
   * exception thrown by responseFunction is wrapped in a
   * TransparentException; if that reaches this method it is unwrapped and
   * given to exceptionFunction, as is any other Exception caught here.
   *
   * @throws UnsupportedOperationException if the actor has no mailbox
   */
  def exceptionHandler(msg: AnyRef,
                       responseFunction: Any => Unit,
                       messageFunction: (AnyRef, Any => Unit) => Unit)
                      (exceptionFunction: (Exception, ExchangeMessenger) => Unit): Unit = {
    if (exchangeMessenger == null) throw
      new UnsupportedOperationException("Actors without mailboxes can not use excepton handlers")

    // Deliberately a def: the mailbox's current request is re-read at each
    // use rather than captured once.
    def currentRequest = exchangeMessenger.curReq.asInstanceOf[BindRequest]

    val previousHandler = currentRequest.exceptionFunction

    def restoreHandler(): Unit = {
      currentRequest.exceptionFunction = previousHandler
    }

    currentRequest.exceptionFunction = exceptionFunction
    try {
      messageFunction(msg, rsp => {
        restoreHandler()
        try responseFunction(rsp)
        catch {
          case ex: Exception => throw new TransparentException(ex)
        }
      })
    } catch {
      // The handler was already restored before responseFunction ran.
      case transparent: TransparentException =>
        exceptionFunction(transparent.getCause.asInstanceOf[Exception], exchangeMessenger)
      case ex: Exception =>
        restoreHandler()
        exceptionFunction(ex, exchangeMessenger)
    }
  }

  /**
   * Binds a class of application request to a message processing function,
   * wrapping the function in a BoundFunction.
   *
   * @throws IllegalStateException if the actor is already open
   */
  protected def bind(reqClass: Class[_ <: AnyRef],
                     messageFunction: (AnyRef, Any => Unit) => Unit): Unit = {
    if (activeActor.bindActor.isOpen) throw new IllegalStateException
    messageLogics.put(reqClass, new BoundFunction(messageFunction))
  }

  /**
   * Binds a class of application request to a MessageLogic object.
   *
   * @throws IllegalStateException if the actor is already open
   */
  protected def bindMessageLogic(reqClass: Class[_ <: AnyRef],
                                 safe: MessageLogic): Unit = {
    if (activeActor.bindActor.isOpen) throw new IllegalStateException
    messageLogics.put(reqClass, safe)
  }
}

Bindings

## BindActor

/**
 * Objects which implement BindActor support message binding,
 * default message processing via a hierarchy of actors (each actor may
 * have a superior that handles the requests it has no binding for) and
 * can share a common exchange messenger.
 *
 * An actor is configured first (exchange messenger, superior, bindings)
 * and is then opened; the setters throw IllegalStateException once the
 * actor is open.
 */
trait BindActor
  extends ExchangeMessengerActor
  with Bindings {

  /**
   * The exchange messenger for this actor; null when the actor has none.
   */
  private var _exchangeMessenger: Mailbox = null

  /**
   * The actor which processes any messages not bound to this actor;
   * null when there is none.
   */
  private var _superior: BindActor = null

  /**
   * "This" actor.
   */
  private val _activeActor = ActiveActor(this)

  /**
   * Set to true when initialization is complete.
   */
  private var _opened = false

  /**
   * "This" actor is implicit.
   */
  implicit def activeActor: ActiveActor = _activeActor

  /**
   * Returns the exchange messenger object used by the actor.
   */
  override def exchangeMessenger: Mailbox = _exchangeMessenger

  /**
   * Set the exchange messenger for this actor.
   * (Valid only during actor initialization.)
   *
   * @throws IllegalStateException if the actor is already open
   */
  def setExchangeMessenger(exchangeMessenger: Mailbox): Unit = {
    if (isOpen) throw new IllegalStateException
    _exchangeMessenger = exchangeMessenger
  }

  /**
   * Returns the mailboxFactory/threadManager of this actor's exchange
   * messenger. Fails with a NullPointerException when the actor has no
   * exchange messenger.
   */
  def mailboxFactory: MailboxFactory = exchangeMessenger.mailboxFactory

  /**
   * Returns a new asynchronous mailbox from this actor's mailbox factory.
   */
  def newAsyncMailbox: Mailbox = mailboxFactory.newAsyncMailbox

  /**
   * Returns a new synchronous mailbox from this actor's mailbox factory.
   */
  def newSyncMailbox: Mailbox = mailboxFactory.newSyncMailbox

  /**
   * Returns true when initialization is complete.
   */
  def isOpen: Boolean = _opened

  /**
   * Complete actor initialization if it is not already complete.
   */
  def _open: Unit = {
    if (!isOpen) opener
  }

  /**
   * Complete actor initialization: run the application-specific open
   * method, then mark the actor as open.
   */
  protected def opener: Unit = {
    open
    _opened = true
  }

  /**
   * Perform application-specific actor initialization.
   * Does nothing by default.
   */
  protected def open: Unit = {}

  /**
   * Specify the actor to process messages not bound by this actor.
   * (Not valid once initialization is complete.)
   *
   * @throws IllegalStateException if the actor is already open
   */
  def setSuperior(superior: BindActor): Unit = {
    if (isOpen) throw new IllegalStateException
    _superior = superior
  }

  /**
   * Returns the actor which processes messages not bound by this actor,
   * or null when there is none.
   */
  def superior: BindActor = _superior

  /**
   * Create a new BindRequest targeted at this actor.
   */
  def newRequest(rf: Any => Unit,
                 data: AnyRef,
                 bound: QueuedLogic,
                 src: ExchangeMessengerSource): BindRequest =
    new BindRequest(this, rf, data, bound, src)

  /**
   * If initialization is not complete, then complete it.
   * Once complete, process the application request: use the MessageLogic
   * bound to the request's class if there is one, otherwise pass the
   * request to the superior.
   *
   * @throws IllegalArgumentException if the request class is not bound and
   *                                  this actor has no superior
   */
  def apply(msg: AnyRef)
           (responseFunction: Any => Unit)
           (implicit srcActor: ActiveActor): Unit = {
    _open
    val messageLogic = messageLogics.get(msg.getClass)
    if (messageLogic != null) messageLogic.func(this, msg, responseFunction)(srcActor)
    else if (superior != null) superior(msg)(responseFunction)(srcActor)
    else {
      System.err.println("bindActor = " + this.getClass.getName)
      throw new IllegalArgumentException("Unknown type of message: " + msg.getClass.getName)
    }
  }

  /**
   * Check that this actor, or one of its superiors, can process a given
   * class of application request.
   * (To be called from within an application-specific open method.)
   *
   * @throws IllegalStateException         if the actor is already open
   * @throws UnsupportedOperationException if neither this actor nor any
   *                                       actor above it binds reqClass
   */
  def requiredService(reqClass: Class[_ <: AnyRef]): Unit = {
    if (isOpen) throw new IllegalStateException
    var actor: BindActor = this
    while (!actor.messageLogics.containsKey(reqClass)) {
      // Climb from the actor just examined. Using this actor's own superior
      // here would re-examine the same actor on every pass.
      val next = actor.superior
      if (next == null)
        throw new UnsupportedOperationException("service missing for " + reqClass.getName)
      actor = next
    }
  }
}

BindActor

## Future

/**
 * The Future class for sending a request and waiting for a response.
 * The first response received is kept; get blocks the calling thread
 * until it is available.
 */
class Future
  extends ExchangeMessengerSource
  with MessageListDestination[ExchangeMessengerMessage] {
  @volatile private[this] var rsp: Any = _
  @volatile private[this] var satisfied = false

  /**
   * Sends an application request to a given actor.
   *
   * The actor is opened first, as BindActor.apply does, so that the lookup
   * sees the actor's complete set of bindings. When the actor has no
   * exchange messenger the request function is called directly on this
   * thread; otherwise the request is wrapped and delivered to the actor's
   * message list destination.
   *
   * @throws IllegalArgumentException if the request class is not bound, by
   *                                  the actor itself, to a QueuedLogic
   */
  def send(dst: BindActor, msg: AnyRef): Unit = {
    dst._open
    val logic = dst.messageLogics.get(msg.getClass) match {
      case queued: QueuedLogic => queued
      // Also covers an unbound request class (null).
      case _ => throw new IllegalArgumentException(
        msg.getClass.getName + " can not be sent asynchronously to " + dst)
    }
    if (dst.exchangeMessenger == null) {
      // reqFunction is defined by QueuedLogic, so no narrower cast is needed.
      logic.reqFunction(msg, synchronousResponse)
    } else {
      // The response arrives through incomingMessageList, so the request's
      // own response function has nothing to do.
      val req = dst.newRequest(_ => {}, msg, logic, this)
      val blkmsg = new ArrayList[ExchangeMessengerMessage]
      blkmsg.add(req)
      dst.messageListDestination.incomingMessageList(blkmsg)
    }
  }

  /**
   * The logic for processing a synchronous response.
   */
  private def synchronousResponse(_rsp: Any): Unit = {
    rsp = _rsp
    satisfied = true
  }

  /**
   * Returns the MessageListDestination, this.
   */
  override def messageListDestination = this

  /**
   * The logic for processing an asynchronous response: records the
   * response carried by the first message, unless a response has already
   * been recorded, and wakes up any thread blocked in get.
   */
  override def incomingMessageList(blkmsg: ArrayList[ExchangeMessengerMessage]): Unit = {
    synchronized {
      if (!satisfied) {
        rsp = blkmsg.get(0).asInstanceOf[ExchangeMessengerResponse].rsp
        satisfied = true
      }
      // Wake every waiter: each one re-checks satisfied in get.
      notifyAll()
    }
  }

  /**
   * The get method waits for a response and returns it.
   * The condition is re-checked after every wake-up.
   */
  final def get: Any = synchronized {
    while (!satisfied) wait()
    rsp
  }
}

/**
 * Sends a request and waits for the response.
 * This companion object is used mostly by test code, as it blocks the current thread
 * until a response is received.
 * Its use is further constrained: it supports neither application request
 * classes bound to a ConcurrentData object nor application request classes bound to
 * a Forward object.
 */
object Future {
  /**
   * Sends msg to actor through a fresh Future and blocks until the response
   * is available. A response which is an Exception is thrown; any other
   * response is returned.
   */
  def apply(actor: BindActor, msg: AnyRef): Any = {
    val pending = new Future
    pending.send(actor, msg)
    pending.get match {
      case failure: Exception => throw failure
      case result => result
    }
  }
}

Future

## Interop

/**
 * A helper class for Scala Reactors which send requests to a BindActor and receive the response.
 *
 * @param reactor the Reactor to which asynchronous responses are delivered
 */
class Interop[T >: AnyRef](reactor: Reactor[T])
  extends ExchangeMessengerSource
  with MessageListDestination[ExchangeMessengerMessage] {

  /**
   * Returns the MessageListDestination, this.
   */
  override def messageListDestination = this

  /**
   * The afpSend method sends a message to a BindActor and provides a response function
   * for processing the response.
   *
   * The actor is opened first, as BindActor.apply does, so that the lookup
   * sees the actor's complete set of bindings. When the actor has no
   * exchange messenger the request function is called directly with rf;
   * otherwise the request is wrapped and delivered to the actor's message
   * list destination.
   *
   * @throws IllegalArgumentException if the request class is not bound, by
   *                                  the actor itself, to a QueuedLogic
   */
  def afpSend(dst: Actor, msg: AnyRef)(rf: Any => Unit): Unit = {
    dst._open
    val logic = dst.messageLogics.get(msg.getClass) match {
      case queued: QueuedLogic => queued
      // Also covers an unbound request class (null).
      case _ => throw new IllegalArgumentException(
        msg.getClass.getName + " can not be sent asynchronously to " + dst)
    }
    if (dst.exchangeMessenger == null) {
      // reqFunction is defined by QueuedLogic, so no narrower cast is needed.
      logic.reqFunction(msg, rf)
    } else {
      val req = dst.newRequest(rf, msg, logic, this)
      val blkmsg = new java.util.ArrayList[ExchangeMessengerMessage]
      blkmsg.add(req)
      dst.messageListDestination.incomingMessageList(blkmsg)
    }
  }

  /**
   * The logic for processing an asynchronous response: every message in
   * the list is forwarded, in order, to the reactor as an
   * ExchangeMessengerResponse.
   */
  override def incomingMessageList(blkmsg: java.util.ArrayList[ExchangeMessengerMessage]): Unit = {
    var i = 0
    while (i < blkmsg.size) {
      val mailboxRsp = blkmsg.get(i).asInstanceOf[ExchangeMessengerResponse]
      i += 1
      reactor ! mailboxRsp
    }
  }

  /**
   * The afpResponse method should be called when the Reactor receives
   * an ExchangeMessengerResponse object. It applies the response's own
   * response function to the response value.
   */
  def afpResponse(mailboxRsp: ExchangeMessengerResponse): Unit = {
    mailboxRsp.responseFunction(mailboxRsp.rsp)
  }
}

Interop

Clone this wiki locally