From 03bc2619171388c09648d7cc7b767cd26662b04e Mon Sep 17 00:00:00 2001 From: Julian Date: Tue, 17 Dec 2024 15:19:55 +0100 Subject: [PATCH] move old protocols --- .../scala/ex2024bft/MembershipSuite.scala | 3 +- .../main/scala/probench/KeyValueReplica.scala | 2 +- .../src/main/scala/probench/cli.scala | 2 +- .../scala/probench/ClusterConsensus.scala | 2 +- .../test/scala/GeneralizedPaxosSuite.scala | 2 +- .../.jvm/src/test/scala/MembershipSuite.scala | 7 +- .../.jvm/src/test/scala/PaperPaxosSuite.scala | 2 +- .../src/test/scala/SimplePaxosSuite.scala | 8 +- .../experiments/protocols/Paxos.scala | 311 +++++++----------- .../{simplified => }/TwoPhaseCommit.scala | 4 +- .../experiments/protocols/Voting.scala | 106 +++--- .../protocols/messagebased/MultiPaxos.scala | 2 +- .../experiments/protocols/old/Paxos.scala | 207 ++++++++++++ .../experiments/protocols/old/Voting.scala | 65 ++++ .../simplified/GeneralizedPaxos.scala | 4 +- .../{ => old}/simplified/Paxos.scala | 4 +- .../protocols/paper/PaperPaxos.scala | 127 ------- .../protocols/simplified/Voting.scala | 75 ----- .../test/rdts/protocols/MembershipTest.scala | 4 +- .../scala/test/rdts/protocols/PaxosTest.scala | 4 +- .../rdts/protocols/SimpleVotingTests.scala | 3 +- .../protocols/VotingTests2Participants.scala | 8 +- .../rdts/protocols/paper/PaperPaxosTest.scala | 5 +- .../protocols/simplified/GenPaxosTest.scala | 4 +- .../simplified/SimplePaxosTest.scala | 4 +- 25 files changed, 481 insertions(+), 484 deletions(-) rename Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/{simplified => }/TwoPhaseCommit.scala (92%) create mode 100644 Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/old/Paxos.scala create mode 100644 Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/old/Voting.scala rename Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/{ => old}/simplified/GeneralizedPaxos.scala (97%) rename Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/{ => old}/simplified/Paxos.scala (98%) delete mode 100644 Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/paper/PaperPaxos.scala delete mode 100644 Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/simplified/Voting.scala diff --git a/Modules/Examples/Misc JVM/src/test/scala/ex2024bft/MembershipSuite.scala b/Modules/Examples/Misc JVM/src/test/scala/ex2024bft/MembershipSuite.scala index 44f373731..bb5d7f210 100644 --- a/Modules/Examples/Misc JVM/src/test/scala/ex2024bft/MembershipSuite.scala +++ b/Modules/Examples/Misc JVM/src/test/scala/ex2024bft/MembershipSuite.scala @@ -5,7 +5,8 @@ import org.scalacheck.Prop.propBoolean import org.scalacheck.commands.Commands import org.scalacheck.{Gen, Prop} import rdts.base.{LocalUid, Uid} -import rdts.datatypes.experiments.protocols.{Membership, Paxos} +import rdts.datatypes.experiments.protocols.Membership +import rdts.datatypes.experiments.protocols.old.Paxos import scala.collection.mutable.ArrayBuffer diff --git a/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/KeyValueReplica.scala b/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/KeyValueReplica.scala index 01af3f041..91bb94762 100644 --- a/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/KeyValueReplica.scala +++ b/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/KeyValueReplica.scala @@ -6,7 +6,7 @@ import probench.data.RequestResponseQueue.Req import rdts.base.Lattice.syntax import rdts.base.{Bottom, LocalUid, Uid} import rdts.datatypes.experiments.protocols.Membership -import rdts.datatypes.experiments.protocols.simplified.Paxos +import rdts.datatypes.experiments.protocols.old.simplified.Paxos import rdts.syntax.DeltaBuffer import replication.DeltaDissemination diff --git a/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/cli.scala b/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/cli.scala index 919244a1c..6c54874b5 100644 --- a/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/cli.scala +++ b/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/cli.scala @@ -9,7 +9,7 @@ import probench.clients.{ClientCLI, EtcdClient, ProBenchClient} import probench.data.{ClientNodeState, ClusterData, KVOperation} import rdts.base.Uid import rdts.datatypes.experiments.protocols.Membership -import rdts.datatypes.experiments.protocols.simplified.Paxos +import rdts.datatypes.experiments.protocols.old.simplified.Paxos import replication.{FileConnection, ProtocolMessage} import java.net.{DatagramSocket, InetSocketAddress} diff --git a/Modules/Examples/Protocol Benchmarks/src/test/scala/probench/ClusterConsensus.scala b/Modules/Examples/Protocol Benchmarks/src/test/scala/probench/ClusterConsensus.scala index 1e213c36d..e354c36b5 100644 --- a/Modules/Examples/Protocol Benchmarks/src/test/scala/probench/ClusterConsensus.scala +++ b/Modules/Examples/Protocol Benchmarks/src/test/scala/probench/ClusterConsensus.scala @@ -7,7 +7,7 @@ import probench.clients.ProBenchClient import probench.data.{ClientNodeState, ClusterData, KVOperation} import rdts.base.{LocalUid, Uid} import rdts.datatypes.experiments.protocols.Membership -import rdts.datatypes.experiments.protocols.simplified.Paxos +import rdts.datatypes.experiments.protocols.old.simplified.Paxos import replication.ProtocolMessage class ClusterConsensus extends munit.FunSuite { diff --git a/Modules/RDTs/.jvm/src/test/scala/GeneralizedPaxosSuite.scala b/Modules/RDTs/.jvm/src/test/scala/GeneralizedPaxosSuite.scala index 10dd499d2..4613c7b5a 100644 --- a/Modules/RDTs/.jvm/src/test/scala/GeneralizedPaxosSuite.scala +++ b/Modules/RDTs/.jvm/src/test/scala/GeneralizedPaxosSuite.scala @@ -1,4 +1,4 @@ -import rdts.datatypes.experiments.protocols.simplified.GeneralizedPaxos +import rdts.datatypes.experiments.protocols.old.simplified.GeneralizedPaxos class GeneralizedPaxosSuite extends munit.ScalaCheckSuite { diff --git a/Modules/RDTs/.jvm/src/test/scala/MembershipSuite.scala b/Modules/RDTs/.jvm/src/test/scala/MembershipSuite.scala index 8f056779a..038bdc0bb 100644 --- a/Modules/RDTs/.jvm/src/test/scala/MembershipSuite.scala +++ b/Modules/RDTs/.jvm/src/test/scala/MembershipSuite.scala @@ -2,8 +2,9 @@ import org.scalacheck.Arbitrary.arbitrary import org.scalacheck.Prop.propBoolean import org.scalacheck.{Arbitrary, Gen, Prop} import rdts.base.{Lattice, LocalUid} -import rdts.datatypes.experiments.protocols.simplified.GeneralizedPaxos -import rdts.datatypes.experiments.protocols.{Consensus, Membership, paper, simplified} +import rdts.datatypes.experiments.protocols.old.simplified +import rdts.datatypes.experiments.protocols.old.simplified.GeneralizedPaxos +import rdts.datatypes.experiments.protocols.{Consensus, Membership, Paxos} import scala.util.Try class MembershipSuite extends munit.ScalaCheckSuite { @@ -37,7 +38,7 @@ class MembershipSuite extends munit.ScalaCheckSuite { removeMemberFreq = 1 ).property()) - property("Membership with paper paxos")(MembershipSpec[Int, paper.Paxos, paper.Paxos]( + property("Membership with paper paxos")(MembershipSpec[Int, Paxos, Paxos]( logging = false, minDevices = 3, maxDevices = 6, diff --git a/Modules/RDTs/.jvm/src/test/scala/PaperPaxosSuite.scala b/Modules/RDTs/.jvm/src/test/scala/PaperPaxosSuite.scala index 60eed7258..4a73c15d7 100644 --- a/Modules/RDTs/.jvm/src/test/scala/PaperPaxosSuite.scala +++ b/Modules/RDTs/.jvm/src/test/scala/PaperPaxosSuite.scala @@ -1,4 +1,4 @@ -import rdts.datatypes.experiments.protocols.paper.Paxos +import rdts.datatypes.experiments.protocols.Paxos class PaperPaxosSuite extends munit.ScalaCheckSuite: // override def scalaCheckInitialSeed = "ZcBq5Oa3t8-hWG0Snkx22h6nivxFRCvp27NO4tFKzbN=" diff --git a/Modules/RDTs/.jvm/src/test/scala/SimplePaxosSuite.scala b/Modules/RDTs/.jvm/src/test/scala/SimplePaxosSuite.scala index 3027a621f..ff0d4777b 100644 --- a/Modules/RDTs/.jvm/src/test/scala/SimplePaxosSuite.scala +++ b/Modules/RDTs/.jvm/src/test/scala/SimplePaxosSuite.scala @@ -2,7 +2,7 @@ import org.scalacheck.Arbitrary.arbitrary import org.scalacheck.Prop.propBoolean import org.scalacheck.{Arbitrary, Gen, Prop} import rdts.base.LocalUid -import rdts.datatypes.experiments.protocols.simplified +import rdts.datatypes.experiments.protocols.old.simplified.Paxos import scala.util.Try @@ -31,7 +31,7 @@ class SimplePaxosSpec[A: Arbitrary]( maxDevices: Int, writeFreq: Int, mergeFreq: Int -) extends ConsensusPropertySpec[A, simplified.Paxos]( +) extends ConsensusPropertySpec[A, Paxos]( logging, minDevices, maxDevices, @@ -46,8 +46,8 @@ class SimplePaxosSpec[A: Arbitrary]( class PWrite(id: LocalUid, value: A) extends Write(id, value) { override def postCondition( - state: Map[LocalUid, simplified.Paxos[A]], - result: Try[Map[LocalUid, simplified.Paxos[A]]] + state: Map[LocalUid, Paxos[A]], + result: Try[Map[LocalUid, Paxos[A]]] ): Prop = val res = result.get val doubleProposal = res(id).accepts.groupBy(_.proposal).find(_._2.size > 1) diff --git a/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/Paxos.scala b/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/Paxos.scala index f7f1b0d62..72b09acae 100644 --- a/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/Paxos.scala +++ b/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/Paxos.scala @@ -1,206 +1,125 @@ package rdts.datatypes.experiments.protocols -import rdts.base.Lattice.setLattice import rdts.base.LocalUid.replicaId -import rdts.base.{Bottom, Lattice, LocalUid, Uid} -import rdts.datatypes.GrowOnlySet -import rdts.datatypes.GrowOnlySet.* +import rdts.base.{Lattice, LocalUid, Uid} +import rdts.datatypes.LastWriterWins +import rdts.datatypes.experiments.protocols.Paxos.given +import rdts.datatypes.experiments.protocols.{Consensus, Participants} -import scala.compiletime.{constValue, summonFrom} - -enum Phase: - case One - case Two - -// message types -case class Prepare(proposalNumber: Int, proposer: Uid) - -case class Promise[A](proposal: Prepare, value: Option[A], acceptor: Uid) - -case class Accept[A](proposal: Prepare, value: A) - -case class Accepted[A](proposal: Prepare, value: A, acceptor: Uid) - -given Ordering[Prepare] with - override def compare(x: Prepare, y: Prepare): Int = - if x.proposalNumber > y.proposalNumber then 1 - else if x.proposalNumber == y.proposalNumber then Ordering[Uid].compare(x.proposer, y.proposer) - else -1 - -given Ordering[Accept[?]] with - override def compare(x: Accept[?], y: Accept[?]): Int = Ordering[Prepare].compare(x.proposal, y.proposal) +case class PaxosRound[A](leaderElection: LeaderElection = Voting(), proposals: Voting[A] = Voting[A]()) +type LeaderElection = Voting[Uid] +type Proposal case class Paxos[A]( - prepares: GrowOnlySet[Prepare], - promises: GrowOnlySet[Promise[A]], - accepts: GrowOnlySet[Accept[A]], - accepteds: GrowOnlySet[Accepted[A]], - members: Set[Uid] // constant -) { - private def quorum: Int = members.size / 2 + 1 - - def prepare()(using LocalUid): Paxos[A] = - val proposalNumber = highestProposal.map(_.proposalNumber).getOrElse(-1) + 1 - Paxos.unchanged.copy( - // members = members, - prepares = Set(Prepare(proposalNumber, replicaId)) - ) - - def promise()(using LocalUid): Paxos[A] = - val myHighestPromiseNumber = - promises.filter(_.acceptor == replicaId).map(_.proposal.proposalNumber).maxOption.getOrElse(-1) - // check if I already promised for an equally high id - if myHighestPromiseNumber >= highestProposal.map(_.proposalNumber).getOrElse(-1) - then - // already promised for equally high id - Paxos.unchanged - else - // there is a new higher proposal - // check if I already accepted a specific value - val value = - accepteds.filter(_.acceptor == replicaId).map(_.value).headOption - Paxos.unchanged.copy( - // members = members, - promises = Set(Promise(highestProposal.get, value, replicaId)) - ) - - def accept(v: A)(using LocalUid): Paxos[A] = - val promisesForProposal = myHighestProposal.map(p => promises.filter(_.proposal == p)).getOrElse(Set()) - // check if accepted - if !canSendAccept then - // is not accepted - Paxos.unchanged - else - // is accepted, check if promise contains value - val promisesWithVal = promisesForProposal.filter(_.value.isDefined) - val value: A = promisesWithVal.map(_.value).headOption.flatten.getOrElse(v) - Paxos.unchanged.copy( - accepts = Set(Accept(myHighestProposal.get, value)) - ) - - def accepted()(using LocalUid): Paxos[A] = - if newestAccept.isEmpty || // there are no accepts - // I have already promised a higher proposalNumber - promises.filter(_.acceptor == replicaId).map(_.proposal.proposalNumber).maxOption.getOrElse(-1) > - newestAccept.get.proposal.proposalNumber - then - Paxos.unchanged - else - Paxos.unchanged.copy( - // members = members, - accepteds = - Set(Accepted( - proposal = newestAccept.get.proposal, - value = newestAccept.get.value, - acceptor = replicaId - )) - ) - - def upkeep()(using LocalUid): Paxos[A] = - // check which phase we are in - phase match - case Phase.One if newestPrepare.isDefined => promise() - case Phase.Two => accepted() - case _ => Paxos.unchanged - - // helper functions - private def newestAccept: Option[Accept[A]] = accepts.maxOption - - private def newestPrepare: Option[Prepare] = prepares.maxOption - - private def highestProposal: Option[Prepare] = - prepares.maxOption - - private def myHighestProposal(using LocalUid): Option[Prepare] = prepares.filter(_.proposer == replicaId).maxOption - - private def canWrite(using LocalUid): Boolean = members.contains(replicaId) && read.isEmpty - - private def canSendAccept(using LocalUid): Boolean = - val promisesForProposal = myHighestProposal.map(p => promises.filter(_.proposal == p)).getOrElse(Set()) - promisesForProposal.size >= quorum - - private def phase: Phase = - if newestAccept.map(_.proposal.proposalNumber).getOrElse(-1) >= newestPrepare.map(_.proposalNumber).getOrElse(-1) - then - Phase.Two - else - Phase.One - - // API - def write(value: A)(using LocalUid): Paxos[A] = - if canWrite then - (phase, myHighestProposal, highestProposal) match - case (Phase.One, _, _) - if canSendAccept // we are in phase one and have received enough promises - => accept(value) - case (Phase.One, Some(p1), Some(p2)) - if p1.proposalNumber == p2.proposalNumber // my proposal is already the highest or there is a draw - => Paxos.unchanged - case _ // we try to propose new value - => prepare() - else - Paxos.unchanged - - def read: Option[A] = - val acceptedsPerProposal: Map[(Prepare, A), Set[Accepted[A]]] = accepteds.groupBy(a => (a.proposal, a.value)) - for - ((proposal, value), votes) <- acceptedsPerProposal.maxByOption((_, a) => a.size) - if votes.size >= quorum - yield value -} - -object Paxos { - - given lattice[A]: Lattice[Paxos[A]] with - override def merge(left: Paxos[A], right: Paxos[A]): Paxos[A] = - require( - Lattice[Set[Uid]].merge(left.members, right.members) == left.members, - "cannot merge two Paxos instances with differing members" - ) // members should remain fixed - - def allUids(p: Paxos[?]): Set[Uid] = - p.prepares.map(_.proposer) union - p.promises.flatMap(p => Set(p.proposal.proposer, p.acceptor)) union - p.accepts.map(_.proposal.proposer) union - p.accepteds.flatMap(p => Set(p.proposal.proposer, p.acceptor)) - - require( - (allUids(left) union allUids(right)).subsetOf(left.members), - "updates only contain Uids of known members" - ) - Paxos[A]( - prepares = left.prepares `merge` right.prepares, - promises = left.promises `merge` right.promises, - accepts = left.accepts `merge` right.accepts, - accepteds = left.accepteds `merge` right.accepteds, - members = left.members - ) - - def unchanged[A]: Paxos[A] = - Paxos[A]( - GrowOnlySet.empty[Prepare], - GrowOnlySet.empty[Promise[A]], - GrowOnlySet.empty[Accept[A]], - GrowOnlySet.empty[Accepted[A]], - Set.empty - ) - - def init[A](members: Set[Uid]): Paxos[A] = - require(members.nonEmpty, "Cannot initialize Paxos with empty set of members.") - unchanged[A].copy(members = members) + rounds: Map[BallotNum, PaxosRound[A]] = + Map.empty[BallotNum, PaxosRound[A]] +): + // update functions + def voteLeader(leader: Uid)(using LocalUid, Participants): PaxosRound[A] = + PaxosRound(leaderElection = Voting().voteFor(leader)) + + def voteValue(value: A)(using LocalUid, Participants): PaxosRound[A] = + PaxosRound(proposals = Voting().voteFor(value)) + + // query functions + def nextBallotNum(using LocalUid): BallotNum = + val maxCounter: Long = rounds + .filter((b, _) => b.uid == replicaId) + .map((b, _) => b.counter) + .maxOption + .getOrElse(-1) + BallotNum(replicaId, maxCounter + 1) + def currentRound: Option[(BallotNum, PaxosRound[A])] = rounds.maxOption + def currentBallot: Option[BallotNum] = rounds.maxOption.map(_._1) + def newestBallotWithLeader(using Participants): Option[(BallotNum, PaxosRound[A])] = + rounds.filter(_._2.leaderElection.result.nonEmpty).maxOption + def currentLeaderElection: Option[LeaderElection] = currentRound.map(_._2.leaderElection) + def leaderCandidate: Option[Uid] = currentLeaderElection.map(_.votes.head.value) + def myHighestBallot(using LocalUid): Option[(BallotNum, PaxosRound[A])] = + rounds.filter { case (b, p) => b.uid == replicaId }.maxOption + def lastValueVote: Option[(BallotNum, PaxosRound[A])] = rounds.filter(_._2.proposals.votes.nonEmpty).maxOption + def newestReceivedVal(using LocalUid) = lastValueVote.map(_._2.proposals.votes.head.value) + def myValue(using LocalUid): A = rounds(BallotNum(replicaId, -1)).proposals.votes.head.value + def decidedVal(using Participants): Option[A] = + rounds.collectFirst { case (b, PaxosRound(_, voting)) if voting.result.isDefined => voting.result.get } + + // phases: + def phase1a(using LocalUid, Participants)(value: A): Paxos[A] = + // try to become leader and remember a value for later + Paxos(Map(nextBallotNum -> voteLeader(replicaId), BallotNum(replicaId, -1) -> voteValue(value))) + def phase1a(using LocalUid, Participants): Paxos[A] = + // try to become leader + Paxos(Map(nextBallotNum -> voteLeader(replicaId))) + + def phase1b(using LocalUid, Participants): Paxos[A] = + // vote in the current leader election + (currentBallot, leaderCandidate, lastValueVote) match + case (Some(ballotNum), Some(candidate), None) => + // no value voted for, just vote for candidate + Paxos(Map(ballotNum -> voteLeader(candidate))) + case (Some(ballotNum), Some(candidate), Some(votedVal)) => + // vote for candidate and include value most recently voted for + Paxos(Map( + ballotNum -> voteLeader(candidate), + votedVal + )) + case _ => Paxos() // do nothing + + def phase2a(using LocalUid, Participants): Paxos[A] = + // propose a value if I am the leader + myHighestBallot match + case Some((ballotNum, PaxosRound(leaderElection, _))) + if leaderElection.result.contains(replicaId) => + if newestReceivedVal.nonEmpty then + Paxos(Map(ballotNum -> voteValue(newestReceivedVal.get))) + else + Paxos(Map(ballotNum -> voteValue(myValue))) + case _ => Paxos() // not leader -> do nothing + + def phase2b(using LocalUid, Participants): Paxos[A] = + // get highest ballot with leader and + // vote for the proposed value if there is one + newestBallotWithLeader match + case Some(ballotNum, PaxosRound(leaderElection, proposals)) + if proposals.votes.nonEmpty => + Paxos(Map(ballotNum -> voteValue(proposals.votes.head.value))) + case _ => Paxos() // current leader or proposed value not known -> do nothing + +object Paxos: + given [A]: Lattice[PaxosRound[A]] = Lattice.derived + given l[A]: Lattice[Paxos[A]] = Lattice.derived + + given [A]: Ordering[(BallotNum, PaxosRound[A])] with + override def compare(x: (BallotNum, PaxosRound[A]), y: (BallotNum, PaxosRound[A])): Int = (x, y) match + case ((x, _), (y, _)) => Ordering[BallotNum].compare(x, y) given consensus: Consensus[Paxos] with extension [A](c: Paxos[A]) - override def propose(value: A)(using LocalUid, Participants): Paxos[A] = c.write(value) - extension [A](c: Paxos[A]) - override def decision(using Participants): Option[A] = c.read + override def propose(value: A)(using LocalUid, Participants): Paxos[A] = + // check if I can propose a value + val afterProposal = c.phase2a + if Lattice[Paxos[A]].subsumption(afterProposal, c) then + // proposing did not work, try to become leader + c.phase1a(value) + else + afterProposal + extension [A](c: Paxos[A])(using Participants) + override def decision: Option[A] = c.decidedVal extension [A](c: Paxos[A]) - override def upkeep()(using LocalUid, Participants): Paxos[A] = c.upkeep() - - override def empty[A]: Paxos[A] = Paxos.unchanged - - override def lattice[A]: Lattice[Paxos[A]] = Paxos.lattice - - given bottom[A]: Bottom[Paxos[A]] with - override def empty: Paxos[A] = unchanged[A] -} + // upkeep can be used to perform the next protocol step automatically + override def upkeep()(using LocalUid, Participants): Paxos[A] = + // check which phase we are in + c.currentRound match + case Some((ballotNum, PaxosRound(leaderElection, _))) if leaderElection.result.nonEmpty => + // we have a leader -> phase 2 + if leaderElection.result.get == replicaId then + c.phase2a + else + c.phase2b + // we are in the process of electing a new leader + case _ => + c.phase1b + + override def empty[A]: Paxos[A] = Paxos() + + override def lattice[A]: Lattice[Paxos[A]] = l diff --git a/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/simplified/TwoPhaseCommit.scala b/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/TwoPhaseCommit.scala similarity index 92% rename from Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/simplified/TwoPhaseCommit.scala rename to Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/TwoPhaseCommit.scala index 71f6d0323..960febb24 100644 --- a/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/simplified/TwoPhaseCommit.scala +++ b/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/TwoPhaseCommit.scala @@ -1,10 +1,10 @@ -package rdts.datatypes.experiments.protocols.simplified +package rdts.datatypes.experiments.protocols import rdts.base.LocalUid.replicaId import rdts.base.{LocalUid, Uid} import rdts.datatypes.experiments.protocols.Participants import rdts.datatypes.experiments.protocols.Participants.participants -import rdts.datatypes.experiments.protocols.simplified.PrepareAbort.{Abort, Prepare} +import PrepareAbort.{Abort, Prepare} enum PrepareAbort: case Prepare diff --git a/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/Voting.scala b/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/Voting.scala index 79c3b11ab..fe0f4ed15 100644 --- a/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/Voting.scala +++ b/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/Voting.scala @@ -1,65 +1,73 @@ package rdts.datatypes.experiments.protocols +import rdts.base.LocalUid.replicaId import rdts.base.{Bottom, Lattice, LocalUid, Uid} -import rdts.datatypes.contextual.ReplicatedSet -import rdts.datatypes.{Epoch, LastWriterWins} -import rdts.dotted.Dotted -import LocalUid.replicaId -import rdts.time.Dots +import rdts.datatypes.experiments.protocols.Participants.participants +import rdts.datatypes.Epoch -case class Vote(leader: Uid, voter: Uid) +case class Vote[A](value: A, voter: Uid) -case class Voting(rounds: Epoch[ReplicatedSet[Vote]], numParticipants: LastWriterWins[Int]) { - def threshold: Int = numParticipants.value / 2 + 1 +case class Voting[A](votes: Set[Vote[A]] = Set.empty[Vote[A]]) { + def threshold(using Participants): Int = participants.size / 2 + 1 - def isOwner(using LocalUid): Boolean = - val (id, count) = leadingCount - id == replicaId && count >= threshold + def result(using Participants): Option[A] = + leadingCount match + case Some((v, count)) if count >= threshold => Some(v) + case _ => None - def request(using LocalUid, Dots): Dotted[Voting] = - if !rounds.value.isEmpty then Voting.unchanged - else voteFor(replicaId) + def voteFor(v: A)(using LocalUid, Participants): Voting[A] = + if !participants.contains(replicaId) || votes.exists { case Vote(_, voter) => voter == replicaId } + then Voting.unchanged // already voted! + else + Voting(Set(Vote(v, replicaId))) - def release(using LocalUid): Voting = - if !isOwner - then Voting.unchanged.data - else Voting(Epoch(rounds.counter + 1, ReplicatedSet.empty), numParticipants) + def leadingCount: Option[(A, Int)] = + val grouped: Map[A, Int] = votes.groupBy(_.value).map((value, vts) => (value, vts.size)) + grouped.maxByOption((_, size) => size) +} - def upkeep(using LocalUid, Dots): Dotted[Voting] = - val (id, count) = leadingCount - if checkIfMajorityPossible(count) - then voteFor(id) - else Dotted(forceRelease) +object Voting { + given lattice[A]: Lattice[Voting[A]] = Lattice.derived - def forceRelease(using LocalUid): Voting = - Voting(Epoch(rounds.counter + 1, ReplicatedSet.empty), numParticipants) + given bottom[A](using Participants): Bottom[Voting[A]] with + override def empty: Voting[A] = unchanged - def voteFor(uid: Uid)(using LocalUid, Dots): Dotted[Voting] = - if rounds.value.elements.exists { case Vote(_, voter) => voter == replicaId } - then Voting.unchanged // already voted! - else - val newVote = rounds.value.add(Vote(uid, replicaId)) - newVote.map(rs => Voting(rounds.write(rs), numParticipants)) - - def checkIfMajorityPossible(count: Int): Boolean = - val totalVotes = rounds.value.elements.size - val remainingVotes = numParticipants.value - totalVotes - (count + remainingVotes) > threshold - - def leadingCount(using id: LocalUid): (Uid, Int) = - val votes: Set[Vote] = rounds.value.elements - val grouped: Map[Uid, Int] = votes.groupBy(_.leader).map((o, elems) => (o, elems.size)) - if grouped.isEmpty - then (replicaId, 0) - else grouped.maxBy((o, size) => size) - // .maxBy((o, size) => size) + def unchanged[A](using Participants): Voting[A] = Voting(Set.empty) } -object Voting { - given Bottom[Int] with - def empty = 0 - val unchanged: Dotted[Voting] = Dotted(Voting(Epoch.empty, LastWriterWins.empty[Int])) +case class MultiRoundVoting[A](rounds: Epoch[Voting[A]]): + def release(using Participants): MultiRoundVoting[A] = + MultiRoundVoting(Epoch(rounds.counter + 1, Voting.unchanged)) + + def upkeep(using LocalUid, Participants): MultiRoundVoting[A] = + rounds.value.leadingCount match + case Some(value, count) if checkIfMajorityPossible => voteFor(value) + case Some(_) => release // we have a leading proposal but majority is not possible anymore + case None => MultiRoundVoting.unchanged // no change yet + + def checkIfMajorityPossible(using Participants): Boolean = + val totalVotes = rounds.value.votes.size + val remainingVotes = participants.size - totalVotes + val possible = rounds.value.leadingCount.map((_, count) => (count + remainingVotes) >= rounds.value.threshold) + possible.getOrElse(true) // if there is no leading vote, majority is always possible - given Lattice[Voting] = Lattice.derived + // api + def voteFor(c: A)(using LocalUid, Participants): MultiRoundVoting[A] = + MultiRoundVoting(Epoch(rounds.counter, rounds.value.voteFor(c))) + def result(using Participants): Option[A] = + rounds.value.result + +object MultiRoundVoting { + def unchanged[A](using Participants): MultiRoundVoting[A] = MultiRoundVoting(Epoch.empty[Voting[A]]) + given lattice[A]: Lattice[MultiRoundVoting[A]] = Lattice.derived } + +case class BallotNum(uid: Uid, counter: Long) + +object BallotNum: + given Ordering[BallotNum] with + override def compare(x: BallotNum, y: BallotNum): Int = + if x.counter > y.counter then 1 + else if x.counter < y.counter then -1 + else Ordering[Uid].compare(x.uid, y.uid) diff --git a/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/messagebased/MultiPaxos.scala b/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/messagebased/MultiPaxos.scala index 1c630982f..85f4c8dc3 100644 --- a/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/messagebased/MultiPaxos.scala +++ b/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/messagebased/MultiPaxos.scala @@ -1,8 +1,8 @@ package rdts.datatypes.experiments.protocols.messagebased import rdts.base.LocalUid -import rdts.datatypes.experiments.protocols.simplified.BallotNum import rdts.base.LocalUid.replicaId +import rdts.datatypes.experiments.protocols.BallotNum import rdts.datatypes.experiments.protocols.messagebased.Message.{Prepare, Promise} enum Message: diff --git a/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/old/Paxos.scala b/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/old/Paxos.scala new file mode 100644 index 000000000..75889261a --- /dev/null +++ b/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/old/Paxos.scala @@ -0,0 +1,207 @@ +package rdts.datatypes.experiments.protocols.old + +import rdts.base.Lattice.setLattice +import rdts.base.LocalUid.replicaId +import rdts.base.{Bottom, Lattice, LocalUid, Uid} +import rdts.datatypes.GrowOnlySet +import rdts.datatypes.GrowOnlySet.* +import rdts.datatypes.experiments.protocols.* + +import scala.compiletime.{constValue, summonFrom} + +enum Phase: + case One + case Two + +// message types +case class Prepare(proposalNumber: Int, proposer: Uid) + +case class Promise[A](proposal: Prepare, value: Option[A], acceptor: Uid) + +case class Accept[A](proposal: Prepare, value: A) + +case class Accepted[A](proposal: Prepare, value: A, acceptor: Uid) + +given Ordering[Prepare] with + override def compare(x: Prepare, y: Prepare): Int = + if x.proposalNumber > y.proposalNumber then 1 + else if x.proposalNumber == y.proposalNumber then Ordering[Uid].compare(x.proposer, y.proposer) + else -1 + +given Ordering[Accept[?]] with + override def compare(x: Accept[?], y: Accept[?]): Int = Ordering[Prepare].compare(x.proposal, y.proposal) + +case class Paxos[A]( + prepares: GrowOnlySet[Prepare], + promises: GrowOnlySet[Promise[A]], + accepts: GrowOnlySet[Accept[A]], + accepteds: GrowOnlySet[Accepted[A]], + members: Set[Uid] // constant +) { + private def quorum: Int = members.size / 2 + 1 + + def prepare()(using LocalUid): Paxos[A] = + val proposalNumber = highestProposal.map(_.proposalNumber).getOrElse(-1) + 1 + Paxos.unchanged.copy( + // members = members, + prepares = Set(Prepare(proposalNumber, replicaId)) + ) + + def promise()(using LocalUid): Paxos[A] = + val myHighestPromiseNumber = + promises.filter(_.acceptor == replicaId).map(_.proposal.proposalNumber).maxOption.getOrElse(-1) + // check if I already promised for an equally high id + if myHighestPromiseNumber >= highestProposal.map(_.proposalNumber).getOrElse(-1) + then + // already promised for equally high id + Paxos.unchanged + else + // there is a new higher proposal + // check if I already accepted a specific value + val value = + accepteds.filter(_.acceptor == replicaId).map(_.value).headOption + Paxos.unchanged.copy( + // members = members, + promises = Set(Promise(highestProposal.get, value, replicaId)) + ) + + def accept(v: A)(using LocalUid): Paxos[A] = + val promisesForProposal = myHighestProposal.map(p => promises.filter(_.proposal == p)).getOrElse(Set()) + // check if accepted + if !canSendAccept then + // is not accepted + Paxos.unchanged + else + // is accepted, check if promise contains value + val promisesWithVal = promisesForProposal.filter(_.value.isDefined) + val value: A = promisesWithVal.map(_.value).headOption.flatten.getOrElse(v) + Paxos.unchanged.copy( + accepts = Set(Accept(myHighestProposal.get, value)) + ) + + def accepted()(using LocalUid): Paxos[A] = + if newestAccept.isEmpty || // there are no accepts + // I have already promised a higher proposalNumber + promises.filter(_.acceptor == replicaId).map(_.proposal.proposalNumber).maxOption.getOrElse(-1) > + newestAccept.get.proposal.proposalNumber + then + Paxos.unchanged + else + Paxos.unchanged.copy( + // members = members, + accepteds = + Set(Accepted( + proposal = newestAccept.get.proposal, + value = newestAccept.get.value, + acceptor = replicaId + )) + ) + + def upkeep()(using LocalUid): Paxos[A] = + // check which phase we are in + phase match + case Phase.One if newestPrepare.isDefined => promise() + case Phase.Two => accepted() + case _ => Paxos.unchanged + + // helper functions + private def newestAccept: Option[Accept[A]] = accepts.maxOption + + private def newestPrepare: Option[Prepare] = prepares.maxOption + + private def highestProposal: Option[Prepare] = + prepares.maxOption + + private def myHighestProposal(using LocalUid): Option[Prepare] = prepares.filter(_.proposer == replicaId).maxOption + + private def canWrite(using LocalUid): Boolean = members.contains(replicaId) && read.isEmpty + + private def canSendAccept(using LocalUid): Boolean = + val promisesForProposal = myHighestProposal.map(p => promises.filter(_.proposal == p)).getOrElse(Set()) + promisesForProposal.size >= quorum + + private def phase: Phase = + if newestAccept.map(_.proposal.proposalNumber).getOrElse(-1) >= newestPrepare.map(_.proposalNumber).getOrElse(-1) + then + Phase.Two + else + Phase.One + + // API + def write(value: A)(using LocalUid): Paxos[A] = + if canWrite then + (phase, myHighestProposal, highestProposal) match + case (Phase.One, _, _) + if canSendAccept // we are in phase one and have received enough promises + => accept(value) + case (Phase.One, Some(p1), Some(p2)) + if p1.proposalNumber == p2.proposalNumber // my proposal is already the highest or there is a draw + => Paxos.unchanged + case _ // we try to propose new value + => prepare() + else + Paxos.unchanged + + def read: Option[A] = + val acceptedsPerProposal: Map[(Prepare, A), Set[Accepted[A]]] = accepteds.groupBy(a => (a.proposal, a.value)) + for + ((proposal, value), votes) <- acceptedsPerProposal.maxByOption((_, a) => a.size) + if votes.size >= quorum + yield value +} + +object Paxos { + + given lattice[A]: Lattice[Paxos[A]] with + override def merge(left: Paxos[A], right: Paxos[A]): Paxos[A] = + require( + Lattice[Set[Uid]].merge(left.members, right.members) == left.members, + "cannot merge two Paxos instances with differing members" + ) // members should remain fixed + + def allUids(p: Paxos[?]): Set[Uid] = + p.prepares.map(_.proposer) union + p.promises.flatMap(p => Set(p.proposal.proposer, p.acceptor)) union + p.accepts.map(_.proposal.proposer) union + p.accepteds.flatMap(p => Set(p.proposal.proposer, p.acceptor)) + + require( + (allUids(left) union allUids(right)).subsetOf(left.members), + "updates only contain Uids of known members" + ) + Paxos[A]( + prepares = left.prepares `merge` right.prepares, + promises = left.promises `merge` right.promises, + accepts = left.accepts `merge` right.accepts, + accepteds = left.accepteds `merge` right.accepteds, + members = left.members + ) + + def unchanged[A]: Paxos[A] = + Paxos[A]( + GrowOnlySet.empty[Prepare], + GrowOnlySet.empty[Promise[A]], + GrowOnlySet.empty[Accept[A]], + GrowOnlySet.empty[Accepted[A]], + Set.empty + ) + + def init[A](members: Set[Uid]): Paxos[A] = + require(members.nonEmpty, "Cannot initialize Paxos with empty set of members.") + unchanged[A].copy(members = members) + + given consensus: Consensus[Paxos] with + extension [A](c: Paxos[A]) + override def propose(value: A)(using LocalUid, Participants): Paxos[A] = c.write(value) + extension [A](c: Paxos[A]) + override def decision(using Participants): Option[A] = c.read + extension [A](c: Paxos[A]) + override def upkeep()(using LocalUid, Participants): Paxos[A] = c.upkeep() + + override def empty[A]: Paxos[A] = Paxos.unchanged + + override def lattice[A]: Lattice[Paxos[A]] = Paxos.lattice + + given bottom[A]: Bottom[Paxos[A]] with + override def empty: Paxos[A] = unchanged[A] +} diff --git a/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/old/Voting.scala b/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/old/Voting.scala new file mode 100644 index 000000000..01f2d0e9a --- /dev/null +++ b/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/old/Voting.scala @@ -0,0 +1,65 @@ +package rdts.datatypes.experiments.protocols.old + +import rdts.base.LocalUid.replicaId +import rdts.base.{Bottom, Lattice, LocalUid, Uid} +import rdts.datatypes.contextual.ReplicatedSet +import rdts.datatypes.{Epoch, LastWriterWins} +import rdts.dotted.Dotted +import rdts.time.Dots + +case class Vote(leader: Uid, voter: Uid) + +case class Voting(rounds: Epoch[ReplicatedSet[Vote]], numParticipants: LastWriterWins[Int]) { + def threshold: Int = numParticipants.value / 2 + 1 + + def isOwner(using LocalUid): Boolean = + val (id, count) = leadingCount + id == replicaId && count >= threshold + + def request(using LocalUid, Dots): Dotted[Voting] = + if !rounds.value.isEmpty then Voting.unchanged + else voteFor(replicaId) + + def release(using LocalUid): Voting = + if !isOwner + then Voting.unchanged.data + else Voting(Epoch(rounds.counter + 1, ReplicatedSet.empty), numParticipants) + + def upkeep(using LocalUid, Dots): Dotted[Voting] = + val (id, count) = leadingCount + if checkIfMajorityPossible(count) + then voteFor(id) + else Dotted(forceRelease) + + def forceRelease(using LocalUid): Voting = + Voting(Epoch(rounds.counter + 1, ReplicatedSet.empty), numParticipants) + + def voteFor(uid: Uid)(using LocalUid, Dots): Dotted[Voting] = + if rounds.value.elements.exists { case Vote(_, voter) => voter == replicaId } + then Voting.unchanged // already voted! + else + val newVote = rounds.value.add(Vote(uid, replicaId)) + newVote.map(rs => Voting(rounds.write(rs), numParticipants)) + + def checkIfMajorityPossible(count: Int): Boolean = + val totalVotes = rounds.value.elements.size + val remainingVotes = numParticipants.value - totalVotes + (count + remainingVotes) > threshold + + def leadingCount(using id: LocalUid): (Uid, Int) = + val votes: Set[Vote] = rounds.value.elements + val grouped: Map[Uid, Int] = votes.groupBy(_.leader).map((o, elems) => (o, elems.size)) + if grouped.isEmpty + then (replicaId, 0) + else grouped.maxBy((o, size) => size) + // .maxBy((o, size) => size) +} + +object Voting { + given Bottom[Int] with + def empty = 0 + val unchanged: Dotted[Voting] = Dotted(Voting(Epoch.empty, LastWriterWins.empty[Int])) + + given Lattice[Voting] = Lattice.derived + +} diff --git a/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/simplified/GeneralizedPaxos.scala b/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/old/simplified/GeneralizedPaxos.scala similarity index 97% rename from Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/simplified/GeneralizedPaxos.scala rename to Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/old/simplified/GeneralizedPaxos.scala index 83ce491be..9d521e5bc 100644 --- a/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/simplified/GeneralizedPaxos.scala +++ b/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/old/simplified/GeneralizedPaxos.scala @@ -1,9 +1,9 @@ -package rdts.datatypes.experiments.protocols.simplified +package rdts.datatypes.experiments.protocols.old.simplified import rdts.base.LocalUid.replicaId import rdts.base.{Lattice, LocalUid, Uid} import rdts.datatypes.LastWriterWins -import rdts.datatypes.experiments.protocols.{Consensus, Participants} +import rdts.datatypes.experiments.protocols.{BallotNum, Consensus, LeaderElection, Participants, Voting} case class GeneralizedPaxos[A]( rounds: Map[BallotNum, (LeaderElection, Voting[A])] = diff --git a/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/simplified/Paxos.scala b/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/old/simplified/Paxos.scala similarity index 98% rename from Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/simplified/Paxos.scala rename to Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/old/simplified/Paxos.scala index ee840bdd1..e00379a31 100644 --- a/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/simplified/Paxos.scala +++ b/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/old/simplified/Paxos.scala @@ -1,10 +1,10 @@ -package rdts.datatypes.experiments.protocols.simplified +package rdts.datatypes.experiments.protocols.old.simplified import rdts.base.Lattice.mapLattice import rdts.base.LocalUid.replicaId import rdts.base.{Bottom, Lattice, LocalUid, Uid} import rdts.datatypes.experiments.protocols.Participants.participants -import rdts.datatypes.experiments.protocols.simplified.Paxos.given +import Paxos.given import rdts.datatypes.experiments.protocols.{Consensus, Participants} import rdts.datatypes.{GrowOnlySet, LastWriterWins} diff --git a/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/paper/PaperPaxos.scala b/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/paper/PaperPaxos.scala deleted file mode 100644 index 3599a1e0a..000000000 --- a/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/paper/PaperPaxos.scala +++ /dev/null @@ -1,127 +0,0 @@ -package rdts.datatypes.experiments.protocols.paper - -import rdts.base.LocalUid.replicaId -import rdts.base.{Lattice, LocalUid, Uid} -import rdts.datatypes.LastWriterWins -import rdts.datatypes.experiments.protocols.simplified.{BallotNum, Voting} -import rdts.datatypes.experiments.protocols.simplified.given_Ordering_BallotNum -import rdts.datatypes.experiments.protocols.{Consensus, Participants} -import Paxos.given - -case class PaxosRound[A](leaderElection: LeaderElection = Voting(), proposals: Voting[A] = Voting[A]()) -type LeaderElection = Voting[Uid] -type Proposal - -case class Paxos[A]( - rounds: Map[BallotNum, PaxosRound[A]] = - Map.empty[BallotNum, PaxosRound[A]] -): - // update functions - def voteLeader(leader: Uid)(using LocalUid, Participants): PaxosRound[A] = - PaxosRound(leaderElection = Voting().voteFor(leader)) - - def voteValue(value: A)(using LocalUid, Participants): PaxosRound[A] = - PaxosRound(proposals = Voting().voteFor(value)) - - // query functions - def nextBallotNum(using LocalUid): BallotNum = - val maxCounter: Long = rounds - .filter((b, _) => b.uid == replicaId) - .map((b, _) => b.counter) - .maxOption - .getOrElse(-1) - BallotNum(replicaId, maxCounter + 1) - def currentRound: Option[(BallotNum, PaxosRound[A])] = rounds.maxOption - def currentBallot: Option[BallotNum] = rounds.maxOption.map(_._1) - def newestBallotWithLeader(using Participants): Option[(BallotNum, PaxosRound[A])] = - rounds.filter(_._2.leaderElection.result.nonEmpty).maxOption - def currentLeaderElection: Option[LeaderElection] = currentRound.map(_._2.leaderElection) - def leaderCandidate: Option[Uid] = currentLeaderElection.map(_.votes.head.value) - def myHighestBallot(using LocalUid): Option[(BallotNum, PaxosRound[A])] = - rounds.filter { case (b, p) => b.uid == replicaId }.maxOption - def lastValueVote: Option[(BallotNum, PaxosRound[A])] = rounds.filter(_._2.proposals.votes.nonEmpty).maxOption - def newestReceivedVal(using LocalUid) = lastValueVote.map(_._2.proposals.votes.head.value) - def myValue(using LocalUid): A = rounds(BallotNum(replicaId, -1)).proposals.votes.head.value - def decidedVal(using Participants): Option[A] = - rounds.collectFirst { case (b, PaxosRound(_, voting)) if voting.result.isDefined => voting.result.get } - - // phases: - def phase1a(using LocalUid, Participants)(value: A): Paxos[A] = - // try to become leader and remember a value for later - Paxos(Map(nextBallotNum -> voteLeader(replicaId), BallotNum(replicaId, -1) -> voteValue(value))) - def phase1a(using LocalUid, Participants): Paxos[A] = - // try to become leader - Paxos(Map(nextBallotNum -> voteLeader(replicaId))) - - def phase1b(using LocalUid, Participants): Paxos[A] = - // vote in the current leader election - (currentBallot, leaderCandidate, lastValueVote) match - case (Some(ballotNum), Some(candidate), None) => - // no value voted for, just vote for candidate - Paxos(Map(ballotNum -> voteLeader(candidate))) - case (Some(ballotNum), Some(candidate), Some(votedVal)) => - // vote for candidate and include value most recently voted for - Paxos(Map( - ballotNum -> voteLeader(candidate), - votedVal - )) - case _ => Paxos() // do nothing - - def phase2a(using LocalUid, Participants): Paxos[A] = - // propose a value if I am the leader - myHighestBallot match - case Some((ballotNum, PaxosRound(leaderElection, _))) - if leaderElection.result.contains(replicaId) => - if newestReceivedVal.nonEmpty then - Paxos(Map(ballotNum -> voteValue(newestReceivedVal.get))) - else - Paxos(Map(ballotNum -> voteValue(myValue))) - case _ => Paxos() // not leader -> do nothing - - def phase2b(using LocalUid, Participants): Paxos[A] = - // get highest ballot with leader and - // vote for the proposed value if there is one - newestBallotWithLeader match - case Some(ballotNum, PaxosRound(leaderElection, proposals)) - if proposals.votes.nonEmpty => - Paxos(Map(ballotNum -> voteValue(proposals.votes.head.value))) - case _ => Paxos() // current leader or proposed value not known -> do nothing - -object Paxos: - given [A]: Lattice[PaxosRound[A]] = Lattice.derived - given l[A]: Lattice[Paxos[A]] = Lattice.derived - - given [A]: Ordering[(BallotNum, PaxosRound[A])] with - override def compare(x: (BallotNum, PaxosRound[A]), y: (BallotNum, PaxosRound[A])): Int = (x, y) match - case ((x, _), (y, _)) => Ordering[BallotNum].compare(x, y) - - given consensus: Consensus[Paxos] with - extension [A](c: Paxos[A]) - override def propose(value: A)(using LocalUid, Participants): Paxos[A] = - // check if I can propose a value - val afterProposal = c.phase2a - if Lattice[Paxos[A]].subsumption(afterProposal, c) then - // proposing did not work, try to become leader - c.phase1a(value) - else - afterProposal - extension [A](c: Paxos[A])(using Participants) - override def decision: Option[A] = c.decidedVal - extension [A](c: Paxos[A]) - // upkeep can be used to perform the next protocol step automatically - override def upkeep()(using LocalUid, Participants): Paxos[A] = - // check which phase we are in - c.currentRound match - case Some((ballotNum, PaxosRound(leaderElection, _))) if leaderElection.result.nonEmpty => - // we have a leader -> phase 2 - if leaderElection.result.get == replicaId then - c.phase2a - else - c.phase2b - // we are in the process of electing a new leader - case _ => - c.phase1b - - override def empty[A]: Paxos[A] = Paxos() - - override def lattice[A]: Lattice[Paxos[A]] = l diff --git a/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/simplified/Voting.scala b/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/simplified/Voting.scala deleted file mode 100644 index c789496ad..000000000 --- a/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/simplified/Voting.scala +++ /dev/null @@ -1,75 +0,0 @@ -package rdts.datatypes.experiments.protocols.simplified - -import rdts.base.LocalUid.replicaId -import rdts.base.{Bottom, Lattice, LocalUid, Uid} -import rdts.datatypes.experiments.protocols.Participants -import rdts.datatypes.experiments.protocols.Participants.participants -import rdts.datatypes.{Epoch, GrowOnlyMap, GrowOnlySet} - -case class Vote[A](value: A, voter: Uid) - -case class Voting[A](votes: Set[Vote[A]] = Set.empty[Vote[A]]) { - def threshold(using Participants): Int = participants.size / 2 + 1 - - def result(using Participants): Option[A] = - leadingCount match - case Some((v, count)) if count >= threshold => Some(v) - case _ => None - - def voteFor(v: A)(using LocalUid, Participants): Voting[A] = - if !participants.contains(replicaId) || votes.exists { case Vote(_, voter) => voter == replicaId } - then Voting.unchanged // already voted! - else - Voting(Set(Vote(v, replicaId))) - - def leadingCount: Option[(A, Int)] = - val grouped: Map[A, Int] = votes.groupBy(_.value).map((value, vts) => (value, vts.size)) - grouped.maxByOption((_, size) => size) -} - -object Voting { - given lattice[A]: Lattice[Voting[A]] = Lattice.derived - - given bottom[A](using Participants): Bottom[Voting[A]] with - override def empty: Voting[A] = unchanged - - def unchanged[A](using Participants): Voting[A] = Voting(Set.empty) -} - -type LeaderElection = Voting[Uid] - -case class MultiRoundVoting[A](rounds: Epoch[Voting[A]]): - def release(using Participants): MultiRoundVoting[A] = - MultiRoundVoting(Epoch(rounds.counter + 1, Voting.unchanged)) - - def upkeep(using LocalUid, Participants): MultiRoundVoting[A] = - rounds.value.leadingCount match - case Some(value, count) if checkIfMajorityPossible => voteFor(value) - case Some(_) => release // we have a leading proposal but majority is not possible anymore - case None => MultiRoundVoting.unchanged // no change yet - - def checkIfMajorityPossible(using Participants): Boolean = - val totalVotes = rounds.value.votes.size - val remainingVotes = participants.size - totalVotes - val possible = rounds.value.leadingCount.map((_, count) => (count + remainingVotes) >= rounds.value.threshold) - possible.getOrElse(true) // if there is no leading vote, majority is always possible - - // api - def voteFor(c: A)(using LocalUid, Participants): MultiRoundVoting[A] = - MultiRoundVoting(Epoch(rounds.counter, rounds.value.voteFor(c))) - - def result(using Participants): Option[A] = - rounds.value.result - -object MultiRoundVoting { - def unchanged[A](using Participants): MultiRoundVoting[A] = MultiRoundVoting(Epoch.empty[Voting[A]]) - given lattice[A]: Lattice[MultiRoundVoting[A]] = Lattice.derived -} - -case class BallotNum(uid: Uid, counter: Long) - -given Ordering[BallotNum] with - override def compare(x: BallotNum, y: BallotNum): Int = - if x.counter > y.counter then 1 - else if x.counter < y.counter then -1 - else Ordering[Uid].compare(x.uid, y.uid) diff --git a/Modules/RDTs/src/test/scala/test/rdts/protocols/MembershipTest.scala b/Modules/RDTs/src/test/scala/test/rdts/protocols/MembershipTest.scala index d791e713b..499db69c2 100644 --- a/Modules/RDTs/src/test/scala/test/rdts/protocols/MembershipTest.scala +++ b/Modules/RDTs/src/test/scala/test/rdts/protocols/MembershipTest.scala @@ -2,8 +2,8 @@ package test.rdts.protocols import rdts.base.Lattice.syntax import rdts.base.{LocalUid, Uid} -import rdts.datatypes.experiments.protocols.Membership -import rdts.datatypes.experiments.protocols.simplified.{MultiRoundVoting, Paxos, Voting} +import rdts.datatypes.experiments.protocols.old.simplified.Paxos +import rdts.datatypes.experiments.protocols.{Membership, MultiRoundVoting, Voting} class MembershipTest extends munit.FunSuite { diff --git a/Modules/RDTs/src/test/scala/test/rdts/protocols/PaxosTest.scala b/Modules/RDTs/src/test/scala/test/rdts/protocols/PaxosTest.scala index 778e90fd1..c8a910c65 100644 --- a/Modules/RDTs/src/test/scala/test/rdts/protocols/PaxosTest.scala +++ b/Modules/RDTs/src/test/scala/test/rdts/protocols/PaxosTest.scala @@ -3,8 +3,8 @@ package test.rdts.protocols import rdts.base.Lattice.merge import rdts.base.{Bottom, LocalUid, Uid} import rdts.datatypes.GrowOnlyMap.* -import rdts.datatypes.experiments.protocols.Paxos -import rdts.datatypes.experiments.protocols.Paxos.* +import rdts.datatypes.experiments.protocols.old.Paxos +import rdts.datatypes.experiments.protocols.old.Paxos.* import rdts.datatypes.{GrowOnlyCounter, GrowOnlyMap} import rdts.dotted.{Dotted, DottedLattice} import rdts.time.Dots diff --git a/Modules/RDTs/src/test/scala/test/rdts/protocols/SimpleVotingTests.scala b/Modules/RDTs/src/test/scala/test/rdts/protocols/SimpleVotingTests.scala index 95e016fe4..792569a0c 100644 --- a/Modules/RDTs/src/test/scala/test/rdts/protocols/SimpleVotingTests.scala +++ b/Modules/RDTs/src/test/scala/test/rdts/protocols/SimpleVotingTests.scala @@ -2,8 +2,7 @@ package test.rdts.protocols import rdts.base.Lattice.syntax.merge import rdts.base.{Lattice, LocalUid, Uid} -import rdts.datatypes.experiments.protocols.Participants -import rdts.datatypes.experiments.protocols.simplified.{LeaderElection, MultiRoundVoting, Voting} +import rdts.datatypes.experiments.protocols.{LeaderElection, MultiRoundVoting, Participants, Voting} class SimpleVotingTests extends munit.FunSuite { diff --git a/Modules/RDTs/src/test/scala/test/rdts/protocols/VotingTests2Participants.scala b/Modules/RDTs/src/test/scala/test/rdts/protocols/VotingTests2Participants.scala index 0c3e09d04..ca9f67185 100644 --- a/Modules/RDTs/src/test/scala/test/rdts/protocols/VotingTests2Participants.scala +++ b/Modules/RDTs/src/test/scala/test/rdts/protocols/VotingTests2Participants.scala @@ -3,7 +3,7 @@ package test.rdts.protocols import rdts.base.{Lattice, LocalUid, Uid} import rdts.datatypes.contextual.ReplicatedSet import rdts.datatypes.experiments.protocols -import rdts.datatypes.experiments.protocols.{Vote, Voting} +import rdts.datatypes.experiments.protocols.old.{Vote, Voting} import rdts.datatypes.{Epoch, LastWriterWins} import rdts.dotted.{Dotted, DottedLattice} import rdts.time.Dots @@ -14,7 +14,7 @@ class VotingTests2Participants extends munit.FunSuite { // create replicas for set of 2 participants val id1: LocalUid = LocalUid.gen() val id2: LocalUid = LocalUid.gen() - var voting = Dotted(protocols.Voting( + var voting = Dotted(Voting( rounds = Epoch.empty[ReplicatedSet[Vote]], numParticipants = LastWriterWins.now(2) )) @@ -34,12 +34,12 @@ class VotingTests2Participants extends munit.FunSuite { assert(!voting.data.isOwner(using id2)) } test("Is not owner for 4 participants") { - voting = voting.merge(Dotted(protocols.Voting(voting.data.rounds, LastWriterWins.now(4)))) + voting = voting.merge(Dotted(Voting(voting.data.rounds, LastWriterWins.now(4)))) assert(!voting.data.isOwner(using id1)) assert(!voting.data.isOwner(using id2)) } test("Is owner for 3 participants") { - voting = voting.merge(Dotted(protocols.Voting(voting.data.rounds, LastWriterWins.now(3)))) + voting = voting.merge(Dotted(Voting(voting.data.rounds, LastWriterWins.now(3)))) assert(voting.data.isOwner(using id1)) assert(!voting.data.isOwner(using id2)) } diff --git a/Modules/RDTs/src/test/scala/test/rdts/protocols/paper/PaperPaxosTest.scala b/Modules/RDTs/src/test/scala/test/rdts/protocols/paper/PaperPaxosTest.scala index dc676281d..7dea02f45 100644 --- a/Modules/RDTs/src/test/scala/test/rdts/protocols/paper/PaperPaxosTest.scala +++ b/Modules/RDTs/src/test/scala/test/rdts/protocols/paper/PaperPaxosTest.scala @@ -1,9 +1,8 @@ package test.rdts.protocols.paper import rdts.base.LocalUid -import rdts.datatypes.experiments.protocols.Participants -import rdts.datatypes.experiments.protocols.paper.Paxos -import rdts.datatypes.experiments.protocols.paper.Paxos.given +import rdts.datatypes.experiments.protocols.{Participants, Paxos} +import Paxos.given class PaperPaxosTest extends munit.FunSuite { diff --git a/Modules/RDTs/src/test/scala/test/rdts/protocols/simplified/GenPaxosTest.scala b/Modules/RDTs/src/test/scala/test/rdts/protocols/simplified/GenPaxosTest.scala index 42386166c..3032fc26c 100644 --- a/Modules/RDTs/src/test/scala/test/rdts/protocols/simplified/GenPaxosTest.scala +++ b/Modules/RDTs/src/test/scala/test/rdts/protocols/simplified/GenPaxosTest.scala @@ -2,8 +2,8 @@ package test.rdts.protocols.simplified import rdts.base.LocalUid import rdts.datatypes.experiments.protocols.Participants -import rdts.datatypes.experiments.protocols.simplified.GeneralizedPaxos -import rdts.datatypes.experiments.protocols.simplified.GeneralizedPaxos.given +import rdts.datatypes.experiments.protocols.old.simplified.GeneralizedPaxos +import GeneralizedPaxos.given class GenPaxosTest extends munit.FunSuite { diff --git a/Modules/RDTs/src/test/scala/test/rdts/protocols/simplified/SimplePaxosTest.scala b/Modules/RDTs/src/test/scala/test/rdts/protocols/simplified/SimplePaxosTest.scala index 1506fe4ae..6b30c6f9f 100644 --- a/Modules/RDTs/src/test/scala/test/rdts/protocols/simplified/SimplePaxosTest.scala +++ b/Modules/RDTs/src/test/scala/test/rdts/protocols/simplified/SimplePaxosTest.scala @@ -3,8 +3,8 @@ package test.rdts.protocols.simplified import rdts.base.{Bottom, LocalUid, Uid} import rdts.datatypes.experiments.protocols.Participants import rdts.datatypes.experiments.protocols.Participants.participants -import rdts.datatypes.experiments.protocols.simplified.Paxos -import rdts.datatypes.experiments.protocols.simplified.Paxos.{*, given} +import rdts.datatypes.experiments.protocols.old.simplified.Paxos +import rdts.datatypes.experiments.protocols.old.simplified.Paxos.given import rdts.time.Dots import scala.math.Ordering.Implicits.infixOrderingOps