Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(mempool_p2p): test tx received from p2p reach mempool #1990

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 107 additions & 1 deletion crates/starknet_integration_tests/tests/mempool_p2p_flow_test.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
use std::collections::HashSet;
use std::future::ready;
use std::net::SocketAddr;

use blockifier::context::ChainInfo;
use futures::StreamExt;
use mempool_test_utils::starknet_api_test_utils::MultiAccountTransactionGenerator;
use papyrus_network::gossipsub_impl::Topic;
use papyrus_network::network_manager::test_utils::create_network_config_connected_to_broadcast_channels;
use papyrus_network::network_manager::BroadcastTopicClientTrait;
use papyrus_protobuf::mempool::RpcTransactionWrapper;
use rstest::{fixture, rstest};
use starknet_api::rpc_transaction::RpcTransaction;
use starknet_api::executable_transaction::AccountTransaction;
use starknet_api::rpc_transaction::{
RpcDeployAccountTransaction,
RpcInvokeTransaction,
RpcTransaction,
};
use starknet_api::transaction::TransactionHash;
use starknet_http_server::config::HttpServerConfig;
use starknet_integration_tests::state_reader::{spawn_test_rpc_state_reader, StorageTestSetup};
use starknet_integration_tests::utils::{
Expand Down Expand Up @@ -121,3 +129,101 @@ async fn test_mempool_sends_tx_to_other_peer(tx_generator: MultiAccountTransacti
expected_txs.remove(&tx);
}
}

#[rstest]
#[tokio::test]
async fn test_mempool_receives_tx_from_other_peer(tx_generator: MultiAccountTransactionGenerator) {
let handle = Handle::current();
let task_executor = TokioExecutor::new(handle);
let accounts = tx_generator.accounts();
let storage_for_test = StorageTestSetup::new(accounts);
// Spawn a papyrus rpc server for a papyrus storage reader.
let rpc_server_addr =
spawn_test_rpc_state_reader(storage_for_test.rpc_storage_reader, storage_for_test.chain_id)
.await;
// Derive the configuration for the mempool node.
let components = ComponentConfig {
consensus_manager: ComponentExecutionConfig {
execution_mode: ComponentExecutionMode::Disabled,
local_server_config: None,
..Default::default()
},
batcher: ComponentExecutionConfig {
execution_mode: ComponentExecutionMode::Disabled,
local_server_config: None,
..Default::default()
},
..Default::default()
};
let chain_id = storage_for_test.batcher_storage_config.db_config.chain_id.clone();
let mut chain_info = ChainInfo::create_for_testing();
chain_info.chain_id = chain_id.clone();
let batcher_config =
create_batcher_config(storage_for_test.batcher_storage_config, chain_info.clone());
let gateway_config = create_gateway_config(chain_info).await;
let http_server_config = create_http_server_config().await;
let rpc_state_reader_config = test_rpc_state_reader_config(rpc_server_addr);
let (network_config, mut broadcast_channels) =
create_network_config_connected_to_broadcast_channels::<RpcTransactionWrapper>(Topic::new(
MEMPOOL_TOPIC,
));
let mempool_p2p_config = MempoolP2pConfig { network_config, ..Default::default() };
let config = SequencerNodeConfig {
components,
batcher_config,
gateway_config,
http_server_config,
rpc_state_reader_config,
mempool_p2p_config,
..SequencerNodeConfig::default()
};
let (clients, servers) = create_node_modules(&config);
let mempool_client = clients.get_mempool_client().unwrap();
// Build and run the sequencer node.
let sequencer_node_future = run_component_servers(servers);
let _sequencer_node_handle = task_executor.spawn_with_handle(sequencer_node_future);
// Wait for server to spin up and for p2p to discover other peer.
// TODO(Gilad): Replace with a persistent Client with a built-in retry to protect against CI
// flakiness.
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;

let mut expected_txs = HashSet::new();

let _tx_hashes = run_integration_test_scenario(tx_generator, &mut |tx: RpcTransaction| {
expected_txs.insert(tx.clone());
ready(TransactionHash::default()) // using the default value because we don't use the hash anyways.
})
.await;
for tx in expected_txs.iter() {
broadcast_channels
.broadcast_topic_client
.broadcast_message(RpcTransactionWrapper(tx.clone()))
.await
.unwrap();
}

// waiting for the tx to be received (TODO: figure out a better solution)
tokio::time::sleep(std::time::Duration::from_millis(2000)).await;

let received_tx = mempool_client.get_txs(expected_txs.len()).await.unwrap();

// make sure we received all the transactions
assert_eq!(received_tx.len(), expected_txs.len());

for tx in received_tx.iter() {
let converted_tx: RpcTransaction = match tx {
AccountTransaction::Declare(_declare_tx) => {
panic!("No implementation for converting DeclareTransaction to an RpcTransaction")
}
AccountTransaction::DeployAccount(deploy_account_transaction) => {
RpcTransaction::DeployAccount(RpcDeployAccountTransaction::V3(
deploy_account_transaction.clone().into(),
))
}
AccountTransaction::Invoke(invoke_transaction) => {
RpcTransaction::Invoke(RpcInvokeTransaction::V3(invoke_transaction.clone().into()))
}
};
assert!(expected_txs.contains(&converted_tx));
}
}
Loading