Skip to content

Commit

Permalink
Merge pull request #1979 from dedis/fix-be2-daniel-send-state
Browse files Browse the repository at this point in the history
[BE2] fix send state
  • Loading branch information
sgueissa authored Jun 30, 2024
2 parents a2b1d53 + 3f0eeff commit 2ab5e66
Showing 1 changed file with 34 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import akka.pattern.{AskableActorRef, ask}
import akka.stream.scaladsl.Flow
import ch.epfl.pop.decentralized.GossipManager.TriggerPullState
import ch.epfl.pop.model.network.MethodType.rumor_state
import ch.epfl.pop.model.network.method.{Rumor, RumorState}
import ch.epfl.pop.model.network.method.{GreetServer, Rumor, RumorState}
import ch.epfl.pop.model.network.method.message.Message
import ch.epfl.pop.model.network.method.message.data.ActionType
import ch.epfl.pop.model.network.{JsonRpcRequest, JsonRpcResponse, MethodType}
Expand Down Expand Up @@ -163,27 +163,36 @@ final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Dou
case None => -1
}

private def sendRumorState(): Unit = {
val randomPeer = connectionMediatorRef ? ConnectionMediator.GetRandomPeer()
Await.result(randomPeer, duration) match {
case ConnectionMediator.GetRandomPeerAck(serverRef, greetServer) =>
val rumorStateGet = dbActorRef ? GetRumorState()
Await.result(rumorStateGet, duration) match
case DbActorGetRumorStateAck(rumorState) =>
log.info(s"Sending rumor_state ${rumorState.state} to ${greetServer.serverAddress}")
serverRef ! ClientAnswer(
Right(JsonRpcRequest(
RpcValidator.JSON_RPC_VERSION,
rumor_state,
rumorState,
Some(jsonId)
))
)
jsonId += 1
case _ => log.error(s"Actor $self failed on creating rumor state. State wasn't gossiped.")
case m @ _ =>
log.warning(s"Received an unexpected message $m waiting for a random peer")
}
private def sendRumorState(actorRefDest: ActorRef = ActorRef.noSender, greetServerDest: Option[GreetServer] = None): Unit = {
var serverRef = ActorRef.noSender
var greetServer_ : Option[GreetServer] = None
if actorRefDest == Actor.noSender then
val randomPeerGet = connectionMediatorRef ? ConnectionMediator.GetRandomPeer()
serverRef = Await.result(randomPeerGet, duration) match {
case ConnectionMediator.GetRandomPeerAck(serverRef, greetServer) =>
greetServer_ = Some(greetServer)
serverRef
case m @ _ =>
log.warning(s"Received an unexpected message $m waiting for a random peer")
serverRef
}
else
serverRef = actorRefDest
greetServer_ = greetServerDest
val rumorStateGet = dbActorRef ? GetRumorState()
Await.result(rumorStateGet, duration) match
case DbActorGetRumorStateAck(rumorState) =>
log.info(s"Sending rumor_state ${rumorState.state} to ${if greetServer_.isDefined then greetServer_.get.serverAddress else ""}")
serverRef ! ClientAnswer(
Right(JsonRpcRequest(
RpcValidator.JSON_RPC_VERSION,
rumor_state,
rumorState,
Some(jsonId)
))
)
jsonId += 1
case _ => log.error(s"Actor $self failed on creating rumor state. State wasn't gossiped.")
}

private def peersAlreadyReceived(jsonRpcRequest: JsonRpcRequest): Set[ActorRef] = {
Expand Down Expand Up @@ -225,6 +234,9 @@ final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Dou
case Monitor.NoServerConnected =>
timers.cancel(periodicRumorStateKey)

case ConnectionMediator.NewServerConnected(serverRef, greetServer) =>
sendRumorState(serverRef, Some(greetServer))

case TriggerPullState() =>
sendRumorState()

Expand Down

0 comments on commit 2ab5e66

Please sign in to comment.