Skip to content

Commit

Permalink
Impelement Executor + 1 test
Browse files Browse the repository at this point in the history
  • Loading branch information
lenguyenthanh committed Nov 15, 2023
1 parent 146b72b commit 9b1c0bb
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 18 deletions.
71 changes: 71 additions & 0 deletions app/src/main/scala/Executor.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package lila.fishnet

import cats.syntax.all.*
import cats.effect.IO
import cats.effect.std.Queue
import cats.effect.kernel.Ref
import lila.fishnet.Work.Acquired
import org.joda.time.DateTime
import lila.fishnet.Work.Move

trait FishnetClient:
def acquire(accquire: MoveDb.Acquire): IO[Option[Work]]
def move(workId: Work.Id): IO[Option[Lila.Move]]

trait LilaClient:
def send(move: Lila.Move): IO[Unit]

trait Executor:
// get a move from the queue return Work
def acquire(accquire: MoveDb.Acquire): IO[Option[Work.Move]]
// get Work from Map => send to lila
def move(workId: Work.Id, result: Fishnet.PostMove): IO[Unit]
def add(work: Work.Move): IO[Unit]

object Executor:

def instance(client: LilaClient): IO[Executor] =
for
workQueue <- Queue.unbounded[IO, Work.Move]
waitingQueue <-
Ref.of[IO, Map[Work.Id, Work.Move]](Map.empty) // Verify concurrent access with AtomicCell
yield new Executor:
type State = Map[Work.Id, Work.Move]

def add(work: Work.Move): IO[Unit] =
workQueue.offer(work)

def acquire(accquire: MoveDb.Acquire): IO[Option[Work.Move]] =
for
work <- workQueue.tryTake
acquiredWork = work.map(_.copy(acquired = Work.Acquired(accquire.clientKey, DateTime.now).some))
_ <- acquiredWork.fold(IO.unit)(w => waitingQueue.update(_ + (w.id -> w)))
yield acquiredWork

def move(id: Work.Id, result: Fishnet.PostMove): IO[Unit] =
waitingQueue.flatModify: m =>
m.get(id).fold(m -> notFound(id, result.fishnet.apikey).void): work =>
if work.isAcquiredBy(result.fishnet.apikey) then
result.move.uci match
case Some(uci) =>
val move = Lila.Move(work.game, uci)
(m - id) -> client.send(move)
case _ => updateOrGiveUp(m, work.invalid)
else
m -> notAcquired(work, result.fishnet.apikey)

def updateOrGiveUp(state: State, move: Work.Move): (State, IO[Unit]) =
val newState = state - move.id
val io = if move.isOutOfTries then
workQueue.offer(move)
else
IO.unit
newState -> io

// report not found
def notFound(id: Work.Id, key: ClientKey): IO[Unit] =
IO.println(s"not found $id, $key")

// report not acquired
def notAcquired(work: Work.Move, key: ClientKey): IO[Unit] =
IO.println(s"not acquired $work, $key")
4 changes: 2 additions & 2 deletions app/Lila.scala → app/src/main/scala/Lila.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ object Lila:
def readClock(s: String) =
s split ' ' match
case Array(ws, bs, incs) =>
for {
for
wtime <- ws.toIntOption
btime <- bs.toIntOption
inc <- incs.toIntOption
} yield Work.Clock(wtime, btime, inc)
yield Work.Clock(wtime, btime, inc)
case _ => None
18 changes: 9 additions & 9 deletions app/MoveDb.scala → app/src/main/scala/MoveDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import scala.concurrent.duration.*

object MoveDb:

private case class Add(move: Work.Move)
private case class Acquire(clientKey: ClientKey)
case class Add(move: Work.Move)
case class Acquire(clientKey: ClientKey)
// private case class PostResult(workId: Work.Id, data: JsonApi.Request.PostMove)
private object Clean
object Clean

final private class Monitor:
private class Monitor:

val dbSize = Kamon.gauge("db.size").withoutTags()
val dbQueued = Kamon.gauge("db.queued").withoutTags()
Expand All @@ -22,10 +22,11 @@ object MoveDb:

def success(work: Work.Move) =
val now = System.currentTimeMillis
if (work.level == 8) work.acquiredAt foreach { acquiredAt =>
lvl8AcquiredTimeRequest.record(now - acquiredAt.getMillis, TimeUnit.MILLISECONDS)
}
if (work.level == 1)
if work.level == 8 then
work.acquiredAt foreach { acquiredAt =>
lvl8AcquiredTimeRequest.record(now - acquiredAt.getMillis, TimeUnit.MILLISECONDS)
}
if work.level == 1 then
lvl1FullTimeRequest.record(now - work.createdAt.getMillis, TimeUnit.MILLISECONDS)

def failure(work: Work, clientKey: ClientKey, e: Exception) = {
Expand All @@ -41,4 +42,3 @@ object MoveDb:
// s"Received unacquired move ${work.id} for ${work.game.id} by $clientKey. Work current tries: ${work.tries} acquired: ${work.acquired}"
// )
}

12 changes: 6 additions & 6 deletions app/Work.scala → app/src/main/scala/Work.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ sealed trait Work:
def id = _id

def acquiredAt = acquired.map(_.date)
def acquiredByKey = acquired.map(_.clientKey)
def acquiredByKey: Option[ClientKey] = acquired.map(_.clientKey)
def isAcquiredBy(clientKey: ClientKey) = acquiredByKey contains clientKey
def isAcquired = acquired.isDefined
def nonAcquired = !isAcquired
Expand All @@ -29,7 +29,7 @@ object Work:

case class Acquired(
clientKey: ClientKey,
date: DateTime
date: DateTime,
):

def ageInMillis = System.currentTimeMillis - date.getMillis
Expand All @@ -40,7 +40,7 @@ object Work:
id: String, // can be a study chapter ID, if studyId is set
initialFen: Option[Fen.Epd],
variant: Variant,
moves: String
moves: String,
)

case class Clock(wtime: Int, btime: Int, inc: Int)
Expand All @@ -53,19 +53,19 @@ object Work:
tries: Int,
lastTryByKey: Option[ClientKey],
acquired: Option[Acquired],
createdAt: DateTime
createdAt: DateTime,
) extends Work:

def assignTo(clientKey: ClientKey) =
copy(
acquired = Some(
Acquired(
clientKey = clientKey,
date = DateTime.now
date = DateTime.now,
)
),
lastTryByKey = Some(clientKey),
tries = tries + 1
tries = tries + 1,
)

def timeout = copy(acquired = None)
Expand Down
10 changes: 10 additions & 0 deletions app/model.scala → app/src/main/scala/model.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
package lila.fishnet

import chess.format.Uci

trait StringValue extends Any:
def value: String
override def toString = value

trait IntValue extends Any:
def value: Int
override def toString = value.toString

case class IpAddress(value: String) extends AnyVal with StringValue

case class ClientKey(value: String) extends AnyVal with StringValue

object Fishnet:
case class Fishnet(apikey: ClientKey)
case class Acquire(fishnet: Fishnet)
case class PostMove(fishnet: Fishnet, move: MoveResult)
case class MoveResult(bestmove: String):
def uci: Option[Uci] = Uci(bestmove)
46 changes: 46 additions & 0 deletions app/src/test/scala/ExecutorTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package lila.fishnet

import weaver.*
import weaver.scalacheck.Checkers
import cats.effect.IO
import cats.effect.kernel.Ref
import org.joda.time.DateTime
import monocle.syntax.all.*

object ExecutorTest extends SimpleIOSuite with Checkers:

val work = Work.Move(
_id = Work.Id("id"),
game = Work.Game(
id = "id",
initialFen = None,
variant = chess.variant.Standard,
moves = "",
),
level = 1,
clock = None,
tries = 0,
lastTryByKey = None,
acquired = None,
createdAt = DateTime.now,
)

val key = ClientKey("key")

test("add => acqurired => work"):
for
executor <- createExecutor()
_ <- executor.add(work)
acquiredOption <- executor.acquire(MoveDb.Acquire(ClientKey("key")))
acquired = acquiredOption.get
yield assert(acquired.acquired.get.clientKey == key) `and` assert(acquired.copy(acquired = None) == work)

def createExecutor(): IO[Executor] =
createLilaClient.flatMap(Executor.instance)

def createLilaClient: IO[LilaClient] =
Ref.of[IO, List[Lila.Move]](Nil)
.map: ref =>
new LilaClient:
def send(move: Lila.Move): IO[Unit] =
ref.update(_ :+ move)
4 changes: 3 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ inThisBuild(
)

lazy val app = project
.in(file("app"))
.settings(
scalacOptions -= "-Xfatal-warnings",
scalacOptions ++= Seq("-source:future", "-rewrite", "-indent", "-explain", "-Wunused:all"),
Expand All @@ -25,12 +26,13 @@ lazy val app = project
cirisCore,
cirisHtt4s,
fs2,
http4sClient,
jodaTime,
kamonCore,
kamonInflux,
kamonSystemMetrics,
http4sClient,
log4Cats,
monocleCore,
weaver,
weaverScalaCheck,
),
Expand Down
2 changes: 2 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ object Dependencies {
val kamonInflux = "io.kamon" %% "kamon-influxdb" % V.kamon
val kamonSystemMetrics = "io.kamon" %% "kamon-system-metrics" % V.kamon

val monocleCore = "dev.optics" %% "monocle-core" % "3.2.0"

val http4sDsl = http4s("dsl")
val http4sServer = http4s("ember-server")
val http4sClient = http4s("ember-client")
Expand Down

0 comments on commit 9b1c0bb

Please sign in to comment.