diff --git a/chain/chain/src/flat_storage_resharder.rs b/chain/chain/src/flat_storage_resharder.rs
index 83a9434115d..c2d1b100a4e 100644
--- a/chain/chain/src/flat_storage_resharder.rs
+++ b/chain/chain/src/flat_storage_resharder.rs
@@ -1000,7 +1000,10 @@ fn shard_split_handle_key_value(
         | col::BANDWIDTH_SCHEDULER_STATE => {
             copy_kv_to_all_children(&split_params, key, value, store_update)
         }
-        col::BUFFERED_RECEIPT_INDICES | col::BUFFERED_RECEIPT => {
+        col::BUFFERED_RECEIPT_INDICES
+        | col::BUFFERED_RECEIPT
+        | col::BUFFERED_RECEIPT_GROUPS_QUEUE_DATA
+        | col::BUFFERED_RECEIPT_GROUPS_QUEUE_ITEM => {
            copy_kv_to_left_child(&split_params, key, value, store_update)
         }
         _ => unreachable!("key: {:?} should not appear in flat store!", key),
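The two new columns follow the same split rule as the existing buffered-receipt columns: buffered receipts and their group-queue metadata stay with the left child only, while the bandwidth-scheduler state and the other columns in the preceding arm are copied to both children. A minimal sketch of that rule with a hypothetical helper name (the real code keeps the match inline in `shard_split_handle_key_value`):

    use near_primitives::trie_key::col;

    /// Illustration only, not part of the patch: the flat-store columns that must
    /// be copied to the left child only during a shard split.
    fn copied_to_left_child_only(column: u8) -> bool {
        matches!(
            column,
            col::BUFFERED_RECEIPT_INDICES
                | col::BUFFERED_RECEIPT
                | col::BUFFERED_RECEIPT_GROUPS_QUEUE_DATA
                | col::BUFFERED_RECEIPT_GROUPS_QUEUE_ITEM
        )
    }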
diff --git a/chain/chain/src/resharding/manager.rs b/chain/chain/src/resharding/manager.rs
index d1a6f4405dc..b5ee4a6652e 100644
--- a/chain/chain/src/resharding/manager.rs
+++ b/chain/chain/src/resharding/manager.rs
@@ -1,3 +1,4 @@
+use std::cell::RefCell;
 use std::io;
 use std::sync::Arc;
 
@@ -6,11 +7,13 @@ use super::types::ReshardingSender;
 use crate::flat_storage_resharder::{FlatStorageResharder, FlatStorageResharderController};
 use crate::types::RuntimeAdapter;
 use crate::ChainStoreUpdate;
+use itertools::Itertools;
 use near_chain_configs::{MutableConfigValue, ReshardingConfig, ReshardingHandle};
 use near_chain_primitives::Error;
 use near_epoch_manager::EpochManagerAdapter;
 use near_primitives::block::Block;
 use near_primitives::challenge::PartialState;
+use near_primitives::congestion_info::CongestionInfo;
 use near_primitives::hash::CryptoHash;
 use near_primitives::shard_layout::{get_block_shard_uid, ShardLayout};
 use near_primitives::types::chunk_extra::ChunkExtra;
@@ -19,8 +22,9 @@ use near_store::adapter::{StoreAdapter, StoreUpdateAdapter};
 use near_store::flat::BlockInfo;
 use near_store::trie::mem::mem_trie_update::TrackingMode;
 use near_store::trie::ops::resharding::RetainMode;
+use near_store::trie::outgoing_metadata::ReceiptGroupsQueue;
 use near_store::trie::TrieRecorder;
-use near_store::{DBCol, ShardTries, ShardUId, Store};
+use near_store::{DBCol, ShardTries, ShardUId, Store, TrieAccess};
 
 pub struct ReshardingManager {
     store: Store,
@@ -187,7 +191,7 @@ impl ReshardingManager {
         // blocks, the second finalization will crash.
         tries.freeze_mem_tries(parent_shard_uid, split_shard_event.children_shards())?;
 
-        let chunk_extra = self.get_chunk_extra(block_hash, &parent_shard_uid)?;
+        let parent_chunk_extra = self.get_chunk_extra(block_hash, &parent_shard_uid)?;
         let boundary_account = split_shard_event.boundary_account;
 
         let mut trie_store_update = self.store.store_update();
@@ -214,20 +218,48 @@
            let mut mem_tries = mem_tries.write().unwrap();
            let mut trie_recorder = TrieRecorder::new();
            let mode = TrackingMode::RefcountsAndAccesses(&mut trie_recorder);
-            let mem_trie_update = mem_tries.update(*chunk_extra.state_root(), mode)?;
+            let mem_trie_update = mem_tries.update(*parent_chunk_extra.state_root(), mode)?;
 
            let trie_changes = mem_trie_update.retain_split_shard(&boundary_account, retain_mode);
-            let partial_storage = trie_recorder.recorded_storage();
+            let mem_changes = trie_changes.mem_trie_changes.as_ref().unwrap();
+            let new_state_root = mem_tries.apply_memtrie_changes(block_height, mem_changes);
+            drop(mem_tries);
+
+            // Get the congestion info for the child.
+            let parent_epoch_id = block.header().epoch_id();
+            let parent_shard_layout = self.epoch_manager.get_shard_layout(&parent_epoch_id)?;
+            let parent_state_root = *parent_chunk_extra.state_root();
+            let parent_trie = tries.get_trie_for_shard(parent_shard_uid, parent_state_root);
+            let parent_congestion_info =
+                parent_chunk_extra.congestion_info().expect("The congestion info must exist!");
+
+            let trie_recorder = RefCell::new(trie_recorder);
+            let parent_trie = parent_trie.recording_reads_with_recorder(trie_recorder);
+
+            let child_epoch_id = self.epoch_manager.get_next_epoch_id(block.hash())?;
+            let child_shard_layout = self.epoch_manager.get_shard_layout(&child_epoch_id)?;
+            let child_congestion_info = Self::get_child_congestion_info(
+                &parent_trie,
+                &parent_shard_layout,
+                parent_congestion_info,
+                &child_shard_layout,
+                new_shard_uid,
+                retain_mode,
+            )?;
+
+            let trie_recorder = parent_trie.take_recorder().unwrap();
+            let partial_storage = trie_recorder.borrow_mut().recorded_storage();
            let partial_state_len = match &partial_storage.nodes {
                PartialState::TrieValues(values) => values.len(),
            };
 
-            let mem_changes = trie_changes.mem_trie_changes.as_ref().unwrap();
-            let new_state_root = mem_tries.apply_memtrie_changes(block_height, mem_changes);
+
            // TODO(resharding): set all fields of `ChunkExtra`. Consider stronger
            // typing. Clarify where it should happen when `State` and
            // `FlatState` update is implemented.
-            let mut child_chunk_extra = ChunkExtra::clone(&chunk_extra);
+            let mut child_chunk_extra = ChunkExtra::clone(&parent_chunk_extra);
            *child_chunk_extra.state_root_mut() = new_state_root;
+            *child_chunk_extra.congestion_info_mut().expect("The congestion info must exist!") =
+                child_congestion_info;
 
            chain_store_update.save_chunk_extra(block_hash, &new_shard_uid, child_chunk_extra);
            chain_store_update.save_state_transition_data(
@@ -260,6 +292,100 @@ impl ReshardingManager {
         Ok(())
     }
 
+    pub fn get_child_congestion_info(
+        parent_trie: &dyn TrieAccess,
+        parent_shard_layout: &ShardLayout,
+        parent_congestion_info: CongestionInfo,
+        child_shard_layout: &ShardLayout,
+        child_shard_uid: ShardUId,
+        retain_mode: RetainMode,
+    ) -> Result<CongestionInfo, Error> {
+        // Get the congestion info based on the parent shard.
+        let mut child_congestion_info = Self::get_child_congestion_info_not_finalized(
+            parent_trie,
+            &parent_shard_layout,
+            parent_congestion_info,
+            retain_mode,
+        )?;
+
+        // Set the allowed shard based on the child shard.
+        Self::finalize_allowed_shard(
+            &child_shard_layout,
+            child_shard_uid,
+            &mut child_congestion_info,
+        )?;
+
+        Ok(child_congestion_info)
+    }
+
+    // Get the congestion info for the child shard. The congestion info can be
+    // inferred efficiently from the combination of the parent shard's
+    // congestion info and the receipt group metadata, which is available in the
+    // parent shard's trie.
+    fn get_child_congestion_info_not_finalized(
+        parent_trie: &dyn TrieAccess,
+        parent_shard_layout: &ShardLayout,
+        parent_congestion_info: CongestionInfo,
+        retain_mode: RetainMode,
+    ) -> Result<CongestionInfo, Error> {
+        // The left child contains all the delayed and buffered receipts from the
+        // parent so it should have identical congestion info.
+        if retain_mode == RetainMode::Left {
+            return Ok(parent_congestion_info);
+        }
+
+        // The right child contains all the delayed receipts from the parent but it
+        // has no buffered receipts. Its info needs to be computed by subtracting
+        // the parent's buffered receipts from the parent's congestion info.
+        let mut congestion_info = parent_congestion_info;
+        for shard_id in parent_shard_layout.shard_ids() {
+            let receipt_groups = ReceiptGroupsQueue::load(parent_trie, shard_id)?;
+            let Some(receipt_groups) = receipt_groups else {
+                continue;
+            };
+
+            let bytes = receipt_groups.total_size();
+            let gas = receipt_groups.total_gas();
+
+            congestion_info
+                .remove_buffered_receipt_gas(gas)
+                .expect("Buffered gas must not exceed congestion info buffered gas");
+            congestion_info
+                .remove_receipt_bytes(bytes)
+                .expect("Buffered size must not exceed congestion info buffered size");
+        }
+
+        // The right child does not inherit any buffered receipts. The
+        // congestion info must match this invariant.
+        assert_eq!(congestion_info.buffered_receipts_gas(), 0);
+
+        Ok(congestion_info)
+    }
+
+    fn finalize_allowed_shard(
+        child_shard_layout: &ShardLayout,
+        child_shard_uid: ShardUId,
+        congestion_info: &mut CongestionInfo,
+    ) -> Result<(), Error> {
+        let all_shards = child_shard_layout.shard_ids().collect_vec();
+        let own_shard = child_shard_uid.shard_id();
+        let own_shard_index = child_shard_layout
+            .get_shard_index(own_shard)?
+            .try_into()
+            .expect("ShardIndex must fit in u64");
+        // Please note that the congestion seed used during resharding is
+        // different than the one used during normal operation. In runtime the
+        // seed is set to the sum of shard index and block height. The block
+        // height isn't easily available on all call sites which is why the
+        // simplified seed is used. This is valid because it's deterministic and
+        // resharding is a very rare event. However in a perfect world it should
+        // be the same.
+        // TODO - Use proper congestion control seed during resharding.
+        let congestion_seed = own_shard_index;
+        congestion_info.finalize_allowed_shard(own_shard, &all_shards, congestion_seed);
+        Ok(())
+    }
+
     // TODO(store): Use proper store interface
     fn get_chunk_extra(
         &self,
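The derivation above hinges on one invariant: every receipt sitting in the parent's outgoing buffers stays with the left child, so once the buffered totals are subtracted the right child must report zero buffered gas (the `assert_eq!` in `get_child_congestion_info_not_finalized`). A hedged, self-contained sketch of that invariant as a test; it is not part of the patch and assumes the `add_*` counterparts of the `remove_*` helpers behave as in `core/primitives/src/congestion_info.rs`:

    use near_primitives::congestion_info::CongestionInfo;

    #[test]
    fn right_child_congestion_info_drops_buffered_receipts() {
        // Hypothetical test, illustration only.
        // Parent shard with some buffered outgoing receipts accounted for.
        let mut parent = CongestionInfo::default();
        parent.add_buffered_receipt_gas(1_000_000).unwrap();
        parent.add_receipt_bytes(4_096).unwrap();

        // Left child: it inherits the buffers, so the parent info is reused as is.
        let left_child = parent.clone();

        // Right child: starts from the parent info and subtracts the totals that
        // remain behind in the left child's outgoing buffers.
        let mut right_child = parent.clone();
        right_child.remove_buffered_receipt_gas(1_000_000).unwrap();
        right_child.remove_receipt_bytes(4_096).unwrap();

        assert_eq!(left_child.buffered_receipts_gas(), parent.buffered_receipts_gas());
        assert_eq!(right_child.buffered_receipts_gas(), 0);
    }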
diff --git a/chain/chain/src/runtime/mod.rs b/chain/chain/src/runtime/mod.rs
index 5e331cbd0a0..83baa60461c 100644
--- a/chain/chain/src/runtime/mod.rs
+++ b/chain/chain/src/runtime/mod.rs
@@ -650,7 +650,7 @@ impl RuntimeAdapter for NightshadeRuntime {
         if ProtocolFeature::StatelessValidation.enabled(next_protocol_version)
             || cfg!(feature = "shadow_chunk_validation")
         {
-            trie = trie.recording_reads();
+            trie = trie.recording_reads_new_recorder();
         }
         let mut state_update = TrieUpdate::new(trie);
 
@@ -799,7 +799,7 @@ impl RuntimeAdapter for NightshadeRuntime {
                 }
             }
         }
-        debug!(target: "runtime", "Transaction filtering results {} valid out of {} pulled from the pool", result.transactions.len(), num_checked_transactions);
+        debug!(target: "runtime", limited_by=?result.limited_by, "Transaction filtering results {} valid out of {} pulled from the pool", result.transactions.len(), num_checked_transactions);
         let shard_label = shard_id.to_string();
         metrics::PREPARE_TX_SIZE.with_label_values(&[&shard_label]).observe(total_size as f64);
         metrics::PREPARE_TX_REJECTED
@@ -882,7 +882,7 @@ impl RuntimeAdapter for NightshadeRuntime {
         if ProtocolFeature::StatelessValidation.enabled(next_protocol_version)
             || cfg!(feature = "shadow_chunk_validation")
         {
-            trie = trie.recording_reads();
+            trie = trie.recording_reads_new_recorder();
         }
 
         match self.process_state_update(
diff --git a/chain/chain/src/stateless_validation/chunk_validation.rs b/chain/chain/src/stateless_validation/chunk_validation.rs
index cf89a9efe87..5998e165c29 100644
--- a/chain/chain/src/stateless_validation/chunk_validation.rs
+++ b/chain/chain/src/stateless_validation/chunk_validation.rs
@@ -4,6 +4,7 @@ use crate::chain::{
 };
 use crate::rayon_spawner::RayonAsyncComputationSpawner;
 use crate::resharding::event_type::ReshardingEventType;
+use crate::resharding::manager::ReshardingManager;
 use crate::sharding::shuffle_receipt_proofs;
 use crate::stateless_validation::processing_tracker::ProcessingDoneTracker;
 use crate::store::filter_incoming_receipts_for_shard;
@@ -96,6 +97,7 @@ pub fn validate_prepared_transactions(
 ) -> Result<PreparedTransactions, Error> {
     let parent_block = chain.chain_store().get_block(chunk_header.prev_block_hash())?;
     let last_chunk_transactions_size = borsh::to_vec(last_chunk_transactions)?.len();
+    tracing::info!("boom prepare_transactions from validator");
     runtime_adapter.prepare_transactions(
         storage_config,
         crate::types::PrepareTransactionsChunkContext {
@@ -690,7 +692,7 @@ pub fn validate_chunk_state_witness(
         .into_iter()
         .zip(state_witness.implicit_transitions.into_iter())
     {
-        let (shard_uid, new_state_root) = match implicit_transition_params {
+        let (shard_uid, new_state_root, new_congestion_info) = match implicit_transition_params {
             ImplicitTransitionParams::ApplyOldChunk(block, shard_uid) => {
                 let shard_context = ShardContext { shard_uid, should_apply_chunk: false };
                 let old_chunk_data = OldChunkData {
@@ -710,7 +712,9 @@ pub fn validate_chunk_state_witness(
                     shard_context,
                     runtime_adapter,
                 )?;
-                (shard_uid, apply_result.new_root)
+                let congestion_info =
+                    chunk_extra.congestion_info().expect("The congestion info must exist!");
+                (shard_uid, apply_result.new_root, congestion_info)
             }
             ImplicitTransitionParams::Resharding(
                 boundary_account,
@@ -718,17 +722,37 @@ pub fn validate_chunk_state_witness(
                 child_shard_uid,
             ) => {
                 let old_root = *chunk_extra.state_root();
-                let trie = Trie::from_recorded_storage(
-                    PartialStorage { nodes: transition.base_state },
-                    old_root,
-                    true,
-                );
-                let new_root = trie.retain_split_shard(&boundary_account, retain_mode)?;
-                (child_shard_uid, new_root)
+                let partial_storage = PartialStorage { nodes: transition.base_state };
+                let parent_trie = Trie::from_recorded_storage(partial_storage, old_root, true);
+
+                // Update the congestion info based on the parent shard. It's
+                // important to do this step before the `retain_split_shard`
+                // because only the parent trie has the needed information.
+                let epoch_id = epoch_manager.get_epoch_id(&block_hash)?;
+                let parent_shard_layout = epoch_manager.get_shard_layout(&epoch_id)?;
+                let parent_congestion_info =
+                    chunk_extra.congestion_info().expect("The congestion info must exist");
+
+                let child_epoch_id = epoch_manager.get_next_epoch_id(&block_hash)?;
+                let child_shard_layout = epoch_manager.get_shard_layout(&child_epoch_id)?;
+                let child_congestion_info = ReshardingManager::get_child_congestion_info(
+                    &parent_trie,
+                    &parent_shard_layout,
+                    parent_congestion_info,
+                    &child_shard_layout,
+                    child_shard_uid,
+                    retain_mode,
+                )?;
+
+                let new_root = parent_trie.retain_split_shard(&boundary_account, retain_mode)?;
+
+                (child_shard_uid, new_root, child_congestion_info)
             }
         };
 
         *chunk_extra.state_root_mut() = new_state_root;
+        *chunk_extra.congestion_info_mut().expect("The congestion info must exist!") =
+            new_congestion_info;
         if chunk_extra.state_root() != &transition.post_state_root {
             // This is an early check, it's not for correctness, only for better
             // error reporting in case of an invalid state witness due to a bug.
diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs
index 00dc12711ac..0ad96830816 100644
--- a/chain/client/src/client.rs
+++ b/chain/client/src/client.rs
@@ -1072,6 +1072,7 @@ impl Client {
         } else {
             0
         };
+        tracing::info!("boom prepare_transactions from client");
         runtime.prepare_transactions(
             storage_config,
             PrepareTransactionsChunkContext {
@@ -2255,7 +2256,8 @@ impl Client {
             validators.remove(account_id);
         }
         for validator in validators {
-            trace!(target: "client", me = ?signer.as_ref().map(|bp| bp.validator_id()), ?tx, ?validator, ?shard_id, "Routing a transaction");
+            let tx_hash = tx.get_hash();
+            trace!(target: "client", me = ?signer.as_ref().map(|bp| bp.validator_id()), ?tx_hash, ?validator, ?shard_id, "Routing a transaction");
 
             // Send message to network to actually forward transaction.
             self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
diff --git a/core/primitives-core/src/version.rs b/core/primitives-core/src/version.rs
index d9b15819502..4fe631c9fc4 100644
--- a/core/primitives-core/src/version.rs
+++ b/core/primitives-core/src/version.rs
@@ -272,12 +272,18 @@ impl ProtocolFeature {
             // TODO(#11201): When stabilizing this feature in mainnet, also remove the temporary code
             // that always enables this for mocknet (see config_mocknet function).
             ProtocolFeature::ShuffleShardAssignments => 143,
+            // CurrentEpochStateSync must be enabled before ReshardingV3! When
+            // releasing this feature please make sure to schedule separate
+            // protocol upgrades for those features!
             ProtocolFeature::CurrentEpochStateSync => 144,
-            ProtocolFeature::SimpleNightshadeV4 => 145,
+            // BandwidthScheduler must be enabled before ReshardingV3! When
+            // releasing this feature please make sure to schedule separate
+            // protocol upgrades for those features!
+            ProtocolFeature::BandwidthScheduler => 145,
+            ProtocolFeature::SimpleNightshadeV4 => 146,
             #[cfg(feature = "protocol_feature_relaxed_chunk_validation")]
-            ProtocolFeature::RelaxedChunkValidation => 146,
-            ProtocolFeature::ExcludeExistingCodeFromWitnessForCodeLen => 147,
-            ProtocolFeature::BandwidthScheduler => 148,
+            ProtocolFeature::RelaxedChunkValidation => 147,
+            ProtocolFeature::ExcludeExistingCodeFromWitnessForCodeLen => 148,
             ProtocolFeature::BlockHeightForReceiptId => 149,
             // Place features that are not yet in Nightly below this line.
         }
@@ -345,3 +351,18 @@ macro_rules! checked_feature {
         }
     }};
 }
+
+#[cfg(test)]
+mod tests {
+    use super::ProtocolFeature;
+
+    #[test]
+    fn test_resharding_dependencies() {
+        let state_sync = ProtocolFeature::CurrentEpochStateSync.protocol_version();
+        let bandwidth_scheduler = ProtocolFeature::BandwidthScheduler.protocol_version();
+        let resharding_v3 = ProtocolFeature::SimpleNightshadeV4.protocol_version();
+
+        assert!(state_sync < resharding_v3);
+        assert!(bandwidth_scheduler < resharding_v3);
+    }
+}
diff --git a/core/primitives/src/congestion_info.rs b/core/primitives/src/congestion_info.rs
index f7e7c886357..b8433de7331 100644
--- a/core/primitives/src/congestion_info.rs
+++ b/core/primitives/src/congestion_info.rs
@@ -300,11 +300,11 @@ impl CongestionInfo {
         Ok(())
     }
 
-    pub fn remove_buffered_receipt_gas(&mut self, gas: Gas) -> Result<(), RuntimeError> {
+    pub fn remove_buffered_receipt_gas(&mut self, gas: u128) -> Result<(), RuntimeError> {
         match self {
             CongestionInfo::V1(inner) => {
                 inner.buffered_receipts_gas =
-                    inner.buffered_receipts_gas.checked_sub(gas as u128).ok_or_else(|| {
+                    inner.buffered_receipts_gas.checked_sub(gas).ok_or_else(|| {
                         RuntimeError::UnexpectedIntegerOverflow(
                             "remove_buffered_receipt_gas".into(),
                         )
@@ -730,7 +730,8 @@ mod tests {
         assert_eq!(config.max_tx_gas, control.process_tx_limit());
 
         // remove halve the congestion
-        info.remove_buffered_receipt_gas(config.max_congestion_outgoing_gas / 2).unwrap();
+        let gas_diff = config.max_congestion_outgoing_gas / 2;
+        info.remove_buffered_receipt_gas(gas_diff.into()).unwrap();
         let control = CongestionControl::new(config, info, 0);
         assert_eq!(0.5, control.congestion_level());
         assert_eq!(
@@ -741,7 +742,8 @@ mod tests {
         assert!(control.shard_accepts_transactions().is_no());
 
         // reduce congestion to 1/8
-        info.remove_buffered_receipt_gas(3 * config.max_congestion_outgoing_gas / 8).unwrap();
+        let gas_diff = 3 * config.max_congestion_outgoing_gas / 8;
+        info.remove_buffered_receipt_gas(gas_diff.into()).unwrap();
         let control = CongestionControl::new(config, info, 0);
         assert_eq!(0.125, control.congestion_level());
         assert_eq!(
diff --git a/core/primitives/src/types.rs b/core/primitives/src/types.rs
index c636452b91d..a143fbdea7c 100644
--- a/core/primitives/src/types.rs
+++ b/core/primitives/src/types.rs
@@ -957,6 +957,16 @@ pub mod chunk_extra {
             }
         }
 
+        #[inline]
+        pub fn congestion_info_mut(&mut self) -> Option<&mut CongestionInfo> {
+            match self {
+                Self::V1(_) => None,
+                Self::V2(_) => None,
+                Self::V3(v3) => Some(&mut v3.congestion_info),
+                Self::V4(v4) => Some(&mut v4.congestion_info),
+            }
+        }
+
         #[inline]
         pub fn bandwidth_requests(&self) -> Option<&BandwidthRequests> {
             match self {
diff --git a/core/store/src/trie/mod.rs b/core/store/src/trie/mod.rs
index c75de1e5cf6..67c6ce9f29b 100644
--- a/core/store/src/trie/mod.rs
+++ b/core/store/src/trie/mod.rs
@@ -722,18 +722,28 @@ impl Trie {
 
     /// Makes a new trie that has everything the same except that access
     /// through that trie accumulates a state proof for all nodes accessed.
-    pub fn recording_reads(&self) -> Self {
+    pub fn recording_reads_new_recorder(&self) -> Self {
+        self.recording_reads_with_recorder(RefCell::new(TrieRecorder::new()))
+    }
+
+    /// Makes a new trie that has everything the same except that access
+    /// through that trie accumulates a state proof for all nodes accessed.
+    pub fn recording_reads_with_recorder(&self, recorder: RefCell<TrieRecorder>) -> Self {
         let mut trie = Self::new_with_memtries(
             self.storage.clone(),
             self.memtries.clone(),
             self.root,
             self.flat_storage_chunk_view.clone(),
         );
-        trie.recorder = Some(RefCell::new(TrieRecorder::new()));
+        trie.recorder = Some(recorder);
         trie.charge_gas_for_trie_node_access = self.charge_gas_for_trie_node_access;
         trie
     }
 
+    pub fn take_recorder(self) -> Option<RefCell<TrieRecorder>> {
+        self.recorder
+    }
+
     /// Takes the recorded state proof out of the trie.
     pub fn recorded_storage(&self) -> Option<PartialStorage> {
         self.recorder.as_ref().map(|recorder| recorder.borrow_mut().recorded_storage())
@@ -2229,7 +2239,8 @@ mod tests {
         ];
         let root = test_populate_trie(&tries, &empty_root, ShardUId::single_shard(), changes);
 
-        let trie2 = tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads();
+        let trie2 =
+            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads_new_recorder();
         trie2.get(b"dog").unwrap();
         trie2.get(b"horse").unwrap();
         let partial_storage = trie2.recorded_storage();
@@ -2258,14 +2269,18 @@ mod tests {
         let root = test_populate_trie(&tries, &empty_root, ShardUId::single_shard(), changes);
         // Trie: extension -> branch -> 2 leaves
         {
-            let trie2 = tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads();
+            let trie2 = tries
+                .get_trie_for_shard(ShardUId::single_shard(), root)
+                .recording_reads_new_recorder();
             trie2.get(b"doge").unwrap();
             // record extension, branch and one leaf with value, but not the other
             assert_eq!(trie2.recorded_storage().unwrap().nodes.len(), 4);
         }
 
         {
-            let trie2 = tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads();
+            let trie2 = tries
+                .get_trie_for_shard(ShardUId::single_shard(), root)
+                .recording_reads_new_recorder();
             let updates = vec![(b"doge".to_vec(), None)];
             trie2.update(updates).unwrap();
             // record extension, branch and both leaves, but not the value.
@@ -2273,7 +2288,9 @@ mod tests {
         }
 
         {
-            let trie2 = tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads();
+            let trie2 = tries
+                .get_trie_for_shard(ShardUId::single_shard(), root)
+                .recording_reads_new_recorder();
             let updates = vec![(b"dodo".to_vec(), Some(b"asdf".to_vec()))];
             trie2.update(updates).unwrap();
             // record extension and branch, but not leaves
diff --git a/core/store/src/trie/ops/resharding.rs b/core/store/src/trie/ops/resharding.rs
index e7a78ddd4ae..05c0ed42665 100644
--- a/core/store/src/trie/ops/resharding.rs
+++ b/core/store/src/trie/ops/resharding.rs
@@ -14,7 +14,7 @@ use super::interface::{
 };
 use super::squash::GenericTrieUpdateSquash;
 
-#[derive(Debug)]
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
 /// Whether to retain left or right part of trie after shard split.
 pub enum RetainMode {
     Left,
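`recording_reads_new_recorder` keeps the old behaviour of `recording_reads` (a fresh recorder per trie), while `recording_reads_with_recorder` plus `take_recorder` let the caller own the recorder and recover it after the reads; that is how the resharding manager above folds the congestion-info reads into the same proof that later becomes the witness `base_state`. A rough usage sketch (hypothetical helper, not part of the patch; it assumes `PartialStorage` is re-exported from the `near_store` crate root):

    use std::cell::RefCell;

    use near_store::trie::TrieRecorder;
    use near_store::{PartialStorage, Trie};

    /// Illustration only: record the reads performed through a trie and hand the
    /// resulting state proof back to the caller.
    fn record_reads_into_proof(trie: Trie, key: &[u8]) -> PartialStorage {
        let recorder = RefCell::new(TrieRecorder::new());
        let recording_trie = trie.recording_reads_with_recorder(recorder);

        // Every read through `recording_trie` is accumulated by the shared recorder.
        let _value = recording_trie.get(key).expect("storage error");

        // Take the recorder back and turn its contents into the recorded state proof.
        let recorder = recording_trie.take_recorder().unwrap();
        recorder.borrow_mut().recorded_storage()
    }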
diff --git a/core/store/src/trie/state_parts.rs b/core/store/src/trie/state_parts.rs
index 1e578947f03..5044a432ff1 100644
--- a/core/store/src/trie/state_parts.rs
+++ b/core/store/src/trie/state_parts.rs
@@ -81,7 +81,7 @@ impl Trie {
         &self,
         part_id: PartId,
     ) -> Result<PartialState, StorageError> {
-        let with_recording = self.recording_reads();
+        let with_recording = self.recording_reads_new_recorder();
         with_recording.visit_nodes_for_state_part(part_id)?;
         let recorded = with_recording.recorded_storage().unwrap();
         Ok(recorded.nodes)
@@ -147,7 +147,7 @@ impl Trie {
         let PartId { idx, total } = part_id;
 
         // 1. Extract nodes corresponding to state part boundaries.
-        let recording_trie = self.recording_reads();
+        let recording_trie = self.recording_reads_new_recorder();
         let boundaries_read_timer = metrics::GET_STATE_PART_BOUNDARIES_ELAPSED
             .with_label_values(&[&shard_id.to_string()])
             .start_timer();
@@ -826,7 +826,7 @@ mod tests {
         for part_id in 0..num_parts {
             // Compute proof with size and check that it doesn't exceed theoretical boundary for
             // the path with full set of left siblings of maximal possible size.
-            let trie_recording = trie.recording_reads();
+            let trie_recording = trie.recording_reads_new_recorder();
             let left_nibbles_boundary =
                 trie_recording.find_state_part_boundary(part_id, num_parts).unwrap();
             let left_key_boundary = NibbleSlice::nibbles_to_bytes(&left_nibbles_boundary);
diff --git a/core/store/src/trie/trie_recording.rs b/core/store/src/trie/trie_recording.rs
index 99559fabb24..9b59d657658 100644
--- a/core/store/src/trie/trie_recording.rs
+++ b/core/store/src/trie/trie_recording.rs
@@ -552,7 +552,7 @@ mod trie_recording_tests {
             // Now let's do this again while recording, and make sure that the counters
             // we get are exactly the same.
             let trie = get_trie_for_shard(&tries, shard_uid, state_root, use_flat_storage)
-                .recording_reads();
+                .recording_reads_new_recorder();
             trie.accounting_cache.borrow().enable_switch().set(enable_accounting_cache);
             for key in &keys_to_get {
                 assert_eq!(trie.get(key).unwrap(), data_in_trie.get(key).cloned());
@@ -577,7 +577,7 @@ mod trie_recording_tests {
             // in-memory tries.
             destructively_delete_in_memory_state_from_disk(&store.trie_store(), &data_in_trie);
             let trie = get_trie_for_shard(&tries, shard_uid, state_root, use_flat_storage)
-                .recording_reads();
+                .recording_reads_new_recorder();
             trie.accounting_cache.borrow().enable_switch().set(enable_accounting_cache);
             for key in &keys_to_get {
                 assert_eq!(trie.get(key).unwrap(), data_in_trie.get(key).cloned());
@@ -621,7 +621,7 @@ mod trie_recording_tests {
 
             // Build a Trie using recorded storage and enable recording_reads on this Trie
             let trie = Trie::from_recorded_storage(partial_storage, state_root, use_flat_storage)
-                .recording_reads();
+                .recording_reads_new_recorder();
             trie.accounting_cache.borrow().enable_switch().set(enable_accounting_cache);
             for key in &keys_to_get {
                 assert_eq!(trie.get(key).unwrap(), data_in_trie.get(key).cloned());
diff --git a/core/store/src/trie/trie_tests.rs b/core/store/src/trie/trie_tests.rs
index da916ee4712..55a40483c8b 100644
--- a/core/store/src/trie/trie_tests.rs
+++ b/core/store/src/trie/trie_tests.rs
@@ -64,7 +64,7 @@ where
     F: FnMut(Trie) -> Result<(Trie, Out), StorageError>,
     Out: PartialEq + Debug,
 {
-    let recording_trie = trie.recording_reads();
+    let recording_trie = trie.recording_reads_new_recorder();
     let (recording_trie, output) = test(recording_trie).expect("should not fail");
     (recording_trie.recorded_storage().unwrap(), recording_trie, output)
 }
@@ -434,7 +434,7 @@ mod trie_storage_tests {
         let state_root =
             test_populate_trie(&tries, &Trie::EMPTY_ROOT, shard_uid, base_changes.clone());
 
-        let trie = tries.get_trie_for_shard(shard_uid, state_root).recording_reads();
+        let trie = tries.get_trie_for_shard(shard_uid, state_root).recording_reads_new_recorder();
         let changes = trie.update(updates.clone()).unwrap();
         tracing::info!("Changes: {:?}", changes);
 
@@ -445,7 +445,7 @@ mod trie_storage_tests {
         let shard_uid = ShardUId::single_shard();
         let state_root = test_populate_trie(&tries, &Trie::EMPTY_ROOT, shard_uid, base_changes);
 
-        let trie = tries.get_trie_for_shard(shard_uid, state_root).recording_reads();
+        let trie = tries.get_trie_for_shard(shard_uid, state_root).recording_reads_new_recorder();
         let changes = trie.update(updates).unwrap();
         tracing::info!("Changes: {:?}", changes);
 
@@ -503,7 +503,8 @@ mod trie_storage_tests {
             vec![(vec![7], vec![1]), (vec![7, 0], vec![2]), (vec![7, 1], vec![3])];
 
         let disk_iter_recorded = {
-            let trie = tries.get_trie_for_shard(shard_uid, state_root).recording_reads();
+            let trie =
+                tries.get_trie_for_shard(shard_uid, state_root).recording_reads_new_recorder();
             let mut disk_iter = trie.disk_iter().unwrap();
             disk_iter.seek_prefix(&iter_prefix).unwrap();
             let disk_iter_results = disk_iter.collect::<Result<Vec<_>, _>>().unwrap();
@@ -512,7 +513,8 @@ mod trie_storage_tests {
         };
 
         let memtrie_iter_recorded = {
-            let trie = tries.get_trie_for_shard(shard_uid, state_root).recording_reads();
+            let trie =
+                tries.get_trie_for_shard(shard_uid, state_root).recording_reads_new_recorder();
             let lock = trie.lock_for_iter();
             let mut memtrie_iter = lock.iter().unwrap();
             match memtrie_iter {
@@ -531,7 +533,7 @@ mod trie_storage_tests {
 
         let partial_recorded = {
             let trie = Trie::from_recorded_storage(memtrie_iter_recorded, state_root, true)
-                .recording_reads();
+                .recording_reads_new_recorder();
             let mut disk_iter = trie.disk_iter().unwrap();
             disk_iter.seek_prefix(&iter_prefix).unwrap();
             let disk_iter_results = disk_iter.collect::<Result<Vec<_>, _>>().unwrap();
diff --git a/integration-tests/src/test_loop/tests/resharding_v3.rs b/integration-tests/src/test_loop/tests/resharding_v3.rs
index 21d6015128c..95dacbf9e47 100644
--- a/integration-tests/src/test_loop/tests/resharding_v3.rs
+++ b/integration-tests/src/test_loop/tests/resharding_v3.rs
@@ -262,7 +262,7 @@ impl TestReshardingParametersBuilder {
             limit_outgoing_gas: self.limit_outgoing_gas.unwrap_or(false),
             delay_flat_state_resharding: self.delay_flat_state_resharding.unwrap_or(0),
             short_yield_timeout: self.short_yield_timeout.unwrap_or(false),
-            allow_negative_refcount: self.allow_negative_refcount.unwrap_or(false),
+            allow_negative_refcount: self.allow_negative_refcount.unwrap_or(true),
             disable_temporary_account_test,
             temporary_account_id,
         }
@@ -494,6 +494,14 @@ fn test_resharding_v3_base(params: TestReshardingParameters) {
             assert!(block_header.chunk_mask().iter().all(|chunk_bit| *chunk_bit));
         }
 
+        let shard_layout = client.epoch_manager.get_shard_layout(&tip.epoch_id).unwrap();
+        println!(
+            "new block #{} shards: {:?} chunk mask {:?}",
+            tip.height,
+            shard_layout.shard_ids().collect_vec(),
+            block_header.chunk_mask().to_vec()
+        );
+
         trie_sanity_check.assert_state_sanity(&clients, expected_num_shards);
 
         let epoch_height =
diff --git a/runtime/runtime/src/congestion_control.rs b/runtime/runtime/src/congestion_control.rs
index 5427f83cee1..29cd84ef677 100644
--- a/runtime/runtime/src/congestion_control.rs
+++ b/runtime/runtime/src/congestion_control.rs
@@ -258,17 +258,27 @@ impl ReceiptSinkV2 {
             parent_shard_ids.intersection(&shard_ids.clone().into_iter().collect()).count() == 0
         );
 
+        let mut all_buffers_empty = true;
+
         // First forward any receipts that may still be in the outgoing buffers
         // of the parent shards.
         for &shard_id in &parent_shard_ids {
             self.forward_from_buffer_to_shard(shard_id, state_update, apply_state, &shard_layout)?;
+            let is_buffer_empty = self.outgoing_buffers.to_shard(shard_id).len() == 0;
+            all_buffers_empty &= is_buffer_empty;
         }
 
         // Then forward receipts from the outgoing buffers of the shard in the
         // current shard layout.
         for &shard_id in &shard_ids {
             self.forward_from_buffer_to_shard(shard_id, state_update, apply_state, &shard_layout)?;
+            let is_buffer_empty = self.outgoing_buffers.to_shard(shard_id).len() == 0;
+            all_buffers_empty &= is_buffer_empty;
         }
 
+        // Assert that empty buffers match zero buffered gas.
+        assert_eq!(all_buffers_empty, self.own_congestion_info.buffered_receipts_gas() == 0);
+
         Ok(())
     }
 
@@ -311,7 +321,7 @@ impl ReceiptSinkV2 {
             )? {
                 ReceiptForwarding::Forwarded => {
                     self.own_congestion_info.remove_receipt_bytes(size)?;
-                    self.own_congestion_info.remove_buffered_receipt_gas(gas)?;
+                    self.own_congestion_info.remove_buffered_receipt_gas(gas.into())?;
                     if should_update_outgoing_metadatas {
                         // Can't update metadatas immediately because state_update is borrowed by iterator.
                         outgoing_metadatas_updates.push((ByteSize::b(size), gas));
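With `remove_buffered_receipt_gas` now taking a `u128`, call sites that hold a `Gas` (`u64`) widen the value explicitly, as in the `ReceiptForwarding::Forwarded` arm above. A small hedged sketch of that pattern (hypothetical helper, not part of the patch):

    use near_primitives::congestion_info::CongestionInfo;
    use near_primitives::errors::RuntimeError;
    use near_primitives::types::Gas;

    /// Illustration only: drop one forwarded receipt from the shard's own
    /// congestion accounting, widening the u64 gas value to the u128 expected by
    /// `remove_buffered_receipt_gas`.
    fn account_forwarded_receipt(
        congestion_info: &mut CongestionInfo,
        size: u64,
        gas: Gas,
    ) -> Result<(), RuntimeError> {
        congestion_info.remove_receipt_bytes(size)?;
        congestion_info.remove_buffered_receipt_gas(gas.into())?;
        Ok(())
    }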
diff --git a/runtime/runtime/src/tests/apply.rs b/runtime/runtime/src/tests/apply.rs
index f48a8fa7db6..80cc41d1984 100644
--- a/runtime/runtime/src/tests/apply.rs
+++ b/runtime/runtime/src/tests/apply.rs
@@ -1153,7 +1153,7 @@ fn test_main_storage_proof_size_soft_limit() {
 
     let apply_result = runtime
         .apply(
-            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads(),
+            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads_new_recorder(),
             &None,
             &apply_state,
             &[
@@ -1195,7 +1195,7 @@ fn test_main_storage_proof_size_soft_limit() {
     // The function call to bob_account should hit the main_storage_proof_size_soft_limit
     let apply_result = runtime
         .apply(
-            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads(),
+            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads_new_recorder(),
             &None,
             &apply_state,
             &[
@@ -1258,7 +1258,7 @@ fn test_exclude_contract_code_from_witness() {
 
     let apply_result = runtime
         .apply(
-            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads(),
+            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads_new_recorder(),
             &None,
             &apply_state,
             &[
@@ -1300,7 +1300,7 @@ fn test_exclude_contract_code_from_witness() {
     // The function call to bob_account should hit the main_storage_proof_size_soft_limit
     let apply_result = runtime
         .apply(
-            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads(),
+            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads_new_recorder(),
             &None,
             &apply_state,
             &[
@@ -1372,7 +1372,7 @@ fn test_exclude_contract_code_from_witness_with_failed_call() {
 
     let apply_result = runtime
         .apply(
-            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads(),
+            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads_new_recorder(),
             &None,
             &apply_state,
             &[
@@ -1413,7 +1413,7 @@ fn test_exclude_contract_code_from_witness_with_failed_call() {
 
     let apply_result = runtime
         .apply(
-            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads(),
+            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads_new_recorder(),
             &None,
             &apply_state,
             &[
@@ -1506,7 +1506,7 @@ fn test_deploy_and_call_different_contracts() {
 
     let apply_result = runtime
         .apply(
-            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads(),
+            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads_new_recorder(),
             &None,
             &apply_state,
             &[first_deploy_receipt, second_deploy_receipt],
@@ -1533,7 +1533,7 @@ fn test_deploy_and_call_different_contracts() {
 
     let apply_result = runtime
         .apply(
-            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads(),
+            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads_new_recorder(),
             &None,
             &apply_state,
             &[first_call_receipt, second_call_receipt],
@@ -1612,7 +1612,7 @@ fn test_deploy_and_call_different_contracts_with_failed_call() {
 
     let apply_result = runtime
         .apply(
-            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads(),
+            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads_new_recorder(),
             &None,
             &apply_state,
             &[first_deploy_receipt, second_deploy_receipt],
@@ -1639,7 +1639,7 @@ fn test_deploy_and_call_different_contracts_with_failed_call() {
 
     let apply_result = runtime
         .apply(
-            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads(),
+            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads_new_recorder(),
             &None,
             &apply_state,
             &[first_call_receipt, second_call_receipt],
@@ -1716,7 +1716,7 @@ fn test_deploy_and_call_in_apply() {
 
     let apply_result = runtime
         .apply(
-            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads(),
+            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads_new_recorder(),
             &None,
             &apply_state,
             &[first_deploy_receipt, second_deploy_receipt, first_call_receipt, second_call_receipt],
@@ -1795,7 +1795,7 @@ fn test_deploy_and_call_in_apply_with_failed_call() {
 
     let apply_result = runtime
         .apply(
-            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads(),
+            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads_new_recorder(),
             &None,
             &apply_state,
             &[first_deploy_receipt, second_deploy_receipt, first_call_receipt, second_call_receipt],
@@ -1850,7 +1850,7 @@ fn test_deploy_existing_contract_to_different_account() {
 
     let apply_result = runtime
         .apply(
-            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads(),
+            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads_new_recorder(),
             &None,
             &apply_state,
             &[first_deploy_receipt, first_call_receipt],
@@ -1892,7 +1892,7 @@ fn test_deploy_existing_contract_to_different_account() {
 
     let apply_result = runtime
         .apply(
-            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads(),
+            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads_new_recorder(),
             &None,
             &apply_state,
             &[second_deploy_receipt, second_call_receipt],
@@ -1941,7 +1941,7 @@ fn test_deploy_and_call_in_same_receipt() {
 
     let apply_result = runtime
         .apply(
-            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads(),
+            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads_new_recorder(),
             &None,
             &apply_state,
             &[receipt],
@@ -1990,7 +1990,7 @@ fn test_deploy_and_call_in_same_receipt_with_failed_call() {
 
     let apply_result = runtime
         .apply(
-            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads(),
+            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads_new_recorder(),
             &None,
             &apply_state,
             &[receipt],
@@ -2026,7 +2026,7 @@ fn test_call_account_without_contract() {
 
     let apply_result = runtime
         .apply(
-            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads(),
+            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads_new_recorder(),
             &None,
             &apply_state,
             &[receipt],
@@ -2070,7 +2070,7 @@ fn test_contract_accesses_when_validating_chunk() {
 
     let apply_result = runtime
         .apply(
-            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads(),
+            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads_new_recorder(),
             &None,
             &apply_state,
             &[deploy_receipt],
@@ -2096,7 +2096,7 @@ fn test_contract_accesses_when_validating_chunk() {
 
     let apply_result = runtime
         .apply(
-            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads(),
+            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads_new_recorder(),
             &None,
             &apply_state,
             &[call_receipt.clone()],
@@ -2117,7 +2117,7 @@ fn test_contract_accesses_when_validating_chunk() {
 
     let apply_result = runtime
         .apply(
-            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads(),
+            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads_new_recorder(),
             &None,
             &apply_state,
             &[call_receipt],
@@ -2164,7 +2164,7 @@ fn test_exclude_existing_contract_code_for_deploy_action() {
 
     let apply_result = runtime
         .apply(
-            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads(),
+            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads_new_recorder(),
             &None,
             &apply_state,
             &[deploy_receipt1],
@@ -2188,7 +2188,7 @@ fn test_exclude_existing_contract_code_for_deploy_action() {
 
     let apply_result = runtime
         .apply(
-            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads(),
+            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads_new_recorder(),
             &None,
             &apply_state,
             &[deploy_receipt2],
@@ -2265,7 +2265,7 @@ fn test_exclude_existing_contract_code_for_delete_account_action() {
 
     let apply_result = runtime
         .apply(
-            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads(),
+            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads_new_recorder(),
             &None,
             &apply_state,
             &[create_account_receipt, deploy_receipt],
@@ -2289,7 +2289,7 @@ fn test_exclude_existing_contract_code_for_delete_account_action() {
 
     let apply_result = runtime
         .apply(
-            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads(),
+            tries.get_trie_for_shard(ShardUId::single_shard(), root).recording_reads_new_recorder(),
             &None,
             &apply_state,
             &[delete_account_receipt],