Skip to content

Commit

Permalink
Merge pull request #21 from permutive-engineering/new-value-selector
Browse files Browse the repository at this point in the history
  • Loading branch information
janstenpickle authored Dec 7, 2022
2 parents adcb464 + 0f69f3d commit 2c7d684
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 21 deletions.
90 changes: 75 additions & 15 deletions core/src/main/scala/com/permutive/refreshable/Refreshable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package com.permutive.refreshable
import cats.effect._
import cats.effect.syntax.all._
import cats.syntax.all._
import cats.{~>, Applicative, Functor}
import cats.{Applicative, FlatMap, Functor, ~>}
import fs2.Stream
import fs2.concurrent.SignallingRef
import retry._
Expand Down Expand Up @@ -131,6 +131,15 @@ object Refreshable {
* a callback invoked whenever a new value is generated, the
* [[scala.concurrent.duration.FiniteDuration]] is the period that will be
* waited before the next new value
* @param combineFunction
* a function which takes the old value and new value, returning some
* effectful value. This can be used to perform actions like accumulation
* of the underlying values or discarding new values that don't match some
* predicate. A failure in the returned `F[A]` will result in the value not
* being updated, now will it trigger the `refreshFailureCallback` as a
* failure in this function is nothing to do with the internals of
* `Refreshable` and should be handled properly by the caller, however the
* value in the store will be set to a [[CachedValue.Error]]
* @param defaultValue
* an optional default value to use when initialising the resource, if the
* call to `fa` fails. This will prevent the constructor from failing
Expand All @@ -148,9 +157,31 @@ object Refreshable {
]],
val exhaustedRetriesCallback: PartialFunction[Throwable, F[Unit]],
val newValueCallback: Option[(A, FiniteDuration) => F[Unit]],
val combineFunction: Option[(CachedValue[A], CachedValue[A]) => F[A]],
val defaultValue: Option[A]
) { self =>

private[refreshable] def this(
refresh: F[A],
cacheDuration: A => FiniteDuration,
retryPolicy: A => RetryPolicy[F],
refreshFailureCallback: PartialFunction[(Throwable, RetryDetails), F[
Unit
]],
exhaustedRetriesCallback: PartialFunction[Throwable, F[Unit]],
newValueCallback: Option[(A, FiniteDuration) => F[Unit]],
defaultValue: Option[A]
) = this(
refresh,
cacheDuration,
retryPolicy,
refreshFailureCallback,
exhaustedRetriesCallback,
newValueCallback,
None,
defaultValue
)

private def copy(
refresh: F[A] = self.refresh,
cacheDuration: A => FiniteDuration = self.cacheDuration,
Expand All @@ -162,6 +193,8 @@ object Refreshable {
self.exhaustedRetriesCallback,
newValueCallback: Option[(A, FiniteDuration) => F[Unit]] =
self.newValueCallback,
combineFunction: Option[(CachedValue[A], CachedValue[A]) => F[A]] =
self.combineFunction,
defaultValue: Option[A] = self.defaultValue
): RefreshableBuilder[F, A] = new RefreshableBuilder[F, A](
refresh,
Expand All @@ -170,6 +203,7 @@ object Refreshable {
refreshFailureCallback,
exhaustedRetriesCallback,
newValueCallback,
combineFunction,
defaultValue
) {}

Expand Down Expand Up @@ -198,6 +232,11 @@ object Refreshable {
callback: (A, FiniteDuration) => F[Unit]
): RefreshableBuilder[F, A] = copy(newValueCallback = Some(callback))

def combine(
combineFunction: (CachedValue[A], CachedValue[A]) => F[A]
): RefreshableBuilder[F, A] =
copy(combineFunction = Some(combineFunction))

def defaultValue(defaultValue: A): RefreshableBuilder[F, A] =
copy(defaultValue = Some(defaultValue))

Expand All @@ -217,14 +256,39 @@ object Refreshable {
} yield RefreshableImpl(store, fiberStore, makeFiber(store))
}

private def storeValue(
store: Ref[F, CachedValue[A]],
oldValue: CachedValue[A],
newValue: CachedValue[A]
): F[Unit] =
combineFunction.fold(store.set(newValue)) { f =>
f(oldValue, newValue)
.flatMap { v =>
val value = newValue match {
case CachedValue.Success(_) => CachedValue.Success(v)
case CachedValue.Error(_, error) => CachedValue.Error(v, error)
case CachedValue.Cancelled(_) => CachedValue.Cancelled(v)
}
store.set(value)
}
// Any error from `f` will result in the value not being updated
// the user should handle errors in the function themselves.
// The `onRefreshFailure` callback will not be called here as any
// error arising from this is the user's responsibility and not
// anything to do with the `Refreshable`'s internals.
.handleErrorWith(th =>
store.set(CachedValue.Error(oldValue.value, th))
)
}

protected def makeFiber(
store: Ref[F, CachedValue[A]]
)(wait: Deferred[F, Unit]) = (wait.get >> store.get
.flatMap(a =>
refreshLoop(
a.value,
a,
refresh,
store.set(_),
storeValue(store, _, _),
cacheDuration,
refreshFailureCallback,
newValueCallback.getOrElse((_, _) => Applicative[F].unit),
Expand All @@ -251,17 +315,17 @@ object Refreshable {
} yield ()).uncancelable

private def refreshLoop(
initialA: A,
initialA: CachedValue[A],
fa: F[A],
set: CachedValue[A] => F[Unit],
set: (CachedValue[A], CachedValue[A]) => F[Unit],
cacheDuration: A => FiniteDuration,
onRefreshFailure: PartialFunction[(Throwable, RetryDetails), F[Unit]],
onNewValue: (A, FiniteDuration) => F[Unit],
retryPolicy: A => RetryPolicy[F]
): F[Unit] = {
def innerLoop(currentA: A): F[Unit] = {
): F[Unit] =
FlatMap[F].tailRecM[A, Unit](initialA.value) { currentA =>
val faError = fa.onError { case th =>
set(CachedValue.Error(currentA, th))
set(initialA, CachedValue.Error(currentA, th))
}

val retryFa =
Expand All @@ -281,14 +345,9 @@ object Refreshable {
// though; for example we'd need to handle the case of failing to acquire a new value ensuring consumers do not
// block on an empty deferred forever.
newA <- retryFa
_ <- set(CachedValue.Success(newA))
_ <- innerLoop(newA)
} yield ()
_ <- set(initialA, CachedValue.Success(newA))
} yield Left(newA)
}

innerLoop(initialA)
}

}

private class RefreshableImpl[F[_]: Concurrent, A] private (
Expand Down Expand Up @@ -341,6 +400,7 @@ object Refreshable {
refreshFailureCallback = PartialFunction.empty,
exhaustedRetriesCallback = PartialFunction.empty,
newValueCallback = None,
combineFunction = None,
defaultValue = None
) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
package com.permutive.refreshable

import cats.arrow.FunctionK
import cats.syntax.all._
import cats.effect._
import cats.effect.testkit.TestControl
import retry._
import cats.syntax.all._
import munit.CatsEffectSuite
import retry._

import scala.concurrent.duration._

Expand Down Expand Up @@ -359,6 +359,30 @@ class RefreshableSuite extends CatsEffectSuite {
TestControl.executeEmbed(run)

}

test(s"${factory.name} - newValueSelector changes resulting value") {

val run =
factory
.resource[Int](
refresh = IO(1),
cacheDuration = _ => 2.seconds,
onRefreshFailure = { case _ =>
IO.unit
},
onExhaustedRetries = { case _ =>
IO.unit
},
combine = Some((oldV: CachedValue[Int], newV: CachedValue[Int]) =>
IO(oldV.value + newV.value)
)
)
.use { refreshable =>
IO.sleep(3.seconds) >> refreshable.value.assertEquals(2)
}

TestControl.executeEmbed(run)
}
}

suite(Default)
Expand Down Expand Up @@ -389,6 +413,7 @@ class RefreshableSuite extends CatsEffectSuite {
onRefreshFailure: PartialFunction[(Throwable, RetryDetails), IO[Unit]],
onExhaustedRetries: PartialFunction[Throwable, IO[Unit]],
onNewValue: Option[(A, FiniteDuration) => IO[Unit]] = None,
combine: Option[(CachedValue[A], CachedValue[A]) => IO[A]] = None,
defaultValue: Option[A] = None,
retryPolicy: Option[RetryPolicy[IO]] = None
): Resource[IO, Refreshable[IO, A]] = {
Expand All @@ -400,9 +425,11 @@ class RefreshableSuite extends CatsEffectSuite {

val b2 = onNewValue.fold(b1)(v => b1.onNewValue(v))

val b3 = defaultValue.fold(b2)(v => b2.defaultValue(v))
val b3 = combine.fold(b2)(v => b2.combine(v))

retryPolicy.fold(b3)(v => b3.retryPolicy(v)).resource
val b4 = defaultValue.fold(b3)(v => b3.defaultValue(v))

retryPolicy.fold(b4)(v => b4.retryPolicy(v)).resource
}
}

Expand All @@ -416,6 +443,7 @@ class RefreshableSuite extends CatsEffectSuite {
onRefreshFailure: PartialFunction[(Throwable, RetryDetails), IO[Unit]],
onExhaustedRetries: PartialFunction[Throwable, IO[Unit]],
onNewValue: Option[(A, FiniteDuration) => IO[Unit]] = None,
combine: Option[(CachedValue[A], CachedValue[A]) => IO[A]] = None,
defaultValue: Option[A] = None,
retryPolicy: Option[RetryPolicy[IO]] = None
): Resource[IO, Refreshable[IO, A]] = {
Expand All @@ -427,9 +455,11 @@ class RefreshableSuite extends CatsEffectSuite {

val b2 = onNewValue.fold(b1)(v => b1.onNewValue(v))

val b3 = defaultValue.fold(b2)(v => b2.defaultValue(v))
val b3 = combine.fold(b2)(v => b2.combine(v))

val b4 = defaultValue.fold(b3)(v => b3.defaultValue(v))

retryPolicy.fold(b3)(v => b3.retryPolicy(v)).resource
retryPolicy.fold(b4)(v => b4.retryPolicy(v)).resource

}
}
Expand All @@ -444,6 +474,7 @@ class RefreshableSuite extends CatsEffectSuite {
onRefreshFailure: PartialFunction[(Throwable, RetryDetails), IO[Unit]],
onExhaustedRetries: PartialFunction[Throwable, IO[Unit]],
onNewValue: Option[(A, FiniteDuration) => IO[Unit]] = None,
combine: Option[(CachedValue[A], CachedValue[A]) => IO[A]] = None,
defaultValue: Option[A] = None,
retryPolicy: Option[RetryPolicy[IO]] = None
): Resource[IO, Refreshable[IO, A]] = Default
Expand All @@ -453,6 +484,7 @@ class RefreshableSuite extends CatsEffectSuite {
onRefreshFailure,
onExhaustedRetries,
onNewValue,
combine,
defaultValue,
retryPolicy
)
Expand All @@ -469,6 +501,7 @@ class RefreshableSuite extends CatsEffectSuite {
onRefreshFailure: PartialFunction[(Throwable, RetryDetails), IO[Unit]],
onExhaustedRetries: PartialFunction[Throwable, IO[Unit]],
onNewValue: Option[(A, FiniteDuration) => IO[Unit]] = None,
combine: Option[(CachedValue[A], CachedValue[A]) => IO[A]] = None,
defaultValue: Option[A] = None,
retryPolicy: Option[RetryPolicy[IO]] = None
): Resource[IO, Refreshable[IO, A]]
Expand Down

0 comments on commit 2c7d684

Please sign in to comment.