diff --git a/Cargo.lock b/Cargo.lock index b56fc565cb..776c19ca72 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10288,6 +10288,7 @@ dependencies = [ "axum", "blockifier", "cairo-lang-starknet-classes", + "chrono", "futures", "indexmap 2.6.0", "mempool_test_utils", @@ -10310,6 +10311,7 @@ dependencies = [ "starknet_gateway", "starknet_gateway_types", "starknet_http_server", + "starknet_mempool_p2p", "starknet_monitoring_endpoint", "starknet_sequencer_infra", "starknet_sequencer_node", diff --git a/crates/mempool_p2p/src/lib.rs b/crates/mempool_p2p/src/lib.rs index 109d78801e..118cb87c63 100644 --- a/crates/mempool_p2p/src/lib.rs +++ b/crates/mempool_p2p/src/lib.rs @@ -10,7 +10,7 @@ use crate::config::MempoolP2pConfig; use crate::propagator::MempoolP2pPropagator; use crate::runner::MempoolP2pRunner; -const MEMPOOL_TOPIC: &str = "starknet_mempool_transaction_propagation/0.1.0"; +pub const MEMPOOL_TOPIC: &str = "starknet_mempool_transaction_propagation/0.1.0"; pub fn create_p2p_propagator_and_runner( mempool_p2p_config: MempoolP2pConfig, diff --git a/crates/papyrus_protobuf/src/converters/mod.rs b/crates/papyrus_protobuf/src/converters/mod.rs index 5fff153657..a72d774d25 100644 --- a/crates/papyrus_protobuf/src/converters/mod.rs +++ b/crates/papyrus_protobuf/src/converters/mod.rs @@ -5,7 +5,7 @@ pub mod consensus; mod event; mod header; mod receipt; -mod rpc_transaction; +pub mod rpc_transaction; mod state_diff; #[cfg(test)] mod test_instances; diff --git a/crates/tests_integration/Cargo.toml b/crates/tests_integration/Cargo.toml index 0aac6c931b..7abfda180a 100644 --- a/crates/tests_integration/Cargo.toml +++ b/crates/tests_integration/Cargo.toml @@ -14,6 +14,8 @@ assert_matches.workspace = true axum.workspace = true blockifier.workspace = true cairo-lang-starknet-classes.workspace = true +chrono.workspace = true +futures.workspace = true indexmap.workspace = true mempool_test_utils.workspace = true papyrus_common.workspace = true @@ -33,6 +35,7 @@ starknet_consensus_manager.workspace = true starknet_gateway = { workspace = true, features = ["testing"] } starknet_gateway_types.workspace = true starknet_http_server.workspace = true +starknet_mempool_p2p.workspace = true starknet_monitoring_endpoint = { workspace = true, features = ["testing"] } starknet_sequencer_infra.workspace = true starknet_sequencer_node = { workspace = true, features = ["testing"] } diff --git a/crates/tests_integration/src/utils.rs b/crates/tests_integration/src/utils.rs index f5fb162f7b..1db1d3e1c6 100644 --- a/crates/tests_integration/src/utils.rs +++ b/crates/tests_integration/src/utils.rs @@ -245,7 +245,7 @@ pub async fn run_transaction_generator_test_scenario<'a, Fut>( send_rpc_txs(rpc_txs, send_rpc_tx_fn).await; } -async fn create_gateway_config(chain_info: ChainInfo) -> GatewayConfig { +pub async fn create_gateway_config(chain_info: ChainInfo) -> GatewayConfig { let stateless_tx_validator_config = StatelessTransactionValidatorConfig { validate_non_zero_l1_gas_fee: true, max_calldata_length: 10, @@ -257,13 +257,13 @@ async fn create_gateway_config(chain_info: ChainInfo) -> GatewayConfig { GatewayConfig { stateless_tx_validator_config, stateful_tx_validator_config, chain_info } } -async fn create_http_server_config() -> HttpServerConfig { +pub async fn create_http_server_config() -> HttpServerConfig { // TODO(Tsabary): use ser_generated_param. let socket = get_available_socket().await; HttpServerConfig { ip: socket.ip(), port: socket.port() } } -fn create_batcher_config( +pub fn create_batcher_config( batcher_storage_config: StorageConfig, chain_info: ChainInfo, ) -> BatcherConfig { diff --git a/crates/tests_integration/tests/end_to_end_integration_test.rs b/crates/tests_integration/tests/end_to_end_integration_test.rs index c0ece13fd6..5e5766e503 100644 --- a/crates/tests_integration/tests/end_to_end_integration_test.rs +++ b/crates/tests_integration/tests/end_to_end_integration_test.rs @@ -71,13 +71,9 @@ async fn test_end_to_end_integration(tx_generator: MultiAccountTransactionGenera info!("Running integration test simulator."); -<<<<<<< HEAD let send_rpc_tx_fn = &mut |rpc_tx| integration_test_setup.add_tx_http_client.assert_add_tx_success(rpc_tx); -======= - let send_rpc_tx_fn = &mut |rpc_tx| http_test_client.assert_add_tx_success(rpc_tx); ->>>>>>> d24924dfe (chore: changing closure Fn requirements on integration test utils) let n_txs = 50; info!("Sending {n_txs} txs."); run_transaction_generator_test_scenario(tx_generator, n_txs, send_rpc_tx_fn).await; diff --git a/crates/tests_integration/tests/mempool_p2p_flow_test.rs b/crates/tests_integration/tests/mempool_p2p_flow_test.rs new file mode 100644 index 0000000000..8a019a0a12 --- /dev/null +++ b/crates/tests_integration/tests/mempool_p2p_flow_test.rs @@ -0,0 +1,131 @@ +use std::collections::HashSet; +use std::net::SocketAddr; + +use blockifier::context::ChainInfo; +use futures::StreamExt; +use mempool_test_utils::starknet_api_test_utils::MultiAccountTransactionGenerator; +// use papyrus_consensus::config::ConsensusConfig; +use papyrus_network::gossipsub_impl::Topic; +use papyrus_network::network_manager::test_utils::create_network_config_connected_to_broadcast_channels; +use papyrus_protobuf::mempool::RpcTransactionWrapper; +use rstest::{fixture, rstest}; +// use starknet_api::block::BlockNumber; +use starknet_api::rpc_transaction::RpcTransaction; +// use starknet_consensus_manager::config::ConsensusManagerConfig; +use starknet_http_server::config::HttpServerConfig; +use starknet_integration_tests::state_reader::{spawn_test_rpc_state_reader, StorageTestSetup}; +use starknet_integration_tests::utils::{ + create_batcher_config, + create_gateway_config, + create_http_server_config, + create_integration_test_tx_generator, + run_integration_test_scenario, + test_rpc_state_reader_config, + HttpTestClient, +}; +use starknet_mempool_p2p::config::MempoolP2pConfig; +use starknet_mempool_p2p::MEMPOOL_TOPIC; +use starknet_sequencer_node::config::component_config::ComponentConfig; +use starknet_sequencer_node::config::{ + ComponentExecutionConfig, + ComponentExecutionMode, + SequencerNodeConfig, +}; +use starknet_sequencer_node::servers::run_component_servers; +use starknet_sequencer_node::utils::create_node_modules; +use starknet_task_executor::tokio_executor::TokioExecutor; +use tokio::runtime::Handle; + +#[fixture] +fn tx_generator() -> MultiAccountTransactionGenerator { + create_integration_test_tx_generator() +} + +// TODO: remove code duplication with FlowTestSetup +#[rstest] +#[tokio::test] +async fn test_mempool_sends_tx_to_other_peer(tx_generator: MultiAccountTransactionGenerator) { + let handle = Handle::current(); + let task_executor = TokioExecutor::new(handle); + + let accounts = tx_generator.accounts(); + let storage_for_test = StorageTestSetup::new(accounts); + + // Spawn a papyrus rpc server for a papyrus storage reader. + let rpc_server_addr = + spawn_test_rpc_state_reader(storage_for_test.rpc_storage_reader, storage_for_test.chain_id) + .await; + + // Derive the configuration for the mempool node. + let components = ComponentConfig { + consensus_manager: ComponentExecutionConfig { + execution_mode: ComponentExecutionMode::Disabled, + local_server_config: None, + ..Default::default() + }, + batcher: ComponentExecutionConfig { + execution_mode: ComponentExecutionMode::Disabled, + local_server_config: None, + ..Default::default() + }, + ..Default::default() + }; + + let chain_id = storage_for_test.batcher_storage_config.db_config.chain_id.clone(); + let mut chain_info = ChainInfo::create_for_testing(); + chain_info.chain_id = chain_id.clone(); + let batcher_config = + create_batcher_config(storage_for_test.batcher_storage_config, chain_info.clone()); + let gateway_config = create_gateway_config(chain_info).await; + let http_server_config = create_http_server_config().await; + let rpc_state_reader_config = test_rpc_state_reader_config(rpc_server_addr); + let (network_config, mut broadcast_channels) = + create_network_config_connected_to_broadcast_channels::(Topic::new( + MEMPOOL_TOPIC, + )); + let mempool_p2p_config = MempoolP2pConfig { network_config, ..Default::default() }; + let config = SequencerNodeConfig { + components, + batcher_config, + gateway_config, + http_server_config, + rpc_state_reader_config, + mempool_p2p_config, + ..SequencerNodeConfig::default() + }; + + let (_clients, servers) = create_node_modules(&config); + + let HttpServerConfig { ip, port } = config.http_server_config; + let add_tx_http_client = HttpTestClient::new(SocketAddr::from((ip, port))); + + // Build and run the sequencer node. + let sequencer_node_future = run_component_servers(servers); + let _sequencer_node_handle = task_executor.spawn_with_handle(sequencer_node_future); + + // Wait for server to spin up and for p2p to discover other peer. + // TODO(Gilad): Replace with a persistent Client with a built-in retry to protect against CI + // flakiness. + tokio::time::sleep(std::time::Duration::from_millis(1000)).await; + + let mut expected_txs = HashSet::new(); + + // Create and send transactions. + let _tx_hashes = run_integration_test_scenario(tx_generator, &mut |tx: RpcTransaction| { + expected_txs.insert(tx.clone()); // push the sent tx to the expected_txs list + add_tx_http_client.assert_add_tx_success(tx) + }) + .await; + + let mut received_txs_counter = 0; + + // verify that the broadcasted transactions are the same as the expected_txs + for _ in 0..expected_txs.len() { + let tx = broadcast_channels.broadcasted_messages_receiver.next().await.unwrap(); + received_txs_counter += 1; + assert!(expected_txs.contains(&tx.0.unwrap().0)); + } + // every tx in received_tx is in expected_txs and |received_txs| = |expected_txs| => they are + // equal + assert!(expected_txs.len() == received_txs_counter); +}