Skip to content

Commit

Permalink
Revert "Revert "feat: add --stateless flag to forest (#3593)" (#3… (
Browse files Browse the repository at this point in the history
  • Loading branch information
LesnyRumcajs authored Dec 14, 2023
1 parent eccc15f commit e7561aa
Show file tree
Hide file tree
Showing 9 changed files with 205 additions and 43 deletions.
27 changes: 27 additions & 0 deletions .github/workflows/forest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,33 @@ jobs:
- name: Other commands check
run: ./scripts/tests/calibnet_other_check.sh

# forest stateless mode tests done on calibnet
calibnet-stateless-mode-check:
needs:
- build-ubuntu
name: Calibnet stateless mode check
runs-on: ubuntu-latest
steps:
# To help investigate transient test failures
- run: lscpu
- name: Checkout Sources
uses: actions/checkout@v4
- uses: actions/download-artifact@v3
with:
name: forest-${{ runner.os }}
path: ~/.cargo/bin
- uses: actions/download-artifact@v3
with:
name: forest-${{ runner.os }}
path: ~/.cargo/bin
# Permissions are lost during artifact-upload
# https://github.com/actions/upload-artifact#permission-loss
- name: Set permissions
run: |
chmod +x ~/.cargo/bin/forest*
- run: ./scripts/tests/calibnet_stateless_mode_check.sh
timeout-minutes: 10

# state migration regression tests
state-migrations-check:
needs:
Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,9 @@ usability improvements.

- [#3422](https://github.com/ChainSafe/forest/issues/3422) Add NV21 (Watermelon)
support for calibration network.
- [#3593](https://github.com/ChainSafe/forest/pull/3593): Add `--stateless` flag
to `forest`. In stateless mode, forest connects to the P2P network but does
not sync to HEAD.

### Changed

Expand Down
38 changes: 38 additions & 0 deletions scripts/tests/calibnet_stateless_mode_check.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#!/bin/bash
set -euxo pipefail

# This script tests the stateless mode of a forest node

source "$(dirname "$0")/harness.sh"

forest_init_stateless

echo "Verifying the heaviest tipset to be the genesis"
HEAD_CID=$($FOREST_CLI_PATH chain head)
assert_eq "$HEAD_CID" "bafy2bzacecyaggy24wol5ruvs6qm73gjibs2l2iyhcqmvi7r7a4ph7zx3yqd4"

# Example format: /ip4/127.0.0.1/tcp/41937/p2p/12D3KooWAB9z7vZ1x1v9t4BViVkX1Hy1ScoRnWV2GgGy5ec6pfUZ
STATELESS_NODE_ADDRESS=$($FOREST_CLI_PATH net listen | tail -n 1)
echo "Stateless node address: $STATELESS_NODE_ADDRESS"
# Example format: 12D3KooWAB9z7vZ1x1v9t4BViVkX1Hy1ScoRnWV2GgGy5ec6pfUZ
STATELESS_NODE_PEER_ID=$(echo "$STATELESS_NODE_ADDRESS" | cut --delimiter="/" --fields=7 --zero-terminated)
echo "Stateless node peer id: $STATELESS_NODE_PEER_ID"

# Run a normal forest node that only connects to the stateless node
CONFIG_PATH="./forest_config.toml"
cat <<- EOF > $CONFIG_PATH
[network]
listening_multiaddrs = ["/ip4/127.0.0.1/tcp/0"]
bootstrap_peers = ["$STATELESS_NODE_ADDRESS"]
mdns = false
kademlia = false
EOF

# Disable discovery to not connect to more nodes
$FOREST_PATH --chain calibnet --encrypt-keystore false --auto-download-snapshot --config "$CONFIG_PATH" --rpc false --metrics-address 127.0.0.1:6117 &
FOREST_NODE_PID=$!
# Verify that the stateless node can respond to chain exchange requests
until curl http://127.0.0.1:6117/metrics | grep "chain_exchange_response_in"; do
sleep 1s;
done
kill -KILL $FOREST_NODE_PID
25 changes: 25 additions & 0 deletions scripts/tests/harness.sh
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,21 @@ function forest_run_node_detached {
$FOREST_PATH --chain calibnet --encrypt-keystore false --log-dir "$LOG_DIRECTORY" --detach --save-token ./admin_token --track-peak-rss
}

function forest_run_node_stateless_detached {
CONFIG_PATH="./stateless_forest_config.toml"
echo "${CONFIG_PATH}"
echo "Running forest in stateless and detached mode"
cat <<- EOF > $CONFIG_PATH
[client]
data_dir = "/tmp/stateless_forest_data"
[network]
listening_multiaddrs = ["/ip4/127.0.0.1/tcp/0"]
EOF

$FOREST_PATH --detach --chain calibnet --encrypt-keystore false --config "$CONFIG_PATH" --log-dir "$LOG_DIRECTORY" --save-token ./stateless_admin_token --skip-load-actors --stateless
}

function forest_wait_for_sync {
echo "Waiting for sync"
timeout 30m $FOREST_CLI_PATH sync wait
Expand All @@ -67,6 +82,16 @@ function forest_init {
forest_check_db_stats
}

function forest_init_stateless {
forest_run_node_stateless_detached

ADMIN_TOKEN=$(cat stateless_admin_token)
FULLNODE_API_INFO="$ADMIN_TOKEN:/ip4/127.0.0.1/tcp/2345/http"

export ADMIN_TOKEN
export FULLNODE_API_INFO
}

function forest_print_logs_and_metrics {
echo "Get and print metrics"
wget -O metrics.log http://localhost:6116/metrics
Expand Down
8 changes: 8 additions & 0 deletions src/blocks/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use crate::message::SignedMessage;
use crate::shim::message::Message;
use cid::Cid;
use fvm_ipld_blockstore::Blockstore;
use serde_tuple::{self, Deserialize_tuple, Serialize_tuple};

use super::BlockHeader;
Expand Down Expand Up @@ -38,6 +39,13 @@ impl Block {
pub fn cid(&self) -> &Cid {
self.header.cid()
}

/// Persists the block in the given block store
pub fn persist(&self, db: &impl Blockstore) -> Result<(), crate::chain::store::Error> {
crate::chain::persist_objects(&db, &[self.header()])?;
crate::chain::persist_objects(&db, self.bls_msgs())?;
crate::chain::persist_objects(&db, self.secp_msgs())
}
}

/// Tracks the Merkle roots of both SECP and BLS messages separately.
Expand Down
66 changes: 61 additions & 5 deletions src/chain_sync/chain_muxer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ pub struct ChainMuxer<DB, M> {

/// Tipset channel receiver
tipset_receiver: flume::Receiver<Arc<Tipset>>,

/// When `stateless_mode` is true, forest connects to the P2P network but does not sync to HEAD.
stateless_mode: bool,
}

impl<DB, M> ChainMuxer<DB, M>
Expand All @@ -177,6 +180,7 @@ where
genesis: Arc<Tipset>,
tipset_sender: flume::Sender<Arc<Tipset>>,
tipset_receiver: flume::Receiver<Arc<Tipset>>,
stateless_mode: bool,
) -> Result<Self, ChainMuxerError> {
let network =
SyncNetworkContext::new(network_send, peer_manager, state_manager.blockstore_owned());
Expand All @@ -192,6 +196,7 @@ where
tipset_sender,
tipset_receiver,
state_manager,
stateless_mode,
})
}

Expand Down Expand Up @@ -254,9 +259,10 @@ where
genesis_block_cid: Cid,
) {
// Query the heaviest TipSet from the store
let heaviest = chain_store.heaviest_tipset();
if network.peer_manager().is_peer_new(&peer_id) {
// Since the peer is new, send them a hello request
// Query the heaviest TipSet from the store
let heaviest = chain_store.heaviest_tipset();
let request = HelloRequest {
heaviest_tip_set: heaviest.cids(),
heaviest_tipset_height: heaviest.epoch(),
Expand Down Expand Up @@ -503,9 +509,7 @@ where

// Store block messages in the block store
for block in tipset.blocks() {
crate::chain::persist_objects(&chain_store.db, &[block.header()])?;
crate::chain::persist_objects(&chain_store.db, block.bls_msgs())?;
crate::chain::persist_objects(&chain_store.db, block.secp_msgs())?;
block.persist(&chain_store.db)?;
}

// Update the peer head
Expand All @@ -519,6 +523,48 @@ where
Ok(Some((tipset, source)))
}

fn stateless_node(&self) -> ChainMuxerFuture<(), ChainMuxerError> {
let p2p_messages = self.net_handler.clone();
let chain_store = self.state_manager.chain_store().clone();
let network = self.network.clone();
let genesis = self.genesis.clone();
let bad_block_cache = self.bad_blocks.clone();
let mem_pool = self.mpool.clone();
let block_delay = self.state_manager.chain_config().block_delay_secs as u64;

let future = async move {
loop {
let event = match p2p_messages.recv_async().await {
Ok(event) => event,
Err(why) => {
debug!("Receiving event from p2p event stream failed: {why}");
return Err(ChainMuxerError::P2PEventStreamReceive(why.to_string()));
}
};

match Self::process_gossipsub_event(
event,
network.clone(),
chain_store.clone(),
bad_block_cache.clone(),
mem_pool.clone(),
genesis.clone(),
PubsubMessageProcessingStrategy::DoNotProcess,
block_delay,
)
.await
{
Ok(_) => {}
Err(why) => {
debug!("Processing GossipSub event failed: {why:?}");
}
};
}
};

Box::pin(future)
}

fn evaluate_network_head(&self) -> ChainMuxerFuture<NetworkHeadEvaluation, ChainMuxerError> {
let p2p_messages = self.net_handler.clone();
let chain_store = self.state_manager.chain_store().clone();
Expand Down Expand Up @@ -833,6 +879,8 @@ enum ChainMuxerState {
Connect(ChainMuxerFuture<NetworkHeadEvaluation, ChainMuxerError>),
Bootstrap(ChainMuxerFuture<(), ChainMuxerError>),
Follow(ChainMuxerFuture<(), ChainMuxerError>),
/// In stateless mode, forest still connects to the P2P swarm but does not sync to HEAD.
Stateless(ChainMuxerFuture<(), ChainMuxerError>),
}

impl<DB, M> Future for ChainMuxer<DB, M>
Expand All @@ -846,7 +894,10 @@ where
loop {
match self.state {
ChainMuxerState::Idle => {
if self.state_manager.sync_config().tipset_sample_size == 0 {
if self.stateless_mode {
info!("Running chain muxer in stateless mode...");
self.state = ChainMuxerState::Stateless(self.stateless_node());
} else if self.state_manager.sync_config().tipset_sample_size == 0 {
// A standalone node might use this option to not be stuck waiting for P2P
// messages.
info!("Skip evaluating network head, assume in-sync.");
Expand All @@ -857,6 +908,11 @@ where
self.state = ChainMuxerState::Connect(self.evaluate_network_head());
}
}
ChainMuxerState::Stateless(ref mut future) => {
if let Err(why) = std::task::ready!(future.as_mut().poll(cx)) {
return Poll::Ready(why);
}
}
ChainMuxerState::Connect(ref mut connect) => match connect.as_mut().poll(cx) {
Poll::Ready(Ok(evaluation)) => match evaluation {
NetworkHeadEvaluation::Behind {
Expand Down
3 changes: 3 additions & 0 deletions src/cli_shared/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ pub struct CliOpts {
/// Disable the automatic database garbage collection.
#[arg(long)]
pub no_gc: bool,
/// In stateless mode, forest connects to the P2P network but does not sync to HEAD.
#[arg(long)]
pub stateless: bool,
/// Check your command-line options and configuration file if one is used
#[arg(long)]
pub dry_run: bool,
Expand Down
3 changes: 2 additions & 1 deletion src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ pub(super) async fn start(
Arc::new(Tipset::from(genesis_header)),
tipset_sink,
tipset_stream,
opts.stateless,
)?;
let bad_blocks = chain_muxer.bad_blocks_cloned();
let sync_state = chain_muxer.sync_state_cloned();
Expand Down Expand Up @@ -394,7 +395,7 @@ pub(super) async fn start(

// Sets the latest snapshot if needed for downloading later
let mut config = config;
if config.client.snapshot_path.is_none() {
if config.client.snapshot_path.is_none() && !opts.stateless {
set_snapshot_path_if_needed(
&mut config,
&chain_config,
Expand Down
75 changes: 38 additions & 37 deletions src/libp2p/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -719,45 +719,46 @@ async fn handle_chain_exchange_event<DB>(
DB: Blockstore + Sync + Send + 'static,
{
match ce_event {
request_response::Event::Message { peer, message } => {
match message {
request_response::Message::Request {
request,
channel,
request_id,
} => {
trace!("Received chain_exchange request (request_id:{request_id}, peer_id: {peer:?})",);
emit_event(
network_sender_out,
NetworkEvent::ChainExchangeRequestInbound { request_id },
)
.await;
let db = db.clone();
tokio::task::spawn(async move {
if let Err(e) = cx_response_tx.send((
request_id,
channel,
make_chain_exchange_response(&db, &request),
)) {
debug!("Failed to send ChainExchangeResponse: {e:?}");
}
});
}
request_response::Message::Response {
request_id,
response,
} => {
emit_event(
network_sender_out,
NetworkEvent::ChainExchangeResponseInbound { request_id },
)
request_response::Event::Message { peer, message } => match message {
request_response::Message::Request {
request,
channel,
request_id,
} => {
trace!(
"Received chain_exchange request (request_id:{request_id}, peer_id: {peer:?})",
);
emit_event(
network_sender_out,
NetworkEvent::ChainExchangeRequestInbound { request_id },
)
.await;

let db = db.clone();
tokio::task::spawn(async move {
if let Err(e) = cx_response_tx.send((
request_id,
channel,
make_chain_exchange_response(&db, &request),
)) {
debug!("Failed to send ChainExchangeResponse: {e:?}");
}
});
}
request_response::Message::Response {
request_id,
response,
} => {
emit_event(
network_sender_out,
NetworkEvent::ChainExchangeResponseInbound { request_id },
)
.await;
chain_exchange
.handle_inbound_response(&request_id, response)
.await;
chain_exchange
.handle_inbound_response(&request_id, response)
.await;
}
}
}
},
request_response::Event::OutboundFailure {
peer: _,
request_id,
Expand Down

0 comments on commit e7561aa

Please sign in to comment.