From 3d4d5d120aacc3e81375cd3616837481f873ec79 Mon Sep 17 00:00:00 2001 From: Thanh Le Date: Wed, 15 Nov 2023 23:50:59 +0100 Subject: [PATCH] Impelement Executor + 1 test --- app/src/main/scala/Executor.scala | 71 +++++++++++++++++++++++++++ app/{ => src/main/scala}/Lila.scala | 4 +- app/{ => src/main/scala}/MoveDb.scala | 18 +++---- app/{ => src/main/scala}/Work.scala | 12 ++--- app/{ => src/main/scala}/model.scala | 10 ++++ app/src/test/scala/ExecutorTest.scala | 46 +++++++++++++++++ build.sbt | 4 +- project/Dependencies.scala | 2 + 8 files changed, 149 insertions(+), 18 deletions(-) create mode 100644 app/src/main/scala/Executor.scala rename app/{ => src/main/scala}/Lila.scala (89%) rename app/{ => src/main/scala}/MoveDb.scala (79%) rename app/{ => src/main/scala}/Work.scala (91%) rename app/{ => src/main/scala}/model.scala (55%) create mode 100644 app/src/test/scala/ExecutorTest.scala diff --git a/app/src/main/scala/Executor.scala b/app/src/main/scala/Executor.scala new file mode 100644 index 0000000..206855c --- /dev/null +++ b/app/src/main/scala/Executor.scala @@ -0,0 +1,71 @@ +package lila.fishnet + +import cats.syntax.all.* +import cats.effect.IO +import cats.effect.std.Queue +import cats.effect.kernel.Ref +import lila.fishnet.Work.Acquired +import org.joda.time.DateTime +import lila.fishnet.Work.Move + +trait FishnetClient: + def acquire(accquire: MoveDb.Acquire): IO[Option[Work]] + def move(workId: Work.Id): IO[Option[Lila.Move]] + +trait LilaClient: + def send(move: Lila.Move): IO[Unit] + +trait Executor: + // get a move from the queue return Work + def acquire(accquire: MoveDb.Acquire): IO[Option[Work.Move]] + // get Work from Map => send to lila + def move(workId: Work.Id, result: Fishnet.PostMove): IO[Unit] + def add(work: Work.Move): IO[Unit] + +object Executor: + + def instance(client: LilaClient): IO[Executor] = + for + workQueue <- Queue.unbounded[IO, Work.Move] + waitingQueue <- + Ref.of[IO, Map[Work.Id, Work.Move]](Map.empty) // Verify concurrent access with AtomicCell + yield new Executor: + type State = Map[Work.Id, Work.Move] + + def add(work: Work.Move): IO[Unit] = + workQueue.offer(work) + + def acquire(accquire: MoveDb.Acquire): IO[Option[Work.Move]] = + for + work <- workQueue.tryTake + acquiredWork = work.map(_.copy(acquired = Work.Acquired(accquire.clientKey, DateTime.now).some)) + _ <- acquiredWork.fold(IO.unit)(w => waitingQueue.update(_ + (w.id -> w))) + yield acquiredWork + + def move(id: Work.Id, result: Fishnet.PostMove): IO[Unit] = + waitingQueue.flatModify: m => + m.get(id).fold(m -> notFound(id, result.fishnet.apikey).void): work => + if work.isAcquiredBy(result.fishnet.apikey) then + result.move.uci match + case Some(uci) => + val move = Lila.Move(work.game, uci) + (m - id) -> client.send(move) + case _ => updateOrGiveUp(m, work.invalid) + else + m -> notAcquired(work, result.fishnet.apikey) + + def updateOrGiveUp(state: State, move: Work.Move): (State, IO[Unit]) = + val newState = state - move.id + val io = if move.isOutOfTries then + workQueue.offer(move) + else + IO.unit + newState -> io + + // report not found + def notFound(id: Work.Id, key: ClientKey): IO[Unit] = + IO.println(s"not found $id, $key") + + // report not acquired + def notAcquired(work: Work.Move, key: ClientKey): IO[Unit] = + IO.println(s"not acquired $work, $key") diff --git a/app/Lila.scala b/app/src/main/scala/Lila.scala similarity index 89% rename from app/Lila.scala rename to app/src/main/scala/Lila.scala index f8f8f64..bc67004 100644 --- a/app/Lila.scala +++ b/app/src/main/scala/Lila.scala @@ -14,9 +14,9 @@ object Lila: def readClock(s: String) = s split ' ' match case Array(ws, bs, incs) => - for { + for wtime <- ws.toIntOption btime <- bs.toIntOption inc <- incs.toIntOption - } yield Work.Clock(wtime, btime, inc) + yield Work.Clock(wtime, btime, inc) case _ => None diff --git a/app/MoveDb.scala b/app/src/main/scala/MoveDb.scala similarity index 79% rename from app/MoveDb.scala rename to app/src/main/scala/MoveDb.scala index c241181..240edab 100644 --- a/app/MoveDb.scala +++ b/app/src/main/scala/MoveDb.scala @@ -7,12 +7,12 @@ import scala.concurrent.duration.* object MoveDb: - private case class Add(move: Work.Move) - private case class Acquire(clientKey: ClientKey) + case class Add(move: Work.Move) + case class Acquire(clientKey: ClientKey) // private case class PostResult(workId: Work.Id, data: JsonApi.Request.PostMove) - private object Clean + object Clean - final private class Monitor: + private class Monitor: val dbSize = Kamon.gauge("db.size").withoutTags() val dbQueued = Kamon.gauge("db.queued").withoutTags() @@ -22,10 +22,11 @@ object MoveDb: def success(work: Work.Move) = val now = System.currentTimeMillis - if (work.level == 8) work.acquiredAt foreach { acquiredAt => - lvl8AcquiredTimeRequest.record(now - acquiredAt.getMillis, TimeUnit.MILLISECONDS) - } - if (work.level == 1) + if work.level == 8 then + work.acquiredAt foreach { acquiredAt => + lvl8AcquiredTimeRequest.record(now - acquiredAt.getMillis, TimeUnit.MILLISECONDS) + } + if work.level == 1 then lvl1FullTimeRequest.record(now - work.createdAt.getMillis, TimeUnit.MILLISECONDS) def failure(work: Work, clientKey: ClientKey, e: Exception) = { @@ -41,4 +42,3 @@ object MoveDb: // s"Received unacquired move ${work.id} for ${work.game.id} by $clientKey. Work current tries: ${work.tries} acquired: ${work.acquired}" // ) } - diff --git a/app/Work.scala b/app/src/main/scala/Work.scala similarity index 91% rename from app/Work.scala rename to app/src/main/scala/Work.scala index 2eded09..9c2f513 100644 --- a/app/Work.scala +++ b/app/src/main/scala/Work.scala @@ -15,7 +15,7 @@ sealed trait Work: def id = _id def acquiredAt = acquired.map(_.date) - def acquiredByKey = acquired.map(_.clientKey) + def acquiredByKey: Option[ClientKey] = acquired.map(_.clientKey) def isAcquiredBy(clientKey: ClientKey) = acquiredByKey contains clientKey def isAcquired = acquired.isDefined def nonAcquired = !isAcquired @@ -29,7 +29,7 @@ object Work: case class Acquired( clientKey: ClientKey, - date: DateTime + date: DateTime, ): def ageInMillis = System.currentTimeMillis - date.getMillis @@ -40,7 +40,7 @@ object Work: id: String, // can be a study chapter ID, if studyId is set initialFen: Option[Fen.Epd], variant: Variant, - moves: String + moves: String, ) case class Clock(wtime: Int, btime: Int, inc: Int) @@ -53,7 +53,7 @@ object Work: tries: Int, lastTryByKey: Option[ClientKey], acquired: Option[Acquired], - createdAt: DateTime + createdAt: DateTime, ) extends Work: def assignTo(clientKey: ClientKey) = @@ -61,11 +61,11 @@ object Work: acquired = Some( Acquired( clientKey = clientKey, - date = DateTime.now + date = DateTime.now, ) ), lastTryByKey = Some(clientKey), - tries = tries + 1 + tries = tries + 1, ) def timeout = copy(acquired = None) diff --git a/app/model.scala b/app/src/main/scala/model.scala similarity index 55% rename from app/model.scala rename to app/src/main/scala/model.scala index f577dfb..cb4d89f 100644 --- a/app/model.scala +++ b/app/src/main/scala/model.scala @@ -1,8 +1,11 @@ package lila.fishnet +import chess.format.Uci + trait StringValue extends Any: def value: String override def toString = value + trait IntValue extends Any: def value: Int override def toString = value.toString @@ -10,3 +13,10 @@ trait IntValue extends Any: case class IpAddress(value: String) extends AnyVal with StringValue case class ClientKey(value: String) extends AnyVal with StringValue + +object Fishnet: + case class Fishnet(apikey: ClientKey) + case class Acquire(fishnet: Fishnet) + case class PostMove(fishnet: Fishnet, move: MoveResult) + case class MoveResult(bestmove: String): + def uci: Option[Uci] = Uci(bestmove) diff --git a/app/src/test/scala/ExecutorTest.scala b/app/src/test/scala/ExecutorTest.scala new file mode 100644 index 0000000..9a18c39 --- /dev/null +++ b/app/src/test/scala/ExecutorTest.scala @@ -0,0 +1,46 @@ +package lila.fishnet + +import weaver.* +import weaver.scalacheck.Checkers +import cats.effect.IO +import cats.effect.kernel.Ref +import org.joda.time.DateTime +import monocle.syntax.all.* + +object ExecutorTest extends SimpleIOSuite with Checkers: + + val work = Work.Move( + _id = Work.Id("id"), + game = Work.Game( + id = "id", + initialFen = None, + variant = chess.variant.Standard, + moves = "", + ), + level = 1, + clock = None, + tries = 0, + lastTryByKey = None, + acquired = None, + createdAt = DateTime.now, + ) + + val key = ClientKey("key") + + test("add => acqurired => work"): + for + executor <- createExecutor() + _ <- executor.add(work) + acquiredOption <- executor.acquire(MoveDb.Acquire(ClientKey("key"))) + acquired = acquiredOption.get + yield assert(acquired.acquired.get.clientKey == key) `and` assert(acquired.copy(acquired = None) == work) + + def createExecutor(): IO[Executor] = + createLilaClient.flatMap(Executor.instance) + + def createLilaClient: IO[LilaClient] = + Ref.of[IO, List[Lila.Move]](Nil) + .map: ref => + new LilaClient: + def send(move: Lila.Move): IO[Unit] = + ref.update(_ :+ move) diff --git a/build.sbt b/build.sbt index 1388afc..f6e7253 100644 --- a/build.sbt +++ b/build.sbt @@ -12,6 +12,7 @@ inThisBuild( ) lazy val app = project + .in(file("app")) .settings( scalacOptions -= "-Xfatal-warnings", scalacOptions ++= Seq("-source:future", "-rewrite", "-indent", "-explain", "-Wunused:all"), @@ -26,12 +27,13 @@ lazy val app = project cirisCore, cirisHtt4s, fs2, + http4sClient, jodaTime, kamonCore, kamonInflux, kamonSystemMetrics, - http4sClient, log4Cats, + monocleCore, weaver, weaverScalaCheck, ), diff --git a/project/Dependencies.scala b/project/Dependencies.scala index dded4a0..6178a66 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -38,6 +38,8 @@ object Dependencies { val kamonInflux = "io.kamon" %% "kamon-influxdb" % V.kamon val kamonSystemMetrics = "io.kamon" %% "kamon-system-metrics" % V.kamon + val monocleCore = "dev.optics" %% "monocle-core" % "3.2.0" + val http4sDsl = http4s("dsl") val http4sServer = http4s("ember-server") val http4sClient = http4s("ember-client")