diff --git a/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/simplified/Paxos.scala b/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/simplified/Paxos.scala index 7d1d2faba..e2c00c6f7 100644 --- a/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/simplified/Paxos.scala +++ b/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/simplified/Paxos.scala @@ -1,9 +1,11 @@ package rdts.datatypes.experiments.protocols.simplified +import rdts.base.Lattice.mapLattice import rdts.base.LocalUid.replicaId import rdts.base.{Lattice, LocalUid, Uid} -import rdts.datatypes.GrowOnlySet import rdts.datatypes.experiments.protocols.Consensus +import rdts.datatypes.experiments.protocols.simplified.Paxos.given +import rdts.datatypes.{GrowOnlySet, LastWriterWins} import scala.math.Ordering.Implicits.infixOrderingOps @@ -18,18 +20,12 @@ case class Accept[A](proposal: ProposalNum, value: A) case class Accepted[A](proposal: ProposalNum, acceptor: Uid) -given Ordering[ProposalNum] with - override def compare(x: ProposalNum, y: ProposalNum): Int = - if x.number > y.number then 1 - else if x.number == y.number then Ordering[Uid].compare(x.proposer, y.proposer) - else -1 - case class Paxos[A]( prepares: GrowOnlySet[Prepare], promises: GrowOnlySet[Promise[A]], accepts: GrowOnlySet[Accept[A]], accepted: GrowOnlySet[Accepted[A]], - members: Set[Uid] // constant + members: Map[Uid, Option[LastWriterWins[A]]] // constant ) { private def quorum: Int = members.size / 2 + 1 @@ -39,7 +35,6 @@ case class Paxos[A]( def chooseProposalNumber(using LocalUid): ProposalNum = val highestNum = prepares - .filter(_.proposal.proposer == replicaId) .map(_.proposal.number) .maxOption.getOrElse(-1) ProposalNum(highestNum + 1, replicaId) @@ -89,10 +84,20 @@ case class Paxos[A]( .flatMap(_.highestAccepted) .maxByOption((p, v) => p) .map((p, v) => v) - copy(accepts = accepts + Accept(proposal, acceptedValue.getOrElse(v))) + copy( + accepts = accepts + Accept(proposal, acceptedValue.getOrElse(v)), + members = members.updated(replicaId, Some(LastWriterWins.now(v))) + ) else this // quorum not reached, do nothing + def propose(v: A)(using LocalUid): Paxos[A] = + // find my newest proposalNum + val proposalNum = prepares.filter(_.proposal.proposer == replicaId).maxByOption(_.proposal) + proposalNum match + case Some(Prepare(prop)) => propose(prop, v) + case None => this + // phase 2b def accept(proposal: ProposalNum)(using LocalUid): Paxos[A] = // check if I have already promised a newer proposal @@ -107,36 +112,43 @@ case class Paxos[A]( } object Paxos: - def unchanged[A]: Paxos[A] = Paxos[A]( GrowOnlySet.empty[Prepare], GrowOnlySet.empty[Promise[A]], GrowOnlySet.empty[Accept[A]], GrowOnlySet.empty[Accepted[A]], - Set.empty + Map.empty ) def init[A](members: Set[Uid]): Paxos[A] = require(members.nonEmpty, "Cannot initialize Paxos with empty set of members.") - unchanged[A].copy(members = members) + unchanged[A].copy(members = members.map((_, None)).toMap) + + given Ordering[ProposalNum] with + override def compare(x: ProposalNum, y: ProposalNum): Int = + if x.number > y.number then 1 + else if x.number == y.number then Ordering[Uid].compare(x.proposer, y.proposer) + else -1 given [A]: Lattice[Paxos[A]] = Lattice.derived given consensus: Consensus[Paxos] with extension [A](c: Paxos[A]) override def write(value: A)(using LocalUid): Paxos[A] = + def becomeLeader = c.prepare().copy(members = c.members.updated(replicaId, Some(LastWriterWins.now(value)))) + val myNewestProposal = c.prepares.filter(_.proposal.proposer == replicaId).map(_.proposal).maxOption myNewestProposal match case Some(proposal) => - // check if proposing does anything + // check if proposing does anything (i.e. I am the leader) val proposed = c.propose(proposal, value) if Lattice[Paxos[A]].lteq(proposed, c) then // proposing did not work, try to become leader - c.prepare() + becomeLeader else proposed - case None => c.prepare() // no proposals yet, try to become leader + case None => becomeLeader // no proposals yet, try to become leader extension [A](c: Paxos[A]) override def read: Option[A] = val acceptancePerProposal: Map[ProposalNum, Set[Accepted[A]]] = c.accepted.groupBy(_.proposal) @@ -146,20 +158,28 @@ object Paxos: if votes.size >= c.quorum yield acceptedProposal.value extension [A](c: Paxos[A]) - override def members: Set[Uid] = c.members + override def members: Set[Uid] = c.members.keySet extension [A](c: Paxos[A]) override def reset(newMembers: Set[Uid]): Paxos[A] = Paxos.init(members = newMembers) extension [A](c: Paxos[A]) override def upkeep()(using LocalUid): Paxos[A] = // check if the newest accept is newer than the newest prepare message (c.accepts.maxByOption(_.proposal), c.prepares.maxByOption(_.proposal)) match - case (Some(accept), Some(prepare)) if accept.proposal >= prepare.proposal => + case (Some(Accept(a, _)), Some(Prepare(p))) if a >= p => // we are in phase 2 - c.accept(accept.proposal) - case (_, Some(prepare)) => + c.accept(a) + case (_, Some(Prepare(proposal))) => // we are in phase 1 - c.promise(prepare.proposal) + val d = c.promise(proposal) + + // check if we have become the leader and can start phase 2 by proposing a value + if proposal.proposer == replicaId && // we proposed the leading proposal + d.promises.count(_.proposal == proposal) >= d.quorum && // it has reached a quorum + d.members(proposal.proposer).nonEmpty // we know what value to propose + then + d.propose(proposal, d.members(proposal.proposer).get.value) + else d + case _ => // there are no prepare messages, do nothing Paxos.unchanged - diff --git a/Modules/RDTs/src/test/scala/test/rdts/protocols/SimplePaxosTest.scala b/Modules/RDTs/src/test/scala/test/rdts/protocols/SimplePaxosTest.scala new file mode 100644 index 000000000..094c09937 --- /dev/null +++ b/Modules/RDTs/src/test/scala/test/rdts/protocols/SimplePaxosTest.scala @@ -0,0 +1,167 @@ +package test.rdts.protocols.simplified + +import rdts.base.{Bottom, LocalUid, Uid} +import rdts.datatypes.GrowOnlyMap +import rdts.datatypes.experiments.protocols.simplified.Paxos +import rdts.datatypes.experiments.protocols.simplified.Paxos.{*, given} +import rdts.time.Dots + +import scala.math.Ordering.Implicits.infixOrderingOps + +class SimplePaxosTest extends munit.FunSuite { + given Bottom[Int] with + override def empty: Int = Int.MinValue + + given dots: Dots = Dots.empty + val id1 = LocalUid.gen() + val id2 = LocalUid.gen() + val id3 = LocalUid.gen() + + given members: Set[Uid] = Set(id1, id2, id3).map(_.uid) + + var emptyPaxosObject: Paxos[Int] = Paxos.init(members) + +// test("Merge fails with different members") { +// val p1: Paxos[Int] = Paxos.unchanged.copy(members = Set(id1).map(_.uid)) +// val p2: Paxos[Int] = Paxos.unchanged.copy(members = Set(id2).map(_.uid)) +// interceptMessage[IllegalArgumentException]( +// "requirement failed: cannot merge two Paxos instances with differing members" +// ) { +// p1 `merge` p2 +// } +// } + + test("Paxos for 3 participants without errors") { + var a: Paxos[Int] = emptyPaxosObject + + val proposal = a.chooseProposalNumber(using id1) + a = a `merge` a.prepare()(using id1) + a = a `merge` a.upkeep()(using id1) `merge` a.upkeep()(using id2) `merge` a.upkeep()(using id3) + assertEquals(a.read, None) + a = a `merge` a.propose(proposal, 1)(using id1) + a = a `merge` a.upkeep()(using id1) `merge` a.upkeep()(using id2) `merge` a.upkeep()(using id3) + assertEquals(a.read, Some(1)) + } + + test("newer proposal numbers are bigger") { + var testPaxosObject = emptyPaxosObject + + testPaxosObject = testPaxosObject.merge(testPaxosObject.prepare()(using id1)) + val firstProposalNumber = testPaxosObject.prepares.head.proposal + val secondProposalNumber = testPaxosObject.chooseProposalNumber(using id1) + + assert(firstProposalNumber < secondProposalNumber) + } + + test("No changes for older proposals") { + var testPaxosObject1 = emptyPaxosObject + // replica 1 sends prepare + testPaxosObject1 = testPaxosObject1.merge(testPaxosObject1.prepare()(using id1)) + // replica 2 sends prepare + testPaxosObject1 = testPaxosObject1.merge(testPaxosObject1.prepare()(using id2)) + testPaxosObject1 = + testPaxosObject1.merge(testPaxosObject1.upkeep()(using id1)).merge(testPaxosObject1.upkeep()(using id2)) + var testPaxosObject2 = emptyPaxosObject + // replica 3 sends prepare, with smaller proposal number + testPaxosObject2 = testPaxosObject2.merge(testPaxosObject2.prepare()(using id3)) + // replica 1 receives 3's prepare + testPaxosObject1 = testPaxosObject1.merge(testPaxosObject2) + // assert that the object before and after replica 1 calls upkeep is the same + assertEquals(testPaxosObject1, testPaxosObject1.merge(testPaxosObject1.upkeep()(using id1))) + } + + test("promise sends previously accepted value") { + var testPaxosObject = emptyPaxosObject + + // replica 1 sends prepare + testPaxosObject = testPaxosObject.merge(testPaxosObject.prepare()(using id1)) + // replica 2 and 3 receive prepare + testPaxosObject = + testPaxosObject.merge(testPaxosObject.upkeep()(using id2)).merge(testPaxosObject.upkeep()(using id3)) + // replica 1 receives 2 and 3's promise + testPaxosObject = testPaxosObject.merge(testPaxosObject.upkeep()(using id1)) + // replica 1 sends propose(1) + testPaxosObject = testPaxosObject.merge(testPaxosObject.propose(1)(using id1)) + // replica 2 and 3 receive propose + testPaxosObject = + testPaxosObject.merge(testPaxosObject.upkeep()(using id2)).merge(testPaxosObject.upkeep()(using id3)) + // replica 1 receives accepted + testPaxosObject = testPaxosObject.merge(testPaxosObject.upkeep()(using id1)) + // replica 2 sends prepare to propose a new value + testPaxosObject = testPaxosObject.merge(testPaxosObject.prepare()(using id2)) + // replica 1 receives 2's new prepare, get the value in 1's promise + + val promiseValue = testPaxosObject.upkeep()(using id1).promises.maxBy(_.proposal).highestAccepted.get._2 + // assert the promise contains the previously accepted value + assertEquals(promiseValue, 1) + } + + test("acceptor sends promise with highest proposal number") { + var testPaxosObject = emptyPaxosObject + + testPaxosObject = testPaxosObject.merge(testPaxosObject.prepare()(using id1)) + .merge(testPaxosObject.prepare()(using id2)) + .merge(testPaxosObject.prepare()(using id3)) + val highestProposalNumber = testPaxosObject.prepares.map(_.proposal).max + testPaxosObject = testPaxosObject.upkeep()(using id1) + val promiseProposalNumber = testPaxosObject.promises.head.proposal + + assertEquals(promiseProposalNumber, highestProposalNumber) + } + + test("accept contains value of promise with highest proposal number") { + var testPaxosObject = emptyPaxosObject + // replica 1 sends prepare + testPaxosObject = testPaxosObject.merge(testPaxosObject.prepare()(using id1)) + // 1, 2 and 3 receive prepare + testPaxosObject = testPaxosObject.merge(testPaxosObject.upkeep()(using id1)).merge(testPaxosObject.upkeep()(using + id2)).merge(testPaxosObject.upkeep()(using id3)) + // 1 sends accept + testPaxosObject = testPaxosObject.merge(testPaxosObject.propose(1)(using id1)) + // 1,2 and 3 receive accept + testPaxosObject = testPaxosObject.merge(testPaxosObject.upkeep()(using id1)).merge(testPaxosObject.upkeep()(using + id2)).merge(testPaxosObject.upkeep()(using id3)) + // replica 2 sends prepare + testPaxosObject = testPaxosObject.merge(testPaxosObject.prepare()(using id2)) + // 1 and 2 receives 2's prepare + testPaxosObject = + testPaxosObject.merge(testPaxosObject.upkeep()(using id1)).merge(testPaxosObject.upkeep()(using id2)) + // 2 sends accept, which should contain the value of 1's promise and not the value "2" + val acceptValue = testPaxosObject.propose(2)(using id2).accepts.head.value + // assert that the value in 2's accept message is the value of 1's promise + assertEquals(acceptValue, 1) + } + + test("write works as expected") { + var testPaxosObject = emptyPaxosObject + val writeValue = 1 + // replica 1 tries to write + testPaxosObject = testPaxosObject.merge(testPaxosObject.write(writeValue)(using id1)) + testPaxosObject = testPaxosObject.merge(testPaxosObject.upkeep()(using id1)).merge(testPaxosObject.upkeep()(using + id2)).merge(testPaxosObject.upkeep()(using id3)) + assertEquals(testPaxosObject.read, None) + // replica 1 tries to write again + testPaxosObject = testPaxosObject.merge(testPaxosObject.write(writeValue)(using id1)) + testPaxosObject = testPaxosObject.merge(testPaxosObject.upkeep()(using id1)).merge(testPaxosObject.upkeep()(using + id2)).merge(testPaxosObject.upkeep()(using id3)) + assertEquals(testPaxosObject.read, Some(writeValue)) + } + + test("concurrent writes") { + var testPaxosObject = emptyPaxosObject + // replica 1 and 2 try to write + testPaxosObject = + testPaxosObject.merge(testPaxosObject.write(1)(using id1)).merge(testPaxosObject.write(2)(using id2)) + // deliver prepares + testPaxosObject = testPaxosObject.merge(testPaxosObject.upkeep()(using id1)).merge(testPaxosObject.upkeep()(using + id2)).merge(testPaxosObject.upkeep()(using id3)) + assertEquals(testPaxosObject.read, None) + // deliver proposal + testPaxosObject = testPaxosObject.merge(testPaxosObject.upkeep()(using id1)).merge(testPaxosObject.upkeep()(using + id2)).merge(testPaxosObject.upkeep()(using id3)) + // deliver accepted + testPaxosObject = testPaxosObject.merge(testPaxosObject.upkeep()(using id1)).merge(testPaxosObject.upkeep()(using + id2)).merge(testPaxosObject.upkeep()(using id3)) + assert(clue(testPaxosObject.read) == Some(2) || clue(testPaxosObject.read) == Some(1)) + } +}