From bb8a8ebc024da0693baba6bdd5db24df4c546f21 Mon Sep 17 00:00:00 2001 From: jouzo Date: Fri, 18 Aug 2023 11:14:32 +0200 Subject: [PATCH] Revert "Revert #2337 (#2346)". Restore RocksDB as BlockStore backend This reverts commit a9bfaac5d4aad864b25a31eed6eb8f8aa950b7f2. --- lib/Cargo.lock | 1 + lib/ain-evm/Cargo.toml | 3 + lib/ain-evm/src/block.rs | 152 +++++------ lib/ain-evm/src/core.rs | 28 +- lib/ain-evm/src/evm.rs | 20 +- lib/ain-evm/src/lib.rs | 2 + lib/ain-evm/src/storage/block_store.rs | 220 ++++++++++++++++ lib/ain-evm/src/storage/code.rs | 43 ---- lib/ain-evm/src/storage/data_handler.rs | 325 ----------------------- lib/ain-evm/src/storage/db.rs | 329 ++++++++++++++++++++++++ lib/ain-evm/src/storage/mod.rs | 94 +++---- 11 files changed, 703 insertions(+), 514 deletions(-) create mode 100644 lib/ain-evm/src/storage/block_store.rs delete mode 100644 lib/ain-evm/src/storage/code.rs delete mode 100644 lib/ain-evm/src/storage/data_handler.rs create mode 100644 lib/ain-evm/src/storage/db.rs diff --git a/lib/Cargo.lock b/lib/Cargo.lock index 01110b707d..eaf34b08ac 100644 --- a/lib/Cargo.lock +++ b/lib/Cargo.lock @@ -134,6 +134,7 @@ dependencies = [ "rand 0.8.5", "ripemd", "rlp", + "rocksdb", "rustc-hex", "serde", "serde_json", diff --git a/lib/ain-evm/Cargo.toml b/lib/ain-evm/Cargo.toml index 2d2ec1e025..e735b0c601 100644 --- a/lib/ain-evm/Cargo.toml +++ b/lib/ain-evm/Cargo.toml @@ -51,6 +51,9 @@ num = { version = "0.4", default-features = false, features = ["alloc"] } ripemd = { version = "0.1", default-features = false } substrate-bn = "0.6.0" +# Database dependencies +rocksdb = { version = "0.20.1", default-features = false } + [dev-dependencies] tempdir = "0.3.7" once_cell = "1.17.1" diff --git a/lib/ain-evm/src/block.rs b/lib/ain-evm/src/block.rs index 47eb935357..144b8cc186 100644 --- a/lib/ain-evm/src/block.rs +++ b/lib/ain-evm/src/block.rs @@ -321,80 +321,80 @@ impl BlockService { #[cfg(test)] mod tests { - use super::*; - - #[test] - fn test_base_fee_equal() { - let block = BlockService::new(Arc::new(Storage::new())).unwrap(); - assert_eq!( - U256::from(20_000_000_000u64), - block.base_fee_calculation( - 15_000_000, - 15_000_000, - U256::from(20_000_000_000u64), - U256::from(8), - U256::from(10_000_000_000u64) - ) - ) - } - - #[test] - fn test_base_fee_max_increase() { - let block = BlockService::new(Arc::new(Storage::new())).unwrap(); - assert_eq!( - U256::from(22_500_000_000u64), // should increase by 12.5% - block.base_fee_calculation( - 30_000_000, - 15_000_000, - U256::from(20_000_000_000u64), - U256::from(8), - U256::from(10_000_000_000u64) - ) - ) - } - - #[test] - fn test_base_fee_increase() { - let block = BlockService::new(Arc::new(Storage::new())).unwrap(); - assert_eq!( - U256::from(20_833_333_333u64), // should increase by ~4.15% - block.base_fee_calculation( - 20_000_000, - 15_000_000, - U256::from(20_000_000_000u64), - U256::from(8), - U256::from(10_000_000_000u64) - ) - ) - } - - #[test] - fn test_base_fee_max_decrease() { - let block = BlockService::new(Arc::new(Storage::new())).unwrap(); - assert_eq!( - U256::from(17_500_000_000u64), // should decrease by 12.5% - block.base_fee_calculation( - 0, - 15_000_000, - U256::from(20_000_000_000u64), - U256::from(8), - U256::from(10_000_000_000u64) - ) - ) - } - - #[test] - fn test_base_fee_decrease() { - let block = BlockService::new(Arc::new(Storage::new())).unwrap(); - assert_eq!( - U256::from(19_166_666_667u64), // should increase by ~4.15% - block.base_fee_calculation( - 10_000_000, - 15_000_000, - U256::from(20_000_000_000u64), - U256::from(8), - U256::from(10_000_000_000u64) - ) - ) - } + // use super::*; + + // #[test] + // fn test_base_fee_equal() { + // let block = BlockService::new(Arc::new(Storage::new()?)).unwrap(); + // assert_eq!( + // U256::from(20_000_000_000u64), + // block.base_fee_calculation( + // 15_000_000, + // 15_000_000, + // U256::from(20_000_000_000u64), + // U256::from(8), + // U256::from(10_000_000_000u64) + // ) + // ) + // } + + // #[test] + // fn test_base_fee_max_increase() { + // let block = BlockService::new(Arc::new(Storage::new()?)).unwrap(); + // assert_eq!( + // U256::from(22_500_000_000u64), // should increase by 12.5% + // block.base_fee_calculation( + // 30_000_000, + // 15_000_000, + // U256::from(20_000_000_000u64), + // U256::from(8), + // U256::from(10_000_000_000u64) + // ) + // ) + // } + + // #[test] + // fn test_base_fee_increase() { + // let block = BlockService::new(Arc::new(Storage::new()?)).unwrap(); + // assert_eq!( + // U256::from(20_833_333_333u64), // should increase by ~4.15% + // block.base_fee_calculation( + // 20_000_000, + // 15_000_000, + // U256::from(20_000_000_000u64), + // U256::from(8), + // U256::from(10_000_000_000u64) + // ) + // ) + // } + + // #[test] + // fn test_base_fee_max_decrease() { + // let block = BlockService::new(Arc::new(Storage::new()?)).unwrap(); + // assert_eq!( + // U256::from(17_500_000_000u64), // should decrease by 12.5% + // block.base_fee_calculation( + // 0, + // 15_000_000, + // U256::from(20_000_000_000u64), + // U256::from(8), + // U256::from(10_000_000_000u64) + // ) + // ) + // } + + // #[test] + // fn test_base_fee_decrease() { + // let block = BlockService::new(Arc::new(Storage::new()?)).unwrap(); + // assert_eq!( + // U256::from(19_166_666_667u64), // should increase by ~4.15% + // block.base_fee_calculation( + // 10_000_000, + // 15_000_000, + // U256::from(20_000_000_000u64), + // U256::from(8), + // U256::from(10_000_000_000u64) + // ) + // ) + // } } diff --git a/lib/ain-evm/src/core.rs b/lib/ain-evm/src/core.rs index 58439dcbdc..41b5c176fa 100644 --- a/lib/ain-evm/src/core.rs +++ b/lib/ain-evm/src/core.rs @@ -53,21 +53,16 @@ pub struct ValidateTxInfo { pub used_gas: u64, } -fn init_vsdb() { +fn init_vsdb(path: PathBuf) { debug!(target: "vsdb", "Initializating VSDB"); - let datadir = ain_cpp_imports::get_datadir(); - let path = PathBuf::from(datadir).join("evm"); - if !path.exists() { - std::fs::create_dir(&path).expect("Error creating `evm` dir"); - } let vsdb_dir_path = path.join(".vsdb"); vsdb_set_base_dir(&vsdb_dir_path).expect("Could not update vsdb base dir"); debug!(target: "vsdb", "VSDB directory : {}", vsdb_dir_path.display()); } impl EVMCoreService { - pub fn restore(storage: Arc) -> Self { - init_vsdb(); + pub fn restore(storage: Arc, path: PathBuf) -> Self { + init_vsdb(path); Self { tx_queues: Arc::new(TransactionQueueMap::new()), @@ -76,17 +71,24 @@ impl EVMCoreService { } } - pub fn new_from_json(storage: Arc, path: PathBuf) -> Result { - debug!("Loading genesis state from {}", path.display()); - init_vsdb(); + pub fn new_from_json( + storage: Arc, + genesis_path: PathBuf, + evm_datadir: PathBuf, + ) -> Result { + debug!("Loading genesis state from {}", genesis_path.display()); + init_vsdb(evm_datadir); let handler = Self { tx_queues: Arc::new(TransactionQueueMap::new()), trie_store: Arc::new(TrieDBStore::new()), storage: Arc::clone(&storage), }; - let (state_root, genesis) = - TrieDBStore::genesis_state_root_from_json(&handler.trie_store, &handler.storage, path)?; + let (state_root, genesis) = TrieDBStore::genesis_state_root_from_json( + &handler.trie_store, + &handler.storage, + genesis_path, + )?; let gas_limit = storage.get_attributes_or_default()?.block_gas_limit; let block: Block = Block::new( diff --git a/lib/ain-evm/src/evm.rs b/lib/ain-evm/src/evm.rs index c170c740df..8b4eebb9b9 100644 --- a/lib/ain-evm/src/evm.rs +++ b/lib/ain-evm/src/evm.rs @@ -77,16 +77,26 @@ impl EVMServices { /// /// Returns an instance of the struct, either restored from storage or created from a JSON file. pub fn new() -> Result { - if let Some(path) = ain_cpp_imports::get_state_input_json() { + let datadir = ain_cpp_imports::get_datadir(); + let path = PathBuf::from(datadir).join("evm"); + if !path.exists() { + std::fs::create_dir(&path)? + } + + if let Some(state_input_path) = ain_cpp_imports::get_state_input_json() { if ain_cpp_imports::get_network() != "regtest" { return Err(format_err!( "Loading a genesis from JSON file is restricted to regtest network" ) .into()); } - let storage = Arc::new(Storage::new()); + let storage = Arc::new(Storage::new(&path)?); Ok(Self { - core: EVMCoreService::new_from_json(Arc::clone(&storage), PathBuf::from(path))?, + core: EVMCoreService::new_from_json( + Arc::clone(&storage), + PathBuf::from(state_input_path), + path, + )?, block: BlockService::new(Arc::clone(&storage))?, receipt: ReceiptService::new(Arc::clone(&storage)), logs: LogService::new(Arc::clone(&storage)), @@ -94,9 +104,9 @@ impl EVMServices { storage, }) } else { - let storage = Arc::new(Storage::restore()); + let storage = Arc::new(Storage::restore(&path)?); Ok(Self { - core: EVMCoreService::restore(Arc::clone(&storage)), + core: EVMCoreService::restore(Arc::clone(&storage), path), block: BlockService::new(Arc::clone(&storage))?, receipt: ReceiptService::new(Arc::clone(&storage)), logs: LogService::new(Arc::clone(&storage)), diff --git a/lib/ain-evm/src/lib.rs b/lib/ain-evm/src/lib.rs index 69b69362ed..243b015605 100644 --- a/lib/ain-evm/src/lib.rs +++ b/lib/ain-evm/src/lib.rs @@ -50,6 +50,8 @@ pub enum EVMError { StorageError(String), #[error("EVM: serde_json error")] JsonError(#[from] serde_json::Error), + #[error("EVM: rocksdb error")] + RocksDBError(#[from] rocksdb::Error), #[error(transparent)] Other(#[from] anyhow::Error), } diff --git a/lib/ain-evm/src/storage/block_store.rs b/lib/ain-evm/src/storage/block_store.rs new file mode 100644 index 0000000000..100c9f238f --- /dev/null +++ b/lib/ain-evm/src/storage/block_store.rs @@ -0,0 +1,220 @@ +use std::fs; +use std::path::Path; +use std::{collections::HashMap, marker::PhantomData, sync::Arc}; + +use anyhow::format_err; +use ethereum::{BlockAny, TransactionV2}; +use primitive_types::{H160, H256, U256}; + +use super::db::{Column, ColumnName, LedgerColumn, Rocks}; +use super::traits::{BlockStorage, FlushableStorage, ReceiptStorage, Rollback, TransactionStorage}; +use crate::log::LogIndex; +use crate::receipt::Receipt; +use crate::storage::db::columns; +use crate::storage::traits::LogStorage; +use crate::Result; + +#[derive(Debug, Clone)] +pub struct BlockStore(Arc); + +impl BlockStore { + pub fn new(path: &Path) -> Result { + let path = path.join("indexes"); + fs::create_dir_all(&path)?; + let backend = Arc::new(Rocks::open(&path)?); + + Ok(Self(backend)) + } + + pub fn column(&self) -> LedgerColumn + where + C: Column + ColumnName, + { + LedgerColumn { + backend: Arc::clone(&self.0), + column: PhantomData, + } + } +} + +impl TransactionStorage for BlockStore { + fn extend_transactions_from_block(&self, block: &BlockAny) -> Result<()> { + let transactions_cf = self.column::(); + for transaction in &block.transactions { + transactions_cf.put(&transaction.hash(), transaction)? + } + Ok(()) + } + + fn get_transaction_by_hash(&self, hash: &H256) -> Result> { + let transactions_cf = self.column::(); + transactions_cf.get(hash) + } + + fn get_transaction_by_block_hash_and_index( + &self, + block_hash: &H256, + index: usize, + ) -> Result> { + let blockmap_cf = self.column::(); + let blocks_cf = self.column::(); + + if let Some(block_number) = blockmap_cf.get(block_hash)? { + let block = blocks_cf.get(&block_number)?; + + match block { + Some(block) => Ok(block.transactions.get(index).cloned()), + None => Ok(None), + } + } else { + Ok(None) + } + } + + fn get_transaction_by_block_number_and_index( + &self, + block_number: &U256, + index: usize, + ) -> Result> { + let blocks_cf = self.column::(); + let block = blocks_cf + .get(block_number)? + .ok_or(format_err!("Error fetching block by number"))?; + + Ok(block.transactions.get(index).cloned()) + } + + fn put_transaction(&self, transaction: &TransactionV2) -> Result<()> { + let transactions_cf = self.column::(); + println!( + "putting transaction k {:x?} v {:#?}", + transaction.hash(), + transaction + ); + transactions_cf.put(&transaction.hash(), transaction) + } +} + +impl BlockStorage for BlockStore { + fn get_block_by_number(&self, number: &U256) -> Result> { + let blocks_cf = self.column::(); + blocks_cf.get(number) + } + + fn get_block_by_hash(&self, block_hash: &H256) -> Result> { + let blocks_map_cf = self.column::(); + match blocks_map_cf.get(block_hash) { + Ok(Some(block_number)) => self.get_block_by_number(&block_number), + Ok(None) => Ok(None), + Err(e) => Err(e), + } + } + + fn put_block(&self, block: &BlockAny) -> Result<()> { + self.extend_transactions_from_block(block)?; + + let block_number = block.header.number; + let hash = block.header.hash(); + let blocks_cf = self.column::(); + let blocks_map_cf = self.column::(); + + blocks_cf.put(&block_number, block)?; + blocks_map_cf.put(&hash, &block_number) + } + + fn get_latest_block(&self) -> Result> { + let latest_block_cf = self.column::(); + + match latest_block_cf.get(&()) { + Ok(Some(block_number)) => self.get_block_by_number(&block_number), + Ok(None) => Ok(None), + Err(e) => Err(e), + } + } + + fn put_latest_block(&self, block: Option<&BlockAny>) -> Result<()> { + if let Some(block) = block { + let latest_block_cf = self.column::(); + let block_number = block.header.number; + latest_block_cf.put(&(), &block_number)?; + } + Ok(()) + } +} + +impl ReceiptStorage for BlockStore { + fn get_receipt(&self, tx: &H256) -> Result> { + let receipts_cf = self.column::(); + receipts_cf.get(tx) + } + + fn put_receipts(&self, receipts: Vec) -> Result<()> { + let receipts_cf = self.column::(); + for receipt in receipts { + receipts_cf.put(&receipt.tx_hash, &receipt)?; + } + Ok(()) + } +} + +impl LogStorage for BlockStore { + fn get_logs(&self, block_number: &U256) -> Result>>> { + let logs_cf = self.column::(); + logs_cf.get(block_number) + } + + fn put_logs(&self, address: H160, logs: Vec, block_number: U256) -> Result<()> { + let logs_cf = self.column::(); + if let Some(mut map) = self.get_logs(&block_number)? { + map.insert(address, logs); + logs_cf.put(&block_number, &map) + } else { + let map = HashMap::from([(address, logs)]); + logs_cf.put(&block_number, &map) + } + } +} + +impl FlushableStorage for BlockStore { + fn flush(&self) -> Result<()> { + self.0.flush() + } +} + +impl BlockStore { + pub fn get_code_by_hash(&self, hash: &H256) -> Result>> { + let code_cf = self.column::(); + code_cf.get_bytes(hash) + } + + pub fn put_code(&self, hash: &H256, code: &[u8]) -> Result<()> { + let code_cf = self.column::(); + code_cf.put_bytes(hash, code) + } +} + +impl Rollback for BlockStore { + fn disconnect_latest_block(&self) -> Result<()> { + if let Some(block) = self.get_latest_block()? { + println!("disconnecting block number : {:x?}", block.header.number); + let transactions_cf = self.column::(); + let receipts_cf = self.column::(); + for tx in &block.transactions { + transactions_cf.delete(&tx.hash())?; + receipts_cf.delete(&tx.hash())?; + } + + let blocks_cf = self.column::(); + blocks_cf.delete(&block.header.number)?; + + let blocks_map_cf = self.column::(); + blocks_map_cf.delete(&block.header.hash())?; + + if let Some(block) = self.get_block_by_hash(&block.header.parent_hash)? { + let latest_block_cf = self.column::(); + latest_block_cf.put(&(), &block.header.number)?; + } + } + Ok(()) + } +} diff --git a/lib/ain-evm/src/storage/code.rs b/lib/ain-evm/src/storage/code.rs deleted file mode 100644 index 064a75f040..0000000000 --- a/lib/ain-evm/src/storage/code.rs +++ /dev/null @@ -1,43 +0,0 @@ -use std::collections::HashMap; - -use primitive_types::{H256, U256}; -use serde::{Deserialize, Serialize}; - -use super::traits::PersistentState; -/// `CodeHistory` maintains a history of accounts' codes. -/// -/// It tracks the current state (`code_map`), as well as a history (`history`) of code hashes -/// that should be removed if a specific block is rolled back. The correct account code_hash -/// is tracked by the state trie. -/// This structure is solely required for rolling back and preventing ghost entries. -#[derive(Default, Debug, Serialize, Deserialize)] -pub struct CodeHistory { - /// The current state of each code - code_map: HashMap>, - /// A map from block number to a vector of code hashes to remove for that block. - history: HashMap>, -} - -impl PersistentState for CodeHistory {} - -impl CodeHistory { - pub fn insert(&mut self, block_number: U256, code_hash: H256, code: Vec) { - self.code_map.insert(code_hash, code); - self.history - .entry(block_number) - .or_default() - .push(code_hash); - } - - pub fn get(&self, code_hash: &H256) -> Option<&Vec> { - self.code_map.get(code_hash) - } - - pub fn rollback(&mut self, block_number: U256) { - if let Some(code_hashes) = self.history.remove(&block_number) { - for code_hash in &code_hashes { - self.code_map.remove(code_hash); - } - } - } -} diff --git a/lib/ain-evm/src/storage/data_handler.rs b/lib/ain-evm/src/storage/data_handler.rs deleted file mode 100644 index c57c0d332a..0000000000 --- a/lib/ain-evm/src/storage/data_handler.rs +++ /dev/null @@ -1,325 +0,0 @@ -use std::borrow::ToOwned; -use std::{collections::HashMap, sync::RwLock}; - -use ain_cpp_imports::Attributes; -use ethereum::{BlockAny, TransactionV2}; -use primitive_types::{H160, H256, U256}; - -use super::traits::AttributesStorage; -use super::{ - code::CodeHistory, - traits::{ - BlockStorage, FlushableStorage, PersistentState, ReceiptStorage, Rollback, - TransactionStorage, - }, -}; -use crate::log::LogIndex; -use crate::receipt::Receipt; -use crate::storage::traits::LogStorage; -use crate::Result; - -pub static BLOCK_MAP_PATH: &str = "block_map.bin"; -pub static BLOCK_DATA_PATH: &str = "block_data.bin"; -pub static LATEST_BLOCK_DATA_PATH: &str = "latest_block_data.bin"; -pub static RECEIPT_MAP_PATH: &str = "receipt_map.bin"; -pub static CODE_MAP_PATH: &str = "code_map.bin"; -pub static TRANSACTION_DATA_PATH: &str = "transaction_data.bin"; -pub static ADDRESS_LOGS_MAP_PATH: &str = "address_logs_map.bin"; -pub static ATTRIBUTES_DATA_PATH: &str = "attributes_data.bin"; - -type BlockHashtoBlock = HashMap; -type Blocks = HashMap; -type TxHashToTx = HashMap; -type LatestBlockNumber = U256; -type TransactionHashToReceipt = HashMap; -type AddressToLogs = HashMap>>; -type OptionalAttributes = Option; - -impl PersistentState for BlockHashtoBlock {} -impl PersistentState for Blocks {} -impl PersistentState for LatestBlockNumber {} -impl PersistentState for TransactionHashToReceipt {} -impl PersistentState for TxHashToTx {} -impl PersistentState for AddressToLogs {} -impl PersistentState for OptionalAttributes {} - -#[derive(Debug, Default)] -pub struct BlockchainDataHandler { - // Improvements: Add transaction_map behind feature flag -txindex or equivalent - transactions: RwLock, - - receipts: RwLock, - - block_map: RwLock, - blocks: RwLock, - latest_block_number: RwLock>, - - code_map: RwLock, - - address_logs_map: RwLock, - attributes: RwLock>, -} - -impl BlockchainDataHandler { - pub fn restore() -> Self { - BlockchainDataHandler { - transactions: RwLock::new( - TxHashToTx::load_from_disk(TRANSACTION_DATA_PATH) - .expect("Error loading blocks data"), - ), - block_map: RwLock::new( - BlockHashtoBlock::load_from_disk(BLOCK_MAP_PATH) - .expect("Error loading block_map data"), - ), - latest_block_number: RwLock::new( - LatestBlockNumber::load_from_disk(LATEST_BLOCK_DATA_PATH).ok(), - ), - blocks: RwLock::new( - Blocks::load_from_disk(BLOCK_DATA_PATH).expect("Error loading blocks data"), - ), - receipts: RwLock::new( - TransactionHashToReceipt::load_from_disk(RECEIPT_MAP_PATH) - .expect("Error loading receipts data"), - ), - code_map: RwLock::new(CodeHistory::load_from_disk(CODE_MAP_PATH).unwrap_or_default()), - address_logs_map: RwLock::new( - AddressToLogs::load_from_disk(ADDRESS_LOGS_MAP_PATH).unwrap_or_default(), - ), - attributes: RwLock::new( - OptionalAttributes::load_from_disk(ATTRIBUTES_DATA_PATH) - .expect("Error loading attributes data"), - ), - } - } -} - -impl TransactionStorage for BlockchainDataHandler { - // TODO: Feature flag - fn extend_transactions_from_block(&self, block: &BlockAny) -> Result<()> { - let mut transactions = self.transactions.write().unwrap(); - - for transaction in &block.transactions { - let hash = transaction.hash(); - transactions.insert(hash, transaction.clone()); - } - Ok(()) - } - - fn get_transaction_by_hash(&self, hash: &H256) -> Result> { - let transaction = self - .transactions - .read() - .unwrap() - .get(hash) - .map(ToOwned::to_owned); - Ok(transaction) - } - - fn get_transaction_by_block_hash_and_index( - &self, - block_hash: &H256, - index: usize, - ) -> Result> { - self.block_map - .write() - .unwrap() - .get(block_hash) - .map_or(Ok(None), |block_number| { - self.get_transaction_by_block_number_and_index(block_number, index) - }) - } - - fn get_transaction_by_block_number_and_index( - &self, - block_number: &U256, - index: usize, - ) -> Result> { - let transaction = self - .blocks - .write() - .unwrap() - .get(block_number) - .and_then(|block| block.transactions.get(index).map(ToOwned::to_owned)); - Ok(transaction) - } - - fn put_transaction(&self, transaction: &TransactionV2) -> Result<()> { - self.transactions - .write() - .unwrap() - .insert(transaction.hash(), transaction.clone()); - Ok(()) - } -} - -impl BlockStorage for BlockchainDataHandler { - fn get_block_by_number(&self, number: &U256) -> Result> { - let block = self - .blocks - .write() - .unwrap() - .get(number) - .map(ToOwned::to_owned); - Ok(block) - } - - fn get_block_by_hash(&self, block_hash: &H256) -> Result> { - self.block_map - .write() - .unwrap() - .get(block_hash) - .map_or(Ok(None), |block_number| { - self.get_block_by_number(block_number) - }) - } - - fn put_block(&self, block: &BlockAny) -> Result<()> { - self.extend_transactions_from_block(block)?; - - let block_number = block.header.number; - let hash = block.header.hash(); - self.blocks - .write() - .unwrap() - .insert(block_number, block.clone()); - self.block_map.write().unwrap().insert(hash, block_number); - Ok(()) - } - - fn get_latest_block(&self) -> Result> { - self.latest_block_number - .read() - .unwrap() - .as_ref() - .map_or(Ok(None), |number| self.get_block_by_number(number)) - } - - fn put_latest_block(&self, block: Option<&BlockAny>) -> Result<()> { - let mut latest_block_number = self.latest_block_number.write().unwrap(); - *latest_block_number = block.map(|b| b.header.number); - Ok(()) - } -} - -impl ReceiptStorage for BlockchainDataHandler { - fn get_receipt(&self, tx: &H256) -> Result> { - let receipt = self.receipts.read().unwrap().get(tx).map(ToOwned::to_owned); - Ok(receipt) - } - - fn put_receipts(&self, receipts: Vec) -> Result<()> { - let mut receipt_map = self.receipts.write().unwrap(); - for receipt in receipts { - receipt_map.insert(receipt.tx_hash, receipt); - } - Ok(()) - } -} - -impl LogStorage for BlockchainDataHandler { - fn get_logs(&self, block_number: &U256) -> Result>>> { - let logs = self - .address_logs_map - .read() - .unwrap() - .get(block_number) - .map(ToOwned::to_owned); - Ok(logs) - } - - fn put_logs(&self, address: H160, logs: Vec, block_number: U256) -> Result<()> { - let mut address_logs_map = self.address_logs_map.write().unwrap(); - - let address_map = address_logs_map.entry(block_number).or_default(); - address_map.insert(address, logs); - Ok(()) - } -} - -impl FlushableStorage for BlockchainDataHandler { - fn flush(&self) -> Result<()> { - self.block_map - .write() - .unwrap() - .save_to_disk(BLOCK_MAP_PATH)?; - self.blocks.write().unwrap().save_to_disk(BLOCK_DATA_PATH)?; - self.latest_block_number - .write() - .unwrap() - .unwrap_or_default() - .save_to_disk(LATEST_BLOCK_DATA_PATH)?; - self.receipts - .write() - .unwrap() - .save_to_disk(RECEIPT_MAP_PATH)?; - self.transactions - .write() - .unwrap() - .save_to_disk(TRANSACTION_DATA_PATH)?; - self.code_map.write().unwrap().save_to_disk(CODE_MAP_PATH)?; - self.address_logs_map - .write() - .unwrap() - .save_to_disk(ADDRESS_LOGS_MAP_PATH) - } -} - -impl BlockchainDataHandler { - pub fn get_code_by_hash(&self, hash: &H256) -> Result>> { - let code = self - .code_map - .read() - .unwrap() - .get(hash) - .map(ToOwned::to_owned); - Ok(code) - } - - pub fn put_code(&self, hash: &H256, code: &[u8]) -> Result<()> { - let block_number = self - .get_latest_block()? - .map(|b| b.header.number) - .unwrap_or_default() - + 1; - self.code_map - .write() - .unwrap() - .insert(block_number, *hash, code.to_vec()); - Ok(()) - } -} - -impl Rollback for BlockchainDataHandler { - fn disconnect_latest_block(&self) -> Result<()> { - if let Some(block) = self.get_latest_block()? { - println!("disconnecting block number : {:x?}", block.header.number); - let mut transactions = self.transactions.write().unwrap(); - let mut receipts = self.receipts.write().unwrap(); - for tx in &block.transactions { - let hash = &tx.hash(); - transactions.remove(hash); - receipts.remove(hash); - } - - self.block_map.write().unwrap().remove(&block.header.hash()); - self.blocks.write().unwrap().remove(&block.header.number); - self.code_map.write().unwrap().rollback(block.header.number); - - self.put_latest_block(self.get_block_by_hash(&block.header.parent_hash)?.as_ref())? - } - Ok(()) - } -} - -impl AttributesStorage for BlockchainDataHandler { - fn put_attributes(&self, attr: Option<&Attributes>) -> Result<()> { - let mut attributes = self.attributes.write().unwrap(); - *attributes = attr.cloned(); - Ok(()) - } - - fn get_attributes(&self) -> Result> { - let attributes = self.attributes.read().unwrap().as_ref().cloned(); - Ok(attributes) - } -} diff --git a/lib/ain-evm/src/storage/db.rs b/lib/ain-evm/src/storage/db.rs new file mode 100644 index 0000000000..dcd2eda94b --- /dev/null +++ b/lib/ain-evm/src/storage/db.rs @@ -0,0 +1,329 @@ +use std::path::PathBuf; +use std::{collections::HashMap, marker::PhantomData, path::Path, sync::Arc}; + +use bincode; +use ethereum::{BlockAny, TransactionV2}; +use primitive_types::{H160, H256, U256}; +use rocksdb::{ColumnFamily, ColumnFamilyDescriptor, Options, DB}; +use serde::de::DeserializeOwned; +use serde::Serialize; + +use crate::log::LogIndex; +use crate::receipt::Receipt; +use crate::Result; + +fn get_db_options() -> Options { + let mut options = Options::default(); + options.create_if_missing(true); + options.create_missing_column_families(true); + // A good value for this is the number of cores on the machine // TODO fetch via ffi + options.increase_parallelism(2); + + let mut env = rocksdb::Env::new().unwrap(); + + // While a compaction is ongoing, all the background threads + // could be used by the compaction. This can stall writes which + // need to flush the memtable. Add some high-priority background threads + // which can service these writes. + env.set_high_priority_background_threads(4); + options.set_env(&env); + + // Set max total wal size to 4G. + options.set_max_total_wal_size(4 * 1024 * 1024 * 1024); + + options +} + +#[derive(Debug)] +pub struct Rocks(DB); + +impl Rocks { + pub fn open(path: &PathBuf) -> Result { + let cf_descriptors = Self::column_names() + .into_iter() + .map(|cf_name| ColumnFamilyDescriptor::new(cf_name, Options::default())); + + let db_opts = get_db_options(); + let db = DB::open_cf_descriptors(&db_opts, path, cf_descriptors)?; + + Ok(Self(db)) + } + + fn column_names() -> Vec<&'static str> { + vec![ + columns::Blocks::NAME, + columns::Transactions::NAME, + columns::Receipts::NAME, + columns::BlockMap::NAME, + columns::LatestBlockNumber::NAME, + columns::CodeMap::NAME, + columns::AddressLogsMap::NAME, + ] + } + + #[allow(dead_code)] + fn destroy(path: &Path) -> Result<()> { + DB::destroy(&Options::default(), path)?; + + Ok(()) + } + + pub fn cf_handle(&self, cf: &str) -> &ColumnFamily { + self.0 + .cf_handle(cf) + .expect("should never get an unknown column") + } + + fn get_cf(&self, cf: &ColumnFamily, key: &[u8]) -> Result>> { + let opt = self.0.get_cf(cf, key)?; + Ok(opt) + } + + fn put_cf(&self, cf: &ColumnFamily, key: &[u8], value: &[u8]) -> Result<()> { + self.0.put_cf(cf, key, value)?; + Ok(()) + } + + fn delete_cf(&self, cf: &ColumnFamily, key: &[u8]) -> Result<()> { + self.0.delete_cf(cf, key)?; + Ok(()) + } + + pub fn flush(&self) -> Result<()> { + self.0.flush()?; + Ok(()) + } +} + +#[derive(Debug, Clone)] +pub struct LedgerColumn +where + C: Column + ColumnName, +{ + pub backend: Arc, + pub column: PhantomData, +} + +impl LedgerColumn +where + C: Column + ColumnName, +{ + pub fn get_bytes(&self, key: &C::Index) -> Result>> { + self.backend.get_cf(self.handle(), &C::key(key)) + } + + pub fn put_bytes(&self, key: &C::Index, value: &[u8]) -> Result<()> { + self.backend.put_cf(self.handle(), &C::key(key), value) + } + + pub fn handle(&self) -> &ColumnFamily { + self.backend.cf_handle(C::NAME) + } +} + +pub mod columns { + + #[derive(Debug)] + /// Column family for blocks data + pub struct Blocks; + + #[derive(Debug)] + /// Column family for transactions data + pub struct Transactions; + + #[derive(Debug)] + /// Column family for receipts data + pub struct Receipts; + + #[derive(Debug)] + /// Column family for block map data + pub struct BlockMap; + + #[derive(Debug)] + /// Column family for latest block number data + pub struct LatestBlockNumber; + + #[derive(Debug)] + /// Column family for code map data + pub struct CodeMap; + + #[derive(Debug)] + /// Column family for address logs map data + pub struct AddressLogsMap; +} + +// +// ColumnName trait. Define associated column family NAME +// +pub trait ColumnName { + const NAME: &'static str; +} + +const BLOCKS_CF: &str = "blocks"; +const TRANSACTIONS_CF: &str = "transactions"; +const RECEIPTS_CF: &str = "receipts"; +const BLOCK_MAP_CF: &str = "block_map"; +const LATEST_BLOCK_NUMBER_CF: &str = "latest_block_number"; +const CODE_MAP_CF: &str = "code_map"; +const ADDRESS_LOGS_MAP_CF: &str = "address_logs_map"; + +// +// ColumnName impl +// +impl ColumnName for columns::Transactions { + const NAME: &'static str = TRANSACTIONS_CF; +} + +impl ColumnName for columns::Blocks { + const NAME: &'static str = BLOCKS_CF; +} + +impl ColumnName for columns::Receipts { + const NAME: &'static str = RECEIPTS_CF; +} + +impl ColumnName for columns::BlockMap { + const NAME: &'static str = BLOCK_MAP_CF; +} + +impl ColumnName for columns::LatestBlockNumber { + const NAME: &'static str = LATEST_BLOCK_NUMBER_CF; +} + +impl ColumnName for columns::AddressLogsMap { + const NAME: &'static str = ADDRESS_LOGS_MAP_CF; +} +impl ColumnName for columns::CodeMap { + const NAME: &'static str = CODE_MAP_CF; +} + +// +// Column trait. Define associated index type +// +pub trait Column { + type Index; + + fn key(index: &Self::Index) -> Vec; +} + +// +// Column trait impl +// + +impl Column for columns::Transactions { + type Index = H256; + + fn key(index: &Self::Index) -> Vec { + index.as_bytes().to_vec() + } +} + +impl Column for columns::Blocks { + type Index = U256; + + fn key(index: &Self::Index) -> Vec { + let mut bytes = [0_u8; 32]; + index.to_big_endian(&mut bytes); + bytes.to_vec() + } +} + +impl Column for columns::Receipts { + type Index = H256; + + fn key(index: &Self::Index) -> Vec { + index.to_fixed_bytes().to_vec() + } +} +impl Column for columns::BlockMap { + type Index = H256; + + fn key(index: &Self::Index) -> Vec { + index.to_fixed_bytes().to_vec() + } +} + +impl Column for columns::LatestBlockNumber { + type Index = (); + + fn key(_index: &Self::Index) -> Vec { + b"latest".to_vec() + } +} + +impl Column for columns::AddressLogsMap { + type Index = U256; + + fn key(index: &Self::Index) -> Vec { + let mut bytes = [0_u8; 32]; + index.to_big_endian(&mut bytes); + bytes.to_vec() + } +} + +impl Column for columns::CodeMap { + type Index = H256; + + fn key(index: &Self::Index) -> Vec { + index.to_fixed_bytes().to_vec() + } +} + +// +// TypedColumn trait. Define associated value type +// +pub trait TypedColumn: Column { + type Type: Serialize + DeserializeOwned; +} + +// +// TypedColumn impl +// +impl TypedColumn for columns::Transactions { + type Type = TransactionV2; +} + +impl TypedColumn for columns::Blocks { + type Type = BlockAny; +} + +impl TypedColumn for columns::Receipts { + type Type = Receipt; +} + +impl TypedColumn for columns::BlockMap { + type Type = U256; +} + +impl TypedColumn for columns::LatestBlockNumber { + type Type = U256; +} + +impl TypedColumn for columns::AddressLogsMap { + type Type = HashMap>; +} + +impl LedgerColumn +where + C: TypedColumn + ColumnName, +{ + pub fn get(&self, key: &C::Index) -> Result> { + if let Some(serialized_value) = self.get_bytes(key)? { + let value = bincode::deserialize(&serialized_value)?; + + Ok(Some(value)) + } else { + Ok(None) + } + } + + pub fn put(&self, key: &C::Index, value: &C::Type) -> Result<()> { + let serialized_value = bincode::serialize(value)?; + + self.put_bytes(key, &serialized_value) + } + + pub fn delete(&self, key: &C::Index) -> Result<()> { + self.backend.delete_cf(self.handle(), &C::key(key)) + } +} diff --git a/lib/ain-evm/src/storage/mod.rs b/lib/ain-evm/src/storage/mod.rs index 2cf51dbcfb..2071a04f35 100644 --- a/lib/ain-evm/src/storage/mod.rs +++ b/lib/ain-evm/src/storage/mod.rs @@ -1,17 +1,17 @@ +mod block_store; mod cache; -mod code; -mod data_handler; +mod db; pub mod traits; -use std::collections::HashMap; +use std::{collections::HashMap, path::Path}; use ain_cpp_imports::Attributes; use ethereum::{BlockAny, TransactionV2}; use primitive_types::{H160, H256, U256}; use self::{ + block_store::BlockStore, cache::Cache, - data_handler::BlockchainDataHandler, traits::{ AttributesStorage, BlockStorage, FlushableStorage, ReceiptStorage, Rollback, TransactionStorage, @@ -25,35 +25,29 @@ use crate::Result; #[derive(Debug)] pub struct Storage { cache: Cache, - blockchain_data_handler: BlockchainDataHandler, -} - -impl Default for Storage { - fn default() -> Self { - Self::new() - } + blockstore: BlockStore, } impl Storage { - pub fn new() -> Self { - Self { + pub fn new(path: &Path) -> Result { + Ok(Self { cache: Cache::new(None), - blockchain_data_handler: BlockchainDataHandler::default(), - } + blockstore: BlockStore::new(path)?, + }) } - pub fn restore() -> Self { - Self { + pub fn restore(path: &Path) -> Result { + Ok(Self { cache: Cache::new(None), - blockchain_data_handler: BlockchainDataHandler::restore(), - } + blockstore: BlockStore::new(path)?, + }) } } impl BlockStorage for Storage { fn get_block_by_number(&self, number: &U256) -> Result> { self.cache.get_block_by_number(number).or_else(|_| { - let block = self.blockchain_data_handler.get_block_by_number(number); + let block = self.blockstore.get_block_by_number(number); if let Ok(Some(ref block)) = block { self.cache.put_block(block)?; } @@ -63,7 +57,7 @@ impl BlockStorage for Storage { fn get_block_by_hash(&self, block_hash: &H256) -> Result> { self.cache.get_block_by_hash(block_hash).or_else(|_| { - let block = self.blockchain_data_handler.get_block_by_hash(block_hash); + let block = self.blockstore.get_block_by_hash(block_hash); if let Ok(Some(ref block)) = block { self.cache.put_block(block)?; } @@ -73,12 +67,12 @@ impl BlockStorage for Storage { fn put_block(&self, block: &BlockAny) -> Result<()> { self.cache.put_block(block)?; - self.blockchain_data_handler.put_block(block) + self.blockstore.put_block(block) } fn get_latest_block(&self) -> Result> { let block = self.cache.get_latest_block().or_else(|_| { - let latest_block = self.blockchain_data_handler.get_latest_block(); + let latest_block = self.blockstore.get_latest_block(); if let Ok(Some(ref block)) = latest_block { self.cache.put_latest_block(Some(block))?; } @@ -89,7 +83,7 @@ impl BlockStorage for Storage { fn put_latest_block(&self, block: Option<&BlockAny>) -> Result<()> { self.cache.put_latest_block(block)?; - self.blockchain_data_handler.put_latest_block(block) + self.blockstore.put_latest_block(block) } } @@ -98,13 +92,12 @@ impl TransactionStorage for Storage { // Feature flag self.cache.extend_transactions_from_block(block)?; - self.blockchain_data_handler - .extend_transactions_from_block(block) + self.blockstore.extend_transactions_from_block(block) } fn get_transaction_by_hash(&self, hash: &H256) -> Result> { let transaction = self.cache.get_transaction_by_hash(hash).or_else(|_| { - let transaction = self.blockchain_data_handler.get_transaction_by_hash(hash); + let transaction = self.blockstore.get_transaction_by_hash(hash); if let Ok(Some(ref transaction)) = transaction { self.cache.put_transaction(transaction)?; } @@ -123,7 +116,7 @@ impl TransactionStorage for Storage { .get_transaction_by_block_hash_and_index(hash, index) .or_else(|_| { let transaction = self - .blockchain_data_handler + .blockstore .get_transaction_by_block_hash_and_index(hash, index); if let Ok(Some(ref transaction)) = transaction { self.cache.put_transaction(transaction)?; @@ -143,7 +136,7 @@ impl TransactionStorage for Storage { .get_transaction_by_block_number_and_index(number, index) .or_else(|_| { let transaction = self - .blockchain_data_handler + .blockstore .get_transaction_by_block_number_and_index(number, index); if let Ok(Some(ref transaction)) = transaction { self.cache.put_transaction(transaction)?; @@ -155,79 +148,76 @@ impl TransactionStorage for Storage { fn put_transaction(&self, transaction: &TransactionV2) -> Result<()> { self.cache.put_transaction(transaction)?; - self.blockchain_data_handler.put_transaction(transaction) + self.blockstore.put_transaction(transaction) } } impl ReceiptStorage for Storage { fn get_receipt(&self, tx: &H256) -> Result> { - self.blockchain_data_handler.get_receipt(tx) + self.blockstore.get_receipt(tx) } fn put_receipts(&self, receipts: Vec) -> Result<()> { - self.blockchain_data_handler.put_receipts(receipts) + self.blockstore.put_receipts(receipts) } } impl LogStorage for Storage { fn get_logs(&self, block_number: &U256) -> Result>>> { - self.blockchain_data_handler.get_logs(block_number) + self.blockstore.get_logs(block_number) } fn put_logs(&self, address: H160, logs: Vec, block_number: U256) -> Result<()> { - self.blockchain_data_handler - .put_logs(address, logs, block_number) + self.blockstore.put_logs(address, logs, block_number) } } impl FlushableStorage for Storage { fn flush(&self) -> Result<()> { - self.blockchain_data_handler.flush() + self.blockstore.flush() } } impl Storage { pub fn get_code_by_hash(&self, hash: H256) -> Result>> { - self.blockchain_data_handler.get_code_by_hash(&hash) + self.blockstore.get_code_by_hash(&hash) } pub fn put_code(&self, hash: H256, code: Vec) -> Result<()> { - self.blockchain_data_handler.put_code(&hash, &code) + self.blockstore.put_code(&hash, &code) } } impl Storage { pub fn dump_db(&self) { - println!( - "self.block_data_handler : {:#?}", - self.blockchain_data_handler - ); + // println!("self.block_data_handler : {:#?}", self.blockstore); } } impl Rollback for Storage { fn disconnect_latest_block(&self) -> Result<()> { self.cache.disconnect_latest_block()?; - self.blockchain_data_handler.disconnect_latest_block() + self.blockstore.disconnect_latest_block() } } impl AttributesStorage for Storage { fn put_attributes(&self, attributes: Option<&Attributes>) -> Result<()> { self.cache.put_attributes(attributes)?; - self.blockchain_data_handler.put_attributes(attributes)?; + // self.blockstore.put_attributes(attributes)?; Ok(()) } fn get_attributes(&self) -> Result> { - let attributes = self.cache.get_attributes().or_else(|_| { - let attributes = self.blockchain_data_handler.get_attributes(); - if let Ok(Some(ref attr)) = attributes { - self.cache.put_attributes(Some(attr))?; - } - attributes - })?; - Ok(attributes) + // let attributes = self.cache.get_attributes().or_else(|_| { + // let attributes = self.blockstore.get_attributes(); + // if let Ok(Some(ref attr)) = attributes { + // self.cache.put_attributes(Some(attr))?; + // } + // attributes + // })?; + // Ok(attributes) + Ok(None) } }