Skip to content
This repository has been archived by the owner on Jan 11, 2024. It is now read-only.

Commit

Permalink
FM-362: Save snapshot chunks (#441)
Browse files Browse the repository at this point in the history
* FM-362: Split into multiple modules

* FM-362: Offer snapshot, create temporary directory

* FM-362: Save snapshot chunk

* FM-362: Verify snapshot checksum
  • Loading branch information
aakoshh authored Nov 27, 2023
1 parent 37e20de commit 89a3fb8
Show file tree
Hide file tree
Showing 12 changed files with 460 additions and 172 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

77 changes: 73 additions & 4 deletions fendermint/app/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::path::PathBuf;
use std::sync::Arc;

use anyhow::{anyhow, Context, Result};
use async_stm::atomically;
use async_stm::{atomically, atomically_or_err};
use async_trait::async_trait;
use cid::Cid;
use fendermint_abci::util::take_until_max_size;
Expand All @@ -32,7 +32,7 @@ use fendermint_vm_interpreter::{
CheckInterpreter, ExecInterpreter, GenesisInterpreter, ProposalInterpreter, QueryInterpreter,
};
use fendermint_vm_message::query::FvmQueryHeight;
use fendermint_vm_snapshot::manager::SnapshotClient;
use fendermint_vm_snapshot::{SnapshotClient, SnapshotError};
use fvm::engine::MultiEngine;
use fvm_ipld_blockstore::Blockstore;
use fvm_shared::chainid::ChainID;
Expand Down Expand Up @@ -874,16 +874,31 @@ where
}

/// Decide whether to start downloading a snapshot from peers.
///
/// This method is also called when a download is aborted and a new snapshot is offered,
/// so potentially we have to clean up previous resources and start a new one.
async fn offer_snapshot(
&self,
request: request::OfferSnapshot,
) -> AbciResult<response::OfferSnapshot> {
if self.snapshots.is_some() {
if let Some(ref client) = self.snapshots {
match from_snapshot(request).context("failed to parse snapshot") {
Ok(manifest) => {
tracing::info!(?manifest, "received snapshot offer");
// We can look at the version but currently there's only one.
return Ok(response::OfferSnapshot::Accept);
match atomically_or_err(|| client.offer_snapshot(manifest.clone())).await {
Ok(()) => {
return Ok(response::OfferSnapshot::Accept);
}
Err(SnapshotError::IncompatibleVersion(version)) => {
tracing::warn!(version, "rejecting offered snapshot version");
return Ok(response::OfferSnapshot::RejectFormat);
}
Err(e) => {
tracing::error!(error = ?e, "failed to start snapshot download");
return Ok(response::OfferSnapshot::Abort);
}
};
}
Err(e) => {
tracing::warn!("failed to parse snapshot offer: {e:#}");
Expand All @@ -893,4 +908,58 @@ where
}
Ok(Default::default())
}

/// Apply the given snapshot chunk to the application's state.
async fn apply_snapshot_chunk(
&self,
request: request::ApplySnapshotChunk,
) -> AbciResult<response::ApplySnapshotChunk> {
tracing::debug!(chunk = request.index, "received snapshot chunk");
let default = response::ApplySnapshotChunk::default();

if let Some(ref client) = self.snapshots {
match atomically_or_err(|| {
client.apply_chunk(request.index, request.chunk.clone().into())
})
.await
{
Ok(completed) => {
if completed {
tracing::info!("received all snapshot chunks");
}
return Ok(response::ApplySnapshotChunk {
result: response::ApplySnapshotChunkResult::Accept,
..default
});
}
Err(SnapshotError::UnexpectedChunk(expected, got)) => {
tracing::warn!(got, expected, "unexpected snapshot chunk index");
return Ok(response::ApplySnapshotChunk {
result: response::ApplySnapshotChunkResult::Retry,
refetch_chunks: vec![expected],
..default
});
}
Err(SnapshotError::WrongChecksum(expected, got)) => {
tracing::warn!(?got, ?expected, "wrong snapshot checksum");
// We could retry this snapshot, or try another one.
// If we retry, we have to tell which chunks to refetch.
return Ok(response::ApplySnapshotChunk {
result: response::ApplySnapshotChunkResult::RejectSnapshot,
..default
});
}
Err(e) => {
tracing::error!(
chunk = request.index,
sender = request.sender,
error = ?e,
"failed to process snapshot chunk"
);
}
}
}

Ok(default)
}
}
4 changes: 2 additions & 2 deletions fendermint/app/src/tmconv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use fendermint_vm_interpreter::fvm::{
FvmApplyRet, FvmCheckRet, FvmQueryRet,
};
use fendermint_vm_message::signed::DomainHash;
use fendermint_vm_snapshot::manifest::{SnapshotItem, SnapshotManifest};
use fendermint_vm_snapshot::{SnapshotItem, SnapshotManifest};
use fvm_shared::{address::Address, error::ExitCode, event::StampedEvent, ActorID};
use prost::Message;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -444,7 +444,7 @@ pub fn to_app_hash(state_params: &FvmStateParams) -> tendermint::hash::AppHash {

#[cfg(test)]
mod tests {
use fendermint_vm_snapshot::manifest::SnapshotItem;
use fendermint_vm_snapshot::SnapshotItem;
use fvm_shared::error::ExitCode;
use tendermint::abci::request;

Expand Down
1 change: 1 addition & 0 deletions fendermint/vm/snapshot/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ serde = { workspace = true }
serde_json = { workspace = true }
tempfile = { workspace = true }
tracing = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true }

Expand Down
1 change: 1 addition & 0 deletions fendermint/vm/snapshot/src/car/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ mod tests {
.collect::<Result<Vec<_>, _>>()
.unwrap();

// There are few enough that we can get away without converting to an integer.
chunks.sort_unstable_by_key(|c| c.path().to_string_lossy().to_string());

let chunks = chunks
Expand Down
151 changes: 151 additions & 0 deletions fendermint/vm/snapshot/src/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
// Copyright 2022-2023 Protocol Labs
// SPDX-License-Identifier: Apache-2.0, MIT

use std::{sync::Arc, time::SystemTime};

use async_stm::{abort, Stm, StmResult, TVar};
use fendermint_vm_interpreter::fvm::state::{
snapshot::{BlockHeight, SnapshotVersion},
FvmStateParams,
};
use tempfile::tempdir;

use crate::{
manifest,
state::{SnapshotDownload, SnapshotState},
SnapshotError, SnapshotItem, SnapshotManifest,
};

/// Interface to snapshot state for the application.
///
/// The client is `Clone`; its methods read and write the transactional variables
/// held in `state` and are meant to be run inside an STM transaction.
#[derive(Clone)]
pub struct SnapshotClient {
/// The client will only notify the manager of snapshottable heights.
snapshot_interval: BlockHeight,
/// Snapshot state accessed by this client: the latest state parameters, the list of
/// completed snapshots and the snapshot download currently in progress.
/// NOTE(review): presumably shared with the snapshot manager - confirm.
state: SnapshotState,
}

impl SnapshotClient {
    /// Create a client over the given snapshot `state`.
    ///
    /// Only block heights that are a multiple of `snapshot_interval` are passed on by
    /// [`Self::notify`].
    pub fn new(snapshot_interval: BlockHeight, state: SnapshotState) -> Self {
        Self {
            snapshot_interval,
            state,
        }
    }

    /// Set the latest block state parameters and notify the manager.
    ///
    /// Call this with the block height where the `app_hash` in the block reflects the
    /// state in the parameters, that is, in the *next* block.
    ///
    /// Heights that are not a multiple of the snapshot interval are ignored. An interval
    /// of zero never notifies, instead of panicking on a division by zero.
    pub fn notify(&self, block_height: BlockHeight, state_params: FvmStateParams) -> Stm<()> {
        if self.snapshot_interval != 0 && block_height % self.snapshot_interval == 0 {
            self.state
                .latest_params
                .write(Some((state_params, block_height)))?;
        }
        Ok(())
    }

    /// List completed snapshots.
    pub fn list_snapshots(&self) -> Stm<im::Vector<SnapshotItem>> {
        self.state.snapshots.read_clone()
    }

    /// Try to find a snapshot, if it still exists.
    ///
    /// If found, mark it as accessed, so that it doesn't get purged while likely to be requested or read from disk.
    ///
    /// Returns a clone of the first snapshot matching both `block_height` and `version`,
    /// or `None` if there is no such snapshot.
    pub fn access_snapshot(
        &self,
        block_height: BlockHeight,
        version: SnapshotVersion,
    ) -> Stm<Option<SnapshotItem>> {
        let mut snapshots = self.state.snapshots.read_clone()?;

        let snapshot = snapshots
            .iter_mut()
            .find(|s| s.manifest.block_height == block_height && s.manifest.version == version)
            .map(|s| {
                s.last_access = SystemTime::now();
                s.clone()
            });

        // Only write the list back if an access time was actually updated.
        if snapshot.is_some() {
            self.state.snapshots.write(snapshots)?;
        }
        Ok(snapshot)
    }

    /// If the offered snapshot is accepted, we create a temporary directory to hold the chunks
    /// and remember it as our current snapshot being downloaded.
    ///
    /// Any previously remembered download is replaced by the new one.
    ///
    /// # Errors
    ///
    /// Aborts the transaction with [`SnapshotError::IncompatibleVersion`] if the manifest
    /// version is not `1`, or with [`SnapshotError::IoError`] if the temporary directory
    /// cannot be created.
    pub fn offer_snapshot(&self, manifest: SnapshotManifest) -> StmResult<(), SnapshotError> {
        if manifest.version != 1 {
            return abort(SnapshotError::IncompatibleVersion(manifest.version));
        }

        let dir = match tempdir() {
            Ok(dir) => dir,
            Err(e) => return abort(SnapshotError::from(e)),
        };

        let download = SnapshotDownload {
            manifest,
            download_dir: Arc::new(dir),
            next_index: TVar::new(0),
        };
        self.state.current_download.write(Some(download))?;
        Ok(())
    }

    /// Take a chunk sent to us by a remote peer. This is our chance to validate chunks on the fly.
    ///
    /// The chunk is written to `<index>.part` in the download directory. Once the last chunk
    /// of the manifest has been written, the checksum of the parts is compared to the manifest.
    ///
    /// Return a flag indicating whether all the chunks have been received and loaded to the blockstore.
    ///
    /// # Errors
    ///
    /// Aborts the transaction with:
    /// * [`SnapshotError::NoDownload`] if there is no current download,
    /// * [`SnapshotError::UnexpectedChunk`] if `index` is not the next expected index,
    /// * [`SnapshotError::IoError`] if the chunk cannot be written or the checksum cannot be computed,
    /// * [`SnapshotError::WrongChecksum`] if the checksum of the parts differs from the manifest.
    pub fn apply_chunk(&self, index: u32, contents: Vec<u8>) -> StmResult<bool, SnapshotError> {
        let current = self.state.current_download.read()?;
        let cd = match current.as_ref() {
            Some(cd) => cd,
            None => return abort(SnapshotError::NoDownload),
        };

        let next_index = cd.next_index.read_clone()?;
        if index != next_index {
            return abort(SnapshotError::UnexpectedChunk(next_index, index));
        }

        let part_path = cd
            .download_dir
            .as_ref()
            .path()
            .join(format!("{}.part", index));

        // We are doing IO inside the STM transaction, but that's okay because there is no contention on the download.
        if let Err(e) = std::fs::write(part_path, contents) {
            // If we failed to save the data to disk we can return an error that will cause all snapshots to be aborted.
            // There is no point trying to clear download from the state here because if we `abort` then all changes will be dropped.
            return abort(SnapshotError::from(e));
        }

        let next_index = index + 1;
        cd.next_index.write(next_index)?;

        if next_index != cd.manifest.chunks {
            // More chunks are still expected.
            return Ok(false);
        }

        // Verify the checksum then load the snapshot and remove the current download from memory.
        match manifest::parts_checksum(cd.download_dir.as_ref()) {
            // TODO: Import Snapshot.
            Ok(checksum) if checksum == cd.manifest.checksum => Ok(true),
            Ok(checksum) => abort(SnapshotError::WrongChecksum(
                cd.manifest.checksum,
                checksum,
            )),
            // Use the alternate format so that the whole cause chain of the error is kept in the message.
            Err(e) => abort(SnapshotError::IoError(std::io::Error::new(
                std::io::ErrorKind::Other,
                format!("{e:#}"),
            ))),
        }
    }
}
19 changes: 19 additions & 0 deletions fendermint/vm/snapshot/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright 2022-2023 Protocol Labs
// SPDX-License-Identifier: Apache-2.0, MIT

use fendermint_vm_interpreter::fvm::state::snapshot::SnapshotVersion;

/// Possible errors with snapshots.
///
/// These are the errors with which the snapshot client aborts its transactions.
#[derive(Debug, thiserror::Error)]
pub enum SnapshotError {
/// The offered snapshot manifest has a version the client does not accept; carries that version.
#[error("incompatible snapshot version: {0}")]
IncompatibleVersion(SnapshotVersion),
/// An IO operation failed, e.g. creating the download directory or writing a chunk to disk.
#[error("IO error: {0}")]
IoError(#[from] std::io::Error),
/// A chunk was received while no snapshot download is in progress.
#[error("there is no ongoing snapshot download")]
NoDownload,
/// A chunk arrived out of order; carries the expected index followed by the received one.
#[error("unexpected chunk index; expected {0}, got {1}")]
UnexpectedChunk(u32, u32),
/// The checksum of the downloaded parts differs from the manifest; carries the expected
/// checksum followed by the computed one.
#[error("wrong checksum; expected {0}, got {1}")]
WrongChecksum(tendermint::Hash, tendermint::Hash),
}
15 changes: 12 additions & 3 deletions fendermint/vm/snapshot/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
// Copyright 2022-2023 Protocol Labs
// SPDX-License-Identifier: Apache-2.0, MIT
pub mod car;
pub mod manager;
pub mod manifest;
mod car;
mod client;
mod error;
mod manager;
mod manifest;
mod state;

pub use client::SnapshotClient;
pub use error::SnapshotError;
pub use manager::SnapshotManager;
pub use manifest::SnapshotManifest;
pub use state::SnapshotItem;
Loading

0 comments on commit 89a3fb8

Please sign in to comment.