Skip to content
This repository has been archived by the owner on Jan 11, 2024. It is now read-only.

FM-184: Proposal interpreter #185

Merged
merged 7 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 5 additions & 11 deletions fendermint/abci/src/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use tendermint::abci::{request, response, Request, Response};
use tower::Service;
use tower_abci::BoxError;

use crate::util::take_until_max_size;

/// Allow returning a result from the methods, so the [`Application`]
/// implementation doesn't have to be full of `.expect("failed...")`
/// or `.unwrap()` calls. It is still good practice to use for example
Expand Down Expand Up @@ -62,20 +64,12 @@ pub trait Application {
&self,
request: request::PrepareProposal,
) -> AbciResult<response::PrepareProposal> {
let max_tx_bytes: usize = request.max_tx_bytes.try_into().unwrap();
let mut size: usize = 0;
let mut txs = Vec::new();
for tx in request.txs {
if size.saturating_add(tx.len()) > max_tx_bytes {
break;
}
size += tx.len();
txs.push(tx);
}
let txs = take_until_max_size(request.txs, request.max_tx_bytes.try_into().unwrap());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only check here if we reach the size limit of the Tendermint block, right? Should we add also here a TODO to remember not to exhaust the gas limit of a block?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The gas aspect is definitely ignored at the moment, thanks for the reminder. I created an issue to track it: https://github.com/consensus-shipyard/fendermint/issues/208

The check would not be here because at this size we don't know what messages we are dealing with, but once they are parsed into ChainMessage we can look at the gas limits.


Ok(response::PrepareProposal { txs })
}

/// Opporunity for the application to inspect the proposal before voting on it.
/// Opportunity for the application to inspect the proposal before voting on it.
///
/// The application should accept the proposal unless there's something wrong with it.
///
Expand Down
1 change: 1 addition & 0 deletions fendermint/abci/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
mod application;

pub use application::{AbciResult, Application, ApplicationService};
pub mod util;
19 changes: 19 additions & 0 deletions fendermint/abci/src/util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright 2022-2023 Protocol Labs
// SPDX-License-Identifier: Apache-2.0, MIT

/// Take the first transactions until the first one that would exceed the maximum limit.
///
/// The function does not skip or reorder transaction even if a later one would stay within the limit.
pub fn take_until_max_size<T: AsRef<[u8]>>(txs: Vec<T>, max_tx_bytes: usize) -> Vec<T> {
let mut size: usize = 0;
let mut out = Vec::new();
for tx in txs {
let bz: &[u8] = tx.as_ref();
if size.saturating_add(bz.len()) > max_tx_bytes {
break;
}
size += bz.len();
out.push(tx);
}
out
}
56 changes: 50 additions & 6 deletions fendermint/app/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::sync::{Arc, Mutex};
use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use cid::Cid;
use fendermint_abci::util::take_until_max_size;
use fendermint_abci::{AbciResult, Application};
use fendermint_storage::{
Codec, Encode, KVCollection, KVRead, KVReadable, KVStore, KVWritable, KVWrite,
Expand All @@ -22,7 +23,7 @@ use fendermint_vm_interpreter::fvm::state::{
use fendermint_vm_interpreter::fvm::{FvmApplyRet, FvmGenesisOutput};
use fendermint_vm_interpreter::signed::InvalidSignature;
use fendermint_vm_interpreter::{
CheckInterpreter, ExecInterpreter, GenesisInterpreter, QueryInterpreter,
CheckInterpreter, ExecInterpreter, GenesisInterpreter, ProposalInterpreter, QueryInterpreter,
};
use fvm::engine::MultiEngine;
use fvm_ipld_blockstore::Blockstore;
Expand Down Expand Up @@ -309,6 +310,15 @@ where
S::Namespace: Sync + Send,
DB: KVWritable<S> + KVReadable<S> + Clone + Send + Sync + 'static,
SS: Blockstore + Clone + Send + Sync + 'static,
I: GenesisInterpreter<
State = FvmGenesisState<SS>,
Genesis = Vec<u8>,
Output = FvmGenesisOutput,
>,
I: ProposalInterpreter<
State = (), // TODO
Message = Vec<u8>,
>,
I: ExecInterpreter<
State = FvmExecState<SS>,
Message = Vec<u8>,
Expand All @@ -326,11 +336,6 @@ where
Query = BytesMessageQuery,
Output = BytesMessageQueryRet,
>,
I: GenesisInterpreter<
State = FvmGenesisState<SS>,
Genesis = Vec<u8>,
Output = FvmGenesisOutput,
>,
{
/// Provide information about the ABCI application.
async fn info(&self, _request: request::Info) -> AbciResult<response::Info> {
Expand Down Expand Up @@ -490,6 +495,45 @@ where
Ok(response)
}

/// Amend which transactions to put into the next block proposal.
async fn prepare_proposal(
&self,
request: request::PrepareProposal,
) -> AbciResult<response::PrepareProposal> {
let txs = request.txs.into_iter().map(|tx| tx.to_vec()).collect();

let txs = self
.interpreter
.prepare((), txs)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to pass txs as a parameter to prepare? I had the impression that we are creating extra transactions but not modifying?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might not be modifying them now, but the interface itself doesn't need to know that, and prepare is about doing whatever you like with the list of transactions in the mempool, so passing it along seems to make sense.

I have not implemented any of the following but passing the transactions could be useful for:

  • Select the most profitable transactions to include if we are over some block gas limit
  • Make sure there is a partial ordering of transactions by account and sequence number, although it's likely that the CometBFT mempool took care to maintain the order and this is a non-issue
  • Replace transactions with CIDs (could be interesting if the same transactions were proposed over and over)

So you are right, at the moment all we do is add more, but not passing along would break this abstraction IMO.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah, gas limit as well, Tendermint has no idea about it.

https://github.com/consensus-shipyard/fendermint/issues/208

.await
.context("failed to prepare proposal")?;

let txs = txs.into_iter().map(bytes::Bytes::from).collect();
let txs = take_until_max_size(txs, request.max_tx_bytes.try_into().unwrap());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here for the gas limit check or should we only check as part of process_proposal?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right that it must be part of both prepare and process otherwise you risk freaking out CometBFT by rejecting your own proposal, which is a big no-no.

Again it would not be here as this is purely based on sizes and cannot access gas. This logic is here because it is mandatory, not something application specific like the gas, which must be delegated to interpreters.


Ok(response::PrepareProposal { txs })
}

/// Inspect a proposal and decide whether to vote on it.
async fn process_proposal(
&self,
request: request::ProcessProposal,
) -> AbciResult<response::ProcessProposal> {
let txs = request.txs.into_iter().map(|tx| tx.to_vec()).collect();

let accept = self
.interpreter
.process((), txs)
.await
.context("failed to process proposal")?;

if accept {
Ok(response::ProcessProposal::Accept)
} else {
Ok(response::ProcessProposal::Reject)
}
}

/// Signals the beginning of a new block, prior to any `DeliverTx` calls.
async fn begin_block(&self, request: request::BeginBlock) -> AbciResult<response::BeginBlock> {
let db = self.state_store_clone();
Expand Down
7 changes: 5 additions & 2 deletions fendermint/app/src/cmd/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ use fendermint_abci::ApplicationService;
use fendermint_app::{App, AppStore};
use fendermint_rocksdb::{blockstore::NamespaceBlockstore, namespaces, RocksDb, RocksDbConfig};
use fendermint_vm_interpreter::{
bytes::BytesMessageInterpreter, chain::ChainMessageInterpreter, fvm::FvmMessageInterpreter,
bytes::{BytesMessageInterpreter, ProposalPrepareMode},
chain::ChainMessageInterpreter,
fvm::FvmMessageInterpreter,
signed::SignedMessageInterpreter,
};
use tracing::info;
Expand All @@ -27,7 +29,8 @@ async fn run(settings: Settings) -> anyhow::Result<()> {
);
let interpreter = SignedMessageInterpreter::new(interpreter);
let interpreter = ChainMessageInterpreter::new(interpreter);
let interpreter = BytesMessageInterpreter::new(interpreter);
let interpreter =
BytesMessageInterpreter::new(interpreter, ProposalPrepareMode::AppendOnly, false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the rationale for not rejecting a malformed proposal here (if any, as maybe is just a placeholder value for now)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I included the considerations in the code as comments where this flag is used.

The reason I thought we might not want to reject them for now is because

  1. They should not happen as our nodes are honest.
  2. If they happen, we have a bug with a submitted transaction, and this way the error will show up in the transaction results which the RPC client will get and print, so the test will fail with an error, rather than a timeout. Whereas if we reject the proposal, CometBFT itself will most likely die as a result because our honest nodes will have proposed something that they subsequently reject themselves. And with AppendOnly we are not going to inspect the bytes before adding them to the proposal, as it was pointed out as wasted effort by @cryptoAtwill . That will have to change if we want to limit gas, though.
  3. If we admitted a malformed transaction to our mempool, inspected it (by using PassThrough instead of AppendOnly), and not included it in our proposal, it will still stay in the mempool, resulting in a memory leak in CometBFT if we keep piling up such transactions. This would be another bug that is potentially hard to figure out, apart from flooding the logs with warnings by trying to propose the same transaction every 2 seconds.

If all our nodes are not honest and some are proposing faulty transactions then this would be a stupid way to attack instead of just not proposing any block to cause a timeout. If it happened, our only chance to weed out such actors is to admit the transaction and penalize the miner. If that happened due to a bug, we'd risk consensus failure, but at least we'd see what's happening, and by coding the penalty mechanism we'd discourage anyone from intentionally doing this, and to implement these checks in check_tx.

So, overall, I thought that a malformed TX here is most likely a bug that is easiest to detect by letting it through.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense! Assuming our implementation honest saving ourselves from nasty bugs is the way to go, thanks!


let ns = Namespaces::default();
let db = open_db(&settings, &ns).context("error opening DB")?;
Expand Down
125 changes: 121 additions & 4 deletions fendermint/vm/interpreter/src/bytes.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright 2022-2023 Protocol Labs
// SPDX-License-Identifier: Apache-2.0, MIT
use anyhow::anyhow;
use anyhow::{anyhow, Context};
use async_trait::async_trait;
use cid::Cid;
use fendermint_vm_genesis::Genesis;
Expand All @@ -9,7 +9,7 @@ use fendermint_vm_message::chain::ChainMessage;
use crate::{
chain::{ChainMessageApplyRet, ChainMessageCheckRet},
fvm::{FvmQuery, FvmQueryRet},
CheckInterpreter, ExecInterpreter, GenesisInterpreter, QueryInterpreter,
CheckInterpreter, ExecInterpreter, GenesisInterpreter, ProposalInterpreter, QueryInterpreter,
};

pub type BytesMessageApplyRet = Result<ChainMessageApplyRet, fvm_ipld_encoding::Error>;
Expand All @@ -19,15 +19,124 @@ pub type BytesMessageQueryRet = Result<FvmQueryRet, fvm_ipld_encoding::Error>;
/// Close to what the ABCI sends: (Path, Bytes).
pub type BytesMessageQuery = (String, Vec<u8>);

/// Behavour of proposal preparation. It's an optimisation to cut down needless serialization
/// when we know we aren't doing anything with the messages.
#[derive(Debug, Default, Clone)]
pub enum ProposalPrepareMode {
/// Deserialize all messages and pass them to the inner interpreter.
#[default]
PassThrough,
/// Does not pass messages to the inner interpreter, only appends what is returned from it.
AppendOnly,
/// Does not pass messages to the inner interpreter, only prepends what is returned from it.
PrependOnly,
}

/// Interpreter working on raw bytes.
#[derive(Clone)]
pub struct BytesMessageInterpreter<I> {
inner: I,
/// Should we parse and pass on all messages during prepare.
prepare_mode: ProposalPrepareMode,
/// Should we reject proposals with transactions we cannot parse.
reject_malformed_proposal: bool,
}

impl<I> BytesMessageInterpreter<I> {
pub fn new(inner: I) -> Self {
Self { inner }
pub fn new(
inner: I,
prepare_mode: ProposalPrepareMode,
reject_malformed_proposal: bool,
) -> Self {
Self {
inner,
prepare_mode,
reject_malformed_proposal,
}
}
}

#[async_trait]
impl<I> ProposalInterpreter for BytesMessageInterpreter<I>
where
I: ProposalInterpreter<Message = ChainMessage>,
{
type State = I::State;
type Message = Vec<u8>;

/// Parse messages in the mempool and pass them into the inner `ChainMessage` interpreter.
async fn prepare(
&self,
state: Self::State,
msgs: Vec<Self::Message>,
) -> anyhow::Result<Vec<Self::Message>> {
// Collect the messages to pass to the inner interpreter.
let chain_msgs = match self.prepare_mode {
ProposalPrepareMode::PassThrough => {
let mut chain_msgs = Vec::new();
for msg in msgs.iter() {
match fvm_ipld_encoding::from_slice::<ChainMessage>(msg) {
Err(e) => {
// This should not happen because the `CheckInterpreter` implementation below would
// have rejected any such user transaction.
tracing::warn!(
error = e.to_string(),
"failed to decode message in mempool as ChainMessage"
);
}
Ok(msg) => chain_msgs.push(msg),
}
}
chain_msgs
}
ProposalPrepareMode::AppendOnly | ProposalPrepareMode::PrependOnly => Vec::new(),
};

let chain_msgs = self.inner.prepare(state, chain_msgs).await?;
adlrocha marked this conversation as resolved.
Show resolved Hide resolved

let chain_msgs = chain_msgs
.into_iter()
.map(|msg| {
fvm_ipld_encoding::to_vec(&msg).context("failed to encode ChainMessage as IPLD")
})
.collect::<anyhow::Result<Vec<Self::Message>>>()?;

match self.prepare_mode {
ProposalPrepareMode::PassThrough => Ok(chain_msgs),
ProposalPrepareMode::AppendOnly => Ok(vec![msgs, chain_msgs].concat()),
ProposalPrepareMode::PrependOnly => Ok(vec![chain_msgs, msgs].concat()),
}
}

/// Parse messages in the block, reject if unknown format. Pass the rest to the inner `ChainMessage` interpreter.
async fn process(&self, state: Self::State, msgs: Vec<Self::Message>) -> anyhow::Result<bool> {
let mut chain_msgs = Vec::new();

for msg in msgs {
match fvm_ipld_encoding::from_slice::<ChainMessage>(&msg) {
Err(e) => {
// If we cannot parse a message, then either:
// * The proposer is Byzantine - as an attack this isn't very effective as they could just not send a proposal and cause a timeout.
// * Our or the proposer node have different versions, or contain bugs
// We can either vote for it or not:
// * If we accept, we can punish the validator during block execution, and if it turns out we had a bug, we will have a consensus failure.
// * If we accept, then the serialization error will become visible in the transaction results through RPC.
// * If we reject, the majority can still accept the block, which indicates we had the bug (that way we might even panic during delivery, since we know it got voted on),
// but a buggy transaction format that fails for everyone would cause liveness issues.
// * If we reject, then the serialization error will only be visible in the logs (and potentially earlier check_tx results).
tracing::warn!(
error = e.to_string(),
"failed to decode message in proposal as ChainMessage"
);
if self.reject_malformed_proposal {
return Ok(false);
}
}
Ok(msg) => chain_msgs.push(msg),
}
}

self.inner.process(state, chain_msgs).await
}
}

Expand All @@ -53,6 +162,14 @@ where
// There is always the possibility that our codebase is incompatible,
// but then we'll have a consensus failure later when we don't agree on the ledger.
{
if self.reject_malformed_proposal {
// We could consider panicking here, otherwise if the majority executes this transaction (they voted for it)
// then we will just get a consensu failure after the block.
tracing::warn!(
error = e.to_string(),
"failed to decode delivered message as ChainMessage; we did not vote for it, maybe our node is buggy?"
);
}
Ok((state, Err(e)))
}
Ok(msg) => {
Expand Down
36 changes: 35 additions & 1 deletion fendermint/vm/interpreter/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use fendermint_vm_message::{chain::ChainMessage, signed::SignedMessage};

use crate::{
signed::{SignedMessageApplyRet, SignedMessageCheckRet},
CheckInterpreter, ExecInterpreter, GenesisInterpreter, QueryInterpreter,
CheckInterpreter, ExecInterpreter, GenesisInterpreter, ProposalInterpreter, QueryInterpreter,
};

/// A message a user is not supposed to send.
Expand All @@ -34,6 +34,40 @@ impl<I> ChainMessageInterpreter<I> {
}
}

#[async_trait]
impl<I> ProposalInterpreter for ChainMessageInterpreter<I>
where
I: Sync + Send,
{
// TODO: The state can include the IPLD Resolver mempool, for example by using STM
// to implement a shared memory space.
type State = ();
type Message = ChainMessage;

/// Check whether there are any "ready" messages in the IPLD resolution mempool which can be appended to the proposal.
///
/// We could also use this to select the most profitable user transactions, within the gas limit. We can also take into
/// account the transactions which are part of top-down or bottom-up checkpoints, to stay within gas limits.
async fn prepare(
&self,
_state: Self::State,
msgs: Vec<Self::Message>,
) -> anyhow::Result<Vec<Self::Message>> {
// For now this is just a placeholder.
Ok(msgs)
}

/// Perform finality checks on top-down transactions and availability checks on bottom-up transactions.
async fn process(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can call it validate or something else along the line as process I feel it's consuming or processing the message, but we are actually inspecting.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I know what you mean, I based all names in all the interfaces on what they are called in ABCI but without their prefix they are a bit confusing, like begin and end, or this process - they make sense in the context of their trait, but when the interpreter implements all interfaces, they are not very good.

Should we just use the full names, like begin_block and process_proposal?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

validate would be similar to process in that it would be clashing with check, which lives in the CheckInterpreter.

Copy link
Contributor Author

@aakoshh aakoshh Aug 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created https://github.com/consensus-shipyard/fendermint/issues/207

I think it would be good to tackle the naming separately after all the PRs are merged to reduce the rebasing churn, and to treat all of them in a uniform way.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My 2-cents arriving late to the party: I think interpreters are a really powerful abstraction, and having them well documented can take us a long way. I agree, let's come back to naming once everything is in place so we can figure out one that is clear and general enough if this was to be ported to some other code base.

&self,
_state: Self::State,
_msgs: Vec<Self::Message>,
) -> anyhow::Result<bool> {
// For now this is just a placeholder.
Ok(true)
}
}

#[async_trait]
impl<I> ExecInterpreter for ChainMessageInterpreter<I>
where
Expand Down
Loading