diff --git a/Modules/RDTs/.jvm/src/test/scala/ConsensusPropertySpec.scala b/Modules/RDTs/.jvm/src/test/scala/ConsensusPropertySpec.scala index 424e4aac5..84db0f2d4 100644 --- a/Modules/RDTs/.jvm/src/test/scala/ConsensusPropertySpec.scala +++ b/Modules/RDTs/.jvm/src/test/scala/ConsensusPropertySpec.scala @@ -72,7 +72,7 @@ class ConsensusPropertySpec[A: Arbitrary, C[_]: Consensus]( for numDevices <- Gen.choose(minDevices, maxDevices) ids = Range(0, numDevices).map(_ => LocalUid.gen()).toList - yield ids.map(id => (id, Consensus.init(ids.map(_.uid).toSet))).toMap + yield ids.map(id => (id, Consensus[C].empty)).toMap // generators override def genCommand(state: State): Gen[Command] = diff --git a/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/Consensus.scala b/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/Consensus.scala index 728464fee..581fc7c6d 100644 --- a/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/Consensus.scala +++ b/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/Consensus.scala @@ -10,7 +10,6 @@ trait Consensus[C[_]] { extension [A](c: C[A]) def members(using Participants): Set[Uid] = participants extension [A](c: C[A]) def upkeep()(using LocalUid, Participants): C[A] - def init[A](members: Set[Uid]): C[A] def empty[A]: C[A] def lattice[A]: Lattice[C[A]] } @@ -21,7 +20,4 @@ object Consensus { override def empty: C[A] = Consensus[C].empty def apply[C[_]](using ev: Consensus[C]): Consensus[C] = ev - def init[A, C[_]](newMembers: Set[Uid])(using Consensus[C]): C[A] = - val a: Consensus[C] = apply[C] - a.init(newMembers) } diff --git a/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/Membership.scala b/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/Membership.scala index 63f2ce549..290d69a8a 100644 --- a/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/Membership.scala +++ b/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/Membership.scala @@ -1,7 +1,7 @@ package rdts.datatypes.experiments.protocols import rdts.base.LocalUid.replicaId -import rdts.base.{Lattice, LocalUid, Uid} +import rdts.base.{Bottom, Lattice, LocalUid, Uid} import rdts.datatypes.experiments.protocols.Consensus.given import rdts.time.Time @@ -70,9 +70,10 @@ case class Membership[A, C[_], D[_]]( logger.info(s"Member consensus reached on members $members") copy( counter = counter + 1, - membersConsensus = Consensus.init(members), - innerConsensus = Consensus.init(members), - membershipChanging = false + membersConsensus = Consensus[C].empty, + innerConsensus = Consensus[D].empty, + membershipChanging = false, + members = members ) // inner consensus is reached case (None, Some(value)) if !membershipChanging => @@ -81,8 +82,8 @@ case class Membership[A, C[_], D[_]]( logger.info(s"Inner consensus reached on value $value, log: $newLog") copy( counter = counter + 1, - membersConsensus = Consensus.init(currentMembers), - innerConsensus = Consensus.init(currentMembers), + membersConsensus = Consensus[C].empty, + innerConsensus = Consensus[D].empty, log = newLog ) // nothing has changed @@ -102,8 +103,8 @@ object Membership { require(initialMembers.nonEmpty, "initial members can't be empty") Membership( counter = 0, - membersConsensus = Consensus[C].init[Set[Uid]](initialMembers), - innerConsensus = Consensus[D].init[A](initialMembers), + membersConsensus = Consensus[C].empty, + innerConsensus = Consensus[D].empty, log = List(), members = initialMembers ) diff --git a/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/Paxos.scala b/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/Paxos.scala index a40b3fb2d..613b4dc54 100644 --- a/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/Paxos.scala +++ b/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/Paxos.scala @@ -196,7 +196,6 @@ object Paxos { override def read(using Participants): Option[A] = c.read extension [A](c: Paxos[A]) override def upkeep()(using LocalUid, Participants): Paxos[A] = c.upkeep() - override def init[A](members: GrowOnlySet[Uid]): Paxos[A] = Paxos.init(members = members) override def empty[A]: Paxos[A] = Paxos.unchanged diff --git a/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/simplified/GeneralizedPaxos.scala b/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/simplified/GeneralizedPaxos.scala index 23f2b07b4..207f48b4d 100644 --- a/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/simplified/GeneralizedPaxos.scala +++ b/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/simplified/GeneralizedPaxos.scala @@ -5,8 +5,11 @@ import rdts.base.{Lattice, LocalUid, Uid} import rdts.datatypes.LastWriterWins import rdts.datatypes.experiments.protocols.{Consensus, Participants} -case class GeneralizedPaxos[A](rounds: Map[BallotNum, (LeaderElection, SimpleVoting[A])] = - Map.empty[BallotNum, (LeaderElection, SimpleVoting[A])], myValue: Option[LastWriterWins[A]] = None): +case class GeneralizedPaxos[A]( + rounds: Map[BallotNum, (LeaderElection, SimpleVoting[A])] = + Map.empty[BallotNum, (LeaderElection, SimpleVoting[A])], + myValue: Option[LastWriterWins[A]] = None +): def voteFor(leader: Uid, value: A)(using LocalUid, Participants): (LeaderElection, SimpleVoting[A]) = (SimpleVoting[Uid]().voteFor(leader), SimpleVoting[A]().voteFor(value)) @@ -20,7 +23,7 @@ case class GeneralizedPaxos[A](rounds: Map[BallotNum, (LeaderElection, SimpleVot def phase1b(using LocalUid, Participants): GeneralizedPaxos[A] = // return greatest decided value - val r = rounds.filter { case (b, (l, v)) => v.result != None } + val r = rounds.filter { case (b, (l, v)) => v.result.isDefined } val decidedVal = r.maxByOption { case (b, (l, v)) => b }.flatMap(_._2._2.result) // vote for newest leader election @@ -37,7 +40,7 @@ case class GeneralizedPaxos[A](rounds: Map[BallotNum, (LeaderElection, SimpleVot def phase2a(using LocalUid, Participants): GeneralizedPaxos[A] = // check if leader myHighestBallot match - case Some((ballotNum, (leaderElection, voting))) if leaderElection.result == Some(replicaId) => + case Some((ballotNum, (leaderElection, voting))) if leaderElection.result.contains(replicaId) => val value = if voting.votes.nonEmpty then voting.votes.head.value @@ -70,7 +73,7 @@ case class GeneralizedPaxos[A](rounds: Map[BallotNum, (LeaderElection, SimpleVot rounds.filter { case (b, (l, v)) => b.uid == replicaId }.maxByOption { case (b, (l, v)) => b } def newestDecidedVal(using Participants): Option[A] = - val r = rounds.filter { case (b, (l, v)) => v.result != None } + val r = rounds.filter { case (b, (l, v)) => v.result.isDefined } r.maxByOption { case (b, (l, v)) => b }.flatMap(_._2._2.result) object GeneralizedPaxos: @@ -89,15 +92,18 @@ object GeneralizedPaxos: extension [A](c: GeneralizedPaxos[A])(using Participants) override def read: Option[A] = c.newestDecidedVal extension [A](c: GeneralizedPaxos[A]) - override def upkeep()(using LocalUid): GeneralizedPaxos[A] = + override def upkeep()(using LocalUid, Participants): GeneralizedPaxos[A] = // check which phase we are in - - ??? - - - - override def init[A](members: Set[Uid]): GeneralizedPaxos[A] = GeneralizedPaxos() + c.highestBallot match + // we have a leader -> phase 2 + case Some((ballotNum, (leaderElection, voting))) if leaderElection.result.nonEmpty => + c.phase2b + // we are in the process of electing a new leader + case None => + c.phase1b override def empty[A]: GeneralizedPaxos[A] = GeneralizedPaxos() override def lattice[A]: Lattice[GeneralizedPaxos[A]] = lattice + + // TODO: define lteq diff --git a/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/simplified/Paxos.scala b/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/simplified/Paxos.scala index b15c1be15..e94ffa96e 100644 --- a/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/simplified/Paxos.scala +++ b/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/simplified/Paxos.scala @@ -26,7 +26,7 @@ case class Paxos[A]( ) { // override def toString: String = pprint.apply(this).render - private def quorum: Int = members.size / 2 + 1 + private def quorum(using Participants): Int = participants.size / 2 + 1 def myHighestPromise(using LocalUid): Option[Promise[A]] = promises.filter(_.acceptor == replicaId).maxByOption(_.proposal) @@ -66,7 +66,7 @@ case class Paxos[A]( ) // phase 2a - def propose(proposal: ProposalNum, v: A)(using LocalUid): Paxos[A] = + def propose(proposal: ProposalNum, v: A)(using LocalUid, Participants): Paxos[A] = // check if I have received enough promises and have not proposed yet val myPromises = promises.filter(_.proposal == proposal) val hasProposed = accepts.exists(_.proposal == proposal) @@ -84,7 +84,7 @@ case class Paxos[A]( else Paxos.unchanged // quorum not reached, do nothing - def propose(v: A)(using LocalUid): Paxos[A] = + def propose(v: A)(using LocalUid, Participants): Paxos[A] = // find my newest proposalNum val proposalNum = prepares.filter(_.proposal.proposer == replicaId).maxByOption(_.proposal) proposalNum match @@ -129,7 +129,7 @@ object Paxos: given consensus: Consensus[Paxos] with extension [A](c: Paxos[A]) override def write(value: A)(using LocalUid, Participants): Paxos[A] = - if c.members.contains(replicaId) then + if participants.contains(replicaId) then def becomeLeader = c.prepare().copy(members = c.members.updated(replicaId, Some(LastWriterWins.now(value)))) val myNewestProposal = c.prepares.filter(_.proposal.proposer == replicaId).map(_.proposal).maxOption @@ -190,8 +190,6 @@ object Paxos: // there are no prepare messages, do nothing Paxos.unchanged - override def init[A](members: GrowOnlySet[Uid]): Paxos[A] = Paxos.init(members = members) - override def empty[A]: Paxos[A] = Paxos.unchanged override def lattice[A]: Lattice[Paxos[A]] = Paxos.lattice diff --git a/Modules/RDTs/src/test/scala/test/rdts/protocols/MembershipTest.scala b/Modules/RDTs/src/test/scala/test/rdts/protocols/MembershipTest.scala index 4c979cbf5..4c237d209 100644 --- a/Modules/RDTs/src/test/scala/test/rdts/protocols/MembershipTest.scala +++ b/Modules/RDTs/src/test/scala/test/rdts/protocols/MembershipTest.scala @@ -9,11 +9,13 @@ class MembershipTest extends munit.FunSuite { val id1 = LocalUid.gen() val id2 = LocalUid.gen() val id3 = LocalUid.gen() + val id4 = LocalUid.gen() test("Membership happy path") { given LogHack(false) var membership = Membership.init[Int, Paxos, Paxos](Set(id1, id2, id3).map(_.uid)) // id1 writes -> prepare + assert(membership.isMember(using id1)) membership = membership.merge(membership.write(1)(using id1)) assert(membership.isMember(using id1)) assert(membership.isMember(using id2)) @@ -36,4 +38,69 @@ class MembershipTest extends munit.FunSuite { assertEquals(membership.currentMembers, Set(id1, id2, id3).map(_.uid)) assertEquals(membership.read, List(1)) } + + test("Membership with member change") { + given LogHack(false) + var membership = Membership.init[Int, Paxos, Paxos](Set(id1, id2, id3).map(_.uid)) + // id1 writes -> prepare + assert(membership.isMember(using id1)) + membership = membership.merge(membership.write(1)(using id1)) + assert(membership.isMember(using id1)) + assert(membership.isMember(using id2)) + assert(membership.isMember(using id3)) + // id2 vote kicks id1 + membership = membership.merge(membership.removeMember(id1.uid)(using id2)) + assert(membership.isMember(using id1)) + assert(membership.isMember(using id2)) + assert(membership.isMember(using id3)) + // all upkeep --> promise and enter phase 2 + membership = membership + .merge(membership.upkeep()(using id1)) + .merge(membership.upkeep()(using id2)) + .merge(membership.upkeep()(using id3)) + // all upkeep again -> propose value + membership = membership + .merge(membership.upkeep()(using id1)) + .merge(membership.upkeep()(using id2)) + .merge(membership.upkeep()(using id3)) + // all upkeep again -> accept value + membership = membership + .merge(membership.upkeep()(using id1)) + .merge(membership.upkeep()(using id2)) + .merge(membership.upkeep()(using id3)) + assertEquals(membership.currentMembers, Set(id2, id3).map(_.uid)) + assertEquals(membership.read, List()) + // non-members should not be able to modify consensus + assertEquals( + membership.merge(membership.write(1)(using id1)), + membership + ) + assertNotEquals( + membership.merge(membership.write(1)(using id2)), + membership + ) + assertNotEquals( + membership.merge(membership.write(1)(using id3)), + membership + ) + } + + test("Fixed counterexample from suite") { + given LogHack(false) + var membership1 = Membership.init[Int, Paxos, Paxos](Set(id1, id2, id3, id4).map(_.uid)) + var membership2 = Membership.init[Int, Paxos, Paxos](Set(id1, id2, id3, id4).map(_.uid)) + // id1 writes -> prepare + membership1 = membership1.merge(membership1.write(1)(using id1)) + // upkeep on same id + membership1 = membership1.merge(membership1.upkeep()(using id1)) + // write on id2 + membership2 = membership2.merge(membership2.write(2)(using id2)) + // upkeep on id2 + membership2 = membership2.merge(membership2.upkeep()(using id2)) + // merge id1 and id2 + membership1 = membership1.merge(membership2) + membership2 = membership2.merge(membership1) + // -> logs should diverge + assertEquals(membership1.log, membership2.log) + } }