Skip to content
This repository has been archived by the owner on Jan 11, 2024. It is now read-only.

FM-367: Handle snapshot offer #438

Merged
merged 3 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions fendermint/app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ quickcheck = { workspace = true }
quickcheck_macros = { workspace = true }

fendermint_vm_genesis = { path = "../vm/genesis", features = ["arb"] }
fendermint_vm_snapshot = { path = "../vm/snapshot", features = ["arb"] }


# Load the same built-in actor bundle as the ref-fvm integration tests. We'll probably need built-in actors,
Expand Down
67 changes: 36 additions & 31 deletions fendermint/app/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,28 +87,8 @@ impl AppState {
ChainID::from(self.state_params.chain_id)
}

/// Produce an application hash that is a commitment to all data replicated by consensus,
/// that is, all nodes participating in the network must agree on this otherwise we have
/// a consensus failure.
///
/// Notably it contains the actor state root _as well as_ some of the metadata maintained
/// outside the FVM, such as the timestamp and the circulating supply.
pub fn app_hash(&self) -> tendermint::hash::AppHash {
// Create an artificial CID from the FVM state params, which include everything that
// deterministically changes under consensus.
let state_params_cid =
fendermint_vm_message::cid(&self.state_params).expect("state params have a CID");

// We could reduce it to a hash to emphasize that this is not something that we can return at the moment,
// but we could just as easily store the record in the Blockstore to make it retrievable.
// It is generally not a goal to serve the entire state over the IPLD Resolver or ABCI queries, though;
// for that we should rely on the CometBFT snapshot mechanism.
// But to keep our options open, we can return the hash as a CID that nobody can retrieve, and change our mind later.

// let state_params_hash = state_params_cid.hash();
let state_params_hash = state_params_cid.to_bytes();

tendermint::hash::AppHash::try_from(state_params_hash).expect("hash can be wrapped")
to_app_hash(&self.state_params)
}

/// The state is effective at the *next* block, that is, the effects of block N are visible in the header of block N+1,
Expand Down Expand Up @@ -719,6 +699,16 @@ where
let db = self.state_store_clone();
let state = self.committed_state()?;
let mut state_params = state.state_params.clone();

// Notify the snapshotter. We don't do this in `commit` because *this* is the height at which
// this state has been officially associated with the application hash, which is something
// we will receive in `offer_snapshot` and we can compare. If we did it in `commit` we'd
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is done to ensure that the state_params that we are requesting belongs to the right height because in end_block we haven't yet computed the up to date state_params, right?

// have to associate the snapshot with `block_height + 1`. But this way we also know that
// others have agreed with our results.
if let Some(ref snapshots) = self.snapshots {
atomically(|| snapshots.notify(block_height as u64, state_params.clone())).await;
}

state_params.timestamp = to_timestamp(request.header.time);

let state = FvmExecState::new(db, self.multi_engine.as_ref(), block_height, state_params)
Expand Down Expand Up @@ -810,10 +800,9 @@ where

let app_hash = state.app_hash();
let block_height = state.block_height;
let state_params = state.state_params.clone();

tracing::debug!(
height = state.block_height,
block_height,
state_root = state_root.to_string(),
app_hash = app_hash.to_string(),
timestamp = state.state_params.timestamp.0,
Expand Down Expand Up @@ -842,11 +831,6 @@ where
let mut guard = self.check_state.lock().await;
*guard = None;

// Notify the snapshotter.
if let Some(ref snapshots) = self.snapshots {
atomically(|| snapshots.on_commit(block_height, state_params.clone())).await;
}

let response = response::Commit {
data: app_hash.into(),
// We have to retain blocks until we can support Snapshots.
Expand All @@ -855,7 +839,7 @@ where
Ok(response)
}

/// Used during state sync to discover available snapshots on peers.
/// List the snapshots available on this node to be served to remote peers.
async fn list_snapshots(&self) -> AbciResult<response::ListSnapshots> {
if let Some(ref client) = self.snapshots {
let snapshots = atomically(|| client.list_snapshots()).await;
Expand All @@ -865,7 +849,7 @@ where
}
}

/// Used during state sync to retrieve chunks of snapshots from peers.
/// Load a particular snapshot chunk a remote peer is asking for.
async fn load_snapshot_chunk(
&self,
request: request::LoadSnapshotChunk,
Expand All @@ -874,7 +858,7 @@ where
if let Some(snapshot) =
atomically(|| client.access_snapshot(request.height.value(), request.format)).await
{
match snapshot.load_chunk(request.chunk as usize) {
match snapshot.load_chunk(request.chunk) {
Ok(chunk) => {
return Ok(response::LoadSnapshotChunk {
chunk: chunk.into(),
Expand All @@ -888,4 +872,25 @@ where
}
Ok(Default::default())
}

/// Decide whether to start downloading a snapshot offered by a peer.
///
/// When no snapshot client is configured the default response is returned.
/// Otherwise the offer is parsed into a manifest: a parseable offer is
/// accepted, and an offer that fails to parse is rejected with a warning.
async fn offer_snapshot(
    &self,
    request: request::OfferSnapshot,
) -> AbciResult<response::OfferSnapshot> {
    // Without a snapshot client there is nothing to decide.
    if self.snapshots.is_none() {
        return Ok(Default::default());
    }

    let verdict = match from_snapshot(request).context("failed to parse snapshot") {
        Ok(manifest) => {
            tracing::info!(?manifest, "received snapshot offer");
            // We can look at the version but currently there's only one.
            response::OfferSnapshot::Accept
        }
        Err(e) => {
            tracing::warn!("failed to parse snapshot offer: {e:#}");
            response::OfferSnapshot::Reject
        }
    };

    Ok(verdict)
}
}
94 changes: 89 additions & 5 deletions fendermint/app/src/tmconv.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,29 @@
// Copyright 2022-2023 Protocol Labs
// SPDX-License-Identifier: Apache-2.0, MIT
//! Conversions to Tendermint data types.
use anyhow::{anyhow, Context};
use anyhow::{anyhow, bail, Context};
use fendermint_vm_core::Timestamp;
use fendermint_vm_genesis::{Power, Validator};
use fendermint_vm_interpreter::fvm::{state::BlockHash, FvmApplyRet, FvmCheckRet, FvmQueryRet};
use fendermint_vm_interpreter::fvm::{
state::{BlockHash, FvmStateParams},
FvmApplyRet, FvmCheckRet, FvmQueryRet,
};
use fendermint_vm_message::signed::DomainHash;
use fendermint_vm_snapshot::manifest::SnapshotItem;
use fendermint_vm_snapshot::manifest::{SnapshotItem, SnapshotManifest};
use fvm_shared::{address::Address, error::ExitCode, event::StampedEvent, ActorID};
use prost::Message;
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, num::NonZeroU32};
use tendermint::abci::{response, Code, Event, EventAttribute};

use crate::{app::AppError, BlockHeight};

/// Snapshot details that have no dedicated field in the ABCI snapshot type.
///
/// `to_snapshot` encodes this with `fvm_ipld_encoding` into the `metadata` bytes of
/// the ABCI snapshot, and `from_snapshot` decodes it again from a snapshot offer.
#[derive(Serialize, Deserialize, Debug, Clone)]
struct SnapshotMetadata {
/// The `size` field of the snapshot manifest.
size: u64,
/// The FVM state parameters recorded in the manifest; `from_snapshot` recomputes
/// the application hash from these and compares it with the one in the offer.
state_params: FvmStateParams,
}

/// IPLD encoding of data types we know we must be able to encode.
macro_rules! ipld_encode {
($var:ident) => {
Expand Down Expand Up @@ -359,26 +369,89 @@ pub fn to_snapshots(
Ok(response::ListSnapshots { snapshots })
}

/// Convert a snapshot manifest to the Tendermint ABCI type.
pub fn to_snapshot(snapshot: SnapshotItem) -> anyhow::Result<tendermint::abci::types::Snapshot> {
// Put anything that doesn't fit into fields of the ABCI snapshot into the metadata.
let metadata = SnapshotMetadata {
size: snapshot.manifest.size,
state_params: snapshot.manifest.state_params,
};

Ok(tendermint::abci::types::Snapshot {
height: snapshot
.manifest
.block_height
.try_into()
.expect("height is valid"),
format: snapshot.manifest.version,
chunks: snapshot.manifest.chunks as u32,
chunks: snapshot.manifest.chunks,
hash: snapshot.manifest.checksum.into(),
metadata: fvm_ipld_encoding::to_vec(&snapshot.manifest.state_params)?.into(),
metadata: fvm_ipld_encoding::to_vec(&metadata)?.into(),
})
}

/// Parse a Tendermint ABCI snapshot offer to a manifest.
pub fn from_snapshot(
offer: tendermint::abci::request::OfferSnapshot,
) -> anyhow::Result<SnapshotManifest> {
let metadata = fvm_ipld_encoding::from_slice::<SnapshotMetadata>(&offer.snapshot.metadata)
.context("failed to parse snapshot metadata")?;

let app_hash = to_app_hash(&metadata.state_params);

if app_hash != offer.app_hash {
bail!("the application hash does not match the metadata");
}

let checksum = tendermint::hash::Hash::try_from(offer.snapshot.hash)
.context("failed to parse checksum")?;

let manifest = SnapshotManifest {
block_height: offer.snapshot.height.value(),
size: metadata.size,
chunks: offer.snapshot.chunks,
checksum,
state_params: metadata.state_params,
version: offer.snapshot.format,
};

Ok(manifest)
}

/// Produce an application hash that is a commitment to all data replicated by consensus;
/// every node participating in the network must agree on it, otherwise we have a
/// consensus failure.
///
/// Notably it covers the actor state root _as well as_ some of the metadata maintained
/// outside the FVM, such as the timestamp and the circulating supply.
///
/// # Panics
///
/// Panics if a CID cannot be computed for the state parameters, or if the CID bytes
/// cannot be wrapped into an `AppHash`.
pub fn to_app_hash(state_params: &FvmStateParams) -> tendermint::hash::AppHash {
    // An artificial CID over the FVM state params, which include everything that
    // deterministically changes under consensus.
    //
    // We could reduce it to just the hash to emphasize that this is not something that
    // can be retrieved at the moment, but we could just as easily store the record in the
    // Blockstore to make it retrievable. Serving the entire state over the IPLD Resolver or
    // ABCI queries is generally not a goal, though; for that we should rely on the CometBFT
    // snapshot mechanism. To keep our options open, we return the full CID bytes, which
    // nobody can retrieve for now, and can change our mind later.
    let cid_bytes = fendermint_vm_message::cid(state_params)
        .expect("state params have a CID")
        .to_bytes();

    tendermint::hash::AppHash::try_from(cid_bytes).expect("hash can be wrapped")
}

#[cfg(test)]
mod tests {
use fendermint_vm_snapshot::manifest::SnapshotItem;
use fvm_shared::error::ExitCode;
use tendermint::abci::request;

use crate::tmconv::to_error_msg;

use super::{from_snapshot, to_app_hash, to_snapshot};

#[test]
fn code_error_message() {
assert_eq!(to_error_msg(ExitCode::OK), "");
Expand All @@ -387,4 +460,15 @@ mod tests {
"The message sender doesn't exist."
);
}

/// Round-trip property: converting a snapshot item to an ABCI snapshot, wrapping it in
/// an offer with the matching application hash, and parsing it back yields the same manifest.
#[quickcheck_macros::quickcheck]
fn abci_snapshot_metadata(snapshot: SnapshotItem) {
    let app_hash = to_app_hash(&snapshot.manifest.state_params);
    let offer = request::OfferSnapshot {
        snapshot: to_snapshot(snapshot.clone()).unwrap(),
        app_hash,
    };
    let roundtrip = from_snapshot(offer).unwrap();
    assert_eq!(roundtrip, snapshot.manifest)
}
}
2 changes: 1 addition & 1 deletion fendermint/vm/snapshot/golden/manifest/cbor/manifest.cbor
Original file line number Diff line number Diff line change
@@ -1 +1 @@
a66c626c6f636b5f6865696768741b961641f61f13b7d66473697a651bffffffffffffffff666368756e6b731bf8cdb9e2c96ffe7168636865636b73756d7840424244443542363045304139304446343043334531383143323137324337433138353544423943464434354533313241373436393534464643373134463839416c73746174655f706172616d73a76a73746174655f726f6f74d82a58230012204c94485e0c21ae6c41ce1dfe7b6bfaceea5ab68e40a2476f50208e526f5060806974696d657374616d701b9ede14bdf44bb50e6f6e6574776f726b5f76657273696f6e1affffffff68626173655f6665655100a4a6afa1e117f427a2e42a6ae99b7f3e6b636972635f737570706c795100c4910e2a4c8380bc92a6c0641cf5ca5a68636861696e5f69641b000b5dc8fad6f15f6b706f7765725f7363616c65206776657273696f6e1adbb7af82
a66c626c6f636b5f6865696768741a99e5f1a76473697a651b9a0ed59575887285666368756e6b731af71159e768636865636b73756d7840453243304636444136464643463335413334334546304545394445343436353436374143443530344338463237313243364142323034343543383341313932416c73746174655f706172616d73a76a73746174655f726f6f74d82a58230012202a0ab732b4e9d85ef7dc25303b64ab527c25a4d77815ebb579f396ec6caccad36974696d657374616d701bdf399b7bb39519486f6e6574776f726b5f76657273696f6e1affffffff68626173655f66656551005eb4d601fc663685053ee078217bd77f6b636972635f737570706c795100ffffffffffffffffb5b5b138edf6bed168636861696e5f69641b000a9a18ec6436676b706f7765725f7363616c65206776657273696f6e1a33c9bad2
2 changes: 1 addition & 1 deletion fendermint/vm/snapshot/golden/manifest/cbor/manifest.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
SnapshotManifest { block_height: 10814904080515971030, size: 18446744073709551615, chunks: 17928190075325120113, checksum: Hash::Sha256(BBDD5B60E0A90DF40C3E181C2172C7C1855DB9CFD45E312A746954FFC714F89A), state_params: FvmStateParams { state_root: Cid(QmTVaqUKv8J2QXqG31iYqejfyTNYFXTUQiKA9Peogtono1), timestamp: Timestamp(11447610108902356238), network_version: NetworkVersion(4294967295), base_fee: TokenAmount(218858874834320873873.679132775885274942), circ_supply: TokenAmount(261281857523328175788.027999901887875674), chain_id: 3199342527050079, power_scale: -1 }, version: 3686248322 }
SnapshotManifest { block_height: 2581983655, size: 11101044969413571205, chunks: 4145109479, checksum: Hash::Sha256(E2C0F6DA6FFCF35A343EF0EE9DE4465467ACD504C8F2712C6AB20445C83A192A), state_params: FvmStateParams { state_root: Cid(QmRAmJvPSFPjeHkVJyPktbmM2SRHURjbM7xs7JRD1zCjWJ), timestamp: Timestamp(16085058499726612808), network_version: NetworkVersion(4294967295), base_fee: TokenAmount(125886385631315495367.993087794916087679), circ_supply: TokenAmount(340282366920938463458.021429707776900817), chain_id: 2984181602989671, power_scale: -1 }, version: 868858578 }
4 changes: 2 additions & 2 deletions fendermint/vm/snapshot/golden/manifest/json/manifest.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"block_height": 18446744073709551615,
"size": 11344242012067624990,
"chunks": 220761337372187410,
"chunks": 22076,
"checksum": "A3B844BB3068947681E591126B1AAC925B7BF1BB56BA6DB77D87745365B0949E",
"state_params": {
"state_root": "QmYbxwhLej3Te1etMuFqWb3Gwy7CpVaXAe5deWmqrphMhg",
Expand All @@ -13,4 +13,4 @@
"power_scale": 0
},
"version": 0
}
}
2 changes: 1 addition & 1 deletion fendermint/vm/snapshot/golden/manifest/json/manifest.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
SnapshotManifest { block_height: 18446744073709551615, size: 11344242012067624990, chunks: 220761337372187410, checksum: Hash::Sha256(A3B844BB3068947681E591126B1AAC925B7BF1BB56BA6DB77D87745365B0949E), state_params: FvmStateParams { state_root: Cid(QmYbxwhLej3Te1etMuFqWb3Gwy7CpVaXAe5deWmqrphMhg), timestamp: Timestamp(1), network_version: NetworkVersion(4294967295), base_fee: TokenAmount(299246354255658060378.714945246048246606), circ_supply: TokenAmount(93362016975129332347.987662062653906832), chain_id: 503525136242505, power_scale: 0 }, version: 0 }
SnapshotManifest { block_height: 18446744073709551615, size: 11344242012067624990, chunks: 22076, checksum: Hash::Sha256(A3B844BB3068947681E591126B1AAC925B7BF1BB56BA6DB77D87745365B0949E), state_params: FvmStateParams { state_root: Cid(QmYbxwhLej3Te1etMuFqWb3Gwy7CpVaXAe5deWmqrphMhg), timestamp: Timestamp(1), network_version: NetworkVersion(4294967295), base_fee: TokenAmount(299246354255658060378.714945246048246606), circ_supply: TokenAmount(93362016975129332347.987662062653906832), chain_id: 503525136242505, power_scale: 0 }, version: 0 }
13 changes: 8 additions & 5 deletions fendermint/vm/snapshot/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,14 @@ pub struct SnapshotClient {

impl SnapshotClient {
/// Set the latest block state parameters and notify the manager.
pub fn on_commit(&self, block_height: BlockHeight, params: FvmStateParams) -> Stm<()> {
///
/// Call this with the block height where the `app_hash` in the block reflects the
/// state in the parameters, that is, in the *next* block.
pub fn notify(&self, block_height: BlockHeight, state_params: FvmStateParams) -> Stm<()> {
if block_height % self.snapshot_interval == 0 {
self.state
.latest_params
.write(Some((params, block_height)))?;
.write(Some((state_params, block_height)))?;
}
Ok(())
}
Expand Down Expand Up @@ -307,8 +310,8 @@ where
// Create and export a manifest that we can easily look up.
let manifest = SnapshotManifest {
block_height,
size: snapshot_size,
chunks: chunks_count,
size: snapshot_size as u64,
chunks: chunks_count as u32,
checksum: checksum_bytes,
state_params,
version: snapshot_version,
Expand Down Expand Up @@ -445,7 +448,7 @@ mod tests {
assert!(snapshots.is_empty());

// Notify about snapshottable height.
atomically(|| snapshot_client.on_commit(0, state_params.clone())).await;
atomically(|| snapshot_client.notify(0, state_params.clone())).await;

// Wait for the new snapshot to appear in memory.
let snapshots = tokio::time::timeout(
Expand Down
Loading
Loading