experiment tokio partial witness actor #12693

Draft · wants to merge 15 commits into base: master
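In outline, the experiment gives the partial witness actor a dedicated Tokio runtime for its networking futures (via `TokioRuntimeFutureSpawner`) and lets `PartialEncodedStateWitnessTracker` be called through `&self` by moving its caches behind `Mutex` and `SyncLruCache`. Below is a minimal sketch of that interior-mutability pattern (hypothetical names, not the nearcore API):

```rust
use std::num::NonZeroUsize;
use std::sync::{Arc, Mutex};

use lru::LruCache;

// Hypothetical stand-in for the tracker: once the LRU cache sits behind a
// Mutex, methods can take `&self`, so one `Arc<Tracker>` can be shared by
// tasks running on a multi-threaded tokio runtime.
struct Tracker {
    parts_cache: Mutex<LruCache<u64, Vec<u8>>>,
}

impl Tracker {
    fn new(capacity: usize) -> Self {
        Self { parts_cache: Mutex::new(LruCache::new(NonZeroUsize::new(capacity).unwrap())) }
    }

    // Before this kind of change, a method like this needed `&mut self`.
    fn store_part(&self, key: u64, part: Vec<u8>) {
        self.parts_cache.lock().unwrap().push(key, part);
    }
}

fn main() {
    let tracker = Arc::new(Tracker::new(16));
    tracker.store_part(1, vec![0u8; 8]);
    assert!(tracker.parts_cache.lock().unwrap().contains(&1));
}
```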
@@ -1,6 +1,7 @@
mod encoding;
mod partial_deploys_tracker;
pub mod partial_witness_actor;
+pub mod partial_witness_actor_v2;
mod partial_witness_tracker;

pub use encoding::witness_part_length;

Large diffs are not rendered by default.

Large diffs are not rendered by default.

@@ -1,10 +1,11 @@
use std::collections::HashSet;
use std::num::NonZeroUsize;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};

use lru::LruCache;
use near_async::messaging::CanSend;
use near_async::time::Instant;
+use near_cache::SyncLruCache;
use near_chain::chain::ChunkStateWitnessMessage;
use near_chain::Error;
use near_epoch_manager::EpochManagerAdapter;
@@ -308,13 +309,13 @@ pub struct PartialEncodedStateWitnessTracker {
/// Epoch manager to get the set of chunk validators
epoch_manager: Arc<dyn EpochManagerAdapter>,
/// Keeps track of state witness parts received from chunk producers.
-    parts_cache: LruCache<ChunkProductionKey, CacheEntry>,
+    parts_cache: Mutex<LruCache<ChunkProductionKey, CacheEntry>>,
/// Keeps track of the already decoded witnesses. This is needed
/// to protect chunk validator from processing the same witness multiple
/// times.
-    processed_witnesses: LruCache<ChunkProductionKey, ()>,
+    processed_witnesses: SyncLruCache<ChunkProductionKey, ()>,
/// Reed Solomon encoder for decoding state witness parts.
-    encoders: ReedSolomonEncoderCache,
+    encoders: Mutex<ReedSolomonEncoderCache>,
}

impl PartialEncodedStateWitnessTracker {
@@ -325,16 +326,16 @@ impl PartialEncodedStateWitnessTracker {
Self {
client_sender,
epoch_manager,
-            parts_cache: LruCache::new(NonZeroUsize::new(WITNESS_PARTS_CACHE_SIZE).unwrap()),
-            processed_witnesses: LruCache::new(
-                NonZeroUsize::new(PROCESSED_WITNESSES_CACHE_SIZE).unwrap(),
-            ),
-            encoders: ReedSolomonEncoderCache::new(WITNESS_RATIO_DATA_PARTS),
+            parts_cache: Mutex::new(LruCache::new(
+                NonZeroUsize::new(WITNESS_PARTS_CACHE_SIZE).unwrap(),
+            )),
+            processed_witnesses: SyncLruCache::new(PROCESSED_WITNESSES_CACHE_SIZE),
+            encoders: Mutex::new(ReedSolomonEncoderCache::new(WITNESS_RATIO_DATA_PARTS)),
}
}

pub fn store_partial_encoded_state_witness(
-        &mut self,
+        &self,
partial_witness: PartialEncodedStateWitness,
) -> Result<(), Error> {
tracing::debug!(target: "client", ?partial_witness, "store_partial_encoded_state_witness");
@@ -345,7 +346,7 @@ impl PartialEncodedStateWitnessTracker {
}

pub fn store_accessed_contract_hashes(
-        &mut self,
+        &self,
key: ChunkProductionKey,
hashes: HashSet<CodeHash>,
) -> Result<(), Error> {
@@ -355,7 +356,7 @@ impl PartialEncodedStateWitnessTracker {
}

pub fn store_accessed_contract_codes(
-        &mut self,
+        &self,
key: ChunkProductionKey,
codes: Vec<CodeBytes>,
) -> Result<(), Error> {
@@ -365,7 +366,7 @@ impl PartialEncodedStateWitnessTracker {
}

fn process_update(
-        &mut self,
+        &self,
key: ChunkProductionKey,
create_if_not_exists: bool,
update: CacheUpdate,
@@ -382,17 +383,23 @@ impl PartialEncodedStateWitnessTracker {
if create_if_not_exists {
self.maybe_insert_new_entry_in_parts_cache(&key);
}
-        let Some(entry) = self.parts_cache.get_mut(&key) else {
+        let mut parts_cache = self.parts_cache.lock().unwrap();
+        let Some(entry) = parts_cache.get_mut(&key) else {
return Ok(());
};
-        if let Some((decode_result, accessed_contracts)) = entry.update(update) {
+        let total_size: usize = if let Some((decode_result, accessed_contracts)) =
+            entry.update(update)
+        {
// Record the time taken from receiving first part to decoding partial witness.
let time_to_last_part = Instant::now().signed_duration_since(entry.created_at);
metrics::PARTIAL_WITNESS_TIME_TO_LAST_PART
.with_label_values(&[key.shard_id.to_string().as_str()])
.observe(time_to_last_part.as_seconds_f64());

-            self.parts_cache.pop(&key);
+            parts_cache.pop(&key);
+            let total_size = parts_cache.iter().map(|(_, entry)| entry.total_size()).sum();
+            drop(parts_cache);
+
self.processed_witnesses.push(key.clone(), ());

let encoded_witness = match decode_result {
@@ -428,26 +435,33 @@ impl PartialEncodedStateWitnessTracker {

tracing::debug!(target: "client", ?key, "Sending encoded witness to client.");
self.client_sender.send(ChunkStateWitnessMessage { witness, raw_witness_size });
}
-        self.record_total_parts_cache_size_metric();

+            total_size
+        } else {
+            parts_cache.iter().map(|(_, entry)| entry.total_size()).sum()
+        };
+        metrics::PARTIAL_WITNESS_CACHE_SIZE.set(total_size as f64);

Ok(())
}

-    fn get_encoder(&mut self, key: &ChunkProductionKey) -> Result<Arc<ReedSolomonEncoder>, Error> {
+    fn get_encoder(&self, key: &ChunkProductionKey) -> Result<Arc<ReedSolomonEncoder>, Error> {
// The expected number of parts for the Reed Solomon encoding is the number of chunk validators.
let num_parts = self
.epoch_manager
.get_chunk_validator_assignments(&key.epoch_id, key.shard_id, key.height_created)?
.len();
-        Ok(self.encoders.entry(num_parts))
+        let mut encoders = self.encoders.lock().unwrap();
+        Ok(encoders.entry(num_parts))
}

// Function to insert a new entry into the cache for the chunk hash if it does not already exist
// We additionally check if an evicted entry has been fully decoded and processed.
-    fn maybe_insert_new_entry_in_parts_cache(&mut self, key: &ChunkProductionKey) {
-        if !self.parts_cache.contains(key) {
+    fn maybe_insert_new_entry_in_parts_cache(&self, key: &ChunkProductionKey) {
+        let mut parts_cache = self.parts_cache.lock().unwrap();
+        if !parts_cache.contains(key) {
if let Some((evicted_key, evicted_entry)) =
-                self.parts_cache.push(key.clone(), CacheEntry::new(key.shard_id))
+                parts_cache.push(key.clone(), CacheEntry::new(key.shard_id))
{
tracing::warn!(
target: "client",
@@ -460,11 +474,6 @@ impl PartialEncodedStateWitnessTracker {
}
}

-    fn record_total_parts_cache_size_metric(&self) {
-        let total_size: usize = self.parts_cache.iter().map(|(_, entry)| entry.total_size()).sum();
-        metrics::PARTIAL_WITNESS_CACHE_SIZE.set(total_size as f64);
-    }

fn decode_state_witness(
&self,
encoded_witness: &EncodedChunkStateWitness,
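The diff above also deletes the `record_total_parts_cache_size_metric` helper: the cache-size sum now happens inside `process_update`, under the same lock acquisition that mutates the cache, and the guard is dropped before the decoded witness is forwarded. A sketch of that locking pattern, with illustrative types rather than the tracker's real ones:

```rust
use std::collections::HashMap;
use std::sync::Mutex;

struct Cache {
    parts: Mutex<HashMap<u64, Vec<u8>>>,
}

impl Cache {
    fn insert_and_total(&self, key: u64, part: Vec<u8>) -> usize {
        let mut parts = self.parts.lock().unwrap();
        parts.insert(key, part);
        // Sum while the guard is held so the metric sees a consistent snapshot.
        let total_size: usize = parts.values().map(|v| v.len()).sum();
        drop(parts); // release before slower work, e.g. sending to the client
        total_size
    }
}

fn main() {
    let cache = Cache { parts: Mutex::new(HashMap::new()) };
    println!("cache holds {} bytes", cache.insert_and_total(7, vec![0u8; 32]));
}
```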
6 changes: 5 additions & 1 deletion chain/client/src/test_utils/setup.rs
@@ -16,7 +16,7 @@ use actix::{Actor, Addr, Context};
use futures::{future, FutureExt};
use near_async::actix::AddrWithAutoSpanContextExt;
use near_async::actix_wrapper::{spawn_actix_actor, ActixWrapper};
-use near_async::futures::ActixFutureSpawner;
+use near_async::futures::{ActixFutureSpawner, TokioRuntimeFutureSpawner};
use near_async::messaging::{
noop, CanSend, IntoMultiSender, IntoSender, LateBoundSender, SendAsync, Sender,
};
@@ -156,14 +156,18 @@ pub fn setup(
);

let client_adapter_for_partial_witness_actor = LateBoundSender::new();
+    let networking_rt = Arc::new(tokio::runtime::Builder::new_current_thread().build().unwrap());
+    let networking_spawner = Arc::new(TokioRuntimeFutureSpawner(networking_rt));
let (partial_witness_addr, _) = spawn_actix_actor(PartialWitnessActor::new(
+        networking_spawner,
clock.clone(),
network_adapter.clone(),
client_adapter_for_partial_witness_actor.as_multi_sender(),
signer.clone(),
epoch_manager.clone(),
runtime.clone(),
Arc::new(RayonAsyncComputationSpawner),
+        Arc::new(RayonAsyncComputationSpawner),
));
let partial_witness_adapter = partial_witness_addr.with_auto_span_context();

7 changes: 6 additions & 1 deletion integration-tests/src/test_loop/builder.rs
@@ -2,7 +2,7 @@ use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};
use tempfile::TempDir;

-use near_async::futures::FutureSpawner;
+use near_async::futures::{FutureSpawner, TokioRuntimeFutureSpawner};
use near_async::messaging::{noop, IntoMultiSender, IntoSender, LateBoundSender};
use near_async::test_loop::sender::TestLoopSender;
use near_async::test_loop::TestLoopV2;
@@ -719,14 +719,19 @@ impl TestLoopBuilder {
)
.unwrap();

+        let networking_rt =
+            Arc::new(tokio::runtime::Builder::new_current_thread().build().unwrap());
+        let networking_spawner = Arc::new(TokioRuntimeFutureSpawner(networking_rt));
let partial_witness_actor = PartialWitnessActor::new(
+            networking_spawner,
self.test_loop.clock(),
network_adapter.as_multi_sender(),
client_adapter.as_multi_sender(),
validator_signer.clone(),
epoch_manager.clone(),
runtime_adapter.clone(),
Arc::new(self.test_loop.async_computation_spawner(|_| Duration::milliseconds(80))),
+            Arc::new(self.test_loop.async_computation_spawner(|_| Duration::milliseconds(80))),
);

let gc_actor = GCActor::new(
6 changes: 5 additions & 1 deletion integration-tests/src/tests/network/runner.rs
@@ -2,7 +2,7 @@ use actix::{Actor, Addr};
use anyhow::{anyhow, bail, Context};
use near_async::actix::AddrWithAutoSpanContextExt;
use near_async::actix_wrapper::{spawn_actix_actor, ActixWrapper};
-use near_async::futures::ActixFutureSpawner;
+use near_async::futures::{ActixFutureSpawner, TokioRuntimeFutureSpawner};
use near_async::messaging::{noop, IntoMultiSender, IntoSender, LateBoundSender};
use near_async::time::{self, Clock};
use near_chain::rayon_spawner::RayonAsyncComputationSpawner;
@@ -140,14 +140,18 @@ fn setup_network_node(
runtime.store().clone(),
client_config.chunk_request_retry_period,
);
+    let networking_rt = Arc::new(tokio::runtime::Builder::new_current_thread().build().unwrap());
+    let networking_spawner = Arc::new(TokioRuntimeFutureSpawner(networking_rt));
let (partial_witness_actor, _) = spawn_actix_actor(PartialWitnessActor::new(
+        networking_spawner,
Clock::real(),
network_adapter.as_multi_sender(),
client_actor.clone().with_auto_span_context().into_multi_sender(),
validator_signer,
epoch_manager,
runtime,
Arc::new(RayonAsyncComputationSpawner),
+        Arc::new(RayonAsyncComputationSpawner),
));
shards_manager_adapter.bind(shards_manager_actor.with_auto_span_context());
let peer_manager = PeerManagerActor::spawn(
9 changes: 9 additions & 0 deletions nearcore/src/lib.rs
@@ -227,6 +227,8 @@ pub struct NearNode {
pub state_sync_runtime: Arc<tokio::runtime::Runtime>,
/// Shard tracker, allows querying of which shards are tracked by this node.
pub shard_tracker: ShardTracker,
+    /// The Tokio runtime that the networking layer runs on.
+    pub networking_rt: Arc<tokio::runtime::Runtime>,
}

pub fn start_with_config(home_dir: &Path, config: NearConfig) -> anyhow::Result<NearNode> {
@@ -363,15 +365,21 @@ pub fn start_with_config_and_synchronization(
);
let snapshot_callbacks = SnapshotCallbacks { make_snapshot_callback, delete_snapshot_callback };

+    let networking_rt =
+        Arc::new(tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap());
+    let networking_spawner = Arc::new(TokioRuntimeFutureSpawner(networking_rt.clone()));
+
let (partial_witness_actor, partial_witness_arbiter) =
spawn_actix_actor(PartialWitnessActor::new(
+            networking_spawner,
Clock::real(),
network_adapter.as_multi_sender(),
client_adapter_for_partial_witness_actor.as_multi_sender(),
config.validator_signer.clone(),
epoch_manager.clone(),
runtime.clone(),
Arc::new(RayonAsyncComputationSpawner),
+            Arc::new(RayonAsyncComputationSpawner),
));

let (_gc_actor, gc_arbiter) = spawn_actix_actor(GCActor::new(
@@ -516,5 +524,6 @@ pub fn start_with_config_and_synchronization(
resharding_handle,
state_sync_runtime,
shard_tracker,
+        networking_rt,
})
}
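Note that production builds the networking runtime with `new_multi_thread().enable_all()` (a worker pool with I/O and timer drivers), while the test setups earlier use `new_current_thread()`. A small sketch of the two flavors as plain Tokio code:

```rust
use std::sync::Arc;

fn main() {
    // Production-style: thread pool with I/O and timer drivers enabled.
    let multi =
        Arc::new(tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap());
    multi.spawn(async { /* networking futures would run here */ });

    // Test-style: single-threaded runtime; spawned futures only make progress
    // while the runtime is driven (e.g. inside block_on).
    let single = Arc::new(tokio::runtime::Builder::new_current_thread().build().unwrap());
    let four = single.block_on(async { 2 + 2 });
    assert_eq!(four, 4);
}
```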
12 changes: 12 additions & 0 deletions utils/near-cache/src/sync.rs
@@ -30,6 +30,18 @@
self.inner.lock().unwrap().is_empty()
}

+    /// Returns true if the cache contains the key and false otherwise.
+    pub fn contains(&self, key: &K) -> bool {
+        self.inner.lock().unwrap().contains(key)
+    }
+
+    /// Pushes a key-value pair into the cache. If an entry with key `key` already exists in
+    /// the cache or another cache entry is removed (due to the lru's capacity),
+    /// then it returns the old entry's key-value pair. Otherwise, returns `None`.
+    pub fn push(&self, key: K, value: V) -> Option<(K, V)> {
+        self.inner.lock().unwrap().push(key, value)
+    }
+
/// Returns the value of the key in the cache, otherwise computes the value and inserts it into
/// the cache. If the key is already in the cache, it gets moved to the head of
/// the LRU list.
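The new `contains` and `push` take `&self` because `SyncLruCache` wraps an `lru::LruCache` in a `Mutex`. A hedged usage sketch, assuming only the API visible in this diff:

```rust
use near_cache::SyncLruCache;

fn main() {
    // Capacity-bounded cache, usable through a shared reference.
    let cache: SyncLruCache<u64, ()> = SyncLruCache::new(2);
    assert!(cache.push(1, ()).is_none()); // nothing evicted or replaced
    assert!(cache.push(2, ()).is_none());
    assert!(cache.contains(&1));
    // A third insert exceeds capacity and evicts the least recently used entry.
    assert_eq!(cache.push(3, ()), Some((1, ())));
}
```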