Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
stedfn committed Jan 7, 2025
1 parent 1e6c4b4 commit 76280aa
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use near_primitives::stateless_validation::contract_distribution::ContractUpdate
use near_primitives::stateless_validation::state_witness::ChunkStateWitness;
use near_primitives::types::ShardId;
use std::sync::Arc;
use tokio::runtime::Handle;

use crate::client_actor::ClientSenderForPartialWitness;

Expand Down Expand Up @@ -119,6 +120,7 @@ impl Handler<ContractCodeResponseMessage> for PartialWitnessActor {

impl PartialWitnessActor {
pub fn new(
rt: Handle,
clock: Clock,
network_adapter: PeerManagerAdapter,
client_sender: ClientSenderForPartialWitness,
Expand All @@ -129,6 +131,7 @@ impl PartialWitnessActor {
partial_witness_spawner: Arc<dyn AsyncComputationSpawner>,
) -> Self {
let tx = PartialWitnessService::new(
rt,
clock,
network_adapter,
client_sender,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use near_store::adapter::trie_store::TrieStoreAdapter;
use near_store::{DBCol, StorageError, TrieDBStorage, TrieStorage};
use near_vm_runner::{get_contract_cache_key, ContractCode, ContractRuntimeCache};
use rand::Rng;
use tokio::runtime::Handle;

use crate::client_actor::ClientSenderForPartialWitness;
use crate::stateless_validation::state_witness_tracker::ChunkStateWitnessTracker;
Expand Down Expand Up @@ -110,6 +111,7 @@ pub struct PartialWitnessSenderForClient {

impl PartialWitnessService {
pub fn new(
rt: Handle,
clock: Clock,
network_adapter: PeerManagerAdapter,
client_sender: ClientSenderForPartialWitness,
Expand Down Expand Up @@ -144,7 +146,7 @@ impl PartialWitnessService {
),
};

tokio::spawn(async move {
rt.spawn(async move {
actor.run().await.expect("Failed to run PartialWitnessActor");
});

Expand All @@ -155,7 +157,6 @@ impl PartialWitnessService {
/// Main async loop processing all incoming PartialWitnessMsg.
pub async fn run(mut self) -> Result<(), Error> {
while let Some(msg) = self.rx.recv().await {
// Match on the enum variant and dispatch the appropriate handler:
match msg {
PartialWitnessMsg::DistributeStateWitnessRequest(req) => {
if let Err(err) = self.handle_distribute_state_witness_request(req).await {
Expand Down
3 changes: 3 additions & 0 deletions chain/client/src/test_utils/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ use std::cmp::max;
use std::collections::{HashMap, HashSet};
use std::ops::DerefMut;
use std::sync::{Arc, RwLock};
use tokio::runtime::Runtime;

pub const TEST_SEED: RngSeed = [3; 32];

Expand Down Expand Up @@ -156,7 +157,9 @@ pub fn setup(
);

let client_adapter_for_partial_witness_actor = LateBoundSender::new();
let networking_rt = Runtime::new().unwrap();
let (partial_witness_addr, _) = spawn_actix_actor(PartialWitnessActor::new(
networking_rt.handle().clone(),
clock.clone(),
network_adapter.clone(),
client_adapter_for_partial_witness_actor.as_multi_sender(),
Expand Down
3 changes: 3 additions & 0 deletions integration-tests/src/test_loop/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use near_store::{Store, StoreConfig, TrieConfig};
use near_vm_runner::logic::ProtocolVersion;
use near_vm_runner::{ContractRuntimeCache, FilesystemContractRuntimeCache};
use nearcore::state_sync::StateSyncDumper;
use tokio::runtime::Runtime;

use super::env::{ClientToShardsManagerSender, TestData, TestLoopChunksStorage, TestLoopEnv};
use super::utils::network::{chunk_endorsement_dropper, chunk_endorsement_dropper_by_hash};
Expand Down Expand Up @@ -719,7 +720,9 @@ impl TestLoopBuilder {
)
.unwrap();

let networking_rt = Runtime::new().unwrap();
let partial_witness_actor = PartialWitnessActor::new(
networking_rt.handle().clone(),
self.test_loop.clock(),
network_adapter.as_multi_sender(),
client_adapter.as_multi_sender(),
Expand Down
2 changes: 2 additions & 0 deletions integration-tests/src/tests/network/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,9 @@ fn setup_network_node(
runtime.store().clone(),
client_config.chunk_request_retry_period,
);
let networking_rt = tokio::runtime::Builder::new_current_thread().enable_all().build()?;
let (partial_witness_actor, _) = spawn_actix_actor(PartialWitnessActor::new(
networking_rt.handle().clone(),
Clock::real(),
network_adapter.as_multi_sender(),
client_actor.clone().with_auto_span_context().into_multi_sender(),
Expand Down
6 changes: 6 additions & 0 deletions nearcore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,8 @@ pub struct NearNode {
pub state_sync_runtime: Arc<tokio::runtime::Runtime>,
/// Shard tracker, allows querying of which shards are tracked by this node.
pub shard_tracker: ShardTracker,
// The threads that the networking layer runs in.
_networking_rt: tokio::runtime::Runtime,
}

pub fn start_with_config(home_dir: &Path, config: NearConfig) -> anyhow::Result<NearNode> {
Expand Down Expand Up @@ -363,8 +365,11 @@ pub fn start_with_config_and_synchronization(
);
let snapshot_callbacks = SnapshotCallbacks { make_snapshot_callback, delete_snapshot_callback };

let networking_rt = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();

let (partial_witness_actor, partial_witness_arbiter) =
spawn_actix_actor(PartialWitnessActor::new(
networking_rt.handle().clone(),
Clock::real(),
network_adapter.as_multi_sender(),
client_adapter_for_partial_witness_actor.as_multi_sender(),
Expand Down Expand Up @@ -517,5 +522,6 @@ pub fn start_with_config_and_synchronization(
resharding_handle,
state_sync_runtime,
shard_tracker,
_networking_rt: networking_rt,
})
}

0 comments on commit 76280aa

Please sign in to comment.