diff --git a/Cargo.lock b/Cargo.lock index 0b75d079..47096b8c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2935,7 +2935,6 @@ dependencies = [ "fvm_shared", "num-derive 0.3.3", "num-traits", - "objectstore_actor_sdk", "serde", "serde_tuple", ] diff --git a/fendermint/actors/objectstore/Cargo.toml b/fendermint/actors/objectstore/Cargo.toml index 4fbdb02e..8bfcabc2 100644 --- a/fendermint/actors/objectstore/Cargo.toml +++ b/fendermint/actors/objectstore/Cargo.toml @@ -10,7 +10,14 @@ version = "0.1.0" crate-type = ["cdylib", "lib"] [dependencies] +anyhow = { workspace = true } cid = { workspace = true, default-features = false } +frc42_dispatch = { workspace = true } +num-derive = { workspace = true } +num-traits = { workspace = true } +serde = { workspace = true, features = ["derive"] } +serde_tuple = { workspace = true } + fil_actors_runtime = { workspace = true, optional = true, features = [ "fil-actor", ] } @@ -18,14 +25,8 @@ fvm_shared = { workspace = true } fvm_ipld_encoding = { workspace = true } fvm_ipld_blockstore = { workspace = true } fvm_ipld_hamt = { workspace = true } -num-derive = { workspace = true } -serde = { workspace = true, features = ["derive"] } -serde_tuple = { workspace = true } -num-traits = { workspace = true } -frc42_dispatch = { workspace = true } -anyhow = { workspace = true } + fendermint_actor_machine = { path = "../machine" } -objectstore_actor_sdk = { path = "../../../textile/objectstore_actor_sdk" } [dev-dependencies] fil_actors_runtime = { workspace = true, features = [ diff --git a/fendermint/actors/objectstore/src/actor.rs b/fendermint/actors/objectstore/src/actor.rs index 113c2809..7a8623d0 100644 --- a/fendermint/actors/objectstore/src/actor.rs +++ b/fendermint/actors/objectstore/src/actor.rs @@ -66,6 +66,10 @@ impl Actor { Ok(()) } + // Deleting an object removes the key from the store, but not from the underlying storage. + // So, we can't just delete it here via syscall. + // Once implemented, the DA mechanism may cause the data to be entangled with other data. + // The retention policies will handle deleting / GC. fn delete_object(rt: &impl Runtime, params: DeleteParams) -> Result { Self::ensure_write_allowed(rt)?; @@ -74,17 +78,6 @@ impl Actor { e.downcast_default(ExitCode::USR_ILLEGAL_STATE, "failed to delete object") }) })?; - - // Clean up external object storage if it existed. - if let Some(Object::External((v, _))) = res.0 { - objectstore_actor_sdk::cid_rm(v.0).map_err(|en| { - ActorError::checked( - ExitCode::USR_ILLEGAL_STATE, - format!("cid_rm syscall failed with {en}"), - None, - ) - })?; - } Ok(res.1) } diff --git a/fendermint/actors/objectstore/src/state.rs b/fendermint/actors/objectstore/src/state.rs index 68abe939..89e95830 100644 --- a/fendermint/actors/objectstore/src/state.rs +++ b/fendermint/actors/objectstore/src/state.rs @@ -143,7 +143,7 @@ impl State { } } } - // Don't error here in case key was deleted before value was resolved. + // Don't error here in case the key was deleted before the value was resolved. None => Ok(()), } } diff --git a/fendermint/app/src/app.rs b/fendermint/app/src/app.rs index 81d67073..f2b4a737 100644 --- a/fendermint/app/src/app.rs +++ b/fendermint/app/src/app.rs @@ -410,7 +410,10 @@ where Genesis = Vec, Output = FvmGenesisOutput, >, - I: ProposalInterpreter>, + I: ProposalInterpreter< + State = (ChainEnv, FvmExecState>), + Message = Vec, + >, I: ExecInterpreter< State = (ChainEnv, FvmExecState), Message = Vec, @@ -641,11 +644,20 @@ where time = request.time.to_string(), "prepare proposal" ); + let state = self.committed_state()?; + let exec_state = FvmExecState::new( + ReadOnlyBlockstore::new(self.state_store_clone()), + self.multi_engine.as_ref(), + state.block_height as ChainEpoch, + state.state_params, + ) + .context("error creating execution state")?; + let txs = request.txs.into_iter().map(|tx| tx.to_vec()).collect(); let txs = self .interpreter - .prepare(self.chain_env.clone(), txs) + .prepare((self.chain_env.clone(), exec_state), txs) .await .context("failed to prepare proposal")?; @@ -665,12 +677,21 @@ where time = request.time.to_string(), "process proposal" ); + let state = self.committed_state()?; + let exec_state = FvmExecState::new( + ReadOnlyBlockstore::new(self.state_store_clone()), + self.multi_engine.as_ref(), + state.block_height as ChainEpoch, + state.state_params, + ) + .context("error creating execution state")?; + let txs: Vec<_> = request.txs.into_iter().map(|tx| tx.to_vec()).collect(); let num_txs = txs.len(); let accept = self .interpreter - .process(self.chain_env.clone(), txs) + .process((self.chain_env.clone(), exec_state), txs) .await .context("failed to process proposal")?; diff --git a/fendermint/vm/interpreter/src/chain.rs b/fendermint/vm/interpreter/src/chain.rs index e2422fbf..72fbeccd 100644 --- a/fendermint/vm/interpreter/src/chain.rs +++ b/fendermint/vm/interpreter/src/chain.rs @@ -1,16 +1,13 @@ // Copyright 2022-2024 Protocol Labs // SPDX-License-Identifier: Apache-2.0, MIT -use crate::fvm::state::ipc::GatewayCaller; -use crate::fvm::{topdown, FvmApplyRet, PowerUpdates}; -use crate::{ - fvm::state::FvmExecState, - fvm::FvmMessage, - signed::{SignedMessageApplyRes, SignedMessageCheckRes, SyntheticMessage, VerifiableMessage}, - CheckInterpreter, ExecInterpreter, GenesisInterpreter, ProposalInterpreter, QueryInterpreter, -}; -use anyhow::{bail, Context}; + +use anyhow::{anyhow, bail, Context}; use async_stm::atomically; use async_trait::async_trait; +use fendermint_actor_objectstore::{ + GetParams, + Method::{GetObject, ResolveExternalObject}, +}; use fendermint_tracing::emit; use fendermint_vm_actor_interface::{ipc, system}; use fendermint_vm_event::ParentFinalityMissingQuorum; @@ -36,6 +33,17 @@ use fvm_shared::econ::TokenAmount; use fvm_shared::message::Message; use num_traits::Zero; use std::sync::Arc; +use tokio_util::bytes; + +use crate::fvm::state::ipc::GatewayCaller; +use crate::fvm::{topdown, FvmApplyRet, PowerUpdates}; +use crate::{ + fvm::state::FvmExecState, + fvm::store::ReadOnlyBlockstore, + fvm::FvmMessage, + signed::{SignedMessageApplyRes, SignedMessageCheckRes, SyntheticMessage, VerifiableMessage}, + CheckInterpreter, ExecInterpreter, GenesisInterpreter, ProposalInterpreter, QueryInterpreter, +}; /// A resolution pool for bottom-up and top-down checkpoints. pub type CheckpointPool = ResolvePool; @@ -121,7 +129,7 @@ where DB: Blockstore + Clone + 'static + Send + Sync, I: Sync + Send, { - type State = ChainEnv; + type State = (ChainEnv, FvmExecState>); type Message = ChainMessage; /// Check whether there are any "ready" messages in the IPLD resolution mempool which can be appended to the proposal. @@ -130,11 +138,11 @@ where /// account the transactions which are part of top-down or bottom-up checkpoints, to stay within gas limits. async fn prepare( &self, - state: Self::State, + (env, mut state): Self::State, mut msgs: Vec, ) -> anyhow::Result> { // Collect resolved CIDs ready to be proposed from the pool. - let ckpts = atomically(|| state.checkpoint_pool.collect_resolved()).await; + let ckpts = atomically(|| env.checkpoint_pool.collect_resolved()).await; // Create transactions ready to be included on the chain. let ckpts = ckpts.into_iter().map(|ckpt| match ckpt { @@ -143,14 +151,14 @@ where // Prepare top down proposals. // Before we try to find a quorum, pause incoming votes. This is optional but if there are lots of votes coming in it might hold up proposals. - atomically(|| state.parent_finality_votes.pause_votes_until_find_quorum()).await; + atomically(|| env.parent_finality_votes.pause_votes_until_find_quorum()).await; // The pre-requisite for proposal is that there is a quorum of gossiped votes at that height. // The final proposal can be at most as high as the quorum, but can be less if we have already, // hit some limits such as how many blocks we can propose in a single step. let finalities = atomically(|| { - let parent = state.parent_finality_provider.next_proposal()?; - let quorum = state + let parent = env.parent_finality_provider.next_proposal()?; + let quorum = env .parent_finality_votes .find_quorum()? .map(|(height, block_hash)| IPCParentFinality { height, block_hash }); @@ -195,7 +203,7 @@ where // view of object resolution, rather than considering those that _might_ have a quorum, // but have not yet been resolved by _this_ proposer. However, an object like this will get // picked up by a different proposer who _does_ consider it resolved. - let local_resolved_objects = atomically(|| state.object_pool.collect_resolved()).await; + let local_resolved_objects = atomically(|| env.object_pool.collect_resolved()).await; // Create transactions ready to be included on the chain. These are from locally resolved // objects that have reached a global quorum and are not yet finalized. @@ -203,48 +211,51 @@ where // If the object has already been finalized, i.e., it was proposed in an earlier block with // a quorum that did not include _this_ proposer, we can just remove it from the local // resolve pool. If we were to propose it, it would be rejected in the process step. - let mut objects: Vec = vec![]; - for item in local_resolved_objects.iter() { - let obj = item.obj.value.to_bytes(); - let (is_finalized, is_globally_resolved) = - atomically( - || match state.parent_finality_votes.is_object_finalized(&obj)? { - true => Ok((true, false)), - false => match state.parent_finality_votes.find_object_quorum(&obj)? { - true => Ok((false, true)), - false => Ok((false, false)), - }, - }, - ) - .await; - - // Remove here otherwise proposal will be rejected - if is_finalized { - tracing::debug!(cid = ?item.obj.value, "object already finalized; removing from pool"); - atomically(|| state.object_pool.remove(item)).await; - continue; - } + if !local_resolved_objects.is_empty() { + let mut objects: Vec = vec![]; + // We start a blockstore transaction that can be reverted + state.state_tree_mut().begin_transaction(); + for item in local_resolved_objects.iter() { + if is_object_finalized(&mut state, item)? { + tracing::debug!(cid = ?item.obj.value, "object already finalized; removing from pool"); + atomically(|| env.object_pool.remove(item)).await; + continue; + } - // Add to messages - if is_globally_resolved { - tracing::debug!(cid = ?item.obj.value, "object has quorum; adding tx to chain"); - objects.push(ChainMessage::Ipc(IpcMessage::ObjectResolved( - item.obj.clone(), - ))); + let is_globally_resolved = atomically(|| { + env.parent_finality_votes + .find_object_quorum(&item.obj.value.to_bytes()) + }) + .await; + if is_globally_resolved { + tracing::debug!(cid = ?item.obj.value, "object has quorum; adding tx to chain"); + objects.push(ChainMessage::Ipc(IpcMessage::ObjectResolved( + item.obj.clone(), + ))); + } } - } + state + .state_tree_mut() + .end_transaction(true) + .expect("we just started a transaction"); - let pending_objects = atomically(|| state.object_pool.count()).await; - tracing::info!(size = pending_objects, "ipfs pool status"); + let pending_objects = atomically(|| env.object_pool.count()).await; + tracing::info!(size = pending_objects, "ipfs pool status"); - // Append at the end - if we run out of block space, these are going to be reproposed in the next block. - msgs.extend(objects); + // Append at the end - if we run out of block space, + // these are going to be reproposed in the next block. + msgs.extend(objects); + } Ok(msgs) } /// Perform finality checks on top-down transactions and availability checks on bottom-up transactions. - async fn process(&self, env: Self::State, msgs: Vec) -> anyhow::Result { + async fn process( + &self, + (env, mut state): Self::State, + msgs: Vec, + ) -> anyhow::Result { for msg in msgs { match msg { ChainMessage::Ipc(IpcMessage::BottomUpExec(msg)) => { @@ -281,35 +292,33 @@ where } } ChainMessage::Ipc(IpcMessage::ObjectResolved(obj)) => { - // Ensure that the object is ready to be included on chain. We can accept the - // proposal if the object has reached a global quorum and is not yet finalized. let item = ObjectPoolItem { obj }; - let obj = item.obj.value.to_bytes(); - - let (is_finalized, is_globally_resolved) = atomically(|| { - match env.parent_finality_votes.is_object_finalized(&obj)? { - true => Ok((true, true)), - false => match env.parent_finality_votes.find_object_quorum(&obj)? { - true => Ok((false, true)), - false => Ok((false, false)), - }, - } - }) - .await; - // If already finalized, reject this proposal - if is_finalized { + // Ensure that the object is ready to be included on-chain. + // We can accept the proposal if the object has reached a global quorum and is + // not yet finalized. + // Start a blockstore transaction that can be reverted. + state.state_tree_mut().begin_transaction(); + if is_object_finalized(&mut state, &item)? { tracing::debug!(cid = ?item.obj.value, "object is already finalized; rejecting proposal"); return Ok(false); } + state + .state_tree_mut() + .end_transaction(true) + .expect("we just started a transaction"); - // If not globally resolved, reject this proposal + let is_globally_resolved = atomically(|| { + env.parent_finality_votes + .find_object_quorum(&item.obj.value.to_bytes()) + }) + .await; if !is_globally_resolved { tracing::debug!(cid = ?item.obj.value, "object is not globally resolved; rejecting proposal"); return Ok(false); } - // If also locally resolved, and we're not going to reject, we can remove it from the pool + // Remove from pool if locally resolved let is_locally_resolved = atomically(|| match env.object_pool.get_status(&item)? { None => Ok(false), @@ -367,6 +376,7 @@ where if let Some(obj) = msg.object { atomically(|| env.object_pool.add(ObjectPoolItem { obj: obj.clone() })) .await; + tracing::debug!(cid = ?obj.value, store = ?obj.address, "object added to pool"); } } @@ -503,15 +513,14 @@ where IpcMessage::ObjectResolved(obj) => { let from = system::SYSTEM_ACTOR_ADDR; let to = obj.address; - let method_num = - fendermint_actor_objectstore::Method::ResolveExternalObject as u64; + let method_num = ResolveExternalObject as u64; let gas_limit = fvm_shared::BLOCK_GAS_LIMIT; - let input = fendermint_actor_objectstore::ResolveExternalParams { + let params = fendermint_actor_objectstore::ResolveExternalParams { key: obj.key, value: obj.value, }; - let params = RawBytes::serialize(input)?; + let params = RawBytes::serialize(params)?; let msg = Message { version: Default::default(), from, @@ -543,12 +552,6 @@ where "implicit tx delivered" ); - atomically(|| { - env.parent_finality_votes - .finalize_object(obj.value.to_bytes()) - }) - .await; - tracing::debug!( cid = ?obj.value, "chain interpreter has finalized object" @@ -724,3 +727,50 @@ fn relayed_bottom_up_ckpt_to_fvm( Ok(msg) } + +/// Check if an object has been finalized (resolved) by reading its on-chain state. +/// This approach uses an implicit FVM transaction to query a read-only blockstore. +fn is_object_finalized( + state: &mut FvmExecState>, + item: &ObjectPoolItem, +) -> anyhow::Result +where + DB: Blockstore + Clone + 'static + Send + Sync, +{ + let params = GetParams { + key: item.obj.key.clone(), + }; + let params = RawBytes::serialize(params)?; + let msg = FvmMessage { + version: 0, + from: system::SYSTEM_ACTOR_ADDR, + to: item.obj.address, + sequence: 0, + value: Default::default(), + method_num: GetObject as u64, + params, + gas_limit: fvm_shared::BLOCK_GAS_LIMIT, + gas_fee_cap: Default::default(), + gas_premium: Default::default(), + }; + let (apply_ret, _) = state.execute_implicit(msg)?; + + let data: bytes::Bytes = apply_ret.msg_receipt.return_data.to_vec().into(); + let object = + fvm_ipld_encoding::from_slice::>(&data) + .map_err(|e| anyhow!("error parsing as Option: {e}"))?; + + Ok(match object { + Some(object) => match object { + fendermint_actor_objectstore::Object::External((_, resolved)) => resolved, + fendermint_actor_objectstore::Object::Internal(_) => true, // cannot happen + }, + None => { + // The object was deleted before it was resolved. + // We can return true here because the objectstore actor will ignore the final implicit + // call to resolve an object if it doesn't exist. + // Otherwise, we'd have to reject the proposal and do another round of voting. + true + } + }) +} diff --git a/fendermint/vm/message/src/signed.rs b/fendermint/vm/message/src/signed.rs index bc9f8818..84c231d9 100644 --- a/fendermint/vm/message/src/signed.rs +++ b/fendermint/vm/message/src/signed.rs @@ -218,7 +218,7 @@ impl SignedMessage { if rec_addr.0 == from.0 { Ok(()) } else { - Err(SignedMessageError::InvalidSignature("the Ethereum delegated address did not match the one recovered from the signature".to_string())) + Err(SignedMessageError::InvalidSignature("the Ethereum delegated address did not match the one recovered from the signature (client chain ID may incorrect)".to_string())) } } } diff --git a/fendermint/vm/topdown/src/voting.rs b/fendermint/vm/topdown/src/voting.rs index 4875d083..6e5650d4 100644 --- a/fendermint/vm/topdown/src/voting.rs +++ b/fendermint/vm/topdown/src/voting.rs @@ -64,12 +64,6 @@ pub struct VoteTally { /// and is often retried due to votes being added. pause_votes: TVar, - /// The *finalized collection* of objects as observed by this node. - /// - /// The record of this collection will have been included on chain, but we'll need to - /// figure out pruning and how to make this resilient to node restarts. - objects: TVar>, - /// Index votes received by object. object_votes: TVar>>, @@ -95,7 +89,6 @@ where chain: TVar::default(), votes: TVar::default(), pause_votes: TVar::new(false), - objects: TVar::default(), object_votes: TVar::default(), pause_object_votes: TVar::new(false), } @@ -110,7 +103,6 @@ where chain: TVar::new(im::OrdMap::from_iter([(height, Some(hash))])), votes: TVar::default(), pause_votes: TVar::new(false), - objects: TVar::default(), object_votes: TVar::default(), pause_object_votes: TVar::new(false), } @@ -327,18 +319,13 @@ where /// Add a vote for an object we received. /// - /// Returns `true` if this vote was added, `false` if it was ignored as a - /// duplicate or was already finalized, and an error if it's an - /// equivocation or from a validator we don't know. + /// Returns `true` if this vote was added, `false` if it was ignored as a duplicate, + /// and an error if it's an equivocation or from a validator we don't know. pub fn add_object_vote(&self, validator_key: K, object: O) -> StmResult> { if *self.pause_object_votes.read()? { retry()?; } - if self.is_object_finalized(&object)? { - return Ok(false); - } - if !self.has_power(&validator_key)? { return abort(Error::UnpoweredValidator(validator_key)); } @@ -361,8 +348,7 @@ where self.pause_object_votes.write(true) } - /// Determine if a quorum exists on an object (from our perspective) from gathered enough - /// votes from validators. + /// Determine if an object has (from our perspective) gathered enoughvotes from validators. pub fn find_object_quorum(&self, object: &O) -> Stm { self.pause_object_votes.write(false)?; @@ -397,27 +383,6 @@ where Ok(weight >= quorum_threshold) } - /// Determine whether an object is finalized. Finalized means that a quorum was reached and - /// "finalized" on chain. - pub fn is_object_finalized(&self, object: &O) -> Stm { - Ok(self.objects.read()?.contains_key(object)) - } - - /// Sets an object as having been finalized by voting. - pub fn finalize_object(&self, object: O) -> Stm<()> { - self.objects.update(|mut objects| { - objects.insert(object.clone(), true); - objects - })?; - - self.object_votes.update(|mut votes| { - votes.remove(&object); - votes - })?; - - Ok(()) - } - /// Overwrite the power table after it has changed to a new snapshot. /// /// This method expects absolute values, it completely replaces the existing powers.