Skip to content

Commit

Permalink
[util/finagle-core][fiber-scheduler] don't reject tasks redirected fr…
Browse files Browse the repository at this point in the history
…om `FuturePool`s

**Problem**

The fiber scheduler can reject tasks that are redirected from `FuturePool`s. The idea behind that was to avoid having to support unbounded queuing but that's making the adoption of the new scheduler more difficult.

**Solution**

Introduce a new `tryFork` method that has a behavior similar to the previous `fork` implementation and allows the caller to detect if the task got rejected without having to handle an exception. The `fork` method is changed to never reject requests even if the scheduler is overloaded. Typically, `tryFork` is called from Finagle's default filter chain, which can reject tasks, and `fork` is called from redirected `FuturePool`s. Users can also call those methods directly but we don't have cases of that yet.

**Notes**

- With this change, it's safer to redirect `FuturePool`s so I changed the default fiber scheduler configuration to set `redirectFuturePools` to `true`
- Ideally, we should revisit this in the future so unbounded queuing doesn't need to be supported but that will require a larger effort to adapt applications

Differential Revision: https://phabricator.twitter.biz/D760928
  • Loading branch information
Flavio Brasil authored and jenkins committed Oct 22, 2021
1 parent 21ae472 commit b18c1c4
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import java.util.concurrent.ExecutorService
trait ForkingScheduler extends Scheduler {

/**
* Forks the execution of a `Future` computation.
* Forks the execution of a `Future` computation even if
* the scheduler is overloaded.
*
* @param f the Future computation to be forked
* @tparam T the type of the `Future` computation
Expand All @@ -20,6 +21,18 @@ trait ForkingScheduler extends Scheduler {
*/
def fork[T](f: => Future[T]): Future[T]

/**
* Tries to fork the execution of a `Future` computation and
* returns empty in case the scheduler is overloaded.
*
* @param f the Future computation to be forked
* @tparam T the type of the `Future` computation
* @return A `Future` with `None` if the scheduler is overloaded
* or `Some[T]` if the task is successfully forked and
* executed.
*/
def tryFork[T](f: => Future[T]): Future[Option[T]]

/**
* Forks the execution of a `Future` computation using the
* provided executor.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ class FuturePoolTest extends AnyFunSuite with Eventually {
forked = Some(r)
r
}
override def tryFork[T](f: => Future[T]): Future[Option[T]] = ???
override def redirectFuturePools(): Boolean = redirect
override def blocking[T](f: => T)(implicit perm: Awaitable.CanAwait): T = f
override def flush(): Unit = {}
Expand Down

0 comments on commit b18c1c4

Please sign in to comment.