From 76280aa4fb63180c287f06ced463781f44c00539 Mon Sep 17 00:00:00 2001 From: Stefan Neamtu Date: Tue, 7 Jan 2025 15:58:58 +0200 Subject: [PATCH] . --- .../partial_witness/partial_witness_actor.rs | 3 +++ .../partial_witness/partial_witness_actor_v2.rs | 5 +++-- chain/client/src/test_utils/setup.rs | 3 +++ integration-tests/src/test_loop/builder.rs | 3 +++ integration-tests/src/tests/network/runner.rs | 2 ++ nearcore/src/lib.rs | 6 ++++++ 6 files changed, 20 insertions(+), 2 deletions(-) diff --git a/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs index 719fa2834a5..85df8abed9e 100644 --- a/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs +++ b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs @@ -16,6 +16,7 @@ use near_primitives::stateless_validation::contract_distribution::ContractUpdate use near_primitives::stateless_validation::state_witness::ChunkStateWitness; use near_primitives::types::ShardId; use std::sync::Arc; +use tokio::runtime::Handle; use crate::client_actor::ClientSenderForPartialWitness; @@ -119,6 +120,7 @@ impl Handler for PartialWitnessActor { impl PartialWitnessActor { pub fn new( + rt: Handle, clock: Clock, network_adapter: PeerManagerAdapter, client_sender: ClientSenderForPartialWitness, @@ -129,6 +131,7 @@ impl PartialWitnessActor { partial_witness_spawner: Arc, ) -> Self { let tx = PartialWitnessService::new( + rt, clock, network_adapter, client_sender, diff --git a/chain/client/src/stateless_validation/partial_witness/partial_witness_actor_v2.rs b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor_v2.rs index ff3e326e505..8dd97eb1e2a 100644 --- a/chain/client/src/stateless_validation/partial_witness/partial_witness_actor_v2.rs +++ b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor_v2.rs @@ -38,6 +38,7 @@ use near_store::adapter::trie_store::TrieStoreAdapter; use near_store::{DBCol, StorageError, TrieDBStorage, TrieStorage}; use near_vm_runner::{get_contract_cache_key, ContractCode, ContractRuntimeCache}; use rand::Rng; +use tokio::runtime::Handle; use crate::client_actor::ClientSenderForPartialWitness; use crate::stateless_validation::state_witness_tracker::ChunkStateWitnessTracker; @@ -110,6 +111,7 @@ pub struct PartialWitnessSenderForClient { impl PartialWitnessService { pub fn new( + rt: Handle, clock: Clock, network_adapter: PeerManagerAdapter, client_sender: ClientSenderForPartialWitness, @@ -144,7 +146,7 @@ impl PartialWitnessService { ), }; - tokio::spawn(async move { + rt.spawn(async move { actor.run().await.expect("Failed to run PartialWitnessActor"); }); @@ -155,7 +157,6 @@ impl PartialWitnessService { /// Main async loop processing all incoming PartialWitnessMsg. pub async fn run(mut self) -> Result<(), Error> { while let Some(msg) = self.rx.recv().await { - // Match on the enum variant and dispatch the appropriate handler: match msg { PartialWitnessMsg::DistributeStateWitnessRequest(req) => { if let Err(err) = self.handle_distribute_state_witness_request(req).await { diff --git a/chain/client/src/test_utils/setup.rs b/chain/client/src/test_utils/setup.rs index 37422fd7597..adea5091579 100644 --- a/chain/client/src/test_utils/setup.rs +++ b/chain/client/src/test_utils/setup.rs @@ -74,6 +74,7 @@ use std::cmp::max; use std::collections::{HashMap, HashSet}; use std::ops::DerefMut; use std::sync::{Arc, RwLock}; +use tokio::runtime::Runtime; pub const TEST_SEED: RngSeed = [3; 32]; @@ -156,7 +157,9 @@ pub fn setup( ); let client_adapter_for_partial_witness_actor = LateBoundSender::new(); + let networking_rt = Runtime::new().unwrap(); let (partial_witness_addr, _) = spawn_actix_actor(PartialWitnessActor::new( + networking_rt.handle().clone(), clock.clone(), network_adapter.clone(), client_adapter_for_partial_witness_actor.as_multi_sender(), diff --git a/integration-tests/src/test_loop/builder.rs b/integration-tests/src/test_loop/builder.rs index aabffeaff05..9a7d742777b 100644 --- a/integration-tests/src/test_loop/builder.rs +++ b/integration-tests/src/test_loop/builder.rs @@ -42,6 +42,7 @@ use near_store::{Store, StoreConfig, TrieConfig}; use near_vm_runner::logic::ProtocolVersion; use near_vm_runner::{ContractRuntimeCache, FilesystemContractRuntimeCache}; use nearcore::state_sync::StateSyncDumper; +use tokio::runtime::Runtime; use super::env::{ClientToShardsManagerSender, TestData, TestLoopChunksStorage, TestLoopEnv}; use super::utils::network::{chunk_endorsement_dropper, chunk_endorsement_dropper_by_hash}; @@ -719,7 +720,9 @@ impl TestLoopBuilder { ) .unwrap(); + let networking_rt = Runtime::new().unwrap(); let partial_witness_actor = PartialWitnessActor::new( + networking_rt.handle().clone(), self.test_loop.clock(), network_adapter.as_multi_sender(), client_adapter.as_multi_sender(), diff --git a/integration-tests/src/tests/network/runner.rs b/integration-tests/src/tests/network/runner.rs index a6fe267d014..530bfbead7e 100644 --- a/integration-tests/src/tests/network/runner.rs +++ b/integration-tests/src/tests/network/runner.rs @@ -140,7 +140,9 @@ fn setup_network_node( runtime.store().clone(), client_config.chunk_request_retry_period, ); + let networking_rt = tokio::runtime::Builder::new_current_thread().enable_all().build()?; let (partial_witness_actor, _) = spawn_actix_actor(PartialWitnessActor::new( + networking_rt.handle().clone(), Clock::real(), network_adapter.as_multi_sender(), client_actor.clone().with_auto_span_context().into_multi_sender(), diff --git a/nearcore/src/lib.rs b/nearcore/src/lib.rs index acd35c35cc1..4e30312753c 100644 --- a/nearcore/src/lib.rs +++ b/nearcore/src/lib.rs @@ -227,6 +227,8 @@ pub struct NearNode { pub state_sync_runtime: Arc, /// Shard tracker, allows querying of which shards are tracked by this node. pub shard_tracker: ShardTracker, + // The threads that the networking layer runs in. + _networking_rt: tokio::runtime::Runtime, } pub fn start_with_config(home_dir: &Path, config: NearConfig) -> anyhow::Result { @@ -363,8 +365,11 @@ pub fn start_with_config_and_synchronization( ); let snapshot_callbacks = SnapshotCallbacks { make_snapshot_callback, delete_snapshot_callback }; + let networking_rt = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap(); + let (partial_witness_actor, partial_witness_arbiter) = spawn_actix_actor(PartialWitnessActor::new( + networking_rt.handle().clone(), Clock::real(), network_adapter.as_multi_sender(), client_adapter_for_partial_witness_actor.as_multi_sender(), @@ -517,5 +522,6 @@ pub fn start_with_config_and_synchronization( resharding_handle, state_sync_runtime, shard_tracker, + _networking_rt: networking_rt, }) }