Skip to content

Commit

Permalink
Minimize changes
Browse files Browse the repository at this point in the history
  • Loading branch information
natsukagami committed Jul 12, 2024
1 parent 1612af4 commit 5830cb2
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 22 deletions.
10 changes: 6 additions & 4 deletions shared/src/main/scala/async/AsyncOperations.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package gears.async

import language.experimental.captureChecking

import gears.async.AsyncOperations.sleep

import java.util.concurrent.TimeoutException
import scala.concurrent.duration.FiniteDuration

Expand All @@ -19,14 +21,14 @@ object AsyncOperations:
* @param millis
* The duration to suspend, in milliseconds. Must be a positive integer.
*/
def sleep(millis: Long)(using AsyncOperations, Async): Unit =
inline def sleep(millis: Long)(using AsyncOperations, Async): Unit =
summon[AsyncOperations].sleep(millis)

/** Suspends the current [[Async]] context for `duration`.
* @param duration
* The duration to suspend. Must be positive.
*/
def sleep(duration: FiniteDuration)(using AsyncOperations, Async): Unit =
inline def sleep(duration: FiniteDuration)(using AsyncOperations, Async): Unit =
sleep(duration.toMillis)

/** Runs `op` with a timeout. When the timeout occurs, `op` is cancelled through the given [[Async]] context, and
Expand All @@ -36,7 +38,7 @@ def withTimeout[T](timeout: FiniteDuration)(op: Async ?=> T)(using AsyncOperatio
Async.group:
Async.select(
Future(op).handle(_.get),
Future(AsyncOperations.sleep(timeout)).handle: _ =>
Future(sleep(timeout)).handle: _ =>
throw TimeoutException()
)

Expand All @@ -47,5 +49,5 @@ def withTimeoutOption[T](timeout: FiniteDuration)(op: Async ?=> T)(using AsyncOp
Async.group:
Async.select(
Future(op).handle(v => Some(v.get)),
Future(AsyncOperations.sleep(timeout)).handle(_ => None)
Future(sleep(timeout)).handle(_ => None)
)
16 changes: 6 additions & 10 deletions shared/src/main/scala/async/Cancellable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@ package gears.async

import language.experimental.captureChecking

import java.util.concurrent.atomic.AtomicLong

/** A trait for cancellable entities that can be grouped. */
trait Cancellable:
val id = Cancellable.Id()
private var group: CompletionGroup = CompletionGroup.Unlinked

/** Issue a cancel request */
Expand All @@ -15,9 +12,9 @@ trait Cancellable:
/** Add this cancellable to the given group after removing it from the previous group in which it was.
*/
def link(group: CompletionGroup): this.type = synchronized:
this.group.drop(this)
this.group.drop(this.unsafeAssumePure)
this.group = group
this.group.add(this)
this.group.add(this.unsafeAssumePure)
this

/** Link this cancellable to the cancellable group of the current async context.
Expand All @@ -29,14 +26,13 @@ trait Cancellable:
def unlink(): this.type =
link(CompletionGroup.Unlinked)

/** Assume that the [[Cancellable]] is pure, in the case that cancellation does *not* refer to captured resources.
*/
inline def unsafeAssumePure: Cancellable = caps.unsafe.unsafeAssumePure(this)

end Cancellable

object Cancellable:
opaque type Id = Long
private object Id:
private val gen = AtomicLong(0)
def apply(): Id = gen.incrementAndGet()

/** A special [[Cancellable]] object that just tracks whether its linked group was cancelled. */
trait Tracking extends Cancellable:
def isCancelled: Boolean
Expand Down
11 changes: 5 additions & 6 deletions shared/src/main/scala/async/CompletionGroup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@ import scala.collection.mutable
import scala.util.Success

import Future.Promise
import scala.annotation.unchecked.uncheckedCaptures

/** A group of cancellable objects that are completed together. Cancelling the group means cancelling all its
* uncompleted members.
*/
class CompletionGroup extends Cancellable.Tracking:
private val members: mutable.Set[(Cancellable^) @uncheckedCaptures] = mutable.Set[(Cancellable^) @uncheckedCaptures]()
private val members: mutable.Set[Cancellable] = mutable.Set()
private var canceled: Boolean = false
private var cancelWait: Option[Promise[Unit]] = None

Expand All @@ -32,14 +31,14 @@ class CompletionGroup extends Cancellable.Tracking:
unlink()

/** Add given member to the members set. If the group has already been cancelled, cancels that member immediately. */
def add(member: Cancellable^): Unit =
def add(member: Cancellable): Unit =
val alreadyCancelled = synchronized:
members += member // Add this member no matter what since we'll wait for it still
canceled
if alreadyCancelled then member.cancel()

/** Remove given member from the members set if it is an element */
def drop(member: Cancellable^): Unit = synchronized:
def drop(member: Cancellable): Unit = synchronized:
members -= member
if members.isEmpty && cancelWait.isDefined then cancelWait.get.complete(Success(()))

Expand All @@ -53,8 +52,8 @@ object CompletionGroup:
object Unlinked extends CompletionGroup:
override def cancel(): Unit = ()
override def waitCompletion()(using Async): Unit = ()
override def add(member: Cancellable^): Unit = ()
override def drop(member: Cancellable^): Unit = ()
override def add(member: Cancellable): Unit = ()
override def drop(member: Cancellable): Unit = ()
end Unlinked

end CompletionGroup
4 changes: 2 additions & 2 deletions shared/src/main/scala/async/Listener.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ trait Listener[-T]:

object Listener:
/** A simple [[Listener]] that always accepts the item and sends it to the consumer. */
def acceptingListener[T](consumer: (T, SourceSymbol[T]) => Unit): Listener[T]^{consumer} =
/* inline bug */ def acceptingListener[T](consumer: (T, SourceSymbol[T]) => Unit): Listener[T]^{consumer} =
new Listener[T]:
val lock = null
def complete(data: T, source: SourceSymbol[T]) = consumer(data, source)
Expand All @@ -64,7 +64,7 @@ object Listener:
* [[Async.Source.dropListener]] these listeners are compared for equality by the hash of the source and the inner
* listener.
*/
abstract case class ForwardingListener[T](src: Async.Source[?]^, inner: Listener[?]^) extends Listener[T]
abstract case class ForwardingListener[-T](src: Async.Source[?]^, inner: Listener[?]^) extends Listener[T]

object ForwardingListener:
/** Creates an empty [[ForwardingListener]] for equality comparison. */
Expand Down

0 comments on commit 5830cb2

Please sign in to comment.