Skip to content
laforge49 edited this page Nov 24, 2011 · 30 revisions

In the bind package, actors bind application request classes to MessageLogic objects. This speeds up some corner cases, like returning a constant or concurrent data structure, or forwarding a request to another actor. This also means an actor can invoke a method on the target actor directly when either the target actor has no exchange messenger or both the source and target actors share the same exchange messenger--which turns out to be a very common occurrence.

The downside to binding is the cost of doing a table lookup, but this is typically mitigated entirely by the increased efficiency in most cases. In the case where neither actor has an exchange messenger, the EchoTimingTest passes a message in 83 nanoseconds and the BurstTimingTest passes a message in 84 nanoseconds.

Exception handling is also supported by the bind package, with the default exception handler simply passing the exception to the actor which sourced the current request. A TransparentException wraps an exception which occurs while processing a [synchronous] response, so that the original exception can be passed to the source actor's exception handler rather than to the target actor's exception handler.

Finally, requests are marked as active until a response is returned or an exception is raised. By this means we can ensure that for each request there is no more than one response or raised exception.

##BindRequest

/**
 * The request type understood by BindActor and Mailbox; only BindRequest
 * and its subclasses are supported by them.
 *
 * @param dst   the actor which is to process the request
 * @param rf    the function which receives the response
 * @param data  the application-specific request
 * @param bound the message logic object used to process the request
 * @param src   the source of the request
 */
class BindRequest(dst: BindActor,
                  rf: Any => Unit,
                  data: AnyRef,
                  bound: QueuedLogic,
                  src: ExchangeMessengerSource)
  extends ExchangeRequest(src, rf) {

  /**
   * True until a response has been returned or an exception raised.
   * Cleared by reply, so that each request produces at most one
   * response or exception.
   */
  var active = true

  /**
   * The exception handler in effect for this request. Unless it is
   * reassigned, the exception is simply sent back as the reply.
   */
  var exceptionFunction: (Exception, ExchangeMessenger) => Unit =
    (failure, messenger) => reply(messenger, failure)

  /**
   * The actor which is to process the request.
   */
  def target: BindActor = dst

  /**
   * The application-specific request.
   */
  def req: AnyRef = data

  /**
   * The message logic object used to process the request.
   */
  def binding: QueuedLogic = bound

  /**
   * Sends the response, but only for a request that is still active;
   * the request is marked inactive before the response is passed on.
   * Replies to an inactive request are silently dropped.
   */
  override def reply(exchangeMessenger: ExchangeMessenger, content: Any): Unit = {
    if (active) {
      active = false
      super.reply(exchangeMessenger, content)
    }
  }
}

BindRequest

##ActiveActor

/**
 * The ActiveActor class is used in implicits to
 * designate "this" actor, i.e. the actor on whose behalf
 * a request is being sent.
 *
 * @param bindActor the actor being designated
 */
case class ActiveActor(bindActor: BindActor)

ActiveActor

##MessageLogic

/**
 * The base class of the objects to which actors bind their application
 * request classes. When a request is sent to an actor, the MessageLogic
 * object bound to the request's class decides how it is processed.
 */
abstract class MessageLogic {
  /**
   * Decides how an application request is to be processed.
   *
   * @param target   the actor to which the request was sent
   * @param msg      the application request
   * @param rf       the function which receives the response
   * @param srcActor the actor sending the request
   */
  def func(target: BindActor, msg: AnyRef, rf: Any => Unit)
          (implicit srcActor: ActiveActor): Unit
}

MessageLogic

##ConcurrentData

/**
 * ConcurrentData is a subclass of MessageLogic for handling immutable or
 * concurrent (e.g. thread safe) data.
 *
 * @param any the data returned for every request
 */
class ConcurrentData(any: Any)
  extends MessageLogic {

  /**
   * Passes the data straight to the response function; the target actor
   * and the request itself are not consulted.
   */
  override def func(target: BindActor, msg: AnyRef, rf: Any => Unit)
                   (implicit sender: ActiveActor): Unit = rf(any)
}

ConcurrentData

##Forward

/**
 * Forward is a subclass of MessageLogic which handles request forwarding.
 *
 * @param actor the actor to which requests are forwarded
 */
class Forward(actor: BindActor)
  extends MessageLogic {

  /**
   * Hands the request, unchanged, to the forwarding destination on behalf
   * of the original sender. The target actor itself is not consulted.
   */
  override def func(target: BindActor, msg: AnyRef, rf: Any => Unit)
                   (implicit sender: ActiveActor): Unit = {
    actor.apply(msg)(rf)(sender)
  }
}

Forward

##QueuedLogic

/**
 * The QueuedLogic class supports requests that are to be added to an actor's
 * incoming message queue.
 *
 * @param messageFunction the function which processes the application request
 */
abstract class QueuedLogic(messageFunction: (AnyRef, Any => Unit) => Unit)
  extends MessageLogic {

  /**
   * Returns the function which will eventually be used to process the request.
   */
  def reqFunction: (AnyRef, Any => Unit) => Unit = messageFunction

  /**
   * Processes a request taken from the queue, replying through the given
   * exchange. An exception raised during request processing is returned
   * as the response.
   */
  def process(exchange: Exchange, bindRequest: BindRequest): Unit = {
    val respond: Any => Unit = exchange.reply
    try {
      messageFunction(bindRequest.req, respond)
    } catch {
      case ex: Exception => exchange.reply(ex)
    }
  }

  /**
   * Wraps the application request in a BindRequest and sends it to the
   * target actor's exchange messenger. The target of the request currently
   * being processed by the source exchange is recorded as the sender, and
   * that request is attached to the new one as its old request.
   */
  def enqueueRequest(srcExchange: Exchange,
                   targetActor: BindActor,
                   content: AnyRef,
                   responseFunction: Any => Unit): Unit = {
    val currentRequest = srcExchange.curReq.asInstanceOf[BindRequest]
    val newReq = targetActor.newRequest(
      responseFunction,
      content,
      this,
      currentRequest.target)
    newReq.setOldRequest(currentRequest)
    targetActor.exchangeMessenger.sendReq(targetActor, newReq, srcExchange)
  }
}

QueuedLogic

##BoundFunction

/**
 * The message logic object which wraps a function for processing
 * an application request.
 *
 * @param messageFunction the function which processes the application request
 */
class BoundFunction(messageFunction: (AnyRef, Any => Unit) => Unit)
  extends QueuedLogic(messageFunction) {

  /**
   * The func method determines how an application request is to be processed.
   * In the case of a BoundFunction, the request is processed immediately if
   * the same exchange messenger is used by both the target and the source
   * actors or if the target actor has no exchange messenger. Otherwise the
   * request is handed to enqueueRequest.
   *
   * An exception response, or an exception thrown by rf, is passed to the
   * exception function of the source's current request. When the source has
   * no exchange messenger there is no current request, so the exception is
   * rethrown to the caller instead.
   *
   * @param target   the actor to which the request was sent
   * @param msg      the application request
   * @param rf       the function which receives the response
   * @param srcActor the actor sending the request (may be null)
   * @throws UnsupportedOperationException if the target actor has an exchange
   *                                       messenger but the source does not
   */
  override def func(target: BindActor, msg: AnyRef, rf: Any => Unit)
                   (implicit srcActor: ActiveActor): Unit = {
    val srcExchangeMessenger =
      if (srcActor == null) null
      else srcActor.bindActor.exchangeMessenger
    val targetExchangeMessenger = target.exchangeMessenger

    if (srcExchangeMessenger == null && targetExchangeMessenger != null) {
      // Diagnostics go to stderr, as in BindActor.apply, rather than stdout.
      System.err.println("srcActor = " + srcActor)
      System.err.println("srcMailbox = " + srcExchangeMessenger)
      System.err.println("target = " + target)
      System.err.println("targetMailbox = " + targetExchangeMessenger)
      throw new UnsupportedOperationException(
        "An immutable bindActor can only send to another immutable bindActor."
      )
    }

    // Routes an exception to the handler of the source's current request.
    // Without a source exchange messenger there is no current request to
    // consult; dereferencing it would replace the real exception with a
    // NullPointerException, so the exception is rethrown as-is.
    def raise(ex: Exception): Unit = {
      if (srcExchangeMessenger == null) throw ex
      else srcExchangeMessenger.curReq.asInstanceOf[BindRequest].
        exceptionFunction(ex, srcExchangeMessenger)
    }

    val responseFunction: Any => Unit = {
      case ex: Exception => raise(ex)
      case rsp =>
        try {
          rf(rsp)
        } catch {
          case ex: Exception => raise(ex)
        }
    }

    // responseFunction is always defined here, so no null fallback is needed.
    if (targetExchangeMessenger == null || targetExchangeMessenger == srcExchangeMessenger)
      messageFunction(msg, responseFunction)
    else
      enqueueRequest(srcExchangeMessenger, target, msg, responseFunction)
  }
}

BoundFunction

##MailboxFactory

/**
 * MailboxFactory creates asynchronous and synchronous Mailbox objects.
 * It also provides the ThreadManager used by these objects.
 *
 * @param _threadManager the thread manager shared by the created mailboxes
 */
class MailboxFactory(_threadManager: ThreadManager = new MessengerThreadManager) {

  /**
   * Returns the ThreadManager used by the async and sync mailbox objects.
   */
  def threadManager: ThreadManager = _threadManager

  /**
   * Closes the ThreadManager.
   */
  def close: Unit = {
    threadManager.close
  }

  /**
   * Creates an asynchronous mailbox.
   */
  def newAsyncMailbox: Mailbox = new Mailbox(this, async = true)

  /**
   * Creates a synchronous mailbox.
   */
  def newSyncMailbox: Mailbox = new Mailbox(this, async = false)
}

MailboxFactory

##Mailbox

/**
 * A subclass of Exchange that supports BindActor.
 * Mailbox objects are created by the MailboxFactory.
 *
 * @param _mailboxFactory    the factory which created this mailbox; its
 *                           thread manager is passed on to Exchange
 * @param async              passed on to Exchange; the factory supplies true
 *                           for asynchronous and false for synchronous mailboxes
 * @param _bufferedMessenger passed on to Exchange (null by default)
 */
class Mailbox(_mailboxFactory: MailboxFactory,
              async: Boolean = false,
              _bufferedMessenger: BufferedMessenger[ExchangeMessengerMessage] = null)
  extends Exchange(_mailboxFactory.threadManager, async, _bufferedMessenger) {

  /**
   * Returns the MailboxFactory which created this Mailbox.
   */
  def mailboxFactory: MailboxFactory = _mailboxFactory

  /**
   * Returns the current request as an instance of BindRequest.
   */
  override def curReq: BindRequest = super.curReq.asInstanceOf[BindRequest]

  /**
   * Processes the current request by handing it to the message logic
   * object to which it is bound.
   */
  override protected def processRequest: Unit = {
    val request = curReq
    request.binding.process(this, request)
  }
}

Mailbox

##TransparentException

/**
 * A TransparentException wraps another exception.
 * Transparent exceptions are created when an exception occurs while
 * processing a synchronous response, as a means of ensuring that an
 * exception handler of the actor which sourced the request is invoked
 * instead of an exception handler of the target actor.
 *
 * @param ex the wrapped exception, available through getCause
 */
class TransparentException(ex: Exception) extends Exception(ex) {

  /**
   * Skips filling in the stack trace, for faster execution, unless the
   * scala.control.noTraceSuppression system property is set.
   */
  override def fillInStackTrace(): Throwable = {
    val keepTrace: Boolean = sys.SystemProperties.noTraceSupression
    if (keepTrace) super.fillInStackTrace() else this
  }
}

TransparentException

##Bindings

/**
 * Bindings is a trait common to actors and components of actors.
 */
trait Bindings {

  /**
   * Maps each bound application request class to the MessageLogic
   * object which handles it.
   */
  val messageLogics =
    new java.util.HashMap[Class[_ <: AnyRef], MessageLogic]

  /**
   * "This" actor is implicit.
   */
  implicit def activeActor: ActiveActor

  /**
   * Returns the exchange messenger object used by the actor.
   */
  def exchangeMessenger: Mailbox

  /**
   * Runs a message processing function with an exception handling function
   * installed on the current request.
   *
   * The previous handler is put back as soon as a response arrives, or when
   * the message function itself throws. An exception thrown while the
   * response is being processed is wrapped in a TransparentException so
   * that it can be told apart from one thrown by the message function;
   * either way the exception ends up in exceptionFunction.
   *
   * @param msg               the application request
   * @param responseFunction  the function which receives the response
   * @param messageFunction   the function which processes the request
   * @param exceptionFunction the handler to apply while the request is processed
   * @throws UnsupportedOperationException if the actor has no exchange messenger
   */
  def exceptionHandler(msg: AnyRef,
                       responseFunction: Any => Unit,
                       messageFunction: (AnyRef, Any => Unit) => Unit)
                      (exceptionFunction: (Exception, ExchangeMessenger) => Unit): Unit = {
    if (exchangeMessenger == null) throw
      new UnsupportedOperationException("Actors without mailboxes can not use excepton handlers")

    // Looked up afresh on every use rather than captured once.
    def currentRequest = exchangeMessenger.curReq.asInstanceOf[BindRequest]

    val previousHandler = currentRequest.exceptionFunction
    currentRequest.exceptionFunction = exceptionFunction

    val guardedResponse: Any => Unit = rsp => {
      currentRequest.exceptionFunction = previousHandler
      try {
        responseFunction(rsp)
      } catch {
        case ex: Exception => throw new TransparentException(ex)
      }
    }

    try {
      messageFunction(msg, guardedResponse)
    } catch {
      case wrapped: TransparentException =>
        // The previous handler was already restored by guardedResponse.
        exceptionFunction(wrapped.getCause.asInstanceOf[Exception], exchangeMessenger)
      case ex: Exception =>
        currentRequest.exceptionFunction = previousHandler
        exceptionFunction(ex, exchangeMessenger)
    }
  }

  /**
   * Bind a class of application request to a message processing function,
   * wrapped in a BoundFunction.
   *
   * @throws IllegalStateException if the actor is already open
   */
  protected def bind(reqClass: Class[_ <: AnyRef],
                     messageFunction: (AnyRef, Any => Unit) => Unit): Unit = {
    if (activeActor.bindActor.isOpen) throw new IllegalStateException
    messageLogics.put(reqClass, new BoundFunction(messageFunction))
  }

  /**
   * Bind a class of application request to a MessageLogic object.
   *
   * @throws IllegalStateException if the actor is already open
   */
  protected def bindMessageLogic(reqClass: Class[_ <: AnyRef],
                         safe: MessageLogic): Unit = {
    if (activeActor.bindActor.isOpen) throw new IllegalStateException
    messageLogics.put(reqClass, safe)
  }
}

Bindings

##BindActor

/**
 * Objects which implement BindActor support message binding,
 * default message processing via a hierarchy of actors and
 * can share a common exchange messenger.
 */
trait BindActor
  extends ExchangeMessengerActor
  with Bindings {

  /**
   * The exchange messenger for this actor; null until one is set.
   */
  private var _exchangeMessenger: Mailbox = null

  /**
   * The actor which processes any messages not bound to this actor;
   * null when there is none.
   */
  private var _superior: BindActor = null

  /**
   * "This" actor.
   */
  private val _activeActor = ActiveActor(this)

  /**
   * Set to true when initialization is complete.
   */
  private var _opened = false

  /**
   * "This" actor is implicit.
   */
  implicit def activeActor: ActiveActor = _activeActor

  /**
   * Returns the exchange messenger object used by the actor.
   */
  override def exchangeMessenger = _exchangeMessenger

  /**
   * Set the exchange messenger for this actor.
   * (Valid only during actor initialization.)
   *
   * @throws IllegalStateException if the actor is already open
   */
  def setExchangeMessenger(exchangeMessenger: Mailbox) {
    if (isOpen) throw new IllegalStateException
    _exchangeMessenger = exchangeMessenger
  }

  /**
   * Returns the mailboxFactory/threadManager of this actor's
   * exchange messenger.
   */
  def mailboxFactory = exchangeMessenger.mailboxFactory

  /**
   * Returns an asynchronous exchange.
   */
  def newAsyncMailbox = mailboxFactory.newAsyncMailbox

  /**
   * Returns a synchronous exchange.
   */
  def newSyncMailbox = mailboxFactory.newSyncMailbox

  /**
   * Returns true when initialization is complete.
   */
  def isOpen = _opened

  /**
   * Complete actor initialization if it is not already complete.
   */
  def _open {
    if (!isOpen) opener
  }

  /**
   * Complete actor initialization: run the application-specific open
   * and only then mark the actor as open.
   */
  protected def opener {
    open
    _opened = true
  }

  /**
   * Perform application-specific actor initialization.
   */
  protected def open {}

  /**
   * Specify the actor to process messages not bound by this actor.
   * (Not valid once initialization is complete.)
   *
   * @throws IllegalStateException if the actor is already open
   */
  def setSuperior(superior: BindActor) {
    if (isOpen) throw new IllegalStateException
    _superior = superior
  }

  /**
   * Returns the actor which processes messages not bound by this actor.
   */
  def superior = _superior

  /**
   * Create a new BindRequest targeting this actor.
   */
  def newRequest(rf: Any => Unit,
                 data: AnyRef,
                 bound: QueuedLogic,
                 src: ExchangeMessengerSource) =
    new BindRequest(this, rf, data, bound, src)

  /**
   * If initialization is not complete, then complete it.
   * Once complete, process the application request using the message
   * logic bound to its class, or pass it to the superior when this
   * actor has no binding for it.
   *
   * @throws IllegalArgumentException if neither this actor nor any
   *                                  superior has a binding for the request
   */
  def apply(msg: AnyRef)
           (responseFunction: Any => Unit)
           (implicit srcActor: ActiveActor) {
    _open
    val messageLogic = messageLogics.get(msg.getClass)
    if (messageLogic != null) messageLogic.func(this, msg, responseFunction)(srcActor)
    else if (superior != null) superior(msg)(responseFunction)(srcActor)
    else {
      System.err.println("bindActor = " + this.getClass.getName)
      throw new IllegalArgumentException("Unknown type of message: " + msg.getClass.getName)
    }
  }

  /**
   * Check that this actor, or one of its superiors, can process a given
   * class of application request.
   * (To be called from within an application-specific open method.)
   *
   * @throws IllegalStateException         if the actor is already open
   * @throws UnsupportedOperationException if no actor in the chain of
   *                                       superiors has a binding for reqClass
   */
  def requiredService(reqClass: Class[_ <: AnyRef]) {
    if (isOpen) throw new IllegalStateException
    var actor: BindActor = this
    while (!actor.messageLogics.containsKey(reqClass)) {
      // Climb from the actor just examined. Using this actor's own superior
      // here would never get past the first level and would loop forever
      // when that superior lacks the binding.
      val next = actor.superior
      if (next == null)
        throw new UnsupportedOperationException("service missing for " + reqClass.getName)
      actor = next
    }
  }
}

BindActor

##Future

/**
 * The Future class for sending a request and waiting for a response.
 */
class Future
  extends ExchangeMessengerSource
  with MessageListDestination[ExchangeMessengerMessage] {
  @volatile private[this] var rsp: Any = _
  @volatile private[this] var satisfied = false

  /**
   * Sends an application request to a given actor.
   *
   * The actor is opened before its bindings are consulted, as
   * BindActor.apply does, so that bindings made while opening are seen.
   * Only the actor's own bindings are consulted, not those of its superior.
   * When the actor has no exchange messenger the bound function is invoked
   * directly on the calling thread; otherwise the request is queued.
   *
   * @throws IllegalArgumentException if the request class is not bound to
   *                                  a QueuedLogic object
   */
  def send(dst: BindActor, msg: AnyRef) {
    dst._open
    val safe = dst.messageLogics.get(msg.getClass)
    if (!safe.isInstanceOf[QueuedLogic]) throw
      new IllegalArgumentException(msg.getClass.getName + " can not be sent asynchronously to " + dst)
    val mailbox = dst.exchangeMessenger
    if (mailbox == null) {
      val boundFunction = safe.asInstanceOf[BoundFunction]
      boundFunction.reqFunction(msg, synchronousResponse)
    } else {
      val bound = safe.asInstanceOf[QueuedLogic]
      // The response arrives through incomingMessageList, so the request's
      // own response function has nothing to do.
      val req = dst.newRequest(_ => {}, msg, bound, this)
      val blkmsg = new java.util.ArrayList[ExchangeMessengerMessage]
      blkmsg.add(req)
      dst.messageListDestination.incomingMessageList(blkmsg)
    }
  }

  /**
   * The logic for processing a synchronous response.
   * Waiters are notified in case the response is delivered from a thread
   * other than the one blocked in get.
   */
  private def synchronousResponse(_rsp: Any) {
    synchronized {
      rsp = _rsp
      satisfied = true
      notifyAll()
    }
  }

  /**
   * Returns the MessageListDestination, this.
   */
  override def messageListDestination = this

  /**
   * The logic for processing an asynchronous response. Only the first
   * message of the first delivery is kept as the response.
   */
  override def incomingMessageList(blkmsg: java.util.ArrayList[ExchangeMessengerMessage]) {
    synchronized {
      if (!satisfied) {
        rsp = blkmsg.get(0).asInstanceOf[ExchangeMessengerResponse].rsp
        satisfied = true
      }
      // Wake every thread blocked in get, not just one of them.
      notifyAll()
    }
  }

  /**
   * The get method blocks until a response is available and returns it.
   * The wait is re-checked in a loop to guard against spurious wakeups.
   */
  final def get: Any = synchronized {
    while (!satisfied) this.wait()
    rsp
  }
}

/**
 * Sends a request and waits for the response.
 * This companion object is used mostly by test code, as it blocks the current thread
 * until a response is received.
 * Use of this companion object is further constrained, as it supports neither application
 * request classes bound to a ConcurrentData object nor application request classes bound
 * to a Forward object.
 */
object Future {
  /**
   * Sends msg to actor through a new Future and blocks for the result.
   * A response which is an Exception is thrown rather than returned.
   */
  def apply(actor: BindActor, msg: AnyRef): Any = {
    val pending = new Future
    pending.send(actor, msg)
    pending.get match {
      case failure: Exception => throw failure
      case result => result
    }
  }
}

Future

##Interop

/**
 * A helper class for Scala Reactors which send requests to a BindActor and receive the response.
 *
 * @param reactor the Reactor to which asynchronous responses are posted
 */
class Interop[T >: AnyRef](reactor: Reactor[T])
  extends ExchangeMessengerSource
  with MessageListDestination[ExchangeMessengerMessage] {

  /**
   * Returns the MessageListDestination, this.
   */
  override def messageListDestination = this

  /**
   * The afpSend method sends a message to a BindActor and provides a response function
   * for processing the response.
   *
   * The actor is opened before its bindings are consulted, so that bindings
   * made while opening are seen. When the actor has no exchange messenger
   * the bound function is invoked directly with rf; otherwise the request
   * is queued and the response comes back through incomingMessageList.
   *
   * NOTE(review): dst is declared as Actor while Future.send takes a
   * BindActor -- confirm that Actor is the intended type here.
   *
   * @throws IllegalArgumentException if the request class is not bound to
   *                                  a QueuedLogic object
   */
  def afpSend(dst: Actor, msg: AnyRef)(rf: Any => Unit) {
    dst._open
    val safe = dst.messageLogics.get(msg.getClass)
    if (!safe.isInstanceOf[QueuedLogic]) throw
      new IllegalArgumentException(msg.getClass.getName + " can not be sent asynchronously to " + dst)
    val mailbox = dst.exchangeMessenger
    if (mailbox == null) {
      val boundFunction = safe.asInstanceOf[BoundFunction]
      boundFunction.reqFunction(msg, rf)
    } else {
      val bound = safe.asInstanceOf[QueuedLogic]
      val req = dst.newRequest(rf, msg, bound, this)
      val blkmsg = new java.util.ArrayList[ExchangeMessengerMessage]
      blkmsg.add(req)
      dst.messageListDestination.incomingMessageList(blkmsg)
    }
  }

  /**
   * The logic for processing an asynchronous response: each response in
   * the list is posted, in order, to the reactor.
   */
  override def incomingMessageList(blkmsg: java.util.ArrayList[ExchangeMessengerMessage]) {
    var i = 0
    while (i < blkmsg.size) {
      val mailboxRsp = blkmsg.get(i).asInstanceOf[ExchangeMessengerResponse]
      i += 1
      reactor ! mailboxRsp
    }
  }

  /**
   * The afpResponse method should be called when the Reactor receives
   * an ExchangeMessengerResponse object. It applies the response function
   * carried by the message to the response.
   */
  def afpResponse(mailboxRsp: ExchangeMessengerResponse) {
    mailboxRsp.responseFunction(mailboxRsp.rsp)
  }
}

Interop

Clone this wiki locally