From 412f24584bfb17454be5c1ca570621e1a7217879 Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Fri, 6 Oct 2023 09:51:54 -0300 Subject: [PATCH] refactor roll vs apply --- .../sync-mainnet/byron.json | 0 examples/sync-mainnet/dolos.toml | 21 ++++ examples/sync-preview/byron.json | 91 ++++++++++++++++ examples/sync-preview/dolos.toml | 4 + src/bin/dolos/daemon.rs | 9 +- src/bin/dolos/sync.rs | 5 +- src/model.rs | 1 + src/serve/grpc/sync.rs | 4 +- src/storage/applydb/genesis.rs | 29 ++++- src/storage/applydb/mod.rs | 15 ++- src/storage/kvtable.rs | 11 +- src/storage/rolldb/mod.rs | 101 ++++++++++++------ src/storage/rolldb/stream.rs | 27 +++-- src/storage/rolldb/wal.rs | 55 ++++++++++ src/sync/apply.rs | 26 +++-- src/sync/mod.rs | 4 +- src/sync/pull.rs | 6 +- src/sync/roll.rs | 47 +++++--- 18 files changed, 364 insertions(+), 92 deletions(-) rename test_data/mainnet-byron-genesis.json => examples/sync-mainnet/byron.json (100%) create mode 100644 examples/sync-mainnet/dolos.toml create mode 100644 examples/sync-preview/byron.json diff --git a/test_data/mainnet-byron-genesis.json b/examples/sync-mainnet/byron.json similarity index 100% rename from test_data/mainnet-byron-genesis.json rename to examples/sync-mainnet/byron.json diff --git a/examples/sync-mainnet/dolos.toml b/examples/sync-mainnet/dolos.toml new file mode 100644 index 00000000..b022680c --- /dev/null +++ b/examples/sync-mainnet/dolos.toml @@ -0,0 +1,21 @@ +[upstream] +peer_address = "relays-new.cardano-mainnet.iohk.io:3001" +network_magic = 764824073 + +[rolldb] +path = "./tmp/rolldb" +k_param = 1000 + +[applydb] +path = "./tmp/applydb" + +[serve.grpc] +listen_address = "[::]:50051" + +[serve.ouroboros] +listen_address = "localhost:30013" +magic = 0 + +[byron] +path = "./byron.json" +hash = "xxx" diff --git a/examples/sync-preview/byron.json b/examples/sync-preview/byron.json new file mode 100644 index 00000000..62363764 --- /dev/null +++ b/examples/sync-preview/byron.json @@ -0,0 +1,91 @@ +{ + "bootStakeholders": { + "021e737009040bf7f1e7b1bcc148f29d748d4a6b561902c95e4a9f36": 1, + "0bc82ced9544980b9ffe7f64b1538bbda6804a5cc32c8035485e184b": 1, + "18ed9844deef98cf9ba8b39791dede0538d2d2fa79bf67ef37dcc826": 1, + "66cfa84ad0ee5ca8586244c8393007cf3d9622d77cfa03fd4f35065b": 1, + "76c4d6c68c0ef81ae364411a84e52ce66089ed006ca29adfc0227901": 1, + "8cc6b89fec65cc83d34b7bab2e6494db631d8476a86625767dd0c2a0": 1, + "e90060fdc085ac9f63cdb3b32ba1d84e0f7eb98561687b213b4c8770": 1 + }, + "heavyDelegation": { + "021e737009040bf7f1e7b1bcc148f29d748d4a6b561902c95e4a9f36": { + "omega": 0, + "issuerPk": "6hSFCotivD08t02n43RMiaF9LzwtYVrFMu/WX6ShfEsxfdXFL5Y6c+DwHSZOCywU0RJz5er2icIO03UytC9NTg==", + "delegatePk": "JEnSVQTPGriTx1+lAMkKhCNsMBDNPGw+NiEvNPh4ui6IdvxrO+WkQPTy5U865XB4VFvi/zb7d+H1bilnztQNBg==", + "cert": "558952d17442e8cc73f0c7dd606e329b38ed2ec0c1f83fe2567d28b21ef2223d2d23640cd0531f75832b50e519631c48643fcfaa7168851645dce07b90d87f0e" + }, + "0bc82ced9544980b9ffe7f64b1538bbda6804a5cc32c8035485e184b": { + "omega": 0, + "issuerPk": "MJ7IskKU8GKk0Eeg3zhfSOK1DDVXOMHD2V/zhEpODUtL9YB0Y7sXnbZfg3+Df05hskP5Jz+dZvdC6DH/dP9jmQ==", + "delegatePk": "hwO7NJL7LfAk5e/QG61FKcdORoK60tvprE3063Muh4EQKrWA6l7t23B2GziK8D0hRO0j5W1Gzpn8WW69XLIlKA==", + "cert": "2bccf50d0c3cbb03dd29cfba817e8ba615db3d7722b41b264ad08722e548cfe83d069b29d13e490823d7519ecdd9940ea49573f6027056c4bd58da1adf75020e" + }, + "18ed9844deef98cf9ba8b39791dede0538d2d2fa79bf67ef37dcc826": { + "omega": 0, + "issuerPk": 
"pXbW4Jak8maeuWiosvrurykKnqDSHswUjroonSDS3fTnWS+BKe+vjT4zZJNKhQ33KbagiHVJ5CJUNggfsCtG2g==", + "delegatePk": "rbJAZp3kWCUvp8dnLR6qsgpGU+qKAFow4NHYKWiKCkfm1qFCFONob50N1IbNWCGWAhg38ZPTvBazTasjsfj6yQ==", + "cert": "89e1638e31fd3d402cecb897ba773d8c2c11c2d3cff2462b266e21461539b1a4fe8fb528e159b9af473799b51e49aa5b5816a88f10c484aa7cef7ad12850830a" + }, + "66cfa84ad0ee5ca8586244c8393007cf3d9622d77cfa03fd4f35065b": { + "omega": 0, + "issuerPk": "/LGZjmmcAMRisP7Rf454GM2QUKgj2aAyqE+iQo2PIEhcistFOlT+idtbLTceZAnQcwwPJDtTcNi+EnPQyscZOg==", + "delegatePk": "rinFUiKKCPPFY0ULEKn1SPRgLVmOS3jdTXDtrxK6VI1I11G3uBS1Olxi0mQSN3kf+B3hm/xHkuUDVNaSXNiBeQ==", + "cert": "3e7f30bb68c5bc4d23c2a730ac154a188a1fd45aac3f438efd380303171443d2ca4f50e5a1ff66b40ae3da64697f2599956ae06c21b73fa828b8c0dc9fb27302" + }, + "76c4d6c68c0ef81ae364411a84e52ce66089ed006ca29adfc0227901": { + "omega": 0, + "issuerPk": "9EE85tTLdSSR4T1Xoy6n9wr6jlbavCdfp9oQKusskO3DSSyNqRYS7QzYQ96j/WnphUey63082YkKijMfF9A4eA==", + "delegatePk": "dvyHDkXg8LFtb0K6Sitl8OGSEZPvfCVQYLDR6Au6t6/ROvlerMKQ8uri4fG7hQQzbHKtdKWgv94t+zuFJTQ1fw==", + "cert": "5ec0ed46ae7e575bdb089f1bceca3b2689b13a7162fe08578fe60ba64607fffaa507412a97652c3c81cc0ef93ff404cf809a628ae19faba1a035fca0505c1d04" + }, + "8cc6b89fec65cc83d34b7bab2e6494db631d8476a86625767dd0c2a0": { + "omega": 0, + "issuerPk": "Hr5S5PAxf9HSB4FzmtZzaFcXrNrctrI5XUrDrnCkOUTX6rhbtOMkXU3sWVDOvU6LNSSr3/Ws2+iCYZIr7LmTWg==", + "delegatePk": "FaLH2b5H/XS31YRnm98N6fP4Etx6m+GbniVAXMwOp8KhYXPKBJBsX/EjIy3pSkvRBhGCjsycB0yrDxWMi5ZsIQ==", + "cert": "10f06304cceb42071605ebba67b308c7568e5e6fe0d773c58f7e8c13bc8d8a340f70a4fd5e1b4a1c1db1de5c7646802bbc929d6c82d7adb8a77cb6ad77eac50a" + }, + "e90060fdc085ac9f63cdb3b32ba1d84e0f7eb98561687b213b4c8770": { + "omega": 0, + "issuerPk": "B2R+VXzy3c8bxncdOpQ2Z/tblxRNQO8AXQ0OsJDQvZYnLeGQcLD78kyYLpi3nfuS4SfnLar23NV4yiEVwaw+Yw==", + "delegatePk": "nACHGIBacymrKwn07iW/a5ZKJCPZ2cKQqeXw3ivR7WOYVUuufWhZlCoUTZ7rtBqoDaexblUQwkC7hA7AmNA3FA==", + "cert": "b5440daa05f7fae557df46e4f1b7c5802b86f465daad1137e315abf6e72f1c877207276abb8dcba86e18e42d39b34c2f0fa82ba2919944cdc8e2e5264baa450b" + } + }, + "startTime": 1666656000, + "nonAvvmBalances": { + "FHnt4NL7yPXjpZtYj1YUiX9QYYUZGXDT9gA2PJXQFkTSMx3EgawXK5BUrCHdhe2": "0", + "FHnt4NL7yPXk7D87qAWEmfnL7wSQ9AzBU2mjZt3eM48NSCbygxgzAU6vCGiRZEW": "0", + "FHnt4NL7yPXpazQsTdJ3Gp1twQUo4N5rrgGbRNSzchjchPiApc1k4CvqDMcdd7H": "0", + "FHnt4NL7yPXtNo1wLCLZyGTMfAvB14h8onafiYkM7B69ZwvGgXeUyQWfi7FPrif": "0", + "FHnt4NL7yPXtmi4mAjD43V3NB3shDs1gCuHNcMLPsRWjaw1b2yRV2xad8S8V6aq": "0", + "FHnt4NL7yPXvDWHa8bVs73UEUdJd64VxWXSFNqetECtYfTd9TtJguJ14Lu3feth": "30000000000000000", + "FHnt4NL7yPXvNSRpCYydjRr7koQCrsTtkovk5uYMimgqMJX2DyrEEBqiXaTd8rG": "0", + "FHnt4NL7yPY9rTvdsCeyRnsbzp4bN7XdmAZeU5PzA1qR2asYmN6CsdxJw4YoDjG": "0" + }, + "blockVersionData": { + "scriptVersion": 0, + "slotDuration": "20000", + "maxBlockSize": "2000000", + "maxHeaderSize": "2000000", + "maxTxSize": "4096", + "maxProposalSize": "700", + "mpcThd": "20000000000000", + "heavyDelThd": "300000000000", + "updateVoteThd": "1000000000000", + "updateProposalThd": "100000000000000", + "updateImplicit": "10000", + "softforkRule": { + "initThd": "900000000000000", + "minThd": "600000000000000", + "thdDecrement": "50000000000000" + }, + "txFeePolicy": { + "summand": "155381000000000", + "multiplier": "43946000000" + }, + "unlockStakeEpoch": "18446744073709551615" + }, + "protocolConsts": { "k": 432, "protocolMagic": 2 }, + "avvmDistr": {} +} diff --git a/examples/sync-preview/dolos.toml b/examples/sync-preview/dolos.toml index 
cced085a..7478d3b8 100644 --- a/examples/sync-preview/dolos.toml +++ b/examples/sync-preview/dolos.toml @@ -15,3 +15,7 @@ listen_address = "[::]:50051" [serve.ouroboros] listen_address = "localhost:30013" magic = 0 + +[byron] +path = "./byron.json" +hash = "xxx" diff --git a/src/bin/dolos/daemon.rs b/src/bin/dolos/daemon.rs index 862a22db..4c1fc65e 100644 --- a/src/bin/dolos/daemon.rs +++ b/src/bin/dolos/daemon.rs @@ -1,4 +1,5 @@ use std::path::Path; +use tracing::info; use dolos::{ prelude::*, @@ -41,15 +42,9 @@ pub async fn run( let applydb = ApplyDB::open(applydb_path).map_err(Error::storage)?; - if applydb.is_empty() { - applydb - .insert_genesis_utxos(&byron_genesis) - .map_err(Error::storage)?; - } - let server = tokio::spawn(dolos::serve::serve(config.serve, rolldb.clone())); - dolos::sync::pipeline(&config.upstream, rolldb, applydb, policy) + dolos::sync::pipeline(&config.upstream, rolldb, applydb, byron_genesis, policy) .unwrap() .block(); diff --git a/src/bin/dolos/sync.rs b/src/bin/dolos/sync.rs index f55ccef7..10a902be 100644 --- a/src/bin/dolos/sync.rs +++ b/src/bin/dolos/sync.rs @@ -29,6 +29,9 @@ pub fn run( let rolldb = RollDB::open(rolldb_path, config.rolldb.k_param.unwrap_or(1000)).map_err(Error::storage)?; + let byron_genesis = + pallas::ledger::configs::byron::from_file(&config.byron.path).map_err(Error::config)?; + let applydb_path = config .applydb .path @@ -37,7 +40,7 @@ pub fn run( let applydb = ApplyDB::open(applydb_path).map_err(Error::storage)?; - dolos::sync::pipeline(&config.upstream, rolldb, applydb, policy) + dolos::sync::pipeline(&config.upstream, rolldb, applydb, byron_genesis, policy) .unwrap() .block(); diff --git a/src/model.rs b/src/model.rs index a063b590..51ee080d 100644 --- a/src/model.rs +++ b/src/model.rs @@ -26,4 +26,5 @@ pub enum RollEvent { Apply(BlockSlot, BlockHash, RawBlock), Undo(BlockSlot, BlockHash, RawBlock), Reset(Point), + Origin, } diff --git a/src/serve/grpc/sync.rs b/src/serve/grpc/sync.rs index 6a6ef276..255c96af 100644 --- a/src/serve/grpc/sync.rs +++ b/src/serve/grpc/sync.rs @@ -33,6 +33,8 @@ fn roll_to_tip_response( action: match evt.action() { WalAction::Apply => follow_tip_response::Action::Apply(raw_to_anychain(block)).into(), WalAction::Undo => follow_tip_response::Action::Undo(raw_to_anychain(block)).into(), + // TODO: shouldn't we have a u5c event for origin? + WalAction::Origin => None, WalAction::Mark => None, }, } @@ -124,7 +126,7 @@ impl chain_sync_service_server::ChainSyncService for ChainSyncServiceImpl { &self, _request: Request<FollowTipRequest>, ) -> Result<Response<Self::FollowTipStream>, tonic::Status> { - let s = crate::storage::rolldb::stream::RollStream::start_with_block(self.0.clone(), None) + let s = crate::storage::rolldb::stream::RollStream::start_after_block(self.0.clone(), None) .map(|(evt, block)| Ok(roll_to_tip_response(evt, &block))); Ok(Response::new(Box::pin(s))) diff --git a/src/storage/applydb/genesis.rs b/src/storage/applydb/genesis.rs index 72dcff17..5a8521a2 100644 --- a/src/storage/applydb/genesis.rs +++ b/src/storage/applydb/genesis.rs @@ -2,6 +2,7 @@ use pallas::ledger::addresses::ByronAddress; use pallas::ledger::configs::byron::GenesisUtxo; use pallas::ledger::primitives::byron::TxOut; use rocksdb::WriteBatch; +use tracing::info; use crate::{ prelude::*, @@ -43,6 +44,7 @@ impl ApplyDB { .collect::<Result<Vec<_>, _>>()?
.into_iter() .fold(WriteBatch::default(), |mut batch, (k, v)| { + info!(tx = %k.0 .0, "inserting genesis utxo"); UtxoKV::stage_upsert(&self.db, k, v, &mut batch); batch }); @@ -84,11 +86,12 @@ mod tests { } #[test] - fn test_genesis_utxos() { + fn test_mainnet_genesis_utxos() { with_tmp_db(|db| { let path = std::path::PathBuf::from(std::env::var("CARGO_MANIFEST_DIR").unwrap()) - .join("test_data") - .join("mainnet-byron-genesis.json"); + .join("examples") + .join("sync-mainnet") + .join("byron.json"); let byron = pallas::ledger::configs::byron::from_file(&path).unwrap(); db.insert_genesis_utxos(&byron).unwrap(); @@ -101,4 +104,24 @@ mod tests { ) }); } + + #[test] + fn test_preview_genesis_utxos() { + with_tmp_db(|db| { + let path = std::path::PathBuf::from(std::env::var("CARGO_MANIFEST_DIR").unwrap()) + .join("examples") + .join("sync-preview") + .join("byron.json"); + + let byron = pallas::ledger::configs::byron::from_file(&path).unwrap(); + db.insert_genesis_utxos(&byron).unwrap(); + + assert_genesis_utxo_exists( + &db, + "4843cf2e582b2f9ce37600e5ab4cc678991f988f8780fed05407f9537f7712bd", + "FHnt4NL7yPXvDWHa8bVs73UEUdJd64VxWXSFNqetECtYfTd9TtJguJ14Lu3feth", + 30_000_000_000_000_000, + ); + }); + } } diff --git a/src/storage/applydb/mod.rs b/src/storage/applydb/mod.rs index 5999232f..9c354efc 100644 --- a/src/storage/applydb/mod.rs +++ b/src/storage/applydb/mod.rs @@ -3,6 +3,7 @@ pub mod genesis; use pallas::crypto::hash::Hash; use serde::{Deserialize, Serialize}; use std::{path::Path, sync::Arc}; +use tracing::{error, info}; use rocksdb::{Options, WriteBatch, DB}; @@ -55,12 +56,20 @@ impl<'a> BlockWriteBatch<'a> { ) } - pub fn spend_utxo(&mut self, tx: TxHash, output: OutputIndex) -> Result<(), Error> { - let k = DBSerde(UtxoRef(tx, output)); - let v = UtxoKV::get_by_key(self.0, k.clone())?.ok_or(Error::NotFound)?; + pub fn spend_utxo(&mut self, tx: TxHash, idx: OutputIndex) -> Result<(), Error> { + let k = DBSerde(UtxoRef(tx, idx)); + //let v = UtxoKV::get_by_key(self.0, k.clone())?.ok_or(Error::NotFound)?; + + let v = UtxoKV::get_by_key(self.0, k.clone())?.ok_or_else(|| { + error!(%tx, idx, "utxo not found"); + Error::NotFound + })?; + StxiKV::stage_upsert(self.0, k.clone(), v, &mut self.2); UtxoKV::stage_delete(self.0, k, &mut self.2); + info!(%tx, idx, "spending utxo"); + Ok(()) }
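
Note on the spend_utxo change above: a consumed UTxO is no longer looked up leniently; it is copied into the spent-input table (StxiKV) and deleted from the live set (UtxoKV) inside the same write batch, which is what later makes an undo possible, and a missing input now surfaces NotFound instead of being skipped. A minimal stand-alone sketch of that move-instead-of-delete bookkeeping, using plain HashMaps as stand-ins for the RocksDB column families (all names and types here are illustrative, not the crate's actual API):

    use std::collections::HashMap;

    type UtxoRef = (String, u32); // (tx hash, output index) -- simplified stand-in
    type UtxoBody = Vec<u8>;

    struct FakeApplyBatch {
        utxos: HashMap<UtxoRef, UtxoBody>, // stand-in for the UtxoKV column family
        spent: HashMap<UtxoRef, UtxoBody>, // stand-in for the StxiKV column family
    }

    impl FakeApplyBatch {
        // Mirrors the patched spend_utxo: fail loudly if the input is unknown,
        // otherwise copy the value into the spent table and drop it from the live set.
        fn spend_utxo(&mut self, tx: &str, idx: u32) -> Result<(), String> {
            let k = (tx.to_string(), idx);
            let v = self
                .utxos
                .remove(&k)
                .ok_or_else(|| format!("utxo not found: {tx}#{idx}"))?;
            self.spent.insert(k, v);
            Ok(())
        }
    }

    fn main() {
        let mut batch = FakeApplyBatch { utxos: HashMap::new(), spent: HashMap::new() };
        batch.utxos.insert(("abc".into(), 0), vec![1, 2, 3]);
        batch.spend_utxo("abc", 0).unwrap();
        assert!(batch.utxos.is_empty());
        assert_eq!(batch.spent.len(), 1);
        // spending the same input twice now surfaces the NotFound case the patch logs as an error
        assert!(batch.spend_utxo("abc", 0).is_err());
    }

Keeping the spent copy around (rather than deleting outright) is the design choice that lets undo_block restore inputs when the chain rolls back.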
diff --git a/src/storage/kvtable.rs b/src/storage/kvtable.rs index 3da232f1..62a008de 100644 --- a/src/storage/kvtable.rs +++ b/src/storage/kvtable.rs @@ -266,6 +266,15 @@ where db.cf_handle(Self::CF_NAME).unwrap() } + fn reset(db: &rocksdb::DB) -> Result<(), Error> { + db.drop_cf(Self::CF_NAME).map_err(|_| Error::IO)?; + + db.create_cf(Self::CF_NAME, &rocksdb::Options::default()) + .map_err(|_| Error::IO)?; + + Ok(()) + } + fn get_by_key(db: &rocksdb::DB, k: K) -> Result<Option<V>, Error> { let cf = Self::cf(db); let raw_key = Box::<[u8]>::from(k); @@ -297,7 +306,7 @@ where // iterator and see if we have at least one value. If someone know a better way // to accomplish this, please refactor. let mut iter = Self::iter_keys(db, rocksdb::IteratorMode::Start); - iter.next().is_some() + iter.next().is_none() } fn iter_keys<'a>(db: &'a rocksdb::DB, mode: rocksdb::IteratorMode) -> KeyIterator<'a, K> { diff --git a/src/storage/rolldb/mod.rs b/src/storage/rolldb/mod.rs index c5228a84..2f6b58e9 100644 --- a/src/storage/rolldb/mod.rs +++ b/src/storage/rolldb/mod.rs @@ -48,14 +48,16 @@ impl RollDB { ) .map_err(|_| Error::IO)?; - let wal_seq = wal::WalKV::last_key(&db)?.map(|x| x.0).unwrap_or_default(); + let wal_seq = wal::WalKV::initialize(&db)?; - Ok(Self { + let out = Self { db: Arc::new(db), tip_change: Arc::new(tokio::sync::Notify::new()), wal_seq, k_param, - }) + }; + + Ok(out) } pub fn get_block(&self, hash: Hash<32>) -> Result<Option<RawBlock>, Error> { @@ -107,6 +109,20 @@ impl RollDB { Ok(()) } + pub fn roll_back_origin(&mut self) -> Result<(), Error> { + let mut batch = WriteBatch::default(); + + let new_seq = wal::WalKV::stage_roll_back_origin(&self.db, self.wal_seq, &mut batch)?; + + ChainKV::reset(&self.db)?; + + self.db.write(batch).map_err(|_| Error::IO)?; + self.wal_seq = new_seq; + self.tip_change.notify_waiters(); + + Ok(()) + } + pub fn find_tip(&self) -> Result, Error> { // TODO: tip might be either on chain or WAL, we need to query both wal::WalKV::find_tip(&self.db) @@ -159,45 +175,44 @@ impl RollDB { Ok(out) } - pub fn crawl_wal( + pub fn crawl_wal_after( &self, - start_seq: Option<u64>, + seq: Option<u64>, ) -> impl Iterator<Item = Result<(u64, wal::Value), Error>> + '_ { - let iter = match start_seq { - Some(start_seq) => { - let start_seq = Box::<[u8]>::from(DBInt(start_seq)); - let from = rocksdb::IteratorMode::From(&start_seq, rocksdb::Direction::Forward); - wal::WalKV::iter_entries(&self.db, from) + let iter = match seq { + Some(seq) => { + let seq = Box::<[u8]>::from(DBInt(seq)); + let from = rocksdb::IteratorMode::From(&seq, rocksdb::Direction::Forward); + wal::WalKV::iter_entries(&self.db, from).skip(1) } None => { let from = rocksdb::IteratorMode::Start; - wal::WalKV::iter_entries(&self.db, from) + wal::WalKV::iter_entries(&self.db, from).skip(0) } }; iter.map(|v| v.map(|(seq, val)| (seq.0, val.0))) } - pub fn crawl_wal_from_cursor( - &self, - start_after: Option<(BlockSlot, BlockHash)>, - ) -> Result<impl Iterator<Item = Result<(u64, wal::Value), Error>> + '_, Error> { - if let Some((slot, hash)) = start_after { - // TODO: Not sure this is 100% accurate: - // i.e Apply(X), Apply(cursor), Undo(cursor), Mark(x) - // We want to start at Apply(cursor) or Mark(cursor), but even then, - // what if we have more than one Apply(cursor), how do we know - // which is correct? - let found = WalKV::scan_until(&self.db, rocksdb::IteratorMode::End, |v| { - v.slot() == slot && v.hash().eq(&hash) - })?; - - match found { - Some(DBInt(seq)) => Ok(self.crawl_wal(Some(seq))), - None => Err(Error::NotFound), - } - } else { - Ok(self.crawl_wal(None)) + pub fn find_wal_seq(&self, block: Option<(BlockSlot, BlockHash)>) -> Result<u64, Error> { + if block.is_none() { + return Ok(0); + } + + let (slot, hash) = block.unwrap(); + + // TODO: Not sure this is 100% accurate: + // i.e Apply(X), Apply(cursor), Undo(cursor), Mark(x) + // We want to start at Apply(cursor) or Mark(cursor), but even then, + // what if we have more than one Apply(cursor), how do we know + // which is correct? + let found = WalKV::scan_until(&self.db, rocksdb::IteratorMode::End, |v| { + v.slot() == slot && v.hash().eq(&hash) && v.is_apply() + })?; + + match found { + Some(DBInt(seq)) => Ok(seq), + None => Err(Error::NotFound), + } }
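
Note on crawl_wal_after / find_wal_seq above: the sequence argument is treated as an exclusive cursor. Passing the seq you already processed yields only strictly newer entries (the RocksDB iterator is positioned at the cursor and the first item is skipped), while None starts from the very beginning of the WAL, which after this patch is always the Origin seed at seq 0. A small self-contained sketch of that semantics over a plain vector (simplified, assumed types; not the real RollDB API):

    // Minimal sketch of the exclusive-cursor semantics of `crawl_wal_after`,
    // using a Vec of (seq, entry) pairs instead of RocksDB.
    fn crawl_after<'a>(
        wal: &'a [(u64, &'a str)],
        seq: Option<u64>,
    ) -> impl Iterator<Item = (u64, &'a str)> + 'a {
        let skip = usize::from(seq.is_some()); // skip the cursor entry itself when resuming
        wal.iter()
            .filter(move |(s, _)| seq.map_or(true, |cursor| *s >= cursor))
            .skip(skip)
            .copied()
    }

    fn main() {
        let wal = [(0, "origin"), (1, "apply A"), (2, "apply B"), (3, "undo B")];
        // cold start: everything, beginning with the origin seed
        assert_eq!(crawl_after(&wal, None).count(), 4);
        // resuming after seq 1: only entries strictly newer than the cursor
        let resumed: Vec<_> = crawl_after(&wal, Some(1)).map(|(s, _)| s).collect();
        assert_eq!(resumed, vec![2, 3]);
    }

find_wal_seq does the cursor translation once (a backwards scan for the matching Apply entry), so callers no longer need to rescan the WAL by slot and hash on every catch-up pass.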
@@ -220,7 +235,8 @@ impl RollDB { /// /// To use Origin as start point set `from` to None. /// - /// Returns None if either point in range don't exist or `to` point is earlier in chain than `from`. + /// Returns None if either point in the range doesn't exist or the `to` point is + /// earlier in the chain than `from`. pub fn read_chain_range( &self, from: Option<(BlockSlot, BlockHash)>, @@ -366,6 +382,23 @@ mod tests { (slot, hash, slot.to_be_bytes().to_vec()) } + #[test] + fn test_origin_event() { + with_tmp_db(30, |db| { + let mut iter = db.crawl_wal_after(None); + + let origin = iter.next(); + assert!(origin.is_some()); + + let origin = origin.unwrap(); + assert!(origin.is_ok()); + + let (seq, value) = origin.unwrap(); + assert_eq!(seq, 0); + assert!(value.is_origin()); + }); + } + #[test] fn test_roll_forward_blackbox() { with_tmp_db(30, |mut db| { @@ -437,7 +470,7 @@ mod tests { assert!(chain.next().is_none()); - let mut wal = db.crawl_wal(None); + let mut wal = db.crawl_wal_after(None); for i in 96..100 { let (_, val) = wal.next().unwrap().unwrap(); @@ -471,7 +504,7 @@ mod tests { assert!(chain.next().is_none()); - let mut wal = db.crawl_wal(None); + let mut wal = db.crawl_wal_after(None); for i in 77..100 { let (_, val) = wal.next().unwrap().unwrap(); diff --git a/src/storage/rolldb/stream.rs b/src/storage/rolldb/stream.rs index 13911445..d698bdd1 100644 --- a/src/storage/rolldb/stream.rs +++ b/src/storage/rolldb/stream.rs @@ -10,11 +10,11 @@ type ItemWithBlock = (super::wal::Value, RawBlock); pub struct RollStream; impl RollStream { - pub fn start(db: RollDB, seq: Option<u64>) -> impl Stream<Item = super::wal::Value> { + pub fn start_after_seq(db: RollDB, seq: Option<u64>) -> impl Stream<Item = super::wal::Value> { async_stream::stream! { let mut last_seq = seq; - let iter = db.crawl_wal(last_seq); + let iter = db.crawl_wal_after(last_seq); for (seq, val) in iter.flatten() { yield val; @@ -23,7 +23,7 @@ impl RollStream { loop { db.tip_change.notified().await; - let iter = db.crawl_wal(last_seq).skip(1); + let iter = db.crawl_wal_after(last_seq); for (seq, val) in iter.flatten() { yield val; @@ -33,14 +33,14 @@ impl RollStream { } } - pub fn start_with_block( + pub fn start_after_block( db: RollDB, seq: Option<u64>, ) -> impl Stream<Item = ItemWithBlock> { async_stream::stream!
{ let mut last_seq = seq; - let iter = db.crawl_wal(last_seq); + let iter = db.crawl_wal_after(last_seq); for x in iter { let x = x.and_then(|(s,v)| { @@ -56,7 +56,7 @@ impl RollStream { loop { db.tip_change.notified().await; - let iter = db.crawl_wal(last_seq).skip(1); + let iter = db.crawl_wal_after(last_seq); for x in iter { let x = x.and_then(|(s,v)| { @@ -90,25 +90,30 @@ mod tests { let path = tempfile::tempdir().unwrap().into_path(); let mut db = super::RollDB::open(path.clone(), 30).unwrap(); - for i in 0..100 { + for i in 0..=100 { let (slot, hash, body) = dummy_block(i * 10); db.roll_forward(slot, hash, body).unwrap(); } let mut db2 = db.clone(); let background = tokio::spawn(async move { - for i in 100..200 { + for i in 101..=200 { let (slot, hash, body) = dummy_block(i * 10); db2.roll_forward(slot, hash, body).unwrap(); - tokio::time::sleep(std::time::Duration::from_millis(100)).await; + tokio::time::sleep(std::time::Duration::from_millis(5)).await; } }); - let s = super::RollStream::start(db.clone(), None); + let s = super::RollStream::start_after_seq(db.clone(), None); pin_mut!(s); - for i in 0..200 { + let evt = s.next().await; + let evt = evt.unwrap(); + assert!(evt.is_origin()); + assert_eq!(evt.slot(), 0); + + for i in 0..=200 { let evt = s.next().await; let evt = evt.unwrap(); assert!(evt.is_apply());
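
Note on the stream changes above: consumers of the WAL stream can now rely on the very first item being the Origin entry, followed by Apply/Undo values in sequence order, which is exactly what the updated test asserts. A hedged sketch of how such a consumer might fold those events into a chain, with a simplified enum standing in for wal::Value (names here are illustrative only):

    // Sketch of what a stream consumer can now assume: Origin first, then
    // Apply/Undo entries that extend or shrink the chain tip.
    #[derive(Debug)]
    enum FakeWalValue {
        Origin,
        Apply(u64), // slot
        Undo(u64),  // slot
    }

    fn replay(stream: impl IntoIterator<Item = FakeWalValue>) -> Vec<u64> {
        let mut chain = Vec::new();
        for value in stream {
            match value {
                FakeWalValue::Origin => chain.clear(), // start from an empty chain
                FakeWalValue::Apply(slot) => chain.push(slot),
                FakeWalValue::Undo(slot) => {
                    assert_eq!(chain.pop(), Some(slot)); // undo always targets the tip
                }
            }
        }
        chain
    }

    fn main() {
        let wal = vec![
            FakeWalValue::Origin,
            FakeWalValue::Apply(10),
            FakeWalValue::Apply(20),
            FakeWalValue::Undo(20),
            FakeWalValue::Apply(21),
        ];
        assert_eq!(replay(wal), vec![10, 21]);
    }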
diff --git a/src/storage/rolldb/wal.rs b/src/storage/rolldb/wal.rs index 2a50d21c..0d4f9bb9 100644 --- a/src/storage/rolldb/wal.rs +++ b/src/storage/rolldb/wal.rs @@ -10,12 +10,17 @@ pub enum WalAction { Apply, Undo, Mark, + Origin, } #[derive(Debug, Serialize, Deserialize)] pub struct Value(WalAction, super::BlockSlot, super::BlockHash); impl Value { + pub fn origin() -> Self { + Self(WalAction::Origin, 0, super::BlockHash::new([0; 32])) + } + pub fn into_apply( slot: impl Into<super::BlockSlot>, hash: impl Into<super::BlockHash>, ) @@ -40,6 +45,7 @@ impl Value { WalAction::Apply => Some(Self(WalAction::Undo, self.1, self.2)), WalAction::Undo => None, WalAction::Mark => None, + WalAction::Origin => None, } } @@ -47,6 +53,7 @@ impl Value { match self.0 { WalAction::Apply => Some(Self(WalAction::Mark, self.1, self.2)), WalAction::Mark => Some(Self(WalAction::Mark, self.1, self.2)), + WalAction::Origin => Some(Self(WalAction::Origin, self.1, self.2)), WalAction::Undo => None, } } @@ -62,6 +69,10 @@ impl Value { pub fn is_undo(&self) -> bool { matches!(self.0, WalAction::Undo) } + + pub fn is_origin(&self) -> bool { + matches!(self.0, WalAction::Origin) + } } // slot => block hash impl KVTable<DBInt, DBSerde<Value>> for WalKV { } impl WalKV { + pub fn initialize(db: &DB) -> Result<Seq, Error> { + if Self::is_empty(db) { + Self::write_seed(db)?; + Ok(0) + } else { + let last = Self::last_key(&db)?.map(|x| x.0); + Ok(last.unwrap()) + } + } + + fn write_seed(db: &DB) -> Result<(), Error> { + let mut batch = WriteBatch::default(); + let k = DBInt(0); + let v = DBSerde(Value::origin()); + Self::stage_upsert(&db, k, v, &mut batch); + + db.write(batch).map_err(|_| Error::IO) + } + fn stage_append( db: &DB, last_seq: Seq, @@ -112,6 +142,31 @@ impl WalKV { Ok(last_seq) } + pub fn stage_roll_back_origin( + db: &DB, + mut last_seq: Seq, + batch: &mut WriteBatch, + ) -> Result<Seq, Error> { + let iter = WalKV::iter_values(db, IteratorMode::End); + + for step in iter { + let value = step.map_err(|_| super::Error::IO)?.0; + + if value.is_origin() { + break; + } + + match value.into_undo() { + Some(undo) => { + last_seq = Self::stage_append(db, last_seq, undo, batch)?; + } + None => continue, + }; + } + + Ok(last_seq) + } + pub fn stage_roll_forward( db: &DB, last_seq: u64, diff --git a/src/sync/apply.rs b/src/sync/apply.rs index 2e8f9b48..0e8ce19b 100644 --- a/src/sync/apply.rs +++ b/src/sync/apply.rs @@ -1,4 +1,5 @@ use gasket::framework::*; +use pallas::ledger::configs::byron::GenesisFile; use tracing::{info, instrument, warn}; use crate::prelude::*; @@ -10,6 +11,7 @@ pub type UpstreamPort = gasket::messaging::tokio::InputPort<RollEvent>; #[stage(name = "apply", unit = "RollEvent", worker = "Worker")] pub struct Stage { applydb: ApplyDB, + genesis: GenesisFile, pub upstream: UpstreamPort, @@ -21,9 +23,10 @@ pub struct Stage { } impl Stage { - pub fn new(applydb: ApplyDB) -> Self { + pub fn new(applydb: ApplyDB, genesis: GenesisFile) -> Self { Self { applydb, + genesis, upstream: Default::default(), // downstream: Default::default(), block_count: Default::default(), @@ -33,6 +36,17 @@ impl Stage { } impl Stage { + #[instrument(skip_all)] + fn apply_origin(&mut self) -> Result<(), WorkerError> { + info!("inserting genesis UTxOs"); + + self.applydb + .insert_genesis_utxos(&self.genesis) + .or_panic()?; + + Ok(()) + } + #[instrument(skip_all)] fn apply_block(&mut self, cbor: &[u8]) -> Result<(), WorkerError> { let block = pallas::ledger::traverse::MultiEraBlock::decode(cbor).or_panic()?; @@ -45,15 +59,6 @@ impl Stage { for consumed in tx.consumes() { batch .spend_utxo(*consumed.hash(), consumed.index()) - // TODO: since we don't have genesis utxos, it's reasonable to get missed hits. - // This needs to go away once the genesis block processing is implemented. - .or_else(|x| match x { - crate::storage::kvtable::Error::NotFound => { - warn!("skipping missing utxo"); - Ok(()) - } - x => Err(x), - }) .or_panic()?; } @@ -126,6 +131,7 @@ impl gasket::framework::Worker<Stage> for Worker { match unit { RollEvent::Apply(_, _, cbor) => stage.apply_block(cbor)?, RollEvent::Undo(_, _, cbor) => stage.undo_block(cbor)?, + RollEvent::Origin => stage.apply_origin()?, RollEvent::Reset(_) => todo!(), }; diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 44f5b288..4dc80c3d 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -1,4 +1,5 @@ use gasket::messaging::{RecvPort, SendPort}; +use pallas::ledger::configs::byron::GenesisFile; use serde::Deserialize; use crate::prelude::*; @@ -18,6 +19,7 @@ pub fn pipeline( config: &Config, rolldb: RollDB, applydb: ApplyDB, + genesis: GenesisFile, policy: &gasket::runtime::Policy, ) -> Result<gasket::daemon::Daemon, Error> { let pull_cursor = rolldb @@ -36,7 +38,7 @@ pub fn pipeline( let mut roll = roll::Stage::new(rolldb, roll_cursor); - let mut apply = apply::Stage::new(applydb); + let mut apply = apply::Stage::new(applydb, genesis); let (to_roll, from_pull) = gasket::messaging::tokio::mpsc_channel(50); pull.downstream.connect(to_roll); diff --git a/src/sync/pull.rs b/src/sync/pull.rs index 9dc19a55..12335ce9 100644 --- a/src/sync/pull.rs +++ b/src/sync/pull.rs @@ -212,11 +212,7 @@ impl gasket::framework::Worker<Stage> for Worker { } #[derive(Stage)] -#[stage( - name = "chainsync", - unit = "NextResponse<HeaderContent>", - worker = "Worker" -)] +#[stage(name = "pull", unit = "NextResponse<HeaderContent>", worker = "Worker")] pub struct Stage { peer_address: String, network_magic: u64, diff --git a/src/sync/roll.rs b/src/sync/roll.rs index 041a85c6..fc333751 100644 --- a/src/sync/roll.rs +++ b/src/sync/roll.rs @@ -1,4 +1,5 @@ use gasket::framework::*; +use tracing::info; use crate::prelude::*; use crate::storage::rolldb::RollDB; @@ -35,25 +36,34 @@ impl Stage { roll_count: Default::default(), } } +} + +pub struct Worker { + last_seq: Option<u64>, } +impl Worker { /// Catch-up output with current persisted
state /// - /// Reads from Wal using the latest known cursor and outputs the corresponding downstream events - async fn catchup(&mut self) -> Result<(), WorkerError> { - let iter = self.rolldb.crawl_wal_from_cursor(self.cursor).or_panic()?; + /// Reads from Wal using the latest known cursor and outputs the + /// corresponding downstream events + async fn catchup(&mut self, stage: &mut Stage) -> Result<(), WorkerError> { + let iter = stage.rolldb.crawl_wal_after(self.last_seq); for wal in iter { - let (_, wal) = wal.or_panic()?; - - let cbor = self.rolldb.get_block(*wal.hash()).or_panic()?.unwrap(); + let (seq, wal) = wal.or_panic()?; + info!(seq, "processing wal entry"); let evt = match wal.action() { crate::storage::rolldb::wal::WalAction::Apply => { + let cbor = stage.rolldb.get_block(*wal.hash()).or_panic()?.unwrap(); RollEvent::Apply(wal.slot(), *wal.hash(), cbor) } crate::storage::rolldb::wal::WalAction::Undo => { + let cbor = stage.rolldb.get_block(*wal.hash()).or_panic()?.unwrap(); RollEvent::Undo(wal.slot(), *wal.hash(), cbor) } + crate::storage::rolldb::wal::WalAction::Origin => RollEvent::Origin, crate::storage::rolldb::wal::WalAction::Mark => { // TODO: do we really need mark events? // for now we bail } }; - self.downstream.send(evt.into()).await.or_panic()?; - self.cursor = Some((wal.slot(), *wal.hash())); + stage.downstream.send(evt.into()).await.or_panic()?; + self.last_seq = Some(seq); } Ok(()) } } -pub struct Worker; - #[async_trait::async_trait(?Send)] impl gasket::framework::Worker<Stage> for Worker { - async fn bootstrap(_stage: &Stage) -> Result<Self, WorkerError> { - Ok(Self) + async fn bootstrap(stage: &Stage) -> Result<Self, WorkerError> { + let last_seq = match stage.cursor { + Some(cursor) => { + let last_seq = stage.rolldb.find_wal_seq(Some(cursor)).or_panic()?; + Some(last_seq) + } + None => None, + }; + + Ok(Self { last_seq }) } async fn schedule( &mut self, @@ -99,13 +115,14 @@ impl gasket::framework::Worker<Stage> for Worker { stage.rolldb.roll_back(*slot).or_panic()?; } pallas::network::miniprotocols::Point::Origin => { - //todo!(); + stage.rolldb.roll_back_origin().or_panic()?; } }, } - // TODO: if we have a wel seq in memory, we should avoid scanning for a particular slot/hash - stage.catchup().await?; + // TODO: if we have a wal seq in memory, we should avoid scanning for a + // particular slot/hash + self.catchup(stage).await?; // TODO: don't do this while doing full sync stage.rolldb.prune_wal().or_panic()?;
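
Note on the overall refactor: the roll worker now translates WAL entries (including the new Origin seed) into RollEvents and tracks its position by WAL sequence number, while the apply stage reacts to RollEvent::Origin by inserting the Byron genesis UTxOs, instead of the daemon doing that eagerly at startup. A minimal sketch of that division of labor, using simplified stand-in types rather than the crate's real ones:

    // Stand-in for model::RollEvent; payloads simplified to a slot and raw bytes.
    enum FakeRollEvent {
        Origin,
        Apply(u64, Vec<u8>),
        Undo(u64, Vec<u8>),
    }

    // Stand-in for the apply stage: Origin bootstraps genesis state, Apply/Undo
    // mutate it afterwards, mirroring apply_origin / apply_block / undo_block.
    struct FakeApplyStage {
        genesis_loaded: bool,
        applied: Vec<u64>,
    }

    impl FakeApplyStage {
        fn execute(&mut self, evt: FakeRollEvent) {
            match evt {
                // equivalent of apply_origin(): insert_genesis_utxos(&self.genesis)
                FakeRollEvent::Origin => self.genesis_loaded = true,
                FakeRollEvent::Apply(slot, _cbor) => {
                    assert!(self.genesis_loaded, "origin must arrive before the first block");
                    self.applied.push(slot);
                }
                FakeRollEvent::Undo(slot, _cbor) => {
                    assert_eq!(self.applied.pop(), Some(slot));
                }
            }
        }
    }

    fn main() {
        let mut stage = FakeApplyStage { genesis_loaded: false, applied: Vec::new() };
        let events = [
            FakeRollEvent::Origin,
            FakeRollEvent::Apply(10, vec![]),
            FakeRollEvent::Apply(20, vec![]),
            FakeRollEvent::Undo(20, vec![]),
            FakeRollEvent::Apply(21, vec![]),
        ];
        for evt in events {
            stage.execute(evt);
        }
        assert_eq!(stage.applied, vec![10, 21]);
    }

Seeding the WAL with an explicit Origin entry is what lets both the gRPC follow-tip stream and the apply stage treat "start of chain" as just another event instead of a special case checked at startup.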