Skip to content

Commit

Permalink
Merge pull request #697 from EspressoSystems/jb/tx-drop
Browse files Browse the repository at this point in the history
Revert fs transaction on drop and add regression test
  • Loading branch information
jbearer authored Sep 13, 2024
2 parents 5eefc35 + 32b2582 commit 140b30a
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 12 deletions.
80 changes: 80 additions & 0 deletions src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,7 @@ pub mod persistence_tests {
mocks::{MockPayload, MockTypes},
setup_test,
},
types::HeightIndexed,
Leaf,
};
use committable::Committable;
Expand Down Expand Up @@ -590,6 +591,85 @@ pub mod persistence_tests {
ds.get_leaf(1).await.try_resolve().unwrap_err();
ds.get_block(1).await.try_resolve().unwrap_err();
}

#[async_std::test]
pub async fn test_drop_tx<D: TestableDataSource>()
where
for<'a> D::Transaction<'a>: UpdateDataSource<MockTypes>
+ NodeDataSource<MockTypes>
+ AvailabilityStorage<MockTypes>,
for<'a> D::ReadOnly<'a>: NodeDataSource<MockTypes>,
{
setup_test();

let storage = D::create(0).await;
let ds = D::connect(&storage).await;

// Mock up some consensus data.
let mut mock_qc = QuorumCertificate::<MockTypes>::genesis(
&TestValidatedState::default(),
&TestInstanceState::default(),
)
.await;
let mut mock_leaf = Leaf::<MockTypes>::genesis(
&TestValidatedState::default(),
&TestInstanceState::default(),
)
.await;
// Increment the block number, to distinguish this block from the genesis block, which
// already exists.
mock_leaf.block_header_mut().block_number += 1;
mock_qc.data.leaf_commit = mock_leaf.commit();

let block = BlockQueryData::new(mock_leaf.block_header().clone(), MockPayload::genesis());
let leaf = LeafQueryData::new(mock_leaf.clone(), mock_qc.clone()).unwrap();

// Insert, but do not commit, some data and check that we can read it back.
tracing::info!("write");
let mut tx = ds.write().await.unwrap();
tx.insert_leaf(leaf.clone()).await.unwrap();
tx.insert_block(block.clone()).await.unwrap();

assert_eq!(tx.block_height().await.unwrap(), 2);
assert_eq!(leaf, tx.get_leaf(1.into()).await.unwrap());
assert_eq!(block, tx.get_block(1.into()).await.unwrap());

// Drop the transaction, causing a revert.
drop(tx);

// Open a new transaction and check that the changes are reverted.
tracing::info!("read");
let tx = ds.read().await.unwrap();
assert_eq!(tx.block_height().await.unwrap(), 0);
drop(tx);

// Get a mutable transaction again, insert different data.
mock_leaf.block_header_mut().block_number += 1;
mock_qc.data.leaf_commit = mock_leaf.commit();
let block = BlockQueryData::new(mock_leaf.block_header().clone(), MockPayload::genesis());
let leaf = LeafQueryData::new(mock_leaf, mock_qc).unwrap();

tracing::info!("write again");
let mut tx = ds.write().await.unwrap();
tx.insert_leaf(leaf.clone()).await.unwrap();
tx.insert_block(block.clone()).await.unwrap();
tx.commit().await.unwrap();

// Read the data back. We should have _only_ the data that was written in the final
// transaction.
tracing::info!("read again");
let height = leaf.height() as usize;
assert_eq!(
NodeDataSource::<MockTypes>::block_height(&ds)
.await
.unwrap(),
height + 1
);
assert_eq!(leaf, ds.get_leaf(height).await.await);
assert_eq!(block, ds.get_block(height).await.await);
ds.get_leaf(height - 1).await.try_resolve().unwrap_err();
ds.get_block(height - 1).await.try_resolve().unwrap_err();
}
}

/// Generic tests we can instantiate for all the node data sources.
Expand Down
53 changes: 41 additions & 12 deletions src/data_source/storage/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,11 +270,43 @@ where
}
}

pub trait Revert {
fn revert(&mut self);
}

impl<'a, Types> Revert for RwLockWriteGuard<'a, FileSystemStorageInner<Types>>
where
Types: NodeType,
Payload<Types>: QueryablePayload<Types>,
{
fn revert(&mut self) {
self.leaf_storage.revert_version().unwrap();
self.block_storage.revert_version().unwrap();
self.vid_storage.revert_version().unwrap();
}
}

impl<'a, Types> Revert for RwLockReadGuard<'a, FileSystemStorageInner<Types>>
where
Types: NodeType,
Payload<Types>: QueryablePayload<Types>,
{
fn revert(&mut self) {
// Nothing to revert for a read-only transaction.
}
}

#[derive(Debug)]
pub struct Transaction<T> {
pub struct Transaction<T: Revert> {
inner: T,
}

impl<T: Revert> Drop for Transaction<T> {
fn drop(&mut self) {
self.inner.revert();
}
}

impl<'a, Types> update::Transaction
for Transaction<RwLockWriteGuard<'a, FileSystemStorageInner<Types>>>
where
Expand All @@ -291,12 +323,9 @@ where
Ok(())
}

fn revert(mut self) -> impl Future + Send {
async move {
self.inner.leaf_storage.revert_version().unwrap();
self.inner.block_storage.revert_version().unwrap();
self.inner.vid_storage.revert_version().unwrap();
}
fn revert(self) -> impl Future + Send {
// The revert is handled when `self` is dropped.
async move {}
}
}

Expand All @@ -312,8 +341,8 @@ where
}

fn revert(self) -> impl Future + Send {
// Nothing to revert for a read-only transaction.
async {}
// The revert is handled when `self` is dropped.
async move {}
}
}

Expand Down Expand Up @@ -388,7 +417,7 @@ where
Types: NodeType,
Payload<Types>: QueryablePayload<Types>,
Header<Types>: QueryableHeader<Types>,
T: Deref<Target = FileSystemStorageInner<Types>> + Send + Sync,
T: Revert + Deref<Target = FileSystemStorageInner<Types>> + Send + Sync,
{
async fn get_leaf(&self, id: LeafId<Types>) -> QueryResult<LeafQueryData<Types>> {
let n = match id {
Expand Down Expand Up @@ -581,7 +610,7 @@ where
Types: NodeType,
Payload<Types>: QueryablePayload<Types>,
Header<Types>: QueryableHeader<Types>,
T: Deref<Target = FileSystemStorageInner<Types>> + Sync,
T: Revert + Deref<Target = FileSystemStorageInner<Types>> + Sync,
{
async fn block_height(&self) -> QueryResult<usize> {
Ok(self.inner.leaf_storage.iter().len())
Expand Down Expand Up @@ -680,4 +709,4 @@ where
}
}

impl<T> PrunedHeightStorage for Transaction<T> {}
impl<T: Revert> PrunedHeightStorage for Transaction<T> {}

0 comments on commit 140b30a

Please sign in to comment.