From 77b4ab25a17248481d34255f535069b4cc47c007 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Chuda=C5=9B?= <18039094+staffik@users.noreply.github.com> Date: Fri, 3 Jan 2025 22:57:34 +0100 Subject: [PATCH 1/5] test(resharding): base resharding scenario state cleanup (#12678) A test for cleanup after resharding. Adds an extra client without a role, that initially tracks the parent shard, then a child shard, and then an unrelated shard. After a few epochs, only the last tracked shard should reside in State, while currently the test fails because it has entries from the first tracked shard too. **Changes** - Fix a bug in test infra: pass `gc_num_epochs_to_keep` to the runtime. - Add `tracked_shard_schedule` option to resharding testloop, in order to test the cleanup. - Add the cleanup test itself (ignored for now). --- chain/chain/src/runtime/test_utils.rs | 3 +- integration-tests/src/test_loop/builder.rs | 2 + .../src/test_loop/tests/resharding_v3.rs | 122 ++++++++++++++++-- .../src/test_loop/utils/resharding.rs | 116 ++++++++++++++++- nearcore/src/test_utils.rs | 3 +- 5 files changed, 230 insertions(+), 16 deletions(-) diff --git a/chain/chain/src/runtime/test_utils.rs b/chain/chain/src/runtime/test_utils.rs index 2f36e46a40b..2810c5d3e2c 100644 --- a/chain/chain/src/runtime/test_utils.rs +++ b/chain/chain/src/runtime/test_utils.rs @@ -48,6 +48,7 @@ impl NightshadeRuntime { runtime_config_store: Option, trie_config: TrieConfig, state_snapshot_type: StateSnapshotType, + gc_num_epochs_to_keep: u64, ) -> Arc { Self::new( store, @@ -57,7 +58,7 @@ impl NightshadeRuntime { None, None, runtime_config_store, - DEFAULT_GC_NUM_EPOCHS_TO_KEEP, + gc_num_epochs_to_keep, trie_config, StateSnapshotConfig { state_snapshot_type, diff --git a/integration-tests/src/test_loop/builder.rs b/integration-tests/src/test_loop/builder.rs index 436eb230501..377d2fee833 100644 --- a/integration-tests/src/test_loop/builder.rs +++ b/integration-tests/src/test_loop/builder.rs @@ -593,6 +593,7 @@ impl TestLoopBuilder { self.runtime_config_store.clone(), TrieConfig::from_store_config(&store_config), StateSnapshotType::EveryEpoch, + client_config.gc.gc_num_epochs_to_keep, ); let state_snapshot = StateSnapshotActor::new( @@ -671,6 +672,7 @@ impl TestLoopBuilder { self.runtime_config_store.clone(), TrieConfig::from_store_config(&store_config), StateSnapshotType::EveryEpoch, + client_config.gc.gc_num_epochs_to_keep, ); (view_epoch_manager, view_shard_tracker, view_runtime_adapter) } else { diff --git a/integration-tests/src/test_loop/tests/resharding_v3.rs b/integration-tests/src/test_loop/tests/resharding_v3.rs index f6064417095..0dc4441d884 100644 --- a/integration-tests/src/test_loop/tests/resharding_v3.rs +++ b/integration-tests/src/test_loop/tests/resharding_v3.rs @@ -21,8 +21,8 @@ use crate::test_loop::utils::receipts::{ #[cfg(feature = "test_features")] use crate::test_loop::utils::resharding::fork_before_resharding_block; use crate::test_loop::utils::resharding::{ - call_burn_gas_contract, call_promise_yield, execute_money_transfers, - temporary_account_during_resharding, + call_burn_gas_contract, call_promise_yield, check_state_cleanup_after_resharding, + execute_money_transfers, temporary_account_during_resharding, TrackedShardSchedule, }; use crate::test_loop::utils::sharding::print_and_assert_shard_accounts; use crate::test_loop::utils::transactions::{ @@ -45,6 +45,18 @@ const DEFAULT_EPOCH_LENGTH: u64 = 6; /// and later we would hit the `DBNotFoundErr("Transaction ...)` error in tests. 
const INCREASED_EPOCH_LENGTH: u64 = 8; +/// Garbage collection window length. +const GC_NUM_EPOCHS_TO_KEEP: u64 = 3; + +/// Maximum number of epochs under which the test should finish. +const TESTLOOP_NUM_EPOCHS_TO_WAIT: u64 = 8; + +/// Default shard layout version used in resharding tests. +const DEFAULT_SHARD_LAYOUT_VERSION: u64 = 2; + +/// Account used in resharding tests as a split boundary. +const NEW_BOUNDARY_ACCOUNT: &str = "account6"; + #[derive(derive_builder::Builder)] #[builder(pattern = "owned", build_fn(skip))] #[allow(unused)] @@ -84,6 +96,12 @@ struct TestReshardingParameters { chunk_ranges_to_drop: HashMap>, shuffle_shard_assignment_for_chunk_producers: bool, track_all_shards: bool, + // Manually specify what shards will be tracked for a given client ID. + // The client ID must not be used for any other role (validator, RPC, etc.). + // The schedule length must be more than `TESTLOOP_NUM_EPOCHS_TO_WAIT` so that it covers all epoch heights used in the test. + // The suffix must consist of `GC_NUM_EPOCHS_TO_KEEP` repetitions of the same shard, + // so that we can assert at the end of the test that the state of all other shards have been cleaned up. + tracked_shard_schedule: Option, load_mem_tries_for_tracked_shards: bool, /// Custom behavior executed at every iteration of test loop. #[builder(setter(custom))] @@ -115,7 +133,10 @@ struct TestReshardingParameters { impl TestReshardingParametersBuilder { fn build(self) -> TestReshardingParameters { + // Give enough time for GC to kick in after resharding. + assert!(GC_NUM_EPOCHS_TO_KEEP + 2 < TESTLOOP_NUM_EPOCHS_TO_WAIT); let epoch_length = self.epoch_length.unwrap_or(DEFAULT_EPOCH_LENGTH); + let tracked_shard_schedule = self.tracked_shard_schedule.unwrap_or(None); let num_accounts = self.num_accounts.unwrap_or(8); let num_clients = self.num_clients.unwrap_or(7); @@ -123,8 +144,12 @@ impl TestReshardingParametersBuilder { let num_validators = self.num_validators.unwrap_or(2); let num_rpcs = self.num_rpcs.unwrap_or(1); let num_archivals = self.num_archivals.unwrap_or(1); + let num_extra_nodes = if tracked_shard_schedule.is_some() { 1 } else { 0 }; - assert!(num_clients >= num_producers + num_validators + num_rpcs + num_archivals); + assert!( + num_clients + >= num_producers + num_validators + num_rpcs + num_archivals + num_extra_nodes + ); // #12195 prevents number of BPs bigger than `epoch_length`. 
assert!(num_producers > 0 && num_producers <= epoch_length); @@ -157,9 +182,23 @@ impl TestReshardingParametersBuilder { let validators = validators.to_vec(); let (rpcs, tmp) = tmp.split_at(num_rpcs as usize); let rpcs = rpcs.to_vec(); - let (archivals, _) = tmp.split_at(num_archivals as usize); + let (archivals, clients_without_role) = tmp.split_at(num_archivals as usize); let archivals = archivals.to_vec(); + if let Some(tracked_shard_schedule) = &tracked_shard_schedule { + assert!(clients_without_role.contains(&clients[tracked_shard_schedule.client_index])); + let schedule_length = tracked_shard_schedule.schedule.len(); + assert!(schedule_length > TESTLOOP_NUM_EPOCHS_TO_WAIT as usize); + for i in + (TESTLOOP_NUM_EPOCHS_TO_WAIT - GC_NUM_EPOCHS_TO_KEEP - 1) as usize..schedule_length + { + assert_eq!( + tracked_shard_schedule.schedule[i - 1], + tracked_shard_schedule.schedule[i] + ); + } + } + let client_index = if rpcs.is_empty() { 0 } else { num_producers + num_validators } as usize; let client_id = clients[client_index].clone(); @@ -167,10 +206,12 @@ impl TestReshardingParametersBuilder { println!("Clients setup:"); println!("Producers: {producers:?}"); println!("Validators: {validators:?}"); - println!("Rpcs: {rpcs:?}, to serve requests we use client: {client_id}"); + println!("Rpcs: {rpcs:?}"); println!("Archivals: {archivals:?}"); + println!("To serve requests, we use client: {client_id}"); + println!("Num extra nodes: {num_extra_nodes}"); - let new_boundary_account: AccountId = "account6".parse().unwrap(); + let new_boundary_account: AccountId = NEW_BOUNDARY_ACCOUNT.parse().unwrap(); let temporary_account_id: AccountId = format!("{}.{}", new_boundary_account, new_boundary_account).parse().unwrap(); let mut loop_actions = self.loop_actions.unwrap_or_default(); @@ -186,7 +227,9 @@ impl TestReshardingParametersBuilder { } TestReshardingParameters { - base_shard_layout_version: self.base_shard_layout_version.unwrap_or(2), + base_shard_layout_version: self + .base_shard_layout_version + .unwrap_or(DEFAULT_SHARD_LAYOUT_VERSION), num_accounts, num_clients, num_producers, @@ -208,6 +251,7 @@ impl TestReshardingParametersBuilder { .shuffle_shard_assignment_for_chunk_producers .unwrap_or(false), track_all_shards: self.track_all_shards.unwrap_or(false), + tracked_shard_schedule, load_mem_tries_for_tracked_shards: self .load_mem_tries_for_tracked_shards .unwrap_or(true), @@ -266,12 +310,20 @@ fn test_resharding_v3_base(params: TestReshardingParameters) { init_test_logger(); let mut builder = TestLoopBuilder::new(); + let tracked_shard_schedule = params.tracked_shard_schedule.clone(); - // Adjust the resharding configuration to make the tests faster. - builder = builder.config_modifier(|config, _| { + builder = builder.config_modifier(move |config, client_index| { + // Adjust the resharding configuration to make the tests faster. let mut resharding_config = config.resharding_config.get(); resharding_config.batch_delay = Duration::milliseconds(1); config.resharding_config.update(resharding_config); + // Set the tracked shard schedule if specified for the client at the given index. + if let Some(tracked_shard_schedule) = &tracked_shard_schedule { + if client_index == tracked_shard_schedule.client_index { + config.tracked_shards = vec![]; + config.tracked_shard_schedule = tracked_shard_schedule.schedule.clone(); + } + } }); // Prepare shard split configuration. 
@@ -356,6 +408,7 @@ fn test_resharding_v3_base(params: TestReshardingParameters) { base_protocol_version + 1, params.chunk_ranges_to_drop.clone(), ) + .gc_num_epochs_to_keep(GC_NUM_EPOCHS_TO_KEEP) .build(); let mut test_setup_transactions = vec![]; @@ -403,7 +456,6 @@ fn test_resharding_v3_base(params: TestReshardingParameters) { client_handles.iter().map(|handle| &env.test_loop.data.get(handle).client).collect_vec(); let mut trie_sanity_check = TrieSanityCheck::new(&clients, params.load_mem_tries_for_tracked_shards); - let gc_num_epochs_to_keep = clients[client_index].config.gc.gc_num_epochs_to_keep; let latest_block_height = Cell::new(0u64); // Height of a block after resharding. @@ -456,6 +508,8 @@ fn test_resharding_v3_base(params: TestReshardingParameters) { // Just passed an epoch with increased number of shards. new_layout_block_height.set(Some(latest_block_height.get())); new_layout_epoch_height.set(Some(epoch_height)); + // Assert that we will have a chance for gc to kick in before the test is over. + assert!(epoch_height + GC_NUM_EPOCHS_TO_KEEP < TESTLOOP_NUM_EPOCHS_TO_WAIT); println!("State after resharding:"); print_and_assert_shard_accounts(&clients, &tip); } @@ -467,7 +521,7 @@ fn test_resharding_v3_base(params: TestReshardingParameters) { ); // Return false if garbage collection window has not passed yet since resharding. - if epoch_height <= new_layout_epoch_height.get().unwrap() + gc_num_epochs_to_keep { + if epoch_height <= new_layout_epoch_height.get().unwrap() + GC_NUM_EPOCHS_TO_KEEP { return false; } for loop_action in ¶ms.loop_actions { @@ -478,8 +532,8 @@ fn test_resharding_v3_base(params: TestReshardingParameters) { env.test_loop.run_until( success_condition, - // Give enough time to produce ~7 epochs. - Duration::seconds((7 * params.epoch_length) as i64), + // Give enough time to produce ~TESTLOOP_NUM_EPOCHS_TO_WAIT epochs. + Duration::seconds((TESTLOOP_NUM_EPOCHS_TO_WAIT * params.epoch_length) as i64), ); let client = &env.test_loop.data.get(&client_handles[client_index]).client; trie_sanity_check.check_epochs(client); @@ -492,6 +546,48 @@ fn test_resharding_v3() { test_resharding_v3_base(TestReshardingParametersBuilder::default().build()); } +// Takes a sequence of shard ids to track in consecutive epochs, +// repeats the last element `TESTLOOP_NUM_EPOCHS_TO_WAIT` times, +// and maps each element: |id| -> vec![id], to the format required by `TrackedShardSchedule`. +fn shard_sequence_to_schedule(mut shard_sequence: Vec) -> Vec> { + shard_sequence.extend( + std::iter::repeat(*shard_sequence.last().unwrap()) + .take(TESTLOOP_NUM_EPOCHS_TO_WAIT as usize), + ); + shard_sequence.iter().map(|shard_id| vec![*shard_id]).collect() +} + +#[test] +// TODO(resharding): fix nearcore and un-ignore this test +#[ignore] +fn test_resharding_v3_state_cleanup() { + // Track parent shard before resharding, child shard after resharding, and then an unrelated shard forever. + // Eventually, the State column should only contain entries belonging to the last tracked shard. 
+ let account_in_stable_shard: AccountId = "account0".parse().unwrap(); + let split_boundary_account: AccountId = NEW_BOUNDARY_ACCOUNT.parse().unwrap(); + let base_shard_layout = get_base_shard_layout(DEFAULT_SHARD_LAYOUT_VERSION); + let new_shard_layout = + ShardLayout::derive_shard_layout(&base_shard_layout, split_boundary_account.clone()); + let parent_shard_id = base_shard_layout.account_id_to_shard_id(&split_boundary_account); + let child_shard_id = new_shard_layout.account_id_to_shard_id(&split_boundary_account); + let unrelated_shard_id = new_shard_layout.account_id_to_shard_id(&account_in_stable_shard); + + let tracked_shard_sequence = + vec![parent_shard_id, parent_shard_id, child_shard_id, unrelated_shard_id]; + let num_clients = 8; + let tracked_shard_schedule = TrackedShardSchedule { + client_index: (num_clients - 1) as usize, + schedule: shard_sequence_to_schedule(tracked_shard_sequence), + }; + test_resharding_v3_base( + TestReshardingParametersBuilder::default() + .num_clients(num_clients) + .tracked_shard_schedule(Some(tracked_shard_schedule.clone())) + .add_loop_action(check_state_cleanup_after_resharding(tracked_shard_schedule)) + .build(), + ); +} + #[test] fn test_resharding_v3_track_all_shards() { test_resharding_v3_base( diff --git a/integration-tests/src/test_loop/utils/resharding.rs b/integration-tests/src/test_loop/utils/resharding.rs index 005fc3f3fad..6f05c2607ba 100644 --- a/integration-tests/src/test_loop/utils/resharding.rs +++ b/integration-tests/src/test_loop/utils/resharding.rs @@ -1,16 +1,25 @@ use std::cell::Cell; +use std::collections::HashSet; +use std::num::NonZero; use assert_matches::assert_matches; +use borsh::BorshDeserialize; use itertools::Itertools; use near_async::test_loop::data::TestLoopData; +use near_chain::ChainStoreAccess; +use near_client::Client; use near_client::{Query, QueryError::GarbageCollectedBlock}; use near_crypto::Signer; +use near_primitives::hash::CryptoHash; use near_primitives::test_utils::create_user_test_signer; use near_primitives::transaction::SignedTransaction; -use near_primitives::types::{AccountId, BlockId, BlockReference, Gas}; +use near_primitives::types::{AccountId, BlockId, BlockReference, Gas, ShardId}; use near_primitives::views::{ FinalExecutionStatus, QueryRequest, QueryResponse, QueryResponseKind, }; +use near_store::adapter::StoreAdapter; +use near_store::db::refcount::decode_value_with_rc; +use near_store::{DBCol, ShardUId}; use rand::seq::SliceRandom; use rand::{Rng, SeedableRng}; use rand_chacha::ChaCha20Rng; @@ -24,6 +33,14 @@ use crate::test_loop::utils::transactions::{ }; use crate::test_loop::utils::{get_node_data, retrieve_client_actor, ONE_NEAR, TGAS}; +/// A config to tell what shards will be tracked by the client at the given index. +/// For more details, see `TrackedConfig::Schedule`. +#[derive(Clone, Debug)] +pub(crate) struct TrackedShardSchedule { + pub client_index: usize, + pub schedule: Vec>, +} + // Returns a callable function that, when invoked inside a test loop iteration, can force the creation of a chain fork. #[cfg(feature = "test_features")] pub(crate) fn fork_before_resharding_block(double_signing: bool) -> LoopAction { @@ -481,3 +498,100 @@ pub(crate) fn temporary_account_during_resharding( ); LoopAction::new(action_fn, succeeded) } + +/// Removes from State column all entries where key does not start with `the_only_shard_uid` ShardUId prefix. 
+fn retain_the_only_shard_state(client: &Client, the_only_shard_uid: ShardUId) { + let store = client.chain.chain_store.store().trie_store(); + let mut store_update = store.store_update(); + for kv in store.store().iter_raw_bytes(DBCol::State) { + let (key, value) = kv.unwrap(); + let shard_uid = ShardUId::try_from_slice(&key[0..8]).unwrap(); + if shard_uid == the_only_shard_uid { + continue; + } + let (_, rc) = decode_value_with_rc(&value); + assert!(rc > 0); + let node_hash = CryptoHash::try_from_slice(&key[8..]).unwrap(); + store_update.decrement_refcount_by(shard_uid, &node_hash, NonZero::new(rc as u32).unwrap()); + } + store_update.commit().unwrap(); +} + +/// Asserts that all other shards State except `the_only_shard_uid` have been cleaned-up. +fn check_has_the_only_shard_state(client: &Client, the_only_shard_uid: ShardUId) { + let store = client.chain.chain_store.store().trie_store(); + let mut shard_uid_prefixes = HashSet::new(); + for kv in store.store().iter_raw_bytes(DBCol::State) { + let (key, _) = kv.unwrap(); + let shard_uid = ShardUId::try_from_slice(&key[0..8]).unwrap(); + shard_uid_prefixes.insert(shard_uid); + } + let shard_uid_prefixes = shard_uid_prefixes.into_iter().collect_vec(); + assert_eq!(shard_uid_prefixes, [the_only_shard_uid]); +} + +// Loop action testing state cleanup after resharding. +// It assumes single shard tracking and it waits for gc after resharding. +// Then it checks whether the last shard tracked by the client +// is the only ShardUId prefix for nodes in the State column. +pub(crate) fn check_state_cleanup_after_resharding( + tracked_shard_schedule: TrackedShardSchedule, +) -> LoopAction { + let client_index = tracked_shard_schedule.client_index; + let latest_height = Cell::new(0); + let target_height = Cell::new(None); + + let (done, succeeded) = LoopAction::shared_success_flag(); + let action_fn = Box::new( + move |node_datas: &[TestData], test_loop_data: &mut TestLoopData, _: AccountId| { + if done.get() { + return; + } + + let client_handle = node_datas[client_index].client_sender.actor_handle(); + let client = &test_loop_data.get_mut(&client_handle).client; + let tip = client.chain.head().unwrap(); + + // Run this action only once at every block height. + if latest_height.get() == tip.height { + return; + } + + let epoch_height = client + .epoch_manager + .get_epoch_height_from_prev_block(&tip.prev_block_hash) + .unwrap(); + let [tracked_shard_id] = + tracked_shard_schedule.schedule[epoch_height as usize].clone().try_into().unwrap(); + let tracked_shard_uid = + client.epoch_manager.shard_id_to_uid(tracked_shard_id, &tip.epoch_id).unwrap(); + + if latest_height.get() == 0 { + // This is beginning of the test, and the first epoch after genesis has height 1. + assert_eq!(epoch_height, 1); + // Get rid of the part of the Genesis State other than the shard we initially track. + retain_the_only_shard_state(client, tracked_shard_uid); + } + latest_height.set(tip.height); + + if target_height.get().is_none() { + if !this_block_has_new_shard_layout(client.epoch_manager.as_ref(), &tip) { + return; + } + // Just resharded. Set the target height high enough so that gc will kick in. + let epoch_length = client.config.epoch_length; + let gc_num_epochs_to_keep = client.config.gc.gc_num_epochs_to_keep; + target_height + .set(Some(latest_height.get() + (gc_num_epochs_to_keep + 1) * epoch_length)); + } + + if latest_height.get() < target_height.get().unwrap() { + return; + } + // At this point, we should only have State from the last tracked shard. 
+ check_has_the_only_shard_state(&client, tracked_shard_uid); + done.set(true); + }, + ); + LoopAction::new(action_fn, succeeded) +} diff --git a/nearcore/src/test_utils.rs b/nearcore/src/test_utils.rs index 6586172dfb6..baafc533102 100644 --- a/nearcore/src/test_utils.rs +++ b/nearcore/src/test_utils.rs @@ -1,5 +1,5 @@ use near_chain::types::RuntimeAdapter; -use near_chain_configs::Genesis; +use near_chain_configs::{Genesis, DEFAULT_GC_NUM_EPOCHS_TO_KEEP}; use near_client::test_utils::TestEnvBuilder; use near_epoch_manager::EpochManagerHandle; use near_parameters::RuntimeConfigStore; @@ -101,6 +101,7 @@ impl TestEnvNightshadeSetupExt for TestEnvBuilder { Some(runtime_config_store), trie_config, state_snapshot_type.clone(), + DEFAULT_GC_NUM_EPOCHS_TO_KEEP, ) }; let dummy_runtime_configs = From 88ec6c83b10e687474316517279f5bab76698db5 Mon Sep 17 00:00:00 2001 From: Saketh Are Date: Sat, 4 Jan 2025 00:34:27 -0400 Subject: [PATCH 2/5] indexer: make block finality configurable (#12685) Tested this out by using Finality::None in mpc node indexers. Observed reduction of latency. --- Cargo.lock | 1 + chain/indexer/src/lib.rs | 4 +++- chain/indexer/src/streamer/fetchers.rs | 3 ++- chain/indexer/src/streamer/mod.rs | 13 +++++++------ tools/indexer/example/Cargo.toml | 1 + tools/indexer/example/src/main.rs | 1 + tools/mirror/src/lib.rs | 1 + 7 files changed, 16 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c2a49977688..f67435739dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3335,6 +3335,7 @@ dependencies = [ "near-config-utils", "near-indexer", "near-o11y", + "near-primitives", "openssl-probe", "serde_json", "tokio", diff --git a/chain/indexer/src/lib.rs b/chain/indexer/src/lib.rs index 754f3267f6b..70f314c7624 100644 --- a/chain/indexer/src/lib.rs +++ b/chain/indexer/src/lib.rs @@ -6,7 +6,7 @@ use tokio::sync::mpsc; use near_chain_configs::GenesisValidationMode; pub use near_primitives; -use near_primitives::types::Gas; +use near_primitives::types::{Finality, Gas}; pub use nearcore::{get_default_home, init_configs, NearConfig}; pub use near_indexer_primitives::{ @@ -83,6 +83,8 @@ pub struct IndexerConfig { pub sync_mode: SyncModeEnum, /// Whether await for node to be synced or not pub await_for_node_synced: AwaitForNodeSyncedEnum, + /// Finality level at which blocks are streamed + pub finality: Finality, /// Tells whether to validate the genesis file before starting pub validate_genesis: bool, } diff --git a/chain/indexer/src/streamer/fetchers.rs b/chain/indexer/src/streamer/fetchers.rs index 9e5d93b4665..bdae2b3bdd0 100644 --- a/chain/indexer/src/streamer/fetchers.rs +++ b/chain/indexer/src/streamer/fetchers.rs @@ -29,12 +29,13 @@ pub(crate) async fn fetch_status( /// entire block or we already fetched this block. 
pub(crate) async fn fetch_latest_block( client: &Addr, + finality: &near_primitives::types::Finality, ) -> Result { tracing::debug!(target: INDEXER, "Fetching latest block"); client .send( near_client::GetBlock(near_primitives::types::BlockReference::Finality( - near_primitives::types::Finality::Final, + finality.clone(), )) .with_span_context(), ) diff --git a/chain/indexer/src/streamer/mod.rs b/chain/indexer/src/streamer/mod.rs index 43a66b761d0..677096be693 100644 --- a/chain/indexer/src/streamer/mod.rs +++ b/chain/indexer/src/streamer/mod.rs @@ -37,7 +37,7 @@ static DELAYED_LOCAL_RECEIPTS_CACHE: std::sync::LazyLock< Arc>>, > = std::sync::LazyLock::new(|| Arc::new(RwLock::new(HashMap::new()))); -const INTERVAL: Duration = Duration::from_millis(500); +const INTERVAL: Duration = Duration::from_millis(250); /// Blocks #47317863 and #47317864 with restored receipts. const PROBLEMATIC_BLOCKS: [CryptoHash; 2] = [ @@ -413,11 +413,12 @@ pub(crate) async fn start( AwaitForNodeSyncedEnum::StreamWhileSyncing => {} }; - let block = if let Ok(block) = fetch_latest_block(&view_client).await { - block - } else { - continue; - }; + let block = + if let Ok(block) = fetch_latest_block(&view_client, &indexer_config.finality).await { + block + } else { + continue; + }; let latest_block_height = block.header.height; let start_syncing_block_height = if let Some(last_synced_block_height) = diff --git a/tools/indexer/example/Cargo.toml b/tools/indexer/example/Cargo.toml index 2801855ae3f..b4ee9468943 100644 --- a/tools/indexer/example/Cargo.toml +++ b/tools/indexer/example/Cargo.toml @@ -23,3 +23,4 @@ tracing.workspace = true near-config-utils.workspace = true near-indexer.workspace = true near-o11y.workspace = true +near-primitives.workspace = true diff --git a/tools/indexer/example/src/main.rs b/tools/indexer/example/src/main.rs index 007feded2eb..5df39bfcbf4 100644 --- a/tools/indexer/example/src/main.rs +++ b/tools/indexer/example/src/main.rs @@ -275,6 +275,7 @@ fn main() -> Result<()> { home_dir, sync_mode: near_indexer::SyncModeEnum::FromInterruption, await_for_node_synced: near_indexer::AwaitForNodeSyncedEnum::WaitForFullSync, + finality: near_primitives::types::Finality::Final, validate_genesis: true, }; let system = actix::System::new(); diff --git a/tools/mirror/src/lib.rs b/tools/mirror/src/lib.rs index 027c60a8ec6..bfde1e38358 100644 --- a/tools/mirror/src/lib.rs +++ b/tools/mirror/src/lib.rs @@ -1763,6 +1763,7 @@ impl TxMirror { home_dir, sync_mode: near_indexer::SyncModeEnum::FromInterruption, await_for_node_synced: near_indexer::AwaitForNodeSyncedEnum::StreamWhileSyncing, + finality: Finality::Final, validate_genesis: false, }) .context("failed to start target chain indexer")?; From 07ca150d9b55efd3e99f1e8812f86a694cd14ad6 Mon Sep 17 00:00:00 2001 From: Andrea Date: Mon, 6 Jan 2025 10:38:53 +0100 Subject: [PATCH 3/5] fix: allow loading of memtrie during catchup, for shards pending resharding (#12679) This PR contains a small code change to fix a failure in the tests `test_resharding_v3_load_mem_trie_`. In the test a client that doesn't track the parent shard is assigned to track one of the children in the next epoch after resharding. However, in order to track the children shard, the client must state sync the state of the parent and perform the shard split. This fix allows the catchup routine to build the memtrie even with `load_mem_tries_for_tracked_shards=false`, only if the shard is "pending resharding in the future". The first two commits are updates of code comments. 
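For illustration, a minimal self-contained sketch of the catchup guard described above, assuming a simplified ShardUId stand-in; the actual change lands in `ShardTries::load_mem_trie_on_catchup` and uses `EpochManager::get_shard_uids_pending_resharding`, as shown in the diff below:

use std::collections::HashSet;

// Simplified stand-in for near_primitives::shard_layout::ShardUId.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct ShardUId {
    version: u32,
    shard_id: u32,
}

// Catchup-time decision sketched by this patch: build the memtrie either when
// the node is configured to load memtries for tracked shards, or when the
// shard is pending resharding and will need its memtrie to perform the split.
fn should_load_mem_trie_on_catchup(
    load_mem_tries_for_tracked_shards: bool,
    shard_uid: &ShardUId,
    shard_uids_pending_resharding: &HashSet<ShardUId>,
) -> bool {
    load_mem_tries_for_tracked_shards || shard_uids_pending_resharding.contains(shard_uid)
}

fn main() {
    let parent = ShardUId { version: 3, shard_id: 1 };
    let other = ShardUId { version: 3, shard_id: 0 };
    let pending: HashSet<ShardUId> = [parent].into_iter().collect();

    // With load_mem_tries_for_tracked_shards = false, only the shard that is
    // about to be split gets its memtrie loaded during catchup.
    assert!(should_load_mem_trie_on_catchup(false, &parent, &pending));
    assert!(!should_load_mem_trie_on_catchup(false, &other, &pending));
}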
--- chain/chain/src/chain.rs | 5 ++--- chain/client/src/sync/state/shard.rs | 13 +++++++++---- core/store/src/trie/shard_tries.rs | 6 +++++- .../src/test_loop/tests/resharding_v3.rs | 8 +++----- 4 files changed, 19 insertions(+), 13 deletions(-) diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index 77fa1a00aa8..d367d0308ba 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -2024,7 +2024,6 @@ impl Chain { tracing::debug!(target: "chain", ?shard_id, need_storage_update, "Updating storage"); if need_storage_update { - // TODO(resharding): consider adding to catchup flow. self.resharding_manager.start_resharding( self.chain_store.store_update(), &block, @@ -3127,8 +3126,8 @@ impl Chain { } blocks_catch_up_state.done_blocks.push(queued_block); } - Err(_) => { - error!("Error processing block during catch up, retrying"); + Err(err) => { + error!(target: "chain", ?err, "Error processing block during catch up, retrying"); blocks_catch_up_state.pending_blocks.push(queued_block); } } diff --git a/chain/client/src/sync/state/shard.rs b/chain/client/src/sync/state/shard.rs index 0ef25a07bd6..12ca94715d3 100644 --- a/chain/client/src/sync/state/shard.rs +++ b/chain/client/src/sync/state/shard.rs @@ -2,7 +2,6 @@ use super::downloader::StateSyncDownloader; use super::task_tracker::TaskTracker; use crate::metrics; use crate::sync::state::chain_requests::ChainFinalizationRequest; -use crate::sync::state::util::query_epoch_id_and_height_for_block; use futures::{StreamExt, TryStreamExt}; use near_async::futures::{FutureSpawner, FutureSpawnerExt}; use near_async::messaging::AsyncSender; @@ -15,6 +14,7 @@ use near_primitives::sharding::ShardChunk; use near_primitives::state_part::PartId; use near_primitives::state_sync::StatePartKey; use near_primitives::types::{EpochId, ShardId}; +use near_primitives::version::PROTOCOL_VERSION; use near_store::adapter::{StoreAdapter, StoreUpdateAdapter}; use near_store::flat::{FlatStorageReadyStatus, FlatStorageStatus}; use near_store::{DBCol, ShardUId, Store}; @@ -162,8 +162,6 @@ pub(super) async fn run_state_sync_for_shard( return_if_cancelled!(cancel); // Create flat storage. { - let (epoch_id, _) = query_epoch_id_and_height_for_block(&store, sync_hash)?; - let shard_uid = epoch_manager.shard_id_to_uid(shard_id, &epoch_id)?; let chunk = header.cloned_chunk(); let block_hash = chunk.prev_block(); @@ -180,8 +178,15 @@ pub(super) async fn run_state_sync_for_shard( // Load memtrie. { let handle = computation_task_tracker.get_handle(&format!("shard {}", shard_id)).await; + let head_protocol_version = epoch_manager.get_epoch_protocol_version(&epoch_id)?; + let shard_uids_pending_resharding = epoch_manager + .get_shard_uids_pending_resharding(head_protocol_version, PROTOCOL_VERSION)?; handle.set_status("Loading memtrie"); - runtime.get_tries().load_mem_trie_on_catchup(&shard_uid, &state_root)?; + runtime.get_tries().load_mem_trie_on_catchup( + &shard_uid, + &state_root, + &shard_uids_pending_resharding, + )?; } return_if_cancelled!(cancel); diff --git a/core/store/src/trie/shard_tries.rs b/core/store/src/trie/shard_tries.rs index 40bf4717c5e..0ca4e580a4f 100644 --- a/core/store/src/trie/shard_tries.rs +++ b/core/store/src/trie/shard_tries.rs @@ -429,12 +429,16 @@ impl ShardTries { /// Loads in-memory trie upon catchup, if it is enabled. /// Requires state root because `ChunkExtra` is not available at the time mem-trie is being loaded. + /// Mem-tries of shards that are pending resharding must be loaded in any case. 
pub fn load_mem_trie_on_catchup( &self, shard_uid: &ShardUId, state_root: &StateRoot, + shard_uids_pending_resharding: &HashSet, ) -> Result<(), StorageError> { - if !self.0.trie_config.load_mem_tries_for_tracked_shards { + if !self.0.trie_config.load_mem_tries_for_tracked_shards + && !shard_uids_pending_resharding.contains(shard_uid) + { return Ok(()); } // It should not happen that memtrie is already loaded for a shard diff --git a/integration-tests/src/test_loop/tests/resharding_v3.rs b/integration-tests/src/test_loop/tests/resharding_v3.rs index 0dc4441d884..78315c43582 100644 --- a/integration-tests/src/test_loop/tests/resharding_v3.rs +++ b/integration-tests/src/test_loop/tests/resharding_v3.rs @@ -660,7 +660,9 @@ fn test_resharding_v3_resharding_block_in_fork() { #[test] // TODO(resharding): fix nearcore and un-ignore this test // TODO(resharding): duplicate this test so that in one case resharding is performed on block -// B(height=13) and in another case resharding is performed on block B'(height=13) +// B(height=13) and in another case resharding is performed on block B'(height=13). +// In the current scenario the real resharding happens on block B'. Low priority TODO +// since it's a very rare corner case. #[ignore] #[cfg(feature = "test_features")] fn test_resharding_v3_double_sign_resharding_block() { @@ -866,8 +868,6 @@ fn test_resharding_v3_load_mem_trie_v1() { let params = TestReshardingParametersBuilder::default() .base_shard_layout_version(1) .load_mem_tries_for_tracked_shards(false) - // TODO(resharding): should it work without tracking all shards? - .track_all_shards(true) .build(); test_resharding_v3_base(params); } @@ -877,8 +877,6 @@ fn test_resharding_v3_load_mem_trie_v2() { let params = TestReshardingParametersBuilder::default() .base_shard_layout_version(2) .load_mem_tries_for_tracked_shards(false) - // TODO(resharding): should it work without tracking all shards? - .track_all_shards(true) .build(); test_resharding_v3_base(params); } From df175ba75c66cdcae8d97f6421985397eb02eb47 Mon Sep 17 00:00:00 2001 From: teenager-ETH Date: Mon, 6 Jan 2025 13:57:07 +0100 Subject: [PATCH 4/5] Typos fix (#12666) # Typos Fix ## Description This pull request addresses and corrects typos in two files: - `CHANGELOG.md`: Fixed "requset" to "request". - `up.sql`: Fixed "defaul" to "default". ## Changes - Improved documentation and comments by correcting minor spelling errors. - Adjusted wording for better clarity and accuracy. ## Testing These changes are non-functional and impact only documentation and comments. No tests are required, and all existing functionality remains unchanged. ## Notes for Reviewers Please verify that the corrections align with the project's style and guidelines. Let me know if additional adjustments are needed. Thank you! --- CHANGELOG.md | 2 +- .../migrations/2024-06-19-160030_ft_transfers_metadata/up.sql | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fb0c49613e7..f0967068146 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -305,7 +305,7 @@ to pay for the storage of their accounts. `sum(rate(near_peer_message_received_by_type_total{...}[5m]))`. [#7548](https://github.com/near/nearcore/pull/7548) * Few changes to `view_state` JSON RPC query: - - The requset has now an optional `include_proof` argument. When set to + - The request has now an optional `include_proof` argument. When set to `true`, response’s `proof` will be populated. 
- The `proof` within each value in `values` list of a `view_state` response is now deprecated and will be removed in the future. Client code should ignore diff --git a/benchmarks/continuous/db/tool/orm/migrations/2024-06-19-160030_ft_transfers_metadata/up.sql b/benchmarks/continuous/db/tool/orm/migrations/2024-06-19-160030_ft_transfers_metadata/up.sql index e91bf893b76..87aa6a4f1d4 100644 --- a/benchmarks/continuous/db/tool/orm/migrations/2024-06-19-160030_ft_transfers_metadata/up.sql +++ b/benchmarks/continuous/db/tool/orm/migrations/2024-06-19-160030_ft_transfers_metadata/up.sql @@ -1,4 +1,4 @@ --- Setting defaul values to enable `not null` by filling values for existing +-- Setting default values to enable `not null` by filling values for existing -- rows. alter table ft_transfers add column initiator text not null default 'crt-benchmarks', From 445ee6f84b9f2dff8c39b8671566768859f785c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Chuda=C5=9B?= <18039094+staffik@users.noreply.github.com> Date: Mon, 6 Jan 2025 14:27:11 +0100 Subject: [PATCH 5/5] fix(resharding): double resharding state mapping (#12688) Introduce the fix from https://github.com/near/nearcore/pull/12635 now, will add test later as it requires one of two: - memtrie support for double resharding without restart - testloop support for restarting a node --- chain/chain/src/resharding/manager.rs | 10 +++++++--- integration-tests/src/test_loop/utils/trie_sanity.rs | 3 +++ 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/chain/chain/src/resharding/manager.rs b/chain/chain/src/resharding/manager.rs index 9249a30006d..d1a6f4405dc 100644 --- a/chain/chain/src/resharding/manager.rs +++ b/chain/chain/src/resharding/manager.rs @@ -14,6 +14,7 @@ use near_primitives::challenge::PartialState; use near_primitives::hash::CryptoHash; use near_primitives::shard_layout::{get_block_shard_uid, ShardLayout}; use near_primitives::types::chunk_extra::ChunkExtra; +use near_store::adapter::trie_store::get_shard_uid_mapping; use near_store::adapter::{StoreAdapter, StoreUpdateAdapter}; use near_store::flat::BlockInfo; use near_store::trie::mem::mem_trie_update::TrackingMode; @@ -124,7 +125,7 @@ impl ReshardingManager { return Ok(()); } - // Reshard the State column by setting ShardUId mapping from children to parent. + // Reshard the State column by setting ShardUId mapping from children to ancestor. self.set_state_shard_uid_mapping(&split_shard_event)?; // Create temporary children memtries by freezing parent memtrie and referencing it. @@ -146,16 +147,19 @@ impl ReshardingManager { } /// Store in the database the mapping of ShardUId from children to the parent shard, - /// so that subsequent accesses to the State will use the parent shard's UId as a prefix for the database key. + /// so that subsequent accesses to the State will use the ancestor's ShardUId prefix + /// as a prefix for the database key. + // TODO(resharding) add testloop where grandparent ShardUId is used fn set_state_shard_uid_mapping( &mut self, split_shard_event: &ReshardingSplitShardParams, ) -> io::Result<()> { let mut store_update = self.store.trie_store().store_update(); let parent_shard_uid = split_shard_event.parent_shard; + let parent_shard_uid_prefix = get_shard_uid_mapping(&self.store, parent_shard_uid); // TODO(resharding) No need to set the mapping for children shards that we won't track just after resharding? 
for child_shard_uid in split_shard_event.children_shards() { - store_update.set_shard_uid_mapping(child_shard_uid, parent_shard_uid); + store_update.set_shard_uid_mapping(child_shard_uid, parent_shard_uid_prefix); } store_update.commit() } diff --git a/integration-tests/src/test_loop/utils/trie_sanity.rs b/integration-tests/src/test_loop/utils/trie_sanity.rs index ca12fe83b35..31acd1e32cb 100644 --- a/integration-tests/src/test_loop/utils/trie_sanity.rs +++ b/integration-tests/src/test_loop/utils/trie_sanity.rs @@ -351,6 +351,7 @@ pub fn check_state_shard_uid_mapping_after_resharding( assert_eq!(children_shard_uids.len(), 2); let store = client.chain.chain_store.store().trie_store(); + let mut checked_any = false; for kv in store.store().iter_raw_bytes(DBCol::State) { let (key, value) = kv.unwrap(); let shard_uid = ShardUId::try_from_slice(&key[0..8]).unwrap(); @@ -359,6 +360,7 @@ pub fn check_state_shard_uid_mapping_after_resharding( if shard_uid != parent_shard_uid { continue; } + checked_any = true; let node_hash = CryptoHash::try_from_slice(&key[8..]).unwrap(); let (value, rc) = decode_value_with_rc(&value); // It is possible we have delayed receipts leftovers on disk, @@ -381,4 +383,5 @@ pub fn check_state_shard_uid_mapping_after_resharding( assert_eq!(&child_value.unwrap()[..], value.unwrap()); } } + assert!(checked_any); }
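For illustration, a minimal self-contained sketch of the mapping behaviour the last patch fixes, assuming a simplified ShardUId and an in-memory map standing in for the persisted ShardUId mapping; the actual code path is `get_shard_uid_mapping` plus `set_shard_uid_mapping` in the diff above:

use std::collections::HashMap;

// Simplified stand-in for near_primitives::shard_layout::ShardUId.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct ShardUId {
    version: u32,
    shard_id: u32,
}

// A shard with no mapping entry resolves to itself: its State keys use its
// own ShardUId prefix.
fn get_shard_uid_mapping(mapping: &HashMap<ShardUId, ShardUId>, shard_uid: ShardUId) -> ShardUId {
    *mapping.get(&shard_uid).unwrap_or(&shard_uid)
}

// Fixed behaviour: children are mapped to the *ancestor* prefix (whatever the
// parent already resolves to), not to the parent itself, so a second
// resharding keeps pointing at the original State prefix.
fn set_state_shard_uid_mapping(
    mapping: &mut HashMap<ShardUId, ShardUId>,
    parent: ShardUId,
    children: &[ShardUId],
) {
    let ancestor = get_shard_uid_mapping(mapping, parent);
    for child in children {
        mapping.insert(*child, ancestor);
    }
}

fn main() {
    let mut mapping = HashMap::new();
    let grandparent = ShardUId { version: 3, shard_id: 1 };
    let parent = ShardUId { version: 4, shard_id: 5 };
    let child = ShardUId { version: 5, shard_id: 9 };

    // First resharding: the parent's State is reached through the grandparent prefix.
    set_state_shard_uid_mapping(&mut mapping, grandparent, &[parent]);
    // Second resharding: with the fix, the child resolves to the grandparent,
    // not to the already-remapped parent.
    set_state_shard_uid_mapping(&mut mapping, parent, &[child]);
    assert_eq!(get_shard_uid_mapping(&mapping, child), grandparent);
}

Without the fix, the second call would map `child` directly to `parent`, so lookups for the child's State would miss the entries that still live under the grandparent's prefix.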