From 8a1de0af6f2a8fbddfea8d0802ba03fa6d4c8525 Mon Sep 17 00:00:00 2001 From: Thanh Le Date: Tue, 21 Nov 2023 22:18:47 +0100 Subject: [PATCH] Add scheduled cleaning job --- app/src/main/scala/App.scala | 8 +++++--- app/src/main/scala/RedisSubscriberJob.scala | 13 +++++++++++++ app/src/test/scala/ExecutorTest.scala | 2 +- 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/app/src/main/scala/App.scala b/app/src/main/scala/App.scala index 98cfb48..4d782b2 100644 --- a/app/src/main/scala/App.scala +++ b/app/src/main/scala/App.scala @@ -21,9 +21,11 @@ object App extends IOApp.Simple: res <- AppResources.instance(config.redis) lilaClient = LilaClient(res.redisPubsub) executor <- Resource.eval(Executor.instance(lilaClient)) - job = RedisSubscriberJob(executor, res.redisPubsub) - httpApi = HttpApi(executor, HealthCheck()) + workListenerJob = RedisSubscriberJob(executor, res.redisPubsub) + cleanJob = CleanJob(executor) + httpApi = HttpApi(executor, HealthCheck()) server <- MkHttpServer.apply.newEmber(config.server, httpApi.httpApp) - _ <- job.run().background + _ <- workListenerJob.run().background + _ <- cleanJob.run().background _ <- Resource.eval(Logger[IO].info(s"Starting server on ${config.server.host}:${config.server.port}")) yield () diff --git a/app/src/main/scala/RedisSubscriberJob.scala b/app/src/main/scala/RedisSubscriberJob.scala index 6b4b6c9..5a353ea 100644 --- a/app/src/main/scala/RedisSubscriberJob.scala +++ b/app/src/main/scala/RedisSubscriberJob.scala @@ -7,6 +7,8 @@ import io.chrisdavenport.rediculous.RedisPubSub import org.typelevel.log4cats.Logger import Lila.Request +import scala.concurrent.duration.* + trait RedisSubscriberJob: def run(): IO[Unit] @@ -33,3 +35,14 @@ object RedisSubscriberJob: case Some(request) => executor.add(request) case None => Logger[IO].error(s"Failed to parse message: $msg"), ) *> pubsub.runMessages + +trait CleanJob: + def run(): IO[Unit] + +object CleanJob: + def apply(executor: Executor)(using Logger[IO]): CleanJob = + new CleanJob: + def run(): IO[Unit] = + Logger[IO].info("Start cleaning job") *> + IO.sleep(5.seconds) *> + IO.realTimeInstant.flatMap(executor.clean) diff --git a/app/src/test/scala/ExecutorTest.scala b/app/src/test/scala/ExecutorTest.scala index 9667b3f..c18837b 100644 --- a/app/src/test/scala/ExecutorTest.scala +++ b/app/src/test/scala/ExecutorTest.scala @@ -66,7 +66,7 @@ object ExecutorTest extends SimpleIOSuite: moves <- ref.get yield assert(moves.isEmpty) - test("after timeout should move can be acruired again"): + test("after timeout move should be able to acquired again"): for ref <- Ref.of[IO, List[Lila.Move]](Nil) client = createLilaClient(ref)