Skip to content

Commit

Permalink
fix: ensure consistency between chain and ledger (#108)
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega authored Oct 15, 2023
1 parent 3df3f67 commit 0401482
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 107 deletions.
22 changes: 11 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion src/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,5 @@ pub enum PullEvent {
pub enum RollEvent {
Apply(BlockSlot, BlockHash, RawBlock),
Undo(BlockSlot, BlockHash, RawBlock),
Reset(Point),
Origin,
}
20 changes: 7 additions & 13 deletions src/sync/apply.rs → src/sync/chain.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
use gasket::framework::*;
use pallas::ledger::configs::byron::GenesisFile;
use pallas::storage::rolldb::chain;
use tracing::info;

use crate::prelude::*;
use crate::storage::applydb::ApplyDB;

pub type UpstreamPort = gasket::messaging::tokio::InputPort<RollEvent>;

#[derive(Stage)]
#[stage(name = "apply", unit = "RollEvent", worker = "Worker")]
#[stage(name = "chain", unit = "RollEvent", worker = "Worker")]
pub struct Stage {
ledger: ApplyDB,
chain: chain::Store,
genesis: GenesisFile,

pub upstream: UpstreamPort,

Expand All @@ -24,11 +21,9 @@ pub struct Stage {
}

impl Stage {
pub fn new(ledger: ApplyDB, chain: chain::Store, genesis: GenesisFile) -> Self {
pub fn new(chain: chain::Store) -> Self {
Self {
ledger,
chain,
genesis,
upstream: Default::default(),
// downstream: Default::default(),
block_count: Default::default(),
Expand Down Expand Up @@ -64,17 +59,16 @@ impl gasket::framework::Worker<Stage> for Worker {
.roll_forward(*slot, *hash, cbor.clone())
.or_panic()?;

stage.ledger.apply_block(cbor).or_panic()?;
info!(slot, "roll forward");
}
RollEvent::Undo(slot, _, cbor) => {
RollEvent::Undo(slot, _, _) => {
stage.chain.roll_back(*slot).or_panic()?;
stage.ledger.undo_block(cbor).or_panic()?;
info!(slot, "rollback");
}
RollEvent::Origin => {
stage.chain.roll_back_origin().or_panic()?;
stage.ledger.apply_origin(&stage.genesis).or_panic()?
info!("rollback to origin");
}
RollEvent::Reset(_) => todo!(),
};

Ok(())
Expand Down
69 changes: 69 additions & 0 deletions src/sync/ledger.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use gasket::framework::*;
use pallas::ledger::configs::byron::GenesisFile;

use crate::prelude::*;
use crate::storage::applydb::ApplyDB;

pub type UpstreamPort = gasket::messaging::tokio::InputPort<RollEvent>;

#[derive(Stage)]
#[stage(name = "ledger", unit = "RollEvent", worker = "Worker")]
pub struct Stage {
ledger: ApplyDB,
genesis: GenesisFile,

pub upstream: UpstreamPort,

#[metric]
block_count: gasket::metrics::Counter,

#[metric]
wal_count: gasket::metrics::Counter,
}

impl Stage {
pub fn new(ledger: ApplyDB, genesis: GenesisFile) -> Self {
Self {
ledger,
genesis,
upstream: Default::default(),
// downstream: Default::default(),
block_count: Default::default(),
wal_count: Default::default(),
}
}
}

impl Stage {}

pub struct Worker;

#[async_trait::async_trait(?Send)]
impl gasket::framework::Worker<Stage> for Worker {
async fn bootstrap(_stage: &Stage) -> Result<Self, WorkerError> {
Ok(Self)
}

async fn schedule(
&mut self,
stage: &mut Stage,
) -> Result<WorkSchedule<RollEvent>, WorkerError> {
let msg = stage.upstream.recv().await.or_panic()?;

Ok(WorkSchedule::Unit(msg.payload))
}

async fn execute(&mut self, unit: &RollEvent, stage: &mut Stage) -> Result<(), WorkerError> {
match unit {
RollEvent::Apply(_, _, cbor) => {
stage.ledger.apply_block(cbor).or_panic()?;
}
RollEvent::Undo(_, _, cbor) => {
stage.ledger.undo_block(cbor).or_panic()?;
}
RollEvent::Origin => stage.ledger.apply_origin(&stage.genesis).or_panic()?,
};

Ok(())
}
}
42 changes: 21 additions & 21 deletions src/sync/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use gasket::messaging::{RecvPort, SendPort};
use pallas::ledger::configs::byron::GenesisFile;
use pallas::storage::rolldb::{chain, wal};
use pallas::storage::rolldb::chain::Store as ChainStore;
use pallas::storage::rolldb::wal::Store as WalStore;
use serde::Deserialize;

use crate::prelude::*;
use crate::storage::applydb::ApplyDB;

pub mod apply;
pub mod chain;
pub mod ledger;
pub mod pull;
pub mod roll;

Expand All @@ -18,8 +20,8 @@ pub struct Config {

pub fn pipeline(
config: &Config,
wal: wal::Store,
chain: chain::Store,
wal: WalStore,
chain: ChainStore,
ledger: ApplyDB,
genesis: GenesisFile,
policy: &gasket::runtime::Policy,
Expand All @@ -36,35 +38,33 @@ pub fn pipeline(
pull_cursor,
);

let chain_cursor = chain.find_tip().map_err(Error::storage)?;
let ledger_cursor = ledger.cursor().map_err(Error::storage)?;
let cursor_chain = chain.find_tip().map_err(Error::storage)?;
let cursor_ledger = ledger.cursor().map_err(Error::storage)?;

// this is a business invariant, the state of the chain and ledger stores should
// "match". Since there's no concept of transaction spanning both stores, we do
// an eager check at bootstrap
assert_eq!(
chain_cursor, ledger_cursor,
"chain and ledger cursor don't match"
);

let mut roll = roll::Stage::new(wal, chain_cursor);
let mut roll = roll::Stage::new(wal, cursor_chain, cursor_ledger);

let mut apply = apply::Stage::new(ledger, chain, genesis);
let mut chain = chain::Stage::new(chain);
let mut ledger = ledger::Stage::new(ledger, genesis);

let (to_roll, from_pull) = gasket::messaging::tokio::mpsc_channel(50);
pull.downstream.connect(to_roll);
roll.upstream.connect(from_pull);

let (to_apply, from_roll) = gasket::messaging::tokio::mpsc_channel(50);
roll.downstream.connect(to_apply);
apply.upstream.connect(from_roll);
let (to_chain, from_roll) = gasket::messaging::tokio::mpsc_channel(50);
roll.downstream_chain.connect(to_chain);
chain.upstream.connect(from_roll);

let (to_ledger, from_roll) = gasket::messaging::tokio::mpsc_channel(50);
roll.downstream_ledger.connect(to_ledger);
ledger.upstream.connect(from_roll);

// output to outside of out pipeline
// apply.downstream.connect(output);

let pull = gasket::runtime::spawn_stage(pull, policy.clone());
let roll = gasket::runtime::spawn_stage(roll, policy.clone());
let apply = gasket::runtime::spawn_stage(apply, policy.clone());
let chain = gasket::runtime::spawn_stage(chain, policy.clone());
let ledger = gasket::runtime::spawn_stage(ledger, policy.clone());

Ok(gasket::daemon::Daemon(vec![pull, roll, apply]))
Ok(gasket::daemon::Daemon(vec![pull, roll, chain, ledger]))
}
Loading

0 comments on commit 0401482

Please sign in to comment.