diff --git a/core/src/main/scala/ox/channels/Cell.scala b/core/src/main/scala/ox/channels/Cell.scala index be53e89e..f1a09215 100644 --- a/core/src/main/scala/ox/channels/Cell.scala +++ b/core/src/main/scala/ox/channels/Cell.scala @@ -2,11 +2,19 @@ package ox.channels import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.atomic.AtomicBoolean +import scala.util.control.NonFatal + +// a lazily-created, optional result - exceptions might be throw when the function is called, hence it should be called +// only on the thread where the value should be received +private[ox] type MaybeCreateResult[T] = () => Option[SelectResult[T]] private[ox] trait CellCompleter[-T]: - /** Complete the cell with a value. Should only be called if this cell is owned by the calling thread. */ + /** Complete the cell with a result. Should only be called if this cell is owned by the calling thread. */ def complete(t: SelectResult[T]): Unit + /** Complete the cell with a lazily-created, optional result. Should only be called if this cell is owned by the calling thread. */ + def complete(t: MaybeCreateResult[T]): Unit + /** Complete the cell with a new completer. Should only be called if this cell is owned by the calling thread. */ def completeWithNewCell(): Unit @@ -20,16 +28,17 @@ private[ox] trait CellCompleter[-T]: private[ox] class Cell[T] extends CellCompleter[T]: private val isOwned = new AtomicBoolean(false) - private val cell = new ArrayBlockingQueue[SelectResult[T] | Cell[T] | ChannelState.Closed](1) + private val cell = new ArrayBlockingQueue[SelectResult[T] | MaybeCreateResult[T] | Cell[T] | ChannelState.Closed](1) // each cell should be completed exactly once, so we are not using the blocking capabilities of `cell`; // using `cell.put` might throw an interrupted exception, which might cause a deadlock (as there's a thread awaiting a // cell's completion on its own interrupt - see cellTakeInterrupted); hence, using `.add`. override def complete(t: SelectResult[T]): Unit = cell.add(t) + override def complete(t: MaybeCreateResult[T]): Unit = cell.add(t) override def completeWithNewCell(): Unit = cell.add(Cell[T]) override def completeWithClosed(s: ChannelState.Closed): Unit = cell.add(s) override def tryOwn(): Boolean = isOwned.compareAndSet(false, true) - def take(): SelectResult[T] | Cell[T] | ChannelState.Closed = cell.take() + def take(): SelectResult[T] | MaybeCreateResult[T] | Cell[T] | ChannelState.Closed = cell.take() def isAlreadyOwned: Boolean = isOwned.get() /** Linked cells are created when creating CollectSources. */ @@ -37,16 +46,20 @@ private[ox] class LinkedCell[T, U](linkedTo: CellCompleter[U], f: T => Option[U] extends CellCompleter[T] { override def complete(t: SelectResult[T]): Unit = t match - case r: Source[T]#Received => - f(r.value) match // TODO exceptions - case Some(u) => linkedTo.complete(createReceived(u)) - case None => linkedTo.completeWithNewCell() // ignoring the received value - case _ => throw new IllegalStateException() // linked cells can only be created from sources + case r: Source[T]#Received => linkedTo.complete(() => f(r.value).map(createReceived)) // f might throw exceptions, making lazy + case _ => throw new IllegalStateException() // linked cells can only be created from sources + override def complete(t: MaybeCreateResult[T]): Unit = + linkedTo.complete { () => + t() match + case Some(r: Source[T]#Received) => f(r.value).map(createReceived) + case Some(_) => throw new IllegalStateException() // linked cells can only be created from sources + case _ => None + } override def completeWithNewCell(): Unit = linkedTo.completeWithNewCell() override def completeWithClosed(s: ChannelState.Closed): Unit = linkedTo.completeWithClosed(s) override def tryOwn(): Boolean = linkedTo.tryOwn() - // for cleanup + // for Source/Sink cell cleanup override def equals(obj: Any): Boolean = linkedTo.equals(obj) override def hashCode(): Int = linkedTo.hashCode() } diff --git a/core/src/main/scala/ox/channels/select.scala b/core/src/main/scala/ox/channels/select.scala index bd91455b..eade3dae 100644 --- a/core/src/main/scala/ox/channels/select.scala +++ b/core/src/main/scala/ox/channels/select.scala @@ -59,6 +59,12 @@ private def doSelect[T](clauses: List[SelectClause[T]]): SelectResult[T] | Chann // completed with a value; interrupting self and returning it try t finally Thread.currentThread().interrupt() + case t: MaybeCreateResult[T] @unchecked => + try + t() match + case Some(r) => r + case None => throw e + finally Thread.currentThread().interrupt() def takeFromCellInterruptSafe(c: Cell[T]): SelectResult[T] | ChannelClosed = try @@ -66,7 +72,12 @@ private def doSelect[T](clauses: List[SelectClause[T]]): SelectResult[T] | Chann case c2: Cell[T] @unchecked => offerCellAndTake(c2) // we got a new cell on which we should be waiting, add it to the channels case s: ChannelState.Error => ChannelClosed.Error(s.reason) case ChannelState.Done => doSelect(clauses) - case t: SelectResult[T] @unchecked => t + case t: SelectResult[T] @unchecked => t + case t: MaybeCreateResult[T] @unchecked => + // this might throw exceptions, but this is fine - we're on the thread that called select + t() match + case Some(r) => r + case None => doSelect(clauses) catch case e: InterruptedException => cellTakeInterrupted(c, e) // now that the cell has been filled, it is owned, and should be removed from the waiting lists of the other channels finally cleanupCell(c, alsoWhenSingleClause = false)