Skip to content

Commit

Permalink
Have explicit boundary/suspend capture sets
Browse files Browse the repository at this point in the history
  • Loading branch information
natsukagami committed Aug 26, 2024
1 parent 80d09d3 commit e2b91bf
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 18 deletions.
21 changes: 12 additions & 9 deletions jvm/src/main/scala/async/VThreadSupport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ object VThreadScheduler extends Scheduler:
object VThreadSupport extends AsyncSupport:
type Scheduler = VThreadScheduler.type

private final class VThreadLabel[R]():
private final class VThreadLabel[R]() extends caps.Capability:
private var result: Option[R] = None
private val lock = ReentrantLock()
private val cond = lock.newCondition()
Expand All @@ -74,11 +74,11 @@ object VThreadSupport extends AsyncSupport:
result.get
finally lock.unlock()

override opaque type Label[R] = VThreadLabel[R]
override opaque type Label[R, Cap^] <: caps.Capability = VThreadLabel[R]

// outside boundary: waiting on label
// inside boundary: waiting on suspension
private final class VThreadSuspension[-T, +R](using private[VThreadSupport] val l: Label[R] @uncheckedVariance)
private final class VThreadSuspension[-T, +R](using private[VThreadSupport] val l: VThreadLabel[R] @uncheckedVariance)
extends gears.async.Suspension[T, R]:
private var nextInput: Option[T] = None
private val lock = ReentrantLock()
Expand Down Expand Up @@ -107,25 +107,28 @@ object VThreadSupport extends AsyncSupport:

override opaque type Suspension[-T, +R] <: gears.async.Suspension[T, R] = VThreadSuspension[T, R]

override def boundary[R](body: (Label[R]) ?=> R): R =
override def boundary[R, Cap^](body: Label[R, Cap]^ ?->{Cap^} R): R =
val label = VThreadLabel[R]()
VThreadScheduler.execute: () =>
val result = body(using label)
label.setResult(result)

label.waitResult()

override private[async] def resumeAsync[T, R](suspension: Suspension[T, R])(arg: T)(using Scheduler): Unit =
override private[async] def resumeAsync[T, R](suspension: Suspension[T, R]^)(arg: T)(using Scheduler): Unit =
suspension.l.clearResult()
suspension.setInput(arg)

override def scheduleBoundary(body: (Label[Unit]) ?=> Unit)(using Scheduler): Unit =
override def scheduleBoundary[Cap^](body: Label[Unit, Cap] ?->{Cap^} Unit)(using Scheduler): Unit =
VThreadScheduler.execute: () =>
val label = VThreadLabel[Unit]()
body(using label)

override def suspend[T, R](body: Suspension[T, R] => R)(using l: Label[R]): T =
val sus = new VThreadSuspension[T, R]()
override def suspend[T, R, Cap^](body: Suspension[T, R]^{Cap^} => R^{Cap^})(using l: Label[R, Cap]^): T =
val sus = new VThreadSuspension[T, R](using caps.unsafe.unsafeAssumePure(l))
val res = body(sus)
l.setResult(res)
l.setResult(
// SAFETY: will only be stored and returned by the Suspension resumption mechanism
caps.unsafe.unsafeAssumePure(res)
)
sus.waitInput()
10 changes: 5 additions & 5 deletions shared/src/main/scala/async/AsyncSupport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,27 @@ trait Suspension[-T, +R]:
/** Support for suspension capabilities through a delimited continuation interface. */
trait SuspendSupport:
/** A marker for the "limit" of "delimited continuation". */
type Label[R]
type Label[R, Cap^] <: caps.Capability

/** The provided suspension type. */
type Suspension[-T, +R] <: gears.async.Suspension[T, R]

/** Set the suspension marker as the body's caller, and execute `body`. */
def boundary[R](body: Label[R] ?=> R): R
def boundary[R, Cap^](body: Label[R, Cap] ?->{Cap^} R): R^{Cap^}

/** Should return immediately if resume is called from within body */
def suspend[T, R](body: Suspension[T, R] => R)(using Label[R]): T
def suspend[T, R, Cap^](body: Suspension[T, R]^{Cap^} => R^{Cap^})(using Label[R, Cap]): T

/** Extends [[SuspendSupport]] with "asynchronous" boundary/resume functions, in the presence of a [[Scheduler]] */
trait AsyncSupport extends SuspendSupport:
type Scheduler <: gears.async.Scheduler

/** Resume a [[Suspension]] at some point in the future, scheduled by the scheduler. */
private[async] def resumeAsync[T, R](suspension: Suspension[T, R])(arg: T)(using s: Scheduler): Unit =
private[async] def resumeAsync[T, R](suspension: Suspension[T, R]^)(arg: T)(using s: Scheduler): Unit =
s.execute(() => suspension.resume(arg))

/** Schedule a computation with the suspension boundary already created. */
private[async] def scheduleBoundary(body: Label[Unit] ?=> Unit)(using s: Scheduler): Unit =
private[async] def scheduleBoundary[Cap^](body: Label[Unit, Cap] ?->{Cap^} Unit)(using s: Scheduler): Unit =
s.execute(() => boundary(body))

/** A scheduler implementation, with the ability to execute a computation immediately or after a delay. */
Expand Down
8 changes: 4 additions & 4 deletions shared/src/main/scala/async/futures.scala
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,14 @@ object Future:
private def checkCancellation(): Unit =
if cancelRequest.get() then throw new CancellationException()

private class FutureAsync(val group: CompletionGroup)(using label: acSupport.Label[Unit])
private class FutureAsync[Cap^](val group: CompletionGroup)(using label: acSupport.Label[Unit, Cap])
extends Async(using acSupport, acScheduler):
/** Await a source first by polling it, and, if that fails, by suspending in a onComplete call.
*/
override def await[U](src: Async.Source[U]^): U =
class CancelSuspension extends Cancellable:
var suspension: acSupport.Suspension[Try[U], Unit] = uninitialized
var listener: Listener[U]^{this} = uninitialized
var suspension: acSupport.Suspension[Try[U], Unit]^{Cap^} = uninitialized
var listener: Listener[U]^{this, Cap^} = uninitialized
var completed = false

def complete() = synchronized:
Expand All @@ -150,7 +150,7 @@ object Future:
.poll()
.getOrElse:
val cancellable = CancelSuspension()
val res = acSupport.suspend[Try[U], Unit](k =>
val res = acSupport.suspend[Try[U], Unit, Cap](k =>
val listener = Listener.acceptingListener[U]: (x, _) =>
val completedBefore = cancellable.complete()
if !completedBefore then acSupport.resumeAsync(k)(Success(x))
Expand Down

0 comments on commit e2b91bf

Please sign in to comment.