Skip to content

Commit

Permalink
[stateless_validation] Cleanup for bypass sending stateless validatio…
Browse files Browse the repository at this point in the history
…n messages to ourselves (#10539)

I wasn't too happy exposing chunk_endorsements cache in
`ChunkEndorsementTracker` that was introduced as part of this PR:
#10531

Hopefully this is better structured. Suggestions are welcomed!
  • Loading branch information
Shreyan Gupta authored Jan 31, 2024
1 parent cf2caa3 commit 0e1c647
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 44 deletions.
2 changes: 1 addition & 1 deletion chain/client/src/chunk_inclusion_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl ChunkInclusionTracker {
pub fn prepare_chunk_headers_ready_for_inclusion(
&mut self,
prev_block_hash: &CryptoHash,
endorsement_tracker: &mut ChunkEndorsementTracker,
endorsement_tracker: &ChunkEndorsementTracker,
) -> Result<(), Error> {
let Some(entry) = self.prev_block_to_chunk_hash_ready.get(prev_block_hash) else {
return Ok(());
Expand Down
8 changes: 5 additions & 3 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ pub struct Client {
/// Also tracks banned chunk producers and filters out chunks produced by them
pub chunk_inclusion_tracker: ChunkInclusionTracker,
/// Tracks chunk endorsements received from chunk validators. Used to filter out chunks ready for inclusion
pub chunk_endorsement_tracker: ChunkEndorsementTracker,
pub chunk_endorsement_tracker: Arc<ChunkEndorsementTracker>,
}

impl Client {
Expand Down Expand Up @@ -341,13 +341,15 @@ impl Client {
validator_signer.clone(),
doomslug_threshold_mode,
);
let chunk_endorsement_tracker =
Arc::new(ChunkEndorsementTracker::new(epoch_manager.clone()));
let chunk_validator = ChunkValidator::new(
validator_signer.clone(),
epoch_manager.clone(),
network_adapter.clone().into_sender(),
runtime_adapter.clone(),
chunk_endorsement_tracker.clone(),
);
let chunk_endorsement_tracker = ChunkEndorsementTracker::new(epoch_manager.clone());
Ok(Self {
#[cfg(feature = "test_features")]
adv_produce_blocks: None,
Expand Down Expand Up @@ -539,7 +541,7 @@ impl Client {

self.chunk_inclusion_tracker.prepare_chunk_headers_ready_for_inclusion(
&head.last_block_hash,
&mut self.chunk_endorsement_tracker,
self.chunk_endorsement_tracker.as_ref(),
)?;

self.produce_block_on(height, head.last_block_hash)
Expand Down
2 changes: 1 addition & 1 deletion chain/client/src/client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1065,7 +1065,7 @@ impl ClientActor {
if me == next_block_producer_account {
self.client.chunk_inclusion_tracker.prepare_chunk_headers_ready_for_inclusion(
&head.last_block_hash,
&mut self.client.chunk_endorsement_tracker,
self.client.chunk_endorsement_tracker.as_ref(),
)?;
let num_chunks = self
.client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub struct ChunkEndorsementTracker {
/// We store the validated chunk endorsements received from chunk validators
/// This is keyed on chunk_hash and account_id of validator to avoid duplicates.
/// Chunk endorsements would later be used as a part of block production.
pub chunk_endorsements: Arc<SyncLruCache<ChunkHash, HashMap<AccountId, ChunkEndorsement>>>,
chunk_endorsements: SyncLruCache<ChunkHash, HashMap<AccountId, ChunkEndorsement>>,
}

impl Client {
Expand All @@ -31,24 +31,24 @@ impl Client {
endorsement: ChunkEndorsement,
) -> Result<(), Error> {
let chunk_header = self.chain.get_chunk(endorsement.chunk_hash())?.cloned_header();
self.chunk_endorsement_tracker.process_chunk_endorsement(chunk_header, endorsement)
self.chunk_endorsement_tracker.process_chunk_endorsement(&chunk_header, endorsement)
}
}

impl ChunkEndorsementTracker {
pub fn new(epoch_manager: Arc<dyn EpochManagerAdapter>) -> Self {
Self {
epoch_manager,
chunk_endorsements: Arc::new(SyncLruCache::new(NUM_CHUNKS_IN_CHUNK_ENDORSEMENTS_CACHE)),
chunk_endorsements: SyncLruCache::new(NUM_CHUNKS_IN_CHUNK_ENDORSEMENTS_CACHE),
}
}

/// Function to process an incoming chunk endorsement from chunk validators.
/// We first verify the chunk endorsement and then store it in a cache.
/// We would later include the endorsements in the block production.
fn process_chunk_endorsement(
pub(crate) fn process_chunk_endorsement(
&self,
chunk_header: ShardChunkHeader,
chunk_header: &ShardChunkHeader,
endorsement: ChunkEndorsement,
) -> Result<(), Error> {
let chunk_hash = endorsement.chunk_hash();
Expand All @@ -64,7 +64,7 @@ impl ChunkEndorsementTracker {
return Ok(());
}

if !self.epoch_manager.verify_chunk_endorsement(&chunk_header, &endorsement)? {
if !self.epoch_manager.verify_chunk_endorsement(chunk_header, &endorsement)? {
tracing::error!(target: "stateless_validation", ?endorsement, "Invalid chunk endorsement.");
return Err(Error::InvalidChunkEndorsement);
}
Expand All @@ -76,7 +76,7 @@ impl ChunkEndorsementTracker {
// Maybe add check to ensure we don't accept endorsements from chunks already included in some block?
// Maybe add check to ensure we don't accept endorsements from chunks that have too old height_created?
tracing::debug!(target: "stateless_validation", ?endorsement, "Received and saved chunk endorsement.");
let mut guard = self.chunk_endorsements.as_ref().lock();
let mut guard = self.chunk_endorsements.lock();
guard.get_or_insert(chunk_hash.clone(), || HashMap::new());
let chunk_endorsements = guard.get_mut(chunk_hash).unwrap();
chunk_endorsements.insert(account_id.clone(), endorsement);
Expand Down
42 changes: 21 additions & 21 deletions chain/client/src/stateless_validation/chunk_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::stateless_validation::chunk_endorsement_tracker::ChunkEndorsementTrac
use crate::{metrics, Client};
use itertools::Itertools;
use near_async::messaging::{CanSend, Sender};
use near_cache::SyncLruCache;
use near_chain::chain::{
apply_new_chunk, apply_old_chunk, NewChunkData, NewChunkResult, OldChunkData, OldChunkResult,
ShardContext, StorageContext,
Expand All @@ -28,7 +27,7 @@ use near_primitives::stateless_validation::{
};
use near_primitives::transaction::SignedTransaction;
use near_primitives::types::chunk_extra::ChunkExtra;
use near_primitives::types::{AccountId, ShardId};
use near_primitives::types::ShardId;
use near_primitives::validator_signer::ValidatorSigner;
use near_store::PartialStorage;
use std::collections::HashMap;
Expand All @@ -52,6 +51,7 @@ pub struct ChunkValidator {
epoch_manager: Arc<dyn EpochManagerAdapter>,
network_sender: Sender<PeerManagerMessageRequest>,
runtime_adapter: Arc<dyn RuntimeAdapter>,
chunk_endorsement_tracker: Arc<ChunkEndorsementTracker>,
}

impl ChunkValidator {
Expand All @@ -60,8 +60,15 @@ impl ChunkValidator {
epoch_manager: Arc<dyn EpochManagerAdapter>,
network_sender: Sender<PeerManagerMessageRequest>,
runtime_adapter: Arc<dyn RuntimeAdapter>,
chunk_endorsement_tracker: Arc<ChunkEndorsementTracker>,
) -> Self {
Self { my_signer, epoch_manager, network_sender, runtime_adapter }
Self {
my_signer,
epoch_manager,
network_sender,
runtime_adapter,
chunk_endorsement_tracker,
}
}

/// Performs the chunk validation logic. When done, it will send the chunk
Expand All @@ -72,7 +79,6 @@ impl ChunkValidator {
state_witness: ChunkStateWitness,
chain: &Chain,
peer_id: PeerId,
chunk_endorsement_tracker: &ChunkEndorsementTracker,
) -> Result<(), Error> {
if !self.epoch_manager.verify_chunk_state_witness_signature(&state_witness)? {
return Err(Error::InvalidChunkStateWitness("Invalid signature".to_string()));
Expand Down Expand Up @@ -108,7 +114,7 @@ impl ChunkValidator {
let signer = my_signer.clone();
let epoch_manager = self.epoch_manager.clone();
let runtime_adapter = self.runtime_adapter.clone();
let my_chunk_endorsements = chunk_endorsement_tracker.chunk_endorsements.clone();
let chunk_endorsement_tracker = self.chunk_endorsement_tracker.clone();
rayon::spawn(move || {
match validate_chunk_state_witness(
state_witness_inner,
Expand All @@ -122,7 +128,7 @@ impl ChunkValidator {
epoch_manager.as_ref(),
signer.as_ref(),
&network_sender,
my_chunk_endorsements.as_ref(),
chunk_endorsement_tracker.as_ref(),
);
}
Err(err) => {
Expand Down Expand Up @@ -524,12 +530,12 @@ fn apply_result_to_chunk_extra(
)
}

pub fn send_chunk_endorsement_to_block_producers(
pub(crate) fn send_chunk_endorsement_to_block_producers(
chunk_header: &ShardChunkHeader,
epoch_manager: &dyn EpochManagerAdapter,
signer: &dyn ValidatorSigner,
network_sender: &Sender<PeerManagerMessageRequest>,
my_chunk_endorsements: &SyncLruCache<ChunkHash, HashMap<AccountId, ChunkEndorsement>>,
chunk_endorsement_tracker: &ChunkEndorsementTracker,
) {
let epoch_id =
epoch_manager.get_epoch_id_from_prev_block(chunk_header.prev_block_hash()).unwrap();
Expand All @@ -553,12 +559,10 @@ pub fn send_chunk_endorsement_to_block_producers(
let endorsement = ChunkEndorsement::new(chunk_header.chunk_hash(), signer);
for block_producer in block_producers {
if signer.validator_id() == &block_producer {
// Add endorsement to the cache of our chunk endorsements
// immediately, because network won't handle message to ourselves.
let mut guard = my_chunk_endorsements.lock();
guard.get_or_insert(chunk_hash.clone(), || HashMap::new());
let chunk_endorsements = guard.get_mut(&chunk_hash).unwrap();
chunk_endorsements.insert(block_producer.clone(), endorsement.clone());
// Unwrap here as we always expect our own endorsements to be valid
chunk_endorsement_tracker
.process_chunk_endorsement(chunk_header, endorsement.clone())
.unwrap();
} else {
network_sender.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::ChunkEndorsement(block_producer, endorsement.clone()),
Expand Down Expand Up @@ -600,19 +604,15 @@ impl Client {
self.epoch_manager.as_ref(),
signer.as_ref(),
&self.chunk_validator.network_sender,
self.chunk_endorsement_tracker.chunk_endorsements.as_ref(),
self.chunk_endorsement_tracker.as_ref(),
);
return Ok(None);
}

// TODO(#10265): If the previous block does not exist, we should
// queue this (similar to orphans) to retry later.
let result = self.chunk_validator.start_validating_chunk(
witness,
&self.chain,
peer_id.clone(),
&self.chunk_endorsement_tracker,
);
let result =
self.chunk_validator.start_validating_chunk(witness, &self.chain, peer_id.clone());
if let Err(Error::InvalidChunkStateWitness(_)) = &result {
self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::BanPeer {
Expand Down
16 changes: 6 additions & 10 deletions chain/client/src/stateless_validation/state_witness_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl Client {
}

let chunk_header = chunk.cloned_header();
let chunk_validators = self
let mut chunk_validators = self
.epoch_manager
.get_chunk_validator_assignments(
epoch_id,
Expand All @@ -48,23 +48,19 @@ impl Client {
transactions_storage_proof,
)?;

// TODO(#10508): Remove this block once we have better ways to handle chunk state witness and
// chunk endorsement related network messages.
if let Some(my_signer) = self.validator_signer.clone() {
let validator_id = my_signer.validator_id();
if chunk_validators.contains(validator_id) {
// Endorse the chunk immediately, bypassing sending state witness
// to ourselves, because network can't send messages to ourselves.
// Also useful in tests where we don't have a good way to handle
// network messages and there's only a single client.
if chunk_validators.contains(my_signer.validator_id()) {
// Bypass state witness validation if we created state witness. Endorse the chunk immediately.
send_chunk_endorsement_to_block_producers(
&chunk_header,
self.epoch_manager.as_ref(),
my_signer.as_ref(),
&self.network_adapter.clone().into_sender(),
self.chunk_endorsement_tracker.chunk_endorsements.as_ref(),
self.chunk_endorsement_tracker.as_ref(),
);
}
// Remove ourselves from the list of chunk validators. Network can't send messages to ourselves.
chunk_validators.retain(|validator| validator != my_signer.validator_id());
};

tracing::debug!(
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/src/tests/client/process_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2407,7 +2407,7 @@ fn test_validate_chunk_extra() {
.chunk_inclusion_tracker
.prepare_chunk_headers_ready_for_inclusion(
block1.hash(),
&mut client.chunk_endorsement_tracker,
client.chunk_endorsement_tracker.as_ref(),
)
.unwrap();
let block = client.produce_block_on(next_height + 2, *block1.hash()).unwrap().unwrap();
Expand Down

0 comments on commit 0e1c647

Please sign in to comment.