Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make kamon calls blocking #350

Merged
merged 1 commit into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions app/src/main/scala/App.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ object App extends IOApp.Simple:

def app: Resource[IO, Unit] =
for
config <- AppConfig.load.toResource
config <- AppConfig.load().toResource
_ <- Logger[IO].info(s"Starting lila-fishnet with config: $config").toResource
_ <- KamonInitiator.apply.init(config.kamon).toResource
_ <- KamonInitiator.apply().init(config.kamon).toResource
res <- AppResources.instance(config.redis)
_ <- FishnetApp(res, config).run()
yield ()
Expand All @@ -34,4 +34,4 @@ class FishnetApp(res: AppResources, config: AppConfig)(using Logger[IO]):
private def createExecutor: Resource[IO, Executor] =
val lilaClient = LilaClient(res.redisPubsub)
val repository = StateRepository.instance(config.repository.path)
Executor.instance(lilaClient, repository, Monitor(), config.executor)
Monitor().toResource.flatMap(Executor.instance(lilaClient, repository, _, config.executor))
2 changes: 1 addition & 1 deletion app/src/main/scala/AppConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import com.comcast.ip4s.*

object AppConfig:

def load: IO[AppConfig] = appConfig.load[IO]
def load(): IO[AppConfig] = appConfig.load[IO]

def appConfig = (
RedisConfig.config,
Expand Down
4 changes: 2 additions & 2 deletions app/src/main/scala/KamonInitiator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ trait KamonInitiator:
def init(config: KamonConfig): IO[Unit]

object KamonInitiator:
def apply: KamonInitiator = new:
def apply(): KamonInitiator = new:
def init(config: KamonConfig): IO[Unit] =
IO(Kamon.init()).whenA(config.enabled)
IO.blocking(Kamon.init()).whenA(config.enabled)
37 changes: 23 additions & 14 deletions app/src/main/scala/Monitor.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package lila.fishnet

import cats.effect.IO
import cats.syntax.all.*
import kamon.Kamon
import kamon.metric.Timer

Expand All @@ -20,17 +21,25 @@ object Monitor:
val lvl8AcquiredTimeRequest = Kamon.timer("move.acquired.lvl8").withoutTags()
val lvl1FullTimeRequest = Kamon.timer("move.full.lvl1").withoutTags()

def apply(): Monitor = new:
def success(work: Work.Task): IO[Unit] =
IO.realTimeInstant.map: now =>
if work.request.level == 8 then
work.acquiredAt.foreach(at => record(lvl8AcquiredTimeRequest, at, now))
if work.request.level == 1 then record(lvl1FullTimeRequest, work.createdAt, now)

def updateSize(state: AppState): IO[Unit] =
IO(dbSize.update(state.size.toDouble)) *>
IO(dbQueued.update(state.count(_.nonAcquired).toDouble)) *>
IO(dbAcquired.update(state.count(_.isAcquired).toDouble)).void

private def record(timer: Timer, start: Instant, end: Instant): Unit =
val _ = timer.record(start.until(end, ChronoUnit.MILLIS), TimeUnit.MILLISECONDS)
def apply(): IO[Monitor] =
(
IO.blocking(Kamon.gauge("db.size").withoutTags()),
IO.blocking(Kamon.gauge("db.queued").withoutTags()),
IO.blocking(Kamon.gauge("db.acquired").withoutTags()),
IO.blocking(Kamon.timer("move.acquired.lvl8").withoutTags()),
IO.blocking(Kamon.timer("move.full.lvl1").withoutTags())
).parMapN: (dbSize, dbQueued, dbAcquired, lvl8AcquiredTimeRequest, lvl1FullTimeRequest) =>
new Monitor:
def success(work: Work.Task): IO[Unit] =
IO.realTimeInstant.map: now =>
if work.request.level == 8 then
work.acquiredAt.foreach(at => record(lvl8AcquiredTimeRequest, at, now))
else if work.request.level == 1 then record(lvl1FullTimeRequest, work.createdAt, now)

def updateSize(state: AppState): IO[Unit] =
IO(dbSize.update(state.size.toDouble)) *>
IO(dbQueued.update(state.count(_.nonAcquired).toDouble)) *>
IO(dbAcquired.update(state.count(_.isAcquired).toDouble)).void

private def record(timer: Timer, start: Instant, end: Instant): Unit =
val _ = timer.record(start.until(end, ChronoUnit.MILLIS), TimeUnit.MILLISECONDS)