Skip to content

Commit

Permalink
Merge pull request #1977 from dedis/fix-be2-daniel-readpk
Browse files Browse the repository at this point in the history
fix key read and instant pull
  • Loading branch information
sgueissa authored Jun 30, 2024
2 parents 7049cfb + bc3bc79 commit a2b1d53
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import ch.epfl.pop.storage.DbActor.{DbActorAck, DbActorGetRumorStateAck, DbActor

import scala.concurrent.Await
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.util.Random
import scala.util.{Random, Success, Try}

/** This class is responsible of managing the gossiping of rumors across the network
* @param dbActorRef
Expand All @@ -33,18 +33,23 @@ final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Dou
private var activeGossipProtocol: Map[JsonRpcRequest, Set[ActorRef]] = Map.empty
private var rumorMap: Map[PublicKey, Int] = Map.empty
private var jsonId = 0
private var publicKey: Option[PublicKey] = None
private var publicKey_ : Option[PublicKey] = None
private var connectionMediatorRef: AskableActorRef = _

private val periodicRumorStateKey = 0

publicKey = {
val readPk = dbActorRef ? DbActor.ReadServerPublicKey()
Await.result(readPk, duration) match
case DbActor.DbActorReadServerPublicKeyAck(pk) => Some(pk)
case _ =>
log.error(s"Will not be able to create rumors because it has no publicKey")
None
private def publicKey: Option[PublicKey] = {
if (publicKey_.isDefined)
publicKey_
else
val readPk = dbActorRef ? DbActor.ReadServerPublicKey()
Try(Await.result(readPk, duration)) match
case Success(DbActor.DbActorReadServerPublicKeyAck(pk)) =>
publicKey_ = Some(pk)
publicKey_
case _ =>
log.error(s"Will not be able to create rumors because it has no publicKey")
None
}

rumorMap =
Expand Down Expand Up @@ -214,6 +219,7 @@ final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Dou
connectionMediatorRef = sender()

case Monitor.AtLeastOneServerConnected =>
sendRumorState()
timers.startTimerWithFixedDelay(periodicRumorStateKey, TriggerPullState(), pullRate)

case Monitor.NoServerConnected =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.scalatest.BeforeAndAfterEach

import scala.concurrent.Await
import concurrent.duration.DurationInt
import scala.util.{Failure, Success, Try}

class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSystem")) with AnyFunSuiteLike with AskPatternConstants with Matchers with BeforeAndAfterEach {

Expand Down Expand Up @@ -90,11 +91,14 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys

// checks that created a correct rumor from that message and was received by other server
val rumor = Rumor(publicKey.get, 0, Map(castVoteRequest.getParamsChannel -> List(castVoteRequest.getParamsMessage.get)))
// ignore state
server.receiveOne(duration)

val receivedMsg = server.receiveOne(duration.mul(5)).asInstanceOf[ClientAnswer]
receivedMsg.graphMessage match
case Right(jsonRpcRequest: JsonRpcRequest) =>
jsonRpcRequest.method shouldBe MethodType.rumor
jsonRpcRequest.id shouldBe Some(0)
jsonRpcRequest.id shouldBe Some(1)
val jsonRumor = jsonRpcRequest.getParams.asInstanceOf[Rumor]
jsonRumor shouldBe rumor
case _ => 0 shouldBe 1
Expand All @@ -119,6 +123,9 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys
val outputCreateRumor = Source.single(decodedInit).via(gossip).runWith(Sink.head)
Await.result(outputCreateRumor, duration)

// ignore state
server.receiveOne(duration)

server.expectNoMessage()
}

Expand All @@ -132,6 +139,8 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys
// registers a new server
connectionMediatorRef ? ConnectionMediator.NewServerConnected(peerServer.ref, GreetServer(PublicKey(Base64Data("")), "", ""))
checkPeersWritten(connectionMediatorRef)
// ignore state
peerServer.receiveOne(duration)

// emulates receiving a castVote and processes it
for (id <- 0 to 4) {
Expand All @@ -141,10 +150,11 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys
Await.result(outputCreateRumor, duration)
// checks that created a correct rumor from that message and was received by other server
val rumor = Rumor(PublicKey(Base64Data("blabla")), id, Map(castVoteRequest.getParamsChannel -> List(castVoteRequest.getParamsMessage.get)))

val receivedMsg = peerServer.receiveOne(duration).asInstanceOf[ClientAnswer]
receivedMsg.graphMessage match
case Right(jsonRpcRequest: JsonRpcRequest) =>
jsonRpcRequest.id shouldBe Some(id)
jsonRpcRequest.id shouldBe Some(id + 1)
jsonRpcRequest.getParams.asInstanceOf[Rumor].rumorId shouldBe id
case _ => 0 shouldBe 1
}
Expand All @@ -160,6 +170,8 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys
val peerServer = TestProbe()
connectionMediatorRef ? ConnectionMediator.NewServerConnected(peerServer.ref, GreetServer(PublicKey(Base64Data("")), "", ""))
checkPeersWritten(connectionMediatorRef)
// ignore state
peerServer.receiveOne(duration)

for (id <- 0 to 4) {
// emulates receiving a castVote and processes it
Expand All @@ -175,7 +187,7 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys
val receivedMsg = peerServer.receiveOne(duration).asInstanceOf[ClientAnswer]
receivedMsg.graphMessage match
case Right(jsonRpcRequest: JsonRpcRequest) =>
jsonRpcRequest.id shouldBe Some(id)
jsonRpcRequest.id shouldBe Some(id + 1)
jsonRpcRequest.getParams.asInstanceOf[Rumor].rumorId shouldBe id
case _ => 0 shouldBe 1
}
Expand All @@ -193,6 +205,8 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys
// registers a new server
connectionMediatorRef ? ConnectionMediator.NewServerConnected(peerServer.ref, GreetServer(PublicKey(Base64Data.encode("publicKey")), "", ""))
checkPeersWritten(connectionMediatorRef)
// ignore state
peerServer.receiveOne(duration)

// emulates receiving a castVote and processes it
val outputCreateRumor = Source.single(Right(castVoteRequest)).via(gossip).runWith(Sink.head)
Expand All @@ -206,7 +220,7 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys
val response = Right(JsonRpcResponse(
RpcValidator.JSON_RPC_VERSION,
ResultObject(0),
Some(0)
Some(1)
))

// by processing the reponse, gossipManager should not send again a rumor to a peer
Expand All @@ -229,6 +243,8 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys
// registers a new server
connectionMediatorRef ? ConnectionMediator.NewServerConnected(peerServer.ref, GreetServer(PublicKey(Base64Data.encode("publicKey")), "", ""))
checkPeersWritten(connectionMediatorRef)
// ignore state
peerServer.receiveOne(duration)

// emulates receiving a castVote and processes it
val outputCreateRumor = Source.single(Right(castVoteRequest)).via(gossip).runWith(Sink.head)
Expand Down Expand Up @@ -262,6 +278,8 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys
// register server
connectionMediatorRef ? ConnectionMediator.NewServerConnected(peerServer.ref, GreetServer(PublicKey(Base64Data("")), "", ""))
checkPeersWritten(connectionMediatorRef)
// ignore state
peerServer.receiveOne(duration)

val output = Source.single(Right(rumorRequest)).via(gossipHandler).runWith(Sink.head)

Expand All @@ -270,7 +288,7 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys
peerServer.receiveOne(duration) match
case ClientAnswer(Right(jsonRpcRequest: JsonRpcRequest)) =>
jsonRpcRequest.method shouldBe MethodType.rumor
jsonRpcRequest.id shouldBe Some(0)
jsonRpcRequest.id shouldBe Some(1)
jsonRpcRequest.getParams.asInstanceOf[Rumor] shouldBe rumor
case _ => 0 shouldBe 1
}
Expand All @@ -291,19 +309,21 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys
// register server
for (peer <- peers) {
connectionMediatorRef ? ConnectionMediator.NewServerConnected(peer.ref, GreetServer(PublicKey(Base64Data("")), "", ""))
peer.receiveOne(duration)
}
checkPeersWritten(connectionMediatorRef)

val output = Source.single(Right(rumorRequest)).via(gossipHandler).runWith(Sink.head)

Await.result(output, duration)

// include rumor state
peers.map(_.receiveOne(duration)).count(_ != null) shouldBe 1

}

test("gossip handler should send rumor if there is an ongoing gossip protocol") {
val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef))
val gossipManager: ActorRef = system.actorOf(GossipManager.props(dbActorRef, pullRate = 30.seconds))
connectionMediatorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManager, messageRegistry))
val sender = TestProbe()
val gossipHandler = GossipManager.gossipHandler(gossipManager, sender.ref)
Expand All @@ -321,6 +341,7 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys
var n = 0
for (peer <- peers) {
connectionMediatorRef ? ConnectionMediator.NewServerConnected(peer.ref, GreetServer(PublicKey(Base64Data.encode(s"$n")), "", ""))
peer.receiveOne(duration)
n += 1
}
checkPeersWritten(connectionMediatorRef)
Expand All @@ -335,16 +356,16 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys
// checks that only one peers received the rumor
peers.foreach { peer =>
peer.receiveOne(duration.mul(2)) match
case ClientAnswer(_) =>
case null => remainingPeers :+= peer
case ClientAnswer(Right(jsonRpcRequest: JsonRpcRequest)) if jsonRpcRequest.method == MethodType.rumor =>
case _ => remainingPeers :+= peer
}
remainingPeers.size shouldBe peers.size - 1

// sends back to the gossipManager a response that the rumor is new
val response = Right(JsonRpcResponse(
RpcValidator.JSON_RPC_VERSION,
ResultObject(0),
Some(0)
Some(1)
))

// by processing the reponse, gossipManager should send again a rumor to a new peer
Expand All @@ -356,8 +377,8 @@ class GossipManagerSuite extends TestKit(ActorSystem("GossipManagerSuiteActorSys

remainingPeers.foreach { peer =>
peer.receiveOne(duration) match
case ClientAnswer(_) =>
case null => remainingPeers2 :+= peer
case ClientAnswer(Right(jsonRpcRequest: JsonRpcRequest)) if jsonRpcRequest.method == MethodType.rumor =>
case _ => remainingPeers2 :+= peer
}
remainingPeers2.size shouldBe remainingPeers.size - 1

Expand Down

0 comments on commit a2b1d53

Please sign in to comment.