Skip to content

Commit

Permalink
Add cancelNow
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw committed Jul 24, 2023
1 parent bac5eed commit d624bbb
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 0 deletions.
7 changes: 7 additions & 0 deletions core/src/main/scala/ox/fork.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def fork[T](f: => T)(using Ox): Fork[T] =
case e: ExecutionException => Left(e.getCause)
case e: Throwable => Left(e)
else Left(new InterruptedException("fork was cancelled before it started"))
override def cancelNow(): Unit = forkFuture.cancel(false)

def forkAll[T](fs: Seq[() => T])(using Ox): Fork[Seq[T]] =
val forks = fs.map(f => fork(f()))
Expand All @@ -44,6 +45,7 @@ def forkAll[T](fs: Seq[() => T])(using Ox): Fork[Seq[T]] =
if results.exists(_.isLeft)
then Left(results.collectFirst { case Left(e) => e }.get)
else Right(results.collect { case Right(t) => t })
override def cancelNow(): Unit = forks.foreach(_.cancelNow())

/** A running fork, started using [[fork]] or [[fork]], backend by a thread. */
trait Fork[T]:
Expand All @@ -55,3 +57,8 @@ trait Fork[T]:

/** Interrupts the fork, and blocks until it completes with a result. */
def cancel(): Either[Throwable, T]

/** Interrupts the fork, and returns immediately, without waiting for the fork complete. Note that the enclosing scope will only complete
* once all forks have completed.
*/
def cancelNow(): Unit
21 changes: 21 additions & 0 deletions core/src/test/scala/ox/ForkTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -162,4 +162,25 @@ class ForkTest extends AnyFlatSpec with Matchers {
else trail.trail shouldBe Vector("started", "interrupted", "interrupted done", "cancel done")
}
}

"cancelNow" should "return immediately, and wait for forks when scope completes" in {
val trail = Trail()
scoped {
val f = fork {
try
Thread.sleep(500L)
trail.add("main done")
catch
case _: InterruptedException =>
Thread.sleep(500L)
trail.add("interrupted done")
}

Thread.sleep(100L) // making sure the fork starts
f.cancelNow()
trail.add("cancel done")
trail.trail shouldBe Vector("cancel done")
}
trail.trail shouldBe Vector("cancel done", "interrupted done")
}
}

0 comments on commit d624bbb

Please sign in to comment.