Skip to content

Commit

Permalink
Support exceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw committed Jul 22, 2023
1 parent 25533a2 commit 1821dba
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 10 deletions.
31 changes: 22 additions & 9 deletions core/src/main/scala/ox/channels/Cell.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,19 @@ package ox.channels

import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.control.NonFatal

// a lazily-created, optional result - exceptions might be throw when the function is called, hence it should be called
// only on the thread where the value should be received
private[ox] type MaybeCreateResult[T] = () => Option[SelectResult[T]]

private[ox] trait CellCompleter[-T]:
/** Complete the cell with a value. Should only be called if this cell is owned by the calling thread. */
/** Complete the cell with a result. Should only be called if this cell is owned by the calling thread. */
def complete(t: SelectResult[T]): Unit

/** Complete the cell with a lazily-created, optional result. Should only be called if this cell is owned by the calling thread. */
def complete(t: MaybeCreateResult[T]): Unit

/** Complete the cell with a new completer. Should only be called if this cell is owned by the calling thread. */
def completeWithNewCell(): Unit

Expand All @@ -20,33 +28,38 @@ private[ox] trait CellCompleter[-T]:

private[ox] class Cell[T] extends CellCompleter[T]:
private val isOwned = new AtomicBoolean(false)
private val cell = new ArrayBlockingQueue[SelectResult[T] | Cell[T] | ChannelState.Closed](1)
private val cell = new ArrayBlockingQueue[SelectResult[T] | MaybeCreateResult[T] | Cell[T] | ChannelState.Closed](1)

// each cell should be completed exactly once, so we are not using the blocking capabilities of `cell`;
// using `cell.put` might throw an interrupted exception, which might cause a deadlock (as there's a thread awaiting a
// cell's completion on its own interrupt - see cellTakeInterrupted); hence, using `.add`.
override def complete(t: SelectResult[T]): Unit = cell.add(t)
override def complete(t: MaybeCreateResult[T]): Unit = cell.add(t)
override def completeWithNewCell(): Unit = cell.add(Cell[T])
override def completeWithClosed(s: ChannelState.Closed): Unit = cell.add(s)
override def tryOwn(): Boolean = isOwned.compareAndSet(false, true)
def take(): SelectResult[T] | Cell[T] | ChannelState.Closed = cell.take()
def take(): SelectResult[T] | MaybeCreateResult[T] | Cell[T] | ChannelState.Closed = cell.take()
def isAlreadyOwned: Boolean = isOwned.get()

/** Linked cells are created when creating CollectSources. */
private[ox] class LinkedCell[T, U](linkedTo: CellCompleter[U], f: T => Option[U], createReceived: U => Source[U]#Received)
extends CellCompleter[T] {
override def complete(t: SelectResult[T]): Unit =
t match
case r: Source[T]#Received =>
f(r.value) match // TODO exceptions
case Some(u) => linkedTo.complete(createReceived(u))
case None => linkedTo.completeWithNewCell() // ignoring the received value
case _ => throw new IllegalStateException() // linked cells can only be created from sources
case r: Source[T]#Received => linkedTo.complete(() => f(r.value).map(createReceived)) // f might throw exceptions, making lazy
case _ => throw new IllegalStateException() // linked cells can only be created from sources
override def complete(t: MaybeCreateResult[T]): Unit =
linkedTo.complete { () =>
t() match
case Some(r: Source[T]#Received) => f(r.value).map(createReceived)
case Some(_) => throw new IllegalStateException() // linked cells can only be created from sources
case _ => None
}
override def completeWithNewCell(): Unit = linkedTo.completeWithNewCell()
override def completeWithClosed(s: ChannelState.Closed): Unit = linkedTo.completeWithClosed(s)
override def tryOwn(): Boolean = linkedTo.tryOwn()

// for cleanup
// for Source/Sink cell cleanup
override def equals(obj: Any): Boolean = linkedTo.equals(obj)
override def hashCode(): Int = linkedTo.hashCode()
}
13 changes: 12 additions & 1 deletion core/src/main/scala/ox/channels/select.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,25 @@ private def doSelect[T](clauses: List[SelectClause[T]]): SelectResult[T] | Chann
// completed with a value; interrupting self and returning it
try t
finally Thread.currentThread().interrupt()
case t: MaybeCreateResult[T] @unchecked =>
try
t() match
case Some(r) => r
case None => throw e
finally Thread.currentThread().interrupt()

def takeFromCellInterruptSafe(c: Cell[T]): SelectResult[T] | ChannelClosed =
try
c.take() match
case c2: Cell[T] @unchecked => offerCellAndTake(c2) // we got a new cell on which we should be waiting, add it to the channels
case s: ChannelState.Error => ChannelClosed.Error(s.reason)
case ChannelState.Done => doSelect(clauses)
case t: SelectResult[T] @unchecked => t
case t: SelectResult[T] @unchecked => t
case t: MaybeCreateResult[T] @unchecked =>
// this might throw exceptions, but this is fine - we're on the thread that called select
t() match
case Some(r) => r
case None => doSelect(clauses)
catch case e: InterruptedException => cellTakeInterrupted(c, e)
// now that the cell has been filled, it is owned, and should be removed from the waiting lists of the other channels
finally cleanupCell(c, alsoWhenSingleClause = false)
Expand Down

0 comments on commit 1821dba

Please sign in to comment.