Skip to content

Commit

Permalink
M src/list/raxos/protocal/Cargo.toml
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Sep 22, 2024
1 parent d2c4245 commit f17d1fb
Show file tree
Hide file tree
Showing 21 changed files with 254 additions and 424 deletions.
4 changes: 3 additions & 1 deletion src/list/raxos/protocal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
thiserror = "1.0.63"
validit = { version = "0.2.4" }


[dev-dependencies]
maplit = "1.0.2"
maplit = "1.0.2"
37 changes: 12 additions & 25 deletions src/list/raxos/protocal/src/apaxos/acceptor.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::collections::BTreeSet;
use std::collections::HashSet;
use std::error::Error;
use std::fmt::Debug;
use std::fmt::Formatter;

use validit::Validate;

Expand All @@ -13,7 +12,7 @@ use crate::Types;
pub struct Acceptor<T: Types> {
/// A Time that is smaller than any one of these time
/// is not allow to commit.
pub forbidden_commit_time: BTreeSet<T::Time>,
pub forbidden_commit_time: HashSet<T::Time>,

pub history: T::History,
}
Expand All @@ -24,18 +23,6 @@ impl<T: Types> Validate for Acceptor<T> {
}
}

impl<T: Types> Default for Acceptor<T>
where
T::Time: std::hash::Hash,
{
fn default() -> Self {
Self {
forbidden_commit_time: Default::default(),
history: Default::default(),
}
}
}

impl<T: Types> Acceptor<T> {
/// Handle the phase-1 request from a [`Proposer`], i.e., set up a new
/// [`Time`] point.
Expand All @@ -48,27 +35,27 @@ impl<T: Types> Acceptor<T> {
/// For example, **2PC** will revert the `Time` if the coordinator receives
/// conflicting votes(otherwise other [`Proposer`] can not proceed). But
/// **Classic Paxos** does not have to revert the `Time` but it could.
pub(crate) fn handle_phase1_request(&mut self, commit_time: T::Time) -> (T::Time, Self) {
if self.is_committable(commit_time) {
pub(crate) fn handle_phase1_request(&mut self, commit_time: T::Time) -> (T::Time, T::History) {
if self.is_committable(&commit_time) {
return (commit_time, self.history.visible(commit_time));
}

self.forbidden_commit_time.insert(commit_time);
(commit_time, self.history.visible(commit_time))
}


pub(crate) fn handle_phase2_request(&mut self, history: T::History) -> bool {
dbg!("handle_phase2_request", history);

let maximals = history.maximals().collect::<Vec<_>>();
dbg!("handle_phase2_request", maximals);

assert_eq!(maximals.len(), 1, "A proposer commit a history reachable from only one single time");
let mut maximals = history.maximal_times();
let new_written_time = maximals.next().unwrap();

let cc = maximals[0];
assert!(
maximals.next().is_none(),
"A proposer commit a history reachable from only one single time"
);

if self.is_committable(cc) {
if self.is_committable(&new_written_time) {
return false;
}

Expand All @@ -80,7 +67,7 @@ impl<T: Types> Acceptor<T> {
/// Check it is allowed to commit at the specified time.
fn is_committable(&self, time: &T::Time) -> bool {
for t in self.forbidden_commit_time {
if t.greater_equal(time) && t != time {
if t.is_gt(time) {
return false;
}
}
Expand Down
13 changes: 13 additions & 0 deletions src/list/raxos/protocal/src/apaxos/errors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use std::collections::BTreeMap;

use crate::Types;

#[derive(thiserror::Error)]
pub enum APError<T: Types> {
ReadQuorumNotReached {
accepted: BTreeMap<T::AcceptorId, ()>,
},
WriteQuorumNotReached {
accepted: BTreeMap<T::AcceptorId, ()>,
},
}
6 changes: 6 additions & 0 deletions src/list/raxos/protocal/src/apaxos/greater_equal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
/// absent, transitioning from `c` to `a` is not allowed.
// TODO: consider add RHS as parameter to GreaterEqual
pub trait GreaterEqual {
/// if greater than `other`
fn is_gt(&self, other: &Self) -> bool
where Self: PartialEq {
self.greater_equal(other) && self != other
}

fn greater_equal(&self, other: &Self) -> bool;
}

Expand Down
75 changes: 67 additions & 8 deletions src/list/raxos/protocal/src/apaxos/history.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,85 @@
use std::fmt::Debug;

use crate::apaxos::greater_equal::GreaterEqual;
use crate::Types;

pub struct TimeEvent<T: Types> {
time: T::Time,
event: T::Event,
}

pub trait History<T: Types>: Default {
fn append(&mut self, time_event: TimeEvent<T>);
fn get(&self, time: T::Time) -> Option<&T::Event>;
impl<T: Types> TimeEvent<T> {
pub fn new(time: T::Time, event: T::Event) -> Self {
Self { time, event }
}
}

pub trait History<T: Types>
where Self: Default + Debug + Clone
{
fn append(&mut self, time: T::Time, event: T::Event);

fn get(&self, time: &T::Time) -> Option<&T::Event>;

/// Return a sub set of the history that is visible at `time`.
///
/// In other words, a sub set of TimeEvent that is less than or equal to `time`.
/// In other words, a sub set of TimeEvent that is less than or equal to
/// `time`.
fn visible(&self, time: T::Time) -> Self;

/// Return the maximal [`TimeEvent`] in the history.
/// Return the maximal [`Time`] and [`Event`] in the history.
///
/// `maximal` is defined as:
/// g in P is a maximal element:
/// if there is no element a in P such that a > g
fn maximals(&self) -> Vec<&TimeEvent<T>>;
///
/// All `maximal` have no order between them.
fn maximals(&self) -> impl Iterator<Item = (T::Time, T::Event)>;

fn maximal_times(&self) -> impl Iterator<Item = T::Time> {
self.maximals().map(|(t, _)| t)
}

/// Merge two [`History`]
///
/// Note that if there are `maximal` that have an order, the smaller one
/// will be removed. Because a `reader` only choose the greater branch.
fn merge(&mut self, other: Self)
where Self: sealed::Seal {
let mut res = Self::default();

for my_maximal in self.maximal_times() {
if !other.greater_equal(&my_maximal) {
res.do_merge(self.visible(my_maximal));
}
}

fn merge(&mut self, other: Self);
}
for other_maximal in other.maximal_times() {
if !self.greater_equal(&other_maximal) {
res.do_merge(other.visible(other_maximal));
}
}

*self = res;
}

/// Check if a [`History`] is greater or equal to a given time.
///
/// In other word, if there is a [`Time`] in this history that is greater or
/// equal the given time.
fn greater_equal(&self, t: &T::Time) -> bool {
for max_time in self.maximal_times() {
if max_time.greater_equal(t) {
return true;
}
}
false
}

fn do_merge(&mut self, other: Self);
}

mod sealed {
pub trait Seal {}
impl<T> Seal for T {}
}
5 changes: 3 additions & 2 deletions src/list/raxos/protocal/src/apaxos/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
pub mod accepted;
pub mod history;
pub mod acceptor;
pub mod greater_equal;
pub mod greater_equal_map;
pub mod history;
pub mod proposal;
pub mod proposer;
pub mod ptime;

pub mod errors;
25 changes: 13 additions & 12 deletions src/list/raxos/protocal/src/apaxos/proposer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use phase1::Phase1;
use phase2::Phase2;

use crate::apaxos::proposal::Proposal;
use crate::apaxos::errors::APError;
use crate::apaxos::history::History;
use crate::APaxos;
use crate::Types;

Expand All @@ -13,28 +14,27 @@ mod phase2;
pub struct Proposer<'a, T: Types> {
apaxos: &'a mut APaxos<T>,
time: T::Time,
proposal: Proposal<T, T::Event>,
event: T::Event,
}

impl<'a, T: Types> Proposer<'a, T> {
/// Create an instance of [`APaxos`] that tries to commit `value` at `time`
/// to the distributed system.
pub fn new(apaxos: &'a mut APaxos<T>, time: T::Time, value: T::Event) -> Self {
pub fn new(apaxos: &'a mut APaxos<T>, time: T::Time, event: T::Event) -> Self {
Self {
apaxos,
time,
proposal: Proposal::new(time, value),
event,
}
}

pub fn run(&mut self) -> Proposal<T, T::Event> {
let maybe_committed = self.new_phase1().run();
let committed = self.new_phase2(maybe_committed).run();
pub fn run(&mut self) -> Result<T::History, APError<T>> {
let maybe_committed = self.new_phase1().run()?;
let committed = self.new_phase2(maybe_committed).run()?;

committed
Ok(committed)
}

// TODO: phase-1-revert
fn new_phase1(&mut self) -> Phase1<T> {
Phase1 {
apaxos: &mut self.apaxos,
Expand All @@ -44,12 +44,13 @@ impl<'a, T: Types> Proposer<'a, T> {
}
}

fn new_phase2(&mut self, maybe_committed: Option<Proposal<T, T::Event>>) -> Phase2<T> {
fn new_phase2(&mut self, mut maybe_committed: T::History) -> Phase2<T> {
maybe_committed.append(self.time, self.event.clone());
Phase2 {
apaxos: &mut self.apaxos,
time: self.time,
decided: maybe_committed.unwrap_or_else(|| self.proposal.clone()),
granted: Default::default(),
decided: maybe_committed,
accepted: Default::default(),
}
}
}
Loading

0 comments on commit f17d1fb

Please sign in to comment.