diff --git a/core/src/main/scala/com/permutive/refreshable/Refreshable.scala b/core/src/main/scala/com/permutive/refreshable/Refreshable.scala index 17867bd..ca64090 100644 --- a/core/src/main/scala/com/permutive/refreshable/Refreshable.scala +++ b/core/src/main/scala/com/permutive/refreshable/Refreshable.scala @@ -19,7 +19,7 @@ package com.permutive.refreshable import cats.effect._ import cats.effect.syntax.all._ import cats.syntax.all._ -import cats.{~>, Applicative, Functor} +import cats.{Applicative, FlatMap, Functor, ~>} import fs2.Stream import fs2.concurrent.SignallingRef import retry._ @@ -131,6 +131,15 @@ object Refreshable { * a callback invoked whenever a new value is generated, the * [[scala.concurrent.duration.FiniteDuration]] is the period that will be * waited before the next new value + * @param combineFunction + * a function which takes the old value and new value, returning some + * effectful value. This can be used to perform actions like accumulation + * of the underlying values or discarding new values that don't match some + * predicate. A failure in the returned `F[A]` will result in the value not + * being updated, now will it trigger the `refreshFailureCallback` as a + * failure in this function is nothing to do with the internals of + * `Refreshable` and should be handled properly by the caller, however the + * value in the store will be set to a [[CachedValue.Error]] * @param defaultValue * an optional default value to use when initialising the resource, if the * call to `fa` fails. This will prevent the constructor from failing @@ -148,9 +157,31 @@ object Refreshable { ]], val exhaustedRetriesCallback: PartialFunction[Throwable, F[Unit]], val newValueCallback: Option[(A, FiniteDuration) => F[Unit]], + val combineFunction: Option[(CachedValue[A], CachedValue[A]) => F[A]], val defaultValue: Option[A] ) { self => + private[refreshable] def this( + refresh: F[A], + cacheDuration: A => FiniteDuration, + retryPolicy: A => RetryPolicy[F], + refreshFailureCallback: PartialFunction[(Throwable, RetryDetails), F[ + Unit + ]], + exhaustedRetriesCallback: PartialFunction[Throwable, F[Unit]], + newValueCallback: Option[(A, FiniteDuration) => F[Unit]], + defaultValue: Option[A] + ) = this( + refresh, + cacheDuration, + retryPolicy, + refreshFailureCallback, + exhaustedRetriesCallback, + newValueCallback, + None, + defaultValue + ) + private def copy( refresh: F[A] = self.refresh, cacheDuration: A => FiniteDuration = self.cacheDuration, @@ -162,6 +193,8 @@ object Refreshable { self.exhaustedRetriesCallback, newValueCallback: Option[(A, FiniteDuration) => F[Unit]] = self.newValueCallback, + combineFunction: Option[(CachedValue[A], CachedValue[A]) => F[A]] = + self.combineFunction, defaultValue: Option[A] = self.defaultValue ): RefreshableBuilder[F, A] = new RefreshableBuilder[F, A]( refresh, @@ -170,6 +203,7 @@ object Refreshable { refreshFailureCallback, exhaustedRetriesCallback, newValueCallback, + combineFunction, defaultValue ) {} @@ -198,6 +232,11 @@ object Refreshable { callback: (A, FiniteDuration) => F[Unit] ): RefreshableBuilder[F, A] = copy(newValueCallback = Some(callback)) + def combine( + combineFunction: (CachedValue[A], CachedValue[A]) => F[A] + ): RefreshableBuilder[F, A] = + copy(combineFunction = Some(combineFunction)) + def defaultValue(defaultValue: A): RefreshableBuilder[F, A] = copy(defaultValue = Some(defaultValue)) @@ -217,14 +256,39 @@ object Refreshable { } yield RefreshableImpl(store, fiberStore, makeFiber(store)) } + private def storeValue( + store: Ref[F, CachedValue[A]], + oldValue: CachedValue[A], + newValue: CachedValue[A] + ): F[Unit] = + combineFunction.fold(store.set(newValue)) { f => + f(oldValue, newValue) + .flatMap { v => + val value = newValue match { + case CachedValue.Success(_) => CachedValue.Success(v) + case CachedValue.Error(_, error) => CachedValue.Error(v, error) + case CachedValue.Cancelled(_) => CachedValue.Cancelled(v) + } + store.set(value) + } + // Any error from `f` will result in the value not being updated + // the user should handle errors in the function themselves. + // The `onRefreshFailure` callback will not be called here as any + // error arising from this is the user's responsibility and not + // anything to do with the `Refreshable`'s internals. + .handleErrorWith(th => + store.set(CachedValue.Error(oldValue.value, th)) + ) + } + protected def makeFiber( store: Ref[F, CachedValue[A]] )(wait: Deferred[F, Unit]) = (wait.get >> store.get .flatMap(a => refreshLoop( - a.value, + a, refresh, - store.set(_), + storeValue(store, _, _), cacheDuration, refreshFailureCallback, newValueCallback.getOrElse((_, _) => Applicative[F].unit), @@ -251,17 +315,17 @@ object Refreshable { } yield ()).uncancelable private def refreshLoop( - initialA: A, + initialA: CachedValue[A], fa: F[A], - set: CachedValue[A] => F[Unit], + set: (CachedValue[A], CachedValue[A]) => F[Unit], cacheDuration: A => FiniteDuration, onRefreshFailure: PartialFunction[(Throwable, RetryDetails), F[Unit]], onNewValue: (A, FiniteDuration) => F[Unit], retryPolicy: A => RetryPolicy[F] - ): F[Unit] = { - def innerLoop(currentA: A): F[Unit] = { + ): F[Unit] = + FlatMap[F].tailRecM[A, Unit](initialA.value) { currentA => val faError = fa.onError { case th => - set(CachedValue.Error(currentA, th)) + set(initialA, CachedValue.Error(currentA, th)) } val retryFa = @@ -281,14 +345,9 @@ object Refreshable { // though; for example we'd need to handle the case of failing to acquire a new value ensuring consumers do not // block on an empty deferred forever. newA <- retryFa - _ <- set(CachedValue.Success(newA)) - _ <- innerLoop(newA) - } yield () + _ <- set(initialA, CachedValue.Success(newA)) + } yield Left(newA) } - - innerLoop(initialA) - } - } private class RefreshableImpl[F[_]: Concurrent, A] private ( @@ -341,6 +400,7 @@ object Refreshable { refreshFailureCallback = PartialFunction.empty, exhaustedRetriesCallback = PartialFunction.empty, newValueCallback = None, + combineFunction = None, defaultValue = None ) {} diff --git a/core/src/test/scala/com/permutive/refreshable/RefreshableSuite.scala b/core/src/test/scala/com/permutive/refreshable/RefreshableSuite.scala index e561076..f112c58 100644 --- a/core/src/test/scala/com/permutive/refreshable/RefreshableSuite.scala +++ b/core/src/test/scala/com/permutive/refreshable/RefreshableSuite.scala @@ -17,11 +17,11 @@ package com.permutive.refreshable import cats.arrow.FunctionK -import cats.syntax.all._ import cats.effect._ import cats.effect.testkit.TestControl -import retry._ +import cats.syntax.all._ import munit.CatsEffectSuite +import retry._ import scala.concurrent.duration._ @@ -359,6 +359,30 @@ class RefreshableSuite extends CatsEffectSuite { TestControl.executeEmbed(run) } + + test(s"${factory.name} - newValueSelector changes resulting value") { + + val run = + factory + .resource[Int]( + refresh = IO(1), + cacheDuration = _ => 2.seconds, + onRefreshFailure = { case _ => + IO.unit + }, + onExhaustedRetries = { case _ => + IO.unit + }, + combine = Some((oldV: CachedValue[Int], newV: CachedValue[Int]) => + IO(oldV.value + newV.value) + ) + ) + .use { refreshable => + IO.sleep(3.seconds) >> refreshable.value.assertEquals(2) + } + + TestControl.executeEmbed(run) + } } suite(Default) @@ -389,6 +413,7 @@ class RefreshableSuite extends CatsEffectSuite { onRefreshFailure: PartialFunction[(Throwable, RetryDetails), IO[Unit]], onExhaustedRetries: PartialFunction[Throwable, IO[Unit]], onNewValue: Option[(A, FiniteDuration) => IO[Unit]] = None, + combine: Option[(CachedValue[A], CachedValue[A]) => IO[A]] = None, defaultValue: Option[A] = None, retryPolicy: Option[RetryPolicy[IO]] = None ): Resource[IO, Refreshable[IO, A]] = { @@ -400,9 +425,11 @@ class RefreshableSuite extends CatsEffectSuite { val b2 = onNewValue.fold(b1)(v => b1.onNewValue(v)) - val b3 = defaultValue.fold(b2)(v => b2.defaultValue(v)) + val b3 = combine.fold(b2)(v => b2.combine(v)) - retryPolicy.fold(b3)(v => b3.retryPolicy(v)).resource + val b4 = defaultValue.fold(b3)(v => b3.defaultValue(v)) + + retryPolicy.fold(b4)(v => b4.retryPolicy(v)).resource } } @@ -416,6 +443,7 @@ class RefreshableSuite extends CatsEffectSuite { onRefreshFailure: PartialFunction[(Throwable, RetryDetails), IO[Unit]], onExhaustedRetries: PartialFunction[Throwable, IO[Unit]], onNewValue: Option[(A, FiniteDuration) => IO[Unit]] = None, + combine: Option[(CachedValue[A], CachedValue[A]) => IO[A]] = None, defaultValue: Option[A] = None, retryPolicy: Option[RetryPolicy[IO]] = None ): Resource[IO, Refreshable[IO, A]] = { @@ -427,9 +455,11 @@ class RefreshableSuite extends CatsEffectSuite { val b2 = onNewValue.fold(b1)(v => b1.onNewValue(v)) - val b3 = defaultValue.fold(b2)(v => b2.defaultValue(v)) + val b3 = combine.fold(b2)(v => b2.combine(v)) + + val b4 = defaultValue.fold(b3)(v => b3.defaultValue(v)) - retryPolicy.fold(b3)(v => b3.retryPolicy(v)).resource + retryPolicy.fold(b4)(v => b4.retryPolicy(v)).resource } } @@ -444,6 +474,7 @@ class RefreshableSuite extends CatsEffectSuite { onRefreshFailure: PartialFunction[(Throwable, RetryDetails), IO[Unit]], onExhaustedRetries: PartialFunction[Throwable, IO[Unit]], onNewValue: Option[(A, FiniteDuration) => IO[Unit]] = None, + combine: Option[(CachedValue[A], CachedValue[A]) => IO[A]] = None, defaultValue: Option[A] = None, retryPolicy: Option[RetryPolicy[IO]] = None ): Resource[IO, Refreshable[IO, A]] = Default @@ -453,6 +484,7 @@ class RefreshableSuite extends CatsEffectSuite { onRefreshFailure, onExhaustedRetries, onNewValue, + combine, defaultValue, retryPolicy ) @@ -469,6 +501,7 @@ class RefreshableSuite extends CatsEffectSuite { onRefreshFailure: PartialFunction[(Throwable, RetryDetails), IO[Unit]], onExhaustedRetries: PartialFunction[Throwable, IO[Unit]], onNewValue: Option[(A, FiniteDuration) => IO[Unit]] = None, + combine: Option[(CachedValue[A], CachedValue[A]) => IO[A]] = None, defaultValue: Option[A] = None, retryPolicy: Option[RetryPolicy[IO]] = None ): Resource[IO, Refreshable[IO, A]]