Top down discussion #181
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,8 +6,12 @@ use std::sync::{Arc, Mutex}; | |
|
||
use anyhow::{anyhow, Context, Result}; | ||
use async_trait::async_trait; | ||
use bytes::Bytes; | ||
use cid::Cid; | ||
use fendermint_abci::{AbciResult, Application}; | ||
use fendermint_ipc::parent::PollingParentSyncer; | ||
use fendermint_ipc::pof::ParentFinalityProvider; | ||
use fendermint_ipc::IPCMessage; | ||
use fendermint_storage::{ | ||
Codec, Encode, KVCollection, KVRead, KVReadable, KVStore, KVWritable, KVWrite, | ||
}; | ||
|
@@ -24,6 +28,7 @@ use fendermint_vm_interpreter::signed::InvalidSignature; | |
use fendermint_vm_interpreter::{ | ||
CheckInterpreter, ExecInterpreter, GenesisInterpreter, QueryInterpreter, | ||
}; | ||
use fendermint_vm_message::chain::ChainMessage; | ||
use fvm::engine::MultiEngine; | ||
use fvm_ipld_blockstore::Blockstore; | ||
use fvm_shared::chainid::ChainID; | ||
|
@@ -125,6 +130,8 @@ where | |
/// | ||
/// Zero means unlimited. | ||
state_hist_size: u64, | ||
/// The top down parent finality. | ||
parent_finality: ParentFinalityProvider<PollingParentSyncer>, | ||
} | ||
|
||
impl<DB, SS, S, I> App<DB, SS, S, I> | ||
|
@@ -157,6 +164,7 @@ where | |
interpreter: Arc::new(interpreter), | ||
exec_state: Arc::new(Mutex::new(None)), | ||
check_state: Arc::new(tokio::sync::Mutex::new(None)), | ||
parent_finality: (), | ||
}; | ||
app.init_committed_state()?; | ||
Ok(app) | ||
|
@@ -291,6 +299,37 @@ where | |
let state = self.committed_state()?; | ||
Ok((state.state_params, state.block_height)) | ||
} | ||
|
||
fn verify_ipc_message(&self, ipc_message: IPCMessage) -> bool { | ||
match ipc_message { | ||
IPCMessage::TopDown(finality) => self.parent_finality.check_finality(&finality), | ||
IPCMessage::BottomUp => unimplemented!(), | ||
} | ||
} | ||
|
||
/// Verifies the list of transactions to see if they are valid. Returns true if all the txns | ||
/// are valid and false if any one of the txns is invalid. | ||
Like we discussed, let's add a reminder here to explore an alternative serialization that would allow us to inspect the first few bytes of a transaction to determine its type without having to deserialize the whole transaction. No need to address it now, but definitely something worth exploring. |
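As a note on the serialization idea above: one way to make the type visible in the first bytes is a one-byte tag in front of the serialized payload. The sketch below is only an illustration under that assumption; `MessageKind`, `frame`, `peek_kind` and `ipc_payloads` are hypothetical names, the tag values are arbitrary, and nothing here is part of this PR or of `ChainMessage`.

```rust
/// Hypothetical one-byte tags; the real wire format is not decided in this PR.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MessageKind {
    Signed,
    Ipc,
}

impl MessageKind {
    fn tag(self) -> u8 {
        match self {
            MessageKind::Signed => 0,
            MessageKind::Ipc => 1,
        }
    }

    fn from_tag(tag: u8) -> Option<Self> {
        match tag {
            0 => Some(MessageKind::Signed),
            1 => Some(MessageKind::Ipc),
            _ => None,
        }
    }
}

/// Prepend the kind tag to an already-serialized payload.
pub fn frame(kind: MessageKind, payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(payload.len() + 1);
    out.push(kind.tag());
    out.extend_from_slice(payload);
    out
}

/// Read the kind from the first byte without touching the payload.
/// Returns `None` for empty input or an unknown tag.
pub fn peek_kind(tx: &[u8]) -> Option<MessageKind> {
    tx.first().copied().and_then(MessageKind::from_tag)
}

/// Collect only the payloads that need the IPC check; everything else stays undecoded.
pub fn ipc_payloads(txs: &[Vec<u8>]) -> Vec<&[u8]> {
    txs.iter()
        .filter(|tx| peek_kind(tx) == Some(MessageKind::Ipc))
        .map(|tx| &tx[1..])
        .collect()
}
```

With a layout like this, `verify_messages` would only need to deserialize the transactions that `peek_kind` reports as IPC messages.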
||
fn verify_messages(&self, txns: &Vec<Bytes>) -> bool { | ||
for tx in txns { | ||
let is_ok = match serde_json::from_slice::<ChainMessage>(tx) { | ||
If you take a look at the So this check should not be here, or exist in this format, because checking this is exactly the purpose of |
||
Err(_) => { | ||
// FIXME: maybe we can reject the proposal when the transaction cannot be deserialized | ||
tracing::info!("cannot deserialize transaction: {tx:?}"); | ||
Comment on lines +316 to +317
This was also my first intuition, but then I thought we can also let the proposal pass and punish the validator during execution. |
||
continue; | ||
} | ||
Ok(message) => match message { | ||
ChainMessage::IPC(ipc) => self.verify_ipc_message(ipc), | ||
_ => continue, | ||
}, | ||
}; | ||
|
||
if !is_ok { | ||
return false; | ||
} | ||
} | ||
|
||
true | ||
} | ||
} | ||
|
||
// NOTE: The `Application` interface doesn't allow failures at the moment. The protobuf | ||
|
@@ -490,6 +529,47 @@ where | |
Ok(response) | ||
} | ||
|
||
/// 1. Prepare txns | ||
/// 2. Prepare parent finality proposal | ||
async fn prepare_proposal( | ||
&self, | ||
request: request::PrepareProposal, | ||
) -> AbciResult<response::PrepareProposal> { | ||
let max_tx_bytes: usize = request.max_tx_bytes.try_into().unwrap(); | ||
let mut size: usize = 0; | ||
let mut txs = Vec::new(); | ||
for tx in request.txs { | ||
if size.saturating_add(tx.len()) > max_tx_bytes { | ||
I have a question here (probably for @aakoshh): when a transaction is left out of a proposal, it is still kept in the mempool for a future proposal, right?
I would think so. |
||
break; | ||
} | ||
size += tx.len(); | ||
txs.push(tx); | ||
} | ||
Comment on lines +545 to +547
I added this as a utility function to #185 so there's no need to repeat it in |
||
|
||
if let Some(finality_proposal) = self | ||
Let's move this logic into the |
||
.parent_finality | ||
.next_finality_proposal() | ||
.map_err(|e| anyhow!("cannot get next finality to propose due to: {e}"))? | ||
How can this method fail? It's only reading data from memory, not doing any IO, right? Be very careful when you use |
||
{ | ||
let bytes = | ||
serde_json::to_vec(&ChainMessage::IPC(IPCMessage::TopDown(finality_proposal)))?; | ||
Comment on lines +554 to +555
Once moved into the
In any case, it was wrong to put this after the stuff above that summed up all the bytes, because this could have brought the total above the allowed limit. This will not be a problem if you rebase onto #185 because there the check is done after the interpreters have finished.
For this reason (the possibility of this check removing a message we wanted to include in the proposal) it is crucial that for example
If we used STM, we might actually want to maintain two regions there: a "transient" region modified during |
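To illustrate the ordering problem described above, here is a minimal sketch in which the size limit is applied once, after every message has been assembled. The function names are hypothetical and are not taken from #185, and putting the application-generated messages in front of the mempool transactions is an assumption made here so that trimming is less likely to drop them.

```rust
/// Keep the longest prefix of `txs` whose total size fits in `max_tx_bytes`.
/// (Hypothetical helper; the utility added in #185 may look different.)
pub fn take_until_max_size(txs: Vec<Vec<u8>>, max_tx_bytes: usize) -> Vec<Vec<u8>> {
    let mut size: usize = 0;
    let mut out = Vec::new();
    for tx in txs {
        // saturating_add avoids overflow on very large inputs.
        let next = size.saturating_add(tx.len());
        if next > max_tx_bytes {
            break;
        }
        size = next;
        out.push(tx);
    }
    out
}

/// Assemble the proposal and trim it once at the end.
/// Assumption: generated messages (e.g. a finality proposal) go first.
pub fn assemble_proposal(
    generated: Vec<Vec<u8>>,
    mempool: Vec<Vec<u8>>,
    max_tx_bytes: usize,
) -> Vec<Vec<u8>> {
    let mut all = generated;
    all.extend(mempool);
    take_until_max_size(all, max_tx_bytes)
}
```

Because the trim runs last, the total can no longer exceed `max_tx_bytes`, which is the failure mode of pushing the finality message after the size loop.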
||
txs.push(bytes.into()); | ||
} | ||
|
||
Ok(response::PrepareProposal { txs }) | ||
} | ||
|
||
async fn process_proposal( | ||
&self, | ||
request: request::ProcessProposal, | ||
) -> AbciResult<response::ProcessProposal> { | ||
if !self.verify_messages(&request.txs) { | ||
Move to |
||
Ok(response::ProcessProposal::Reject) | ||
} else { | ||
Ok(response::ProcessProposal::Accept) | ||
} | ||
} | ||
|
||
/// Signals the beginning of a new block, prior to any `DeliverTx` calls. | ||
async fn begin_block(&self, request: request::BeginBlock) -> AbciResult<response::BeginBlock> { | ||
let db = self.state_store_clone(); | ||
|
@@ -538,6 +618,7 @@ where | |
); | ||
to_deliver_tx(ret) | ||
} | ||
_ => todo!(), | ||
}, | ||
}; | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
[package] | ||
name = "fendermint_ipc" | ||
version = "0.1.0" | ||
edition = "2021" | ||
description = "Interfacing with IPC" | ||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | ||
|
||
[dependencies] | ||
tokio = { workspace = true } | ||
anyhow = { workspace = true } | ||
ipc-sdk = { workspace = true } | ||
async-trait = { workspace = true } | ||
tracing = { workspace = true } | ||
serde = { workspace = true } | ||
serde_json = { workspace = true } | ||
cid = { workspace = true } | ||
fvm_ipld_encoding = { workspace = true } | ||
num-traits = { workspace = true } |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
// Copyright 2022-2023 Protocol Labs | ||
// SPDX-License-Identifier: Apache-2.0, MIT | ||
|
||
use crate::BlockHeight; | ||
|
||
/// The ipc agent proxy. Use this struct to interface with the running ipc-agent. | ||
pub(crate) struct AgentProxy {} | ||
|
||
impl AgentProxy { | ||
pub async fn get_chain_head_height(&self) -> anyhow::Result<BlockHeight> { | ||
todo!() | ||
} | ||
|
||
pub async fn get_block_hash(&self, _height: BlockHeight) -> anyhow::Result<Vec<u8>> { | ||
todo!() | ||
} | ||
|
||
// pub async fn get_top_down_msgs( | ||
// &self, | ||
// _height: BlockHeight, | ||
// _nonce: u64, | ||
// ) -> anyhow::Result<Vec<CrossMsg>> { | ||
// todo!() | ||
// } | ||
// | ||
// pub async fn get_membership(&self) -> anyhow::Result<ValidatorSet> { | ||
// todo!() | ||
// } | ||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,104 @@ | ||||||
// Copyright 2022-2023 Protocol Labs | ||||||
// SPDX-License-Identifier: Apache-2.0, MIT | ||||||
|
||||||
use num_traits::{PrimInt}; | ||||||
use std::cmp::{max, min}; | ||||||
use std::collections::HashMap; | ||||||
use std::hash::Hash; | ||||||
|
||||||
type Bounds<T> = (T, T); | ||||||
|
||||||
/// The key value cache. The key must be numeric and falls within a range/bound. | ||||||
pub struct RangeKeyCache<Key, Value> { | ||||||
Suggested change
I think these won't be harder to read, and are well established? |
||||||
/// Stores the data in a hashmap. | ||||||
data: HashMap<Key, Value>, | ||||||
You could use a
TBH even a
yeah, initially was |
||||||
/// The lower and upper bound of keys stored in data | ||||||
bounds: Option<Bounds<Key>>, | ||||||
} | ||||||
|
||||||
impl<Key: PrimInt + Hash, Value> RangeKeyCache<Key, Value> { | ||||||
pub fn new() -> Self { | ||||||
Self { | ||||||
data: Default::default(), | ||||||
bounds: None, | ||||||
} | ||||||
} | ||||||
|
||||||
pub fn upper_bound(&self) -> Option<Key> { | ||||||
I saw later that if you started with an empty cache, inserted something, then removed it, this would still remember what the upper bound used to be. If this is an intended behaviour, it would be great to capture it as an invariant, although personally I'm not sure how intuitive it is that the cache knows about a height which you cannot retrieve from it. |
||||||
self.within_bounds(|(_, upper)| *upper) | ||||||
} | ||||||
|
||||||
pub fn get_value(&self, key: Key) -> Option<&Value> { | ||||||
self.within_bounds(|(lower_bound, upper_bound)| { | ||||||
if *lower_bound > key || *upper_bound < key { | ||||||
return None; | ||||||
} | ||||||
return self.data.get(&key); | ||||||
}) | ||||||
.flatten() | ||||||
} | ||||||
|
||||||
pub fn values_within_range(&self, start: Key, end: Option<Key>) -> Vec<&Value> { | ||||||
||||||
self.within_bounds(|(lower_bound, upper_bound)| { | ||||||
let start = max(*lower_bound, start); | ||||||
let end = min(*upper_bound, end.unwrap_or(*upper_bound)); | ||||||
|
||||||
let mut r = vec![]; | ||||||
let mut i = start; | ||||||
while i <= end { | ||||||
if let Some(v) = self.get_value(i) { | ||||||
r.push(v); | ||||||
} | ||||||
|
||||||
i = i + Key::one(); | ||||||
} | ||||||
|
||||||
r | ||||||
}) | ||||||
.unwrap_or(vec![]) | ||||||
} | ||||||
|
||||||
/// Removes the block hashes stored till the specified height, exclusive. | ||||||
pub fn remove_key_till(&mut self, key: Key) { | ||||||
||||||
if let Some((lower_bound, upper_bound)) = self.bounds.as_mut() { | ||||||
if *lower_bound > key || *upper_bound < key { | ||||||
return; | ||||||
} | ||||||
|
||||||
let mut i = *lower_bound; | ||||||
while i < key { | ||||||
self.data.remove(&i); | ||||||
i = i + Key::one(); | ||||||
} | ||||||
|
||||||
*lower_bound = key; | ||||||
} | ||||||
} | ||||||
|
||||||
/// Insert the block hash at the next height | ||||||
pub fn insert_after_lower_bound(&mut self, key: Key, val: Value) { | ||||||
I reckon the implementation of this cache can be simplified to a struct which has a |
||||||
match &mut self.bounds { | ||||||
None => { | ||||||
self.data.insert(key, val); | ||||||
self.bounds.replace((key, key)); | ||||||
} | ||||||
Some((lower, upper)) => { | ||||||
if *lower > key { | ||||||
return; | ||||||
} | ||||||
|
||||||
self.data.insert(key, val); | ||||||
if *upper < key { | ||||||
*upper = key; | ||||||
} | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
fn within_bounds<F, T>(&self, f: F) -> Option<T> | ||||||
where | ||||||
F: Fn(&(Key, Key)) -> T, | ||||||
{ | ||||||
self.bounds.as_ref().map(f) | ||||||
} | ||||||
} |
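For comparison with the review comments on this file, here is a sketch of the same cache built on an ordered map, so that the bounds are derived from the keys instead of being tracked separately. This is an assumption about one possible simplification, not necessarily the structure the reviewers had in mind; `OrderedCache` is a hypothetical name and the key type is fixed to `u64` for brevity.

```rust
use std::collections::BTreeMap;

/// Height-keyed cache backed by an ordered map; bounds are derived from the keys.
#[derive(Debug)]
pub struct OrderedCache<V> {
    data: BTreeMap<u64, V>,
}

impl<V> OrderedCache<V> {
    pub fn new() -> Self {
        Self { data: BTreeMap::new() }
    }

    pub fn lower_bound(&self) -> Option<u64> {
        self.data.keys().next().copied()
    }

    pub fn upper_bound(&self) -> Option<u64> {
        self.data.keys().next_back().copied()
    }

    pub fn get_value(&self, key: u64) -> Option<&V> {
        self.data.get(&key)
    }

    /// Values with keys in `start..=end`; `None` for `end` means "up to the largest key".
    pub fn values_within_range(&self, start: u64, end: Option<u64>) -> Vec<&V> {
        let end = end.unwrap_or(u64::MAX);
        if start > end {
            // BTreeMap::range panics on an inverted range, so return early.
            return Vec::new();
        }
        self.data.range(start..=end).map(|(_, v)| v).collect()
    }

    /// Remove every entry with a key strictly below `key`.
    pub fn remove_key_till(&mut self, key: u64) {
        // split_off returns the entries >= key; keeping those drops the rest.
        self.data = self.data.split_off(&key);
    }

    /// Insert unless the key is below the current lower bound. Returns whether it was stored.
    pub fn insert_after_lower_bound(&mut self, key: u64, val: V) -> bool {
        if matches!(self.lower_bound(), Some(lower) if key < lower) {
            return false;
        }
        self.data.insert(key, val);
        true
    }
}
```

Note two behavioural differences from the code in the diff: an emptied cache forgets its bounds, and `remove_key_till` with a key above the upper bound removes everything instead of doing nothing.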
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
// Copyright 2022-2023 Protocol Labs | ||
// SPDX-License-Identifier: Apache-2.0, MIT | ||
//! Interfacing with IPC, provides utility functions | ||
|
||
mod agent; | ||
mod cache; | ||
mod message; | ||
pub mod parent; | ||
pub mod pof; | ||
|
||
use crate::pof::IPCParentFinality; | ||
use async_trait::async_trait; | ||
use ipc_sdk::cross::CrossMsg; | ||
use ipc_sdk::ValidatorSet; | ||
pub use message::IPCMessage; | ||
|
||
#[derive(Debug, Clone)] | ||
pub struct Config { | ||
/// The number of blocks to delay reporting when creating the pof | ||
👍 |
||
chain_head_delay: BlockHeight, | ||
/// The lower bound for the chain head height in parent view | ||
chain_head_lower_bound: BlockHeight, | ||
Comment on lines +21 to +22
It would be helpful to explain here whether this is an absolute value or a relative one compared to the latest height.
suggests it's a delta, but then what's the difference between this and the "delay" field? 🤔 |
||
|
||
/// Parent syncing cron period, in seconds | ||
polling_interval: u64, | ||
} | ||
|
||
type BlockHeight = u64; | ||
type Nonce = u64; | ||
|
||
/// Obtain the latest state information required for IPC from the parent subnet | ||
pub trait ParentViewProvider { | ||
/// Fetch the latest chain head | ||
fn latest_height(&self) -> Option<BlockHeight>; | ||
/// Fetch the block hash at target height | ||
fn block_hash(&self, height: BlockHeight) -> Option<Vec<u8>>; | ||
/// Get the top down messages from the nonce of a height | ||
fn top_down_msgs(&self, height: BlockHeight, nonce: u64) -> Vec<CrossMsg>; | ||
/// Get the latest membership information | ||
fn membership(&self) -> Option<ValidatorSet>; | ||
|
||
/// Called when finality is committed | ||
fn on_finality_committed(&self, finality: &IPCParentFinality); | ||
} | ||
|
||
/// Obtain the latest state information required for IPC from the parent subnet | ||
#[async_trait] | ||
pub trait ParentViewQuery { | ||
/// Fetch the latest chain head | ||
async fn latest_height(&self) -> anyhow::Result<BlockHeight>; | ||
/// Fetch the block hash at target height | ||
async fn block_hash(&self, height: BlockHeight) -> anyhow::Result<Option<Vec<u8>>>; | ||
/// Get the top down messages from the nonce of a height | ||
async fn top_down_msgs(&self, height: BlockHeight, nonce: u64) | ||
Comment on lines +53 to +54
Curious: how does one learn the nonce of a height? All I see in this interface is that I can get the latest height, and I can ask about block hashes (I assume tipset CIDs) up to that height. Do I have to maintain the nonce on this side and probe height-by-height to check if there are top-down messages for the next nonce? |
||
-> anyhow::Result<Vec<CrossMsg>>; | ||
/// Get the latest membership information | ||
async fn membership(&self) -> anyhow::Result<ValidatorSet>; | ||
Comment on lines +56 to +57
What membership is this? Is it the configuration that the parent thinks the child subnet should adopt? What does "latest" mean here, is it final? |
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
// Copyright 2022-2023 Protocol Labs | ||
// SPDX-License-Identifier: Apache-2.0, MIT | ||
|
||
//! IPC messages | ||
|
||
use crate::pof::IPCParentFinality; | ||
use serde::{Deserialize, Serialize}; | ||
|
||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] | ||
pub enum IPCMessage { | ||
Let's move the IPC related messages (and messages only) into the
On a side note: I always struggle to decide between using acronyms in PascalCasing names, but I usually go with only capitalising the first letter, e.g. Have you guys established the
I added the equivalent of this enum in the crate I mentioned above in #187 with a placeholder for |
||
TopDown(IPCParentFinality), | ||
Ah, yes, almost forgot: because every validator has the actual content of this from its own parent, there is no need to propose it in its entire glory. The Instead of including everything in the proposal, we can include just the height/nonce of the messages up to which point we want to execute, and the CID of the resulting data structure that has all those messages in it, including the membership set up to that point (not everyone's different idea of latest). Then everyone can construct their view of what that finality cutoff should contain and what its CID should be, and decide whether to vote on it or not. This will also require a way to actually retrieve that CID from the others in case you don't have it during execution, so it's a good idea to save it into the blockstore, once it's agreed.
I think using Cid to replace all the top down messages will be a speed boost. But the issue now, like you mentioned, if someone does not have this |
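The height-plus-CID idea above can be sketched roughly as follows. Everything in this block is an assumption for illustration: `ParentView` and `FinalityProposal` are hypothetical types, and the 64-bit `DefaultHasher` digest is only a std-only stand-in for a real CID computed over a canonical encoding (it is not a content address and its output is not guaranteed stable across Rust releases).

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical local view of one finalized parent height.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct ParentView {
    pub height: u64,
    pub block_hash: Vec<u8>,
    /// Serialized top-down messages, in nonce order.
    pub top_down_msgs: Vec<Vec<u8>>,
    /// Serialized validator set as of `height`.
    pub membership: Vec<u8>,
}

/// What goes into the block instead of the full content.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FinalityProposal {
    pub height: u64,
    /// Stand-in for a CID: a 64-bit digest of the proposer's `ParentView`.
    pub digest: u64,
}

/// Digest of a view. A real implementation would use a CID over a canonical encoding.
pub fn digest(view: &ParentView) -> u64 {
    let mut hasher = DefaultHasher::new();
    view.hash(&mut hasher);
    hasher.finish()
}

/// Build the compact proposal from the proposer's own view.
pub fn propose(view: &ParentView) -> FinalityProposal {
    FinalityProposal { height: view.height, digest: digest(view) }
}

/// Vote yes only if the local view at that height reproduces the proposed digest.
/// `None` means this validator has not seen that height yet.
pub fn should_accept(proposal: &FinalityProposal, local: Option<&ParentView>) -> bool {
    match local {
        Some(view) => view.height == proposal.height && digest(view) == proposal.digest,
        None => false,
    }
}
```

The `None` branch is where the retrieval problem raised in this thread shows up: a validator that lacks the data can only reject or fetch it from its peers, which this sketch does not attempt.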
||
BottomUp, | ||
} | ||
|
||
impl From<IPCMessage> for Vec<u8> { | ||
fn from(value: IPCMessage) -> Self { | ||
serde_json::to_vec(&value).expect("should not happen") | ||
Are we sure this should not happen? What if someone by mistake does this over a
Ah, this is super confusing actually. I read it as parsing So yes, this will (probably) not fail because we can always convert to JSON, and this isn't the case when we're running the risk of a malicious validator passing us some untrustworthy byte content. However, I'd argue this implementation should not exist: it is not an inherent property of |
||
} | ||
} |
This can now be moved into the `ChainMessageInterpreter` in the implementation of the `ProposalInterpreter` added in #185.

My goal with `app.rs` was to keep its size and scope to a minimum; it's arguably too big already. Ideally it should only be an implementation of the `Application` trait and dispatch to other components. Everything it does is in support of this, mainly state management and conversions to Tendermint format.

It is for this reason that the top-level interpreter which deals with transactions and genesis receives binary input: to make `app.rs` oblivious to the actual payloads we are working with. It has to know the output format in order to be able to translate it to Tendermint types.

I'd like us to maintain this separation of concerns and move the finality checks into the interpreter. This can be achieved in multiple ways:

The parent view or `parent_finality` can be added to the `ChainMessageInterpreter` during application construction, instead of the `App` struct, but ideally only if it's a stateless component. If parent finality checking needs some state, it should be lifted into the `App` for safekeeping, and passed to the interpreter during invocation. I would try using STM for this, so the interpreter can read the state of the parent chain from data structures maintained by another background component, which can be started in `run.rs`. This same STM-enabled component can also be shared with the transaction processing, where we can add things like CIDs to look up, for another background process to handle.
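A rough sketch of the separation described above: the interpreter holds a handle to parent state that a background syncer keeps up to date, and `App` never looks at IPC payloads. The comment proposes STM for the shared state; this sketch swaps in a plain `Arc<RwLock<...>>` from the standard library to stay dependency-free, so it shows only the ownership layout, not STM semantics. `ParentState`, `record_block` and `FinalityInterpreter` are hypothetical names.

```rust
use std::collections::BTreeMap;
use std::sync::{Arc, RwLock};

/// Parent chain state maintained by a background syncer (hypothetical shape).
#[derive(Debug, Default)]
pub struct ParentState {
    /// Block hashes by parent height.
    pub block_hashes: BTreeMap<u64, Vec<u8>>,
}

/// Handle that can be cloned into both the syncer and the interpreter.
pub type SharedParentState = Arc<RwLock<ParentState>>;

/// Writer side: what a background task started at node startup would call.
pub fn record_block(state: &SharedParentState, height: u64, hash: Vec<u8>) {
    // A poisoned lock means a writer panicked; propagating the panic is the simplest policy here.
    state
        .write()
        .expect("parent state lock poisoned")
        .block_hashes
        .insert(height, hash);
}

/// Interpreter that owns only a handle to the shared state.
pub struct FinalityInterpreter {
    parent: SharedParentState,
}

impl FinalityInterpreter {
    pub fn new(parent: SharedParentState) -> Self {
        Self { parent }
    }

    /// Accept a proposed finality only if the local parent view has the same hash at that height.
    pub fn check_finality(&self, height: u64, block_hash: &[u8]) -> bool {
        let state = self.parent.read().expect("parent state lock poisoned");
        state
            .block_hashes
            .get(&height)
            .map(|h| h.as_slice() == block_hash)
            .unwrap_or(false)
    }
}
```

In this layout the handle would be created once at startup, cloned into the syncer task and passed to the interpreter at construction, which matches the "added during application construction" option only if the interpreter itself stays free of other state.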