Skip to content

Commit

Permalink
Merge pull request #23 from natsukagami/explicit-channels-prelim
Browse files Browse the repository at this point in the history
Channels implementation with new listeners
  • Loading branch information
natsukagami authored Dec 10, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
2 parents 1137ee0 + b96497d commit 86159db
Showing 9 changed files with 675 additions and 375 deletions.
78 changes: 69 additions & 9 deletions shared/src/main/scala/async/Async.scala
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.atomic.AtomicLong
import gears.async.Listener.{withLock, ListenerLockWrapper}
import gears.async.Listener.NumberedLock
import scala.util.boundary

/** A context that allows to suspend waiting for asynchronous data sources
*/
@@ -112,6 +113,9 @@ object Async:
poll(Listener.acceptingListener { (x, _) => resultOpt = Some(x) })
resultOpt

/** Utility method for direct waiting with `Async`. */
def await(using Async) = Async.await(this)

end Source

/** An original source has a standard definition of `onComplete` in terms of `poll` and `addListener`. Implementations
@@ -127,6 +131,31 @@ object Async:

end OriginalSource

object Source:
/** Create a [[Source]] containing the given values, resolved once for each. */
def values[T](values: T*) =
import scala.collection.JavaConverters._
val q = java.util.concurrent.ConcurrentLinkedQueue[T]()
q.addAll(values.asJavaCollection)
new Source[T]:
override def poll(k: Listener[T]): Boolean =
if q.isEmpty() then false
else
k.lockCompletely(this) match
case Listener.Gone => true
case Listener.Locked =>
val item = q.poll()
if item == null then
k.releaseLock(Listener.Locked)
false
else
k.complete(item, this)
true

override def onComplete(k: Listener[T]): Unit = poll(k)
override def dropListener(k: Listener[T]): Unit = ()
end values

extension [T](src: Source[T])
/** Pass on data transformed by `f` */
def map[U](f: T => U) =
@@ -145,25 +174,35 @@ object Async:
def dropListener(k: Listener[U]): Unit =
src.dropListener(transform(k))

def race[T](sources: Source[T]*): Source[T] = raceImpl[T, T]((v, _) => v)(sources*)
def raceWithOrigin[T](sources: Source[T]*): Source[(T, Source[T])] =
raceImpl[(T, Source[T]), T]((v, src) => (v, src))(sources*)

/** Pass first result from any of `sources` to the continuation */
def race[T](sources: Source[T]*): Source[T] =
private def raceImpl[T, U](map: (U, Source[U]) => T)(sources: Source[U]*): Source[T] =
new Source[T] { selfSrc =>
def poll(k: Listener[T]): Boolean =
val it = sources.iterator
var found = false

while it.hasNext && !found do found = it.next.poll(k)
val listener = new Listener.ForwardingListener[U](this, k):
val lock = withLock(k) { inner => new ListenerLockWrapper(inner, selfSrc) }
def complete(data: U, source: Async.Source[U]) =
k.complete(map(data, source), selfSrc)
end listener

while it.hasNext && !found do found = it.next.poll(listener)
found

def onComplete(k: Listener[T]): Unit =
val listener = new Listener.ForwardingListener[T](this, k)
val listener = new Listener.ForwardingListener[U](this, k)
with NumberedLock
with Listener.ListenerLock
with Listener.PartialLock { self =>
val lock = self

var found = false
inline def heldLock = if k.lock == null then Listener.Locked else this
def heldLock = if k.lock == null then Listener.Locked else this

/* == PartialLock implementation == */
// Note that this is bogus if k.lock is null, but we'll never use it if it is.
@@ -190,26 +229,47 @@ object Async:
self.releaseLock()
if until == heldLock then null else k.lock

def complete(item: T, src: Async.Source[T]) =
def complete(item: U, src: Async.Source[U]) =
found = true
self.releaseLock()
sources.foreach(s => if s != src then s.dropListener(self))
k.complete(item, selfSrc)
k.complete(map(item, src), selfSrc)
} // end listener

sources.foreach(_.onComplete(listener))

def dropListener(k: Listener[T]): Unit =
val listener = Listener.ForwardingListener.empty(this, k)
val listener = Listener.ForwardingListener.empty[U](this, k)
sources.foreach(_.dropListener(listener))

}
end race
end raceImpl

/** Cases for handling async sources in a [[select]]. [[SelectCase]] can be constructed by extension methods `handle`
* of [[Source]].
*/
opaque type SelectCase[T] = (Source[?], Nothing => T)
// ^ unsafe types, but we only construct SelectCase from `handle` which is safe

extension [T](src: Source[T])
/** Attach a handler to [[src]], creating a [[SelectCase]]. */
inline def handle[U](f: T => U): SelectCase[U] = (src, f)

/** Alias for [[handle]] */
inline def ~~>[U](f: T => U): SelectCase[U] = src.handle(f)

/** Race a list of sources with the corresponding handler functions, once an item has come back. Like [[race]],
* [[select]] guarantees exactly one of the sources are polled. Unlike `map`ping a [[Source]], the handler in
* [[select]] is run in the same async context as the calling context of [[select]].
*/
def select[T](cases: SelectCase[T]*)(using Async) =
val (input, which) = raceWithOrigin(cases.map(_._1)*).await
val (_, handler) = cases.find(_._1 == which).get
handler.asInstanceOf[input.type => T](input)

/** If left (respectively, right) source succeeds with `x`, pass `Left(x)`, (respectively, Right(x)) on to the
* continuation.
*/
def either[T1, T2](src1: Source[T1], src2: Source[T2]): Source[Either[T1, T2]] =
race(src1.map(Left(_)), src2.map(Right(_)))

end Async
4 changes: 2 additions & 2 deletions shared/src/main/scala/async/Listener.scala
Original file line number Diff line number Diff line change
@@ -64,11 +64,11 @@ object Listener:
* [[Async.Source.dropListener]] these listeners are compared for equality by the hash of the source and the inner
* listener.
*/
abstract case class ForwardingListener[T](src: Async.Source[?], inner: Listener[T]) extends Listener[T]
abstract case class ForwardingListener[T](src: Async.Source[?], inner: Listener[?]) extends Listener[T]

object ForwardingListener:
/** Create an empty [[ForwardingListener]] for equality comparison. */
def empty[T](src: Async.Source[?], inner: Listener[T]) = new ForwardingListener(src, inner):
def empty[T](src: Async.Source[?], inner: Listener[?]) = new ForwardingListener[T](src, inner):
val lock = null
override def complete(data: T, source: Async.Source[T]) = ???

2 changes: 1 addition & 1 deletion shared/src/main/scala/async/Timer.scala
Original file line number Diff line number Diff line change
@@ -49,7 +49,7 @@ class Timer(tickDuration: Duration) extends Cancellable {
inline final def src: Async.Source[this.TimerEvent] = Source

/** Starts the timer. Suspends until the timer is cancelled. */
def start()(using Async, AsyncOperations): Unit =
def run()(using Async, AsyncOperations): Unit =
cancellationScope(this):
loop()

Loading

0 comments on commit 86159db

Please sign in to comment.