From 8bbea1b1015f89806668baca7def2e6d5f13d2c2 Mon Sep 17 00:00:00 2001 From: Thanh Le Date: Fri, 27 Sep 2024 18:23:58 +0200 Subject: [PATCH] Make kamon calls blocking --- app/src/main/scala/App.scala | 6 ++-- app/src/main/scala/AppConfig.scala | 2 +- app/src/main/scala/KamonInitiator.scala | 4 +-- app/src/main/scala/Monitor.scala | 37 +++++++++++++++---------- 4 files changed, 29 insertions(+), 20 deletions(-) diff --git a/app/src/main/scala/App.scala b/app/src/main/scala/App.scala index 5a68d87..a737ac1 100644 --- a/app/src/main/scala/App.scala +++ b/app/src/main/scala/App.scala @@ -13,9 +13,9 @@ object App extends IOApp.Simple: def app: Resource[IO, Unit] = for - config <- AppConfig.load.toResource + config <- AppConfig.load().toResource _ <- Logger[IO].info(s"Starting lila-fishnet with config: $config").toResource - _ <- KamonInitiator.apply.init(config.kamon).toResource + _ <- KamonInitiator.apply().init(config.kamon).toResource res <- AppResources.instance(config.redis) _ <- FishnetApp(res, config).run() yield () @@ -34,4 +34,4 @@ class FishnetApp(res: AppResources, config: AppConfig)(using Logger[IO]): private def createExecutor: Resource[IO, Executor] = val lilaClient = LilaClient(res.redisPubsub) val repository = StateRepository.instance(config.repository.path) - Executor.instance(lilaClient, repository, Monitor(), config.executor) + Monitor().toResource.flatMap(Executor.instance(lilaClient, repository, _, config.executor)) diff --git a/app/src/main/scala/AppConfig.scala b/app/src/main/scala/AppConfig.scala index a025c5d..710a7da 100644 --- a/app/src/main/scala/AppConfig.scala +++ b/app/src/main/scala/AppConfig.scala @@ -8,7 +8,7 @@ import com.comcast.ip4s.* object AppConfig: - def load: IO[AppConfig] = appConfig.load[IO] + def load(): IO[AppConfig] = appConfig.load[IO] def appConfig = ( RedisConfig.config, diff --git a/app/src/main/scala/KamonInitiator.scala b/app/src/main/scala/KamonInitiator.scala index 0bb49a0..86204d7 100644 --- a/app/src/main/scala/KamonInitiator.scala +++ b/app/src/main/scala/KamonInitiator.scala @@ -8,6 +8,6 @@ trait KamonInitiator: def init(config: KamonConfig): IO[Unit] object KamonInitiator: - def apply: KamonInitiator = new: + def apply(): KamonInitiator = new: def init(config: KamonConfig): IO[Unit] = - IO(Kamon.init()).whenA(config.enabled) + IO.blocking(Kamon.init()).whenA(config.enabled) diff --git a/app/src/main/scala/Monitor.scala b/app/src/main/scala/Monitor.scala index 35e785e..a66367e 100644 --- a/app/src/main/scala/Monitor.scala +++ b/app/src/main/scala/Monitor.scala @@ -1,6 +1,7 @@ package lila.fishnet import cats.effect.IO +import cats.syntax.all.* import kamon.Kamon import kamon.metric.Timer @@ -20,17 +21,25 @@ object Monitor: val lvl8AcquiredTimeRequest = Kamon.timer("move.acquired.lvl8").withoutTags() val lvl1FullTimeRequest = Kamon.timer("move.full.lvl1").withoutTags() - def apply(): Monitor = new: - def success(work: Work.Task): IO[Unit] = - IO.realTimeInstant.map: now => - if work.request.level == 8 then - work.acquiredAt.foreach(at => record(lvl8AcquiredTimeRequest, at, now)) - if work.request.level == 1 then record(lvl1FullTimeRequest, work.createdAt, now) - - def updateSize(state: AppState): IO[Unit] = - IO(dbSize.update(state.size.toDouble)) *> - IO(dbQueued.update(state.count(_.nonAcquired).toDouble)) *> - IO(dbAcquired.update(state.count(_.isAcquired).toDouble)).void - - private def record(timer: Timer, start: Instant, end: Instant): Unit = - val _ = timer.record(start.until(end, ChronoUnit.MILLIS), TimeUnit.MILLISECONDS) + def apply(): IO[Monitor] = + ( + IO.blocking(Kamon.gauge("db.size").withoutTags()), + IO.blocking(Kamon.gauge("db.queued").withoutTags()), + IO.blocking(Kamon.gauge("db.acquired").withoutTags()), + IO.blocking(Kamon.timer("move.acquired.lvl8").withoutTags()), + IO.blocking(Kamon.timer("move.full.lvl1").withoutTags()) + ).parMapN: (dbSize, dbQueued, dbAcquired, lvl8AcquiredTimeRequest, lvl1FullTimeRequest) => + new Monitor: + def success(work: Work.Task): IO[Unit] = + IO.realTimeInstant.map: now => + if work.request.level == 8 then + work.acquiredAt.foreach(at => record(lvl8AcquiredTimeRequest, at, now)) + else if work.request.level == 1 then record(lvl1FullTimeRequest, work.createdAt, now) + + def updateSize(state: AppState): IO[Unit] = + IO(dbSize.update(state.size.toDouble)) *> + IO(dbQueued.update(state.count(_.nonAcquired).toDouble)) *> + IO(dbAcquired.update(state.count(_.isAcquired).toDouble)).void + + private def record(timer: Timer, start: Instant, end: Instant): Unit = + val _ = timer.record(start.until(end, ChronoUnit.MILLIS), TimeUnit.MILLISECONDS)