Skip to content
This repository has been archived by the owner on Jan 11, 2024. It is now read-only.

Commit

Permalink
FM-365: Commit snapshot (#443)
Browse files Browse the repository at this point in the history
* FM-365: Change apply_chunk to save_chunk

* FM-365: Naively load the snapshot into the blockstore

* FM-365: Import the snapshot and update the app state

* FM-365: Comments about what we could do about the snapshots in tmp

* FM-365: Validate snapshot body CIDs while loading

* FM-365: Comment about vulnerabilities
  • Loading branch information
aakoshh authored Nov 27, 2023
1 parent 89a3fb8 commit a1eb20a
Show file tree
Hide file tree
Showing 7 changed files with 226 additions and 42 deletions.
51 changes: 46 additions & 5 deletions fendermint/app/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -887,7 +887,14 @@ where
tracing::info!(?manifest, "received snapshot offer");
// We can look at the version but currently there's only one.
match atomically_or_err(|| client.offer_snapshot(manifest.clone())).await {
Ok(()) => {
Ok(path) => {
tracing::info!(
download_dir = path.to_string_lossy().to_string(),
height = manifest.block_height,
size = manifest.size,
chunks = manifest.chunks,
"downloading snapshot"
);
return Ok(response::OfferSnapshot::Accept);
}
Err(SnapshotError::IncompatibleVersion(version)) => {
Expand Down Expand Up @@ -919,13 +926,47 @@ where

if let Some(ref client) = self.snapshots {
match atomically_or_err(|| {
client.apply_chunk(request.index, request.chunk.clone().into())
client.save_chunk(request.index, request.chunk.clone().into())
})
.await
{
Ok(completed) => {
if completed {
tracing::info!("received all snapshot chunks");
Ok(snapshot) => {
if let Some(snapshot) = snapshot {
tracing::info!(
download_dir = snapshot.snapshot_dir.to_string_lossy().to_string(),
height = snapshot.manifest.block_height,
"received all snapshot chunks",
);

// Ideally we would import into some isolated store then validate,
// but for now let's trust that all is well.
if let Err(e) = snapshot.import(self.state_store_clone(), true).await {
tracing::error!(error =? e, "failed to import snapshot");
return Ok(response::ApplySnapshotChunk {
result: response::ApplySnapshotChunkResult::RejectSnapshot,
..default
});
}

tracing::info!(
height = snapshot.manifest.block_height,
"imported snapshot"
);

// Now insert the new state into the history.
let mut state = self.committed_state()?;
state.block_height = snapshot.manifest.block_height;
state.state_params = snapshot.manifest.state_params;
self.set_committed_state(state)?;

// TODO: We can remove the `current_download` from the STM
// state here which would cause it to get dropped from /tmp,
// but for now let's keep it just in case we need to investigate
// some problem.

// We could also move the files into our own snapshot directory
// so that we can offer it to others, but again let's hold on
// until we have done more robust validation.
}
return Ok(response::ApplySnapshotChunk {
result: response::ApplySnapshotChunkResult::Accept,
Expand Down
25 changes: 21 additions & 4 deletions fendermint/vm/interpreter/src/fvm/state/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use cid::Cid;
use futures_core::Stream;
use fvm::state_tree::StateTree;
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_car::{load_car_unchecked, CarHeader};
use fvm_ipld_car::{load_car, load_car_unchecked, CarHeader};
use fvm_ipld_encoding::{from_slice, CborStore, DAG_CBOR};
use libipld::Ipld;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -61,10 +61,19 @@ where
}

/// Read the snapshot from file and load all the data into the store
pub async fn read_car(path: impl AsRef<Path>, store: BS) -> anyhow::Result<Self> {
pub async fn read_car(
path: impl AsRef<Path>,
store: BS,
validate: bool,
) -> anyhow::Result<Self> {
let file = tokio::fs::File::open(path).await?;

let roots = load_car_unchecked(&store, file.compat()).await?;
let roots = if validate {
load_car(&store, file.compat()).await?
} else {
load_car_unchecked(&store, file.compat()).await?
};

if roots.len() != 1 {
return Err(anyhow!("invalid snapshot, should have 1 root cid"));
}
Expand Down Expand Up @@ -193,6 +202,14 @@ where

Ok((root_cid, streamer))
}

pub fn block_height(&self) -> BlockHeight {
self.block_height
}

pub fn state_params(&self) -> &FvmStateParams {
&self.state_params
}
}

#[pin_project::pin_project]
Expand Down Expand Up @@ -369,7 +386,7 @@ mod tests {
assert!(r.is_ok());

let new_store = MemoryBlockstore::new();
let Snapshot::V1(loaded_snapshot) = Snapshot::read_car(tmp_file.path(), new_store)
let Snapshot::V1(loaded_snapshot) = Snapshot::read_car(tmp_file.path(), new_store, true)
.await
.unwrap();

Expand Down
49 changes: 40 additions & 9 deletions fendermint/vm/snapshot/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2022-2023 Protocol Labs
// SPDX-License-Identifier: Apache-2.0, MIT

use std::{sync::Arc, time::SystemTime};
use std::{path::PathBuf, sync::Arc, time::SystemTime};

use async_stm::{abort, Stm, StmResult, TVar};
use fendermint_vm_interpreter::fvm::state::{
Expand All @@ -13,7 +13,7 @@ use tempfile::tempdir;
use crate::{
manifest,
state::{SnapshotDownload, SnapshotState},
SnapshotError, SnapshotItem, SnapshotManifest,
SnapshotError, SnapshotItem, SnapshotManifest, MANIFEST_FILE_NAME,
};

/// Interface to snapshot state for the application.
Expand Down Expand Up @@ -74,19 +74,39 @@ impl SnapshotClient {

/// If the offered snapshot is accepted, we create a temporary directory to hold the chunks
/// and remember it as our current snapshot being downloaded.
pub fn offer_snapshot(&self, manifest: SnapshotManifest) -> StmResult<(), SnapshotError> {
pub fn offer_snapshot(&self, manifest: SnapshotManifest) -> StmResult<PathBuf, SnapshotError> {
if manifest.version != 1 {
abort(SnapshotError::IncompatibleVersion(manifest.version))
} else {
match tempdir() {
Ok(dir) => {
// Create a `parts` sub-directory for the chunks.
if let Err(e) = std::fs::create_dir(dir.path().join("parts")) {
return abort(SnapshotError::from(e));
};

// Save the manifest into the temp directory;
// that way we can always see on the file system what's happening.
let json = match serde_json::to_string_pretty(&manifest)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))
{
Ok(json) => json,
Err(e) => return abort(SnapshotError::from(e)),
};
if let Err(e) = std::fs::write(dir.path().join(MANIFEST_FILE_NAME), json) {
return abort(SnapshotError::from(e));
}

let download_path = dir.path().into();
let download = SnapshotDownload {
manifest,
download_dir: Arc::new(dir),
next_index: TVar::new(0),
};

self.state.current_download.write(Some(download))?;
Ok(())

Ok(download_path)
}
Err(e) => abort(SnapshotError::from(e))?,
}
Expand All @@ -95,8 +115,15 @@ impl SnapshotClient {

/// Take a chunk sent to us by a remote peer. This is our chance to validate chunks on the fly.
///
/// Return a flag indicating whether all the chunks have been received and loaded to the blockstore.
pub fn apply_chunk(&self, index: u32, contents: Vec<u8>) -> StmResult<bool, SnapshotError> {
/// Returns `None` while there are more chunks to download and `Some` when all
/// the chunks have been received and basic file integrity validated.
///
/// Then we can import the snapshot into the blockstore separately.
pub fn save_chunk(
&self,
index: u32,
contents: Vec<u8>,
) -> StmResult<Option<SnapshotItem>, SnapshotError> {
if let Some(cd) = self.state.current_download.read()?.as_ref() {
let next_index = cd.next_index.read_clone()?;
if index != next_index {
Expand All @@ -106,6 +133,7 @@ impl SnapshotClient {
.download_dir
.as_ref()
.path()
.join("parts")
.join(format!("{}.part", index));

// We are doing IO inside the STM transaction, but that's okay because there is no contention on the download.
Expand All @@ -119,8 +147,11 @@ impl SnapshotClient {
match manifest::parts_checksum(cd.download_dir.as_ref()) {
Ok(checksum) => {
if checksum == cd.manifest.checksum {
// TODO: Import Snapshot.
Ok(true)
let item = SnapshotItem::new(
cd.download_dir.path().into(),
cd.manifest.clone(),
);
Ok(Some(item))
} else {
abort(SnapshotError::WrongChecksum(
cd.manifest.checksum,
Expand All @@ -134,7 +165,7 @@ impl SnapshotClient {
))),
}
} else {
Ok(false)
Ok(None)
}
}
Err(e) => {
Expand Down
9 changes: 9 additions & 0 deletions fendermint/vm/snapshot/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ mod manager;
mod manifest;
mod state;

/// The file name to export the CAR to.
const SNAPSHOT_FILE_NAME: &str = "snapshot.car";

/// The file name in snapshot directories that contains the manifest.
const MANIFEST_FILE_NAME: &str = "manifest.json";

/// Name of the subdirectory where `{idx}.part` files are stored within a snapshot.
const PARTS_DIR_NAME: &str = "parts";

pub use client::SnapshotClient;
pub use error::SnapshotError;
pub use manager::SnapshotManager;
Expand Down
16 changes: 7 additions & 9 deletions fendermint/vm/snapshot/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,14 @@ use std::time::Duration;

use crate::manifest::{file_checksum, list_manifests, write_manifest, SnapshotManifest};
use crate::state::SnapshotState;
use crate::{car, SnapshotClient, SnapshotItem};
use crate::{car, SnapshotClient, SnapshotItem, PARTS_DIR_NAME, SNAPSHOT_FILE_NAME};
use anyhow::Context;
use async_stm::{atomically, retry, TVar};
use fendermint_vm_interpreter::fvm::state::snapshot::{BlockHeight, Snapshot};
use fendermint_vm_interpreter::fvm::state::FvmStateParams;
use fvm_ipld_blockstore::Blockstore;
use tendermint_rpc::Client;

/// The file name to export the CAR to.
const SNAPSHOT_FILE_NAME: &str = "snapshot.car";

/// Create snapshots at regular block intervals.
pub struct SnapshotManager<BS> {
/// Blockstore
Expand Down Expand Up @@ -199,8 +196,8 @@ where
.context("failed to create temp dir for snapshot")?;

let snapshot_path = temp_dir.path().join(SNAPSHOT_FILE_NAME);
let checksum_path = temp_dir.path().join("parts.sha256");
let parts_path = temp_dir.path().join("parts");
let checksum_path = temp_dir.path().join(format!("{PARTS_DIR_NAME}.sha256"));
let parts_path = temp_dir.path().join(PARTS_DIR_NAME);

// TODO: See if we can reuse the contents of an existing CAR file.

Expand Down Expand Up @@ -304,7 +301,7 @@ mod tests {
use fvm::engine::MultiEngine;
use quickcheck::Arbitrary;

use crate::manifest;
use crate::{manifest, PARTS_DIR_NAME};

use super::SnapshotManager;

Expand Down Expand Up @@ -388,8 +385,9 @@ mod tests {
assert_eq!(snapshots.len(), 1, "can list manifests");
assert_eq!(snapshots[0], snapshot);

let checksum = manifest::parts_checksum(snapshot.snapshot_dir.as_path().join("parts"))
.expect("parts checksum can be calculated");
let checksum =
manifest::parts_checksum(snapshot.snapshot_dir.as_path().join(PARTS_DIR_NAME))
.expect("parts checksum can be calculated");

assert_eq!(
checksum, snapshot.manifest.checksum,
Expand Down
26 changes: 15 additions & 11 deletions fendermint/vm/snapshot/src/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@ use fendermint_vm_interpreter::fvm::state::{
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};

use crate::SnapshotItem;

/// The file name in snapshot directories that contains the manifest.
const MANIFEST_FILE_NAME: &str = "manifest.json";
use crate::{SnapshotItem, MANIFEST_FILE_NAME};

#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub struct SnapshotManifest {
Expand Down Expand Up @@ -115,6 +112,19 @@ pub fn file_checksum(path: impl AsRef<Path>) -> anyhow::Result<tendermint::Hash>
pub fn parts_checksum(path: impl AsRef<Path>) -> anyhow::Result<tendermint::Hash> {
let mut hasher = Sha256::new();

let chunks = list_parts(path)?;

for path in chunks {
let mut file = std::fs::File::open(path).context("failed to open part")?;
let _ = std::io::copy(&mut file, &mut hasher)?;
}

let hash = hasher.finalize().into();
Ok(tendermint::Hash::Sha256(hash))
}

/// List all the `{idx}.part` files in a directory.
pub fn list_parts(path: impl AsRef<Path>) -> anyhow::Result<Vec<PathBuf>> {
let mut chunks = std::fs::read_dir(path.as_ref())
.unwrap()
.collect::<Result<Vec<_>, _>>()
Expand Down Expand Up @@ -142,13 +152,7 @@ pub fn parts_checksum(path: impl AsRef<Path>) -> anyhow::Result<tendermint::Hash
.expect("file part names are prefixed by index")
});

for entry in chunks {
let mut file = std::fs::File::open(&entry.path()).context("failed to open part")?;
let _ = std::io::copy(&mut file, &mut hasher)?;
}

let hash = hasher.finalize().into();
Ok(tendermint::Hash::Sha256(hash))
Ok(chunks.into_iter().map(|c| c.path()).collect())
}

#[cfg(feature = "arb")]
Expand Down
Loading

0 comments on commit a1eb20a

Please sign in to comment.