diff --git a/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala b/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala index 04a07e4b0a..a1a90cdd26 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala @@ -20,7 +20,7 @@ import ch.epfl.pop.storage.DbActor.{DbActorAck, DbActorGetRumorStateAck, DbActor import scala.concurrent.Await import scala.concurrent.duration.{DurationInt, FiniteDuration} -import scala.util.Random +import scala.util.{Random, Success, Try} /** This class is responsible of managing the gossiping of rumors across the network * @param dbActorRef @@ -33,18 +33,23 @@ final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Dou private var activeGossipProtocol: Map[JsonRpcRequest, Set[ActorRef]] = Map.empty private var rumorMap: Map[PublicKey, Int] = Map.empty private var jsonId = 0 - private var publicKey: Option[PublicKey] = None + private var publicKey_ : Option[PublicKey] = None private var connectionMediatorRef: AskableActorRef = _ private val periodicRumorStateKey = 0 - publicKey = { - val readPk = dbActorRef ? DbActor.ReadServerPublicKey() - Await.result(readPk, duration) match - case DbActor.DbActorReadServerPublicKeyAck(pk) => Some(pk) - case _ => - log.error(s"Will not be able to create rumors because it has no publicKey") - None + private def publicKey: Option[PublicKey] = { + if (publicKey_.isDefined) + publicKey_ + else + val readPk = dbActorRef ? DbActor.ReadServerPublicKey() + Try(Await.result(readPk, duration)) match + case Success(DbActor.DbActorReadServerPublicKeyAck(pk)) => + publicKey_ = Some(pk) + publicKey_ + case _ => + log.error(s"Will not be able to create rumors because it has no publicKey") + None } rumorMap = @@ -214,6 +219,7 @@ final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Dou connectionMediatorRef = sender() case Monitor.AtLeastOneServerConnected => + sendRumorState() timers.startTimerWithFixedDelay(periodicRumorStateKey, TriggerPullState(), pullRate) case Monitor.NoServerConnected => diff --git a/be2-scala/src/test/scala/ch/epfl/pop/decentralized/GossipManagerSuite.scala b/be2-scala/src/test/scala/ch/epfl/pop/decentralized/GossipManagerSuite.scala index dd4abbda0c..23ccbaa19b 100644 --- a/be2-scala/src/test/scala/ch/epfl/pop/decentralized/GossipManagerSuite.scala +++ b/be2-scala/src/test/scala/ch/epfl/pop/decentralized/GossipManagerSuite.scala @@ -24,6 +24,7 @@ import org.scalatest.BeforeAndAfterEach import scala.concurrent.Await import concurrent.duration.DurationInt +import scala.util.{Failure, Success, Try} class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSystem")) with AnyFunSuiteLike with AskPatternConstants with Matchers with BeforeAndAfterEach { @@ -90,11 +91,14 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys // checks that created a correct rumor from that message and was received by other server val rumor = Rumor(publicKey.get, 0, Map(castVoteRequest.getParamsChannel -> List(castVoteRequest.getParamsMessage.get))) + // ignore state + server.receiveOne(duration) + val receivedMsg = server.receiveOne(duration.mul(5)).asInstanceOf[ClientAnswer] receivedMsg.graphMessage match case Right(jsonRpcRequest: JsonRpcRequest) => jsonRpcRequest.method shouldBe MethodType.rumor - jsonRpcRequest.id shouldBe Some(0) + jsonRpcRequest.id shouldBe Some(1) val jsonRumor = jsonRpcRequest.getParams.asInstanceOf[Rumor] jsonRumor shouldBe rumor case _ => 0 shouldBe 1 @@ -119,6 +123,9 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys val outputCreateRumor = Source.single(decodedInit).via(gossip).runWith(Sink.head) Await.result(outputCreateRumor, duration) + // ignore state + server.receiveOne(duration) + server.expectNoMessage() } @@ -132,6 +139,8 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys // registers a new server connectionMediatorRef ? ConnectionMediator.NewServerConnected(peerServer.ref, GreetServer(PublicKey(Base64Data("")), "", "")) checkPeersWritten(connectionMediatorRef) + // ignore state + peerServer.receiveOne(duration) // emulates receiving a castVote and processes it for (id <- 0 to 4) { @@ -141,10 +150,11 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys Await.result(outputCreateRumor, duration) // checks that created a correct rumor from that message and was received by other server val rumor = Rumor(PublicKey(Base64Data("blabla")), id, Map(castVoteRequest.getParamsChannel -> List(castVoteRequest.getParamsMessage.get))) + val receivedMsg = peerServer.receiveOne(duration).asInstanceOf[ClientAnswer] receivedMsg.graphMessage match case Right(jsonRpcRequest: JsonRpcRequest) => - jsonRpcRequest.id shouldBe Some(id) + jsonRpcRequest.id shouldBe Some(id + 1) jsonRpcRequest.getParams.asInstanceOf[Rumor].rumorId shouldBe id case _ => 0 shouldBe 1 } @@ -160,6 +170,8 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys val peerServer = TestProbe() connectionMediatorRef ? ConnectionMediator.NewServerConnected(peerServer.ref, GreetServer(PublicKey(Base64Data("")), "", "")) checkPeersWritten(connectionMediatorRef) + // ignore state + peerServer.receiveOne(duration) for (id <- 0 to 4) { // emulates receiving a castVote and processes it @@ -175,7 +187,7 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys val receivedMsg = peerServer.receiveOne(duration).asInstanceOf[ClientAnswer] receivedMsg.graphMessage match case Right(jsonRpcRequest: JsonRpcRequest) => - jsonRpcRequest.id shouldBe Some(id) + jsonRpcRequest.id shouldBe Some(id + 1) jsonRpcRequest.getParams.asInstanceOf[Rumor].rumorId shouldBe id case _ => 0 shouldBe 1 } @@ -193,6 +205,8 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys // registers a new server connectionMediatorRef ? ConnectionMediator.NewServerConnected(peerServer.ref, GreetServer(PublicKey(Base64Data.encode("publicKey")), "", "")) checkPeersWritten(connectionMediatorRef) + // ignore state + peerServer.receiveOne(duration) // emulates receiving a castVote and processes it val outputCreateRumor = Source.single(Right(castVoteRequest)).via(gossip).runWith(Sink.head) @@ -206,7 +220,7 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys val response = Right(JsonRpcResponse( RpcValidator.JSON_RPC_VERSION, ResultObject(0), - Some(0) + Some(1) )) // by processing the reponse, gossipManager should not send again a rumor to a peer @@ -229,6 +243,8 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys // registers a new server connectionMediatorRef ? ConnectionMediator.NewServerConnected(peerServer.ref, GreetServer(PublicKey(Base64Data.encode("publicKey")), "", "")) checkPeersWritten(connectionMediatorRef) + // ignore state + peerServer.receiveOne(duration) // emulates receiving a castVote and processes it val outputCreateRumor = Source.single(Right(castVoteRequest)).via(gossip).runWith(Sink.head) @@ -262,6 +278,8 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys // register server connectionMediatorRef ? ConnectionMediator.NewServerConnected(peerServer.ref, GreetServer(PublicKey(Base64Data("")), "", "")) checkPeersWritten(connectionMediatorRef) + // ignore state + peerServer.receiveOne(duration) val output = Source.single(Right(rumorRequest)).via(gossipHandler).runWith(Sink.head) @@ -270,7 +288,7 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys peerServer.receiveOne(duration) match case ClientAnswer(Right(jsonRpcRequest: JsonRpcRequest)) => jsonRpcRequest.method shouldBe MethodType.rumor - jsonRpcRequest.id shouldBe Some(0) + jsonRpcRequest.id shouldBe Some(1) jsonRpcRequest.getParams.asInstanceOf[Rumor] shouldBe rumor case _ => 0 shouldBe 1 } @@ -291,6 +309,7 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys // register server for (peer <- peers) { connectionMediatorRef ? ConnectionMediator.NewServerConnected(peer.ref, GreetServer(PublicKey(Base64Data("")), "", "")) + peer.receiveOne(duration) } checkPeersWritten(connectionMediatorRef) @@ -298,12 +317,13 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys Await.result(output, duration) + // include rumor state peers.map(_.receiveOne(duration)).count(_ != null) shouldBe 1 } test("gossip handler should send rumor if there is an ongoing gossip protocol") { - val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef)) + val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, pullRate = 30.seconds)) connectionMediatorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManager, messageRegistry)) val sender = TestProbe() val gossipHandler = GossipManager.gossipHandler(gossipManager, sender.ref) @@ -321,6 +341,7 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys var n = 0 for (peer <- peers) { connectionMediatorRef ? ConnectionMediator.NewServerConnected(peer.ref, GreetServer(PublicKey(Base64Data.encode(s"$n")), "", "")) + peer.receiveOne(duration) n += 1 } checkPeersWritten(connectionMediatorRef) @@ -335,8 +356,8 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys // checks that only one peers received the rumor peers.foreach { peer => peer.receiveOne(duration.mul(2)) match - case ClientAnswer(_) => - case null => remainingPeers :+= peer + case ClientAnswer(Right(jsonRpcRequest: JsonRpcRequest)) if jsonRpcRequest.method == MethodType.rumor => + case _ => remainingPeers :+= peer } remainingPeers.size shouldBe peers.size - 1 @@ -344,7 +365,7 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys val response = Right(JsonRpcResponse( RpcValidator.JSON_RPC_VERSION, ResultObject(0), - Some(0) + Some(1) )) // by processing the reponse, gossipManager should send again a rumor to a new peer @@ -356,8 +377,8 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys remainingPeers.foreach { peer => peer.receiveOne(duration) match - case ClientAnswer(_) => - case null => remainingPeers2 :+= peer + case ClientAnswer(Right(jsonRpcRequest: JsonRpcRequest)) if jsonRpcRequest.method == MethodType.rumor => + case _ => remainingPeers2 :+= peer } remainingPeers2.size shouldBe remainingPeers.size - 1