4. Game Process
It's time to start building the primary communication protocol between the client and the server. For this workshop, we have decided to use WebSockets.
WebSockets is a communication protocol that provides full-duplex communication channels over a single TCP connection. It enables real-time communication between a client (typically a web browser) and a server, allowing both parties to send messages to each other at any time.
Unlike the traditional HTTP request-response model, where the client sends a request, and the server responds, WebSockets allow for persistent connections. Once the WebSocket connection is established, it remains open, allowing for bi-directional communication. This real-time capability makes WebSockets suitable for various scenarios that require instant data updates or interactive communication.
In our example, WebSockets are an excellent choice because the game requires real-time updates. WebSockets allow the server to push updates to the client instantly, eliminating the need for the client to repeatedly poll the server for new information.
In general terms, WebSockets provide a more efficient and responsive alternative to traditional HTTP-based communication when real-time, interactive, or continuous data updates are required.
b9892a1d7afc81f4530e6aaf04db5ae293de91be
|
---|
In this exercise, we will implement a WebSocket endpoint in our server.
Open the file `modules/server/src/main/scala/scaladays/Server.scala`. As you can see, we have replaced the call to `withHttpApp` in the `EmberServerBuilder` object with `withHttpWebSocketApp`:
```scala
def withHttpWebSocketApp(f: WebSocketBuilder[F] => HttpApp[F]): EmberServerBuilder[F]
```
This method allows us to add HTTP endpoints and also construct WebSocket endpoints with the `WebSocketBuilder` parameter. This value is now passed to our `WebServer.routes`. Let's move to that file (`modules/server/src/main/scala/scaladays/server/WebServer.scala`).
For this exercise, we have implemented the structure of the endpoint:
```scala
case GET -> Root / "player" / PlayerId(playerId) / "join" =>
  val send: fs2.Stream[F, WebSocketFrame] = sendResponse(playerId)
  val receive: fs2.Pipe[F, WebSocketFrame, Unit] =
    in => in.evalMap(processClientMessage(playerId, _))
  ws.build(send, receive)
```
As you can see, it's pretty simple. We need to provide three parts:
- A path definition: `case GET -> Root / "player" / PlayerId(playerId) / "join"`.
- A `send` value of type `fs2.Stream[F, WebSocketFrame]`. It represents the outgoing stream of messages that should be sent to the client.
- A `receive` value of type `fs2.Pipe[F, WebSocketFrame, Unit]`. It is a sink to which the framework will push the incoming WebSocket messages.
Now it's your turn. First, implement `processClientMessage` to log the player id together with the body sent by the client. You can use the `Logger` instance. `WebSocketFrame` is an abstract class, so be sure to handle all the relevant cases for this scenario (text and close).
Second, implement `sendResponse` as an infinite stream that keeps the connection open by sending a message every 500 milliseconds.
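The two pieces described above can be sketched as follows. This is only a rough sketch and not the solution commit: it fixes the effect to `IO`, uses a plain `String` in place of `PlayerId`, and replaces the workshop's `Logger` with a hypothetical `logInfo` helper that prints to standard output.

```scala
import scala.concurrent.duration.*
import cats.effect.IO
import fs2.Stream
import org.http4s.websocket.WebSocketFrame

// Hypothetical stand-in for the workshop's Logger instance.
def logInfo(msg: String): IO[Unit] = IO.println(msg)

// Logs text frames and close frames; any other frame type is logged as unhandled.
def processClientMessage(playerId: String, frame: WebSocketFrame): IO[Unit] =
  frame match
    case WebSocketFrame.Text(body, _) => logInfo(s"Player $playerId sent: $body")
    case _: WebSocketFrame.Close      => logInfo(s"Player $playerId closed the connection")
    case other                        => logInfo(s"Player $playerId sent an unhandled frame: $other")

// Infinite stream that emits one text frame every 500 milliseconds.
def sendResponse(playerId: String): Stream[IO, WebSocketFrame] =
  Stream
    .awakeEvery[IO](500.millis)
    .map(_ => WebSocketFrame.Text(s"ping $playerId"))
```

The content of the periodic message (`ping ...`) is arbitrary here; any frame sent at a regular interval keeps the connection busy.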
Once finished, you can test your endpoint with the following URI, for example: `ws://127.0.0.1:28082/player/7f9c3605-b0b8-4235-8c86-f15fbc11332d/join`. To do that, you can use Postman or the `websocat` command:
```
websocat -t ws://127.0.0.1:28082/player/7f9c3605-b0b8-4235-8c86-f15fbc11332d/join
```
Solution commit
https://github.com/xebia-functional/tictactoe/commit/9b967a96e20806f65ef7660c6290e25472bc125d
In Apache Kafka, a compacted topic refers to a particular type of topic configuration that retains only the latest value for each key in the topic. This is implemented with a log compaction process that ensures the log retains only the most recent record for each key, discarding older versions of the same key. This behavior differs from a regular Kafka topic, where all messages are typically retained.
Compacted topics are useful when you want to maintain a current snapshot or summary of the data associated with each key, rather than storing every update. They are commonly used in the following scenarios:
- Event sourcing: They store the latest state of each entity or aggregate. Each event represents a change to the entity, and the compacted topic ensures that only the most recent state for each entity is stored.
- Change data capture (CDC): CDC is a technique that captures and propagates data changes from one system to another. With compacted topics, change events are stored, ensuring only the latest changes for each record key are retained.
- Database changelogs: As you can guess, compacted topics can efficiently store changelogs for databases.
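To make the "latest value per key" idea concrete, here is a toy model of compaction in plain Scala. It is not how Kafka implements it (Kafka compacts log segments in the background, so older records may remain visible for a while); the `Record` type and `compact` function are illustrative names. A record with no value models a Kafka tombstone, which marks the key for deletion.

```scala
// Toy model: a record is a key plus an optional value (None models a tombstone).
final case class Record(key: String, value: Option[String])

// Keeps only the most recent record per key, preserving the order in which
// the surviving records were written, and drops keys whose latest record is a tombstone.
def compact(log: List[Record]): List[Record] =
  val latestByKey = log.zipWithIndex.foldLeft(Map.empty[String, (Record, Int)]) {
    case (acc, (record, index)) => acc.updated(record.key, (record, index))
  }
  latestByKey.values.toList
    .sortBy { case (_, index) => index }
    .collect { case (record, _) if record.value.isDefined => record }
```

For example, compacting the records `a=1, b=2, a=3` leaves `b=2, a=3`, and appending a tombstone for `b` afterwards leaves only `a=3`.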
45f26206b74a27c4b89663c2d7de1a5da144f71d
|
---|
Compacted topics are helpful in our next case, where we will store the ids of the players waiting to start a new game. We only need the latest state of this entity.
First, go to the file `modules/server/src/main/scala/scaladays/config/KafkaSetup.scala`. Remember that this is where we implemented the `createTopicUnless` method. Now it's time to implement `createCompactedTopicUnless`. In this exercise, you need to:
- Implement the `createCompactedTopicUnless` method. You need to find a way of passing the `cleanup.policy` property to the new topic creation.
- Call the newly implemented method in `bootTopics`.
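One possible way to pass `cleanup.policy` is through the topic-level configs of the Kafka admin client's `NewTopic`. The sketch below only builds the topic description; the helper name `compactedTopic` is hypothetical, and how the workshop's `KafkaSetup` actually submits it to the cluster is not shown here.

```scala
import scala.jdk.CollectionConverters.*
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.common.config.TopicConfig

// Builds a topic description with cleanup.policy=compact.
// The name, partition count and replication factor are supplied by the caller.
def compactedTopic(name: String, partitions: Int, replicationFactor: Short): NewTopic =
  new NewTopic(name, partitions, replicationFactor)
    .configs(
      Map(TopicConfig.CLEANUP_POLICY_CONFIG -> TopicConfig.CLEANUP_POLICY_COMPACT).asJava
    )
```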
The second part of this exercise is a bit tricky. As part of the commit, we have added some code, including the `Consumer` implementation. This consumer will receive events from the topics and communicate the state to the client.
Consumers are identified and managed with two ids: the client id and the group id.
- Client Id: Identifies an individual Kafka client or consumer. Each consumer within a group should have a unique client id.
- Group Id: Identifies a consumer group comprising one or multiple consumers. All consumers within a group should have the same group id.
One consequence is that multiple consumers sharing the same group id will receive different messages, as the group is considered a single logical entity. In our exercise, we process the events of a client by creating a consumer per WebSocket and connecting it to a specific topic. Consequently, we need to choose the group id when creating the consumer so that every consumer receives all the messages from the topic.
Go ahead and update the `consumerSettings` method in `modules/server/src/main/scala/scaladays/kafka/Consumer.scala` to set the group id to the proper value.
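As an illustration of how a group id can be set, here is a minimal sketch that assumes the consumer is built with fs2-kafka's `ConsumerSettings` and uses `String` keys and values. The function name and its parameters are hypothetical, and the workshop's own `consumerSettings` may look different.

```scala
import cats.effect.IO
import fs2.kafka.{AutoOffsetReset, ConsumerSettings}

// Two consumers built with different group ids each receive every message
// of the topic; two consumers sharing a group id split the messages between them.
def settingsFor(
    bootstrapServers: String,
    clientId: String,
    groupId: String
): ConsumerSettings[IO, String, String] =
  ConsumerSettings[IO, String, String]
    .withBootstrapServers(bootstrapServers)
    .withClientId(clientId)
    .withGroupId(groupId)
    .withAutoOffsetReset(AutoOffsetReset.Earliest)
```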
Solution commit
https://github.com/xebia-functional/tictactoe/commit/c025770e95a2868d94428a99959a73a8a10f785c
c025770e95a2868d94428a99959a73a8a10f785c
|
---|
`EventHandler` is the critical piece of the connection between the client and the server. It needs to:
- Create a consumer per client
- Consume all game events
- Keep only those that are related to the current client
In this exercise, we're going to tie some pieces together. We want to connect our server stream to the client's WebSocket. We will use the `TTTServer` algebra to connect the different parts of our code.
Go to `modules/server/src/main/scala/scaladays/server/TTTServer.scala` and create a method definition in the trait with the following signature:
```scala
PlayerId => fs2.Stream[F, Game]
```
Luckily, there's a method in `EventHandler` with exactly that signature. Implement the method with just a call to `EventHandler.processEvent`. We don't need to do anything special here.
The second step is to go to `EventHandler` (`modules/server/src/main/scala/scaladays/kafka/EventHandler.scala`) and implement `processEvent`. Make sure your implementation fulfills the following:
- Creates a consumer with a different group id per client.
- Filters out `Game` events unrelated to the player in scope.
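The filtering requirement can be sketched independently of Kafka. The `Game` case class below is a simplified stand-in with only the `gameId`, `crossPlayer`, and `circlePlayer` fields (plain strings instead of the workshop's id types), and `gamesFor` is a hypothetical helper name.

```scala
import fs2.Stream

// Simplified stand-in for the workshop's Game model.
final case class Game(gameId: String, crossPlayer: String, circlePlayer: String)

// Keeps only the games in which the given player takes part.
def gamesFor[F[_]](playerId: String)(games: Stream[F, Game]): Stream[F, Game] =
  games.filter(game => game.crossPlayer == playerId || game.circlePlayer == playerId)
```

In the real `processEvent`, the input stream would come from the per-client consumer rather than from an in-memory collection.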
The third step is to use the `TTTServer` method implemented in the first step in `WebServer` (`modules/server/src/main/scala/scaladays/server/WebServer.scala`). Find the `sendResponse` method and replace our dummy implementation with an actual implementation that:
- Passes the input to the `TTTServer` method.
- Maps the result into a `WebSocketFrame` of type `Text`. The body needs to be a JSON representation of the `Game`.
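The mapping step could look roughly like the sketch below. It assumes circe is used for JSON and defines a simplified `Game` with a hand-written encoder; the workshop's real `Game` and its codecs will differ, and `toFrame` is a hypothetical helper name.

```scala
import io.circe.Encoder
import io.circe.syntax.*
import org.http4s.websocket.WebSocketFrame

// Simplified stand-in for the workshop's Game model.
final case class Game(gameId: String, crossPlayer: String, circlePlayer: String)

// Hand-written encoder for the stand-in type.
given Encoder[Game] =
  Encoder.forProduct3("gameId", "crossPlayer", "circlePlayer")((g: Game) =>
    (g.gameId, g.crossPlayer, g.circlePlayer)
  )

// Wraps the JSON representation of a game in a text frame.
def toFrame(game: Game): WebSocketFrame =
  WebSocketFrame.Text(game.asJson.noSpaces)
```

With this in place, `sendResponse` reduces to mapping the stream of games through `toFrame`.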
Solution commit
https://github.com/xebia-functional/tictactoe/commit/11d3b2442cdb2150f983ba2116faa0feda18832d
Now it's time to implement our second kafka-streams logic. The `WaitingStream` will perform the following actions:
- Read players from the input topic and put them in the player topic -> "Player waiting for a game"
- Read players from the player topic and pair two in an event in the input topic -> "Found a potential match"
- Read started games from the input topic and remove both players from the player topic -> "No longer waiting, now playing"
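The three actions can be pictured as a small state machine over the set of waiting players. The sketch below is a toy, in-memory model and not Kafka Streams code: the `Event` cases only borrow the names `WaitingForMatch` and `StartGame`, the `Vector` stands in for the player topic, and `step` is a hypothetical function.

```scala
// Toy events named after the ones used in the workshop, with plain string ids.
enum Event:
  case WaitingForMatch(playerId: String)
  case StartGame(crossPlayer: String, circlePlayer: String)

// Processes one event: returns the updated waiting list (the "player topic")
// and any events to publish back to the "input topic".
def step(waiting: Vector[String], event: Event): (Vector[String], List[Event]) =
  event match
    case Event.WaitingForMatch(playerId) =>
      // "Player waiting for a game"
      val updated = if waiting.contains(playerId) then waiting else waiting :+ playerId
      updated match
        // "Found a potential match"
        case first +: second +: _ => (updated, List(Event.StartGame(first, second)))
        case _                    => (updated, Nil)
    case Event.StartGame(cross, circle) =>
      // "No longer waiting, now playing"
      (waiting.filterNot(p => p == cross || p == circle), Nil)
```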
693f4de289a334beb28360011af451c51aa61f47
|
---|
For this exercise, we must implement the last piece of logic in the `WaitingStream` (read started games from the input topic and remove both players from the player topic) with its corresponding `Serdes`.
First, let's review the current code state after the new commit.
- In `WebServer` (`modules/server/src/main/scala/scaladays/server/WebServer.scala`), we redirect every action from the client to our algebra `TTTServer`.
- In `TTTServer` (`modules/server/src/main/scala/scaladays/server/TTTServer.scala`), we have added one case, `JoinGame`, that makes a call to `EventStorage.waitForMatch`. This creates a `WaitingForMatch` event in the input topic under the hood. You can check the code in `EventStorage` (`modules/server/src/main/scala/scaladays/kafka/EventStorage.scala`).
- Added `PlayerMatchTransformer` (`modules/server/src/main/scala/scaladays/kafka/stream/PlayerMatchTransformer.scala`). This is a Kafka transformer whose main task is to keep a key-value store (`playerMatch`) updated with the latest player id.
- Added `WaitingPlayerStream` (`modules/server/src/main/scala/scaladays/kafka/stream/WaitingPlayerStream.scala`) with the implementation of two of the three pieces of logic:
  - A) Read players from the input topic and put them in the player topic (`waitingPlayerToPlayerStream`)
  - B) Read players from the player topic and pair two in an event in the input topic (`playerToMatchStream`)
Pay particular attention to `playerToMatchStream`. Its code includes a comment that explains some of the particularities of working with the kafka-streams API. You'll face these problems wherever the Scala and Java APIs are mixed, which creates some strange behaviors when not handled carefully.
Now it's your turn. First, implement `deletePlayersAfterCreateMatch` with the following logic: read started games from the input topic and remove both players from the player topic. Remember that `TTTEvent` contains an `Event` field indicating the type of event. Check the source code at `modules/server/src/main/scala/scaladays/kafka/messages/Events.scala`.
Once implemented, if we try to compile, we'll get errors like the following one:
```
[error] |No given instance of type org.apache.kafka.streams.kstream.Produced[scaladays.models.ids.PlayerId,
[error] |  scaladays.models.ids.PlayerId
[error] |] was found for parameter produced of method to in class KStream.
```
This is because we now need more serdes instances. Concretely, the key serdes for `PlayerId` and `String`.
Open `VulcanSerdes` (`modules/server/src/main/scala/scaladays/kafka/codecs/VulcanSerdes.scala`) and add the needed serdes.
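For orientation, this is what a `Serde` given instance can look like when built directly with the Kafka client API. It is a simplified sketch that encodes a hypothetical `PlayerId(value: String)` wrapper as UTF-8 text; the workshop derives its serdes through Vulcan (Avro), so the actual instances in `VulcanSerdes` will be defined differently.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serdes, Serializer}

// Hypothetical stand-in for the workshop's PlayerId.
final case class PlayerId(value: String)

// Kafka may pass null (for example, for tombstones), so both directions handle it.
val playerIdSerializer: Serializer[PlayerId] =
  (_: String, id: PlayerId) => if id == null then null else id.value.getBytes(UTF_8)

val playerIdDeserializer: Deserializer[PlayerId] =
  (_: String, bytes: Array[Byte]) =>
    if bytes == null then null else PlayerId(new String(bytes, UTF_8))

// A given Serde in scope is what lets the Scala DSL build Produced/Consumed instances.
given playerIdSerde: Serde[PlayerId] =
  Serdes.serdeFrom(playerIdSerializer, playerIdDeserializer)
```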
Solution commit
https://github.com/xebia-functional/tictactoe/commit/33b20e0589e2105bf6c2e0d1c1e97c2a790db56f
As we did with the Login process, now it is convenient to analyze how the client expects to communicate with the server and what the parties involved in the Game process are.
4f564f31dd7f3c6acbd6524866d8badfecbd2c23
|
---|
The server already has the WebSocket endpoint ready for use by the client app, so let's work to define:
- The possible states necessary to open a WebSocket connection, create a new game, join an existing one, receive a WS error, etc.
- What changes the Model demands to represent these new states
- What events (`Msg`) we need to include in order to trigger those new states
- How the Tyrian app will react to the new events (`Msg`)
- What views will show the new state of the app
- How we can interact with a Tyrian WebSocket via a set of `Cmd`s
Let's continue with the same approach of dividing the exercise into smaller pieces.
First things first. What are the new states of the app?
State | What is happening? |
---|---|
Before joining a game | The user has not clicked the "Join Game" button, and the WebSocket connection is not even started |
Waiting a game and opening WS | The user has already pressed the "Join Game" button, but the WS connection between client and server is not ready yet |
Waiting a game but WS is open | The user has already pressed the "Join Game" button, and the WS connection between client and server is established |
WebSocket error received | The server has responded with an error. We don't have any game yet |
Game received through the WS | The server has responded with a registered game. |
If we recap, the current shape of the `Model` includes `nickname`, `player`, and `errors`, which don't seem to be enough to completely describe all the new states. We propose adding the properties:
- `contest: Contest`, which will include an eventual game. We use `Contest` as a synonym of Game to avoid having two classes with the same name.
- `ws: Option[WebSocket]`, which will contain the WS connection once it is established.
The model might evolve like:
State | Nickname | Player | Errors | Contest | WS |
---|---|---|---|---|---|
Before joining a game | Non empty | `Player.Registered` | Empty | `Contest.Empty` | `None` |
Waiting a game and opening WS | Non empty | `Player.Registered` | Empty | `Contest.InProgress` | `None` |
Waiting a game but WS is open | Non empty | `Player.Registered` | Empty | `Contest.InProgress` | `Some(ws)` |
WebSocket error received | Non empty | `Player.Registered` | Non empty | `Contest.Empty` | `Some(ws)` |
Game received through the WS | Non empty | `Player.Registered` | Empty | `Contest.Registered` | `Some(ws)` |
- Create a new file `Contest.scala` in `modules/client/src/main/scala/scaladays/models` with the following enum definition:
  - `Empty`
  - `InProgress(status: String)`
  - `Registered(game: Game)`
- Edit the file `Model.scala` to:
  - Describe these changes in the case class `Model`
  - Update the init `Model`.
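A possible shape for these definitions is sketched below. The `Contest` cases follow the list above; everything else is an assumption made so that the snippet stands on its own: `Game`, `Player`, and `WebSocket` are placeholders for the workshop's real types, and the field types of `Model` are guesses.

```scala
// Placeholders for types that already exist in the workshop's client module.
final case class Game(gameId: String)
enum Player:
  case Empty
  case Registered(playerId: String)
trait WebSocket // stand-in for Tyrian's WebSocket[F]

enum Contest:
  case Empty
  case InProgress(status: String)
  case Registered(game: Game)

final case class Model(
    nickname: String,
    player: Player,
    errors: List[String],
    contest: Contest,     // new: the eventual game
    ws: Option[WebSocket] // new: the connection, once established
)

object Model:
  // Initial model: no game and no connection yet.
  val init: Model = Model("", Player.Empty, Nil, Contest.Empty, None)
```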
Reminder: The event `LoginSuccess(playerId: PlayerId)` would bring the `Model` to the state "Before joining a game", right?
Let's check what other events (`Msg`) we need to add to trigger all the new states:
State | Triggered by the Msg: |
---|---|
Before joining a game | `LoginSuccess(playerId: PlayerId)` (existing) |
Waiting a game and opening WS | `WebSocketStatus(Connecting(playerId))` (new) |
Waiting a game but WS is open | `WebSocketStatus(Connected(ws))` (new) |
WebSocket error received | `WebSocketStatus(ConnectionError(error))` (new) |
Game received through the WS | `GameUpdate(game)` (new) |
- Edit the file `Messages.scala` to add these events.
Well, now we have new events to which we must react with model evolutions, as follows:
Msg: | Model | Cmd |
---|---|---|
`WebSocketStatus(Connecting(playerId))` | `contest = Contest.InProgress` | `client.connect(playerId)` |
`WebSocketStatus(Connected(ws))` | `ws = ws` | `Cmd.None` |
`WebSocketStatus(ConnectionError(error))` | `errors = List(error)` | `Cmd.None` |
`GameUpdate(game)` | `contest = Contest.Registered(game)` | `Cmd.None` |
- Edit the file `Update.scala` to react to these events as described above.
ℹ️ INFO: When the user pushes the Join Game button, the app establishes a WS connection with the server, so the Cmd `client.connect(playerId)` will be implemented in part 5 |
---|
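The transitions in the table above can be sketched as a pure function. To keep the snippet self-contained, it does not depend on Tyrian: `Command` is a hypothetical stand-in for `Cmd`, the model is reduced to the fields that change, ids and errors are plain strings, and the status text passed to `InProgress` is arbitrary.

```scala
// Hypothetical stand-ins for the workshop's types.
final case class Game(gameId: String)
trait WebSocket

enum Contest:
  case Empty
  case InProgress(status: String)
  case Registered(game: Game)

enum Status:
  case Connecting(playerId: String)
  case Connected(ws: WebSocket)
  case ConnectionError(error: String)

enum Msg:
  case WebSocketStatus(status: Status)
  case GameUpdate(game: Game)

// Stand-in for Tyrian's Cmd: either nothing, or "open the WS for this player".
enum Command:
  case NoOp
  case Connect(playerId: String)

final case class Model(errors: List[String], contest: Contest, ws: Option[WebSocket])

def update(model: Model, msg: Msg): (Model, Command) =
  msg match
    case Msg.WebSocketStatus(Status.Connecting(playerId)) =>
      (model.copy(contest = Contest.InProgress("Connecting")), Command.Connect(playerId))
    case Msg.WebSocketStatus(Status.Connected(ws)) =>
      (model.copy(ws = Some(ws)), Command.NoOp)
    case Msg.WebSocketStatus(Status.ConnectionError(error)) =>
      (model.copy(errors = List(error)), Command.NoOp)
    case Msg.GameUpdate(game) =>
      (model.copy(contest = Contest.Registered(game)), Command.NoOp)
```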
As you might expect, we now need to define a view for each possible state the app can adopt. Here are some ideas:
Before joining a game
Waiting a game and opening WS | Waiting a game but WS is open | WebSocket error received
- In the file `WaitingGameView.scala`, implement the new views as shown above.
- In the file `Main.scala`, implement `def view(model: ModelIO): Html[Msg]` to show the new views accordingly.
Now we actually want to open a WS connection with the server.
- Edit the file `ScalaDaysClient.scala` to implement the function `def connect(playerId: PlayerId): Cmd[F, Msg]`
Tip 1: Tyrian WebSocket
In the official Tyrian docs, there is a very similar example where `WebSocket.connect` is used.
Tip 2: The URI of the WebSocket endpoint
As we implemented on the server side, it would be `val uri = wsUri / "player" / playerId.show / "join"`
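Putting both tips together, a connection command could be sketched like this. It assumes the `WebSocket.connect` and `WebSocketConnect` API as shown in the Tyrian documentation, fixes the effect to `IO`, builds the address by string interpolation instead of the `wsUri` value, and uses a reduced, hypothetical `Msg` type; check it against the Tyrian version used in the workshop.

```scala
import cats.effect.IO
import tyrian.Cmd
import tyrian.websocket.{WebSocket, WebSocketConnect}

// Reduced, hypothetical message type for this sketch.
enum Msg:
  case Connected(ws: WebSocket[IO])
  case ConnectionError(error: String)

// Builds the command that opens the WebSocket and reports the outcome as a Msg.
def connect(wsBaseUri: String, playerId: String): Cmd[IO, Msg] =
  WebSocket.connect[IO, Msg](s"$wsBaseUri/player/$playerId/join") {
    case WebSocketConnect.Socket(ws) => Msg.Connected(ws)
    case WebSocketConnect.Error(err) => Msg.ConnectionError(err)
  }
```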
Solution commit
https://github.com/xebia-functional/tictactoe/commit/0609803510456dbe1e8815f9af0f39afb359634e
Now it's time to see what's happening in the background. If two players perform a login and then initiate a "join game" action, we should see how this is materialized in a "start game" event in the input topic.
Follow the next steps for creating two players waiting for a game:
- Open a client
- Fill in a name and click on "Login"
- Click on "Join a game"
- Repeat the three steps above to create a different player
Now, let's open Kafka Magic. You should see your cluster; if not, refer to Exercise 13 to create it.
Expand the topics section.
If you click on the input topic and search for messages, you should be able to verify that new messages are created in the shape we defined in the exercises above. Selecting "Search backward (descending offsets)" will show you the latest messages on top.
You should see different messages like the following:
- Event with a nickname
Message: Object {"time":17013900,"event":{"nickname":"Xebia Rocks"}}
- Event with a nickname and player id
Message: Object {"time":17013914,"event":{"playerId":"f658e2c4-ae21-433c-bd7d-6123d5b956a2","nickname":"Xebia Rocks"}}
- Event for waiting for a Game with two player ids
Message: Object {"time":1685367116509,"event":{"gameId":"4a45eae9-33b8-42a0-823b-5d519d581161","crossPlayer":"02cab447-1abc-4745-a32c-3964e1b274cc","circlePlayer":"f658e2c4-ae21-433c-bd7d-6123d5b956a2"}}
The player topic should also contain the different ids.
With the `Game` event adequately created, it's time to implement the server logic for initiating a new game. This will allow the UI to enter a new state, where players will alternate turns until the end of the game is reached successfully or, hopefully not, due to an error in our system.
81ab77940469c7d425f735b137a345f5fb8d8fee
|
---|
First of all, let's review the changes introduced by this commit.
In `KafkaSetup` (`modules/server/src/main/scala/scaladays/config/KafkaSetup.scala`), we call `createCompactedTopicUnless` with the game topic. Since games are modeled as a state machine, we are interested only in the last state of the game.
In `KTopology` (`modules/server/src/main/scala/scaladays/kafka/KTopology.scala`), we now apply the kafka-streams definition that we're going to implement as part of this exercise.
Finally, in `GameStream` (`modules/server/src/main/scala/scaladays/kafka/stream/GameStream.scala`), we add the signature for the new method.
Open `GameStream` in your editor and complete the `matchStream` method. You need to:
- Read `StartGame` events from the input topic.
- Generate `Game` events with the proper values in the game topic.
As usual, once implemented, we need to add the serdes for the transformations. Run `compile` in an SBT console to see which serdes you are missing and add them in `VulcanSerdes` (`modules/server/src/main/scala/scaladays/kafka/codecs/VulcanSerdes.scala`).
If you retry the Kafka Magic exercise, you should start seeing some `Game` events in the game topic.
Solution commit
https://github.com/xebia-functional/tictactoe/commit/4593294adcc81b13611af3d3705e798f0ad6b692
In the client, we have provided the app with the ability to create WS connections, and, consequently, to create new games. However, given the two-way nature of these types of connections, we are not yet processing the events of new games created on the server. And that is what we are going to address in this section.
At this point, we must introduce a handy feature that Tyrian provides for listening to events of different kinds. Tyrian's documentation defines it as:
Subscriptions (Subs) are used to observe something that changes over time, and to emit discrete messages when something happens.
`Tyrian.WebSocket` lets us generate new subscriptions (`Sub[F, Msg]`) by passing a transformation from `WebSocketEvent` to `Msg`. Something like `def subscribe[Msg](f: WebSocketEvent => Msg): Sub[F, Msg]`, which is extremely convenient because we have a potential `WebSocket` in our Model.
0b7a26ecbb0d40e29a873728cc0e14cefebc197a
|
---|
In `Main.scala`, we are already checking whether our `Model` contains a `WebSocket` in order to observe its events:
```scala
def subscriptions(model: ModelIO): Sub[IO, Msg] =
  model.ws.fold(Sub.emit(Msg.WebSocketStatus(WebSocketMessage.WebSocketStatus.Disconnected)))(ws =>
    scalaDaysClient.handleWebSocket(ws)
  )
```
But now it's time to implement `scalaDaysClient.handleWebSocket(ws)` (from `ScalaDaysClient.scala`), taking these tips into account:
- Remember the signature `def subscribe[Msg](f: WebSocketEvent => Msg): Sub[F, Msg]`
- We are going to react to `WebSocketEvent.Receive(message)` by decoding the message as `Game`.
  - If it works, we emit `Msg.GameUpdate(game)`
  - If not, we emit `WebSocketStatus.ConnectionError` as a `Msg`.
- If we receive `WebSocketEvent.Heartbeat` or `WebSocketEvent.Open`, we emit `WebSocketStatus.Nop` as a `Msg`.
- Finally, we emit `WebSocketStatus.ConnectionError(WebSocketError("Unknown websocket message"))` for the rest of incoming events.
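The decision logic in these tips can be sketched as the function that would be passed to `subscribe`. To stay self-contained, the snippet defines its own `WebSocketEvent` enum mirroring the cases of Tyrian's type, flattens the messages into a simplified `Msg` with string errors, and takes the JSON decoder as a parameter; all of these are stand-ins, not the workshop's definitions.

```scala
// Simplified stand-ins for the workshop's and Tyrian's types.
final case class Game(gameId: String)

enum WebSocketEvent:
  case Open, Heartbeat
  case Receive(message: String)
  case Error(error: String)
  case Close(code: Int, reason: String)

enum Msg:
  case GameUpdate(game: Game)
  case ConnectionError(error: String)
  case Nop

// Maps each incoming event to a Msg, following the tips above.
def toMsg(decode: String => Either[String, Game])(event: WebSocketEvent): Msg =
  event match
    case WebSocketEvent.Receive(message) =>
      decode(message).fold(Msg.ConnectionError(_), Msg.GameUpdate(_))
    case WebSocketEvent.Heartbeat | WebSocketEvent.Open =>
      Msg.Nop
    case _ =>
      Msg.ConnectionError("Unknown websocket message")
```

Taking the decoder as a parameter keeps the mapping easy to test without a real connection.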
In `Update.scala`, we want to react to the new event `Msg.GameUpdate(game)`. Please make sure the `Model` evolves with `contest = Contest.Registered(game)` when we receive a `Msg.GameUpdate(game)`.
The views for the Game are something like these:
The user's turn | The opponent's turn |
---|---|
The `Html[Msg]` for these views is not trivial, so we have partially implemented it in `GameView.scala`. But there is still one missing part. The method `def CellView(position: Position): Html[Msg]` needs an implementation that renders a button for the given position.
Tip 1: Enabled or Disabled
The button should be disabled if it is not your turn, or if there is already a movement at this position.
Tip 2: OnClick when the button is enabled
When the button is enabled, the onClick action should send a `Msg.RequestNewMovement(game, newMovement)` (which is a new Msg that we'll use in the next section).
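The rule from Tip 1 can be isolated as a small predicate, which the view can then use to decide whether to render the button as disabled. The types below are hypothetical simplifications: the workshop's `Game`, `Position`, and movement models have their own shapes.

```scala
// Hypothetical, simplified game model for this sketch.
final case class Position(x: Int, y: Int)
final case class Movement(playerId: String, position: Position)
final case class Game(currentPlayer: String, movements: List[Movement])

// A cell is playable only on the player's own turn and when nobody has moved there yet.
def isCellEnabled(game: Game, me: String, position: Position): Boolean =
  game.currentPlayer == me && !game.movements.exists(_.position == position)
```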
Solution commit
https://github.com/xebia-functional/tictactoe/commit/ecaaad8f32356bd73285d3f036cc0b3f04afec24