Skip to content

Commit

Permalink
test(mempool_p2p): test received rpc transactions are being broadcasted
Browse files Browse the repository at this point in the history
  • Loading branch information
AlonLStarkWare committed Nov 12, 2024
1 parent 7abe414 commit 0ac1457
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 9 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/mempool_p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::config::MempoolP2pConfig;
use crate::propagator::MempoolP2pPropagator;
use crate::runner::MempoolP2pRunner;

const MEMPOOL_TOPIC: &str = "starknet_mempool_transaction_propagation/0.1.0";
pub const MEMPOOL_TOPIC: &str = "starknet_mempool_transaction_propagation/0.1.0";

pub fn create_p2p_propagator_and_runner(
mempool_p2p_config: MempoolP2pConfig,
Expand Down
2 changes: 1 addition & 1 deletion crates/papyrus_protobuf/src/converters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pub mod consensus;
mod event;
mod header;
mod receipt;
mod rpc_transaction;
pub mod rpc_transaction;
mod state_diff;
#[cfg(test)]
mod test_instances;
Expand Down
4 changes: 4 additions & 0 deletions crates/tests_integration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@ axum.workspace = true
blockifier.workspace = true
cairo-lang-starknet-classes.workspace = true
chrono.workspace = true
futures.workspace = true
indexmap.workspace = true
mempool_test_utils.workspace = true
papyrus_common.workspace = true
papyrus_consensus.workspace = true
papyrus_network = { workspace = true, features = ["testing"] }
papyrus_protobuf.workspace = true
papyrus_rpc.workspace = true
papyrus_storage = { workspace = true, features = ["testing"] }
reqwest.workspace = true
Expand All @@ -32,6 +35,7 @@ starknet_consensus_manager.workspace = true
starknet_gateway = { workspace = true, features = ["testing"] }
starknet_gateway_types.workspace = true
starknet_http_server.workspace = true
starknet_mempool_p2p.workspace = true
starknet_monitoring_endpoint = { workspace = true, features = ["testing"] }
starknet_sequencer_infra.workspace = true
starknet_sequencer_node = { workspace = true, features = ["testing"] }
Expand Down
6 changes: 3 additions & 3 deletions crates/tests_integration/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ pub async fn run_transaction_generator_test_scenario<'a, Fut>(
send_rpc_txs(rpc_txs, send_rpc_tx_fn).await;
}

async fn create_gateway_config(chain_info: ChainInfo) -> GatewayConfig {
pub async fn create_gateway_config(chain_info: ChainInfo) -> GatewayConfig {
let stateless_tx_validator_config = StatelessTransactionValidatorConfig {
validate_non_zero_l1_gas_fee: true,
max_calldata_length: 10,
Expand All @@ -250,13 +250,13 @@ async fn create_gateway_config(chain_info: ChainInfo) -> GatewayConfig {
GatewayConfig { stateless_tx_validator_config, stateful_tx_validator_config, chain_info }
}

async fn create_http_server_config() -> HttpServerConfig {
pub async fn create_http_server_config() -> HttpServerConfig {
// TODO(Tsabary): use ser_generated_param.
let socket = get_available_socket().await;
HttpServerConfig { ip: socket.ip(), port: socket.port() }
}

fn create_batcher_config(
pub fn create_batcher_config(
batcher_storage_config: StorageConfig,
chain_info: ChainInfo,
) -> BatcherConfig {
Expand Down
4 changes: 0 additions & 4 deletions crates/tests_integration/tests/end_to_end_integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,9 @@ async fn test_end_to_end_integration(tx_generator: MultiAccountTransactionGenera

info!("Running integration test simulator.");

<<<<<<< HEAD
let send_rpc_tx_fn =
&mut |rpc_tx| integration_test_setup.add_tx_http_client.assert_add_tx_success(rpc_tx);

=======
let send_rpc_tx_fn = &mut |rpc_tx| http_test_client.assert_add_tx_success(rpc_tx);
>>>>>>> d24924dfe (chore: changing closure Fn requirements on integration test utils)
let n_txs = 50;
info!("Sending {n_txs} txs.");
run_transaction_generator_test_scenario(tx_generator, n_txs, send_rpc_tx_fn).await;
Expand Down
131 changes: 131 additions & 0 deletions crates/tests_integration/tests/mempool_p2p_flow_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
use std::collections::HashSet;
use std::net::SocketAddr;

use blockifier::context::ChainInfo;
use futures::StreamExt;
use mempool_test_utils::starknet_api_test_utils::MultiAccountTransactionGenerator;
// use papyrus_consensus::config::ConsensusConfig;
use papyrus_network::gossipsub_impl::Topic;
use papyrus_network::network_manager::test_utils::create_network_config_connected_to_broadcast_channels;
use papyrus_protobuf::mempool::RpcTransactionWrapper;
use rstest::{fixture, rstest};
// use starknet_api::block::BlockNumber;
use starknet_api::rpc_transaction::RpcTransaction;
// use starknet_consensus_manager::config::ConsensusManagerConfig;
use starknet_http_server::config::HttpServerConfig;
use starknet_integration_tests::state_reader::{spawn_test_rpc_state_reader, StorageTestSetup};
use starknet_integration_tests::utils::{
create_batcher_config,
create_gateway_config,
create_http_server_config,
create_integration_test_tx_generator,
run_integration_test_scenario,
test_rpc_state_reader_config,
HttpTestClient,
};
use starknet_mempool_p2p::config::MempoolP2pConfig;
use starknet_mempool_p2p::MEMPOOL_TOPIC;
use starknet_sequencer_node::config::component_config::ComponentConfig;
use starknet_sequencer_node::config::{
ComponentExecutionConfig,
ComponentExecutionMode,
SequencerNodeConfig,
};
use starknet_sequencer_node::servers::run_component_servers;
use starknet_sequencer_node::utils::create_node_modules;
use starknet_task_executor::tokio_executor::TokioExecutor;
use tokio::runtime::Handle;

#[fixture]
fn tx_generator() -> MultiAccountTransactionGenerator {
create_integration_test_tx_generator()
}

// TODO: remove code duplication with FlowTestSetup
#[rstest]
#[tokio::test]
async fn test_mempool_sends_tx_to_other_peer(tx_generator: MultiAccountTransactionGenerator) {
let handle = Handle::current();
let task_executor = TokioExecutor::new(handle);

let accounts = tx_generator.accounts();
let storage_for_test = StorageTestSetup::new(accounts);

// Spawn a papyrus rpc server for a papyrus storage reader.
let rpc_server_addr =
spawn_test_rpc_state_reader(storage_for_test.rpc_storage_reader, storage_for_test.chain_id)
.await;

// Derive the configuration for the mempool node.
let components = ComponentConfig {
consensus_manager: ComponentExecutionConfig {
execution_mode: ComponentExecutionMode::Disabled,
local_server_config: None,
..Default::default()
},
batcher: ComponentExecutionConfig {
execution_mode: ComponentExecutionMode::Disabled,
local_server_config: None,
..Default::default()
},
..Default::default()
};

let chain_id = storage_for_test.batcher_storage_config.db_config.chain_id.clone();
let mut chain_info = ChainInfo::create_for_testing();
chain_info.chain_id = chain_id.clone();
let batcher_config =
create_batcher_config(storage_for_test.batcher_storage_config, chain_info.clone());
let gateway_config = create_gateway_config(chain_info).await;
let http_server_config = create_http_server_config().await;
let rpc_state_reader_config = test_rpc_state_reader_config(rpc_server_addr);
let (network_config, mut broadcast_channels) =
create_network_config_connected_to_broadcast_channels::<RpcTransactionWrapper>(Topic::new(
MEMPOOL_TOPIC,
));
let mempool_p2p_config = MempoolP2pConfig { network_config, ..Default::default() };
let config = SequencerNodeConfig {
components,
batcher_config,
gateway_config,
http_server_config,
rpc_state_reader_config,
mempool_p2p_config,
..SequencerNodeConfig::default()
};

let (_clients, servers) = create_node_modules(&config);

let HttpServerConfig { ip, port } = config.http_server_config;
let add_tx_http_client = HttpTestClient::new(SocketAddr::from((ip, port)));

// Build and run the sequencer node.
let sequencer_node_future = run_component_servers(servers);
let _sequencer_node_handle = task_executor.spawn_with_handle(sequencer_node_future);

// Wait for server to spin up and for p2p to discover other peer.
// TODO(Gilad): Replace with a persistent Client with a built-in retry to protect against CI
// flakiness.
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;

let mut expected_txs = HashSet::new();

// Create and send transactions.
let _tx_hashes = run_integration_test_scenario(tx_generator, &mut |tx: RpcTransaction| {
expected_txs.insert(tx.clone()); // push the sent tx to the expected_txs list
add_tx_http_client.assert_add_tx_success(tx)
})
.await;

let mut received_txs_counter = 0;

// verify that the broadcasted transactions are the same as the expected_txs
for _ in 0..expected_txs.len() {
let tx = broadcast_channels.broadcasted_messages_receiver.next().await.unwrap();
received_txs_counter += 1;
assert!(expected_txs.contains(&tx.0.unwrap().0));
}
// every tx in received_tx is in expected_txs and |received_txs| = |expected_txs| => they are
// equal
assert!(expected_txs.len() == received_txs_counter);
}

0 comments on commit 0ac1457

Please sign in to comment.