diff --git a/applications/tari_validator_node/src/state_store.rs b/applications/tari_validator_node/src/state_store.rs index fbc7f00d5..4e76cc98f 100644 --- a/applications/tari_validator_node/src/state_store.rs +++ b/applications/tari_validator_node/src/state_store.rs @@ -90,13 +90,15 @@ impl StateStore for ValidatorNodeStateStore { match self { ValidatorNodeStateStore::Rocksdb(db) => { let tx = db.create_read_tx()?; - let tx = ValidatorNodeStateStoreReadTransaction::Rocksdb(tx); - Ok(ValidatorNodeStateStoreWriteTransaction {tx: Some(tx) } ) + let read_tx = ValidatorNodeStateStoreReadTransaction::Rocksdb(tx); + let write_tx = db.create_write_tx()?; + Ok(ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, read_tx } ) }, ValidatorNodeStateStore::Sqlite(db) => { let tx = db.create_read_tx()?; - let tx = ValidatorNodeStateStoreReadTransaction::Sqlite(tx); - Ok(ValidatorNodeStateStoreWriteTransaction {tx: Some(tx) } ) + let read_tx = ValidatorNodeStateStoreReadTransaction::Sqlite(tx); + let write_tx = db.create_write_tx()?; + Ok(ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, read_tx }) }, } } @@ -126,15 +128,25 @@ impl StateStore for ValidatorNodeStateStore { } } -pub struct ValidatorNodeStateStoreWriteTransaction<'a> { - tx: Option> +pub enum ValidatorNodeStateStoreWriteTransaction<'a> { + Rocksdb { + write_tx: as StateStore>::WriteTransaction<'a>, + read_tx: ValidatorNodeStateStoreReadTransaction<'a>, + }, + Sqlite { + write_tx: as StateStore>::WriteTransaction<'a>, + read_tx: ValidatorNodeStateStoreReadTransaction<'a>, + }, } impl<'a> Deref for ValidatorNodeStateStoreWriteTransaction<'a> { type Target = ValidatorNodeStateStoreReadTransaction<'a>; fn deref(&self) -> &Self::Target { - self.tx.as_ref().unwrap() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { read_tx, .. } => read_tx, + ValidatorNodeStateStoreWriteTransaction::Sqlite { read_tx, .. } => read_tx, + } } } @@ -145,19 +157,31 @@ impl<'tx> StateStoreWriteTransaction type Addr = PeerAddress; fn commit(&mut self) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.commit(), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.commit(), + } } fn rollback(&mut self) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.rollback(), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.rollback(), + } } fn blocks_insert(&mut self, block: &tari_dan_storage::consensus_models::Block) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.blocks_insert(block), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.blocks_insert(block), + } } fn blocks_delete(&mut self, block_id: &tari_dan_storage::consensus_models::BlockId) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.blocks_delete(block_id), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.blocks_delete(block_id), + } } fn blocks_set_flags( @@ -166,59 +190,101 @@ impl<'tx> StateStoreWriteTransaction is_committed: Option, is_justified: Option, ) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.blocks_set_flags(block_id, is_committed, is_justified), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.blocks_set_flags(block_id, is_committed, is_justified), + } } fn block_diffs_insert(&mut self, block_id: &tari_dan_storage::consensus_models::BlockId, changes: &[tari_dan_storage::consensus_models::SubstateChange]) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.block_diffs_insert(block_id, changes), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.block_diffs_insert(block_id, changes), + } } fn block_diffs_remove(&mut self, block_id: &tari_dan_storage::consensus_models::BlockId) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.block_diffs_remove(block_id), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.block_diffs_remove(block_id), + } } fn quorum_certificates_insert(&mut self, qc: &tari_dan_storage::consensus_models::QuorumCertificate) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.quorum_certificates_insert(qc), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.quorum_certificates_insert(qc), + } } fn quorum_certificates_set_shares_processed(&mut self, qc_id: &tari_dan_storage::consensus_models::QcId) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.quorum_certificates_set_shares_processed(qc_id), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.quorum_certificates_set_shares_processed(qc_id), + } } fn last_sent_vote_set(&mut self, last_sent_vote: &tari_dan_storage::consensus_models::LastSentVote) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.last_sent_vote_set(last_sent_vote), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.last_sent_vote_set(last_sent_vote), + } } fn last_voted_set(&mut self, last_voted: &tari_dan_storage::consensus_models::LastVoted) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.last_voted_set(last_voted), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.last_voted_set(last_voted), + } } fn last_votes_unset(&mut self, last_voted: &tari_dan_storage::consensus_models::LastVoted) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.last_votes_unset(last_voted), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.last_votes_unset(last_voted), + } } fn last_executed_set(&mut self, last_exec: &tari_dan_storage::consensus_models::LastExecuted) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.last_executed_set(last_exec), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.last_executed_set(last_exec), + } } fn last_proposed_set(&mut self, last_proposed: &tari_dan_storage::consensus_models::LastProposed) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.last_proposed_set(last_proposed), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.last_proposed_set(last_proposed), + } } fn last_proposed_unset(&mut self, last_proposed: &tari_dan_storage::consensus_models::LastProposed) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.last_proposed_unset(last_proposed), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.last_proposed_unset(last_proposed), + } } fn leaf_block_set(&mut self, leaf_node: &tari_dan_storage::consensus_models::LeafBlock) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.leaf_block_set(leaf_node), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.leaf_block_set(leaf_node), + } } fn locked_block_set(&mut self, locked_block: &tari_dan_storage::consensus_models::LockedBlock) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.locked_block_set(locked_block), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.locked_block_set(locked_block), + } } fn high_qc_set(&mut self, high_qc: &tari_dan_storage::consensus_models::HighQc) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.high_qc_set(high_qc), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.high_qc_set(high_qc), + } } fn foreign_proposals_upsert( @@ -226,15 +292,24 @@ impl<'tx> StateStoreWriteTransaction foreign_proposal: &tari_dan_storage::consensus_models::ForeignProposal, proposed_in_block: Option, ) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.foreign_proposals_upsert(foreign_proposal, proposed_in_block), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.foreign_proposals_upsert(foreign_proposal, proposed_in_block), + } } fn foreign_proposals_delete(&mut self, block_id: &tari_dan_storage::consensus_models::BlockId) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.foreign_proposals_delete(block_id), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.foreign_proposals_delete(block_id), + } } fn foreign_proposals_delete_in_epoch(&mut self, epoch: tari_dan_common_types::Epoch) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.foreign_proposals_delete_in_epoch(epoch), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.foreign_proposals_delete_in_epoch(epoch), + } } fn foreign_proposals_set_status( @@ -242,7 +317,10 @@ impl<'tx> StateStoreWriteTransaction block_id: &tari_dan_storage::consensus_models::BlockId, status: tari_dan_storage::consensus_models::ForeignProposalStatus, ) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.foreign_proposals_set_status(block_id, status), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.foreign_proposals_set_status(block_id, status), + } } fn foreign_proposals_set_proposed_in( @@ -250,11 +328,17 @@ impl<'tx> StateStoreWriteTransaction block_id: &tari_dan_storage::consensus_models::BlockId, proposed_in_block: &tari_dan_storage::consensus_models::BlockId, ) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.foreign_proposals_set_proposed_in(block_id, proposed_in_block), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.foreign_proposals_set_proposed_in(block_id, proposed_in_block), + } } fn foreign_proposals_clear_proposed_in(&mut self, proposed_in_block: &tari_dan_storage::consensus_models::BlockId) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.foreign_proposals_clear_proposed_in(proposed_in_block), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.foreign_proposals_clear_proposed_in(proposed_in_block), + } } fn foreign_send_counters_set( @@ -262,29 +346,44 @@ impl<'tx> StateStoreWriteTransaction foreign_send_counter: &tari_dan_storage::consensus_models::ForeignSendCounters, block_id: &tari_dan_storage::consensus_models::BlockId, ) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.foreign_send_counters_set(foreign_send_counter, block_id), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.foreign_send_counters_set(foreign_send_counter, block_id), + } } fn foreign_receive_counters_set( &mut self, foreign_send_counter: &tari_dan_storage::consensus_models::ForeignReceiveCounters, ) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.foreign_receive_counters_set(foreign_send_counter), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.foreign_receive_counters_set(foreign_send_counter), + } } fn transactions_insert(&mut self, transaction: &tari_dan_storage::consensus_models::TransactionRecord) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.transactions_insert(transaction), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.transactions_insert(transaction), + } } fn transactions_update(&mut self, transaction: &tari_dan_storage::consensus_models::TransactionRecord) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.transactions_update(transaction), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.transactions_update(transaction), + } } fn transactions_save_all<'a, I: IntoIterator>( &mut self, transaction: I, ) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.transactions_save_all(transaction), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.transactions_save_all(transaction), + } } fn transactions_finalize_all<'a, I: IntoIterator>( @@ -292,18 +391,27 @@ impl<'tx> StateStoreWriteTransaction block_id: tari_dan_storage::consensus_models::BlockId, transaction: I, ) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.transactions_finalize_all(block_id, transaction), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.transactions_finalize_all(block_id, transaction), + } } fn transaction_executions_insert_or_ignore( &mut self, transaction_execution: &tari_dan_storage::consensus_models::BlockTransactionExecution, ) -> Result { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.transaction_executions_insert_or_ignore(transaction_execution), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.transaction_executions_insert_or_ignore(transaction_execution), + } } fn transaction_executions_remove_any_by_block_id(&mut self, block_id: &tari_dan_storage::consensus_models::BlockId) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.transaction_executions_remove_any_by_block_id(block_id), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.transaction_executions_remove_any_by_block_id(block_id), + } } fn transaction_pool_insert_new( @@ -312,7 +420,10 @@ impl<'tx> StateStoreWriteTransaction decision: tari_dan_storage::consensus_models::Decision, is_ready: bool, ) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.transaction_pool_insert_new(tx_id, decision, is_ready), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.transaction_pool_insert_new(tx_id, decision, is_ready), + } } fn transaction_pool_add_pending_update( @@ -320,27 +431,42 @@ impl<'tx> StateStoreWriteTransaction block_id: &tari_dan_storage::consensus_models::BlockId, pool_update: &tari_dan_storage::consensus_models::TransactionPoolStatusUpdate, ) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.transaction_pool_add_pending_update(block_id, pool_update), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.transaction_pool_add_pending_update(block_id, pool_update), + } } fn transaction_pool_remove(&mut self, transaction_id: &tari_transaction::TransactionId) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.transaction_pool_remove(transaction_id), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.transaction_pool_remove(transaction_id), + } } fn transaction_pool_remove_all<'a, I: IntoIterator>( &mut self, transaction_ids: I, ) -> Result, tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.transaction_pool_remove_all(transaction_ids), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.transaction_pool_remove_all(transaction_ids), + } } fn transaction_pool_confirm_all_transitions(&mut self, new_locked_block: &tari_dan_storage::consensus_models::LockedBlock) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.transaction_pool_confirm_all_transitions(new_locked_block), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.transaction_pool_confirm_all_transitions(new_locked_block), + } } fn transaction_pool_state_updates_remove_any_by_block_id(&mut self, block_id: &tari_dan_storage::consensus_models::BlockId) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.transaction_pool_state_updates_remove_any_by_block_id(block_id), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.transaction_pool_state_updates_remove_any_by_block_id(block_id), + } } fn missing_transactions_insert<'a, IMissing: IntoIterator>( @@ -349,7 +475,10 @@ impl<'tx> StateStoreWriteTransaction foreign_proposals: &[tari_dan_storage::consensus_models::ForeignProposal], missing_transaction_ids: IMissing, ) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.missing_transactions_insert(park_block, foreign_proposals, missing_transaction_ids), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.missing_transactions_insert(park_block, foreign_proposals, missing_transaction_ids), + } } fn missing_transactions_remove( @@ -357,11 +486,17 @@ impl<'tx> StateStoreWriteTransaction height: tari_dan_common_types::NodeHeight, transaction_id: &tari_transaction::TransactionId, ) -> Result)>, tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.missing_transactions_remove(height, transaction_id), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.missing_transactions_remove(height, transaction_id), + } } fn foreign_parked_blocks_insert(&mut self, park_block: &tari_dan_storage::consensus_models::ForeignParkedProposal) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.foreign_parked_blocks_insert(park_block), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.foreign_parked_blocks_insert(park_block), + } } fn foreign_parked_blocks_insert_missing_transactions<'a, I: IntoIterator>( @@ -369,22 +504,34 @@ impl<'tx> StateStoreWriteTransaction park_block_id: &tari_dan_storage::consensus_models::BlockId, missing_transaction_ids: I, ) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.foreign_parked_blocks_insert_missing_transactions(park_block_id, missing_transaction_ids), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.foreign_parked_blocks_insert_missing_transactions(park_block_id, missing_transaction_ids), + } } fn foreign_parked_blocks_remove_all_by_transaction( &mut self, transaction_id: &tari_transaction::TransactionId, ) -> Result, tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.foreign_parked_blocks_remove_all_by_transaction(transaction_id), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.foreign_parked_blocks_remove_all_by_transaction(transaction_id), + } } fn votes_insert(&mut self, vote: &tari_dan_storage::consensus_models::Vote) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.votes_insert(vote), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.votes_insert(vote), + } } fn votes_delete_all(&mut self) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.votes_delete_all(), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.votes_delete_all(), + } } fn substate_locks_insert_all<'a, I: IntoIterator)>>( @@ -392,22 +539,34 @@ impl<'tx> StateStoreWriteTransaction block_id: &tari_dan_storage::consensus_models::BlockId, locks: I, ) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.substate_locks_insert_all(block_id, locks), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.substate_locks_insert_all(block_id, locks), + } } fn substate_locks_remove_many_for_transactions<'a, I: Iterator>( &mut self, transaction_ids: std::iter::Peekable, ) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.substate_locks_remove_many_for_transactions(transaction_ids), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.substate_locks_remove_many_for_transactions(transaction_ids), + } } fn substate_locks_remove_any_by_block_id(&mut self, block_id: &tari_dan_storage::consensus_models::BlockId) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.substate_locks_remove_any_by_block_id(block_id), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.substate_locks_remove_any_by_block_id(block_id), + } } fn substates_create(&mut self, substate: &tari_dan_storage::consensus_models::SubstateRecord) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.substates_create(substate), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.substates_create(substate), + } } fn substates_down( @@ -419,7 +578,10 @@ impl<'tx> StateStoreWriteTransaction destroyed_transaction_id: &tari_transaction::TransactionId, destroyed_qc_id: &tari_dan_storage::consensus_models::QcId, ) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.substates_down(versioned_substate_id, shard, epoch, destroyed_block_height, destroyed_transaction_id, destroyed_qc_id), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.substates_down(versioned_substate_id, shard, epoch, destroyed_block_height, destroyed_transaction_id, destroyed_qc_id), + } } fn foreign_substate_pledges_save( @@ -428,14 +590,20 @@ impl<'tx> StateStoreWriteTransaction shard_group: tari_dan_common_types::ShardGroup, pledges: &tari_dan_storage::consensus_models::SubstatePledges, ) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.foreign_substate_pledges_save(transaction_id, shard_group, pledges), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.foreign_substate_pledges_save(transaction_id, shard_group, pledges), + } } fn foreign_substate_pledges_remove_many<'a, I: IntoIterator>( &mut self, transaction_ids: I, ) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.foreign_substate_pledges_remove_many(transaction_ids), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.foreign_substate_pledges_remove_many(transaction_ids), + } } fn pending_state_tree_diffs_insert( @@ -444,22 +612,34 @@ impl<'tx> StateStoreWriteTransaction shard: tari_dan_common_types::shard::Shard, diff: &tari_dan_storage::consensus_models::VersionedStateHashTreeDiff, ) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.pending_state_tree_diffs_insert(block_id, shard, diff), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.pending_state_tree_diffs_insert(block_id, shard, diff), + } } fn pending_state_tree_diffs_remove_by_block(&mut self, block_id: &tari_dan_storage::consensus_models::BlockId) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.pending_state_tree_diffs_remove_by_block(block_id), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.pending_state_tree_diffs_remove_by_block(block_id), + } } fn pending_state_tree_diffs_remove_and_return_by_block( &mut self, block_id: &tari_dan_storage::consensus_models::BlockId, ) -> Result>, tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.pending_state_tree_diffs_remove_and_return_by_block(block_id), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.pending_state_tree_diffs_remove_and_return_by_block(block_id), + } } fn state_tree_nodes_insert(&mut self, shard: tari_dan_common_types::shard::Shard, key: NodeKey, node: Node) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.state_tree_nodes_insert(shard, key, node), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.state_tree_nodes_insert(shard, key, node), + } } fn state_tree_nodes_record_stale_tree_node( @@ -467,19 +647,31 @@ impl<'tx> StateStoreWriteTransaction shard: tari_dan_common_types::shard::Shard, node: StaleTreeNode, ) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.state_tree_nodes_record_stale_tree_node(shard, node), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.state_tree_nodes_record_stale_tree_node(shard, node), + } } fn state_tree_shard_versions_set(&mut self, shard: tari_dan_common_types::shard::Shard, version: Version) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.state_tree_shard_versions_set(shard, version), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.state_tree_shard_versions_set(shard, version), + } } fn epoch_checkpoint_save(&mut self, checkpoint: &tari_dan_storage::consensus_models::EpochCheckpoint) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.epoch_checkpoint_save(checkpoint), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.epoch_checkpoint_save(checkpoint), + } } fn burnt_utxos_insert(&mut self, burnt_utxo: &tari_dan_storage::consensus_models::BurntUtxo) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.burnt_utxos_insert(burnt_utxo), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.burnt_utxos_insert(burnt_utxo), + } } fn burnt_utxos_set_proposed_block( @@ -487,15 +679,24 @@ impl<'tx> StateStoreWriteTransaction substate_id: &tari_engine_types::substate::SubstateId, proposed_in_block: &tari_dan_storage::consensus_models::BlockId, ) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.burnt_utxos_set_proposed_block(substate_id, proposed_in_block), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.burnt_utxos_set_proposed_block(substate_id, proposed_in_block), + } } fn burnt_utxos_clear_proposed_block(&mut self, proposed_in_block: &tari_dan_storage::consensus_models::BlockId) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.burnt_utxos_clear_proposed_block(proposed_in_block), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.burnt_utxos_clear_proposed_block(proposed_in_block), + } } fn burnt_utxos_delete(&mut self, substate_id: &tari_engine_types::substate::SubstateId) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.burnt_utxos_delete(substate_id), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.burnt_utxos_delete(substate_id), + } } fn lock_conflicts_insert_all<'a, I: IntoIterator)>>( @@ -503,11 +704,17 @@ impl<'tx> StateStoreWriteTransaction block_id: &tari_dan_storage::consensus_models::BlockId, conflicts: I, ) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.lock_conflicts_insert_all(block_id, conflicts), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.lock_conflicts_insert_all(block_id, conflicts), + } } fn validator_epoch_stats_add_participation_share(&mut self, qc_id: &tari_dan_storage::consensus_models::QcId) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.validator_epoch_stats_add_participation_share(qc_id), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.validator_epoch_stats_add_participation_share(qc_id), + } } fn validator_epoch_stats_updates<'a, I: IntoIterator>>( @@ -515,7 +722,10 @@ impl<'tx> StateStoreWriteTransaction epoch: tari_dan_common_types::Epoch, updates: I, ) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.validator_epoch_stats_updates(epoch, updates), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.validator_epoch_stats_updates(epoch, updates), + } } fn suspended_nodes_insert( @@ -523,7 +733,10 @@ impl<'tx> StateStoreWriteTransaction public_key: &tari_common_types::types::PublicKey, suspended_in_block: tari_dan_storage::consensus_models::BlockId, ) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.suspended_nodes_insert(public_key, suspended_in_block), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.suspended_nodes_insert(public_key, suspended_in_block), + } } fn suspended_nodes_mark_for_removal( @@ -531,15 +744,24 @@ impl<'tx> StateStoreWriteTransaction public_key: &tari_common_types::types::PublicKey, resumed_in_block: tari_dan_storage::consensus_models::BlockId, ) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.suspended_nodes_mark_for_removal(public_key, resumed_in_block), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.suspended_nodes_mark_for_removal(public_key, resumed_in_block), + } } fn suspended_nodes_delete(&mut self, public_key: &tari_common_types::types::PublicKey) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.suspended_nodes_delete(public_key), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.suspended_nodes_delete(public_key), + } } fn diagnostics_add_no_vote(&mut self, block_id: tari_dan_storage::consensus_models::BlockId, reason: tari_dan_storage::consensus_models::NoVoteReason) -> Result<(), tari_dan_storage::StorageError> { - todo!() + match self { + ValidatorNodeStateStoreWriteTransaction::Rocksdb { write_tx, .. } => write_tx.diagnostics_add_no_vote(block_id, reason), + ValidatorNodeStateStoreWriteTransaction::Sqlite { write_tx, .. } => write_tx.diagnostics_add_no_vote(block_id, reason), + } } }