Skip to content

Commit

Permalink
Allow to pass an Executor when converting Futures
Browse files Browse the repository at this point in the history
This is to circumvent scala#179.
  • Loading branch information
rdesgroppes committed Nov 18, 2021
1 parent 0478d7d commit 719e4de
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 17 deletions.
48 changes: 45 additions & 3 deletions src/main/scala/scala/compat/java8/FutureConverters.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import scala.language.implicitConversions
import scala.concurrent.java8.FuturesConvertersImpl._
import scala.concurrent.java8.FuturesConvertersImplCompat._
import scala.concurrent.{ Future, Promise, ExecutionContext, ExecutionContextExecutorService, ExecutionContextExecutor }
import java.util.concurrent.{ CompletionStage, Executor, ExecutorService }
import java.util.concurrent.{ CompletionStage, Executor, ExecutorService, ForkJoinPool }
import java.util.function.Consumer

/**
Expand Down Expand Up @@ -59,16 +59,38 @@ object FutureConverters {
* transformations to their asynchronous counterparts, i.e.
* <code>thenRun</code> will internally call <code>thenRunAsync</code>.
*
* Callbacks will run on ForkJoinPool.commonPool(), unless it does not
* support a parallelism level of at least two, in which case a new Thread
* is used.
*
* @param f The Scala Future which may eventually supply the completion for
* the returned CompletionStage
* @return a CompletionStage that runs all callbacks asynchronously and does
* not support the CompletableFuture interface
*/
def toJava[T](f: Future[T]): CompletionStage[T] = {
def toJava[T](f: Future[T]): CompletionStage[T] = toJava(f, ForkJoinPool.commonPool())

/**
* Returns a CompletionStage that will be completed with the same value or
* exception as the given Scala Future when that completes. Since the Future is a read-only
* representation, this CompletionStage does not support the
* <code>toCompletableFuture</code> method. The semantics of Scala Future
* demand that all callbacks are invoked asynchronously by default, therefore
* the returned CompletionStage routes all calls to synchronous
* transformations to their asynchronous counterparts, i.e.
* <code>thenRun</code> will internally call <code>thenRunAsync</code>.
*
* @param f The Scala Future which may eventually supply the completion for
* the returned CompletionStage
* @param e The Java Executor onto which schedule the callbacks
* @return a CompletionStage that runs all callbacks asynchronously and does
* not support the CompletableFuture interface
*/
def toJava[T](f: Future[T], e: Executor): CompletionStage[T] = {
f match {
case p: P[T @unchecked] => p.wrapped
case _ =>
val cf = new CF[T](f)
val cf = new CF[T](f, e)
implicit val ec = InternalCallbackExecutor
f onComplete cf
cf
Expand Down Expand Up @@ -189,10 +211,30 @@ object FutureConverters {
* transformations to their asynchronous counterparts, i.e.
* <code>thenRun</code> will internally call <code>thenRunAsync</code>.
*
* Callbacks will run on ForkJoinPool.commonPool(), unless it does not
* support a parallelism level of at least two, in which case a new Thread
* is used.
*
* @return a CompletionStage that runs all callbacks asynchronously and does
* not support the CompletableFuture interface
*/
def toJava: CompletionStage[T] = FutureConverters.toJava(__self)

/**
* Returns a CompletionStage that will be completed with the same value or
* exception as the given Scala Future when that completes. Since the Future is a read-only
* representation, this CompletionStage does not support the
* <code>toCompletableFuture</code> method. The semantics of Scala Future
* demand that all callbacks are invoked asynchronously by default, therefore
* the returned CompletionStage routes all calls to synchronous
* transformations to their asynchronous counterparts, i.e.
* <code>thenRun</code> will internally call <code>thenRunAsync</code>.
*
* @param e The Java Executor onto which schedule the callbacks
* @return a CompletionStage that runs all callbacks asynchronously and does
* not support the CompletableFuture interface
*/
def toJava(e: Executor): CompletionStage[T] = FutureConverters.toJava(__self, e)
}

implicit def CompletionStageOps[T](cs: CompletionStage[T]): CompletionStageOps[T] = new CompletionStageOps(cs)
Expand Down
30 changes: 16 additions & 14 deletions src/main/scala/scala/concurrent/java8/FutureConvertersImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import scala.util.{Failure, Success, Try}

// TODO: make this private[scala] when genjavadoc allows for that.
object FuturesConvertersImpl {
class CF[T](val wrapped: Future[T]) extends CompletableFuture[T] with (Try[T] => Unit) {
class CF[T](val wrapped: Future[T], executor: Executor) extends CompletableFuture[T] with (Try[T] => Unit) {
def this(wrapped: Future[T]) = this(wrapped, ForkJoinPool.commonPool())

override def apply(t: Try[T]): Unit = t match {
case Success(v) => complete(v)
case Failure(e) => completeExceptionally(e)
Expand All @@ -32,29 +34,29 @@ object FuturesConvertersImpl {
/*
* Ensure that completions of this future cannot hold the Scala Future’s completer hostage.
*/
override def thenApply[U](fn: JF[_ >: T, _ <: U]): CompletableFuture[U] = thenApplyAsync(fn)
override def thenApply[U](fn: JF[_ >: T, _ <: U]): CompletableFuture[U] = thenApplyAsync(fn, executor)

override def thenAccept(fn: Consumer[_ >: T]): CompletableFuture[Void] = thenAcceptAsync(fn)
override def thenAccept(fn: Consumer[_ >: T]): CompletableFuture[Void] = thenAcceptAsync(fn, executor)

override def thenRun(fn: Runnable): CompletableFuture[Void] = thenRunAsync(fn)
override def thenRun(fn: Runnable): CompletableFuture[Void] = thenRunAsync(fn, executor)

override def thenCombine[U, V](cs: CompletionStage[_ <: U], fn: BiFunction[_ >: T, _ >: U, _ <: V]): CompletableFuture[V] = thenCombineAsync(cs, fn)
override def thenCombine[U, V](cs: CompletionStage[_ <: U], fn: BiFunction[_ >: T, _ >: U, _ <: V]): CompletableFuture[V] = thenCombineAsync(cs, fn, executor)

override def thenAcceptBoth[U](cs: CompletionStage[_ <: U], fn: BiConsumer[_ >: T, _ >: U]): CompletableFuture[Void] = thenAcceptBothAsync(cs, fn)
override def thenAcceptBoth[U](cs: CompletionStage[_ <: U], fn: BiConsumer[_ >: T, _ >: U]): CompletableFuture[Void] = thenAcceptBothAsync(cs, fn, executor)

override def runAfterBoth(cs: CompletionStage[_], fn: Runnable): CompletableFuture[Void] = runAfterBothAsync(cs, fn)
override def runAfterBoth(cs: CompletionStage[_], fn: Runnable): CompletableFuture[Void] = runAfterBothAsync(cs, fn, executor)

override def applyToEither[U](cs: CompletionStage[_ <: T], fn: JF[_ >: T, U]): CompletableFuture[U] = applyToEitherAsync(cs, fn)
override def applyToEither[U](cs: CompletionStage[_ <: T], fn: JF[_ >: T, U]): CompletableFuture[U] = applyToEitherAsync(cs, fn, executor)

override def acceptEither(cs: CompletionStage[_ <: T], fn: Consumer[_ >: T]): CompletableFuture[Void] = acceptEitherAsync(cs, fn)
override def acceptEither(cs: CompletionStage[_ <: T], fn: Consumer[_ >: T]): CompletableFuture[Void] = acceptEitherAsync(cs, fn, executor)

override def runAfterEither(cs: CompletionStage[_], fn: Runnable): CompletableFuture[Void] = runAfterEitherAsync(cs, fn)
override def runAfterEither(cs: CompletionStage[_], fn: Runnable): CompletableFuture[Void] = runAfterEitherAsync(cs, fn, executor)

override def thenCompose[U](fn: JF[_ >: T, _ <: CompletionStage[U]]): CompletableFuture[U] = thenComposeAsync(fn)
override def thenCompose[U](fn: JF[_ >: T, _ <: CompletionStage[U]]): CompletableFuture[U] = thenComposeAsync(fn, executor)

override def whenComplete(fn: BiConsumer[_ >: T, _ >: Throwable]): CompletableFuture[T] = whenCompleteAsync(fn)
override def whenComplete(fn: BiConsumer[_ >: T, _ >: Throwable]): CompletableFuture[T] = whenCompleteAsync(fn, executor)

override def handle[U](fn: BiFunction[_ >: T, Throwable, _ <: U]): CompletableFuture[U] = handleAsync(fn)
override def handle[U](fn: BiFunction[_ >: T, Throwable, _ <: U]): CompletableFuture[U] = handleAsync(fn, executor)

override def exceptionally(fn: JF[Throwable, _ <: T]): CompletableFuture[T] = {
val cf = new CompletableFuture[T]
Expand All @@ -71,7 +73,7 @@ object FuturesConvertersImpl {
if (n ne this) cf.complete(n.asInstanceOf[T])
}
}
})
}, executor)
cf
}

Expand Down

0 comments on commit 719e4de

Please sign in to comment.