Skip to content

Commit

Permalink
improve paxos interface by caching proposed values
Browse files Browse the repository at this point in the history
  • Loading branch information
haaase committed Aug 26, 2024
1 parent 594b114 commit aa4d00b
Show file tree
Hide file tree
Showing 2 changed files with 209 additions and 22 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package rdts.datatypes.experiments.protocols.simplified

import rdts.base.Lattice.mapLattice
import rdts.base.LocalUid.replicaId
import rdts.base.{Lattice, LocalUid, Uid}
import rdts.datatypes.GrowOnlySet
import rdts.datatypes.experiments.protocols.Consensus
import rdts.datatypes.experiments.protocols.simplified.Paxos.given
import rdts.datatypes.{GrowOnlySet, LastWriterWins}

import scala.math.Ordering.Implicits.infixOrderingOps

Expand All @@ -18,18 +20,12 @@ case class Accept[A](proposal: ProposalNum, value: A)

case class Accepted[A](proposal: ProposalNum, acceptor: Uid)

given Ordering[ProposalNum] with
override def compare(x: ProposalNum, y: ProposalNum): Int =
if x.number > y.number then 1
else if x.number == y.number then Ordering[Uid].compare(x.proposer, y.proposer)
else -1

case class Paxos[A](
prepares: GrowOnlySet[Prepare],
promises: GrowOnlySet[Promise[A]],
accepts: GrowOnlySet[Accept[A]],
accepted: GrowOnlySet[Accepted[A]],
members: Set[Uid] // constant
members: Map[Uid, Option[LastWriterWins[A]]] // constant
) {

private def quorum: Int = members.size / 2 + 1
Expand All @@ -39,7 +35,6 @@ case class Paxos[A](

def chooseProposalNumber(using LocalUid): ProposalNum =
val highestNum = prepares
.filter(_.proposal.proposer == replicaId)
.map(_.proposal.number)
.maxOption.getOrElse(-1)
ProposalNum(highestNum + 1, replicaId)
Expand Down Expand Up @@ -89,10 +84,20 @@ case class Paxos[A](
.flatMap(_.highestAccepted)
.maxByOption((p, v) => p)
.map((p, v) => v)
copy(accepts = accepts + Accept(proposal, acceptedValue.getOrElse(v)))
copy(
accepts = accepts + Accept(proposal, acceptedValue.getOrElse(v)),
members = members.updated(replicaId, Some(LastWriterWins.now(v)))
)
else
this // quorum not reached, do nothing

def propose(v: A)(using LocalUid): Paxos[A] =
// find my newest proposalNum
val proposalNum = prepares.filter(_.proposal.proposer == replicaId).maxByOption(_.proposal)
proposalNum match
case Some(Prepare(prop)) => propose(prop, v)
case None => this

// phase 2b
def accept(proposal: ProposalNum)(using LocalUid): Paxos[A] =
// check if I have already promised a newer proposal
Expand All @@ -107,36 +112,43 @@ case class Paxos[A](
}

object Paxos:

def unchanged[A]: Paxos[A] =
Paxos[A](
GrowOnlySet.empty[Prepare],
GrowOnlySet.empty[Promise[A]],
GrowOnlySet.empty[Accept[A]],
GrowOnlySet.empty[Accepted[A]],
Set.empty
Map.empty
)

def init[A](members: Set[Uid]): Paxos[A] =
require(members.nonEmpty, "Cannot initialize Paxos with empty set of members.")
unchanged[A].copy(members = members)
unchanged[A].copy(members = members.map((_, None)).toMap)

given Ordering[ProposalNum] with
override def compare(x: ProposalNum, y: ProposalNum): Int =
if x.number > y.number then 1
else if x.number == y.number then Ordering[Uid].compare(x.proposer, y.proposer)
else -1

given [A]: Lattice[Paxos[A]] = Lattice.derived

given consensus: Consensus[Paxos] with
extension [A](c: Paxos[A])
override def write(value: A)(using LocalUid): Paxos[A] =
def becomeLeader = c.prepare().copy(members = c.members.updated(replicaId, Some(LastWriterWins.now(value))))

val myNewestProposal = c.prepares.filter(_.proposal.proposer == replicaId).map(_.proposal).maxOption
myNewestProposal match
case Some(proposal) =>
// check if proposing does anything
// check if proposing does anything (i.e. I am the leader)
val proposed = c.propose(proposal, value)
if Lattice[Paxos[A]].lteq(proposed, c) then
// proposing did not work, try to become leader
c.prepare()
becomeLeader
else
proposed
case None => c.prepare() // no proposals yet, try to become leader
case None => becomeLeader // no proposals yet, try to become leader
extension [A](c: Paxos[A])
override def read: Option[A] =
val acceptancePerProposal: Map[ProposalNum, Set[Accepted[A]]] = c.accepted.groupBy(_.proposal)
Expand All @@ -146,20 +158,28 @@ object Paxos:
if votes.size >= c.quorum
yield acceptedProposal.value
extension [A](c: Paxos[A])
override def members: Set[Uid] = c.members
override def members: Set[Uid] = c.members.keySet
extension [A](c: Paxos[A])
override def reset(newMembers: Set[Uid]): Paxos[A] = Paxos.init(members = newMembers)
extension [A](c: Paxos[A])
override def upkeep()(using LocalUid): Paxos[A] =
// check if the newest accept is newer than the newest prepare message
(c.accepts.maxByOption(_.proposal), c.prepares.maxByOption(_.proposal)) match
case (Some(accept), Some(prepare)) if accept.proposal >= prepare.proposal =>
case (Some(Accept(a, _)), Some(Prepare(p))) if a >= p =>
// we are in phase 2
c.accept(accept.proposal)
case (_, Some(prepare)) =>
c.accept(a)
case (_, Some(Prepare(proposal))) =>
// we are in phase 1
c.promise(prepare.proposal)
val d = c.promise(proposal)

// check if we have become the leader and can start phase 2 by proposing a value
if proposal.proposer == replicaId && // we proposed the leading proposal
d.promises.count(_.proposal == proposal) >= d.quorum && // it has reached a quorum
d.members(proposal.proposer).nonEmpty // we know what value to propose
then
d.propose(proposal, d.members(proposal.proposer).get.value)
else d

case _ =>
// there are no prepare messages, do nothing
Paxos.unchanged

167 changes: 167 additions & 0 deletions Modules/RDTs/src/test/scala/test/rdts/protocols/SimplePaxosTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package test.rdts.protocols.simplified

import rdts.base.{Bottom, LocalUid, Uid}
import rdts.datatypes.GrowOnlyMap
import rdts.datatypes.experiments.protocols.simplified.Paxos
import rdts.datatypes.experiments.protocols.simplified.Paxos.{*, given}
import rdts.time.Dots

import scala.math.Ordering.Implicits.infixOrderingOps

class SimplePaxosTest extends munit.FunSuite {
given Bottom[Int] with
override def empty: Int = Int.MinValue

given dots: Dots = Dots.empty
val id1 = LocalUid.gen()
val id2 = LocalUid.gen()
val id3 = LocalUid.gen()

given members: Set[Uid] = Set(id1, id2, id3).map(_.uid)

var emptyPaxosObject: Paxos[Int] = Paxos.init(members)

// test("Merge fails with different members") {
// val p1: Paxos[Int] = Paxos.unchanged.copy(members = Set(id1).map(_.uid))
// val p2: Paxos[Int] = Paxos.unchanged.copy(members = Set(id2).map(_.uid))
// interceptMessage[IllegalArgumentException](
// "requirement failed: cannot merge two Paxos instances with differing members"
// ) {
// p1 `merge` p2
// }
// }

test("Paxos for 3 participants without errors") {
var a: Paxos[Int] = emptyPaxosObject

val proposal = a.chooseProposalNumber(using id1)
a = a `merge` a.prepare()(using id1)
a = a `merge` a.upkeep()(using id1) `merge` a.upkeep()(using id2) `merge` a.upkeep()(using id3)
assertEquals(a.read, None)
a = a `merge` a.propose(proposal, 1)(using id1)
a = a `merge` a.upkeep()(using id1) `merge` a.upkeep()(using id2) `merge` a.upkeep()(using id3)
assertEquals(a.read, Some(1))
}

test("newer proposal numbers are bigger") {
var testPaxosObject = emptyPaxosObject

testPaxosObject = testPaxosObject.merge(testPaxosObject.prepare()(using id1))
val firstProposalNumber = testPaxosObject.prepares.head.proposal
val secondProposalNumber = testPaxosObject.chooseProposalNumber(using id1)

assert(firstProposalNumber < secondProposalNumber)
}

test("No changes for older proposals") {
var testPaxosObject1 = emptyPaxosObject
// replica 1 sends prepare
testPaxosObject1 = testPaxosObject1.merge(testPaxosObject1.prepare()(using id1))
// replica 2 sends prepare
testPaxosObject1 = testPaxosObject1.merge(testPaxosObject1.prepare()(using id2))
testPaxosObject1 =
testPaxosObject1.merge(testPaxosObject1.upkeep()(using id1)).merge(testPaxosObject1.upkeep()(using id2))
var testPaxosObject2 = emptyPaxosObject
// replica 3 sends prepare, with smaller proposal number
testPaxosObject2 = testPaxosObject2.merge(testPaxosObject2.prepare()(using id3))
// replica 1 receives 3's prepare
testPaxosObject1 = testPaxosObject1.merge(testPaxosObject2)
// assert that the object before and after replica 1 calls upkeep is the same
assertEquals(testPaxosObject1, testPaxosObject1.merge(testPaxosObject1.upkeep()(using id1)))
}

test("promise sends previously accepted value") {
var testPaxosObject = emptyPaxosObject

// replica 1 sends prepare
testPaxosObject = testPaxosObject.merge(testPaxosObject.prepare()(using id1))
// replica 2 and 3 receive prepare
testPaxosObject =
testPaxosObject.merge(testPaxosObject.upkeep()(using id2)).merge(testPaxosObject.upkeep()(using id3))
// replica 1 receives 2 and 3's promise
testPaxosObject = testPaxosObject.merge(testPaxosObject.upkeep()(using id1))
// replica 1 sends propose(1)
testPaxosObject = testPaxosObject.merge(testPaxosObject.propose(1)(using id1))
// replica 2 and 3 receive propose
testPaxosObject =
testPaxosObject.merge(testPaxosObject.upkeep()(using id2)).merge(testPaxosObject.upkeep()(using id3))
// replica 1 receives accepted
testPaxosObject = testPaxosObject.merge(testPaxosObject.upkeep()(using id1))
// replica 2 sends prepare to propose a new value
testPaxosObject = testPaxosObject.merge(testPaxosObject.prepare()(using id2))
// replica 1 receives 2's new prepare, get the value in 1's promise

val promiseValue = testPaxosObject.upkeep()(using id1).promises.maxBy(_.proposal).highestAccepted.get._2
// assert the promise contains the previously accepted value
assertEquals(promiseValue, 1)
}

test("acceptor sends promise with highest proposal number") {
var testPaxosObject = emptyPaxosObject

testPaxosObject = testPaxosObject.merge(testPaxosObject.prepare()(using id1))
.merge(testPaxosObject.prepare()(using id2))
.merge(testPaxosObject.prepare()(using id3))
val highestProposalNumber = testPaxosObject.prepares.map(_.proposal).max
testPaxosObject = testPaxosObject.upkeep()(using id1)
val promiseProposalNumber = testPaxosObject.promises.head.proposal

assertEquals(promiseProposalNumber, highestProposalNumber)
}

test("accept contains value of promise with highest proposal number") {
var testPaxosObject = emptyPaxosObject
// replica 1 sends prepare
testPaxosObject = testPaxosObject.merge(testPaxosObject.prepare()(using id1))
// 1, 2 and 3 receive prepare
testPaxosObject = testPaxosObject.merge(testPaxosObject.upkeep()(using id1)).merge(testPaxosObject.upkeep()(using
id2)).merge(testPaxosObject.upkeep()(using id3))
// 1 sends accept
testPaxosObject = testPaxosObject.merge(testPaxosObject.propose(1)(using id1))
// 1,2 and 3 receive accept
testPaxosObject = testPaxosObject.merge(testPaxosObject.upkeep()(using id1)).merge(testPaxosObject.upkeep()(using
id2)).merge(testPaxosObject.upkeep()(using id3))
// replica 2 sends prepare
testPaxosObject = testPaxosObject.merge(testPaxosObject.prepare()(using id2))
// 1 and 2 receives 2's prepare
testPaxosObject =
testPaxosObject.merge(testPaxosObject.upkeep()(using id1)).merge(testPaxosObject.upkeep()(using id2))
// 2 sends accept, which should contain the value of 1's promise and not the value "2"
val acceptValue = testPaxosObject.propose(2)(using id2).accepts.head.value
// assert that the value in 2's accept message is the value of 1's promise
assertEquals(acceptValue, 1)
}

test("write works as expected") {
var testPaxosObject = emptyPaxosObject
val writeValue = 1
// replica 1 tries to write
testPaxosObject = testPaxosObject.merge(testPaxosObject.write(writeValue)(using id1))
testPaxosObject = testPaxosObject.merge(testPaxosObject.upkeep()(using id1)).merge(testPaxosObject.upkeep()(using
id2)).merge(testPaxosObject.upkeep()(using id3))
assertEquals(testPaxosObject.read, None)
// replica 1 tries to write again
testPaxosObject = testPaxosObject.merge(testPaxosObject.write(writeValue)(using id1))
testPaxosObject = testPaxosObject.merge(testPaxosObject.upkeep()(using id1)).merge(testPaxosObject.upkeep()(using
id2)).merge(testPaxosObject.upkeep()(using id3))
assertEquals(testPaxosObject.read, Some(writeValue))
}

test("concurrent writes") {
var testPaxosObject = emptyPaxosObject
// replica 1 and 2 try to write
testPaxosObject =
testPaxosObject.merge(testPaxosObject.write(1)(using id1)).merge(testPaxosObject.write(2)(using id2))
// deliver prepares
testPaxosObject = testPaxosObject.merge(testPaxosObject.upkeep()(using id1)).merge(testPaxosObject.upkeep()(using
id2)).merge(testPaxosObject.upkeep()(using id3))
assertEquals(testPaxosObject.read, None)
// deliver proposal
testPaxosObject = testPaxosObject.merge(testPaxosObject.upkeep()(using id1)).merge(testPaxosObject.upkeep()(using
id2)).merge(testPaxosObject.upkeep()(using id3))
// deliver accepted
testPaxosObject = testPaxosObject.merge(testPaxosObject.upkeep()(using id1)).merge(testPaxosObject.upkeep()(using
id2)).merge(testPaxosObject.upkeep()(using id3))
assert(clue(testPaxosObject.read) == Some(2) || clue(testPaxosObject.read) == Some(1))
}
}

0 comments on commit aa4d00b

Please sign in to comment.